1 """Connect two Home Assistant instances via MQTT."""
6 import voluptuous
as vol
13 EVENT_HOMEASSISTANT_CLOSE,
14 EVENT_HOMEASSISTANT_FINAL_WRITE,
15 EVENT_HOMEASSISTANT_START,
16 EVENT_HOMEASSISTANT_STARTED,
17 EVENT_HOMEASSISTANT_STOP,
26 _LOGGER = logging.getLogger(__name__)
28 DOMAIN =
"mqtt_eventstream"
29 CONF_PUBLISH_TOPIC =
"publish_topic"
30 CONF_SUBSCRIBE_TOPIC =
"subscribe_topic"
31 CONF_PUBLISH_EVENTSTREAM_RECEIVED =
"publish_eventstream_received"
32 CONF_IGNORE_EVENT =
"ignore_event"
34 CONFIG_SCHEMA = vol.Schema(
38 vol.Optional(CONF_PUBLISH_TOPIC): valid_publish_topic,
39 vol.Optional(CONF_SUBSCRIBE_TOPIC): valid_subscribe_topic,
41 CONF_PUBLISH_EVENTSTREAM_RECEIVED, default=
False
43 vol.Optional(CONF_IGNORE_EVENT, default=[]): cv.ensure_list,
47 extra=vol.ALLOW_EXTRA,
51 EVENT_HOMEASSISTANT_CLOSE,
52 EVENT_HOMEASSISTANT_START,
53 EVENT_HOMEASSISTANT_STARTED,
54 EVENT_HOMEASSISTANT_STOP,
55 EVENT_HOMEASSISTANT_FINAL_WRITE,
59 async
def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
60 """Set up the MQTT eventstream component."""
62 if not await mqtt.async_wait_for_mqtt_client(hass):
63 _LOGGER.error(
"MQTT integration is not available")
66 conf = config.get(DOMAIN, {})
67 pub_topic = conf.get(CONF_PUBLISH_TOPIC)
68 sub_topic = conf.get(CONF_SUBSCRIBE_TOPIC)
69 ignore_event = conf.get(CONF_IGNORE_EVENT)
71 async
def _event_publisher(event):
72 """Handle events by publishing them on the MQTT queue."""
73 if event.origin != EventOrigin.local:
77 if event.event_type
in ignore_event:
83 event.event_type == EVENT_CALL_SERVICE
84 and event.data.get(
"domain") == mqtt.DOMAIN
85 and event.data.get(
"service") == mqtt.SERVICE_PUBLISH
86 and event.data[ATTR_SERVICE_DATA].
get(
"topic") == pub_topic
90 event_info = {
"event_type": event.event_type,
"event_data": event.data}
91 msg = json.dumps(event_info, cls=JSONEncoder)
92 await mqtt.async_publish(hass, pub_topic, msg)
96 hass.bus.async_listen(MATCH_ALL, _event_publisher)
100 def _event_receiver(msg):
101 """Receive events published by and fire them on this hass instance."""
102 event = json.loads(msg.payload)
103 event_type = event.get(
"event_type")
104 event_data = event.get(
"event_data")
107 if event_type
in BLOCKED_EVENTS:
114 if event_type == EVENT_STATE_CHANGED
and event_data:
115 for key
in (
"old_state",
"new_state"):
116 state = State.from_dict(event_data.get(key))
119 event_data[key] = state
122 event_type, event_data=event_data, origin=EventOrigin.remote
127 await mqtt.async_subscribe(hass, sub_topic, _event_receiver)
web.Response get(self, web.Request request, str config_key)
bool async_setup(HomeAssistant hass, ConfigType config)