1 """Data used by this integration."""
3 from __future__
import annotations
6 from collections
import defaultdict
7 from typing
import NamedTuple, cast
9 from async_upnp_client.aiohttp
import AiohttpNotifyServer, AiohttpSessionRequester
10 from async_upnp_client.client
import UpnpRequester
11 from async_upnp_client.client_factory
import UpnpFactory
12 from async_upnp_client.event_handler
import UpnpEventHandler
18 from .const
import DOMAIN, LOGGER
22 """Unique identifier for an event listener."""
26 callback_url: str |
None
30 """Storage class for domain global data."""
33 requester: UpnpRequester
34 upnp_factory: UpnpFactory
35 event_notifiers: dict[EventListenAddr, AiohttpNotifyServer]
36 event_notifier_refs: defaultdict[EventListenAddr, int]
37 stop_listener_remove: CALLBACK_TYPE |
None =
None
39 def __init__(self, hass: HomeAssistant) ->
None:
40 """Initialize global data."""
41 self.
locklock = asyncio.Lock()
42 session = aiohttp_client.async_get_clientsession(hass, verify_ssl=
False)
43 self.
requesterrequester = AiohttpSessionRequester(session, with_sleep=
True)
49 """Clean up resources when Home Assistant is stopped."""
50 LOGGER.debug(
"Cleaning resources in DlnaDmrData")
51 async
with self.
locklock:
53 server.async_stop_server()
for server
in self.
event_notifiersevent_notifiers.values()
55 asyncio.gather(*tasks)
60 self, listen_addr: EventListenAddr, hass: HomeAssistant
61 ) -> UpnpEventHandler:
62 """Return existing event notifier for the listen_addr, or create one.
64 Only one event notify server is kept for each listen_addr. Must call
65 async_release_event_notifier when done to cleanup resources.
67 LOGGER.debug(
"Getting event handler for %s", listen_addr)
69 async
with self.
locklock:
84 source = (listen_addr.host
or "0.0.0.0", listen_addr.port)
85 server = AiohttpNotifyServer(
88 callback_url=listen_addr.callback_url,
91 await server.async_start_server()
92 LOGGER.debug(
"Started event handler at %s", server.callback_url)
96 return server.event_handler
99 """Indicate that the event notifier for listen_addr is not used anymore.
101 This is called once by each caller of async_get_event_notifier, and will
102 stop the listening server when all users are done.
104 async
with self.
locklock:
111 await server.async_stop_server()
121 """Obtain this integration's domain data, creating it if needed."""
122 if DOMAIN
in hass.data:
123 return cast(DlnaDmrData, hass.data[DOMAIN])
126 hass.data[DOMAIN] = data
UpnpEventHandler async_get_event_notifier(self, EventListenAddr listen_addr, HomeAssistant hass)
None async_cleanup_event_notifiers(self, Event event)
None __init__(self, HomeAssistant hass)
None async_release_event_notifier(self, EventListenAddr listen_addr)
DlnaDmrData get_domain_data(HomeAssistant hass)