1 """Persistently store thread datasets."""

from __future__ import annotations

from asyncio import Event, Task, wait
import dataclasses
from datetime import datetime
import logging
from typing import Any, cast

from propcache import cached_property
from python_otbr_api import tlv_parser
from python_otbr_api.tlv_parser import MeshcopTLVType

from homeassistant.core import HomeAssistant, callback
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.singleton import singleton
from homeassistant.helpers.storage import Store
from homeassistant.util import dt as dt_util, ulid as ulid_util

from . import discovery

BORDER_AGENT_DISCOVERY_TIMEOUT = 30
DATA_STORE = "thread.datasets"
STORAGE_KEY = "thread.datasets"
STORAGE_VERSION_MAJOR = 1
STORAGE_VERSION_MINOR = 4
SAVE_DELAY = 10

_LOGGER = logging.getLogger(__name__)
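
# Illustrative shape of the stored file (derived from to_json() and
# _data_to_save() below; all values are placeholders, not real data):
#
#     {
#         "datasets": [
#             {
#                 "created": "2023-01-01T00:00:00+00:00",
#                 "id": "<ulid>",
#                 "preferred_border_agent_id": null,
#                 "preferred_extended_address": null,
#                 "source": "<source>",
#                 "tlv": "<hex encoded TLVs>"
#             }
#         ],
#         "preferred_dataset": "<ulid or null>"
#     }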
34 """Raised when attempting to delete the preferred dataset."""


@dataclasses.dataclass(frozen=True)
class DatasetEntry:
    """Dataset store entry."""

    preferred_border_agent_id: str | None
    preferred_extended_address: str | None
    source: str
    tlv: str

    created: datetime = dataclasses.field(default_factory=dt_util.utcnow)
    id: str = dataclasses.field(default_factory=ulid_util.ulid_now)
51 """Return channel as an integer."""
52 if (channel := self.
datasetdataset.
get(MeshcopTLVType.CHANNEL))
is None:
54 return cast(tlv_parser.Channel, channel).channel

    @cached_property
    def dataset(self) -> dict[MeshcopTLVType, tlv_parser.MeshcopTLVItem]:
        """Return the dataset in dict format."""
        return tlv_parser.parse_tlv(self.tlv)
63 """Return extended PAN ID as a hex string."""
64 return str(self.
datasetdataset[MeshcopTLVType.EXTPANID])
68 """Return network name as a string."""
69 if (name := self.
datasetdataset.
get(MeshcopTLVType.NETWORKNAME))
is None:
71 return cast(tlv_parser.NetworkName, name).name
75 """Return PAN ID as a hex string."""
79 """Return a JSON serializable representation for storage."""
81 "created": self.created.isoformat(),
83 "preferred_border_agent_id": self.preferred_border_agent_id,
84 "preferred_extended_address": self.preferred_extended_address,
85 "source": self.source,
91 """Store Thread datasets."""
94 self, old_major_version: int, old_minor_version: int, old_data: dict[str, Any]
96 """Migrate to the new version."""
        if old_major_version == 1:
            if old_minor_version < 2:
                # Deduplicate datasets with the same extended PAN ID
                datasets: dict[str, DatasetEntry] = {}
                preferred_dataset = old_data["preferred_dataset"]

                for dataset in old_data["datasets"]:
                    created = cast(datetime, dt_util.parse_datetime(dataset["created"]))
                    entry = DatasetEntry(
                        created=created,
                        id=dataset["id"],
                        preferred_border_agent_id=None,
                        preferred_extended_address=None,
                        source=dataset["source"],
                        tlv=dataset["tlv"],
                    )

                    if (
                        MeshcopTLVType.EXTPANID not in entry.dataset
                        or MeshcopTLVType.ACTIVETIMESTAMP not in entry.dataset
                    ):
                        _LOGGER.warning(
                            "Dropped invalid Thread dataset '%s'", entry.tlv
                        )
                        if entry.id == preferred_dataset:
                            preferred_dataset = None
                        continue

                    if entry.extended_pan_id in datasets:
                        if datasets[entry.extended_pan_id].id == preferred_dataset:
                            _LOGGER.warning(
                                "Dropped duplicated Thread dataset '%s' "
                                "(duplicate of preferred dataset '%s')",
                                entry.tlv,
                                datasets[entry.extended_pan_id].tlv,
                            )
                            continue
                        new_timestamp = cast(
                            tlv_parser.Timestamp,
                            entry.dataset[MeshcopTLVType.ACTIVETIMESTAMP],
                        )
                        old_timestamp = cast(
                            tlv_parser.Timestamp,
                            datasets[entry.extended_pan_id].dataset[
                                MeshcopTLVType.ACTIVETIMESTAMP
                            ],
                        )
                        # The stored dataset wins if its active timestamp is
                        # the same or newer
                        if old_timestamp.seconds > new_timestamp.seconds or (
                            old_timestamp.seconds == new_timestamp.seconds
                            and old_timestamp.ticks >= new_timestamp.ticks
                        ):
                            _LOGGER.warning(
                                "Dropped duplicated Thread dataset '%s' "
                                "(duplicate of '%s')",
                                entry.tlv,
                                datasets[entry.extended_pan_id].tlv,
                            )
                            continue
                        _LOGGER.warning(
                            "Dropped duplicated Thread dataset '%s' "
                            "(duplicate of '%s')",
                            datasets[entry.extended_pan_id].tlv,
                            entry.tlv,
                        )
                    datasets[entry.extended_pan_id] = entry

                old_data = {
                    "preferred_dataset": preferred_dataset,
                    "datasets": [dataset.to_json() for dataset in datasets.values()],
                }
            if old_minor_version < 4:
                # Add the preferred border agent ID and preferred extended
                # address fields
                for dataset in old_data["datasets"]:
                    dataset["preferred_border_agent_id"] = None
                    dataset["preferred_extended_address"] = None

        return old_data
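
# Worked example of the timestamp tie-break used above (hypothetical values):
# old(seconds=10, ticks=5) vs new(seconds=10, ticks=7) keeps the new dataset,
# because the seconds are equal and the old ticks (5) are not >= the new (7).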
185 """Class to hold a collection of thread datasets."""
188 """Initialize the dataset store."""
190 self.
datasetsdatasets: dict[str, DatasetEntry] = {}
195 STORAGE_VERSION_MAJOR,
198 minor_version=STORAGE_VERSION_MINOR,

    @callback
    def async_add(
        self,
        source: str,
        tlv: str,
        preferred_border_agent_id: str | None,
        preferred_extended_address: str | None,
    ) -> None:
        """Add dataset, does nothing if it already exists."""
        # Make sure the TLV is valid
        dataset = tlv_parser.parse_tlv(tlv)

        # Don't allow adding a dataset without an extended PAN ID or active
        # timestamp
        if (
            MeshcopTLVType.EXTPANID not in dataset
            or MeshcopTLVType.ACTIVETIMESTAMP not in dataset
        ):
            raise HomeAssistantError("Invalid dataset")

        if (
            preferred_border_agent_id is not None
            and preferred_extended_address is None
        ):
            raise HomeAssistantError(
                "Must set preferred extended address with preferred border agent ID"
            )

        # Bail out if the dataset already exists
        entry: DatasetEntry | None
        for entry in self.datasets.values():
            if entry.dataset == dataset:
                if (
                    preferred_extended_address
                    and entry.preferred_extended_address is None
                ):
                    self.async_set_preferred_border_agent(
                        entry.id, preferred_border_agent_id, preferred_extended_address
                    )
                return

        # If a dataset with the same extended PAN ID exists, update it unless
        # the new dataset's active timestamp is the same or older
        if entry := next(
            (
                entry
                for entry in self.datasets.values()
                if entry.dataset[MeshcopTLVType.EXTPANID]
                == dataset[MeshcopTLVType.EXTPANID]
            ),
            None,
        ):
            new_timestamp = cast(
                tlv_parser.Timestamp, dataset[MeshcopTLVType.ACTIVETIMESTAMP]
            )
            old_timestamp = cast(
                tlv_parser.Timestamp,
                entry.dataset[MeshcopTLVType.ACTIVETIMESTAMP],
            )
            if old_timestamp.seconds > new_timestamp.seconds or (
                old_timestamp.seconds == new_timestamp.seconds
                and old_timestamp.ticks >= new_timestamp.ticks
            ):
                _LOGGER.warning(
                    "Got dataset with same extended PAN ID and same or older active"
                    " timestamp, old dataset: '%s', new dataset: '%s'",
                    entry.tlv,
                    tlv,
                )
                return
            _LOGGER.debug(
                "Updating dataset with same extended PAN ID and newer active "
                "timestamp, old dataset: '%s', new dataset: '%s'",
                entry.tlv,
                tlv,
            )
            self.datasets[entry.id] = dataclasses.replace(
                self.datasets[entry.id], tlv=tlv
            )
            self.async_schedule_save()
            if preferred_extended_address and entry.preferred_extended_address is None:
                self.async_set_preferred_border_agent(
                    entry.id, preferred_border_agent_id, preferred_extended_address
                )
            return

        entry = DatasetEntry(
            preferred_border_agent_id=preferred_border_agent_id,
            preferred_extended_address=preferred_extended_address,
            source=source,
            tlv=tlv,
        )
        self.datasets[entry.id] = entry
        self.async_schedule_save()

        # Set the new network as preferred if there is no preferred dataset
        # and no other router is present
        if (
            self._preferred_dataset is None
            and preferred_extended_address
        ):
            self._set_preferred_dataset_task = self.hass.async_create_task(
                self._set_preferred_dataset_if_only_network(
                    entry.id, preferred_extended_address
                )
            )
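
    # Behavior sketch for async_add (tlv_v1/tlv_v2 are hypothetical TLV
    # strings sharing an extended PAN ID, tlv_v2 with a newer timestamp):
    #
    #     store.async_add("Google", tlv_v1, None, None)  # stored as new entry
    #     store.async_add("Google", tlv_v1, None, None)  # no-op, already known
    #     store.async_add("Google", tlv_v2, None, None)  # updates the entry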
314 """Delete dataset."""
317 del self.
datasetsdatasets[dataset_id]

    @callback
    def async_get(self, dataset_id: str) -> DatasetEntry | None:
        """Get dataset by id."""
        return self.datasets.get(dataset_id)

    @callback
    def async_set_preferred_border_agent(
        self, dataset_id: str, border_agent_id: str | None, extended_address: str
    ) -> None:
        """Set preferred border agent id and extended address of a dataset."""
        if border_agent_id is not None and extended_address is None:
            raise HomeAssistantError(
                "Must set preferred extended address with preferred border agent ID"
            )
        self.datasets[dataset_id] = dataclasses.replace(
            self.datasets[dataset_id],
            preferred_border_agent_id=border_agent_id,
            preferred_extended_address=extended_address,
        )
        self.async_schedule_save()
347 """Get the id of the preferred dataset."""
350 @preferred_dataset.setter
353 """Set the preferred dataset."""
354 if dataset_id
not in self.
datasetsdatasets:
355 raise KeyError(
"unknown dataset")

    async def _set_preferred_dataset_if_only_network(
        self, dataset_id: str, extended_address: str | None
    ) -> None:
        """Set the preferred dataset, unless there are other routers present."""
        _LOGGER.debug(
            "_set_preferred_dataset_if_only_network called for router %s",
            extended_address,
        )

        own_router_evt = Event()
        other_router_evt = Event()

        @callback
        def router_discovered(
            key: str, data: discovery.ThreadRouterDiscoveryData
        ) -> None:
            """Handle router discovered."""
            _LOGGER.debug("discovered router with ext addr %s", data.extended_address)
            if data.extended_address == extended_address:
                own_router_evt.set()
                return
            other_router_evt.set()

        # Start Thread router discovery
        thread_discovery = discovery.ThreadRouterDiscovery(
            self.hass, router_discovered, lambda key: None
        )
        await thread_discovery.async_start()

        found_own_router = self.hass.async_create_task(own_router_evt.wait())
        found_other_router = self.hass.async_create_task(other_router_evt.wait())
        pending = {found_own_router, found_other_router}
        (done, pending) = await wait(pending, timeout=BORDER_AGENT_DISCOVERY_TIMEOUT)

        if found_other_router in done:
            # Another router is present, don't set the dataset as preferred
            _LOGGER.debug("Other router found, do not set dataset as default")
        elif found_own_router in pending:
            # Our own router was not discovered, don't set the dataset as preferred
            _LOGGER.debug("Own router not found, do not set dataset as default")
        else:
            # Only our own router was discovered, set the dataset as preferred
            _LOGGER.debug("No other router found, set dataset as default")
            self.preferred_dataset = dataset_id

        # Cancel the tasks not yet done
        for task in pending:
            task.cancel()
        await thread_discovery.async_stop()
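
    # Outcome summary for the discovery wait above: another router discovered
    # -> keep the current preferred dataset; own router never discovered ->
    # keep the current preferred dataset; only the own router discovered
    # within BORDER_AGENT_DISCOVERY_TIMEOUT -> promote this dataset.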
416 """Load the datasets."""
419 datasets: dict[str, DatasetEntry] = {}
420 preferred_dataset: str |
None =
None
423 for dataset
in data[
"datasets"]:
424 created = cast(datetime, dt_util.parse_datetime(dataset[
"created"]))
428 preferred_border_agent_id=dataset[
"preferred_border_agent_id"],
429 preferred_extended_address=dataset[
"preferred_extended_address"],
430 source=dataset[
"source"],
433 preferred_dataset = data[
"preferred_dataset"]
440 """Schedule saving the dataset store."""
445 """Return data of datasets to store in a file."""
446 data: dict[str, Any] = {}
447 data[
"datasets"] = [dataset.to_json()
for dataset
in self.
datasetsdatasets.values()]


@singleton(DATA_STORE)
async def async_get_store(hass: HomeAssistant) -> DatasetStore:
    """Get the dataset store."""
    store = DatasetStore(hass)
    await store.async_load()
    return store
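
# The singleton helper caches the store in hass.data under DATA_STORE, so all
# callers share one DatasetStore instance and the file is loaded only once.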


async def async_add_dataset(
    hass: HomeAssistant,
    source: str,
    tlv: str,
    *,
    preferred_border_agent_id: str | None = None,
    preferred_extended_address: str | None = None,
) -> None:
    """Add a dataset."""
    store = await async_get_store(hass)
    store.async_add(source, tlv, preferred_border_agent_id, preferred_extended_address)


async def async_get_dataset(hass: HomeAssistant, dataset_id: str) -> str | None:
    """Get a dataset."""
    store = await async_get_store(hass)
    if (entry := store.async_get(dataset_id)) is None:
        return None
    return entry.tlv
482 """Get the preferred dataset."""
484 if (preferred_dataset := store.preferred_dataset)
is None or (
485 entry := store.async_get(preferred_dataset)
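
# Usage sketch for the module-level API (the "otbr" source string and the
# tlv variable are hypothetical):
#
#     await async_add_dataset(hass, "otbr", tlv)
#     preferred_tlv = await async_get_preferred_dataset(hass)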