Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """DataUpdateCoordinator for the swiss_public_transport integration."""
2 
3 from __future__ import annotations
4 
5 from datetime import datetime, timedelta
6 import logging
7 from typing import TypedDict
8 
9 from opendata_transport import OpendataTransport
10 from opendata_transport.exceptions import (
11  OpendataTransportConnectionError,
12  OpendataTransportError,
13 )
14 
15 from homeassistant.config_entries import ConfigEntry
16 from homeassistant.core import HomeAssistant
17 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
18 import homeassistant.util.dt as dt_util
19 from homeassistant.util.json import JsonValueType
20 
21 from .const import CONNECTIONS_COUNT, DEFAULT_UPDATE_TIME, DOMAIN
22 from .helper import offset_opendata
23 
24 _LOGGER = logging.getLogger(__name__)
25 
26 type SwissPublicTransportConfigEntry = ConfigEntry[
27  SwissPublicTransportDataUpdateCoordinator
28 ]
29 
30 
31 class DataConnection(TypedDict):
32  """A connection data class."""
33 
34  departure: datetime | None
35  duration: int | None
36  platform: str
37  remaining_time: str
38  start: str
39  destination: str
40  train_number: str
41  transfers: int
42  delay: int
43  line: str
44 
45 
def calculate_duration_in_seconds(duration_text: str) -> int | None:
    """Transform and calculate the duration into seconds.

    ``duration_text`` uses the ``01d03:21:23`` layout (days, then
    hours:minutes:seconds). Returns the full duration in whole seconds, or
    None when ``dt_util.parse_duration`` cannot parse the text.
    """
    # Transform 01d03:21:23 into 01 days 03:21:23
    duration_text_pg_format = duration_text.replace("d", " days ")
    duration = dt_util.parse_duration(duration_text_pg_format)
    if duration is None:
        return None
    # ``timedelta.seconds`` only holds the sub-day remainder (0..86399) and
    # would silently drop the days part; ``total_seconds()`` includes it.
    return int(duration.total_seconds())
52 
53 
class SwissPublicTransportDataUpdateCoordinator(
    DataUpdateCoordinator[list[DataConnection]]
):
    """A SwissPublicTransport Data Update Coordinator.

    Refreshes every ``DEFAULT_UPDATE_TIME`` seconds and exposes the fetched
    connections as a list of ``DataConnection`` dicts.
    """

    config_entry: SwissPublicTransportConfigEntry

    def __init__(
        self,
        hass: HomeAssistant,
        opendata: OpendataTransport,
        time_offset: dict[str, int] | None,
    ) -> None:
        """Initialize the SwissPublicTransport data coordinator.

        ``opendata`` is the API client used for every fetch. ``time_offset``
        is optional; when set it is applied to the client via
        ``offset_opendata`` before each request.
        """
        super().__init__(
            hass,
            _LOGGER,
            name=DOMAIN,
            update_interval=timedelta(seconds=DEFAULT_UPDATE_TIME),
        )
        self._opendata = opendata
        self._time_offset = time_offset

    def remaining_time(self, departure: str) -> timedelta | None:
        """Calculate the remaining time for the departure.

        Returns None when ``dt_util.parse_datetime`` yields no datetime for
        ``departure``.
        """
        departure_datetime = dt_util.parse_datetime(departure)

        if departure_datetime:
            return departure_datetime - dt_util.as_local(dt_util.utcnow())
        return None

    async def _async_update_data(self) -> list[DataConnection]:
        """Fetch ``CONNECTIONS_COUNT`` connections for a coordinator refresh."""
        return await self.fetch_connections(limit=CONNECTIONS_COUNT)

    async def fetch_connections(self, limit: int) -> list[DataConnection]:
        """Fetch connections using the opendata api.

        Returns at most ``limit`` connections; fewer when the client reports
        fewer, and entries that are None are skipped.

        Raises:
            UpdateFailed: if the client raises
                ``OpendataTransportConnectionError`` or
                ``OpendataTransportError``.
        """
        self._opendata.limit = limit
        if self._time_offset:
            offset_opendata(self._opendata, self._time_offset)

        try:
            await self._opendata.async_get_data()
        except OpendataTransportConnectionError as e:
            _LOGGER.warning("Connection to transport.opendata.ch cannot be established")
            raise UpdateFailed from e
        except OpendataTransportError as e:
            _LOGGER.warning(
                "Unable to connect and retrieve data from transport.opendata.ch"
            )
            raise UpdateFailed from e
        connections = self._opendata.connections
        # NOTE(review): ``connections`` is deliberately accessed by integer
        # index rather than iterated or sliced; presumably it is keyed
        # 0..n-1 by the client library - confirm before changing this.
        return [
            DataConnection(
                departure=dt_util.parse_datetime(connections[i]["departure"]),
                train_number=connections[i]["number"],
                platform=connections[i]["platform"],
                transfers=connections[i]["transfers"],
                duration=calculate_duration_in_seconds(connections[i]["duration"]),
                start=self._opendata.from_name,
                destination=self._opendata.to_name,
                remaining_time=str(self.remaining_time(connections[i]["departure"])),
                delay=connections[i]["delay"],
                line=connections[i]["line"],
            )
            for i in range(limit)
            if len(connections) > i and connections[i] is not None
        ]

    async def fetch_connections_as_json(self, limit: int) -> list[JsonValueType]:
        """Fetch connections and return them as JSON-compatible dicts.

        Same data as ``fetch_connections``, except that ``departure`` is
        rendered as an ISO 8601 string (or None when it is missing).
        """
        return [
            {
                "departure": connection["departure"].isoformat()
                if connection["departure"]
                else None,
                "duration": connection["duration"],
                "platform": connection["platform"],
                "remaining_time": connection["remaining_time"],
                "start": connection["start"],
                "destination": connection["destination"],
                "train_number": connection["train_number"],
                "transfers": connection["transfers"],
                "delay": connection["delay"],
                "line": connection["line"],
            }
            for connection in await self.fetch_connections(limit)
        ]
None __init__(self, HomeAssistant hass, OpendataTransport opendata, dict[str, int]|None time_offset)
Definition: coordinator.py:66
RadioThermUpdate async_get_data(HomeAssistant hass, CommonThermostat device)
Definition: data.py:73
None offset_opendata(OpendataTransport opendata, dict[str, int] offset)
Definition: helper.py:22