Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """DataUpdateCoordinator for the Schlage integration."""
2 
3 from __future__ import annotations
4 
5 import asyncio
6 from collections.abc import Callable
7 from dataclasses import dataclass
8 
9 from pyschlage import Lock, Schlage
10 from pyschlage.exceptions import Error as SchlageError, NotAuthorizedError
11 from pyschlage.log import LockLog
12 
13 from homeassistant.config_entries import ConfigEntry
14 from homeassistant.core import HomeAssistant, callback
15 from homeassistant.exceptions import ConfigEntryAuthFailed
17 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
18 
19 from .const import DOMAIN, LOGGER, UPDATE_INTERVAL
20 
21 
22 @dataclass
23 class LockData:
24  """Container for cached lock data from the Schlage API."""
25 
26  lock: Lock
27  logs: list[LockLog]
28 
29 
30 @dataclass
32  """Container for cached data from the Schlage API."""
33 
34  locks: dict[str, LockData]
35 
36 
38  """The Schlage data update coordinator."""
39 
40  config_entry: ConfigEntry
41 
42  def __init__(self, hass: HomeAssistant, username: str, api: Schlage) -> None:
43  """Initialize the class."""
44  super().__init__(
45  hass, LOGGER, name=f"{DOMAIN} ({username})", update_interval=UPDATE_INTERVAL
46  )
47  self.apiapi = api
48  self.new_locks_callbacks: list[Callable[[dict[str, LockData]], None]] = []
49  self.async_add_listenerasync_add_listenerasync_add_listener(self._add_remove_locks_add_remove_locks)
50 
51  async def _async_update_data(self) -> SchlageData:
52  """Fetch the latest data from the Schlage API."""
53  try:
54  locks = await self.hasshass.async_add_executor_job(self.apiapi.locks)
55  except NotAuthorizedError as ex:
56  raise ConfigEntryAuthFailed from ex
57  except SchlageError as ex:
58  raise UpdateFailed("Failed to refresh Schlage data") from ex
59  lock_data = await asyncio.gather(
60  *(
61  self.hasshass.async_add_executor_job(self._get_lock_data_get_lock_data, lock)
62  for lock in locks
63  )
64  )
65  return SchlageData(locks={ld.lock.device_id: ld for ld in lock_data})
66 
67  def _get_lock_data(self, lock: Lock) -> LockData:
68  logs: list[LockLog] = []
69  previous_lock_data = None
70  if self.datadata and (previous_lock_data := self.datadata.locks.get(lock.device_id)):
71  # Default to the previous data, in case a refresh fails.
72  # It's not critical if we don't have the freshest data.
73  logs = previous_lock_data.logs
74  try:
75  logs = lock.logs()
76  except NotAuthorizedError as ex:
77  raise ConfigEntryAuthFailed from ex
78  except SchlageError as ex:
79  LOGGER.debug('Failed to read logs for lock "%s": %s', lock.name, ex)
80 
81  return LockData(lock=lock, logs=logs)
82 
83  @callback
84  def _add_remove_locks(self) -> None:
85  """Add newly discovered locks and remove nonexistent locks."""
86  if self.datadata is None:
87  return
88 
89  device_registry = dr.async_get(self.hasshass)
90  devices = dr.async_entries_for_config_entry(
91  device_registry, self.config_entryconfig_entry.entry_id
92  )
93  previous_locks = set()
94  previous_locks_by_lock_id = {}
95  for device in devices:
96  for domain, identifier in device.identifiers:
97  if domain == DOMAIN:
98  previous_locks.add(identifier)
99  previous_locks_by_lock_id[identifier] = device
100  continue
101  current_locks = set(self.datadata.locks.keys())
102 
103  if removed_locks := previous_locks - current_locks:
104  LOGGER.debug("Removed locks: %s", ", ".join(removed_locks))
105  for lock_id in removed_locks:
106  device_registry.async_update_device(
107  device_id=previous_locks_by_lock_id[lock_id].id,
108  remove_config_entry_id=self.config_entryconfig_entry.entry_id,
109  )
110 
111  if new_lock_ids := current_locks - previous_locks:
112  LOGGER.debug("New locks found: %s", ", ".join(new_lock_ids))
113  new_locks = {lock_id: self.datadata.locks[lock_id] for lock_id in new_lock_ids}
114  for new_lock_callback in self.new_locks_callbacks:
115  new_lock_callback(new_locks)
None __init__(self, HomeAssistant hass, str username, Schlage api)
Definition: coordinator.py:42
Callable[[], None] async_add_listener(self, CALLBACK_TYPE update_callback, Any context=None)
Callable[[], None] async_add_listener(self, CALLBACK_TYPE update_callback, Any context=None)