Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """Data update coordinator for Traccar Server."""
2 
3 from __future__ import annotations
4 
5 import asyncio
6 from datetime import datetime
7 from logging import DEBUG as LOG_LEVEL_DEBUG
8 from typing import TYPE_CHECKING, Any, TypedDict
9 
10 from pytraccar import (
11  ApiClient,
12  DeviceModel,
13  GeofenceModel,
14  PositionModel,
15  SubscriptionData,
16  TraccarException,
17 )
18 
19 from homeassistant.config_entries import ConfigEntry
20 from homeassistant.core import HomeAssistant
21 from homeassistant.helpers.dispatcher import async_dispatcher_send
22 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
23 from homeassistant.util import dt as dt_util
24 
25 from .const import DOMAIN, EVENTS, LOGGER
26 from .helpers import get_device, get_first_geofence
27 
28 
30  """Traccar Server coordinator data."""
31 
32  device: DeviceModel
33  geofence: GeofenceModel | None
34  position: PositionModel
35  attributes: dict[str, Any]
36 
37 
38 type TraccarServerCoordinatorData = dict[int, TraccarServerCoordinatorDataDevice]
39 
40 
41 class TraccarServerCoordinator(DataUpdateCoordinator[TraccarServerCoordinatorData]):
42  """Class to manage fetching Traccar Server data."""
43 
44  config_entry: ConfigEntry
45 
46  def __init__(
47  self,
48  hass: HomeAssistant,
49  client: ApiClient,
50  *,
51  events: list[str],
52  max_accuracy: float,
53  skip_accuracy_filter_for: list[str],
54  custom_attributes: list[str],
55  ) -> None:
56  """Initialize global Traccar Server data updater."""
57  super().__init__(
58  hass=hass,
59  logger=LOGGER,
60  name=DOMAIN,
61  update_interval=None,
62  )
63  self.clientclient = client
64  self.custom_attributescustom_attributes = custom_attributes
65  self.eventsevents = events
66  self.max_accuracymax_accuracy = max_accuracy
67  self.skip_accuracy_filter_forskip_accuracy_filter_for = skip_accuracy_filter_for
68  self._geofences_geofences: list[GeofenceModel] = []
69  self._last_event_import_last_event_import: datetime | None = None
70  self._should_log_subscription_error_should_log_subscription_error: bool = True
71 
72  async def _async_update_data(self) -> TraccarServerCoordinatorData:
73  """Fetch data from Traccar Server."""
74  LOGGER.debug("Updating device data")
75  data: TraccarServerCoordinatorData = {}
76  try:
77  (
78  devices,
79  positions,
80  geofences,
81  ) = await asyncio.gather(
82  self.clientclient.get_devices(),
83  self.clientclient.get_positions(),
84  self.clientclient.get_geofences(),
85  )
86  except TraccarException as ex:
87  raise UpdateFailed(f"Error while updating device data: {ex}") from ex
88 
89  if TYPE_CHECKING:
90  assert isinstance(devices, list[DeviceModel]) # type: ignore[misc]
91  assert isinstance(positions, list[PositionModel]) # type: ignore[misc]
92  assert isinstance(geofences, list[GeofenceModel]) # type: ignore[misc]
93 
94  self._geofences_geofences = geofences
95 
96  if self.loggerlogger.isEnabledFor(LOG_LEVEL_DEBUG):
97  self.loggerlogger.debug("Received devices: %s", devices)
98  self.loggerlogger.debug("Received positions: %s", positions)
99 
100  for position in positions:
101  device_id = position["deviceId"]
102  if (device := get_device(device_id, devices)) is None:
103  self.loggerlogger.debug(
104  "Device %s not found for position: %s",
105  device_id,
106  position["id"],
107  )
108  continue
109 
110  if (
111  attr
112  := self._return_custom_attributes_if_not_filtered_by_accuracy_configuration_return_custom_attributes_if_not_filtered_by_accuracy_configuration(
113  device, position
114  )
115  ) is None:
116  self.loggerlogger.debug(
117  "Skipping position update %s for %s due to accuracy filter",
118  position["id"],
119  device_id,
120  )
121  continue
122 
123  data[device_id] = {
124  "device": device,
125  "geofence": get_first_geofence(
126  geofences,
127  position["geofenceIds"] or [],
128  ),
129  "position": position,
130  "attributes": attr,
131  }
132 
133  return data
134 
135  async def handle_subscription_data(self, data: SubscriptionData) -> None:
136  """Handle subscription data."""
137  self.loggerlogger.debug("Received subscription data: %s", data)
138  self._should_log_subscription_error_should_log_subscription_error = True
139  update_devices = set()
140  for device in data.get("devices") or []:
141  if (device_id := device["id"]) not in self.datadata:
142  self.loggerlogger.debug("Device %s not found in data", device_id)
143  continue
144 
145  if (
146  attr
147  := self._return_custom_attributes_if_not_filtered_by_accuracy_configuration_return_custom_attributes_if_not_filtered_by_accuracy_configuration(
148  device, self.datadata[device_id]["position"]
149  )
150  ) is None:
151  continue
152 
153  self.datadata[device_id]["device"] = device
154  self.datadata[device_id]["attributes"] = attr
155  update_devices.add(device_id)
156 
157  for position in data.get("positions") or []:
158  if (device_id := position["deviceId"]) not in self.datadata:
159  self.loggerlogger.debug(
160  "Device %s for position %s not found in data",
161  device_id,
162  position["id"],
163  )
164  continue
165 
166  if (
167  attr
168  := self._return_custom_attributes_if_not_filtered_by_accuracy_configuration_return_custom_attributes_if_not_filtered_by_accuracy_configuration(
169  self.datadata[device_id]["device"], position
170  )
171  ) is None:
172  self.loggerlogger.debug(
173  "Skipping position update %s for %s due to accuracy filter",
174  position["id"],
175  device_id,
176  )
177  continue
178 
179  self.datadata[device_id]["position"] = position
180  self.datadata[device_id]["attributes"] = attr
181  self.datadata[device_id]["geofence"] = get_first_geofence(
182  self._geofences_geofences,
183  position["geofenceIds"] or [],
184  )
185  update_devices.add(device_id)
186 
187  for device_id in update_devices:
188  async_dispatcher_send(self.hasshass, f"{DOMAIN}_{device_id}")
189 
190  async def import_events(self, _: datetime) -> None:
191  """Import events from Traccar."""
192  start_time = dt_util.utcnow().replace(tzinfo=None)
193  end_time = None
194 
195  if self._last_event_import_last_event_import is not None:
196  end_time = start_time - (start_time - self._last_event_import_last_event_import)
197 
198  events = await self.clientclient.get_reports_events(
199  devices=list(self.datadata),
200  start_time=start_time,
201  end_time=end_time,
202  event_types=self.eventsevents,
203  )
204  if not events:
205  return
206 
207  self._last_event_import_last_event_import = start_time
208  for event in events:
209  device = self.datadata[event["deviceId"]]["device"]
210  self.hasshass.bus.async_fire(
211  # This goes against two of the HA core guidelines:
212  # 1. Event names should be prefixed with the domain name of
213  # the integration
214  # 2. This should be event entities
215  #
216  # However, to not break it for those who currently use
217  # the "old" integration, this is kept as is.
218  f"traccar_{EVENTS[event['type']]}",
219  {
220  "device_traccar_id": event["deviceId"],
221  "device_name": device["name"] if device else None,
222  "type": event["type"],
223  "serverTime": event["eventTime"],
224  "attributes": event["attributes"],
225  },
226  )
227 
228  async def subscribe(self) -> None:
229  """Subscribe to events."""
230  try:
231  await self.clientclient.subscribe(self.handle_subscription_datahandle_subscription_data)
232  except TraccarException as ex:
233  if self._should_log_subscription_error_should_log_subscription_error:
234  self._should_log_subscription_error_should_log_subscription_error = False
235  LOGGER.error("Error while subscribing to Traccar: %s", ex)
236  # Retry after 10 seconds
237  await asyncio.sleep(10)
238  await self.subscribesubscribe()
239 
241  self,
242  device: DeviceModel,
243  position: PositionModel,
244  ) -> dict[str, Any] | None:
245  """Return a dictionary of custom attributes if not filtered by accuracy configuration."""
246  attr = {}
247  skip_accuracy_filter = False
248 
249  for custom_attr in self.custom_attributescustom_attributes:
250  if custom_attr in self.skip_accuracy_filter_forskip_accuracy_filter_for:
251  skip_accuracy_filter = True
252  attr[custom_attr] = device["attributes"].get(
253  custom_attr,
254  position["attributes"].get(custom_attr, None),
255  )
256 
257  accuracy = position["accuracy"] or 0.0
258  if (
259  not skip_accuracy_filter
260  and self.max_accuracymax_accuracy > 0
261  and accuracy > self.max_accuracymax_accuracy
262  ):
263  return None
264  return attr
dict[str, Any]|None _return_custom_attributes_if_not_filtered_by_accuracy_configuration(self, DeviceModel device, PositionModel position)
Definition: coordinator.py:244
None __init__(self, HomeAssistant hass, ApiClient client, *list[str] events, float max_accuracy, list[str] skip_accuracy_filter_for, list[str] custom_attributes)
Definition: coordinator.py:55
DeviceEntry get_device(HomeAssistant hass, str unique_id)
Definition: util.py:12
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88
GeofenceModel|None get_first_geofence(list[GeofenceModel] geofences, list[int] target)
Definition: helpers.py:19
None async_dispatcher_send(HomeAssistant hass, str signal, *Any args)
Definition: dispatcher.py:193