Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """DataUpdateCoordinators for the System monitor integration."""
2 
3 from __future__ import annotations
4 
5 from dataclasses import dataclass
6 from datetime import datetime
7 import logging
8 import os
9 from typing import Any, NamedTuple
10 
11 from psutil import Process
12 from psutil._common import sdiskusage, shwtemp, snetio, snicaddr, sswap
13 import psutil_home_assistant as ha_psutil
14 
15 from homeassistant.core import HomeAssistant
16 from homeassistant.helpers.entity_component import DEFAULT_SCAN_INTERVAL
17 from homeassistant.helpers.update_coordinator import TimestampDataUpdateCoordinator
18 from homeassistant.util import dt as dt_util
19 
20 _LOGGER = logging.getLogger(__name__)
21 
22 
23 @dataclass(frozen=True, kw_only=True, slots=True)
24 class SensorData:
25  """Sensor data."""
26 
27  disk_usage: dict[str, sdiskusage]
28  swap: sswap
29  memory: VirtualMemory
30  io_counters: dict[str, snetio]
31  addresses: dict[str, list[snicaddr]]
32  load: tuple[float, float, float]
33  cpu_percent: float | None
34  boot_time: datetime
35  processes: list[Process]
36  temperatures: dict[str, list[shwtemp]]
37 
38  def as_dict(self) -> dict[str, Any]:
39  """Return as dict."""
40  disk_usage = None
41  if self.disk_usage:
42  disk_usage = {k: str(v) for k, v in self.disk_usage.items()}
43  io_counters = None
44  if self.io_counters:
45  io_counters = {k: str(v) for k, v in self.io_counters.items()}
46  addresses = None
47  if self.addresses:
48  addresses = {k: str(v) for k, v in self.addresses.items()}
49  temperatures = None
50  if self.temperatures:
51  temperatures = {k: str(v) for k, v in self.temperatures.items()}
52  return {
53  "disk_usage": disk_usage,
54  "swap": str(self.swap),
55  "memory": str(self.memory),
56  "io_counters": io_counters,
57  "addresses": addresses,
58  "load": str(self.load),
59  "cpu_percent": str(self.cpu_percent),
60  "boot_time": str(self.boot_time),
61  "processes": str(self.processes),
62  "temperatures": temperatures,
63  }
64 
65 
66 class VirtualMemory(NamedTuple):
67  """Represents virtual memory.
68 
69  psutil defines virtual memory by platform.
70  Create our own definition here to be platform independent.
71  """
72 
73  total: float
74  available: float
75  percent: float
76  used: float
77  free: float
78 
79 
81  """A System monitor Data Update Coordinator."""
82 
83  def __init__(
84  self,
85  hass: HomeAssistant,
86  psutil_wrapper: ha_psutil.PsutilWrapper,
87  arguments: list[str],
88  ) -> None:
89  """Initialize the coordinator."""
90  super().__init__(
91  hass,
92  _LOGGER,
93  name="System Monitor update coordinator",
94  update_interval=DEFAULT_SCAN_INTERVAL,
95  always_update=False,
96  )
97  self._psutil_psutil = psutil_wrapper.psutil
98  self._arguments_arguments = arguments
99  self.boot_timeboot_time: datetime | None = None
100 
101  self._initial_update_initial_update: bool = True
102  self.update_subscribers: dict[tuple[str, str], set[str]] = (
103  self.set_subscribers_tuplesset_subscribers_tuples(arguments)
104  )
105 
107  self, arguments: list[str]
108  ) -> dict[tuple[str, str], set[str]]:
109  """Set tuples in subscribers dictionary."""
110  _disk_defaults: dict[tuple[str, str], set[str]] = {}
111  for argument in arguments:
112  _disk_defaults[("disks", argument)] = set()
113  return {
114  **_disk_defaults,
115  ("swap", ""): set(),
116  ("memory", ""): set(),
117  ("io_counters", ""): set(),
118  ("addresses", ""): set(),
119  ("load", ""): set(),
120  ("cpu_percent", ""): set(),
121  ("boot", ""): set(),
122  ("processes", ""): set(),
123  ("temperatures", ""): set(),
124  }
125 
126  async def _async_update_data(self) -> SensorData:
127  """Fetch data."""
128  _LOGGER.debug("Update list is: %s", self.update_subscribers)
129 
130  _data = await self.hasshass.async_add_executor_job(self.update_dataupdate_data)
131 
132  load: tuple = (None, None, None)
133  if self.update_subscribers[("load", "")] or self._initial_update_initial_update:
134  load = os.getloadavg()
135  _LOGGER.debug("Load: %s", load)
136 
137  cpu_percent: float | None = None
138  if self.update_subscribers[("cpu_percent", "")] or self._initial_update_initial_update:
139  cpu_percent = self._psutil_psutil.cpu_percent(interval=None)
140  _LOGGER.debug("cpu_percent: %s", cpu_percent)
141 
142  self._initial_update_initial_update = False
143  return SensorData(
144  disk_usage=_data["disks"],
145  swap=_data["swap"],
146  memory=_data["memory"],
147  io_counters=_data["io_counters"],
148  addresses=_data["addresses"],
149  load=load,
150  cpu_percent=cpu_percent,
151  boot_time=_data["boot_time"],
152  processes=_data["processes"],
153  temperatures=_data["temperatures"],
154  )
155 
156  def update_data(self) -> dict[str, Any]:
157  """To be extended by data update coordinators."""
158  disks: dict[str, sdiskusage] = {}
159  for argument in self._arguments_arguments:
160  if self.update_subscribers[("disks", argument)] or self._initial_update_initial_update:
161  try:
162  usage: sdiskusage = self._psutil_psutil.disk_usage(argument)
163  _LOGGER.debug("sdiskusagefor %s: %s", argument, usage)
164  except PermissionError as err:
165  _LOGGER.warning(
166  "No permission to access %s, error %s", argument, err
167  )
168  except OSError as err:
169  _LOGGER.warning("OS error for %s, error %s", argument, err)
170  else:
171  disks[argument] = usage
172 
173  swap: sswap | None = None
174  if self.update_subscribers[("swap", "")] or self._initial_update_initial_update:
175  swap = self._psutil_psutil.swap_memory()
176  _LOGGER.debug("sswap: %s", swap)
177 
178  memory = None
179  if self.update_subscribers[("memory", "")] or self._initial_update_initial_update:
180  memory = self._psutil_psutil.virtual_memory()
181  _LOGGER.debug("memory: %s", memory)
182  memory = VirtualMemory(
183  memory.total, memory.available, memory.percent, memory.used, memory.free
184  )
185 
186  io_counters: dict[str, snetio] | None = None
187  if self.update_subscribers[("io_counters", "")] or self._initial_update_initial_update:
188  io_counters = self._psutil_psutil.net_io_counters(pernic=True)
189  _LOGGER.debug("io_counters: %s", io_counters)
190 
191  addresses: dict[str, list[snicaddr]] | None = None
192  if self.update_subscribers[("addresses", "")] or self._initial_update_initial_update:
193  addresses = self._psutil_psutil.net_if_addrs()
194  _LOGGER.debug("ip_addresses: %s", addresses)
195 
196  if self._initial_update_initial_update:
197  # Boot time only needs to refresh on first pass
198  self.boot_timeboot_time = dt_util.utc_from_timestamp(self._psutil_psutil.boot_time())
199  _LOGGER.debug("boot time: %s", self.boot_timeboot_time)
200 
201  processes = None
202  if self.update_subscribers[("processes", "")] or self._initial_update_initial_update:
203  processes = self._psutil_psutil.process_iter()
204  _LOGGER.debug("processes: %s", processes)
205  processes = list(processes)
206 
207  temps: dict[str, list[shwtemp]] = {}
208  if self.update_subscribers[("temperatures", "")] or self._initial_update_initial_update:
209  try:
210  temps = self._psutil_psutil.sensors_temperatures()
211  _LOGGER.debug("temps: %s", temps)
212  except AttributeError:
213  _LOGGER.debug("OS does not provide temperature sensors")
214 
215  return {
216  "disks": disks,
217  "swap": swap,
218  "memory": memory,
219  "io_counters": io_counters,
220  "addresses": addresses,
221  "boot_time": self.boot_timeboot_time,
222  "processes": processes,
223  "temperatures": temps,
224  }
dict[tuple[str, str], set[str]] set_subscribers_tuples(self, list[str] arguments)
Definition: coordinator.py:108
None __init__(self, HomeAssistant hass, ha_psutil.PsutilWrapper psutil_wrapper, list[str] arguments)
Definition: coordinator.py:88