Home Assistant Unofficial Reference 2024.12.1
data.py
Go to the documentation of this file.
1 """Data used by this integration."""
2 
3 from __future__ import annotations
4 
5 import asyncio
6 from collections import defaultdict
7 from typing import NamedTuple, cast
8 
9 from async_upnp_client.aiohttp import AiohttpNotifyServer, AiohttpSessionRequester
10 from async_upnp_client.client import UpnpRequester
11 from async_upnp_client.client_factory import UpnpFactory
12 from async_upnp_client.event_handler import UpnpEventHandler
13 
14 from homeassistant.const import EVENT_HOMEASSISTANT_STOP
15 from homeassistant.core import CALLBACK_TYPE, Event, HomeAssistant
16 from homeassistant.helpers import aiohttp_client
17 
18 from .const import DOMAIN, LOGGER
19 
20 
21 class EventListenAddr(NamedTuple):
22  """Unique identifier for an event listener."""
23 
24  host: str | None # Specific local IP(v6) address for listening on
25  port: int # Listening port, 0 means use an ephemeral port
26  callback_url: str | None
27 
28 
30  """Storage class for domain global data."""
31 
32  lock: asyncio.Lock
33  requester: UpnpRequester
34  upnp_factory: UpnpFactory
35  event_notifiers: dict[EventListenAddr, AiohttpNotifyServer]
36  event_notifier_refs: defaultdict[EventListenAddr, int]
37  stop_listener_remove: CALLBACK_TYPE | None = None
38 
39  def __init__(self, hass: HomeAssistant) -> None:
40  """Initialize global data."""
41  self.locklock = asyncio.Lock()
42  session = aiohttp_client.async_get_clientsession(hass, verify_ssl=False)
43  self.requesterrequester = AiohttpSessionRequester(session, with_sleep=True)
44  self.upnp_factoryupnp_factory = UpnpFactory(self.requesterrequester, non_strict=True)
45  self.event_notifiersevent_notifiers = {}
46  self.event_notifier_refsevent_notifier_refs = defaultdict(int)
47 
48  async def async_cleanup_event_notifiers(self, event: Event) -> None:
49  """Clean up resources when Home Assistant is stopped."""
50  LOGGER.debug("Cleaning resources in DlnaDmrData")
51  async with self.locklock:
52  tasks = (
53  server.async_stop_server() for server in self.event_notifiersevent_notifiers.values()
54  )
55  asyncio.gather(*tasks)
56  self.event_notifiersevent_notifiers = {}
57  self.event_notifier_refsevent_notifier_refs = defaultdict(int)
58 
60  self, listen_addr: EventListenAddr, hass: HomeAssistant
61  ) -> UpnpEventHandler:
62  """Return existing event notifier for the listen_addr, or create one.
63 
64  Only one event notify server is kept for each listen_addr. Must call
65  async_release_event_notifier when done to cleanup resources.
66  """
67  LOGGER.debug("Getting event handler for %s", listen_addr)
68 
69  async with self.locklock:
70  # Stop all servers when HA shuts down, to release resources on devices
71  if not self.stop_listener_removestop_listener_remove:
72  self.stop_listener_removestop_listener_remove = hass.bus.async_listen_once(
73  EVENT_HOMEASSISTANT_STOP, self.async_cleanup_event_notifiersasync_cleanup_event_notifiers
74  )
75 
76  # Always increment the reference counter, for existing or new event handlers
77  self.event_notifier_refsevent_notifier_refs[listen_addr] += 1
78 
79  # Return an existing event handler if we can
80  if listen_addr in self.event_notifiersevent_notifiers:
81  return self.event_notifiersevent_notifiers[listen_addr].event_handler
82 
83  # Start event handler
84  source = (listen_addr.host or "0.0.0.0", listen_addr.port)
85  server = AiohttpNotifyServer(
86  requester=self.requesterrequester,
87  source=source,
88  callback_url=listen_addr.callback_url,
89  loop=hass.loop,
90  )
91  await server.async_start_server()
92  LOGGER.debug("Started event handler at %s", server.callback_url)
93 
94  self.event_notifiersevent_notifiers[listen_addr] = server
95 
96  return server.event_handler
97 
98  async def async_release_event_notifier(self, listen_addr: EventListenAddr) -> None:
99  """Indicate that the event notifier for listen_addr is not used anymore.
100 
101  This is called once by each caller of async_get_event_notifier, and will
102  stop the listening server when all users are done.
103  """
104  async with self.locklock:
105  assert self.event_notifier_refsevent_notifier_refs[listen_addr] > 0
106  self.event_notifier_refsevent_notifier_refs[listen_addr] -= 1
107 
108  # Shutdown the server when it has no more users
109  if self.event_notifier_refsevent_notifier_refs[listen_addr] == 0:
110  server = self.event_notifiersevent_notifiers.pop(listen_addr)
111  await server.async_stop_server()
112 
113  # Remove the cleanup listener when there's nothing left to cleanup
114  if not self.event_notifiersevent_notifiers:
115  assert self.stop_listener_removestop_listener_remove is not None
116  self.stop_listener_removestop_listener_remove()
117  self.stop_listener_removestop_listener_remove = None
118 
119 
120 def get_domain_data(hass: HomeAssistant) -> DlnaDmrData:
121  """Obtain this integration's domain data, creating it if needed."""
122  if DOMAIN in hass.data:
123  return cast(DlnaDmrData, hass.data[DOMAIN])
124 
125  data = DlnaDmrData(hass)
126  hass.data[DOMAIN] = data
127  return data
UpnpEventHandler async_get_event_notifier(self, EventListenAddr listen_addr, HomeAssistant hass)
Definition: data.py:61
None async_cleanup_event_notifiers(self, Event event)
Definition: data.py:48
None __init__(self, HomeAssistant hass)
Definition: data.py:39
None async_release_event_notifier(self, EventListenAddr listen_addr)
Definition: data.py:98
DlnaDmrData get_domain_data(HomeAssistant hass)
Definition: data.py:120