Home Assistant Unofficial Reference 2024.12.1
sensor.py
Go to the documentation of this file.
1 """Support for departure information for Rhein-Main public transport."""
2 
3 from __future__ import annotations
4 
5 import asyncio
6 from datetime import timedelta
7 import logging
8 
9 from RMVtransport import RMVtransport
10 from RMVtransport.rmvtransport import (
11  RMVtransportApiConnectionError,
12  RMVtransportDataError,
13 )
14 import voluptuous as vol
15 
17  PLATFORM_SCHEMA as SENSOR_PLATFORM_SCHEMA,
18  SensorEntity,
19 )
20 from homeassistant.const import CONF_NAME, CONF_TIMEOUT, UnitOfTime
21 from homeassistant.core import HomeAssistant
22 from homeassistant.exceptions import PlatformNotReady
24 from homeassistant.helpers.entity_platform import AddEntitiesCallback
25 from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
26 from homeassistant.util import Throttle
27 
28 _LOGGER = logging.getLogger(__name__)
29 
30 CONF_NEXT_DEPARTURE = "next_departure"
31 
32 CONF_STATION = "station"
33 CONF_DESTINATIONS = "destinations"
34 CONF_DIRECTION = "direction"
35 CONF_LINES = "lines"
36 CONF_PRODUCTS = "products"
37 CONF_TIME_OFFSET = "time_offset"
38 CONF_MAX_JOURNEYS = "max_journeys"
39 
40 DEFAULT_NAME = "RMV Journey"
41 
42 VALID_PRODUCTS = ["U-Bahn", "Tram", "Bus", "S", "RB", "RE", "EC", "IC", "ICE"]
43 
44 ICONS = {
45  "U-Bahn": "mdi:subway",
46  "Tram": "mdi:tram",
47  "Bus": "mdi:bus",
48  "S": "mdi:train",
49  "RB": "mdi:train",
50  "RE": "mdi:train",
51  "EC": "mdi:train",
52  "IC": "mdi:train",
53  "ICE": "mdi:train",
54  "SEV": "mdi:checkbox-blank-circle-outline",
55  None: "mdi:clock",
56 }
57 ATTRIBUTION = "Data provided by opendata.rmv.de"
58 
59 SCAN_INTERVAL = timedelta(seconds=60)
60 
61 PLATFORM_SCHEMA = SENSOR_PLATFORM_SCHEMA.extend(
62  {
63  vol.Required(CONF_NEXT_DEPARTURE): [
64  {
65  vol.Required(CONF_STATION): cv.string,
66  vol.Optional(CONF_DESTINATIONS, default=[]): vol.All(
67  cv.ensure_list, [cv.string]
68  ),
69  vol.Optional(CONF_DIRECTION): cv.string,
70  vol.Optional(CONF_LINES, default=[]): vol.All(
71  cv.ensure_list, [cv.positive_int, cv.string]
72  ),
73  vol.Optional(CONF_PRODUCTS, default=VALID_PRODUCTS): vol.All(
74  cv.ensure_list, [vol.In(VALID_PRODUCTS)]
75  ),
76  vol.Optional(CONF_TIME_OFFSET, default=0): cv.positive_int,
77  vol.Optional(CONF_MAX_JOURNEYS, default=5): cv.positive_int,
78  vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
79  }
80  ],
81  vol.Optional(CONF_TIMEOUT, default=10): cv.positive_int,
82  }
83 )
84 
85 
87  hass: HomeAssistant,
88  config: ConfigType,
89  async_add_entities: AddEntitiesCallback,
90  discovery_info: DiscoveryInfoType | None = None,
91 ) -> None:
92  """Set up the RMV departure sensor."""
93  timeout = config.get(CONF_TIMEOUT)
94 
95  sensors = [
97  next_departure[CONF_STATION],
98  next_departure.get(CONF_DESTINATIONS),
99  next_departure.get(CONF_DIRECTION),
100  next_departure.get(CONF_LINES),
101  next_departure.get(CONF_PRODUCTS),
102  next_departure.get(CONF_TIME_OFFSET),
103  next_departure.get(CONF_MAX_JOURNEYS),
104  next_departure.get(CONF_NAME),
105  timeout,
106  )
107  for next_departure in config[CONF_NEXT_DEPARTURE]
108  ]
109 
110  tasks = [asyncio.create_task(sensor.async_update()) for sensor in sensors]
111  if tasks:
112  await asyncio.wait(tasks)
113 
114  if not any(sensor.data for sensor in sensors):
115  raise PlatformNotReady
116 
117  async_add_entities(sensors)
118 
119 
121  """Implementation of an RMV departure sensor."""
122 
123  _attr_attribution = ATTRIBUTION
124 
125  def __init__(
126  self,
127  station,
128  destinations,
129  direction,
130  lines,
131  products,
132  time_offset,
133  max_journeys,
134  name,
135  timeout,
136  ):
137  """Initialize the sensor."""
138  self._station_station = station
139  self._name_name = name
140  self._state_state = None
141  self.datadata = RMVDepartureData(
142  station,
143  destinations,
144  direction,
145  lines,
146  products,
147  time_offset,
148  max_journeys,
149  timeout,
150  )
151  self._icon_icon = ICONS[None]
152 
153  @property
154  def name(self):
155  """Return the name of the sensor."""
156  return self._name_name
157 
158  @property
159  def available(self):
160  """Return True if entity is available."""
161  return self._state_state is not None
162 
163  @property
164  def native_value(self):
165  """Return the next departure time."""
166  return self._state_state
167 
168  @property
170  """Return the state attributes."""
171  try:
172  return {
173  "next_departures": self.datadata.departures[1:],
174  "direction": self.datadata.departures[0].get("direction"),
175  "line": self.datadata.departures[0].get("line"),
176  "minutes": self.datadata.departures[0].get("minutes"),
177  "departure_time": self.datadata.departures[0].get("departure_time"),
178  "product": self.datadata.departures[0].get("product"),
179  }
180  except IndexError:
181  return {}
182 
183  @property
184  def icon(self):
185  """Icon to use in the frontend, if any."""
186  return self._icon_icon
187 
188  @property
190  """Return the unit this state is expressed in."""
191  return UnitOfTime.MINUTES
192 
193  async def async_update(self) -> None:
194  """Get the latest data and update the state."""
195  await self.datadata.async_update()
196 
197  if self._name_name == DEFAULT_NAME:
198  self._name_name = self.datadata.station
199 
200  self._station_station = self.datadata.station
201 
202  if not self.datadata.departures:
203  self._state_state = None
204  self._icon_icon = ICONS[None]
205  return
206 
207  self._state_state = self.datadata.departures[0].get("minutes")
208  self._icon_icon = ICONS[self.datadata.departures[0].get("product")]
209 
210 
212  """Pull data from the opendata.rmv.de web page."""
213 
214  def __init__(
215  self,
216  station_id,
217  destinations,
218  direction,
219  lines,
220  products,
221  time_offset,
222  max_journeys,
223  timeout,
224  ):
225  """Initialize the sensor."""
226  self.stationstation = None
227  self._station_id_station_id = station_id
228  self._destinations_destinations = destinations
229  self._direction_direction = direction
230  self._lines_lines = lines
231  self._products_products = products
232  self._time_offset_time_offset = time_offset
233  self._max_journeys_max_journeys = max_journeys
234  self.rmvrmv = RMVtransport(timeout)
235  self.departuresdepartures = []
236  self._error_notification_error_notification = False
237 
238  @Throttle(SCAN_INTERVAL)
239  async def async_update(self):
240  """Update the connection data."""
241  try:
242  _data = await self.rmvrmv.get_departures(
243  self._station_id_station_id,
244  products=self._products_products,
245  direction_id=self._direction_direction,
246  max_journeys=50,
247  )
248 
249  except (RMVtransportApiConnectionError, RMVtransportDataError):
250  self.departuresdepartures = []
251  _LOGGER.warning("Could not retrieve data from rmv.de")
252  return
253 
254  self.stationstation = _data.get("station")
255 
256  _deps = []
257  _deps_not_found = set(self._destinations_destinations)
258 
259  for journey in _data["journeys"]:
260  # find the first departure meeting the criteria
261  _nextdep = {}
262  if self._destinations_destinations:
263  dest_found = False
264  for dest in self._destinations_destinations:
265  if dest in journey["stops"]:
266  dest_found = True
267  if dest in _deps_not_found:
268  _deps_not_found.remove(dest)
269  _nextdep["destination"] = dest
270 
271  if not dest_found:
272  continue
273 
274  if (
275  self._lines_lines
276  and journey["number"] not in self._lines_lines
277  or journey["minutes"] < self._time_offset_time_offset
278  ):
279  continue
280 
281  for attr in ("direction", "departure_time", "product", "minutes"):
282  _nextdep[attr] = journey.get(attr, "")
283 
284  _nextdep["line"] = journey.get("number", "")
285  _deps.append(_nextdep)
286 
287  if len(_deps) > self._max_journeys_max_journeys:
288  break
289 
290  if not self._error_notification_error_notification and _deps_not_found:
291  self._error_notification_error_notification = True
292  _LOGGER.warning("Destination(s) %s not found", ", ".join(_deps_not_found))
293 
294  self.departuresdepartures = _deps
def __init__(self, station_id, destinations, direction, lines, products, time_offset, max_journeys, timeout)
Definition: sensor.py:224
def __init__(self, station, destinations, direction, lines, products, time_offset, max_journeys, name, timeout)
Definition: sensor.py:136
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88
None async_setup_platform(HomeAssistant hass, ConfigType config, AddEntitiesCallback async_add_entities, DiscoveryInfoType|None discovery_info=None)
Definition: sensor.py:91