1 """DataUpdateCoordinator for the Trafikverket Camera integration."""
3 from __future__
import annotations
5 from dataclasses
import dataclass
6 from datetime
import timedelta
9 from typing
import TYPE_CHECKING
12 from pytrafikverket
import (
14 InvalidAuthentication,
27 from .const
import DOMAIN
30 from .
import TVCameraConfigEntry
32 _LOGGER = logging.getLogger(__name__)
38 """Dataclass for Camera data."""
45 """A Trafikverket Data Update Coordinator."""
47 def __init__(self, hass: HomeAssistant, config_entry: TVCameraConfigEntry) ->
None:
48 """Initialize the Trafikverket coordinator."""
52 config_entry=config_entry,
54 update_interval=TIME_BETWEEN_UPDATES,
58 self.
sessionsession, config_entry.data[CONF_API_KEY]
60 self.
_id_id = config_entry.data[CONF_ID]
63 """Fetch data from Trafikverket."""
64 camera_data: CameraInfoModel
65 image: bytes |
None =
None
67 camera_data = await self.
_camera_api_camera_api.async_get_camera(self.
_id_id)
68 except (NoCameraFound, MultipleCamerasFound, UnknownError)
as error:
69 raise UpdateFailed
from error
70 except InvalidAuthentication
as error:
71 raise ConfigEntryAuthFailed
from error
73 if camera_data.photourl
is None:
74 return CameraData(data=camera_data, image=
None)
76 image_url = camera_data.photourl
77 if camera_data.fullsizephoto:
78 image_url = f
"{camera_data.photourl}?type=fullsize"
81 image_url, timeout=aiohttp.ClientTimeout(total=10)
83 if get_image.status
not in range(200, 299):
85 image = BytesIO(await get_image.read()).getvalue()
87 return CameraData(data=camera_data, image=image)
None __init__(self, HomeAssistant hass, TVCameraConfigEntry config_entry)
CameraData _async_update_data(self)
web.Response get(self, web.Request request, str config_key)
aiohttp.ClientSession async_get_clientsession(HomeAssistant hass, bool verify_ssl=True, socket.AddressFamily family=socket.AF_UNSPEC, ssl_util.SSLCipherList ssl_cipher=ssl_util.SSLCipherList.PYTHON_DEFAULT)