Home Assistant Unofficial Reference 2024.12.1
device_tracker.py
Go to the documentation of this file.
1 """Support for APRS device tracking."""
2 
from __future__ import annotations

import logging
import threading
from typing import Any

import aprslib
from aprslib import ConnectionError as AprsConnectionError, LoginError
import geopy.distance
import voluptuous as vol

from homeassistant.components.device_tracker import (
    PLATFORM_SCHEMA as DEVICE_TRACKER_PLATFORM_SCHEMA,
    SeeCallback,
)
from homeassistant.const import (
    ATTR_GPS_ACCURACY,
    ATTR_LATITUDE,
    ATTR_LONGITUDE,
    CONF_HOST,
    CONF_PASSWORD,
    CONF_TIMEOUT,
    CONF_USERNAME,
    EVENT_HOMEASSISTANT_STOP,
)
from homeassistant.core import Event, HomeAssistant
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from homeassistant.util import slugify
32 
# Domain name of this integration.
DOMAIN = "aprs"

_LOGGER = logging.getLogger(__name__)

# Keys looked up in the message dicts handed to AprsListenerThread.rx_msg.
ATTR_ALTITUDE = "altitude"
ATTR_COURSE = "course"
ATTR_COMMENT = "comment"
ATTR_FROM = "from"
ATTR_FORMAT = "format"
ATTR_OBJECT_NAME = "object_name"
ATTR_POS_AMBIGUITY = "posambiguity"
ATTR_SPEED = "speed"

# Configuration key holding the callsigns to track.
CONF_CALLSIGNS = "callsigns"

# Defaults applied by PLATFORM_SCHEMA when the option is not configured.
DEFAULT_HOST = "rotate.aprs2.net"
DEFAULT_PASSWORD = "-1"
DEFAULT_TIMEOUT = 30.0

# Port handed to aprslib.IS when the listener is created.
FILTER_PORT = 14580

# Message formats that rx_msg treats as position reports.
MSG_FORMATS = ["compressed", "uncompressed", "mic-e", "object"]
55 
# Platform configuration: callsigns and username are mandatory; password,
# host and timeout fall back to the DEFAULT_* values above.
PLATFORM_SCHEMA = DEVICE_TRACKER_PLATFORM_SCHEMA.extend(
    {
        vol.Required(CONF_CALLSIGNS): cv.ensure_list,
        vol.Required(CONF_USERNAME): cv.string,
        vol.Optional(CONF_PASSWORD, default=DEFAULT_PASSWORD): cv.string,
        vol.Optional(CONF_HOST, default=DEFAULT_HOST): cv.string,
        vol.Optional(CONF_TIMEOUT, default=DEFAULT_TIMEOUT): vol.Coerce(float),
    }
)
65 
66 
def make_filter(callsigns: list[str]) -> str:
    """Make a server-side filter from a list of callsigns.

    Each callsign is upper-cased and prefixed with ``b/``; the resulting
    terms are joined with single spaces. An empty list yields an empty string.
    """
    return " ".join(f"b/{sign.upper()}" for sign in callsigns)
70 
71 
def gps_accuracy(gps: tuple[float, float], posambiguity: int) -> int:
    """Calculate the GPS accuracy based on APRS posambiguity.

    Args:
        gps: Position as a two-element tuple (callers in this file pass
            ``(lat, lon)``).
        posambiguity: APRS position ambiguity level, 0 through 4.

    Returns:
        The distance in metres, rounded to an integer, between ``gps`` and the
        same point with its second coordinate shifted by the ambiguity.

    Raises:
        ValueError: If ``posambiguity`` is not one of 0-4.
    """
    # Offset in degrees associated with each ambiguity level.
    pos_a_map = {0: 0, 1: 1 / 600, 2: 1 / 60, 3: 1 / 6, 4: 1}
    if posambiguity not in pos_a_map:
        message = f"APRS position ambiguity must be 0-4, not '{posambiguity}'."
        raise ValueError(message)

    degrees = pos_a_map[posambiguity]

    # Shift the second coordinate by the ambiguity and measure how far apart
    # the two points are.
    gps2 = (gps[0], gps[1] + degrees)
    dist_m: float = geopy.distance.distance(gps, gps2).m

    return round(dist_m)
88 
89 
def setup_scanner(
    hass: HomeAssistant,
    config: ConfigType,
    see: SeeCallback,
    discovery_info: DiscoveryInfoType | None = None,
) -> bool:
    """Set up the APRS tracker.

    Builds the server-side filter from the configured callsigns, starts an
    AprsListenerThread and blocks until the thread reports its startup result
    or the configured timeout elapses.

    Returns:
        True when the listener reported a successful start; False on timeout
        or when the listener reported a failure (the reason is logged).
    """
    callsigns = config[CONF_CALLSIGNS]
    server_filter = make_filter(callsigns)

    callsign = config[CONF_USERNAME]
    password = config[CONF_PASSWORD]
    host = config[CONF_HOST]
    timeout = config[CONF_TIMEOUT]
    aprs_listener = AprsListenerThread(callsign, password, host, server_filter, see)

    def aprs_disconnect(event: Event) -> None:
        """Stop the APRS connection."""
        aprs_listener.stop()

    aprs_listener.start()
    # Close the connection when Home Assistant shuts down.
    hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, aprs_disconnect)

    # The listener sets start_event once start_complete() has run.
    if not aprs_listener.start_event.wait(timeout):
        _LOGGER.error("Timeout waiting for APRS to connect")
        return False

    if not aprs_listener.start_success:
        _LOGGER.error(aprs_listener.start_message)
        return False

    _LOGGER.debug(aprs_listener.start_message)
    return True
123 
124 
class AprsListenerThread(threading.Thread):
    """APRS message listener.

    Runs an aprslib.IS connection on its own thread and forwards position
    messages to the ``see`` callback. Startup status is published through
    ``start_event``, ``start_success`` and ``start_message``.
    """

    def __init__(
        self,
        callsign: str,
        password: str,
        host: str,
        server_filter: str,
        see: SeeCallback,
    ) -> None:
        """Initialize the class."""
        super().__init__()

        self.callsign = callsign
        self.host = host
        # Set by start_complete() once the connection attempt has finished.
        self.start_event = threading.Event()
        self.see = see
        self.server_filter = server_filter
        self.start_message = ""
        self.start_success = False

        self.ais = aprslib.IS(
            self.callsign, passwd=password, host=self.host, port=FILTER_PORT
        )

    def start_complete(self, success: bool, message: str) -> None:
        """Complete startup process.

        Records the outcome and its message, then sets ``start_event`` so a
        waiter on that event can read them.
        """
        self.start_message = message
        self.start_success = success
        self.start_event.set()

    def run(self) -> None:
        """Connect to APRS and listen for data."""
        self.ais.set_filter(self.server_filter)

        try:
            _LOGGER.debug(
                "Opening connection to %s with callsign %s", self.host, self.callsign
            )
            self.ais.connect()
            self.start_complete(
                True, f"Connected to {self.host} with callsign {self.callsign}."
            )
            # Hands every received message to rx_msg; see the aprslib docs
            # for the exact semantics of immortal=True.
            self.ais.consumer(callback=self.rx_msg, immortal=True)
        except (AprsConnectionError, LoginError) as err:
            self.start_complete(False, str(err))
        except OSError:
            # NOTE(review): presumably raised when stop() closes the socket
            # underneath the consumer; confirm against aprslib.
            _LOGGER.debug(
                "Closing connection to %s with callsign %s", self.host, self.callsign
            )

    def stop(self) -> None:
        """Close the connection to the APRS network."""
        self.ais.close()

    def rx_msg(self, msg: dict[str, Any]) -> None:
        """Receive message and process if position.

        Messages whose format is not in MSG_FORMATS are ignored. For the
        rest, the device id comes from the object name ("object" format) or
        the sender, and the position plus any optional attributes are passed
        to the ``see`` callback.
        """
        _LOGGER.debug("APRS message received: %s", msg)
        if msg[ATTR_FORMAT] not in MSG_FORMATS:
            return

        if msg[ATTR_FORMAT] == "object":
            dev_id = slugify(msg[ATTR_OBJECT_NAME])
        else:
            dev_id = slugify(msg[ATTR_FROM])
        lat = msg[ATTR_LATITUDE]
        lon = msg[ATTR_LONGITUDE]

        attrs = {}
        if ATTR_POS_AMBIGUITY in msg:
            pos_amb = msg[ATTR_POS_AMBIGUITY]
            try:
                attrs[ATTR_GPS_ACCURACY] = gps_accuracy((lat, lon), pos_amb)
            except ValueError:
                # An out-of-range ambiguity only costs the accuracy attribute;
                # the position itself is still reported.
                _LOGGER.warning(
                    "APRS message contained invalid posambiguity: %s", pos_amb
                )
        for attr in (ATTR_ALTITUDE, ATTR_COMMENT, ATTR_COURSE, ATTR_SPEED):
            if attr in msg:
                attrs[attr] = msg[attr]

        self.see(dev_id=dev_id, gps=(lat, lon), attributes=attrs)
None __init__(self, str callsign, str password, str host, str server_filter, SeeCallback see)
bool setup_scanner(HomeAssistant hass, ConfigType config, SeeCallback see, DiscoveryInfoType|None discovery_info=None)
int gps_accuracy(tuple[float, float] gps, int posambiguity)