1 """Contains the shared Coordinator for Starlink systems."""
3 from __future__
import annotations
6 from dataclasses
import dataclass
7 from datetime
import timedelta
9 from zoneinfo
import ZoneInfo
11 from starlink_grpc
import (
30 _LOGGER = logging.getLogger(__name__)
35 """Contains data pulled from the Starlink system."""
37 location: LocationDict
38 sleep: tuple[int, int, bool]
40 obstruction: ObstructionDict
45 """Coordinates updates between all Starlink sensors defined in this file."""
47 def __init__(self, hass: HomeAssistant, name: str, url: str) ->
None:
48 """Initialize an UpdateCoordinator for a group of sensors."""
50 self.
timezonetimezone = ZoneInfo(hass.config.time_zone)
59 """Retrieve Starlink data."""
61 status = status_data(channel_context)
62 location = location_data(channel_context)
63 sleep = get_sleep_config(channel_context)
67 async
with asyncio.timeout(4):
70 except GrpcError
as exc:
71 raise UpdateFailed
from exc
75 """Set whether Starlink system tied to this coordinator should be stowed."""
76 async
with asyncio.timeout(4):
78 await self.
hasshass.async_add_executor_job(
81 except GrpcError
as exc:
82 raise HomeAssistantError
from exc
85 """Reboot the Starlink system tied to this coordinator."""
86 async
with asyncio.timeout(4):
89 except GrpcError
as exc:
90 raise HomeAssistantError
from exc
93 """Set whether Starlink system uses the configured sleep schedule."""
94 async
with asyncio.timeout(4):
96 await self.
hasshass.async_add_executor_job(
98 self.
datadata.sleep[0],
99 self.
datadata.sleep[1],
103 except GrpcError
as exc:
104 raise HomeAssistantError
from exc
107 """Set Starlink system sleep schedule start time."""
108 async
with asyncio.timeout(4):
110 await self.
hasshass.async_add_executor_job(
113 self.
datadata.sleep[1],
114 self.
datadata.sleep[2],
117 except GrpcError
as exc:
118 raise HomeAssistantError
from exc
121 """Set Starlink system sleep schedule end time."""
122 duration = end - self.
datadata.sleep[0]
126 async
with asyncio.timeout(4):
128 await self.
hasshass.async_add_executor_job(
130 self.
datadata.sleep[0],
132 self.
datadata.sleep[2],
135 except GrpcError
as exc:
136 raise HomeAssistantError
from exc
None async_set_sleep_duration(self, int end)
StarlinkData _get_starlink_data(self)
None async_stow_starlink(self, bool stow)
StarlinkData _async_update_data(self)
None __init__(self, HomeAssistant hass, str name, str url)
None async_reboot_starlink(self)
None async_set_sleep_start(self, int start)
None async_set_sleep_schedule_enabled(self, bool sleep_schedule)