Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """DataUpdateCoordinator for the uptimerobot integration."""
2 
3 from __future__ import annotations
4 
5 from pyuptimerobot import (
6  UptimeRobot,
7  UptimeRobotAuthenticationException,
8  UptimeRobotException,
9  UptimeRobotMonitor,
10 )
11 
12 from homeassistant.config_entries import ConfigEntry
13 from homeassistant.core import HomeAssistant
14 from homeassistant.exceptions import ConfigEntryAuthFailed
15 from homeassistant.helpers import device_registry as dr
16 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
17 
18 from .const import API_ATTR_OK, COORDINATOR_UPDATE_INTERVAL, DOMAIN, LOGGER
19 
20 
22  """Data update coordinator for UptimeRobot."""
23 
24  config_entry: ConfigEntry
25 
26  def __init__(
27  self,
28  hass: HomeAssistant,
29  config_entry_id: str,
30  dev_reg: dr.DeviceRegistry,
31  api: UptimeRobot,
32  ) -> None:
33  """Initialize coordinator."""
34  super().__init__(
35  hass,
36  LOGGER,
37  name=DOMAIN,
38  update_interval=COORDINATOR_UPDATE_INTERVAL,
39  )
40  self._config_entry_id_config_entry_id = config_entry_id
41  self._device_registry_device_registry = dev_reg
42  self.apiapi = api
43 
44  async def _async_update_data(self) -> list[UptimeRobotMonitor]:
45  """Update data."""
46  try:
47  response = await self.apiapi.async_get_monitors()
48  except UptimeRobotAuthenticationException as exception:
49  raise ConfigEntryAuthFailed(exception) from exception
50  except UptimeRobotException as exception:
51  raise UpdateFailed(exception) from exception
52 
53  if response.status != API_ATTR_OK:
54  raise UpdateFailed(response.error.message)
55 
56  monitors: list[UptimeRobotMonitor] = response.data
57 
58  current_monitors = {
59  list(device.identifiers)[0][1]
60  for device in dr.async_entries_for_config_entry(
61  self._device_registry_device_registry, self._config_entry_id_config_entry_id
62  )
63  }
64  new_monitors = {str(monitor.id) for monitor in monitors}
65  if stale_monitors := current_monitors - new_monitors:
66  for monitor_id in stale_monitors:
67  if device := self._device_registry_device_registry.async_get_device(
68  identifiers={(DOMAIN, monitor_id)}
69  ):
70  self._device_registry_device_registry.async_remove_device(device.id)
71 
72  # If there are new monitors, we should reload the config entry so we can
73  # create new devices and entities.
74  if self.datadata and new_monitors - {str(monitor.id) for monitor in self.datadata}:
75  self.hasshass.async_create_task(
76  self.hasshass.config_entries.async_reload(self._config_entry_id_config_entry_id)
77  )
78 
79  return monitors
None __init__(self, HomeAssistant hass, str config_entry_id, dr.DeviceRegistry dev_reg, UptimeRobot api)
Definition: coordinator.py:32