1 """Class representing Sonos alarms."""
3 from __future__
import annotations
5 from collections.abc
import Iterator
7 from typing
import TYPE_CHECKING, Any
10 from soco.alarms
import Alarm, Alarms
11 from soco.events_base
import Event
as SonosEvent
15 from .const
import DATA_SONOS, SONOS_ALARMS_UPDATED, SONOS_CREATE_ALARM
16 from .helpers
import soco_error
17 from .household_coordinator
import SonosHouseholdCoordinator
20 from .speaker
import SonosSpeaker
# Module-level logger, named after this module via ``__name__``.
_LOGGER = logging.getLogger(__name__)
26 """Coordinator class for Sonos alarms."""
29 """Initialize the data."""
31 self.alarms: Alarms = Alarms()
32 self.created_alarm_ids: set[str] = set()
35 """Return an iterator for the known alarms."""
36 return iter(self.alarms)
def get(self, alarm_id: str) -> Alarm | None:
    """Look up a single alarm by its identifier.

    Delegates to ``self.alarms.get`` and returns its result unchanged
    (annotated as an ``Alarm`` or ``None``).
    """
    known_alarms = self.alarms
    return known_alarms.get(alarm_id)
43 self, soco: SoCo, update_id: int |
None =
None
45 """Create and update alarms entities, return success."""
46 updated = await self.
hasshass.async_add_executor_job(
52 for alarm_id, alarm
in self.alarms.alarms.items():
53 if alarm_id
in self.created_alarm_ids:
55 speaker = self.
hasshass.data[DATA_SONOS].discovered.get(alarm.zone.uid)
58 self.
hasshass, SONOS_CREATE_ALARM, speaker, [alarm_id]
63 self, event: SonosEvent, speaker: SonosSpeaker
65 """Process the event payload in an async lock and update entities."""
66 event_id = event.variables[
"alarm_list_version"].split(
":")[-1]
67 event_id =
int(event_id)
72 speaker.event_stats.process(event)
76 def update_cache(self, soco: SoCo, update_id: int |
None =
None) -> bool:
77 """Update cache of known alarms and return if cache has changed."""
80 if update_id
and self.alarms.last_id < update_id:
92 "Updating processed event %s from %s (was %s)",
None async_update_entities(self, SoCo soco, int|None update_id=None)
Alarm|None get(self, str alarm_id)
None __init__(self, *Any args)
bool update_cache(self, SoCo soco, int|None update_id=None)
None async_process_event(self, SonosEvent event, SonosSpeaker speaker)
None async_update_entities(self, SoCo soco, int|None update_id=None)
bool update_cache(self, SoCo soco, int|None update_id=None)
IssData update(pyiss.ISS iss)
None async_dispatcher_send(HomeAssistant hass, str signal, *Any args)