1 """DataUpdateCoordinator for the Squeezebox integration."""
3 from asyncio
import timeout
4 from collections.abc
import Callable
5 from datetime
import timedelta
10 from pysqueezebox
import Player, Server
18 PLAYER_UPDATE_INTERVAL,
19 SENSOR_UPDATE_INTERVAL,
20 SIGNAL_PLAYER_REDISCOVERED,
22 STATUS_SENSOR_LASTSCAN,
23 STATUS_SENSOR_NEEDSRESTART,
27 _LOGGER = logging.getLogger(__name__)
31 """LMS Status custom coordinator."""
33 def __init__(self, hass: HomeAssistant, lms: Server) ->
None:
34 """Initialize my coordinator."""
39 update_interval=
timedelta(seconds=SENSOR_UPDATE_INTERVAL),
46 """Fetch data from LMS status call.
48 Then we process only a subset to make them nice for HA
50 async
with timeout(STATUS_API_TIMEOUT):
51 data = await self.
lms.async_status()
55 _LOGGER.debug(
"Raw serverstatus %s=%s", self.
lms.name, data)
60 """Sensors that need the data changing for HA presentation."""
64 data[STATUS_SENSOR_RESCAN] = STATUS_SENSOR_RESCAN
in data
66 data[STATUS_SENSOR_NEEDSRESTART] = STATUS_SENSOR_NEEDSRESTART
in data
70 data[STATUS_SENSOR_LASTSCAN] = (
71 dt_util.utc_from_timestamp(
int(data[STATUS_SENSOR_LASTSCAN]))
72 if STATUS_SENSOR_LASTSCAN
in data
76 _LOGGER.debug(
"Processed serverstatus %s=%s", self.
lms.name, data)
81 """Coordinator for Squeezebox players."""
83 def __init__(self, hass: HomeAssistant, player: Player, server_uuid: str) ->
None:
84 """Initialize the coordinator."""
89 update_interval=
timedelta(seconds=PLAYER_UPDATE_INTERVAL),
98 """Update Player if available, or listen for rediscovery if not."""
103 if self.
player.connected
is False:
104 _LOGGER.debug(
"Player %s is not available", self.
name)
115 """Make a player available again."""
116 if unique_id == self.
player.player_id
and connected:
118 _LOGGER.debug(
"Player %s is available again", self.
name)
dict _async_update_data(self)
dict _prepare_status_data(self, dict data)
None __init__(self, HomeAssistant hass, Server lms)
None __init__(self, HomeAssistant hass, Player player, str server_uuid)
None rediscovered(self, str unique_id, bool connected)
dict[str, Any] _async_update_data(self)
Callable[[], None] async_dispatcher_connect(HomeAssistant hass, str signal, Callable[..., Any] target)