1 """Update coordinator for Ruuvi Gateway."""
3 from __future__
import annotations
5 from datetime
import timedelta
8 from aioruuvigateway.api
import get_gateway_history_data
9 from aioruuvigateway.models
import TagData
17 """Polls the gateway for data and returns a list of TagData objects that have changed since the last poll."""
22 logger: logging.Logger,
25 update_interval: timedelta |
None =
None,
29 """Initialize the coordinator using the given configuration (host, token)."""
30 super().
__init__(hass, logger, name=name, update_interval=update_interval)
33 self.last_tag_datas: dict[str, TagData] = {}
36 changed_tag_datas: list[TagData] = []
38 data = await get_gateway_history_data(
41 bearer_token=self.
tokentoken,
45 tag.mac
not in self.last_tag_datas
46 or self.last_tag_datas[tag.mac].data != tag.data
48 changed_tag_datas.append(tag)
49 self.last_tag_datas[tag.mac] = tag
50 return changed_tag_datas
list[TagData] _async_update_data(self)
None __init__(self, HomeAssistant hass, logging.Logger logger, *str name, timedelta|None update_interval=None, str host, str token)
httpx.AsyncClient get_async_client(HomeAssistant hass, bool verify_ssl=True)