Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """NextDns coordinator."""
2 
3 from datetime import timedelta
4 import logging
5 from typing import TypeVar
6 
7 from aiohttp.client_exceptions import ClientConnectorError
8 from nextdns import (
9  AnalyticsDnssec,
10  AnalyticsEncryption,
11  AnalyticsIpVersions,
12  AnalyticsProtocols,
13  AnalyticsStatus,
14  ApiError,
15  ConnectionStatus,
16  InvalidApiKeyError,
17  NextDns,
18  Settings,
19 )
20 from nextdns.model import NextDnsData
21 from tenacity import RetryError
22 
23 from homeassistant.core import HomeAssistant
24 from homeassistant.exceptions import ConfigEntryAuthFailed
25 from homeassistant.helpers.device_registry import DeviceEntryType, DeviceInfo
26 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
27 
28 from .const import DOMAIN
29 
30 _LOGGER = logging.getLogger(__name__)
31 
32 CoordinatorDataT = TypeVar("CoordinatorDataT", bound=NextDnsData)
33 
34 
36  """Class to manage fetching NextDNS data API."""
37 
38  def __init__(
39  self,
40  hass: HomeAssistant,
41  nextdns: NextDns,
42  profile_id: str,
43  update_interval: timedelta,
44  ) -> None:
45  """Initialize."""
46  self.nextdnsnextdns = nextdns
47  self.profile_idprofile_id = profile_id
48  self.profile_nameprofile_name = nextdns.get_profile_name(profile_id)
49  self.device_infodevice_info = DeviceInfo(
50  configuration_url=f"https://my.nextdns.io/{profile_id}/setup",
51  entry_type=DeviceEntryType.SERVICE,
52  identifiers={(DOMAIN, str(profile_id))},
53  manufacturer="NextDNS Inc.",
54  name=self.profile_nameprofile_name,
55  )
56 
57  super().__init__(hass, _LOGGER, name=DOMAIN, update_interval=update_interval)
58 
59  async def _async_update_data(self) -> CoordinatorDataT:
60  """Update data via internal method."""
61  try:
62  return await self._async_update_data_internal_async_update_data_internal()
63  except (
64  ApiError,
65  ClientConnectorError,
66  RetryError,
67  ) as err:
68  raise UpdateFailed(err) from err
69  except InvalidApiKeyError as err:
70  raise ConfigEntryAuthFailed from err
71 
72  async def _async_update_data_internal(self) -> CoordinatorDataT:
73  """Update data via library."""
74  raise NotImplementedError("Update method not implemented")
75 
76 
78  """Class to manage fetching NextDNS analytics status data from API."""
79 
80  async def _async_update_data_internal(self) -> AnalyticsStatus:
81  """Update data via library."""
82  return await self.nextdnsnextdns.get_analytics_status(self.profile_idprofile_id)
83 
84 
86  """Class to manage fetching NextDNS analytics Dnssec data from API."""
87 
88  async def _async_update_data_internal(self) -> AnalyticsDnssec:
89  """Update data via library."""
90  return await self.nextdnsnextdns.get_analytics_dnssec(self.profile_idprofile_id)
91 
92 
94  """Class to manage fetching NextDNS analytics encryption data from API."""
95 
96  async def _async_update_data_internal(self) -> AnalyticsEncryption:
97  """Update data via library."""
98  return await self.nextdnsnextdns.get_analytics_encryption(self.profile_idprofile_id)
99 
100 
102  """Class to manage fetching NextDNS analytics IP versions data from API."""
103 
104  async def _async_update_data_internal(self) -> AnalyticsIpVersions:
105  """Update data via library."""
106  return await self.nextdnsnextdns.get_analytics_ip_versions(self.profile_idprofile_id)
107 
108 
110  """Class to manage fetching NextDNS analytics protocols data from API."""
111 
112  async def _async_update_data_internal(self) -> AnalyticsProtocols:
113  """Update data via library."""
114  return await self.nextdnsnextdns.get_analytics_protocols(self.profile_idprofile_id)
115 
116 
118  """Class to manage fetching NextDNS connection data from API."""
119 
120  async def _async_update_data_internal(self) -> Settings:
121  """Update data via library."""
122  return await self.nextdnsnextdns.get_settings(self.profile_idprofile_id)
123 
124 
126  """Class to manage fetching NextDNS connection data from API."""
127 
128  async def _async_update_data_internal(self) -> ConnectionStatus:
129  """Update data via library."""
130  return await self.nextdnsnextdns.connection_status(self.profile_idprofile_id)
None __init__(self, HomeAssistant hass, NextDns nextdns, str profile_id, timedelta update_interval)
Definition: coordinator.py:44