Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """DataUpdateCoordinator for the Trafikverket Camera integration."""
2 
3 from __future__ import annotations
4 
5 from dataclasses import dataclass
6 from datetime import timedelta
7 from io import BytesIO
8 import logging
9 from typing import TYPE_CHECKING
10 
11 import aiohttp
12 from pytrafikverket import (
13  CameraInfoModel,
14  InvalidAuthentication,
15  MultipleCamerasFound,
16  NoCameraFound,
17  TrafikverketCamera,
18  UnknownError,
19 )
20 
21 from homeassistant.const import CONF_API_KEY, CONF_ID
22 from homeassistant.core import HomeAssistant
23 from homeassistant.exceptions import ConfigEntryAuthFailed
24 from homeassistant.helpers.aiohttp_client import async_get_clientsession
25 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
26 
27 from .const import DOMAIN
28 
29 if TYPE_CHECKING:
30  from . import TVCameraConfigEntry
31 
32 _LOGGER = logging.getLogger(__name__)
33 TIME_BETWEEN_UPDATES = timedelta(minutes=5)
34 
35 
36 @dataclass
37 class CameraData:
38  """Dataclass for Camera data."""
39 
40  data: CameraInfoModel
41  image: bytes | None
42 
43 
45  """A Trafikverket Data Update Coordinator."""
46 
47  def __init__(self, hass: HomeAssistant, config_entry: TVCameraConfigEntry) -> None:
48  """Initialize the Trafikverket coordinator."""
49  super().__init__(
50  hass,
51  _LOGGER,
52  config_entry=config_entry,
53  name=DOMAIN,
54  update_interval=TIME_BETWEEN_UPDATES,
55  )
56  self.sessionsession = async_get_clientsession(hass)
57  self._camera_api_camera_api = TrafikverketCamera(
58  self.sessionsession, config_entry.data[CONF_API_KEY]
59  )
60  self._id_id = config_entry.data[CONF_ID]
61 
62  async def _async_update_data(self) -> CameraData:
63  """Fetch data from Trafikverket."""
64  camera_data: CameraInfoModel
65  image: bytes | None = None
66  try:
67  camera_data = await self._camera_api_camera_api.async_get_camera(self._id_id)
68  except (NoCameraFound, MultipleCamerasFound, UnknownError) as error:
69  raise UpdateFailed from error
70  except InvalidAuthentication as error:
71  raise ConfigEntryAuthFailed from error
72 
73  if camera_data.photourl is None:
74  return CameraData(data=camera_data, image=None)
75 
76  image_url = camera_data.photourl
77  if camera_data.fullsizephoto:
78  image_url = f"{camera_data.photourl}?type=fullsize"
79 
80  async with self.sessionsession.get(
81  image_url, timeout=aiohttp.ClientTimeout(total=10)
82  ) as get_image:
83  if get_image.status not in range(200, 299):
84  raise UpdateFailed("Could not retrieve image")
85  image = BytesIO(await get_image.read()).getvalue()
86 
87  return CameraData(data=camera_data, image=image)
None __init__(self, HomeAssistant hass, TVCameraConfigEntry config_entry)
Definition: coordinator.py:47
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88
aiohttp.ClientSession async_get_clientsession(HomeAssistant hass, bool verify_ssl=True, socket.AddressFamily family=socket.AF_UNSPEC, ssl_util.SSLCipherList ssl_cipher=ssl_util.SSLCipherList.PYTHON_DEFAULT)