Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """Coordinator for the P1 Monitor integration."""
2 
3 from __future__ import annotations
4 
5 from typing import TypedDict
6 
7 from p1monitor import (
8  P1Monitor,
9  P1MonitorConnectionError,
10  P1MonitorNoDataError,
11  Phases,
12  Settings,
13  SmartMeter,
14  WaterMeter,
15 )
16 
17 from homeassistant.config_entries import ConfigEntry
18 from homeassistant.const import CONF_HOST, CONF_PORT
19 from homeassistant.core import HomeAssistant
20 from homeassistant.helpers.aiohttp_client import async_get_clientsession
21 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
22 
23 from .const import (
24  DOMAIN,
25  LOGGER,
26  SCAN_INTERVAL,
27  SERVICE_PHASES,
28  SERVICE_SETTINGS,
29  SERVICE_SMARTMETER,
30  SERVICE_WATERMETER,
31 )
32 
33 
34 class P1MonitorData(TypedDict):
35  """Class for defining data in dict."""
36 
37  smartmeter: SmartMeter
38  phases: Phases
39  settings: Settings
40  watermeter: WaterMeter | None
41 
42 
44  """Class to manage fetching P1 Monitor data from single endpoint."""
45 
46  config_entry: ConfigEntry
47  has_water_meter: bool | None = None
48 
49  def __init__(
50  self,
51  hass: HomeAssistant,
52  ) -> None:
53  """Initialize global P1 Monitor data updater."""
54  super().__init__(
55  hass,
56  LOGGER,
57  name=DOMAIN,
58  update_interval=SCAN_INTERVAL,
59  )
60 
61  self.p1monitorp1monitor = P1Monitor(
62  host=self.config_entryconfig_entry.data[CONF_HOST],
63  port=self.config_entryconfig_entry.data[CONF_PORT],
64  session=async_get_clientsession(hass),
65  )
66 
67  async def _async_update_data(self) -> P1MonitorData:
68  """Fetch data from P1 Monitor."""
69  data: P1MonitorData = {
70  SERVICE_SMARTMETER: await self.p1monitorp1monitor.smartmeter(),
71  SERVICE_PHASES: await self.p1monitorp1monitor.phases(),
72  SERVICE_SETTINGS: await self.p1monitorp1monitor.settings(),
73  SERVICE_WATERMETER: None,
74  }
75 
76  if self.has_water_meterhas_water_meter or self.has_water_meterhas_water_meter is None:
77  try:
78  data[SERVICE_WATERMETER] = await self.p1monitorp1monitor.watermeter()
79  self.has_water_meterhas_water_meter = True
80  except (P1MonitorNoDataError, P1MonitorConnectionError):
81  LOGGER.debug("No water meter data received from P1 Monitor")
82  if self.has_water_meterhas_water_meter is None:
83  self.has_water_meterhas_water_meter = False
84 
85  return data
aiohttp.ClientSession async_get_clientsession(HomeAssistant hass, bool verify_ssl=True, socket.AddressFamily family=socket.AF_UNSPEC, ssl_util.SSLCipherList ssl_cipher=ssl_util.SSLCipherList.PYTHON_DEFAULT)