Home Assistant Unofficial Reference 2024.12.1
messages.py
Go to the documentation of this file.
1 """OwnTracks Message handlers."""
2 
3 import json
4 import logging
5 
6 from nacl.encoding import Base64Encoder
7 from nacl.secret import SecretBox
8 
9 from homeassistant.components import zone as zone_comp
10 from homeassistant.components.device_tracker import SourceType
11 from homeassistant.const import ATTR_LATITUDE, ATTR_LONGITUDE, STATE_HOME
12 from homeassistant.util import decorator, slugify
13 
14 from .helper import supports_encryption
15 
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# Registry of message handlers keyed by message type name. It is filled in by
# the @HANDLERS.register(...) decorators in this module and looked up with
# HANDLERS.get(...) when a message is dispatched.
HANDLERS = decorator.Registry() # type: ignore[var-annotated]
19 
20 
def get_cipher():
    """Build the decryption callable used for encrypted payloads.

    Returns a ``(key_length, decrypt)`` tuple: ``key_length`` is
    ``SecretBox.KEY_SIZE`` and ``decrypt(ciphertext, key)`` opens a
    Base64-encoded secret box with the given key.

    Async friendly.
    """

    def decrypt(ciphertext, key):
        """Open the Base64-encoded ciphertext with key."""
        box = SecretBox(key)
        return box.decrypt(ciphertext, encoder=Base64Encoder)

    key_size = SecretBox.KEY_SIZE
    return key_size, decrypt
32 
33 
34 def _parse_topic(topic, subscribe_topic):
35  """Parse an MQTT topic {sub_topic}/user/dev, return (user, dev) tuple.
36 
37  Async friendly.
38  """
39  subscription = subscribe_topic.split("/")
40  try:
41  user_index = subscription.index("#")
42  except ValueError:
43  _LOGGER.error("Can't parse subscription topic: '%s'", subscribe_topic)
44  raise
45 
46  topic_list = topic.split("/")
47  try:
48  user, device = topic_list[user_index], topic_list[user_index + 1]
49  except IndexError:
50  _LOGGER.error("Can't parse topic: '%s'", topic)
51  raise
52 
53  return user, device
54 
55 
def _parse_see_args(message, subscribe_topic):
    """Translate an OwnTracks message into the keyword arguments see expects.

    Returns ``(dev_id, kwargs)`` where ``dev_id`` is the slugified
    ``"{user}_{device}"`` taken from the message topic.

    Async friendly.
    """
    user, device = _parse_topic(message["topic"], subscribe_topic)
    dev_id = slugify(f"{user}_{device}")

    # A position is only reported when both coordinates are present.
    has_fix = message["lat"] is not None and message["lon"] is not None
    see_args = {
        "dev_id": dev_id,
        "host_name": user,
        "attributes": {},
        "gps": (message["lat"], message["lon"]) if has_fix else None,
    }

    # Message keys copied straight into top-level see arguments.
    for source_key, target_key in (("acc", "gps_accuracy"), ("batt", "battery")):
        if source_key in message:
            see_args[target_key] = message[source_key]

    # Message keys copied into the extra state attributes.
    extra_attributes = see_args["attributes"]
    for source_key, target_key in (
        ("vel", "velocity"),
        ("tid", "tid"),
        ("addr", "address"),
        ("cog", "course"),
        ("bs", "battery_status"),
    ):
        if source_key in message:
            extra_attributes[target_key] = message[source_key]

    # The "t" field selects the source type; other values leave it unset.
    trigger = message.get("t")
    if trigger in ("c", "u"):
        see_args["source_type"] = SourceType.GPS
    elif trigger == "b":
        see_args["source_type"] = SourceType.BLUETOOTH_LE

    return dev_id, see_args
90 
91 
92 def _set_gps_from_zone(kwargs, location, zone):
93  """Set the see parameters from the zone parameters.
94 
95  Async friendly.
96  """
97  if zone is not None:
98  kwargs["gps"] = (
99  zone.attributes[ATTR_LATITUDE],
100  zone.attributes[ATTR_LONGITUDE],
101  )
102  kwargs["gps_accuracy"] = zone.attributes["radius"]
103  kwargs["location_name"] = location
104  return kwargs
105 
106 
def _decrypt_payload(secret, topic, ciphertext):
    """Decrypt an encrypted payload.

    ``secret`` is either a single key string or a dict mapping topics to key
    strings. The key is UTF-8 encoded, then truncated or NUL-padded to the
    cipher's key length before use.

    Returns the decrypted payload as a ``str``, or ``None`` (after logging a
    warning) when encryption support is unavailable, no key is known for the
    topic, or the payload cannot be decrypted.
    """
    try:
        if not supports_encryption():
            _LOGGER.warning("Ignoring encrypted payload because nacl not installed")
            return None
        keylen, decrypt = get_cipher()
    except OSError:
        _LOGGER.warning("Ignoring encrypted payload because nacl not installed")
        return None

    # Imported here, after the availability check above, so the block stays
    # self-contained without touching the module import section.
    from nacl.exceptions import CryptoError

    key = secret.get(topic) if isinstance(secret, dict) else secret

    if key is None:
        _LOGGER.warning(
            "Ignoring encrypted payload because no decryption key known for topic %s",
            topic,
        )
        return None

    key = key.encode("utf-8")[:keylen].ljust(keylen, b"\0")

    try:
        message = decrypt(ciphertext, key).decode("utf-8")
    except (CryptoError, ValueError):
        # PyNaCl reports a failed authentication (wrong key or tampered
        # ciphertext) as CryptoError, which is not a ValueError. ValueError
        # still covers malformed Base64 input and undecodable plaintext.
        _LOGGER.warning(
            (
                "Ignoring encrypted payload because unable to decrypt using key for"
                " topic %s"
            ),
            topic,
        )
        return None
    _LOGGER.debug("Decrypted payload: %s", message)
    return message
149 
150 
def encrypt_message(secret, topic, message):
    """Encrypt message for the given topic.

    ``secret`` is either a single key string or a dict mapping topics to key
    strings. Returns the Base64 ciphertext as a ``str``, or ``None`` (after
    logging a warning) when no key is known or encryption raises ValueError.
    """
    key_size = SecretBox.KEY_SIZE

    raw_key = secret.get(topic) if isinstance(secret, dict) else secret
    if raw_key is None:
        _LOGGER.warning(
            "Unable to encrypt payload because no decryption key known for topic %s",
            topic,
        )
        return None

    # Fit the UTF-8 key to the cipher's key size: cut it, then NUL-pad it.
    box_key = raw_key.encode("utf-8")[:key_size].ljust(key_size, b"\0")

    try:
        plaintext = message.encode("utf-8")
        payload = SecretBox(box_key).encrypt(plaintext, encoder=Base64Encoder)
        _LOGGER.debug("Encrypted message: %s to %s", plaintext, payload)
        return payload.decode("utf-8")
    except ValueError:
        _LOGGER.warning("Unable to encrypt message for topic %s", topic)
        return None
180 
181 
@HANDLERS.register("location")
async def async_handle_location_message(hass, context, message):
    """Handle a location message.

    The update is dropped when the message fails the context's accuracy
    check, when the context is in events-only mode, or when the device is
    currently recorded as being inside at least one region. Otherwise the
    parsed position is forwarded to ``context.async_see`` and
    ``context.async_see_beacons``.
    """
    if not context.async_valid_accuracy(message):
        return

    if context.events_only:
        _LOGGER.debug("Location update ignored due to events_only setting")
        return

    dev_id, kwargs = _parse_see_args(message, context.mqtt_topic)

    # regions_entered is keyed by device id and holds that device's regions;
    # the region to report is the last entry of this device's list. Indexing
    # the mapping itself with -1 would look up a non-existent device key.
    regions = context.regions_entered[dev_id]
    if regions:
        _LOGGER.debug("Location update ignored, inside region %s", regions[-1])
        return

    context.async_see(**kwargs)
    context.async_see_beacons(hass, dev_id, kwargs)
202 
203 
async def _async_transition_message_enter(hass, context, message, location):
    """Process an "enter" transition for location."""
    zone = hass.states.get(f"zone.{slugify(location)}")
    dev_id, kwargs = _parse_see_args(message, context.mqtt_topic)

    if zone is not None or message.get("t") != "b":
        # Normal region: remember it and report the zone position.
        entered = context.regions_entered[dev_id]
        if location not in entered:
            entered.append(location)
        _LOGGER.debug("Enter region %s", location)
        _set_gps_from_zone(kwargs, location, zone)
        context.async_see(**kwargs)
    else:
        # No matching HA zone and triggered by a beacon, so this is a mobile
        # beacon. The lat/lon in kwargs is not where the beacon actually is
        # (probably 0/0), hence no direct see call here.
        active_beacons = context.mobile_beacons_active[dev_id]
        if location not in active_beacons:
            active_beacons.add(location)
        _LOGGER.debug("Added beacon %s", location)

    context.async_see_beacons(hass, dev_id, kwargs)
228 
229 
async def _async_transition_message_leave(hass, context, message, location):
    """Process a "leave" transition for location."""
    dev_id, kwargs = _parse_see_args(message, context.mqtt_topic)

    entered = context.regions_entered[dev_id]
    if location in entered:
        entered.remove(location)

    active_beacons = context.mobile_beacons_active[dev_id]
    if location in active_beacons:
        # Leaving a mobile beacon only updates the beacon trackers.
        active_beacons.remove(location)
        _LOGGER.debug("Remove beacon %s", location)
        context.async_see_beacons(hass, dev_id, kwargs)
        return

    previous_region = entered[-1] if entered else None
    if previous_region:
        # Fall back to the region entered before this one.
        zone = hass.states.get(f"zone.{slugify(previous_region)}")
        _set_gps_from_zone(kwargs, previous_region, zone)
        _LOGGER.debug("Exit to %s", previous_region)
    else:
        _LOGGER.debug("Exit to GPS")
        # A plain GPS position is only reported when it is accurate enough.
        if not context.async_valid_accuracy(message):
            return

    context.async_see(**kwargs)
    context.async_see_beacons(hass, dev_id, kwargs)
260 
261 
@HANDLERS.register("transition")
async def async_handle_transition_message(hass, context, message):
    """Dispatch a transition message to the enter or leave handler."""
    description = message.get("desc")
    if description is None:
        _LOGGER.error(
            "Location missing from `Entering/Leaving` message - "
            "please turn `Share` on in OwnTracks app"
        )
        return

    # OwnTracks puts "-" in front of a beacon zone name to switch on
    # 'hold mode'; that prefix is not part of the name.
    location = description.lstrip("-")

    # Owntracks instances may name regions differently than HA does, so
    # translate through the configured mapping first.
    if location in context.region_mapping:
        location = context.region_mapping[location]

    if location.lower() == "home":
        location = STATE_HOME

    event = message["event"]
    if event == "enter":
        await _async_transition_message_enter(hass, context, message, location)
    elif event == "leave":
        await _async_transition_message_leave(hass, context, message, location)
    else:
        _LOGGER.error("Misformatted mqtt msgs, _type=transition, event=%s", event)
291 
292 
async def async_handle_waypoint(hass, name_base, waypoint):
    """Create a zone for a single waypoint unless its entity already exists."""
    pretty_name = f"{name_base} - {waypoint['desc']}"
    latitude = waypoint["lat"]
    longitude = waypoint["lon"]
    radius = waypoint["rad"]

    entity_id = zone_comp.ENTITY_ID_FORMAT.format(slugify(pretty_name))

    # Nothing to do when a state for this entity id is already present.
    if hass.states.get(entity_id) is not None:
        return

    zone_config = {
        zone_comp.CONF_NAME: pretty_name,
        zone_comp.CONF_LATITUDE: latitude,
        zone_comp.CONF_LONGITUDE: longitude,
        zone_comp.CONF_RADIUS: radius,
        zone_comp.CONF_ICON: zone_comp.ICON_IMPORT,
        zone_comp.CONF_PASSIVE: False,
    }
    new_zone = zone_comp.Zone.from_yaml(zone_config)
    new_zone.hass = hass
    new_zone.entity_id = entity_id
    new_zone.async_write_ha_state()
321 
322 
@HANDLERS.register("waypoint")
@HANDLERS.register("waypoints")
async def async_handle_waypoints_message(hass, context, message):
    """Import the waypoint(s) carried by a waypoint or waypoints message."""
    if not context.import_waypoints:
        return

    whitelist = context.waypoint_whitelist
    if whitelist is not None:
        user, _device = _parse_topic(message["topic"], context.mqtt_topic)
        if user not in whitelist:
            return

    # Without a "waypoints" list the message itself is the only waypoint.
    waypoints = message.get("waypoints", [message])

    _LOGGER.debug("Got %d waypoints from %s", len(waypoints), message["topic"])

    topic_parts = _parse_topic(message["topic"], context.mqtt_topic)
    name_base = " ".join(topic_parts)

    for waypoint in waypoints:
        await async_handle_waypoint(hass, name_base, waypoint)
344 
345 
@HANDLERS.register("encrypted")
async def async_handle_encrypted_message(hass, context, message):
    """Decrypt an encrypted message and handle the payload it contains."""
    has_topic = "topic" in message

    if not has_topic and isinstance(context.secret, dict):
        _LOGGER.error("You cannot set per topic secrets when using HTTP")
        return

    plaintext_payload = _decrypt_payload(
        context.secret, message.get("topic"), message["data"]
    )
    if plaintext_payload is None:
        return

    inner_message = json.loads(plaintext_payload)

    # Carry the outer topic over when the decrypted payload has none.
    if has_topic and "topic" not in inner_message:
        inner_message["topic"] = message["topic"]

    await async_handle_message(hass, context, inner_message)
365 
366 
@HANDLERS.register("lwt")
@HANDLERS.register("configuration")
@HANDLERS.register("beacon")
@HANDLERS.register("cmd")
@HANDLERS.register("steps")
@HANDLERS.register("card")
async def async_handle_not_impl_msg(hass, context, message):
    """Log and otherwise ignore a known message type that has no handler."""
    message_type = message.get("_type")
    _LOGGER.debug("Not handling %s message: %s", message_type, message)
376 
377 
async def async_handle_unsupported_msg(hass, context, message):
    """Warn about a message whose type is not registered with any handler."""
    message_type = message.get("_type")
    _LOGGER.warning("Received unsupported message type: %s", message_type)
381 
382 
async def async_handle_message(hass, context, message):
    """Route an OwnTracks message to the handler registered for its type.

    Messages whose ``_type`` has no registered handler go to
    ``async_handle_unsupported_msg``.
    """
    _LOGGER.debug("Received %s", message)

    handle = HANDLERS.get(message.get("_type"), async_handle_unsupported_msg)
    await handle(hass, context, message)
def async_handle_unsupported_msg(hass, context, message)
Definition: messages.py:378
def _parse_see_args(message, subscribe_topic)
Definition: messages.py:56
def _parse_topic(topic, subscribe_topic)
Definition: messages.py:34
def async_handle_waypoint(hass, name_base, waypoint)
Definition: messages.py:293
def _set_gps_from_zone(kwargs, location, zone)
Definition: messages.py:92
def async_handle_transition_message(hass, context, message)
Definition: messages.py:263
def async_handle_message(hass, context, message)
Definition: messages.py:383
def _async_transition_message_leave(hass, context, message, location)
Definition: messages.py:230
def async_handle_encrypted_message(hass, context, message)
Definition: messages.py:347
def async_handle_waypoints_message(hass, context, message)
Definition: messages.py:325
def _decrypt_payload(secret, topic, ciphertext)
Definition: messages.py:107
def _async_transition_message_enter(hass, context, message, location)
Definition: messages.py:204
def async_handle_location_message(hass, context, message)
Definition: messages.py:183
def async_handle_not_impl_msg(hass, context, message)
Definition: messages.py:373
def encrypt_message(secret, topic, message)
Definition: messages.py:151