1 """Class representing a Sonos household storage helper."""
3 from __future__
import annotations
6 from collections.abc
import Callable, Coroutine
15 from .const
import DATA_SONOS
16 from .exception
import SonosUpdateError
18 _LOGGER = logging.getLogger(__name__)
22 """Base class for Sonos household-level storage."""
24 cache_update_lock: asyncio.Lock
26 def __init__(self, hass: HomeAssistant, household_id: str) ->
None:
27 """Initialize the data."""
30 self.
async_pollasync_poll: Callable[[], Coroutine[
None,
None,
None]] |
None =
None
31 self.last_processed_event_id: int |
None =
None
33 def setup(self, soco: SoCo) ->
None:
34 """Set up the SonosAlarm instance."""
40 """Finish setup in async context."""
42 self.
async_pollasync_poll = Debouncer[Coroutine[Any, Any,
None]](
52 """Return the class type of this instance."""
53 return type(self).__name__
56 """Poll any known speaker."""
57 discovered = self.
hasshass.data[DATA_SONOS].discovered
59 for uid, speaker
in discovered.items():
60 _LOGGER.debug(
"Polling %s using %s", self.
class_typeclass_type, speaker.soco)
63 except SonosUpdateError
as err:
65 "Could not refresh %s: %s",
71 discovered.move_to_end(uid, last=
False)
75 self, soco: SoCo, update_id: int |
None =
None
77 """Update the cache and update entities."""
78 raise NotImplementedError
80 def update_cache(self, soco: SoCo, update_id: int |
None =
None) -> bool:
81 """Update the cache of the household-level feature and return if cache has changed."""
82 raise NotImplementedError
None async_update_entities(self, SoCo soco, int|None update_id=None)
None setup(self, SoCo soco)
None __init__(self, HomeAssistant hass, str household_id)
bool update_cache(self, SoCo soco, int|None update_id=None)