1 """Support for GPS tracking MQTT enabled devices."""
3 from __future__
import annotations
8 import voluptuous
as vol
12 PLATFORM_SCHEMA
as DEVICE_TRACKER_PLATFORM_SCHEMA,
# Module-level logger, named after this module via __name__.
27 _LOGGER = logging.getLogger(__name__)
# Voluptuous schema for a GPS payload mapping:
#   - latitude and longitude are required and coerced to float;
#   - GPS accuracy is optional and coerced to int;
#   - battery level is optional and coerced to str.
# Keys not listed here are accepted because of extra=vol.ALLOW_EXTRA.
# NOTE(review): presumably applied to the decoded JSON of each MQTT message in
# async_message_received -- confirm against the handler's try body.
29 GPS_JSON_PAYLOAD_SCHEMA = vol.Schema(
31 vol.Required(ATTR_LATITUDE): vol.Coerce(float),
32 vol.Required(ATTR_LONGITUDE): vol.Coerce(float),
33 vol.Optional(ATTR_GPS_ACCURACY): vol.Coerce(int),
34 vol.Optional(ATTR_BATTERY_LEVEL): vol.Coerce(str),
36 extra=vol.ALLOW_EXTRA,
# Platform configuration schema: the device tracker platform schema extended
# with mqtt.config.SCHEMA_BASE, plus a required CONF_DEVICES mapping. Each key
# is validated with cv.string and each value with mqtt.valid_subscribe_topic;
# the setup function iterates this mapping as (dev_id, topic) pairs.
39 PLATFORM_SCHEMA = DEVICE_TRACKER_PLATFORM_SCHEMA.extend(mqtt.config.SCHEMA_BASE).extend(
40 {vol.Required(CONF_DEVICES): {cv.string: mqtt.valid_subscribe_topic}}
# async_see: callback invoked with the parsed location kwargs for every
# accepted message (see the async_create_task call below).
47 async_see: AsyncSeeCallback,
# discovery_info: optional, defaults to None.
48 discovery_info: DiscoveryInfoType |
None =
None,
50 """Set up the MQTT JSON tracker."""
# Wait for the MQTT client; if it is not available, log an error.
# NOTE(review): presumably setup is aborted right after this log call -- confirm.
54 if not await mqtt.async_wait_for_mqtt_client(hass):
55 _LOGGER.error(
"MQTT integration is not available")
# Device mapping and QoS both come from the platform configuration.
58 devices = config[CONF_DEVICES]
59 qos = config[CONF_QOS]
# One handler and one subscription are created per configured (dev_id, topic) pair.
61 for dev_id, topic
in devices.items():
# dev_id is bound as a default argument so each handler keeps the device id of
# its own loop iteration instead of the loop variable's final value.
64 def async_message_received(msg, dev_id=dev_id):
65 """Handle received MQTT message."""
# Validation failure: the log message states that the update is skipped.
68 except vol.MultipleInvalid:
71 "Skipping update for following data "
72 "because of missing or malformatted data: %s"
# JSON parse failure: log the raw payload.
78 _LOGGER.error(
"Error parsing JSON payload: %s", msg.payload)
# Schedule async_see with the parsed kwargs as a task on hass instead of
# awaiting it inside the message handler.
# NOTE(review): kwargs presumably comes from _parse_see_args(dev_id, data) -- confirm.
82 hass.async_create_task(async_see(**kwargs))
# Subscribe this device's handler to its topic with the configured QoS.
84 await mqtt.async_subscribe(hass, topic, async_message_received, qos)
90 """Parse the payload location parameters, into the format see expects."""
# Latitude and longitude are read unconditionally (a missing key raises
# KeyError) and combined into the "gps" tuple; dev_id is passed through as is.
91 kwargs = {
"gps": (data[ATTR_LATITUDE], data[ATTR_LONGITUDE]),
"dev_id": dev_id}
# GPS accuracy is optional: copied under the same key only when present.
93 if ATTR_GPS_ACCURACY
in data:
94 kwargs[ATTR_GPS_ACCURACY] = data[ATTR_GPS_ACCURACY]
# Battery level is optional: forwarded under the "battery" key, not under
# ATTR_BATTERY_LEVEL.
95 if ATTR_BATTERY_LEVEL
in data:
96 kwargs[
"battery"] = data[ATTR_BATTERY_LEVEL]
bool async_setup_scanner(HomeAssistant hass, ConfigType config, AsyncSeeCallback async_see, DiscoveryInfoType|None discovery_info=None)
def _parse_see_args(dev_id, data)