1 """The Elexa Guardian integration."""
3 from __future__
import annotations
6 from collections.abc
import Callable, Coroutine
7 from dataclasses
import dataclass
10 from aioguardian
import Client
11 from aioguardian.errors
import GuardianError
12 import voluptuous
as vol
31 API_SENSOR_PAIRED_SENSOR_STATUS,
32 API_SYSTEM_DIAGNOSTICS,
33 API_SYSTEM_ONBOARD_SENSOR_STATUS,
39 SIGNAL_PAIRED_SENSOR_COORDINATOR_ADDED,
41 from .coordinator
import GuardianDataUpdateCoordinator
43 DATA_PAIRED_SENSOR_MANAGER =
"paired_sensor_manager"
45 SERVICE_NAME_PAIR_SENSOR =
"pair_sensor"
46 SERVICE_NAME_UNPAIR_SENSOR =
"unpair_sensor"
47 SERVICE_NAME_UPGRADE_FIRMWARE =
"upgrade_firmware"
50 SERVICE_NAME_PAIR_SENSOR,
51 SERVICE_NAME_UNPAIR_SENSOR,
52 SERVICE_NAME_UPGRADE_FIRMWARE,
55 SERVICE_BASE_SCHEMA = vol.Schema(
57 vol.Required(ATTR_DEVICE_ID): cv.string,
61 SERVICE_PAIR_UNPAIR_SENSOR_SCHEMA = vol.Schema(
63 vol.Required(ATTR_DEVICE_ID): cv.string,
64 vol.Required(CONF_UID): cv.string,
68 SERVICE_UPGRADE_FIRMWARE_SCHEMA = vol.Schema(
70 vol.Required(ATTR_DEVICE_ID): cv.string,
71 vol.Optional(CONF_URL): cv.url,
72 vol.Optional(CONF_PORT): cv.port,
73 vol.Optional(CONF_FILENAME): cv.string,
78 Platform.BINARY_SENSOR,
88 """Define an object to be stored in `hass.data`."""
92 valve_controller_coordinators: dict[str, GuardianDataUpdateCoordinator]
93 paired_sensor_manager: PairedSensorManager
98 """Get the entry ID related to a service call (by device ID)."""
99 device_id = call.data[CONF_DEVICE_ID]
100 device_registry = dr.async_get(hass)
102 if (device_entry := device_registry.async_get(device_id))
is None:
103 raise ValueError(f
"Invalid Guardian device ID: {device_id}")
105 for entry_id
in device_entry.config_entries:
106 if (entry := hass.config_entries.async_get_entry(entry_id))
is None:
108 if entry.domain == DOMAIN:
111 raise ValueError(f
"No config entry for device ID: {device_id}")
115 """Set up Elexa Guardian from a config entry."""
116 client = Client(entry.data[CONF_IP_ADDRESS], port=entry.data[CONF_PORT])
120 api_lock = asyncio.Lock()
122 async
def async_init_coordinator(
123 coordinator: GuardianDataUpdateCoordinator,
125 """Initialize a GuardianDataUpdateCoordinator."""
126 await coordinator.async_initialize()
127 await coordinator.async_config_entry_first_refresh()
130 valve_controller_coordinators: dict[str, GuardianDataUpdateCoordinator] = {}
131 init_valve_controller_tasks = []
132 for api, api_coro
in (
133 (API_SENSOR_PAIR_DUMP, client.sensor.pair_dump),
134 (API_SYSTEM_DIAGNOSTICS, client.system.diagnostics),
135 (API_SYSTEM_ONBOARD_SENSOR_STATUS, client.system.onboard_sensor_status),
136 (API_VALVE_STATUS, client.valve.status),
137 (API_WIFI_STATUS, client.wifi.status),
139 coordinator = valve_controller_coordinators[api] = (
147 valve_controller_uid=entry.data[CONF_UID],
150 init_valve_controller_tasks.append(async_init_coordinator(coordinator))
152 await asyncio.gather(*init_valve_controller_tasks)
161 valve_controller_coordinators[API_SENSOR_PAIR_DUMP],
163 await paired_sensor_manager.async_initialize()
165 hass.data.setdefault(DOMAIN, {})
169 valve_controller_coordinators=valve_controller_coordinators,
170 paired_sensor_manager=paired_sensor_manager,
174 await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
178 func: Callable[[ServiceCall, GuardianData], Coroutine[Any, Any,
None]],
179 ) -> Callable[[ServiceCall], Coroutine[Any, Any,
None]]:
180 """Hydrate a service call with the appropriate GuardianData object."""
182 async
def wrapper(call: ServiceCall) ->
None:
183 """Wrap the service function."""
185 data = hass.data[DOMAIN][entry_id]
188 async
with data.client:
189 await func(call, data)
190 except GuardianError
as err:
192 f
"Error while executing {func.__name__}: {err}"
198 async
def async_pair_sensor(call: ServiceCall, data: GuardianData) ->
None:
199 """Add a new paired sensor."""
200 uid = call.data[CONF_UID]
201 await data.client.sensor.pair_sensor(uid)
202 await data.paired_sensor_manager.async_pair_sensor(uid)
205 async
def async_unpair_sensor(call: ServiceCall, data: GuardianData) ->
None:
206 """Remove a paired sensor."""
207 uid = call.data[CONF_UID]
208 await data.client.sensor.unpair_sensor(uid)
209 await data.paired_sensor_manager.async_unpair_sensor(uid)
212 async
def async_upgrade_firmware(call: ServiceCall, data: GuardianData) ->
None:
213 """Upgrade the device firmware."""
214 await data.client.system.upgrade_firmware(
215 url=call.data[CONF_URL],
216 port=call.data[CONF_PORT],
217 filename=call.data[CONF_FILENAME],
220 for service_name, schema, method
in (
222 SERVICE_NAME_PAIR_SENSOR,
223 SERVICE_PAIR_UNPAIR_SENSOR_SCHEMA,
227 SERVICE_NAME_UNPAIR_SENSOR,
228 SERVICE_PAIR_UNPAIR_SENSOR_SCHEMA,
232 SERVICE_NAME_UPGRADE_FIRMWARE,
233 SERVICE_UPGRADE_FIRMWARE_SCHEMA,
234 async_upgrade_firmware,
237 if hass.services.has_service(DOMAIN, service_name):
239 hass.services.async_register(DOMAIN, service_name, method, schema=schema)
245 """Unload a config entry."""
246 unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
248 hass.data[DOMAIN].pop(entry.entry_id)
252 for entry
in hass.config_entries.async_entries(DOMAIN)
253 if entry.state == ConfigEntryState.LOADED
255 if len(loaded_entries) == 1:
258 for service_name
in SERVICES:
259 hass.services.async_remove(DOMAIN, service_name)
265 """Define an object that manages the addition/removal of paired sensors."""
272 api_lock: asyncio.Lock,
273 sensor_pair_dump_coordinator: GuardianDataUpdateCoordinator,
282 self.coordinators: dict[str, GuardianDataUpdateCoordinator] = {}
285 """Initialize the manager."""
288 def async_create_process_task() -> None:
289 """Define a callback for when new paired sensor data is received."""
292 self.
_entry_entry.async_on_unload(
294 async_create_process_task
299 """Add a new paired sensor coordinator."""
300 LOGGER.debug(
"Adding paired sensor: %s", uid)
308 api_name=f
"{API_SENSOR_PAIRED_SENSOR_STATUS}_{uid}",
309 api_coro=
lambda: self.
_client_client.sensor.paired_sensor_status(uid),
311 valve_controller_uid=self.
_entry_entry.data[CONF_UID],
313 await coordinator.async_request_refresh()
317 SIGNAL_PAIRED_SENSOR_COORDINATOR_ADDED.format(self.
_entry_entry.data[CONF_UID]),
322 """Process a list of new UIDs."""
336 tasks = [self.
async_pair_sensorasync_pair_sensor(uid)
for uid
in new.difference(old)]
340 await asyncio.gather(*tasks)
343 """Remove a paired sensor coordinator."""
344 LOGGER.debug(
"Removing paired sensor: %s", uid)
348 self.coordinators.pop(uid)
352 dev_reg = dr.async_get(self.
_hass_hass)
353 device = dev_reg.async_get_or_create(
354 config_entry_id=self.
_entry_entry.entry_id, identifiers={(DOMAIN, uid)}
356 dev_reg.async_remove_device(device.id)
None async_pair_sensor(self, str uid)
None async_process_latest_paired_sensor_uids(self)
None async_initialize(self)
None __init__(self, HomeAssistant hass, ConfigEntry entry, Client client, asyncio.Lock api_lock, GuardianDataUpdateCoordinator sensor_pair_dump_coordinator)
_sensor_pair_dump_coordinator
None async_unpair_sensor(self, str uid)
bool add(self, _T matcher)
bool remove(self, _T matcher)
bool async_setup_entry(HomeAssistant hass, ConfigEntry entry)
bool async_unload_entry(HomeAssistant hass, ConfigEntry entry)
str async_get_entry_id_for_service_call(HomeAssistant hass, ServiceCall call)
None async_add_listener(HomeAssistant hass, Callable[[], None] listener)
None async_dispatcher_send(HomeAssistant hass, str signal, *Any args)