1 """Define a Notion data coordinator."""
4 from dataclasses import dataclass, field
5 from datetime import timedelta
8 from aionotion.bridge.models import Bridge
9 from aionotion.client import Client
10 from aionotion.errors import InvalidCredentialsError, NotionError
11 from aionotion.listener.models import Listener
12 from aionotion.sensor.models import Sensor
13 from aionotion.user.models import UserPreferences
22 from .const import DOMAIN, LOGGER
24 DATA_BRIDGES = "bridges"
25 DATA_LISTENERS = "listeners"
26 DATA_SENSORS = "sensors"
27 DATA_USER_PREFERENCES = "user_preferences"
34 hass: HomeAssistant, entry: ConfigEntry, bridge: Bridge
36 """Register a new bridge."""
37 if name := bridge.name:
38 bridge_name = name.capitalize()
40 bridge_name = str(bridge.id)
42 device_registry = dr.async_get(hass)
43 device_registry.async_get_or_create(
44 config_entry_id=entry.entry_id,
45 identifiers={(DOMAIN, bridge.hardware_id)},
46 manufacturer="Silicon Labs",
47 model=str(bridge.hardware_revision),
49 sw_version=bridge.firmware_version.wifi,
55 """Define a manager class for Notion data."""
61 bridges: dict[int, Bridge] = field(default_factory=dict)
64 listeners: dict[str, Listener] = field(default_factory=dict)
67 sensors: dict[str, Sensor] = field(default_factory=dict)
70 user_preferences: UserPreferences | None = field(default=None)
73 """Update the bridges."""
74 for bridge in bridges:
76 if bridge.id not in self.bridges:
78 self.bridges[bridge.id] = bridge
81 """Update the listeners."""
82 self.listeners = {listener.id: listener for listener in listeners}
85 """Update the sensors."""
86 self.sensors = {sensor.uuid: sensor for sensor in sensors}
89 """Update the user preferences."""
93 """Represent this dataclass (and its Pydantic contents) as a dict."""
94 data: dict[str, Any] = {
95 DATA_BRIDGES: [item.to_dict() for item in self.bridges.values()],
96 DATA_LISTENERS: [item.to_dict() for item in self.listeners.values()],
97 DATA_SENSORS: [item.to_dict() for item in self.sensors.values()],
100 data[DATA_USER_PREFERENCES] = self.user_preferences.to_dict()
105 """Define a Notion data coordinator."""
107 config_entry: ConfigEntry
120 name=entry.data[CONF_USERNAME],
121 update_interval=DEFAULT_SCAN_INTERVAL,
128 """Fetch data from Notion."""
132 async with asyncio.TaskGroup() as tg:
133 bridges = tg.create_task(self._client.bridge.async_all())
134 listeners = tg.create_task(self._client.listener.async_all())
135 sensors = tg.create_task(self._client.sensor.async_all())
136 user_preferences = tg.create_task(self._client.user.async_preferences())
137 except BaseExceptionGroup as err:
138 result = err.exceptions[0]
139 if isinstance(result, InvalidCredentialsError):
141 "Invalid username and/or password"
143 if isinstance(result, NotionError):
145 f"There was a Notion error while updating: {result}"
147 if isinstance(result, Exception):
149 "There was an unknown error while updating: %s",
154 f"There was an unknown error while updating: {result}"
156 if isinstance(result, BaseException):
157 raise result from None
159 data.update_bridges(bridges.result())
160 data.update_listeners(listeners.result())
161 data.update_sensors(sensors.result())
162 data.update_user_preferences(user_preferences.result())
None __init__(self, HomeAssistant hass, *ConfigEntry entry, Client client)
NotionData _async_update_data(self)
None update_listeners(self, list[Listener] listeners)
None update_bridges(self, list[Bridge] bridges)
None update_sensors(self, list[Sensor] sensors)
None update_user_preferences(self, UserPreferences user_preferences)
dict[str, Any] asdict(self)
None _async_register_new_bridge(HomeAssistant hass, ConfigEntry entry, Bridge bridge)