1 """Support for OASA Telematics from telematics.oasa.gr."""
3 from __future__
import annotations
5 from datetime
import timedelta
7 from operator
import itemgetter
10 import voluptuous
as vol
13 PLATFORM_SCHEMA
as SENSOR_PLATFORM_SCHEMA,
24 _LOGGER = logging.getLogger(__name__)
26 ATTR_STOP_ID =
"stop_id"
27 ATTR_STOP_NAME =
"stop_name"
28 ATTR_ROUTE_ID =
"route_id"
29 ATTR_ROUTE_NAME =
"route_name"
30 ATTR_NEXT_ARRIVAL =
"next_arrival"
31 ATTR_SECOND_NEXT_ARRIVAL =
"second_next_arrival"
32 ATTR_NEXT_DEPARTURE =
"next_departure"
34 CONF_STOP_ID =
"stop_id"
35 CONF_ROUTE_ID =
"route_id"
37 DEFAULT_NAME =
"OASA Telematics"
42 PLATFORM_SCHEMA = SENSOR_PLATFORM_SCHEMA.extend(
44 vol.Required(CONF_STOP_ID): cv.string,
45 vol.Required(CONF_ROUTE_ID): cv.string,
46 vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
54 add_entities: AddEntitiesCallback,
55 discovery_info: DiscoveryInfoType |
None =
None,
57 """Set up the OASA Telematics sensor."""
58 name = config[CONF_NAME]
59 stop_id = config[CONF_STOP_ID]
60 route_id = config.get(CONF_ROUTE_ID)
68 """Implementation of the OASA Telematics sensor."""
70 _attr_attribution =
"Data retrieved from telematics.oasa.gr"
71 _attr_icon =
"mdi:bus"
73 def __init__(self, data, stop_id, route_id, name):
74 """Initialize the sensor."""
83 """Return the name of the sensor."""
84 return self.
_name_name
88 """Return the class of this sensor."""
89 return SensorDeviceClass.TIMESTAMP
93 """Return the state of the sensor."""
98 """Return the state attributes."""
100 if self.
_times_times
is not None:
101 next_arrival_data = self.
_times_times[0]
102 if ATTR_NEXT_ARRIVAL
in next_arrival_data:
103 next_arrival = next_arrival_data[ATTR_NEXT_ARRIVAL]
104 params.update({ATTR_NEXT_ARRIVAL: next_arrival.isoformat()})
105 if len(self.
_times_times) > 1:
106 second_next_arrival_time = self.
_times_times[1][ATTR_NEXT_ARRIVAL]
107 if second_next_arrival_time
is not None:
108 second_arrival = second_next_arrival_time
110 {ATTR_SECOND_NEXT_ARRIVAL: second_arrival.isoformat()}
114 ATTR_ROUTE_ID: self.
_times_times[0][ATTR_ROUTE_ID],
115 ATTR_STOP_ID: self.
_stop_id_stop_id,
120 ATTR_ROUTE_NAME: self.
_name_data_name_data[ATTR_ROUTE_NAME],
121 ATTR_STOP_NAME: self.
_name_data_name_data[ATTR_STOP_NAME],
124 return {k: v
for k, v
in params.items()
if v}
127 """Get the latest data from OASA API and update the states."""
131 next_arrival_data = self.
_times_times[0]
132 if ATTR_NEXT_ARRIVAL
in next_arrival_data:
133 self.
_state_state = next_arrival_data[ATTR_NEXT_ARRIVAL]
137 """The class for handling data retrieval."""
140 """Initialize the data object."""
151 """Object returned when no arrivals are found."""
152 return [{ATTR_ROUTE_ID: self.
route_idroute_id}]
155 """Get the route name from the API."""
159 return route[0].
get(
"route_departure_eng")
161 _LOGGER.error(
"Cannot get route name from OASA API")
165 """Get the stop name from the API."""
167 name_data = self.
oasa_apioasa_api.getStopNameAndXY(self.
stop_idstop_id)
169 return name_data[0].
get(
"stop_descr_matrix_eng")
171 _LOGGER.error(
"Cannot get stop name from OASA API")
175 """Get the latest arrival data from telematics.oasa.gr API."""
185 results = [r
for r
in results
if r.get(
"route_code")
in self.
route_idroute_id]
186 current_time = dt_util.utcnow()
188 for result
in results:
189 if (btime2 := result.get(
"btime2"))
is not None:
190 arrival_min =
int(btime2)
191 timestamp = current_time +
timedelta(minutes=arrival_min)
193 ATTR_NEXT_ARRIVAL: timestamp,
194 ATTR_ROUTE_ID: self.
route_idroute_id,
196 self.
infoinfo.append(arrival_data)
198 if not self.
infoinfo:
199 _LOGGER.debug(
"No arrivals with given parameters")
204 sort = sorted(self.
infoinfo, key=itemgetter(ATTR_NEXT_ARRIVAL))
def __init__(self, stop_id, route_id)
def extra_state_attributes(self)
def __init__(self, data, stop_id, route_id, name)
web.Response get(self, web.Request request, str config_key)
def add_entities(account, async_add_entities, tracked)
None setup_platform(HomeAssistant hass, ConfigType config, AddEntitiesCallback add_entities, DiscoveryInfoType|None discovery_info=None)