1 """Handle MySensors messages."""
3 from __future__
import annotations
5 from collections.abc
import Callable
7 from mysensors
import Message
8 from mysensors.const
import SYSTEM_CHILD_ID
15 from .const
import CHILD_CALLBACK, NODE_CALLBACK, DevId, GatewayId
16 from .entity
import get_mysensors_devices
17 from .helpers
import (
18 discover_mysensors_node,
19 discover_mysensors_platform,
23 HANDLERS: decorator.Registry[
24 str, Callable[[HomeAssistant, GatewayId, Message],
None]
25 ] = decorator.Registry()
28 @HANDLERS.register("set")
30 def handle_set(hass: HomeAssistant, gateway_id: GatewayId, msg: Message) ->
None:
31 """Handle a mysensors set message."""
36 @HANDLERS.register("internal")
38 def handle_internal(hass: HomeAssistant, gateway_id: GatewayId, msg: Message) ->
None:
39 """Handle a mysensors internal message."""
40 internal = msg.gateway.const.Internal(msg.sub_type)
41 if (handler := HANDLERS.get(internal.name))
is None:
43 handler(hass, gateway_id, msg)
46 @HANDLERS.register("I_BATTERY_LEVEL")
49 hass: HomeAssistant, gateway_id: GatewayId, msg: Message
51 """Handle an internal battery level message."""
55 @HANDLERS.register("I_HEARTBEAT_RESPONSE")
57 def handle_heartbeat(hass: HomeAssistant, gateway_id: GatewayId, msg: Message) ->
None:
58 """Handle a heartbeat response message."""
62 @HANDLERS.register("I_SKETCH_NAME")
65 hass: HomeAssistant, gateway_id: GatewayId, msg: Message
67 """Handle an internal sketch name message."""
71 @HANDLERS.register("I_SKETCH_VERSION")
74 hass: HomeAssistant, gateway_id: GatewayId, msg: Message
76 """Handle an internal sketch version message."""
80 @HANDLERS.register("presentation")
83 hass: HomeAssistant, gateway_id: GatewayId, msg: Message
85 """Handle a presentation message."""
86 if msg.child_id == SYSTEM_CHILD_ID:
92 hass: HomeAssistant, gateway_id: GatewayId, validated: dict[Platform, list[DevId]]
94 """Handle a child update."""
95 signals: list[str] = []
99 for platform, dev_ids
in validated.items():
101 new_dev_ids: list[DevId] = []
102 for dev_id
in dev_ids:
103 if dev_id
in devices:
104 signals.append(CHILD_CALLBACK.format(*dev_id))
106 new_dev_ids.append(dev_id)
109 for signal
in set(signals):
117 hass: HomeAssistant, gateway_id: GatewayId, msg: Message
119 """Handle a node update."""
120 signal = NODE_CALLBACK.format(gateway_id, msg.node_id)
dict[DevId, MySensorsChildEntity] get_mysensors_devices(HomeAssistant hass, Platform domain)
None handle_sketch_version(HomeAssistant hass, GatewayId gateway_id, Message msg)
None handle_heartbeat(HomeAssistant hass, GatewayId gateway_id, Message msg)
None _handle_node_update(HomeAssistant hass, GatewayId gateway_id, Message msg)
None handle_set(HomeAssistant hass, GatewayId gateway_id, Message msg)
None handle_presentation(HomeAssistant hass, GatewayId gateway_id, Message msg)
None handle_internal(HomeAssistant hass, GatewayId gateway_id, Message msg)
None handle_battery_level(HomeAssistant hass, GatewayId gateway_id, Message msg)
None _handle_child_update(HomeAssistant hass, GatewayId gateway_id, dict[Platform, list[DevId]] validated)
None handle_sketch_name(HomeAssistant hass, GatewayId gateway_id, Message msg)
None discover_mysensors_platform(HomeAssistant hass, GatewayId gateway_id, str platform, list[DevId] new_devices)
None discover_mysensors_node(HomeAssistant hass, GatewayId gateway_id, int node_id)
dict[Platform, list[DevId]] validate_set_msg(GatewayId gateway_id, Message msg)
None async_dispatcher_send(HomeAssistant hass, str signal, *Any args)