Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """Coordinator for the Home Assistant alerts integration."""
2 
3 import dataclasses
4 import logging
5 
6 from awesomeversion import AwesomeVersion, AwesomeVersionStrategy
7 
8 from homeassistant.components.hassio import get_supervisor_info
9 from homeassistant.const import __version__
10 from homeassistant.core import HomeAssistant
11 from homeassistant.helpers.aiohttp_client import async_get_clientsession
12 from homeassistant.helpers.hassio import is_hassio
13 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
14 
15 from .const import DOMAIN, REQUEST_TIMEOUT, UPDATE_INTERVAL
16 
17 _LOGGER = logging.getLogger(__name__)
18 
19 
20 @dataclasses.dataclass(slots=True, frozen=True)
22  """Issue Registry Entry."""
23 
24  alert_id: str
25  integration: str
26  filename: str
27  date_updated: str | None
28 
29  @property
30  def issue_id(self) -> str:
31  """Return the issue id."""
32  return f"{self.filename}_{self.integration}"
33 
34 
35 class AlertUpdateCoordinator(DataUpdateCoordinator[dict[str, IntegrationAlert]]):
36  """Data fetcher for HA Alerts."""
37 
38  def __init__(self, hass: HomeAssistant) -> None:
39  """Initialize the data updater."""
40  super().__init__(
41  hass,
42  _LOGGER,
43  name=DOMAIN,
44  update_interval=UPDATE_INTERVAL,
45  )
46  self.ha_versionha_version = AwesomeVersion(
47  __version__,
48  ensure_strategy=AwesomeVersionStrategy.CALVER,
49  )
50  self.supervisorsupervisor = is_hassio(self.hasshass)
51 
52  async def _async_update_data(self) -> dict[str, IntegrationAlert]:
53  response = await async_get_clientsession(self.hasshass).get(
54  "https://alerts.home-assistant.io/alerts.json",
55  timeout=REQUEST_TIMEOUT,
56  )
57  alerts = await response.json()
58 
59  result = {}
60 
61  for alert in alerts:
62  if "integrations" not in alert:
63  continue
64 
65  if "homeassistant" in alert:
66  if "affected_from_version" in alert["homeassistant"]:
67  affected_from_version = AwesomeVersion(
68  alert["homeassistant"]["affected_from_version"],
69  )
70  if self.ha_versionha_version < affected_from_version:
71  continue
72  if "resolved_in_version" in alert["homeassistant"]:
73  resolved_in_version = AwesomeVersion(
74  alert["homeassistant"]["resolved_in_version"],
75  )
76  if self.ha_versionha_version >= resolved_in_version:
77  continue
78 
79  if self.supervisorsupervisor and "supervisor" in alert:
80  if (supervisor_info := get_supervisor_info(self.hasshass)) is None:
81  continue
82 
83  if "affected_from_version" in alert["supervisor"]:
84  affected_from_version = AwesomeVersion(
85  alert["supervisor"]["affected_from_version"],
86  )
87  if supervisor_info["version"] < affected_from_version:
88  continue
89  if "resolved_in_version" in alert["supervisor"]:
90  resolved_in_version = AwesomeVersion(
91  alert["supervisor"]["resolved_in_version"],
92  )
93  if supervisor_info["version"] >= resolved_in_version:
94  continue
95 
96  for integration in alert["integrations"]:
97  if "package" not in integration:
98  continue
99 
100  if integration["package"] not in self.hasshass.config.components:
101  continue
102 
103  integration_alert = IntegrationAlert(
104  alert_id=alert["id"],
105  integration=integration["package"],
106  filename=alert["filename"],
107  date_updated=alert.get("updated"),
108  )
109 
110  result[integration_alert.issue_id] = integration_alert
111 
112  return result
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88
dict[str, Any]|None get_supervisor_info(HomeAssistant hass)
Definition: coordinator.py:99
bool is_hassio(HomeAssistant hass)
Definition: __init__.py:302
aiohttp.ClientSession async_get_clientsession(HomeAssistant hass, bool verify_ssl=True, socket.AddressFamily family=socket.AF_UNSPEC, ssl_util.SSLCipherList ssl_cipher=ssl_util.SSLCipherList.PYTHON_DEFAULT)