1 """UPnP/IGD coordinator."""
3 from collections
import defaultdict
4 from collections.abc
import Callable
5 from datetime
import datetime, timedelta
7 from async_upnp_client.exceptions
import UpnpCommunicationError
13 from .const
import LOGGER
14 from .device
import Device
18 DataUpdateCoordinator[dict[str, str | datetime | int | float |
None]]
20 """Define an object to update data from UPNP device."""
26 device_entry: DeviceEntry,
27 update_interval: timedelta,
32 self._features_by_entity_id: defaultdict[str, set[str]] = defaultdict(set)
38 update_interval=update_interval,
42 """Register an entity."""
43 self._features_by_entity_id[key].
add(entity_id)
def unregister_entity() -> None:
    """Remove `entity_id` from the set kept under `key`.

    The key itself is deleted from the mapping once its set is empty.
    """
    # One lookup: this is the same set object the mapping holds for `key`.
    entity_ids = self._features_by_entity_id[key]
    entity_ids.remove(entity_id)
    if entity_ids:
        return
    # Nothing is registered under `key` any more, so drop the key itself.
    del self._features_by_entity_id[key]
52 return unregister_entity
56 """Return a list of entity description keys for which data is required."""
57 if not self._features_by_entity_id:
61 return list(self._features_by_entity_id)
65 ) -> dict[str, str | datetime | int | float | None]:
69 except UpnpCommunicationError
as exception:
71 "Caught exception when updating device: %s, exception: %s",
76 f
"Unable to communicate with IGD at: {self.device.device_url}"
None __init__(self, HomeAssistant hass, Device device, DeviceEntry device_entry, timedelta update_interval)
dict[str, str|datetime|int|float|None] _async_update_data(self)
Callable[[], None] register_entity(self, str key, str entity_id)
list[str]|None _entity_description_keys(self)
bool add(self, _T matcher)
bool remove(self, _T matcher)
RadioThermUpdate async_get_data(HomeAssistant hass, CommonThermostat device)