1 """DataUpdateCoordinator for the OpenSky integration."""
3 from __future__ import annotations
5 from datetime import timedelta
7 from python_opensky import OpenSky, OpenSkyError, StateVector
35 """An OpenSky Data Update Coordinator."""
37 config_entry: ConfigEntry
39 def __init__(self, hass: HomeAssistant, opensky: OpenSky) -> None:
40 """Initialize the OpenSky data coordinator."""
48 }.get(opensky.is_authenticated),
62 except OpenSkyError as exc:
63 raise UpdateFailed from exc
64 currently_tracked = set()
65 flight_metadata: dict[str, StateVector] = {}
66 for flight in response.states:
67 if not flight.callsign:
69 callsign = flight.callsign.strip()
71 flight_metadata[callsign] = flight
75 flight.longitude is None
76 or flight.latitude is None
78 or flight.barometric_altitude is None
81 altitude = flight.barometric_altitude
84 currently_tracked.add(callsign)
88 self._handle_boundary(entries, EVENT_OPENSKY_ENTRY, flight_metadata)
89 self._handle_boundary(exits, EVENT_OPENSKY_EXIT, flight_metadata)
92 return len(currently_tracked)
95 self, flights: set[str], event: str, metadata: dict[str, StateVector]
97 """Handle flights crossing region boundary."""
98 for flight in flights:
99 if flight in metadata:
100 altitude = metadata[flight].barometric_altitude
101 longitude = metadata[flight].longitude
102 latitude = metadata[flight].latitude
103 icao24 = metadata[flight].icao24
112 ATTR_CALLSIGN: flight,
113 ATTR_ALTITUDE: altitude,
115 ATTR_LONGITUDE: longitude,
116 ATTR_LATITUDE: latitude,
119 self.hass.bus.fire(event, data)
None _handle_boundary(self, set[str] flights, str event, dict[str, StateVector] metadata)
int _async_update_data(self)
None __init__(self, HomeAssistant hass, OpenSky opensky)
web.Response get(self, web.Request request, str config_key)