Home Assistant Unofficial Reference 2024.12.1
gateway.py
Go to the documentation of this file.
1 """Handle MySensors gateways."""
2 
3 from __future__ import annotations
4 
5 import asyncio
6 from collections import defaultdict
7 from collections.abc import Callable
8 import logging
9 import socket
10 import sys
11 from typing import Any
12 
13 from mysensors import BaseAsyncGateway, Message, Sensor, get_const, mysensors
14 import voluptuous as vol
15 
17  DOMAIN as MQTT_DOMAIN,
18  ReceiveMessage as MQTTReceiveMessage,
19  async_publish,
20  async_subscribe,
21 )
22 from homeassistant.config_entries import ConfigEntry
23 from homeassistant.const import CONF_DEVICE, EVENT_HOMEASSISTANT_STOP
24 from homeassistant.core import Event, HomeAssistant, callback
26 from homeassistant.helpers.service_info.mqtt import ReceivePayloadType
27 from homeassistant.setup import SetupPhases, async_pause_setup
28 from homeassistant.util.unit_system import METRIC_SYSTEM
29 
30 from .const import (
31  CONF_BAUD_RATE,
32  CONF_GATEWAY_TYPE,
33  CONF_GATEWAY_TYPE_MQTT,
34  CONF_GATEWAY_TYPE_SERIAL,
35  CONF_PERSISTENCE_FILE,
36  CONF_RETAIN,
37  CONF_TCP_PORT,
38  CONF_TOPIC_IN_PREFIX,
39  CONF_TOPIC_OUT_PREFIX,
40  CONF_VERSION,
41  DOMAIN,
42  MYSENSORS_GATEWAY_START_TASK,
43  ConfGatewayType,
44  GatewayId,
45 )
46 from .handler import HANDLERS
47 from .helpers import (
48  discover_mysensors_node,
49  discover_mysensors_platform,
50  on_unload,
51  validate_child,
52  validate_node,
53 )
54 
# Module-level logger named after this module.
_LOGGER = logging.getLogger(__name__)

# Seconds to wait for a gateway's connection-made signal before giving up.
GATEWAY_READY_TIMEOUT = 20.0
# Device value that marks a config entry as using an MQTT gateway
# (compared against entry.data[CONF_DEVICE] in _gw_start).
MQTT_COMPONENT = "mqtt"
59 
60 
def is_serial_port(value: str) -> str:
    """Validate that value names a serial port on the current platform.

    On Windows the value must be one of ``COM1`` .. ``COM256``; otherwise
    ``vol.Invalid`` is raised. On every other platform validation is
    delegated to ``cv.isdevice``.
    """
    if not sys.platform.startswith("win"):
        return cv.isdevice(value)

    # Windows: accept only the fixed COM1..COM256 names.
    if any(f"COM{number}" == value for number in range(1, 257)):
        return value
    raise vol.Invalid(f"{value} is not a serial port")
69 
70 
def is_socket_address(value: str) -> str:
    """Validate that value can be resolved as a network address.

    Returns the value unchanged when ``socket.getaddrinfo`` accepts it and
    raises ``vol.Invalid`` (chained to the underlying ``OSError``) otherwise.
    """
    try:
        # Only the success/failure of the lookup matters; the result is unused.
        socket.getaddrinfo(value, None)
    except OSError as err:
        raise vol.Invalid("Device is not a valid domain name or ip address") from err
    else:
        return value
78 
79 
async def try_connect(
    hass: HomeAssistant, gateway_type: ConfGatewayType, user_input: dict[str, Any]
) -> bool:
    """Probe a gateway with the given settings and report whether it connects.

    MQTT gateways are reported as reachable without probing. For any other
    gateway type a throw-away gateway (no persistence, no-op event callback)
    is built through ``_get_gateway`` and started; the result is ``True``
    only if its ``on_conn_made`` hook fires within ``GATEWAY_READY_TIMEOUT``
    seconds. A timeout, an ``OSError`` or a missing gateway yield ``False``.
    """
    if gateway_type == "MQTT":
        # MQTT gateways do not use the connection-made hook, so skip validation.
        return True

    connected = asyncio.Event()

    def _flag_connected(_: BaseAsyncGateway) -> None:
        """Record that the gateway reported a connection."""
        connected.set()

    try:
        gateway: BaseAsyncGateway | None = await _get_gateway(
            hass,
            gateway_type,
            device=user_input[CONF_DEVICE],
            version=user_input[CONF_VERSION],
            event_callback=lambda _: None,
            persistence_file=None,
            baud_rate=user_input.get(CONF_BAUD_RATE),
            tcp_port=user_input.get(CONF_TCP_PORT),
            topic_in_prefix=None,
            topic_out_prefix=None,
            retain=False,
            persistence=False,
        )
        if gateway is None:
            return False
        gateway.on_conn_made = _flag_connected

        start_task = None
        try:
            start_task = asyncio.create_task(gateway.start())
            async with asyncio.timeout(GATEWAY_READY_TIMEOUT):
                await connected.wait()
        except TimeoutError:
            _LOGGER.warning("Try gateway connect failed with timeout")
            return False
        else:
            return True
        finally:
            # Always tear the probe gateway down, whatever the outcome.
            if start_task is not None and not start_task.done():
                start_task.cancel()
            await gateway.stop()
    except OSError as err:
        _LOGGER.warning("Try gateway connect failed with exception", exc_info=err)
        return False
126 
127 
async def setup_gateway(
    hass: HomeAssistant, entry: ConfigEntry
) -> BaseAsyncGateway | None:
    """Create the gateway described by a config entry.

    All settings are read from ``entry.data``. Gateway type, device and
    version are required keys; the remaining options are optional. When no
    persistence file is configured, ``mysensors_<entry_id>.json`` is used.
    Returns whatever ``_get_gateway`` returns, which may be ``None``.
    """
    config = entry.data
    fallback_persistence_file = f"mysensors_{entry.entry_id}.json"

    return await _get_gateway(
        hass,
        gateway_type=config[CONF_GATEWAY_TYPE],
        device=config[CONF_DEVICE],
        version=config[CONF_VERSION],
        event_callback=_gw_callback_factory(hass, entry.entry_id),
        persistence_file=config.get(CONF_PERSISTENCE_FILE, fallback_persistence_file),
        baud_rate=config.get(CONF_BAUD_RATE),
        tcp_port=config.get(CONF_TCP_PORT),
        topic_in_prefix=config.get(CONF_TOPIC_IN_PREFIX),
        topic_out_prefix=config.get(CONF_TOPIC_OUT_PREFIX),
        retain=config.get(CONF_RETAIN, False),
    )
148 
149 
async def _get_gateway(
    hass: HomeAssistant,
    gateway_type: ConfGatewayType,
    device: str,
    version: str,
    event_callback: Callable[[Message], None],
    persistence_file: str | None = None,
    baud_rate: int | None = None,
    tcp_port: int | None = None,
    topic_in_prefix: str | None = None,
    topic_out_prefix: str | None = None,
    retain: bool = False,
    persistence: bool = True,
) -> BaseAsyncGateway | None:
    """Build and configure the gateway object for the requested type.

    An MQTT gateway is created for ``CONF_GATEWAY_TYPE_MQTT``, a serial
    gateway for ``CONF_GATEWAY_TYPE_SERIAL`` and a TCP gateway for anything
    else. Returns ``None`` when an MQTT gateway is requested but the mqtt
    integration is not among the loaded components. When ``persistence`` is
    true, gateway persistence is started before the gateway is returned.
    """
    # get_const imports a version-specific const module, so run it in the
    # import executor to keep that import out of the event loop.
    with async_pause_setup(hass, SetupPhases.WAIT_IMPORT_PACKAGES):
        await hass.async_add_import_executor_job(get_const, version)

    if persistence_file is not None:
        # Relative paths are interpreted inside the hass config folder;
        # absolute paths are left as they are.
        persistence_file = hass.config.path(persistence_file)

    # Keyword arguments shared by every gateway constructor below.
    shared_options: dict[str, Any] = {
        "event_callback": None,
        "persistence": persistence,
        "persistence_file": persistence_file,
        "protocol_version": version,
    }

    if gateway_type == CONF_GATEWAY_TYPE_MQTT:
        # Make sure the mqtt integration is set up.
        # Naive check that doesn't consider config entry state.
        if MQTT_DOMAIN not in hass.config.components:
            return None

        def _publish(topic: str, payload: str, qos: int, retain: bool) -> None:
            """Schedule an MQTT publish for the gateway."""
            hass.async_create_task(async_publish(hass, topic, payload, qos, retain))

        def _subscribe(
            topic: str, sub_cb: Callable[[str, ReceivePayloadType, int], None], qos: int
        ) -> None:
            """Schedule an MQTT subscription that forwards messages to sub_cb."""

            @callback
            def _forward(msg: MQTTReceiveMessage) -> None:
                """Unpack the received message for the gateway callback."""
                sub_cb(msg.topic, msg.payload, msg.qos)

            hass.async_create_task(async_subscribe(hass, topic, _forward, qos))

        gateway = mysensors.AsyncMQTTGateway(
            _publish,
            _subscribe,
            in_prefix=topic_in_prefix,
            out_prefix=topic_out_prefix,
            retain=retain,
            **shared_options,
        )
    elif gateway_type == CONF_GATEWAY_TYPE_SERIAL:
        gateway = mysensors.AsyncSerialGateway(
            device, baud=baud_rate, **shared_options
        )
    else:
        gateway = mysensors.AsyncTCPGateway(device, port=tcp_port, **shared_options)

    # The event callback is attached after construction rather than passed in.
    gateway.event_callback = event_callback
    gateway.metric = hass.config.units is METRIC_SYSTEM

    if persistence:
        await gateway.start_persistence()

    return gateway
235 
236 
async def finish_setup(
    hass: HomeAssistant, entry: ConfigEntry, gateway: BaseAsyncGateway
) -> None:
    """Finish setup: discover persisted devices, then start the gateway."""
    # Devices already known to the gateway are announced before it starts.
    await _discover_persistent_devices(hass, entry, gateway)
    await _gw_start(hass, entry, gateway)
243 
244 
async def _discover_persistent_devices(
    hass: HomeAssistant, entry: ConfigEntry, gateway: BaseAsyncGateway
) -> None:
    """Discover platforms for devices loaded via persistence file.

    Every node in ``gateway.sensors`` that passes ``validate_node`` is
    announced with ``discover_mysensors_node``. The device ids returned by
    ``validate_child`` for each of its children are grouped per platform and
    announced once per platform with ``discover_mysensors_platform``.
    """
    new_devices = defaultdict(list)
    for node_id in gateway.sensors:
        if not validate_node(gateway, node_id):
            continue
        discover_mysensors_node(hass, entry.entry_id, node_id)
        node: Sensor = gateway.sensors[node_id]
        for child in node.children.values():  # child is of type ChildSensor
            validated = validate_child(entry.entry_id, gateway, node_id, child)
            for platform, dev_ids in validated.items():
                new_devices[platform].extend(dev_ids)
    _LOGGER.debug("discovering persistent devices: %s", new_devices)
    # One discovery call per platform, carrying all device ids collected for it.
    for platform, dev_ids in new_devices.items():
        discover_mysensors_platform(hass, entry.entry_id, platform, dev_ids)
262 
263 
async def gw_stop(
    hass: HomeAssistant, entry: ConfigEntry, gateway: BaseAsyncGateway
) -> None:
    """Cancel the stored start task for this entry, if any, and stop the gateway."""
    start_task = hass.data[DOMAIN].pop(
        MYSENSORS_GATEWAY_START_TASK.format(entry.entry_id), None
    )
    # Nothing to cancel when no task was stored or it has already finished.
    nothing_to_cancel = start_task is None or start_task.done()
    if not nothing_to_cancel:
        start_task.cancel()
    await gateway.stop()
274 
275 
async def _gw_start(
    hass: HomeAssistant, entry: ConfigEntry, gateway: BaseAsyncGateway
) -> None:
    """Start the gateway and wait briefly for it to report a connection.

    The start task is stored in ``hass.data[DOMAIN]`` so ``gw_stop`` can
    cancel it, and a one-time listener stops the gateway when Home Assistant
    stops. For non-MQTT gateways this waits up to ``GATEWAY_READY_TIMEOUT``
    seconds for the connection-made hook; on timeout a warning is logged and
    setup continues.
    """
    connected = asyncio.Event()

    def _mark_connected(_: BaseAsyncGateway) -> None:
        """Record that the gateway reported a connection."""
        connected.set()

    gateway.on_conn_made = _mark_connected

    # Use a plain asyncio task instead of hass.async_create_task to avoid
    # holding up setup indefinitely. The task is stored so that gw_stop can
    # cancel it.
    start_task = asyncio.create_task(gateway.start())
    hass.data[DOMAIN][MYSENSORS_GATEWAY_START_TASK.format(entry.entry_id)] = start_task

    async def _stop_on_shutdown(_: Event) -> None:
        """Stop this gateway when Home Assistant stops."""
        await gw_stop(hass, entry, gateway)

    remove_stop_listener = hass.bus.async_listen_once(
        EVENT_HOMEASSISTANT_STOP, _stop_on_shutdown
    )
    on_unload(hass, entry.entry_id, remove_stop_listener)

    device = entry.data[CONF_DEVICE]
    if device == MQTT_COMPONENT:
        # Gateways connected via MQTT do not send a gateway ready message.
        return

    try:
        async with asyncio.timeout(GATEWAY_READY_TIMEOUT):
            await connected.wait()
    except TimeoutError:
        _LOGGER.warning(
            "Gateway %s not connected after %s secs so continuing with setup",
            device,
            GATEWAY_READY_TIMEOUT,
        )
314 
315 
def _gw_callback_factory(
    hass: HomeAssistant, gateway_id: GatewayId
) -> Callable[[Message], None]:
    """Return a new callback for the gateway.

    The returned callback closes over ``hass`` and ``gateway_id`` and
    dispatches each incoming message to the handler registered in
    ``HANDLERS`` under the name of its message type.
    """

    @callback
    def mysensors_callback(msg: Message) -> None:
        """Handle messages from a MySensors gateway.

        All MySensors messages are received here.
        The messages are passed to handler functions depending on their type.
        """
        _LOGGER.debug("Node update: node %s child %s", msg.node_id, msg.child_id)

        # Map the raw type value to the gateway's MessageType enum member.
        msg_type = msg.gateway.const.MessageType(msg.type)
        msg_handler = HANDLERS.get(msg_type.name)

        if msg_handler is None:
            # No handler registered for this message type; ignore the message.
            return

        msg_handler(hass, gateway_id, msg)

    return mysensors_callback
CALLBACK_TYPE async_subscribe(HomeAssistant hass, str topic, Callable[[ReceiveMessage], Coroutine[Any, Any, None]|None] msg_callback, int qos=DEFAULT_QOS, str|None encoding=DEFAULT_ENCODING)
Definition: client.py:194
None async_publish(HomeAssistant hass, str topic, PublishPayloadType payload, int|None qos=0, bool|None retain=False, str|None encoding=DEFAULT_ENCODING)
Definition: client.py:144
bool try_connect(HomeAssistant hass, ConfGatewayType gateway_type, dict[str, Any] user_input)
Definition: gateway.py:82
BaseAsyncGateway|None setup_gateway(HomeAssistant hass, ConfigEntry entry)
Definition: gateway.py:130
None _gw_start(HomeAssistant hass, ConfigEntry entry, BaseAsyncGateway gateway)
Definition: gateway.py:278
Callable[[Message], None] _gw_callback_factory(HomeAssistant hass, GatewayId gateway_id)
Definition: gateway.py:318
BaseAsyncGateway|None _get_gateway(HomeAssistant hass, ConfGatewayType gateway_type, str device, str version, Callable[[Message], None] event_callback, str|None persistence_file=None, int|None baud_rate=None, int|None tcp_port=None, str|None topic_in_prefix=None, str|None topic_out_prefix=None, bool retain=False, bool persistence=True)
Definition: gateway.py:163
None finish_setup(HomeAssistant hass, ConfigEntry entry, BaseAsyncGateway gateway)
Definition: gateway.py:239
None _discover_persistent_devices(HomeAssistant hass, ConfigEntry entry, BaseAsyncGateway gateway)
Definition: gateway.py:247
None gw_stop(HomeAssistant hass, ConfigEntry entry, BaseAsyncGateway gateway)
Definition: gateway.py:266
None on_unload(HomeAssistant hass, GatewayId gateway_id, Callable fnct)
Definition: helpers.py:45
bool validate_node(BaseAsyncGateway gateway, int node_id)
Definition: helpers.py:186
None discover_mysensors_platform(HomeAssistant hass, GatewayId gateway_id, str platform, list[DevId] new_devices)
Definition: helpers.py:59
None discover_mysensors_node(HomeAssistant hass, GatewayId gateway_id, int node_id)
Definition: helpers.py:76
defaultdict[Platform, list[DevId]] validate_child(GatewayId gateway_id, BaseAsyncGateway gateway, int node_id, ChildSensor child, int|None value_type=None)
Definition: helpers.py:200
Generator[None] async_pause_setup(core.HomeAssistant hass, SetupPhases phase)
Definition: setup.py:691