1 """NextBus data update coordinator."""
3 from datetime
import timedelta
7 from py_nextbus
import NextBusClient
8 from py_nextbus.client
import NextBusFormatError, NextBusHTTPError
13 from .const
import DOMAIN
14 from .util
import RouteStop
16 _LOGGER = logging.getLogger(__name__)
20 """Class to manage fetching NextBus data."""
22 def __init__(self, hass: HomeAssistant, agency: str) ->
None:
23 """Initialize a global coordinator for fetching data for a given agency."""
31 self.
clientclient = NextBusClient(agency_id=agency)
33 self._route_stops: set[RouteStop] = set()
34 self.
_predictions_predictions: dict[RouteStop, dict[str, Any]] = {}
37 """Tell coordinator to start tracking a given stop and route."""
41 """Tell coordinator to stop tracking a given stop and route."""
45 """Get prediction result for a given stop and route."""
49 """Check if this coordinator is tracking any routes."""
50 return len(self._route_stops) > 0
53 """Fetch data from NextBus."""
55 _stops_to_route_stops: dict[str, set[RouteStop]] = {}
56 for route_stop
in self._route_stops:
57 _stops_to_route_stops.setdefault(route_stop.stop_id, set()).
add(route_stop)
60 "Updating data from API. Routes: %s",
str(_stops_to_route_stops)
64 """Fetch data from NextBus."""
65 self.
loggerlogger.debug(
"Updating data from API (executor)")
66 predictions: dict[RouteStop, dict[str, Any]] = {}
68 for stop_id, route_stops
in _stops_to_route_stops.items():
69 self.
loggerlogger.debug(
"Updating data from API (executor) %s", stop_id)
71 prediction_results = self.
clientclient.predictions_for_stop(stop_id)
72 except NextBusHTTPError
as ex:
74 "Error updating %s (executor): %s %s",
77 getattr(ex,
"response",
None),
79 raise UpdateFailed(
"Failed updating nextbus data", ex)
from ex
80 except NextBusFormatError
as ex:
81 raise UpdateFailed(
"Failed updating nextbus data", ex)
from ex
84 "Prediction results for %s (executor): %s",
86 str(prediction_results),
89 for route_stop
in route_stops:
90 for prediction_result
in prediction_results:
92 prediction_result[
"stop"][
"id"] == route_stop.stop_id
93 and prediction_result[
"route"][
"id"] == route_stop.route_id
95 predictions[route_stop] = prediction_result
99 "Prediction not found for %s (executor)",
str(route_stop)
106 return await self.
hasshass.async_add_executor_job(_update_data)
None add_stop_route(self, str stop_id, str route_id)
None __init__(self, HomeAssistant hass, str agency)
dict[str, Any]|None get_prediction_data(self, str stop_id, str route_id)
None remove_stop_route(self, str stop_id, str route_id)
dict[str, Any] _async_update_data(self)
bool add(self, _T matcher)
bool remove(self, _T matcher)
web.Response get(self, web.Request request, str config_key)
_ItemT _update_data(self, _ItemT item, dict update_data)