Home Assistant Unofficial Reference 2024.12.1
dataset_store.py
"""Persistently store thread datasets."""

from __future__ import annotations

from asyncio import Event, Task, wait
import dataclasses
from datetime import datetime
import logging
from typing import Any, cast

from propcache import cached_property
from python_otbr_api import tlv_parser
from python_otbr_api.tlv_parser import MeshcopTLVType

from homeassistant.core import HomeAssistant, callback
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.singleton import singleton
from homeassistant.helpers.storage import Store
from homeassistant.util import dt as dt_util, ulid as ulid_util

from . import discovery

BORDER_AGENT_DISCOVERY_TIMEOUT = 30
DATA_STORE = "thread.datasets"
STORAGE_KEY = "thread.datasets"
STORAGE_VERSION_MAJOR = 1
STORAGE_VERSION_MINOR = 4
SAVE_DELAY = 10

_LOGGER = logging.getLogger(__name__)


class DatasetPreferredError(HomeAssistantError):
    """Raised when attempting to delete the preferred dataset."""


@dataclasses.dataclass(frozen=True)
class DatasetEntry:
    """Dataset store entry."""

    preferred_border_agent_id: str | None
    preferred_extended_address: str | None
    source: str
    tlv: str

    created: datetime = dataclasses.field(default_factory=dt_util.utcnow)
    id: str = dataclasses.field(default_factory=ulid_util.ulid_now)

    @property
    def channel(self) -> int | None:
        """Return channel as an integer."""
        if (channel := self.dataset.get(MeshcopTLVType.CHANNEL)) is None:
            return None
        return cast(tlv_parser.Channel, channel).channel

    @cached_property
    def dataset(self) -> dict[MeshcopTLVType, tlv_parser.MeshcopTLVItem]:
        """Return the dataset in dict format."""
        return tlv_parser.parse_tlv(self.tlv)

    @property
    def extended_pan_id(self) -> str:
        """Return extended PAN ID as a hex string."""
        return str(self.dataset[MeshcopTLVType.EXTPANID])

    @property
    def network_name(self) -> str | None:
        """Return network name as a string."""
        if (name := self.dataset.get(MeshcopTLVType.NETWORKNAME)) is None:
            return None
        return cast(tlv_parser.NetworkName, name).name

    @property
    def pan_id(self) -> str | None:
        """Return PAN ID as a hex string."""
        return str(self.dataset.get(MeshcopTLVType.PANID))

    def to_json(self) -> dict[str, Any]:
        """Return a JSON serializable representation for storage."""
        return {
            "created": self.created.isoformat(),
            "id": self.id,
            "preferred_border_agent_id": self.preferred_border_agent_id,
            "preferred_extended_address": self.preferred_extended_address,
            "source": self.source,
            "tlv": self.tlv,
        }

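# Editor's note: illustrative sketch only, not part of the original module. A
# DatasetEntry wraps an operational dataset given as a TLV hex string and
# decodes it lazily through the cached ``dataset`` property, e.g.:
#
#     entry = DatasetEntry(
#         preferred_border_agent_id=None,
#         preferred_extended_address=None,
#         source="Google",
#         tlv="<hex encoded Thread operational dataset TLVs>",
#     )
#     entry.network_name  # parsed from the TLV on first access
#     entry.to_json()     # dict shape persisted by DatasetStore below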

class DatasetStoreStore(Store):
    """Store Thread datasets."""

    async def _async_migrate_func(
        self, old_major_version: int, old_minor_version: int, old_data: dict[str, Any]
    ) -> dict[str, Any]:
        """Migrate to the new version."""
        if old_major_version == 1:
            data = old_data
            if old_minor_version < 2:
                # Deduplicate datasets
                datasets: dict[str, DatasetEntry] = {}
                preferred_dataset = old_data["preferred_dataset"]

                for dataset in old_data["datasets"]:
                    created = cast(datetime, dt_util.parse_datetime(dataset["created"]))
                    entry = DatasetEntry(
                        created=created,
                        id=dataset["id"],
                        preferred_border_agent_id=None,
                        preferred_extended_address=None,
                        source=dataset["source"],
                        tlv=dataset["tlv"],
                    )
                    if (
                        MeshcopTLVType.EXTPANID not in entry.dataset
                        or MeshcopTLVType.ACTIVETIMESTAMP not in entry.dataset
                    ):
                        _LOGGER.warning(
                            "Dropped invalid Thread dataset '%s'", entry.tlv
                        )
                        if entry.id == preferred_dataset:
                            preferred_dataset = None
                        continue

                    if entry.extended_pan_id in datasets:
                        if datasets[entry.extended_pan_id].id == preferred_dataset:
                            _LOGGER.warning(
                                (
                                    "Dropped duplicated Thread dataset '%s' "
                                    "(duplicate of preferred dataset '%s')"
                                ),
                                entry.tlv,
                                datasets[entry.extended_pan_id].tlv,
                            )
                            continue
                        new_timestamp = cast(
                            tlv_parser.Timestamp,
                            entry.dataset[MeshcopTLVType.ACTIVETIMESTAMP],
                        )
                        old_timestamp = cast(
                            tlv_parser.Timestamp,
                            datasets[entry.extended_pan_id].dataset[
                                MeshcopTLVType.ACTIVETIMESTAMP
                            ],
                        )
                        if old_timestamp.seconds >= new_timestamp.seconds or (
                            old_timestamp.seconds == new_timestamp.seconds
                            and old_timestamp.ticks >= new_timestamp.ticks
                        ):
                            _LOGGER.warning(
                                (
                                    "Dropped duplicated Thread dataset '%s' "
                                    "(duplicate of '%s')"
                                ),
                                entry.tlv,
                                datasets[entry.extended_pan_id].tlv,
                            )
                            continue
                        _LOGGER.warning(
                            (
                                "Dropped duplicated Thread dataset '%s' "
                                "(duplicate of '%s')"
                            ),
                            datasets[entry.extended_pan_id].tlv,
                            entry.tlv,
                        )
                    datasets[entry.extended_pan_id] = entry
                data = {
                    "preferred_dataset": preferred_dataset,
                    "datasets": [dataset.to_json() for dataset in datasets.values()],
                }
            # Migration to version 1.3 removed, it added the ID of the preferred border
            # agent
            if old_minor_version < 4:
                # Add extended address of the preferred border agent and clear border
                # agent ID
                for dataset in data["datasets"]:
                    dataset["preferred_border_agent_id"] = None
                    dataset["preferred_extended_address"] = None

        return data


class DatasetStore:
    """Class to hold a collection of thread datasets."""

    def __init__(self, hass: HomeAssistant) -> None:
        """Initialize the dataset store."""
        self.hass = hass
        self.datasets: dict[str, DatasetEntry] = {}
        self._preferred_dataset: str | None = None
        self._set_preferred_dataset_task: Task | None = None
        self._store: Store[dict[str, Any]] = DatasetStoreStore(
            hass,
            STORAGE_VERSION_MAJOR,
            STORAGE_KEY,
            atomic_writes=True,
            minor_version=STORAGE_VERSION_MINOR,
        )

    @callback
    def async_add(
        self,
        source: str,
        tlv: str,
        preferred_border_agent_id: str | None,
        preferred_extended_address: str | None,
    ) -> None:
        """Add dataset, does nothing if it already exists."""
        # Make sure the tlv is valid
        dataset = tlv_parser.parse_tlv(tlv)

        # Don't allow adding a dataset which does not have an extended pan id or
        # timestamp
        if (
            MeshcopTLVType.EXTPANID not in dataset
            or MeshcopTLVType.ACTIVETIMESTAMP not in dataset
        ):
            raise HomeAssistantError("Invalid dataset")

        # Don't allow setting preferred border agent ID without setting
        # preferred extended address
        if preferred_border_agent_id is not None and preferred_extended_address is None:
            raise HomeAssistantError(
                "Must set preferred extended address with preferred border agent ID"
            )

        # Bail out if the dataset already exists
        entry: DatasetEntry | None
        for entry in self.datasets.values():
            if entry.dataset == dataset:
                if (
                    preferred_extended_address
                    and entry.preferred_extended_address is None
                ):
                    self.async_set_preferred_border_agent(
                        entry.id, preferred_border_agent_id, preferred_extended_address
                    )
                return

        # Update if dataset with same extended pan id exists and the timestamp
        # is newer
        if entry := next(
            (
                entry
                for entry in self.datasets.values()
                if entry.dataset[MeshcopTLVType.EXTPANID]
                == dataset[MeshcopTLVType.EXTPANID]
            ),
            None,
        ):
            new_timestamp = cast(
                tlv_parser.Timestamp, dataset[MeshcopTLVType.ACTIVETIMESTAMP]
            )
            old_timestamp = cast(
                tlv_parser.Timestamp,
                entry.dataset[MeshcopTLVType.ACTIVETIMESTAMP],
            )
            if old_timestamp.seconds >= new_timestamp.seconds or (
                old_timestamp.seconds == new_timestamp.seconds
                and old_timestamp.ticks >= new_timestamp.ticks
            ):
                _LOGGER.warning(
                    (
                        "Got dataset with same extended PAN ID and same or older active"
                        " timestamp, old dataset: '%s', new dataset: '%s'"
                    ),
                    entry.tlv,
                    tlv,
                )
                return
            _LOGGER.debug(
                (
                    "Updating dataset with same extended PAN ID and newer active "
                    "timestamp, old dataset: '%s', new dataset: '%s'"
                ),
                entry.tlv,
                tlv,
            )
            self.datasets[entry.id] = dataclasses.replace(
                self.datasets[entry.id], tlv=tlv
            )
            self.async_schedule_save()
            if preferred_extended_address and entry.preferred_extended_address is None:
                self.async_set_preferred_border_agent(
                    entry.id, preferred_border_agent_id, preferred_extended_address
                )
            return

        entry = DatasetEntry(
            preferred_border_agent_id=preferred_border_agent_id,
            preferred_extended_address=preferred_extended_address,
            source=source,
            tlv=tlv,
        )
        self.datasets[entry.id] = entry
        self.async_schedule_save()

        # Set the new network as preferred if there is no preferred dataset and there is
        # no other router present. We only attempt this once.
        if (
            self._preferred_dataset is None
            and preferred_extended_address
            and not self._set_preferred_dataset_task
        ):
            self._set_preferred_dataset_task = self.hass.async_create_task(
                self._set_preferred_dataset_if_only_network(
                    entry.id, preferred_extended_address
                )
            )

    @callback
    def async_delete(self, dataset_id: str) -> None:
        """Delete dataset."""
        if self._preferred_dataset == dataset_id:
            raise DatasetPreferredError("attempt to remove preferred dataset")
        del self.datasets[dataset_id]
        self.async_schedule_save()

    @callback
    def async_get(self, dataset_id: str) -> DatasetEntry | None:
        """Get dataset by id."""
        return self.datasets.get(dataset_id)

    @callback
    def async_set_preferred_border_agent(
        self, dataset_id: str, border_agent_id: str | None, extended_address: str
    ) -> None:
        """Set preferred border agent id and extended address of a dataset."""
        # Don't allow setting preferred border agent ID without setting
        # preferred extended address
        if border_agent_id is not None and extended_address is None:
            raise HomeAssistantError(
                "Must set preferred extended address with preferred border agent ID"
            )

        self.datasets[dataset_id] = dataclasses.replace(
            self.datasets[dataset_id],
            preferred_border_agent_id=border_agent_id,
            preferred_extended_address=extended_address,
        )
        self.async_schedule_save()

    @property
    @callback
    def preferred_dataset(self) -> str | None:
        """Get the id of the preferred dataset."""
        return self._preferred_dataset

    @preferred_dataset.setter
    @callback
    def preferred_dataset(self, dataset_id: str) -> None:
        """Set the preferred dataset."""
        if dataset_id not in self.datasets:
            raise KeyError("unknown dataset")
        self._preferred_dataset = dataset_id
        self.async_schedule_save()

    async def _set_preferred_dataset_if_only_network(
        self, dataset_id: str, extended_address: str | None
    ) -> None:
        """Set the preferred dataset, unless there are other routers present."""
        _LOGGER.debug(
            "_set_preferred_dataset_if_only_network called for router %s",
            extended_address,
        )

        own_router_evt = Event()
        other_router_evt = Event()

        @callback
        def router_discovered(
            key: str, data: discovery.ThreadRouterDiscoveryData
        ) -> None:
            """Handle router discovered."""
            _LOGGER.debug("discovered router with ext addr %s", data.extended_address)
            if data.extended_address == extended_address:
                own_router_evt.set()
                return

            other_router_evt.set()

        # Start Thread router discovery
        thread_discovery = discovery.ThreadRouterDiscovery(
            self.hass, router_discovered, lambda key: None
        )
        await thread_discovery.async_start()

        found_own_router = self.hass.async_create_task(own_router_evt.wait())
        found_other_router = self.hass.async_create_task(other_router_evt.wait())
        pending = {found_own_router, found_other_router}
        (done, pending) = await wait(pending, timeout=BORDER_AGENT_DISCOVERY_TIMEOUT)
        if found_other_router in done:
            # We found another router on the network, don't set the dataset
            # as preferred
            _LOGGER.debug("Other router found, do not set dataset as default")

        # Note that asyncio.wait does not raise TimeoutError, it instead returns
        # the jobs which did not finish in the pending-set.
        elif found_own_router in pending:
            # Either the router is not there, or mDNS is not working. In any case,
            # don't set the router as preferred.
            _LOGGER.debug("Own router not found, do not set dataset as default")

        else:
            # We've discovered the router connected to the dataset, but we did not
            # find any other router on the network - mark the dataset as preferred.
            _LOGGER.debug("No other router found, set dataset as default")
            self.preferred_dataset = dataset_id

        for task in pending:
            task.cancel()
        await thread_discovery.async_stop()

    async def async_load(self) -> None:
        """Load the datasets."""
        data = await self._store.async_load()

        datasets: dict[str, DatasetEntry] = {}
        preferred_dataset: str | None = None

        if data is not None:
            for dataset in data["datasets"]:
                created = cast(datetime, dt_util.parse_datetime(dataset["created"]))
                datasets[dataset["id"]] = DatasetEntry(
                    created=created,
                    id=dataset["id"],
                    preferred_border_agent_id=dataset["preferred_border_agent_id"],
                    preferred_extended_address=dataset["preferred_extended_address"],
                    source=dataset["source"],
                    tlv=dataset["tlv"],
                )
            preferred_dataset = data["preferred_dataset"]

        self.datasets = datasets
        self._preferred_dataset = preferred_dataset

    @callback
    def async_schedule_save(self) -> None:
        """Schedule saving the dataset store."""
        self._store.async_delay_save(self._data_to_save, SAVE_DELAY)

    @callback
    def _data_to_save(self) -> dict[str, list[dict[str, str | None]]]:
        """Return data of datasets to store in a file."""
        data: dict[str, Any] = {}
        data["datasets"] = [dataset.to_json() for dataset in self.datasets.values()]
        data["preferred_dataset"] = self._preferred_dataset
        return data

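# Editor's note: illustrative sketch only, not part of the original module. The
# dict returned by _data_to_save is wrapped by the Store helper and ends up
# under the "data" key of the JSON document written to .storage/thread.datasets,
# roughly:
#
#     {
#         "version": 1,
#         "minor_version": 4,
#         "key": "thread.datasets",
#         "data": {
#             "datasets": [{"created": "...", "id": "...", "tlv": "...", ...}],
#             "preferred_dataset": "<dataset id>" or null
#         }
#     }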

@singleton(DATA_STORE)
async def async_get_store(hass: HomeAssistant) -> DatasetStore:
    """Get the dataset store."""
    store = DatasetStore(hass)
    await store.async_load()
    return store


async def async_add_dataset(
    hass: HomeAssistant,
    source: str,
    tlv: str,
    *,
    preferred_border_agent_id: str | None = None,
    preferred_extended_address: str | None = None,
) -> None:
    """Add a dataset."""
    store = await async_get_store(hass)
    store.async_add(source, tlv, preferred_border_agent_id, preferred_extended_address)


async def async_get_dataset(hass: HomeAssistant, dataset_id: str) -> str | None:
    """Get a dataset."""
    store = await async_get_store(hass)
    if (entry := store.async_get(dataset_id)) is None:
        return None
    return entry.tlv


async def async_get_preferred_dataset(hass: HomeAssistant) -> str | None:
    """Get the preferred dataset."""
    store = await async_get_store(hass)
    if (preferred_dataset := store.preferred_dataset) is None or (
        entry := store.async_get(preferred_dataset)
    ) is None:
        return None
    return entry.tlv
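Usage sketch (editor's addition, not part of the module): the module-level helpers above are the natural entry points for other code. A minimal example, assuming it runs inside Home Assistant's event loop with a valid hass instance and a valid operational dataset TLV hex string:

async def example(hass: HomeAssistant, tlv: str) -> None:
    """Illustrative only: add a dataset and read back the preferred one."""
    # Store a dataset, e.g. one learned from an OTBR; an exact duplicate is
    # ignored, and an existing entry with the same extended PAN ID is only
    # updated when the new active timestamp is newer.
    await async_add_dataset(hass, "example source", tlv)

    # TLV of the preferred dataset, or None if no preferred dataset is set.
    preferred_tlv = await async_get_preferred_dataset(hass)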