Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """Data update coordinator for the Proximity integration."""
2 
3 from collections import defaultdict
4 from dataclasses import dataclass
5 import logging
6 from typing import cast
7 
8 from homeassistant.components.zone import DOMAIN as ZONE_DOMAIN
9 from homeassistant.config_entries import ConfigEntry
10 from homeassistant.const import (
11  ATTR_LATITUDE,
12  ATTR_LONGITUDE,
13  ATTR_NAME,
14  CONF_UNIT_OF_MEASUREMENT,
15  CONF_ZONE,
16 )
17 from homeassistant.core import (
18  Event,
19  EventStateChangedData,
20  HomeAssistant,
21  State,
22  callback,
23 )
24 from homeassistant.helpers import entity_registry as er
25 from homeassistant.helpers.issue_registry import IssueSeverity, async_create_issue
26 from homeassistant.helpers.typing import ConfigType
27 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
28 from homeassistant.util.location import distance
29 
30 from .const import (
31  ATTR_DIR_OF_TRAVEL,
32  ATTR_DIST_TO,
33  ATTR_IN_IGNORED_ZONE,
34  ATTR_NEAREST,
35  CONF_IGNORED_ZONES,
36  CONF_TOLERANCE,
37  CONF_TRACKED_ENTITIES,
38  DEFAULT_DIR_OF_TRAVEL,
39  DEFAULT_DIST_TO_ZONE,
40  DEFAULT_NEAREST,
41  DOMAIN,
42 )
43 
44 _LOGGER = logging.getLogger(__name__)
45 
46 type ProximityConfigEntry = ConfigEntry[ProximityDataUpdateCoordinator]
47 
48 
49 @dataclass
51  """StateChangedData class."""
52 
53  entity_id: str
54  old_state: State | None
55  new_state: State | None
56 
57 
58 @dataclass
60  """ProximityCoordinatorData class."""
61 
62  proximity: dict[str, str | int | None]
63  entities: dict[str, dict[str, str | int | None]]
64 
65 
66 DEFAULT_PROXIMITY_DATA: dict[str, str | int | None] = {
67  ATTR_DIST_TO: DEFAULT_DIST_TO_ZONE,
68  ATTR_DIR_OF_TRAVEL: DEFAULT_DIR_OF_TRAVEL,
69  ATTR_NEAREST: DEFAULT_NEAREST,
70 }
71 
72 
74  """Proximity data update coordinator."""
75 
76  config_entry: ProximityConfigEntry
77 
78  def __init__(
79  self, hass: HomeAssistant, friendly_name: str, config: ConfigType
80  ) -> None:
81  """Initialize the Proximity coordinator."""
82  self.ignored_zone_ids: list[str] = config[CONF_IGNORED_ZONES]
83  self.tracked_entities: list[str] = config[CONF_TRACKED_ENTITIES]
84  self.tolerance: int = config[CONF_TOLERANCE]
85  self.proximity_zone_id: str = config[CONF_ZONE]
86  self.proximity_zone_name: str = self.proximity_zone_id.split(".")[-1]
87  self.unit_of_measurement: str = config.get(
88  CONF_UNIT_OF_MEASUREMENT, hass.config.units.length_unit
89  )
90  self.entity_mapping: dict[str, list[str]] = defaultdict(list)
91 
92  super().__init__(
93  hass,
94  _LOGGER,
95  name=friendly_name,
96  update_interval=None,
97  )
98 
99  self.datadatadata = ProximityData(DEFAULT_PROXIMITY_DATA, {})
100 
101  self.state_change_datastate_change_data: StateChangedData | None = None
102 
103  @callback
104  def async_add_entity_mapping(self, tracked_entity_id: str, entity_id: str) -> None:
105  """Add an tracked entity to proximity entity mapping."""
106  self.entity_mapping[tracked_entity_id].append(entity_id)
107 
109  self,
110  event: Event[EventStateChangedData],
111  ) -> None:
112  """Fetch and process state change event."""
113  data = event.data
114  self.state_change_datastate_change_data = StateChangedData(
115  data["entity_id"], data["old_state"], data["new_state"]
116  )
117  await self.async_refreshasync_refresh()
118 
120  self, event: Event[er.EventEntityRegistryUpdatedData]
121  ) -> None:
122  """Fetch and process tracked entity change event."""
123  data = event.data
124  if data["action"] == "remove":
125  self._create_removed_tracked_entity_issue_create_removed_tracked_entity_issue(data["entity_id"])
126 
127  if data["action"] == "update" and "entity_id" in data["changes"]:
128  old_tracked_entity_id = data["old_entity_id"]
129  new_tracked_entity_id = data["entity_id"]
130 
131  self.hasshass.config_entries.async_update_entry(
132  self.config_entryconfig_entry,
133  data={
134  **self.config_entryconfig_entry.data,
135  CONF_TRACKED_ENTITIES: [
136  tracked_entity
137  for tracked_entity in (
138  *self.tracked_entities,
139  new_tracked_entity_id,
140  )
141  if tracked_entity != old_tracked_entity_id
142  ],
143  },
144  )
145 
147  self,
148  zone: State,
149  device: State,
150  latitude: float | None,
151  longitude: float | None,
152  ) -> int | None:
153  if device.state.lower() == self.proximity_zone_name.lower():
154  _LOGGER.debug(
155  "%s: %s in zone -> distance=0",
156  self.namename,
157  device.entity_id,
158  )
159  return 0
160 
161  if latitude is None or longitude is None:
162  _LOGGER.debug(
163  "%s: %s has no coordinates -> distance=None",
164  self.namename,
165  device.entity_id,
166  )
167  return None
168 
169  distance_to_zone = distance(
170  zone.attributes[ATTR_LATITUDE],
171  zone.attributes[ATTR_LONGITUDE],
172  latitude,
173  longitude,
174  )
175 
176  # it is ensured, that distance can't be None, since zones must have lat/lon coordinates
177  assert distance_to_zone is not None
178  return round(distance_to_zone)
179 
181  self,
182  zone: State,
183  device: State,
184  old_latitude: float | None,
185  old_longitude: float | None,
186  new_latitude: float | None,
187  new_longitude: float | None,
188  ) -> str | None:
189  if device.state.lower() == self.proximity_zone_name.lower():
190  _LOGGER.debug(
191  "%s: %s in zone -> direction_of_travel=arrived",
192  self.namename,
193  device.entity_id,
194  )
195  return "arrived"
196 
197  if (
198  old_latitude is None
199  or old_longitude is None
200  or new_latitude is None
201  or new_longitude is None
202  ):
203  return None
204 
205  old_distance = distance(
206  zone.attributes[ATTR_LATITUDE],
207  zone.attributes[ATTR_LONGITUDE],
208  old_latitude,
209  old_longitude,
210  )
211  new_distance = distance(
212  zone.attributes[ATTR_LATITUDE],
213  zone.attributes[ATTR_LONGITUDE],
214  new_latitude,
215  new_longitude,
216  )
217 
218  # it is ensured, that distance can't be None, since zones must have lat/lon coordinates
219  assert old_distance is not None
220  assert new_distance is not None
221  distance_travelled = round(new_distance - old_distance, 1)
222 
223  if distance_travelled < self.tolerance * -1:
224  return "towards"
225 
226  if distance_travelled > self.tolerance:
227  return "away_from"
228 
229  return "stationary"
230 
231  async def _async_update_data(self) -> ProximityData:
232  """Calculate Proximity data."""
233  if (zone_state := self.hasshass.states.get(self.proximity_zone_id)) is None:
234  _LOGGER.debug(
235  "%s: zone %s does not exist -> reset",
236  self.namename,
237  self.proximity_zone_id,
238  )
239  return ProximityData(DEFAULT_PROXIMITY_DATA, {})
240 
241  entities_data = self.datadatadata.entities
242 
243  # calculate distance for all tracked entities
244  for entity_id in self.tracked_entities:
245  if (tracked_entity_state := self.hasshass.states.get(entity_id)) is None:
246  if entities_data.pop(entity_id, None) is not None:
247  _LOGGER.debug(
248  "%s: %s does not exist -> remove", self.namename, entity_id
249  )
250  continue
251 
252  if entity_id not in entities_data:
253  _LOGGER.debug("%s: %s is new -> add", self.namename, entity_id)
254  entities_data[entity_id] = {
255  ATTR_DIST_TO: None,
256  ATTR_DIR_OF_TRAVEL: None,
257  ATTR_NAME: tracked_entity_state.name,
258  ATTR_IN_IGNORED_ZONE: False,
259  }
260  entities_data[entity_id][ATTR_IN_IGNORED_ZONE] = (
261  f"{ZONE_DOMAIN}.{tracked_entity_state.state.lower()}"
262  in self.ignored_zone_ids
263  )
264  entities_data[entity_id][ATTR_DIST_TO] = self._calc_distance_to_zone_calc_distance_to_zone(
265  zone_state,
266  tracked_entity_state,
267  tracked_entity_state.attributes.get(ATTR_LATITUDE),
268  tracked_entity_state.attributes.get(ATTR_LONGITUDE),
269  )
270  if entities_data[entity_id][ATTR_DIST_TO] is None:
271  _LOGGER.debug(
272  "%s: %s has unknown distance got -> direction_of_travel=None",
273  self.namename,
274  entity_id,
275  )
276  entities_data[entity_id][ATTR_DIR_OF_TRAVEL] = None
277 
278  # calculate direction of travel only for last updated tracked entity
279  if (state_change_data := self.state_change_datastate_change_data) is not None and (
280  new_state := state_change_data.new_state
281  ) is not None:
282  _LOGGER.debug(
283  "%s: calculate direction of travel for %s",
284  self.namename,
285  state_change_data.entity_id,
286  )
287 
288  if (old_state := state_change_data.old_state) is not None:
289  old_lat = old_state.attributes.get(ATTR_LATITUDE)
290  old_lon = old_state.attributes.get(ATTR_LONGITUDE)
291  else:
292  old_lat = None
293  old_lon = None
294 
295  entities_data[state_change_data.entity_id][ATTR_DIR_OF_TRAVEL] = (
296  self._calc_direction_of_travel_calc_direction_of_travel(
297  zone_state,
298  new_state,
299  old_lat,
300  old_lon,
301  new_state.attributes.get(ATTR_LATITUDE),
302  new_state.attributes.get(ATTR_LONGITUDE),
303  )
304  )
305 
306  # takeover data for legacy proximity entity
307  proximity_data: dict[str, str | int | None] = {
308  ATTR_DIST_TO: DEFAULT_DIST_TO_ZONE,
309  ATTR_DIR_OF_TRAVEL: DEFAULT_DIR_OF_TRAVEL,
310  ATTR_NEAREST: DEFAULT_NEAREST,
311  }
312  for entity_data in entities_data.values():
313  if (distance_to := entity_data[ATTR_DIST_TO]) is None or entity_data[
314  ATTR_IN_IGNORED_ZONE
315  ]:
316  continue
317 
318  if isinstance((nearest_distance_to := proximity_data[ATTR_DIST_TO]), str):
319  _LOGGER.debug("set first entity_data: %s", entity_data)
320  proximity_data = {
321  ATTR_DIST_TO: distance_to,
322  ATTR_DIR_OF_TRAVEL: entity_data[ATTR_DIR_OF_TRAVEL],
323  ATTR_NEAREST: str(entity_data[ATTR_NAME]),
324  }
325  continue
326 
327  if cast(int, nearest_distance_to) > int(distance_to):
328  _LOGGER.debug("set closer entity_data: %s", entity_data)
329  proximity_data = {
330  ATTR_DIST_TO: distance_to,
331  ATTR_DIR_OF_TRAVEL: entity_data[ATTR_DIR_OF_TRAVEL],
332  ATTR_NEAREST: str(entity_data[ATTR_NAME]),
333  }
334  continue
335 
336  if cast(int, nearest_distance_to) == int(distance_to):
337  _LOGGER.debug("set equally close entity_data: %s", entity_data)
338  proximity_data[ATTR_NEAREST] = (
339  f"{proximity_data[ATTR_NEAREST]}, {entity_data[ATTR_NAME]!s}"
340  )
341 
342  return ProximityData(proximity_data, entities_data)
343 
344  def _create_removed_tracked_entity_issue(self, entity_id: str) -> None:
345  """Create a repair issue for a removed tracked entity."""
347  self.hasshass,
348  DOMAIN,
349  f"tracked_entity_removed_{entity_id}",
350  is_fixable=True,
351  is_persistent=True,
352  severity=IssueSeverity.WARNING,
353  translation_key="tracked_entity_removed",
354  translation_placeholders={"entity_id": entity_id, "name": self.namename},
355  )
None async_check_tracked_entity_change(self, Event[er.EventEntityRegistryUpdatedData] event)
Definition: coordinator.py:121
None async_check_proximity_state_change(self, Event[EventStateChangedData] event)
Definition: coordinator.py:111
None __init__(self, HomeAssistant hass, str friendly_name, ConfigType config)
Definition: coordinator.py:80
None async_add_entity_mapping(self, str tracked_entity_id, str entity_id)
Definition: coordinator.py:104
str|None _calc_direction_of_travel(self, State zone, State device, float|None old_latitude, float|None old_longitude, float|None new_latitude, float|None new_longitude)
Definition: coordinator.py:188
int|None _calc_distance_to_zone(self, State zone, State device, float|None latitude, float|None longitude)
Definition: coordinator.py:152
None async_create_issue(HomeAssistant hass, str entry_id)
Definition: repairs.py:69
def distance(hass, *args)
Definition: template.py:1796