Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """Coordinator for Acaia integration."""
2 
3 from __future__ import annotations
4 
5 from datetime import timedelta
6 import logging
7 
8 from aioacaia.acaiascale import AcaiaScale
9 from aioacaia.exceptions import AcaiaDeviceNotFound, AcaiaError
10 
11 from homeassistant.config_entries import ConfigEntry
12 from homeassistant.const import CONF_ADDRESS
13 from homeassistant.core import HomeAssistant
14 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
15 
16 from .const import CONF_IS_NEW_STYLE_SCALE
17 
18 SCAN_INTERVAL = timedelta(seconds=15)
19 
20 _LOGGER = logging.getLogger(__name__)
21 
22 type AcaiaConfigEntry = ConfigEntry[AcaiaCoordinator]
23 
24 
# NOTE(review): the class header line was missing from the extracted listing.
# It is reconstructed from the AcaiaConfigEntry alias (which names
# AcaiaCoordinator), the DataUpdateCoordinator import and the
# super().__init__() call below; the [None] parameter mirrors the return type
# of _async_update_data -- confirm against the upstream file.
class AcaiaCoordinator(DataUpdateCoordinator[None]):
    """Class to handle fetching data from the scale."""

    config_entry: AcaiaConfigEntry

    def __init__(self, hass: HomeAssistant, entry: AcaiaConfigEntry) -> None:
        """Initialize coordinator.

        Sets up the base coordinator with SCAN_INTERVAL as its update
        interval and builds the AcaiaScale client from the config entry's
        address, title and scale-style flag.
        """
        super().__init__(
            hass,
            _LOGGER,
            name="acaia coordinator",
            update_interval=SCAN_INTERVAL,
            config_entry=entry,
        )

        # async_update_listeners is handed to the scale as its notify callback.
        self._scale = AcaiaScale(
            address_or_ble_device=entry.data[CONF_ADDRESS],
            name=entry.title,
            is_new_style_scale=entry.data[CONF_IS_NEW_STYLE_SCALE],
            notify_callback=self.async_update_listeners,
        )

    @property
    def scale(self) -> AcaiaScale:
        """Return the scale object."""
        return self._scale

    async def _async_update_data(self) -> None:
        """Fetch data.

        Nothing is fetched directly: this makes sure the scale is connected
        and that its heartbeat and queue-processing tasks are running.
        A failed connection attempt is logged at debug level and the method
        returns without raising.
        """

        # scale is already connected, return
        if self._scale.connected:
            return

        # scale is not connected, try to connect
        try:
            await self._scale.connect(setup_tasks=False)
        except (AcaiaDeviceNotFound, AcaiaError, TimeoutError) as ex:
            _LOGGER.debug(
                "Could not connect to scale: %s, Error: %s",
                self.config_entry.data[CONF_ADDRESS],
                ex,
            )
            self._scale.device_disconnected_handler(notify=False)
            return

        # connected, set up background tasks (only if missing or finished)
        if not self._scale.heartbeat_task or self._scale.heartbeat_task.done():
            self._scale.heartbeat_task = self.config_entry.async_create_background_task(
                hass=self.hass,
                target=self._scale.send_heartbeats(),
                name="acaia_heartbeat_task",
            )

        if not self._scale.process_queue_task or self._scale.process_queue_task.done():
            self._scale.process_queue_task = (
                self.config_entry.async_create_background_task(
                    hass=self.hass,
                    target=self._scale.process_queue(),
                    name="acaia_process_queue_task",
                )
            )
None __init__(self, HomeAssistant hass, AcaiaConfigEntry entry)
Definition: coordinator.py:30