1 """DataUpdateCoordinator for the israel rail integration."""
3 from __future__
import annotations
5 from dataclasses
import dataclass
6 from datetime
import datetime
9 from israelrailapi
import TrainSchedule
10 from israelrailapi.api
import TrainRoute
11 from israelrailapi.train_station
import station_name_to_id
18 from .const
import DEFAULT_SCAN_INTERVAL, DEPARTURES_COUNT, DOMAIN
20 _LOGGER = logging.getLogger(__name__)
25 """A connection data class."""
27 departure: datetime |
None
36 """Get departure time."""
37 start_datetime = dt_util.parse_datetime(train_route.start_time)
38 return start_datetime.astimezone()
if start_datetime
else None
42 """A IsraelRail Data Update Coordinator."""
44 config_entry: ConfigEntry
49 train_schedule: TrainSchedule,
53 """Initialize the IsraelRail data coordinator."""
58 update_interval=DEFAULT_SCAN_INTERVAL,
66 train_routes = await self.
hasshass.async_add_executor_job(
70 datetime.now().strftime(
"%Y-%m-%d"),
71 datetime.now().strftime(
"%H:%M"),
73 except Exception
as e:
75 "Unable to connect and retrieve data from israelrail api",
81 train_number=train_routes[i].trains[0].data[
"trainNumber"],
82 platform=train_routes[i].trains[0].platform,
83 trains=len(train_routes[i].trains),
84 start=station_name_to_id(train_routes[i].trains[0].src),
85 destination=station_name_to_id(train_routes[i].trains[-1].dst),
87 for i
in range(DEPARTURES_COUNT)
88 if len(train_routes) > i
and train_routes[i]
is not None
list[DataConnection] _async_update_data(self)
None __init__(self, HomeAssistant hass, TrainSchedule train_schedule, str start, str destination)
datetime|None departure_time(TrainRoute train_route)