Home Assistant Unofficial Reference 2024.12.1
telegrams.py
Go to the documentation of this file.
1 """KNX Telegram handler."""
2 
3 from __future__ import annotations
4 
5 from collections import deque
6 from typing import Final, TypedDict
7 
8 from xknx import XKNX
9 from xknx.dpt import DPTArray, DPTBase, DPTBinary
10 from xknx.dpt.dpt import DPTComplexData, DPTEnumData
11 from xknx.exceptions import XKNXException
12 from xknx.telegram import Telegram, TelegramDirection
13 from xknx.telegram.apci import GroupValueResponse, GroupValueWrite
14 
15 from homeassistant.core import HomeAssistant
16 from homeassistant.helpers.dispatcher import async_dispatcher_send
17 from homeassistant.helpers.storage import Store
18 import homeassistant.util.dt as dt_util
19 from homeassistant.util.signal_type import SignalType
20 
21 from .const import DOMAIN
22 from .project import KNXProject
23 
24 STORAGE_VERSION: Final = 1
25 STORAGE_KEY: Final = f"{DOMAIN}/telegrams_history.json"
26 
27 # dispatcher signal for KNX interface device triggers
28 SIGNAL_KNX_TELEGRAM: SignalType[Telegram, TelegramDict] = SignalType("knx_telegram")
29 
30 
31 class DecodedTelegramPayload(TypedDict):
32  """Decoded payload value and metadata."""
33 
34  dpt_main: int | None
35  dpt_sub: int | None
36  dpt_name: str | None
37  unit: str | None
38  value: bool | str | int | float | dict[str, str | int | float | bool] | None
39 
40 
42  """Represent a Telegram as a dict."""
43 
44  # this has to be in sync with the frontend implementation
45  destination: str
46  destination_name: str
47  direction: str
48  payload: int | tuple[int, ...] | None
49  source: str
50  source_name: str
51  telegramtype: str
52  timestamp: str # ISO format
53 
54 
55 class Telegrams:
56  """Class to handle KNX telegrams."""
57 
58  def __init__(
59  self,
60  hass: HomeAssistant,
61  xknx: XKNX,
62  project: KNXProject,
63  log_size: int,
64  ) -> None:
65  """Initialize Telegrams class."""
66  self.hasshass = hass
67  self.projectproject = project
68  self._history_store_history_store = Store[list[TelegramDict]](
69  hass, STORAGE_VERSION, STORAGE_KEY
70  )
71  self._xknx_telegram_cb_handle_xknx_telegram_cb_handle = (
72  xknx.telegram_queue.register_telegram_received_cb(
73  telegram_received_cb=self._xknx_telegram_cb_xknx_telegram_cb,
74  match_for_outgoing=True,
75  )
76  )
77  self.recent_telegrams: deque[TelegramDict] = deque(maxlen=log_size)
78  self.last_ga_telegramslast_ga_telegrams: dict[str, TelegramDict] = {}
79 
80  async def load_history(self) -> None:
81  """Load history from store."""
82  if (telegrams := await self._history_store_history_store.async_load()) is None:
83  return
84  if self.recent_telegrams.maxlen == 0:
85  await self._history_store_history_store.async_remove()
86  return
87  for telegram in telegrams:
88  # tuples are stored as lists in JSON
89  if isinstance(telegram["payload"], list):
90  telegram["payload"] = tuple(telegram["payload"]) # type: ignore[unreachable]
91  self.recent_telegrams.extend(telegrams)
92  self.last_ga_telegramslast_ga_telegrams = {
93  t["destination"]: t for t in telegrams if t["payload"] is not None
94  }
95 
96  async def save_history(self) -> None:
97  """Save history to store."""
98  if self.recent_telegrams:
99  await self._history_store_history_store.async_save(list(self.recent_telegrams))
100 
101  def _xknx_telegram_cb(self, telegram: Telegram) -> None:
102  """Handle incoming and outgoing telegrams from xknx."""
103  telegram_dict = self.telegram_to_dicttelegram_to_dict(telegram)
104  self.recent_telegrams.append(telegram_dict)
105  if telegram_dict["payload"] is not None:
106  # exclude GroupValueRead telegrams
107  self.last_ga_telegramslast_ga_telegrams[telegram_dict["destination"]] = telegram_dict
108  async_dispatcher_send(self.hasshass, SIGNAL_KNX_TELEGRAM, telegram, telegram_dict)
109 
110  def telegram_to_dict(self, telegram: Telegram) -> TelegramDict:
111  """Convert a Telegram to a dict."""
112  dst_name = ""
113  payload_data: int | tuple[int, ...] | None = None
114  src_name = ""
115  transcoder = None
116  value = None
117 
118  if (
119  ga_info := self.projectproject.group_addresses.get(
120  f"{telegram.destination_address}"
121  )
122  ) is not None:
123  dst_name = ga_info.name
124 
125  if (
126  device := self.projectproject.devices.get(f"{telegram.source_address}")
127  ) is not None:
128  src_name = f"{device['manufacturer_name']} {device['name']}"
129  elif telegram.direction is TelegramDirection.OUTGOING:
130  src_name = "Home Assistant"
131 
132  if isinstance(telegram.payload, (GroupValueWrite, GroupValueResponse)):
133  payload_data = telegram.payload.value.value
134 
135  if telegram.decoded_data is not None:
136  transcoder = telegram.decoded_data.transcoder
137  value = _serializable_decoded_data(telegram.decoded_data.value)
138 
139  return TelegramDict(
140  destination=f"{telegram.destination_address}",
141  destination_name=dst_name,
142  direction=telegram.direction.value,
143  dpt_main=transcoder.dpt_main_number if transcoder is not None else None,
144  dpt_sub=transcoder.dpt_sub_number if transcoder is not None else None,
145  dpt_name=transcoder.value_type if transcoder is not None else None,
146  payload=payload_data,
147  source=f"{telegram.source_address}",
148  source_name=src_name,
149  telegramtype=telegram.payload.__class__.__name__,
150  timestamp=dt_util.now().isoformat(),
151  unit=transcoder.unit if transcoder is not None else None,
152  value=value,
153  )
154 
155 
157  value: bool | float | str | DPTComplexData | DPTEnumData,
158 ) -> bool | str | int | float | dict[str, str | int | float | bool]:
159  """Return a serializable representation of decoded data."""
160  if isinstance(value, DPTComplexData):
161  return value.as_dict()
162  if isinstance(value, DPTEnumData):
163  return value.name.lower()
164  return value
165 
166 
168  payload: DPTArray | DPTBinary, transcoder: type[DPTBase]
169 ) -> DecodedTelegramPayload:
170  """Decode the payload of a KNX telegram with custom transcoder."""
171  try:
172  value = transcoder.from_knx(payload)
173  except XKNXException:
174  value = "Error decoding value"
175 
176  value = _serializable_decoded_data(value)
177 
178  return DecodedTelegramPayload(
179  dpt_main=transcoder.dpt_main_number,
180  dpt_sub=transcoder.dpt_sub_number,
181  dpt_name=transcoder.value_type,
182  unit=transcoder.unit,
183  value=value,
184  )
None __init__(self, HomeAssistant hass, XKNX xknx, KNXProject project, int log_size)
Definition: telegrams.py:64
TelegramDict telegram_to_dict(self, Telegram telegram)
Definition: telegrams.py:110
None _xknx_telegram_cb(self, Telegram telegram)
Definition: telegrams.py:101
bool|str|int|float|dict[str, str|int|float|bool] _serializable_decoded_data(bool|float|str|DPTComplexData|DPTEnumData value)
Definition: telegrams.py:158
DecodedTelegramPayload decode_telegram_payload(DPTArray|DPTBinary payload, type[DPTBase] transcoder)
Definition: telegrams.py:169
None async_load(HomeAssistant hass)
None async_dispatcher_send(HomeAssistant hass, str signal, *Any args)
Definition: dispatcher.py:193
None async_remove(HomeAssistant hass, str intent_type)
Definition: intent.py:90
None async_save(self, _T data)
Definition: storage.py:424