Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """Coordinator for monitoring the size of a file."""
2 
3 from __future__ import annotations
4 
5 from datetime import datetime, timedelta
6 import logging
7 import os
8 import pathlib
9 
10 from homeassistant.core import HomeAssistant
11 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
12 import homeassistant.util.dt as dt_util
13 
14 from .const import DOMAIN
15 
16 _LOGGER = logging.getLogger(__name__)
17 
18 
class FileSizeCoordinator(DataUpdateCoordinator[dict[str, int | float | datetime]]):
    """Coordinator that periodically collects size and timestamps of one file.

    The coordinator data is a dict with the keys ``file`` (size in units of
    10**6 bytes, rounded to two decimals), ``bytes`` (raw size), and
    ``last_updated`` / ``created`` (UTC datetimes taken from the file's stat
    result).
    """

    # Absolute path of the monitored file; assigned in ``_async_setup``.
    path: pathlib.Path

    def __init__(self, hass: HomeAssistant, unresolved_path: str) -> None:
        """Initialize filesize coordinator.

        Args:
            hass: The Home Assistant instance.
            unresolved_path: Path of the file to monitor, as configured. It
                is validated and made absolute later, in ``_async_setup``.
        """
        super().__init__(
            hass,
            _LOGGER,
            name=DOMAIN,
            update_interval=timedelta(seconds=60),
            always_update=False,
        )
        self._unresolved_path = unresolved_path

    def _get_full_path(self) -> pathlib.Path:
        """Check if path is valid, allowed and return full path.

        Performs blocking filesystem checks, so it is dispatched to the
        executor by ``_async_setup`` rather than called from the event loop.

        Returns:
            The absolute path of the monitored file.

        Raises:
            UpdateFailed: If the path is not allowed by the Home Assistant
                configuration, or does not point to an existing regular file.
        """
        path = self._unresolved_path
        get_path = pathlib.Path(path)
        if not self.hass.config.is_allowed_path(path):
            raise UpdateFailed(f"Filepath {path} is not valid or allowed")

        if not get_path.exists() or not get_path.is_file():
            raise UpdateFailed(f"Can not access file {path}")

        return get_path.absolute()

    def _update(self) -> os.stat_result:
        """Fetch file information.

        Returns:
            The stat result for ``self.path``.

        Raises:
            UpdateFailed: If the stat call fails with an ``OSError``.
        """
        try:
            return self.path.stat()
        except OSError as error:
            raise UpdateFailed(f"Can not retrieve file statistics {error}") from error

    async def _async_setup(self) -> None:
        """Set up path.

        Validates the configured path in the executor and stores the
        resulting absolute path on ``self.path``.
        """
        self.path = await self.hass.async_add_executor_job(self._get_full_path)

    async def _async_update_data(self) -> dict[str, float | int | datetime]:
        """Fetch file information.

        Returns:
            A dict with the file size (``file`` and ``bytes``) and the
            ``last_updated`` / ``created`` timestamps as UTC datetimes.

        Raises:
            UpdateFailed: Propagated from ``_update`` when the file cannot
                be stat'ed.
        """
        # stat() is blocking I/O, so run it in the executor.
        statinfo = await self.hass.async_add_executor_job(self._update)
        size = statinfo.st_size
        last_updated = dt_util.utc_from_timestamp(statinfo.st_mtime)
        # NOTE: st_ctime is platform dependent (metadata-change time on Unix,
        # creation time on Windows); it is exposed as "created" regardless.
        created = dt_util.utc_from_timestamp(statinfo.st_ctime)

        _LOGGER.debug("size %s, last updated %s", size, last_updated)
        data: dict[str, int | float | datetime] = {
            "file": round(size / 1e6, 2),
            "bytes": size,
            "last_updated": last_updated,
            "created": created,
        }

        return data
None __init__(self, HomeAssistant hass, str unresolved_path)
Definition: coordinator.py:24