1 """Provides the Toon DataUpdateCoordinator."""
3 from __future__
import annotations
8 from aiohttp
import web
9 from toonapi
import Status, Toon, ToonError
13 async_register
as webhook_register,
14 async_unregister
as webhook_unregister,
23 from .const
import CONF_CLOUDHOOK_URL, DEFAULT_SCAN_INTERVAL, DOMAIN
25 _LOGGER = logging.getLogger(__name__)
29 """Class to manage fetching Toon data from single endpoint."""
32 self, hass: HomeAssistant, *, entry: ConfigEntry, session: OAuth2Session
34 """Initialize global Toon data updater."""
38 async
def async_token_refresh() -> str:
39 await session.async_ensure_token_valid()
40 return session.token[
"access_token"]
43 token=session.token[
"access_token"],
45 token_refresh_method=async_token_refresh,
49 hass, _LOGGER, name=DOMAIN, update_interval=DEFAULT_SCAN_INTERVAL
53 """Register a webhook with Toon to get live updates."""
54 if CONF_WEBHOOK_ID
not in self.
entryentry.data:
55 data = {**self.
entryentry.data, CONF_WEBHOOK_ID: secrets.token_hex()}
56 self.
hasshass.config_entries.async_update_entry(self.
entryentry, data=data)
58 if cloud.async_active_subscription(self.
hasshass):
59 if CONF_CLOUDHOOK_URL
not in self.
entryentry.data:
61 webhook_url = await cloud.async_create_cloudhook(
62 self.
hasshass, self.
entryentry.data[CONF_WEBHOOK_ID]
65 webhook_url = webhook.async_generate_url(
66 self.
hasshass, self.
entryentry.data[CONF_WEBHOOK_ID]
69 data = {**self.
entryentry.data, CONF_CLOUDHOOK_URL: webhook_url}
70 self.
hasshass.config_entries.async_update_entry(self.
entryentry, data=data)
72 webhook_url = self.
entryentry.data[CONF_CLOUDHOOK_URL]
74 webhook_url = webhook.async_generate_url(
75 self.
hasshass, self.
entryentry.data[CONF_WEBHOOK_ID]
79 webhook_unregister(self.
hasshass, self.
entryentry.data[CONF_WEBHOOK_ID])
85 self.
entryentry.data[CONF_WEBHOOK_ID],
90 await self.
toontoon.subscribe_webhook(
91 application_id=self.
entryentry.entry_id, url=webhook_url
93 _LOGGER.debug(
"Registered Toon webhook: %s", webhook_url)
94 except ToonError
as err:
95 _LOGGER.error(
"Error during webhook registration - %s", err)
97 self.
hasshass.bus.async_listen_once(
102 self, hass: HomeAssistant, webhook_id: str, request: web.Request
104 """Handle webhook callback."""
106 data = await request.json()
110 _LOGGER.debug(
"Got webhook data: %s", data)
113 if data.get(
"code") == 510:
118 "updateDataSet" not in data
119 or "commonName" not in data
120 or self.
datadata.agreement.display_common_name != data[
"commonName"]
122 _LOGGER.warning(
"Received invalid data from Toon webhook - %s", data)
126 await self.
toontoon.
update(data[
"updateDataSet"])
128 except ToonError
as err:
129 _LOGGER.error(
"Could not process data received from Toon webhook - %s", err)
132 """Remove / Unregister webhook for toon."""
134 "Unregistering Toon webhook (%s)", self.
entryentry.data[CONF_WEBHOOK_ID]
137 await self.
toontoon.unsubscribe_webhook(self.
entryentry.entry_id)
138 except ToonError
as err:
139 _LOGGER.error(
"Failed unregistering Toon webhook - %s", err)
141 webhook_unregister(self.
hasshass, self.
entryentry.data[CONF_WEBHOOK_ID])
144 """Fetch data from Toon."""
147 except ToonError
as error:
148 raise UpdateFailed(f
"Invalid response from API: {error}")
from error
None handle_webhook(self, HomeAssistant hass, str webhook_id, web.Request request)
None __init__(self, HomeAssistant hass, *ConfigEntry entry, OAuth2Session session)
None unregister_webhook(self, Event|None event=None)
Status _async_update_data(self)
None register_webhook(self, Event|None event=None)
None async_update_listeners(self)
IssData update(pyiss.ISS iss)
aiohttp.ClientSession async_get_clientsession(HomeAssistant hass, bool verify_ssl=True, socket.AddressFamily family=socket.AF_UNSPEC, ssl_util.SSLCipherList ssl_cipher=ssl_util.SSLCipherList.PYTHON_DEFAULT)