Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """Support for Comelit."""
2 
3 from abc import abstractmethod
4 from datetime import timedelta
5 from typing import Any
6 
7 from aiocomelit import (
8  ComeliteSerialBridgeApi,
9  ComelitSerialBridgeObject,
10  ComelitVedoApi,
11  ComelitVedoAreaObject,
12  ComelitVedoZoneObject,
13  exceptions,
14 )
15 from aiocomelit.api import ComelitCommonApi
16 from aiocomelit.const import BRIDGE, VEDO
17 
18 from homeassistant.config_entries import ConfigEntry
19 from homeassistant.core import HomeAssistant
20 from homeassistant.exceptions import ConfigEntryAuthFailed
21 from homeassistant.helpers import device_registry as dr
22 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
23 
24 from .const import _LOGGER, DOMAIN
25 
26 
# NOTE(review): the class header line was missing from the reviewed copy. The
# class name is taken from the subclass declaration in this file; the generic
# base is inferred from the imports and the super().__init__() call — confirm.
class ComelitBaseCoordinator(DataUpdateCoordinator[dict[str, Any]]):
    """Base coordinator for Comelit Devices.

    Subclasses provide the ``api`` client and the ``_hw_version`` string and
    implement ``_async_update_system_data``; this class handles login, error
    translation and device-registry bookkeeping.
    """

    # Hardware version string reported to the device registry (set by subclasses).
    _hw_version: str
    config_entry: ConfigEntry
    # API client used for login and polling (set by subclasses).
    api: ComelitCommonApi

    def __init__(self, hass: HomeAssistant, device: str, host: str) -> None:
        """Initialize the scanner.

        Args:
            hass: The Home Assistant instance.
            device: Device kind label; used as the registry model and in log
                messages.
            host: Host of the Comelit device; used in the coordinator name
                and the registry device name.
        """
        self._device = device
        self._host = host

        super().__init__(
            hass=hass,
            logger=_LOGGER,
            name=f"{DOMAIN}-{host}-coordinator",
            update_interval=timedelta(seconds=5),
        )

        # NOTE(review): config_entry is read right after super().__init__();
        # presumably the parent coordinator populates it — confirm.
        device_registry = dr.async_get(self.hass)
        device_registry.async_get_or_create(
            config_entry_id=self.config_entry.entry_id,
            identifiers={(DOMAIN, self.config_entry.entry_id)},
            model=device,
            name=f"{device} ({self._host})",
            manufacturer="Comelit",
            hw_version=self._hw_version,
        )

    def platform_device_info(
        self,
        object_class: ComelitVedoZoneObject
        | ComelitVedoAreaObject
        | ComelitSerialBridgeObject,
        object_type: str,
    ) -> dr.DeviceInfo:
        """Set platform device info.

        Args:
            object_class: The Comelit object to describe; its ``index`` and
                ``name`` attributes are read.
            object_type: Type label embedded in the identifier and the model.

        Returns:
            Device info whose identifier combines the config entry id, the
            object type and the object index, linked via ``via_device`` to
            the device registered in ``__init__``.
        """
        entry_id = self.config_entry.entry_id
        return dr.DeviceInfo(
            identifiers={
                (
                    DOMAIN,
                    f"{entry_id}-{object_type}-{object_class.index}",
                )
            },
            via_device=(DOMAIN, entry_id),
            name=object_class.name,
            model=f"{self._device} {object_type}",
            manufacturer="Comelit",
            hw_version=self._hw_version,
        )

    async def _async_update_data(self) -> dict[str, Any]:
        """Update device data.

        Logs in through ``self.api`` and then delegates to
        ``_async_update_system_data``.

        Raises:
            UpdateFailed: On ``CannotConnect`` or ``CannotRetrieveData``.
            ConfigEntryAuthFailed: On ``CannotAuthenticate``.
        """
        _LOGGER.debug("Polling Comelit %s host: %s", self._device, self._host)
        try:
            await self.api.login()
            return await self._async_update_system_data()
        except (exceptions.CannotConnect, exceptions.CannotRetrieveData) as err:
            raise UpdateFailed(repr(err)) from err
        except exceptions.CannotAuthenticate as err:
            raise ConfigEntryAuthFailed from err

    @abstractmethod
    async def _async_update_system_data(self) -> dict[str, Any]:
        """Class method for updating data."""
93 
94 
class ComelitSerialBridge(ComelitBaseCoordinator):
    """Queries Comelit Serial Bridge."""

    _hw_version = "20003101"
    api: ComeliteSerialBridgeApi

    def __init__(self, hass: HomeAssistant, host: str, port: int, pin: int) -> None:
        """Initialize the scanner.

        Args:
            hass: The Home Assistant instance.
            host: Host passed to the API client and to the base coordinator.
            port: Port passed to the API client.
            pin: PIN passed to the API client.
        """
        # The client is stored on ``api`` (the attribute the base class logs in
        # through) before the base initializer runs.
        self.api = ComeliteSerialBridgeApi(host, port, pin)
        super().__init__(hass, BRIDGE, host)

    async def _async_update_system_data(self) -> dict[str, Any]:
        """Specific method for updating data.

        Returns:
            The result of the API client's ``get_all_devices()`` call.
        """
        return await self.api.get_all_devices()
109 
110 
# NOTE(review): the class header line was missing from the reviewed copy; the
# name ``ComelitVedoSystem`` and the base class (mirroring ComelitSerialBridge)
# are restored by inference — confirm against the modules that import it.
class ComelitVedoSystem(ComelitBaseCoordinator):
    """Queries Comelit VEDO system."""

    _hw_version = "VEDO IP"
    api: ComelitVedoApi

    def __init__(self, hass: HomeAssistant, host: str, port: int, pin: int) -> None:
        """Initialize the scanner.

        Args:
            hass: The Home Assistant instance.
            host: Host passed to the API client and to the base coordinator.
            port: Port passed to the API client.
            pin: PIN passed to the API client.
        """
        # The client is stored on ``api`` (the attribute declared above) before
        # the base initializer runs.
        self.api = ComelitVedoApi(host, port, pin)
        super().__init__(hass, VEDO, host)

    async def _async_update_system_data(self) -> dict[str, Any]:
        """Specific method for updating data.

        Returns:
            The result of the API client's ``get_all_areas_and_zones()`` call.
        """
        return await self.api.get_all_areas_and_zones()
dr.DeviceInfo platform_device_info(self, ComelitVedoZoneObject|ComelitVedoAreaObject|ComelitSerialBridgeObject object_class, str object_type)
Definition: coordinator.py:62
None __init__(self, HomeAssistant hass, str device, str host)
Definition: coordinator.py:34
None __init__(self, HomeAssistant hass, str host, int port, int pin)
Definition: coordinator.py:101
None __init__(self, HomeAssistant hass, str host, int port, int pin)
Definition: coordinator.py:117