Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """The IntelliFire integration."""
2 
3 from __future__ import annotations
4 
5 from typing import Any
6 
7 import pyflume
8 from pyflume import FlumeAuth, FlumeData, FlumeDeviceList
9 
10 from homeassistant.core import HomeAssistant
11 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
12 
13 from .const import (
14  _LOGGER,
15  DEVICE_CONNECTION_SCAN_INTERVAL,
16  DEVICE_SCAN_INTERVAL,
17  DOMAIN,
18  NOTIFICATION_SCAN_INTERVAL,
19 )
20 
21 
23  """Data update coordinator for an individual flume device."""
24 
25  def __init__(self, hass: HomeAssistant, flume_device: FlumeData) -> None:
26  """Initialize the Coordinator."""
27  super().__init__(
28  hass,
29  name=DOMAIN,
30  logger=_LOGGER,
31  update_interval=DEVICE_SCAN_INTERVAL,
32  )
33 
34  self.flume_deviceflume_device = flume_device
35 
36  async def _async_update_data(self) -> None:
37  """Get the latest data from the Flume."""
38  try:
39  await self.hasshass.async_add_executor_job(self.flume_deviceflume_device.update_force)
40  except Exception as ex:
41  raise UpdateFailed(f"Error communicating with flume API: {ex}") from ex
42  _LOGGER.debug(
43  "Flume Device Data Update values=%s query_payload=%s",
44  self.flume_deviceflume_device.values,
45  self.flume_deviceflume_device.query_payload,
46  )
47 
48 
50  """Date update coordinator to read connected status from Devices endpoint."""
51 
52  def __init__(self, hass: HomeAssistant, flume_devices: FlumeDeviceList) -> None:
53  """Initialize the Coordinator."""
54  super().__init__(
55  hass,
56  name=DOMAIN,
57  logger=_LOGGER,
58  update_interval=DEVICE_CONNECTION_SCAN_INTERVAL,
59  )
60 
61  self.flume_devicesflume_devices = flume_devices
62  self.connectedconnected: dict[str, bool] = {}
63 
64  def _update_connectivity(self) -> None:
65  """Update device connectivity.."""
66  self.connectedconnected = {
67  device["id"]: device["connected"]
68  for device in self.flume_devicesflume_devices.get_devices()
69  }
70  _LOGGER.debug("Connectivity %s", self.connectedconnected)
71 
72  async def _async_update_data(self) -> None:
73  """Update the device list."""
74  try:
75  await self.hasshass.async_add_executor_job(self._update_connectivity_update_connectivity)
76  except Exception as ex:
77  raise UpdateFailed(f"Error communicating with flume API: {ex}") from ex
78 
79 
81  """Data update coordinator for flume notifications."""
82 
83  def __init__(self, hass: HomeAssistant, auth: FlumeAuth) -> None:
84  """Initialize the Coordinator."""
85  super().__init__(
86  hass,
87  name=DOMAIN,
88  logger=_LOGGER,
89  update_interval=NOTIFICATION_SCAN_INTERVAL,
90  )
91  self.authauth = auth
92  self.active_notifications_by_deviceactive_notifications_by_device: dict[str, set[str]] = {}
93  self.notificationsnotifications: list[dict[str, Any]] = []
94 
95  def _update_lists(self) -> None:
96  """Query flume for notification list."""
97  # Get notifications (read or unread).
98  # The related binary sensors (leak detected, high flow, low battery)
99  # will be active until the notification is deleted in the Flume app.
100  self.notificationsnotifications = pyflume.FlumeNotificationList(
101  self.authauth, read=None
102  ).notification_list
103  _LOGGER.debug("Notifications %s", self.notificationsnotifications)
104 
105  active_notifications_by_device: dict[str, set[str]] = {}
106 
107  for notification in self.notificationsnotifications:
108  if (
109  not notification.get("device_id")
110  or not notification.get("extra")
111  or "event_rule_name" not in notification["extra"]
112  ):
113  continue
114  device_id = notification["device_id"]
115  rule = notification["extra"]["event_rule_name"]
116  active_notifications_by_device.setdefault(device_id, set()).add(rule)
117 
118  self.active_notifications_by_deviceactive_notifications_by_device = active_notifications_by_device
119 
120  async def _async_update_data(self) -> None:
121  """Update data."""
122  _LOGGER.debug("Updating Flume Notification")
123  try:
124  await self.hasshass.async_add_executor_job(self._update_lists_update_lists)
125  except Exception as ex:
126  raise UpdateFailed(f"Error communicating with flume API: {ex}") from ex
None __init__(self, HomeAssistant hass, FlumeDeviceList flume_devices)
Definition: coordinator.py:52
None __init__(self, HomeAssistant hass, FlumeData flume_device)
Definition: coordinator.py:25
bool add(self, _T matcher)
Definition: match.py:185