Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """NextBus data update coordinator."""
2 
3 from datetime import timedelta
4 import logging
5 from typing import Any
6 
7 from py_nextbus import NextBusClient
8 from py_nextbus.client import NextBusFormatError, NextBusHTTPError
9 
10 from homeassistant.core import HomeAssistant
11 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
12 
13 from .const import DOMAIN
14 from .util import RouteStop
15 
16 _LOGGER = logging.getLogger(__name__)
17 
18 
20  """Class to manage fetching NextBus data."""
21 
22  def __init__(self, hass: HomeAssistant, agency: str) -> None:
23  """Initialize a global coordinator for fetching data for a given agency."""
24  super().__init__(
25  hass,
26  _LOGGER,
27  config_entry=None, # It is shared between multiple entries
28  name=DOMAIN,
29  update_interval=timedelta(seconds=30),
30  )
31  self.clientclient = NextBusClient(agency_id=agency)
32  self._agency_agency = agency
33  self._route_stops: set[RouteStop] = set()
34  self._predictions_predictions: dict[RouteStop, dict[str, Any]] = {}
35 
36  def add_stop_route(self, stop_id: str, route_id: str) -> None:
37  """Tell coordinator to start tracking a given stop and route."""
38  self._route_stops.add(RouteStop(route_id, stop_id))
39 
40  def remove_stop_route(self, stop_id: str, route_id: str) -> None:
41  """Tell coordinator to stop tracking a given stop and route."""
42  self._route_stops.remove(RouteStop(route_id, stop_id))
43 
44  def get_prediction_data(self, stop_id: str, route_id: str) -> dict[str, Any] | None:
45  """Get prediction result for a given stop and route."""
46  return self._predictions_predictions.get(RouteStop(route_id, stop_id))
47 
48  def has_routes(self) -> bool:
49  """Check if this coordinator is tracking any routes."""
50  return len(self._route_stops) > 0
51 
52  async def _async_update_data(self) -> dict[str, Any]:
53  """Fetch data from NextBus."""
54 
55  _stops_to_route_stops: dict[str, set[RouteStop]] = {}
56  for route_stop in self._route_stops:
57  _stops_to_route_stops.setdefault(route_stop.stop_id, set()).add(route_stop)
58 
59  self.loggerlogger.debug(
60  "Updating data from API. Routes: %s", str(_stops_to_route_stops)
61  )
62 
63  def _update_data() -> dict:
64  """Fetch data from NextBus."""
65  self.loggerlogger.debug("Updating data from API (executor)")
66  predictions: dict[RouteStop, dict[str, Any]] = {}
67 
68  for stop_id, route_stops in _stops_to_route_stops.items():
69  self.loggerlogger.debug("Updating data from API (executor) %s", stop_id)
70  try:
71  prediction_results = self.clientclient.predictions_for_stop(stop_id)
72  except NextBusHTTPError as ex:
73  self.loggerlogger.error(
74  "Error updating %s (executor): %s %s",
75  str(stop_id),
76  ex,
77  getattr(ex, "response", None),
78  )
79  raise UpdateFailed("Failed updating nextbus data", ex) from ex
80  except NextBusFormatError as ex:
81  raise UpdateFailed("Failed updating nextbus data", ex) from ex
82 
83  self.loggerlogger.debug(
84  "Prediction results for %s (executor): %s",
85  str(stop_id),
86  str(prediction_results),
87  )
88 
89  for route_stop in route_stops:
90  for prediction_result in prediction_results:
91  if (
92  prediction_result["stop"]["id"] == route_stop.stop_id
93  and prediction_result["route"]["id"] == route_stop.route_id
94  ):
95  predictions[route_stop] = prediction_result
96  break
97  else:
98  self.loggerlogger.warning(
99  "Prediction not found for %s (executor)", str(route_stop)
100  )
101 
102  self._predictions_predictions = predictions
103 
104  return predictions
105 
106  return await self.hasshass.async_add_executor_job(_update_data)
dict[str, Any]|None get_prediction_data(self, str stop_id, str route_id)
Definition: coordinator.py:44
bool add(self, _T matcher)
Definition: match.py:185
bool remove(self, _T matcher)
Definition: match.py:214
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88
_ItemT _update_data(self, _ItemT item, dict update_data)
Definition: collection.py:296