1 """Update coordinator for Knocki integration."""
3 from knocki import Event, KnockiClient, KnockiConnectionError, Trigger
10 from .const import DOMAIN, LOGGER
14 """The Knocki coordinator."""
16 def __init__(self, hass: HomeAssistant, client: KnockiClient) -> None:
17 """Initialize the coordinator."""
28 triggers = await self.client.get_triggers()
29 except KnockiConnectionError as exc:
30 raise UpdateFailed from exc
32 (trigger.device_id, trigger.details.trigger_id) for trigger in triggers
34 removed_triggers = self._known_triggers - current_triggers
35 for trigger in removed_triggers:
38 return {trigger.details.trigger_id: trigger for trigger in triggers}
41 """Add a trigger to the coordinator."""
43 {**self.data, event.payload.details.trigger_id: event.payload}
46 (event.payload.device_id, event.payload.details.trigger_id)
51 """Delete a device from the coordinator."""
52 device_id, trigger_id = trigger
53 entity_registry = er.async_get(self.hass)
54 entity_entry = entity_registry.async_get_entity_id(
55 EVENT_DOMAIN, DOMAIN, f"{device_id}_{trigger_id}"
58 entity_registry.async_remove(entity_entry)
None __init__(self, HomeAssistant hass, KnockiClient client)
None add_trigger(self, Event event)
dict[int, Trigger] _async_update_data(self)
None _async_delete_device(self, tuple[str, int] trigger)
None async_set_updated_data(self, _DataT data)
bool add(self, _T matcher)