1 """Support for APRS device tracking."""
3 from __future__
import annotations
10 from aprslib
import ConnectionError
as AprsConnectionError, LoginError
12 import voluptuous
as vol
15 PLATFORM_SCHEMA
as DEVICE_TRACKER_PLATFORM_SCHEMA,
26 EVENT_HOMEASSISTANT_STOP,
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# Keys read from a parsed APRS message dict.
# NOTE(review): ATTR_FROM and FILTER_PORT are referenced further down but
# their definitions are not visible in this excerpt - confirm they exist.
ATTR_ALTITUDE = "altitude"
ATTR_COURSE = "course"
ATTR_COMMENT = "comment"
ATTR_FORMAT = "format"
ATTR_OBJECT_NAME = "object_name"
ATTR_POS_AMBIGUITY = "posambiguity"

# Platform configuration key for the list of callsigns.
CONF_CALLSIGNS = "callsigns"

# Defaults applied by PLATFORM_SCHEMA when an option is omitted.
DEFAULT_HOST = "rotate.aprs2.net"
DEFAULT_PASSWORD = "-1"
DEFAULT_TIMEOUT = 30.0

# Message formats that rx_msg processes; any other format is ignored.
MSG_FORMATS = ["compressed", "uncompressed", "mic-e", "object"]
# Configuration schema for this platform: extends the base device tracker
# schema with the callsigns to track and the APRS connection settings.
PLATFORM_SCHEMA = DEVICE_TRACKER_PLATFORM_SCHEMA.extend(
    {
        vol.Required(CONF_CALLSIGNS): cv.ensure_list,
        vol.Required(CONF_USERNAME): cv.string,
        vol.Optional(CONF_PASSWORD, default=DEFAULT_PASSWORD): cv.string,
        vol.Optional(CONF_HOST, default=DEFAULT_HOST): cv.string,
        # Coerced to float; used as the wait limit for the listener to connect.
        vol.Optional(CONF_TIMEOUT, default=DEFAULT_TIMEOUT): vol.Coerce(float),
    }
)
def make_filter(callsigns: list) -> str:
    """Make a server-side filter from a list of callsigns.

    Each callsign is upper-cased and prefixed with ``b/``; the resulting
    terms are joined with single spaces.

    Args:
        callsigns: Callsign strings to include in the filter.

    Returns:
        The filter string, or an empty string when ``callsigns`` is empty.
    """
    return " ".join(f"b/{sign.upper()}" for sign in callsigns)
def gps_accuracy(gps: tuple[float, float], posambiguity: int) -> int:
    """Calculate the GPS accuracy based on APRS posambiguity.

    The ambiguity level is mapped to an offset in degrees, the second
    coordinate of ``gps`` is shifted by that offset, and the distance between
    the original and the shifted point is returned.

    Args:
        gps: Two-element position tuple. The caller in this file passes
            ``(lat, lon)``, so the offset is applied to the longitude.
        posambiguity: APRS position ambiguity level, 0 through 4.

    Returns:
        The distance between the two points, rounded to an integer
        (metres, per geopy's ``.m`` attribute).

    Raises:
        ValueError: If ``posambiguity`` is not one of 0-4.
    """
    # Ambiguity level -> offset in degrees (1/600 deg = 0.1 arc-minute,
    # 1/60 deg = 1 arc-minute, 1/6 deg = 10 arc-minutes).
    pos_a_map = {0: 0, 1: 1 / 600, 2: 1 / 60, 3: 1 / 6, 4: 1}

    # Reject unknown levels up front so the happy path stays flat.
    if posambiguity not in pos_a_map:
        message = f"APRS position ambiguity must be 0-4, not '{posambiguity}'."
        raise ValueError(message)

    degrees = pos_a_map[posambiguity]
    gps2 = (gps[0], gps[1] + degrees)
    dist_m: float = geopy.distance.distance(gps, gps2).m

    return round(dist_m)
94 discovery_info: DiscoveryInfoType |
None =
None,
96 """Set up the APRS tracker."""
97 callsigns = config[CONF_CALLSIGNS]
100 callsign = config[CONF_USERNAME]
101 password = config[CONF_PASSWORD]
102 host = config[CONF_HOST]
103 timeout = config[CONF_TIMEOUT]
106 def aprs_disconnect(event: Event) ->
None:
107 """Stop the APRS connection."""
110 aprs_listener.start()
111 hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, aprs_disconnect)
113 if not aprs_listener.start_event.wait(timeout):
114 _LOGGER.error(
"Timeout waiting for APRS to connect")
117 if not aprs_listener.start_success:
118 _LOGGER.error(aprs_listener.start_message)
121 _LOGGER.debug(aprs_listener.start_message)
126 """APRS message listener."""
136 """Initialize the class."""
148 self.
callsigncallsign, passwd=password, host=self.
hosthost, port=FILTER_PORT
152 """Complete startup process."""
158 """Connect to APRS and listen for data."""
163 "Opening connection to %s with callsign %s", self.
hosthost, self.
callsigncallsign
165 self.
aisais.connect()
167 True, f
"Connected to {self.host} with callsign {self.callsign}."
169 self.
aisais.consumer(callback=self.
rx_msgrx_msg, immortal=
True)
170 except (AprsConnectionError, LoginError)
as err:
174 "Closing connection to %s with callsign %s", self.
hosthost, self.
callsigncallsign
178 """Close the connection to the APRS network."""
181 def rx_msg(self, msg: dict[str, Any]) ->
None:
182 """Receive message and process if position."""
183 _LOGGER.debug(
"APRS message received: %s",
str(msg))
184 if msg[ATTR_FORMAT]
in MSG_FORMATS:
185 if msg[ATTR_FORMAT] ==
"object":
186 dev_id =
slugify(msg[ATTR_OBJECT_NAME])
188 dev_id =
slugify(msg[ATTR_FROM])
189 lat = msg[ATTR_LATITUDE]
190 lon = msg[ATTR_LONGITUDE]
193 if ATTR_POS_AMBIGUITY
in msg:
194 pos_amb = msg[ATTR_POS_AMBIGUITY]
196 attrs[ATTR_GPS_ACCURACY] =
gps_accuracy((lat, lon), pos_amb)
199 "APRS message contained invalid posambiguity: %s",
str(pos_amb)
201 for attr
in (ATTR_ALTITUDE, ATTR_COMMENT, ATTR_COURSE, ATTR_SPEED):
203 attrs[attr] = msg[attr]
205 self.
seesee(dev_id=dev_id, gps=(lat, lon), attributes=attrs)
None rx_msg(self, dict[str, Any] msg)
None __init__(self, str callsign, str password, str host, str server_filter, SeeCallback see)
None start_complete(self, bool success, str message)
bool setup_scanner(HomeAssistant hass, ConfigType config, SeeCallback see, DiscoveryInfoType|None discovery_info=None)
int gps_accuracy(tuple[float, float] gps, int posambiguity)
str make_filter(list callsigns)