Home Assistant Unofficial Reference 2024.12.1
__init__.py
Go to the documentation of this file.
1 """The Elexa Guardian integration."""
2 
3 from __future__ import annotations
4 
5 import asyncio
6 from collections.abc import Callable, Coroutine
7 from dataclasses import dataclass
8 from typing import Any
9 
10 from aioguardian import Client
11 from aioguardian.errors import GuardianError
12 import voluptuous as vol
13 
14 from homeassistant.config_entries import ConfigEntry, ConfigEntryState
15 from homeassistant.const import (
16  ATTR_DEVICE_ID,
17  CONF_DEVICE_ID,
18  CONF_FILENAME,
19  CONF_IP_ADDRESS,
20  CONF_PORT,
21  CONF_URL,
22  Platform,
23 )
24 from homeassistant.core import HomeAssistant, ServiceCall, callback
25 from homeassistant.exceptions import HomeAssistantError
26 from homeassistant.helpers import config_validation as cv, device_registry as dr
27 from homeassistant.helpers.dispatcher import async_dispatcher_send
28 
29 from .const import (
30  API_SENSOR_PAIR_DUMP,
31  API_SENSOR_PAIRED_SENSOR_STATUS,
32  API_SYSTEM_DIAGNOSTICS,
33  API_SYSTEM_ONBOARD_SENSOR_STATUS,
34  API_VALVE_STATUS,
35  API_WIFI_STATUS,
36  CONF_UID,
37  DOMAIN,
38  LOGGER,
39  SIGNAL_PAIRED_SENSOR_COORDINATOR_ADDED,
40 )
41 from .coordinator import GuardianDataUpdateCoordinator
42 
DATA_PAIRED_SENSOR_MANAGER = "paired_sensor_manager"

# Names of the services this integration registers under DOMAIN:
SERVICE_NAME_PAIR_SENSOR = "pair_sensor"
SERVICE_NAME_UNPAIR_SENSOR = "unpair_sensor"
SERVICE_NAME_UPGRADE_FIRMWARE = "upgrade_firmware"

# All service names, used to deregister them when the last entry unloads:
SERVICES = (
    SERVICE_NAME_PAIR_SENSOR,
    SERVICE_NAME_UNPAIR_SENSOR,
    SERVICE_NAME_UPGRADE_FIRMWARE,
)

# Every service call must name the target device:
SERVICE_BASE_SCHEMA = vol.Schema({vol.Required(ATTR_DEVICE_ID): cv.string})

# Pairing/unpairing additionally requires the UID of the paired sensor:
SERVICE_PAIR_UNPAIR_SENSOR_SCHEMA = SERVICE_BASE_SCHEMA.extend(
    {vol.Required(CONF_UID): cv.string}
)

# A firmware upgrade may optionally say where to fetch the firmware from:
SERVICE_UPGRADE_FIRMWARE_SCHEMA = SERVICE_BASE_SCHEMA.extend(
    {
        vol.Optional(CONF_URL): cv.url,
        vol.Optional(CONF_PORT): cv.port,
        vol.Optional(CONF_FILENAME): cv.string,
    }
)

# Entity platforms that are forwarded for each config entry:
PLATFORMS = [
    Platform.BINARY_SENSOR,
    Platform.BUTTON,
    Platform.SENSOR,
    Platform.SWITCH,
    Platform.VALVE,
]
84 
85 
@dataclass
class GuardianData:
    """Define an object to be stored in `hass.data`.

    One instance is created per config entry during setup and stored under
    ``hass.data[DOMAIN][entry.entry_id]``.
    """

    # The config entry this data belongs to.
    entry: ConfigEntry
    # The API client used to talk to the valve controller.
    client: Client
    # Valve controller coordinators, keyed by API name.
    valve_controller_coordinators: dict[str, GuardianDataUpdateCoordinator]
    # The manager that adds/removes paired sensor coordinators.
    paired_sensor_manager: PairedSensorManager
94 
95 
@callback
def async_get_entry_id_for_service_call(hass: HomeAssistant, call: ServiceCall) -> str:
    """Resolve the Guardian config entry ID targeted by a service call.

    The device ID is read from the service call data and looked up in the
    device registry; the first of the device's config entries that belongs to
    this integration's domain is returned.

    Raises:
        ValueError: If the device ID is unknown, or if none of the device's
            config entries belongs to this integration.
    """
    target_device_id = call.data[CONF_DEVICE_ID]

    device = dr.async_get(hass).async_get(target_device_id)
    if device is None:
        raise ValueError(f"Invalid Guardian device ID: {target_device_id}")

    for candidate_id in device.config_entries:
        candidate = hass.config_entries.async_get_entry(candidate_id)
        # Skip entry IDs that no longer resolve, and entries of other domains:
        if candidate is not None and candidate.domain == DOMAIN:
            return candidate_id

    raise ValueError(f"No config entry for device ID: {target_device_id}")
112 
113 
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
    """Set up Elexa Guardian from a config entry.

    Creates the API client and one data update coordinator per valve
    controller API, initializes the paired sensor manager, stores the result
    in ``hass.data[DOMAIN][entry.entry_id]``, forwards setup to the entity
    platforms and registers the integration's services (skipping any that are
    already registered).

    Returns:
        ``True`` once setup has completed.
    """
    client = Client(entry.data[CONF_IP_ADDRESS], port=entry.data[CONF_PORT])

    # The valve controller's UDP-based API can't handle concurrent requests very well,
    # so we use a lock to ensure that only one API request is reaching it at a time:
    api_lock = asyncio.Lock()

    async def async_init_coordinator(
        coordinator: GuardianDataUpdateCoordinator,
    ) -> None:
        """Initialize a GuardianDataUpdateCoordinator."""
        await coordinator.async_initialize()
        await coordinator.async_config_entry_first_refresh()

    # Set up GuardianDataUpdateCoordinators for the valve controller:
    valve_controller_coordinators: dict[str, GuardianDataUpdateCoordinator] = {}
    init_valve_controller_tasks = []
    for api, api_coro in (
        (API_SENSOR_PAIR_DUMP, client.sensor.pair_dump),
        (API_SYSTEM_DIAGNOSTICS, client.system.diagnostics),
        (API_SYSTEM_ONBOARD_SENSOR_STATUS, client.system.onboard_sensor_status),
        (API_VALVE_STATUS, client.valve.status),
        (API_WIFI_STATUS, client.wifi.status),
    ):
        coordinator = valve_controller_coordinators[api] = (
            GuardianDataUpdateCoordinator(
                hass,
                entry=entry,
                client=client,
                api_name=api,
                api_coro=api_coro,
                api_lock=api_lock,
                valve_controller_uid=entry.data[CONF_UID],
            )
        )
        init_valve_controller_tasks.append(async_init_coordinator(coordinator))

    await asyncio.gather(*init_valve_controller_tasks)

    # Set up an object to evaluate each batch of paired sensor UIDs and add/remove
    # devices as appropriate:
    paired_sensor_manager = PairedSensorManager(
        hass,
        entry,
        client,
        api_lock,
        valve_controller_coordinators[API_SENSOR_PAIR_DUMP],
    )
    await paired_sensor_manager.async_initialize()

    hass.data.setdefault(DOMAIN, {})
    hass.data[DOMAIN][entry.entry_id] = GuardianData(
        entry=entry,
        client=client,
        valve_controller_coordinators=valve_controller_coordinators,
        paired_sensor_manager=paired_sensor_manager,
    )

    # Set up all of the Guardian entity platforms:
    await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)

    @callback
    def call_with_data(
        func: Callable[[ServiceCall, GuardianData], Coroutine[Any, Any, None]],
    ) -> Callable[[ServiceCall], Coroutine[Any, Any, None]]:
        """Hydrate a service call with the appropriate GuardianData object."""

        async def wrapper(call: ServiceCall) -> None:
            """Wrap the service function."""
            # Services are registered once for the whole domain, so the data
            # object is resolved per call from the targeted device:
            entry_id = async_get_entry_id_for_service_call(hass, call)
            data = hass.data[DOMAIN][entry_id]

            try:
                async with data.client:
                    await func(call, data)
            except GuardianError as err:
                raise HomeAssistantError(
                    f"Error while executing {func.__name__}: {err}"
                ) from err

        return wrapper

    @call_with_data
    async def async_pair_sensor(call: ServiceCall, data: GuardianData) -> None:
        """Add a new paired sensor."""
        uid = call.data[CONF_UID]
        await data.client.sensor.pair_sensor(uid)
        await data.paired_sensor_manager.async_pair_sensor(uid)

    @call_with_data
    async def async_unpair_sensor(call: ServiceCall, data: GuardianData) -> None:
        """Remove a paired sensor."""
        uid = call.data[CONF_UID]
        await data.client.sensor.unpair_sensor(uid)
        await data.paired_sensor_manager.async_unpair_sensor(uid)

    @call_with_data
    async def async_upgrade_firmware(call: ServiceCall, data: GuardianData) -> None:
        """Upgrade the device firmware."""
        # The schema declares url/port/filename as optional without defaults, so
        # indexing call.data directly would raise KeyError when one is omitted.
        # Forward only what the caller supplied; omitted arguments fall back to
        # whatever defaults the client method defines.
        supplied = {
            argument: call.data[key]
            for argument, key in (
                ("url", CONF_URL),
                ("port", CONF_PORT),
                ("filename", CONF_FILENAME),
            )
            if key in call.data
        }
        await data.client.system.upgrade_firmware(**supplied)

    for service_name, schema, method in (
        (
            SERVICE_NAME_PAIR_SENSOR,
            SERVICE_PAIR_UNPAIR_SENSOR_SCHEMA,
            async_pair_sensor,
        ),
        (
            SERVICE_NAME_UNPAIR_SENSOR,
            SERVICE_PAIR_UNPAIR_SENSOR_SCHEMA,
            async_unpair_sensor,
        ),
        (
            SERVICE_NAME_UPGRADE_FIRMWARE,
            SERVICE_UPGRADE_FIRMWARE_SCHEMA,
            async_upgrade_firmware,
        ),
    ):
        # Another config entry may already have registered the services:
        if hass.services.has_service(DOMAIN, service_name):
            continue
        hass.services.async_register(DOMAIN, service_name, method, schema=schema)

    return True
242 
243 
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
    """Unload a Guardian config entry.

    Unloads the entity platforms and, on success, drops the entry's data from
    ``hass.data``. The integration's services are removed when exactly one
    Guardian entry is in the loaded state.

    Returns:
        Whether the platforms were unloaded successfully.
    """
    platforms_unloaded = await hass.config_entries.async_unload_platforms(
        entry, PLATFORMS
    )
    if platforms_unloaded:
        hass.data[DOMAIN].pop(entry.entry_id)

    loaded_count = sum(
        1
        for candidate in hass.config_entries.async_entries(DOMAIN)
        if candidate.state == ConfigEntryState.LOADED
    )

    # If this is the last loaded instance of Guardian, deregister any services
    # defined during integration setup:
    if loaded_count == 1:
        for registered_service in SERVICES:
            hass.services.async_remove(DOMAIN, registered_service)

    return platforms_unloaded
262 
263 
class PairedSensorManager:
    """Define an object that manages the addition/removal of paired sensors."""

    def __init__(
        self,
        hass: HomeAssistant,
        entry: ConfigEntry,
        client: Client,
        api_lock: asyncio.Lock,
        sensor_pair_dump_coordinator: GuardianDataUpdateCoordinator,
    ) -> None:
        """Initialize.

        Args:
            hass: The Home Assistant instance.
            entry: The config entry of the valve controller.
            client: The API client handed to each paired sensor coordinator.
            api_lock: The lock handed to each paired sensor coordinator.
            sensor_pair_dump_coordinator: The coordinator whose data carries
                the ``paired_uids`` list.
        """
        self._api_lock = api_lock
        self._client = client
        self._entry = entry
        self._hass = hass
        self._paired_uids: set[str] = set()
        self._sensor_pair_dump_coordinator = sensor_pair_dump_coordinator
        # Paired sensor coordinators, keyed by paired sensor UID:
        self.coordinators: dict[str, GuardianDataUpdateCoordinator] = {}

    async def async_initialize(self) -> None:
        """Initialize the manager.

        Subscribes to the sensor pair dump coordinator so that each update
        schedules a re-evaluation of the paired UIDs; the subscription is
        cancelled when the config entry unloads.
        """

        @callback
        def async_create_process_task() -> None:
            """Define a callback for when new paired sensor data is received."""
            self._hass.async_create_task(
                self.async_process_latest_paired_sensor_uids()
            )

        self._entry.async_on_unload(
            self._sensor_pair_dump_coordinator.async_add_listener(
                async_create_process_task
            )
        )

    async def async_pair_sensor(self, uid: str) -> None:
        """Add a new paired sensor coordinator.

        Args:
            uid: The UID of the paired sensor to start tracking.
        """
        LOGGER.debug("Adding paired sensor: %s", uid)

        self._paired_uids.add(uid)

        coordinator = self.coordinators[uid] = GuardianDataUpdateCoordinator(
            self._hass,
            entry=self._entry,
            client=self._client,
            api_name=f"{API_SENSOR_PAIRED_SENSOR_STATUS}_{uid}",
            api_coro=lambda: self._client.sensor.paired_sensor_status(uid),
            api_lock=self._api_lock,
            valve_controller_uid=self._entry.data[CONF_UID],
        )
        await coordinator.async_request_refresh()

        # Announce the new coordinator on the dispatcher signal for this
        # valve controller:
        async_dispatcher_send(
            self._hass,
            SIGNAL_PAIRED_SENSOR_COORDINATOR_ADDED.format(self._entry.data[CONF_UID]),
            uid,
        )

    async def async_process_latest_paired_sensor_uids(self) -> None:
        """Process a list of new UIDs.

        Compares the UIDs reported by the sensor pair dump coordinator with
        the ones currently tracked, then pairs the additions and unpairs the
        removals.
        """
        try:
            uids = set(self._sensor_pair_dump_coordinator.data["paired_uids"])
        except KeyError:
            # Sometimes the paired_uids key can fail to exist; the user can't do anything
            # about it, so in this case, we quietly abort and return:
            return

        if uids == self._paired_uids:
            return

        old = self._paired_uids
        new = self._paired_uids = uids

        tasks = [self.async_pair_sensor(uid) for uid in new - old]
        tasks += [self.async_unpair_sensor(uid) for uid in old - new]

        if tasks:
            await asyncio.gather(*tasks)

    async def async_unpair_sensor(self, uid: str) -> None:
        """Remove a paired sensor coordinator.

        Safe to call for a UID that is no longer (or was never) tracked.

        Args:
            uid: The UID of the paired sensor to stop tracking.
        """
        LOGGER.debug("Removing paired sensor: %s", uid)

        # Clear out objects related to this paired sensor. When called from
        # async_process_latest_paired_sensor_uids the UID set has already been
        # replaced by the new one, so the UID may legitimately be absent:
        self._paired_uids.discard(uid)
        self.coordinators.pop(uid, None)

        # Remove the paired sensor device from the device registry (which will
        # clean up entities and the entity registry). Look the device up rather
        # than get-or-create it, so a missing device is not created just to be
        # deleted:
        dev_reg = dr.async_get(self._hass)
        device = dev_reg.async_get_device(identifiers={(DOMAIN, uid)})
        if device is not None:
            dev_reg.async_remove_device(device.id)
None __init__(self, HomeAssistant hass, ConfigEntry entry, Client client, asyncio.Lock api_lock, GuardianDataUpdateCoordinator sensor_pair_dump_coordinator)
Definition: __init__.py:274
bool add(self, _T matcher)
Definition: match.py:185
bool remove(self, _T matcher)
Definition: match.py:214
bool async_setup_entry(HomeAssistant hass, ConfigEntry entry)
Definition: __init__.py:114
bool async_unload_entry(HomeAssistant hass, ConfigEntry entry)
Definition: __init__.py:244
str async_get_entry_id_for_service_call(HomeAssistant hass, ServiceCall call)
Definition: __init__.py:97
None async_add_listener(HomeAssistant hass, Callable[[], None] listener)
Definition: __init__.py:82
None async_dispatcher_send(HomeAssistant hass, str signal, *Any args)
Definition: dispatcher.py:193