Home Assistant Unofficial Reference 2024.12.1
data_handler.py
Go to the documentation of this file.
1 """The Netatmo data handler."""
2 
3 from __future__ import annotations
4 
5 from collections import deque
6 from dataclasses import dataclass
7 from datetime import datetime, timedelta
8 from itertools import islice
9 import logging
10 from time import time
11 from typing import Any
12 
13 import aiohttp
14 import pyatmo
15 from pyatmo.modules.device_types import (
16  DeviceCategory as NetatmoDeviceCategory,
17  DeviceType as NetatmoDeviceType,
18 )
19 
20 from homeassistant.components import cloud
21 from homeassistant.config_entries import ConfigEntry
22 from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback
24  async_dispatcher_connect,
25  async_dispatcher_send,
26 )
27 from homeassistant.helpers.event import async_track_time_interval
28 
29 from .const import (
30  AUTH,
31  DATA_PERSONS,
32  DATA_SCHEDULES,
33  DOMAIN,
34  MANUFACTURER,
35  NETATMO_CREATE_BATTERY,
36  NETATMO_CREATE_CAMERA,
37  NETATMO_CREATE_CAMERA_LIGHT,
38  NETATMO_CREATE_CLIMATE,
39  NETATMO_CREATE_COVER,
40  NETATMO_CREATE_FAN,
41  NETATMO_CREATE_LIGHT,
42  NETATMO_CREATE_ROOM_SENSOR,
43  NETATMO_CREATE_SELECT,
44  NETATMO_CREATE_SENSOR,
45  NETATMO_CREATE_SWITCH,
46  NETATMO_CREATE_WEATHER_SENSOR,
47  PLATFORMS,
48  WEBHOOK_ACTIVATION,
49  WEBHOOK_DEACTIVATION,
50  WEBHOOK_NACAMERA_CONNECTION,
51  WEBHOOK_PUSH_TYPE,
52 )
53 
54 _LOGGER = logging.getLogger(__name__)
55 
56 SIGNAL_NAME = "signal_name"
57 ACCOUNT = "account"
58 HOME = "home"
59 WEATHER = "weather"
60 AIR_CARE = "air_care"
61 PUBLIC = NetatmoDeviceType.public
62 EVENT = "event"
63 
64 PUBLISHERS = {
65  ACCOUNT: "async_update_topology",
66  HOME: "async_update_status",
67  WEATHER: "async_update_weather_stations",
68  AIR_CARE: "async_update_air_care",
69  PUBLIC: "async_update_public_weather",
70  EVENT: "async_update_events",
71 }
72 
73 BATCH_SIZE = 3
74 DEV_FACTOR = 7
75 DEV_LIMIT = 400
76 CLOUD_FACTOR = 2
77 CLOUD_LIMIT = 150
78 DEFAULT_INTERVALS = {
79  ACCOUNT: 10800,
80  HOME: 300,
81  WEATHER: 600,
82  AIR_CARE: 300,
83  PUBLIC: 600,
84  EVENT: 600,
85 }
86 SCAN_INTERVAL = 60
87 
88 
89 @dataclass
91  """Netatmo device class."""
92 
93  data_handler: NetatmoDataHandler
94  device: pyatmo.modules.Module
95  parent_id: str
96  signal_name: str
97 
98 
99 @dataclass
101  """Netatmo home class."""
102 
103  data_handler: NetatmoDataHandler
104  home: pyatmo.Home
105  parent_id: str
106  signal_name: str
107 
108 
109 @dataclass
111  """Netatmo room class."""
112 
113  data_handler: NetatmoDataHandler
114  room: pyatmo.Room
115  parent_id: str
116  signal_name: str
117 
118 
119 @dataclass
121  """Class for keeping track of Netatmo data class metadata."""
122 
123  name: str
124  interval: int
125  next_scan: float
126  subscriptions: set[CALLBACK_TYPE | None]
127  method: str
128  kwargs: dict
129 
130 
132  """Manages the Netatmo data handling."""
133 
134  account: pyatmo.AsyncAccount
135  _interval_factor: int
136 
137  def __init__(self, hass: HomeAssistant, config_entry: ConfigEntry) -> None:
138  """Initialize self."""
139  self.hasshass = hass
140  self.config_entryconfig_entry = config_entry
141  self._auth_auth = hass.data[DOMAIN][config_entry.entry_id][AUTH]
142  self.publisher: dict[str, NetatmoPublisher] = {}
143  self._queue: deque = deque()
144  self._webhook_webhook: bool = False
145  if config_entry.data["auth_implementation"] == cloud.DOMAIN:
146  self._interval_factor_interval_factor = CLOUD_FACTOR
147  self._rate_limit_rate_limit = CLOUD_LIMIT
148  else:
149  self._interval_factor_interval_factor = DEV_FACTOR
150  self._rate_limit_rate_limit = DEV_LIMIT
151  self.poll_startpoll_start = time()
152  self.poll_countpoll_count = 0
153 
154  async def async_setup(self) -> None:
155  """Set up the Netatmo data handler."""
156  self.config_entryconfig_entry.async_on_unload(
158  self.hasshass, self.async_updateasync_update, timedelta(seconds=SCAN_INTERVAL)
159  )
160  )
161 
162  self.config_entryconfig_entry.async_on_unload(
164  self.hasshass,
165  f"signal-{DOMAIN}-webhook-None",
166  self.handle_eventhandle_event,
167  )
168  )
169 
170  self.accountaccount = pyatmo.AsyncAccount(self._auth_auth)
171 
172  await self.subscribesubscribe(ACCOUNT, ACCOUNT, None)
173 
174  await self.hasshass.config_entries.async_forward_entry_setups(
175  self.config_entryconfig_entry, PLATFORMS
176  )
177  await self.async_dispatchasync_dispatch()
178 
179  async def async_update(self, event_time: datetime) -> None:
180  """Update device.
181 
182  We do up to BATCH_SIZE calls in one update in order
183  to minimize the calls on the api service.
184  """
185  for data_class in islice(self._queue, 0, BATCH_SIZE * self._interval_factor_interval_factor):
186  if data_class.next_scan > time():
187  continue
188 
189  if publisher := data_class.name:
190  error = await self.async_fetch_dataasync_fetch_data(publisher)
191 
192  if error:
193  self.publisher[publisher].next_scan = (
194  time() + data_class.interval * 10
195  )
196  else:
197  self.publisher[publisher].next_scan = time() + data_class.interval
198 
199  self._queue.rotate(BATCH_SIZE)
200  cph = self.poll_countpoll_count / (time() - self.poll_startpoll_start) * 3600
201  _LOGGER.debug("Calls per hour: %i", cph)
202  if cph > self._rate_limit_rate_limit:
203  for publisher in self.publisher.values():
204  publisher.next_scan += 60
205  if (time() - self.poll_startpoll_start) > 3600:
206  self.poll_startpoll_start = time()
207  self.poll_countpoll_count = 0
208 
209  @callback
210  def async_force_update(self, signal_name: str) -> None:
211  """Prioritize data retrieval for given data class entry."""
212  self.publisher[signal_name].next_scan = time()
213  self._queue.rotate(-(self._queue.index(self.publisher[signal_name])))
214 
215  async def handle_event(self, event: dict) -> None:
216  """Handle webhook events."""
217  if event["data"][WEBHOOK_PUSH_TYPE] == WEBHOOK_ACTIVATION:
218  _LOGGER.debug("%s webhook successfully registered", MANUFACTURER)
219  self._webhook_webhook = True
220 
221  elif event["data"][WEBHOOK_PUSH_TYPE] == WEBHOOK_DEACTIVATION:
222  _LOGGER.debug("%s webhook unregistered", MANUFACTURER)
223  self._webhook_webhook = False
224 
225  elif event["data"][WEBHOOK_PUSH_TYPE] == WEBHOOK_NACAMERA_CONNECTION:
226  _LOGGER.debug("%s camera reconnected", MANUFACTURER)
227  self.async_force_updateasync_force_update(ACCOUNT)
228 
229  async def async_fetch_data(self, signal_name: str) -> bool:
230  """Fetch data and notify."""
231  self.poll_countpoll_count += 1
232  has_error = False
233  try:
234  await getattr(self.accountaccount, self.publisher[signal_name].method)(
235  **self.publisher[signal_name].kwargs
236  )
237 
238  except (pyatmo.NoDevice, pyatmo.ApiError) as err:
239  _LOGGER.debug(err)
240  has_error = True
241 
242  except (TimeoutError, aiohttp.ClientConnectorError) as err:
243  _LOGGER.debug(err)
244  return True
245 
246  for update_callback in self.publisher[signal_name].subscriptions:
247  if update_callback:
248  update_callback()
249 
250  return has_error
251 
252  async def subscribe(
253  self,
254  publisher: str,
255  signal_name: str,
256  update_callback: CALLBACK_TYPE | None,
257  **kwargs: Any,
258  ) -> None:
259  """Subscribe to publisher."""
260  if signal_name in self.publisher:
261  if update_callback not in self.publisher[signal_name].subscriptions:
262  self.publisher[signal_name].subscriptions.add(update_callback)
263  return
264 
265  if publisher == "public":
266  kwargs = {"area_id": self.accountaccount.register_public_weather_area(**kwargs)}
267 
268  interval = int(DEFAULT_INTERVALS[publisher] / self._interval_factor_interval_factor)
269  self.publisher[signal_name] = NetatmoPublisher(
270  name=signal_name,
271  interval=interval,
272  next_scan=time() + interval,
273  subscriptions={update_callback},
274  method=PUBLISHERS[publisher],
275  kwargs=kwargs,
276  )
277 
278  try:
279  await self.async_fetch_dataasync_fetch_data(signal_name)
280  except KeyError:
281  self.publisher.pop(signal_name)
282  raise
283 
284  self._queue.append(self.publisher[signal_name])
285  _LOGGER.debug("Publisher %s added", signal_name)
286 
287  async def unsubscribe(
288  self, signal_name: str, update_callback: CALLBACK_TYPE | None
289  ) -> None:
290  """Unsubscribe from publisher."""
291  if update_callback not in self.publisher[signal_name].subscriptions:
292  return
293 
294  self.publisher[signal_name].subscriptions.remove(update_callback)
295 
296  if not self.publisher[signal_name].subscriptions:
297  self._queue.remove(self.publisher[signal_name])
298  self.publisher.pop(signal_name)
299  _LOGGER.debug("Publisher %s removed", signal_name)
300 
301  @property
302  def webhook(self) -> bool:
303  """Return the webhook state."""
304  return self._webhook_webhook
305 
306  async def async_dispatch(self) -> None:
307  """Dispatch the creation of entities."""
308  await self.subscribesubscribe(WEATHER, WEATHER, None)
309  await self.subscribesubscribe(AIR_CARE, AIR_CARE, None)
310 
311  self.setup_air_caresetup_air_care()
312 
313  for home in self.accountaccount.homes.values():
314  signal_home = f"{HOME}-{home.entity_id}"
315 
316  await self.subscribesubscribe(HOME, signal_home, None, home_id=home.entity_id)
317  await self.subscribesubscribe(EVENT, signal_home, None, home_id=home.entity_id)
318 
319  self.setup_climate_schedule_selectsetup_climate_schedule_select(home, signal_home)
320  self.setup_roomssetup_rooms(home, signal_home)
321  self.setup_modulessetup_modules(home, signal_home)
322 
323  self.hasshass.data[DOMAIN][DATA_PERSONS][home.entity_id] = {
324  person.entity_id: person.pseudo for person in home.persons.values()
325  }
326 
327  await self.unsubscribeunsubscribe(WEATHER, None)
328  await self.unsubscribeunsubscribe(AIR_CARE, None)
329 
330  def setup_air_care(self) -> None:
331  """Set up home coach/air care modules."""
332  for module in self.accountaccount.modules.values():
333  if module.device_category is NetatmoDeviceCategory.air_care:
335  self.hasshass,
336  NETATMO_CREATE_WEATHER_SENSOR,
338  self,
339  module,
340  AIR_CARE,
341  AIR_CARE,
342  ),
343  )
344 
345  def setup_modules(self, home: pyatmo.Home, signal_home: str) -> None:
346  """Set up modules."""
347  netatmo_type_signal_map = {
348  NetatmoDeviceCategory.camera: [
349  NETATMO_CREATE_CAMERA,
350  NETATMO_CREATE_CAMERA_LIGHT,
351  ],
352  NetatmoDeviceCategory.dimmer: [NETATMO_CREATE_LIGHT],
353  NetatmoDeviceCategory.shutter: [NETATMO_CREATE_COVER],
354  NetatmoDeviceCategory.switch: [
355  NETATMO_CREATE_LIGHT,
356  NETATMO_CREATE_SWITCH,
357  NETATMO_CREATE_SENSOR,
358  ],
359  NetatmoDeviceCategory.meter: [NETATMO_CREATE_SENSOR],
360  NetatmoDeviceCategory.fan: [NETATMO_CREATE_FAN],
361  }
362  for module in home.modules.values():
363  if not module.device_category:
364  continue
365 
366  for signal in netatmo_type_signal_map.get(module.device_category, []):
368  self.hasshass,
369  signal,
371  self,
372  module,
373  home.entity_id,
374  signal_home,
375  ),
376  )
377  if module.device_category is NetatmoDeviceCategory.weather:
379  self.hasshass,
380  NETATMO_CREATE_WEATHER_SENSOR,
382  self,
383  module,
384  home.entity_id,
385  WEATHER,
386  ),
387  )
388 
389  def setup_rooms(self, home: pyatmo.Home, signal_home: str) -> None:
390  """Set up rooms."""
391  for room in home.rooms.values():
392  if NetatmoDeviceCategory.climate in room.features:
394  self.hasshass,
395  NETATMO_CREATE_CLIMATE,
396  NetatmoRoom(
397  self,
398  room,
399  home.entity_id,
400  signal_home,
401  ),
402  )
403 
404  for module in room.modules.values():
405  if module.device_category is NetatmoDeviceCategory.climate:
407  self.hasshass,
408  NETATMO_CREATE_BATTERY,
410  self,
411  module,
412  room.entity_id,
413  signal_home,
414  ),
415  )
416 
417  if "humidity" in room.features:
419  self.hasshass,
420  NETATMO_CREATE_ROOM_SENSOR,
421  NetatmoRoom(
422  self,
423  room,
424  room.entity_id,
425  signal_home,
426  ),
427  )
428 
430  self, home: pyatmo.Home, signal_home: str
431  ) -> None:
432  """Set up climate schedule per home."""
433  if NetatmoDeviceCategory.climate in [
434  next(iter(x)) for x in [room.features for room in home.rooms.values()] if x
435  ]:
436  self.hasshass.data[DOMAIN][DATA_SCHEDULES][home.entity_id] = self.accountaccount.homes[
437  home.entity_id
438  ].schedules
439 
441  self.hasshass,
442  NETATMO_CREATE_SELECT,
443  NetatmoHome(
444  self,
445  home,
446  home.entity_id,
447  signal_home,
448  ),
449  )
None setup_rooms(self, pyatmo.Home home, str signal_home)
None subscribe(self, str publisher, str signal_name, CALLBACK_TYPE|None update_callback, **Any kwargs)
None unsubscribe(self, str signal_name, CALLBACK_TYPE|None update_callback)
None setup_modules(self, pyatmo.Home home, str signal_home)
None setup_climate_schedule_select(self, pyatmo.Home home, str signal_home)
None __init__(self, HomeAssistant hass, ConfigEntry config_entry)
bool remove(self, _T matcher)
Definition: match.py:214
bool time(HomeAssistant hass, dt_time|str|None before=None, dt_time|str|None after=None, str|Container[str]|None weekday=None)
Definition: condition.py:802
Callable[[], None] async_dispatcher_connect(HomeAssistant hass, str signal, Callable[..., Any] target)
Definition: dispatcher.py:103
None async_dispatcher_send(HomeAssistant hass, str signal, *Any args)
Definition: dispatcher.py:193
CALLBACK_TYPE async_track_time_interval(HomeAssistant hass, Callable[[datetime], Coroutine[Any, Any, None]|None] action, timedelta interval, *str|None name=None, bool|None cancel_on_shutdown=None)
Definition: event.py:1679