1 """Handle MySensors gateways."""
3 from __future__
import annotations
6 from collections
import defaultdict
7 from collections.abc
import Callable
11 from typing
import Any
13 from mysensors
import BaseAsyncGateway, Message, Sensor, get_const, mysensors
14 import voluptuous
as vol
17 DOMAIN
as MQTT_DOMAIN,
18 ReceiveMessage
as MQTTReceiveMessage,
33 CONF_GATEWAY_TYPE_MQTT,
34 CONF_GATEWAY_TYPE_SERIAL,
35 CONF_PERSISTENCE_FILE,
39 CONF_TOPIC_OUT_PREFIX,
42 MYSENSORS_GATEWAY_START_TASK,
46 from .handler
import HANDLERS
47 from .helpers
import (
48 discover_mysensors_node,
49 discover_mysensors_platform,
55 _LOGGER = logging.getLogger(__name__)
57 GATEWAY_READY_TIMEOUT = 20.0
58 MQTT_COMPONENT =
"mqtt"
62 """Validate that value is a windows serial port or a unix device."""
63 if sys.platform.startswith(
"win"):
64 ports = (f
"COM{idx + 1}" for idx
in range(256))
67 raise vol.Invalid(f
"{value} is not a serial port")
68 return cv.isdevice(value)
72 """Validate that value is a valid address."""
74 socket.getaddrinfo(value,
None)
75 except OSError
as err:
76 raise vol.Invalid(
"Device is not a valid domain name or ip address")
from err
81 hass: HomeAssistant, gateway_type: ConfGatewayType, user_input: dict[str, Any]
83 """Try to connect to a gateway and report if it worked."""
84 if gateway_type ==
"MQTT":
87 gateway_ready = asyncio.Event()
89 def on_conn_made(_: BaseAsyncGateway) ->
None:
95 device=user_input[CONF_DEVICE],
96 version=user_input[CONF_VERSION],
97 event_callback=
lambda _:
None,
98 persistence_file=
None,
99 baud_rate=user_input.get(CONF_BAUD_RATE),
100 tcp_port=user_input.get(CONF_TCP_PORT),
101 topic_in_prefix=
None,
102 topic_out_prefix=
None,
108 gateway.on_conn_made = on_conn_made
112 connect_task = asyncio.create_task(gateway.start())
113 async
with asyncio.timeout(GATEWAY_READY_TIMEOUT):
114 await gateway_ready.wait()
117 _LOGGER.warning(
"Try gateway connect failed with timeout")
120 if connect_task
is not None and not connect_task.done():
121 connect_task.cancel()
123 except OSError
as err:
124 _LOGGER.warning(
"Try gateway connect failed with exception", exc_info=err)
129 hass: HomeAssistant, entry: ConfigEntry
130 ) -> BaseAsyncGateway |
None:
131 """Set up the Gateway for the given ConfigEntry."""
135 gateway_type=entry.data[CONF_GATEWAY_TYPE],
136 device=entry.data[CONF_DEVICE],
137 version=entry.data[CONF_VERSION],
139 persistence_file=entry.data.get(
140 CONF_PERSISTENCE_FILE, f
"mysensors_{entry.entry_id}.json"
142 baud_rate=entry.data.get(CONF_BAUD_RATE),
143 tcp_port=entry.data.get(CONF_TCP_PORT),
144 topic_in_prefix=entry.data.get(CONF_TOPIC_IN_PREFIX),
145 topic_out_prefix=entry.data.get(CONF_TOPIC_OUT_PREFIX),
146 retain=entry.data.get(CONF_RETAIN,
False),
152 gateway_type: ConfGatewayType,
155 event_callback: Callable[[Message],
None],
156 persistence_file: str |
None =
None,
157 baud_rate: int |
None =
None,
158 tcp_port: int |
None =
None,
159 topic_in_prefix: str |
None =
None,
160 topic_out_prefix: str |
None =
None,
161 retain: bool =
False,
162 persistence: bool =
True,
163 ) -> BaseAsyncGateway |
None:
164 """Return gateway after setup of the gateway."""
170 await hass.async_add_import_executor_job(get_const, version)
172 if persistence_file
is not None:
175 persistence_file = hass.config.path(persistence_file)
177 if gateway_type == CONF_GATEWAY_TYPE_MQTT:
180 if MQTT_DOMAIN
not in hass.config.components:
183 def pub_callback(topic: str, payload: str, qos: int, retain: bool) ->
None:
184 """Call MQTT publish function."""
185 hass.async_create_task(
async_publish(hass, topic, payload, qos, retain))
188 topic: str, sub_cb: Callable[[str, ReceivePayloadType, int],
None], qos: int
190 """Call MQTT subscribe function."""
193 def internal_callback(msg: MQTTReceiveMessage) ->
None:
195 sub_cb(msg.topic, msg.payload, msg.qos)
197 hass.async_create_task(
async_subscribe(hass, topic, internal_callback, qos))
199 gateway = mysensors.AsyncMQTTGateway(
202 in_prefix=topic_in_prefix,
203 out_prefix=topic_out_prefix,
206 persistence=persistence,
207 persistence_file=persistence_file,
208 protocol_version=version,
210 elif gateway_type == CONF_GATEWAY_TYPE_SERIAL:
211 gateway = mysensors.AsyncSerialGateway(
215 persistence=persistence,
216 persistence_file=persistence_file,
217 protocol_version=version,
220 gateway = mysensors.AsyncTCPGateway(
224 persistence=persistence,
225 persistence_file=persistence_file,
226 protocol_version=version,
228 gateway.event_callback = event_callback
229 gateway.metric = hass.config.units
is METRIC_SYSTEM
232 await gateway.start_persistence()
238 hass: HomeAssistant, entry: ConfigEntry, gateway: BaseAsyncGateway
240 """Load any persistent devices and platforms and start gateway."""
246 hass: HomeAssistant, entry: ConfigEntry, gateway: BaseAsyncGateway
248 """Discover platforms for devices loaded via persistence file."""
249 new_devices = defaultdict(list)
250 for node_id
in gateway.sensors:
254 node: Sensor = gateway.sensors[node_id]
255 for child
in node.children.values():
256 validated =
validate_child(entry.entry_id, gateway, node_id, child)
257 for platform, dev_ids
in validated.items():
258 new_devices[platform].extend(dev_ids)
259 _LOGGER.debug(
"discovering persistent devices: %s", new_devices)
260 for platform, dev_ids
in new_devices.items():
265 hass: HomeAssistant, entry: ConfigEntry, gateway: BaseAsyncGateway
267 """Stop the gateway."""
268 connect_task = hass.data[DOMAIN].pop(
269 MYSENSORS_GATEWAY_START_TASK.format(entry.entry_id),
None
271 if connect_task
is not None and not connect_task.done():
272 connect_task.cancel()
277 hass: HomeAssistant, entry: ConfigEntry, gateway: BaseAsyncGateway
279 """Start the gateway."""
280 gateway_ready = asyncio.Event()
282 def gateway_connected(_: BaseAsyncGateway) ->
None:
283 """Handle gateway connected."""
286 gateway.on_conn_made = gateway_connected
288 hass.data[DOMAIN][MYSENSORS_GATEWAY_START_TASK.format(entry.entry_id)] = (
289 asyncio.create_task(gateway.start())
292 async
def stop_this_gw(_: Event) ->
None:
293 """Stop the gateway."""
294 await
gw_stop(hass, entry, gateway)
299 hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, stop_this_gw),
302 if entry.data[CONF_DEVICE] == MQTT_COMPONENT:
306 async
with asyncio.timeout(GATEWAY_READY_TIMEOUT):
307 await gateway_ready.wait()
310 "Gateway %s not connected after %s secs so continuing with setup",
311 entry.data[CONF_DEVICE],
312 GATEWAY_READY_TIMEOUT,
317 hass: HomeAssistant, gateway_id: GatewayId
318 ) -> Callable[[Message],
None]:
319 """Return a new callback for the gateway."""
322 def mysensors_callback(msg: Message) ->
None:
323 """Handle messages from a MySensors gateway.
325 All MySensors messages are received here.
326 The messages are passed to handler functions depending on their type.
328 _LOGGER.debug(
"Node update: node %s child %s", msg.node_id, msg.child_id)
330 msg_type = msg.gateway.const.MessageType(msg.type)
331 msg_handler = HANDLERS.get(msg_type.name)
333 if msg_handler
is None:
336 msg_handler(hass, gateway_id, msg)
338 return mysensors_callback
CALLBACK_TYPE async_subscribe(HomeAssistant hass, str topic, Callable[[ReceiveMessage], Coroutine[Any, Any, None]|None] msg_callback, int qos=DEFAULT_QOS, str|None encoding=DEFAULT_ENCODING)
None async_publish(HomeAssistant hass, str topic, PublishPayloadType payload, int|None qos=0, bool|None retain=False, str|None encoding=DEFAULT_ENCODING)
bool try_connect(HomeAssistant hass, ConfGatewayType gateway_type, dict[str, Any] user_input)
str is_socket_address(str value)
str is_serial_port(str value)
BaseAsyncGateway|None setup_gateway(HomeAssistant hass, ConfigEntry entry)
None _gw_start(HomeAssistant hass, ConfigEntry entry, BaseAsyncGateway gateway)
Callable[[Message], None] _gw_callback_factory(HomeAssistant hass, GatewayId gateway_id)
BaseAsyncGateway|None _get_gateway(HomeAssistant hass, ConfGatewayType gateway_type, str device, str version, Callable[[Message], None] event_callback, str|None persistence_file=None, int|None baud_rate=None, int|None tcp_port=None, str|None topic_in_prefix=None, str|None topic_out_prefix=None, bool retain=False, bool persistence=True)
None finish_setup(HomeAssistant hass, ConfigEntry entry, BaseAsyncGateway gateway)
None _discover_persistent_devices(HomeAssistant hass, ConfigEntry entry, BaseAsyncGateway gateway)
None gw_stop(HomeAssistant hass, ConfigEntry entry, BaseAsyncGateway gateway)
None on_unload(HomeAssistant hass, GatewayId gateway_id, Callable fnct)
bool validate_node(BaseAsyncGateway gateway, int node_id)
None discover_mysensors_platform(HomeAssistant hass, GatewayId gateway_id, str platform, list[DevId] new_devices)
None discover_mysensors_node(HomeAssistant hass, GatewayId gateway_id, int node_id)
defaultdict[Platform, list[DevId]] validate_child(GatewayId gateway_id, BaseAsyncGateway gateway, int node_id, ChildSensor child, int|None value_type=None)
Generator[None] async_pause_setup(core.HomeAssistant hass, SetupPhases phase)