Home Assistant Unofficial Reference 2024.12.1
event_source.py
Go to the documentation of this file.
1 """Axis network device abstraction."""
2 
3 from __future__ import annotations
4 
5 import axis
6 from axis.errors import Unauthorized
7 from axis.interfaces.mqtt import mqtt_json_to_event
8 from axis.models.mqtt import ClientState
9 from axis.stream_manager import Signal, State
10 
11 from homeassistant.components import mqtt
12 from homeassistant.components.mqtt import DOMAIN as MQTT_DOMAIN, ReceiveMessage
13 from homeassistant.config_entries import ConfigEntry
14 from homeassistant.core import HomeAssistant, callback
15 from homeassistant.helpers.dispatcher import async_dispatcher_send
16 from homeassistant.setup import async_when_setup
17 
18 
20  """Manage connection to event sources from an Axis device."""
21 
22  def __init__(
23  self, hass: HomeAssistant, config_entry: ConfigEntry, api: axis.AxisDevice
24  ) -> None:
25  """Initialize the device."""
26  self.hasshass = hass
27  self.config_entryconfig_entry = config_entry
28  self.apiapi = api
29 
30  self.signal_reachablesignal_reachable = f"axis_reachable_{config_entry.entry_id}"
31 
32  self.availableavailable = True
33 
34  @callback
35  def setup(self) -> None:
36  """Set up the device events."""
37  self.apiapi.stream.connection_status_callback.append(self._connection_status_cb_connection_status_cb)
38  self.apiapi.enable_events()
39  self.apiapi.stream.start()
40 
41  if self.apiapi.vapix.mqtt.supported:
42  async_when_setup(self.hasshass, MQTT_DOMAIN, self._async_use_mqtt_async_use_mqtt)
43 
44  @callback
45  def teardown(self) -> None:
46  """Tear down connections."""
47  self._disconnect_from_stream_disconnect_from_stream()
48 
49  @callback
50  def _disconnect_from_stream(self) -> None:
51  """Stop stream."""
52  if self.apiapi.stream.state != State.STOPPED:
53  self.apiapi.stream.connection_status_callback.clear()
54  self.apiapi.stream.stop()
55 
56  async def _async_use_mqtt(self, hass: HomeAssistant, component: str) -> None:
57  """Set up to use MQTT."""
58  try:
59  status = await self.apiapi.vapix.mqtt.get_client_status()
60  except Unauthorized:
61  # This means the user has too low privileges
62  return
63 
64  if status.status.state == ClientState.ACTIVE:
65  self.config_entryconfig_entry.async_on_unload(
66  await mqtt.async_subscribe(
67  hass, f"{status.config.device_topic_prefix}/#", self._mqtt_message_mqtt_message
68  )
69  )
70 
71  @callback
72  def _mqtt_message(self, message: ReceiveMessage) -> None:
73  """Receive Axis MQTT message."""
74  self._disconnect_from_stream_disconnect_from_stream()
75 
76  if message.topic.endswith("event/connection"):
77  return
78 
79  event = mqtt_json_to_event(message.payload)
80  self.apiapi.event.handler(event)
81 
82  @callback
83  def _connection_status_cb(self, status: Signal) -> None:
84  """Handle signals of device connection status.
85 
86  This is called on every RTSP keep-alive message.
87  Only signal state change if state change is true.
88  """
89 
90  if self.availableavailable != (status == Signal.PLAYING):
91  self.availableavailable = not self.availableavailable
92  async_dispatcher_send(self.hasshass, self.signal_reachablesignal_reachable)
None __init__(self, HomeAssistant hass, ConfigEntry config_entry, axis.AxisDevice api)
Definition: event_source.py:24
None _async_use_mqtt(self, HomeAssistant hass, str component)
Definition: event_source.py:56
None async_dispatcher_send(HomeAssistant hass, str signal, *Any args)
Definition: dispatcher.py:193
None async_when_setup(core.HomeAssistant hass, str component, Callable[[core.HomeAssistant, str], Awaitable[None]] when_setup_cb)
Definition: setup.py:587