Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """Update coordinator for Ruuvi Gateway."""
2 
3 from __future__ import annotations
4 
5 from datetime import timedelta
6 import logging
7 
8 from aioruuvigateway.api import get_gateway_history_data
9 from aioruuvigateway.models import TagData
10 
11 from homeassistant.core import HomeAssistant
12 from homeassistant.helpers.httpx_client import get_async_client
13 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
14 
15 
17  """Polls the gateway for data and returns a list of TagData objects that have changed since the last poll."""
18 
19  def __init__(
20  self,
21  hass: HomeAssistant,
22  logger: logging.Logger,
23  *,
24  name: str,
25  update_interval: timedelta | None = None,
26  host: str,
27  token: str,
28  ) -> None:
29  """Initialize the coordinator using the given configuration (host, token)."""
30  super().__init__(hass, logger, name=name, update_interval=update_interval)
31  self.hosthost = host
32  self.tokentoken = token
33  self.last_tag_datas: dict[str, TagData] = {}
34 
35  async def _async_update_data(self) -> list[TagData]:
36  changed_tag_datas: list[TagData] = []
37  async with get_async_client(self.hasshass) as client:
38  data = await get_gateway_history_data(
39  client,
40  host=self.hosthost,
41  bearer_token=self.tokentoken,
42  )
43  for tag in data.tags:
44  if (
45  tag.mac not in self.last_tag_datas
46  or self.last_tag_datas[tag.mac].data != tag.data
47  ):
48  changed_tag_datas.append(tag)
49  self.last_tag_datas[tag.mac] = tag
50  return changed_tag_datas
None __init__(self, HomeAssistant hass, logging.Logger logger, *str name, timedelta|None update_interval=None, str host, str token)
Definition: coordinator.py:28
httpx.AsyncClient get_async_client(HomeAssistant hass, bool verify_ssl=True)
Definition: httpx_client.py:41