1 """The Things Network's integration DataUpdateCoordinator."""
3 from datetime
import timedelta
6 from ttn_client
import TTNAuthError, TTNClient
14 from .const
import CONF_APP_ID, POLLING_PERIOD_S
16 _LOGGER = logging.getLogger(__name__)
20 """TTN coordinator."""
22 def __init__(self, hass: HomeAssistant, entry: ConfigEntry) ->
None:
23 """Initialize my coordinator."""
28 name=f
"TheThingsNetwork_{entry.data[CONF_APP_ID]}",
31 seconds=POLLING_PERIOD_S,
36 entry.data[CONF_HOST],
37 entry.data[CONF_APP_ID],
38 entry.data[CONF_API_KEY],
43 """Fetch data from API endpoint.
45 This is the place to pre-process the data to lookup tables
46 so entities can quickly look up their data.
52 except TTNAuthError
as err:
55 _LOGGER.error(
"TTNAuthError")
56 raise ConfigEntryAuthFailed
from err
59 _LOGGER.debug(
"fetched data: %s", measurements)
63 _LOGGER.debug(
"pushed data: %s", data)
TTNClient.DATA_TYPE _async_update_data(self)
None _push_callback(self, TTNClient.DATA_TYPE data)
None __init__(self, HomeAssistant hass, ConfigEntry entry)
None async_set_updated_data(self, _DataT data)
MetOfficeData fetch_data(datapoint.Manager connection, Site site, str mode)