Home Assistant Unofficial Reference 2024.12.1
__init__.py
Go to the documentation of this file.
1 """Connect two Home Assistant instances via MQTT."""
2 
import json
import logging

import voluptuous as vol

from homeassistant.components import mqtt
from homeassistant.components.mqtt import valid_publish_topic, valid_subscribe_topic
from homeassistant.const import (
    ATTR_SERVICE_DATA,
    EVENT_CALL_SERVICE,
    EVENT_HOMEASSISTANT_CLOSE,
    EVENT_HOMEASSISTANT_FINAL_WRITE,
    EVENT_HOMEASSISTANT_START,
    EVENT_HOMEASSISTANT_STARTED,
    EVENT_HOMEASSISTANT_STOP,
    EVENT_STATE_CHANGED,
    MATCH_ALL,
)
from homeassistant.core import EventOrigin, HomeAssistant, State, callback
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.json import JSONEncoder
from homeassistant.helpers.typing import ConfigType
25 
_LOGGER = logging.getLogger(__name__)

DOMAIN = "mqtt_eventstream"

# Option names accepted inside the ``mqtt_eventstream`` configuration section.
CONF_PUBLISH_TOPIC = "publish_topic"
CONF_SUBSCRIBE_TOPIC = "subscribe_topic"
CONF_PUBLISH_EVENTSTREAM_RECEIVED = "publish_eventstream_received"
CONF_IGNORE_EVENT = "ignore_event"

# Validation rules for this integration's own section. Both topics are
# optional; the remaining options fall back to a default when omitted.
_EVENTSTREAM_SCHEMA = vol.Schema(
    {
        vol.Optional(CONF_PUBLISH_TOPIC): valid_publish_topic,
        vol.Optional(CONF_SUBSCRIBE_TOPIC): valid_subscribe_topic,
        vol.Optional(
            CONF_PUBLISH_EVENTSTREAM_RECEIVED, default=False
        ): cv.boolean,
        vol.Optional(CONF_IGNORE_EVENT, default=[]): cv.ensure_list,
    }
)

# Top-level schema: validate our section, let every other key pass through.
CONFIG_SCHEMA = vol.Schema(
    {DOMAIN: _EVENTSTREAM_SCHEMA},
    extra=vol.ALLOW_EXTRA,
)

# Event types that are never re-fired on this instance when they arrive
# from the subscribed MQTT topic.
BLOCKED_EVENTS = [
    EVENT_HOMEASSISTANT_CLOSE,
    EVENT_HOMEASSISTANT_START,
    EVENT_HOMEASSISTANT_STARTED,
    EVENT_HOMEASSISTANT_STOP,
    EVENT_HOMEASSISTANT_FINAL_WRITE,
]
57 
58 
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
    """Set up the MQTT eventstream component.

    Waits for the MQTT client, then, depending on which topics are
    configured, publishes local events to ``publish_topic`` and/or re-fires
    events received on ``subscribe_topic`` on this instance's event bus.

    Returns False when the MQTT integration is not available, True otherwise.
    """
    # Make sure MQTT integration is enabled and the client is available
    if not await mqtt.async_wait_for_mqtt_client(hass):
        _LOGGER.error("MQTT integration is not available")
        return False

    conf = config.get(DOMAIN, {})
    pub_topic = conf.get(CONF_PUBLISH_TOPIC)
    sub_topic = conf.get(CONF_SUBSCRIBE_TOPIC)
    ignore_event = conf.get(CONF_IGNORE_EVENT)

    async def _event_publisher(event):
        """Handle events by publishing them on the MQTT queue."""
        # Only forward events that originated on this instance.
        if event.origin != EventOrigin.local:
            return

        # Events to ignore
        if event.event_type in ignore_event:
            return

        # Filter out the events that were triggered by publishing
        # to the MQTT topic, or you will end up in an infinite loop.
        # The service data is looked up defensively: a call_service event
        # fired without it must not raise inside the listener.
        if (
            event.event_type == EVENT_CALL_SERVICE
            and event.data.get("domain") == mqtt.DOMAIN
            and event.data.get("service") == mqtt.SERVICE_PUBLISH
            and (event.data.get(ATTR_SERVICE_DATA) or {}).get("topic") == pub_topic
        ):
            return

        event_info = {"event_type": event.event_type, "event_data": event.data}
        msg = json.dumps(event_info, cls=JSONEncoder)
        await mqtt.async_publish(hass, pub_topic, msg)

    # Only listen for local events if you are going to publish them.
    if pub_topic:
        hass.bus.async_listen(MATCH_ALL, _event_publisher)

    # Process events from a remote server that are received on a queue.
    @callback
    def _event_receiver(msg):
        """Receive events published by a remote instance and fire them here."""
        # The payload comes from the broker and is not trusted: reject
        # anything that is not a JSON object carrying a string event type.
        try:
            event = json.loads(msg.payload)
        except ValueError:
            _LOGGER.warning("Ignoring malformed JSON payload on %s", sub_topic)
            return

        if not isinstance(event, dict):
            _LOGGER.warning("Ignoring non-object event payload on %s", sub_topic)
            return

        event_type = event.get("event_type")
        event_data = event.get("event_data")

        if not isinstance(event_type, str):
            _LOGGER.warning("Ignoring event without event_type on %s", sub_topic)
            return

        # Don't fire HOMEASSISTANT_* events on this instance
        if event_type in BLOCKED_EVENTS:
            return

        # Special case handling for event STATE_CHANGED
        # We will try to convert state dicts back to State objects
        # Copied over from the _handle_api_post_events_event method
        # of the api component.
        if (
            event_type == EVENT_STATE_CHANGED
            and event_data
            and isinstance(event_data, dict)
        ):
            for key in ("old_state", "new_state"):
                state = State.from_dict(event_data.get(key))

                # Keep the raw value when it cannot be turned into a State.
                if state:
                    event_data[key] = state

        hass.bus.async_fire(
            event_type, event_data=event_data, origin=EventOrigin.remote
        )

    # Only subscribe if you specified a topic.
    if sub_topic:
        await mqtt.async_subscribe(hass, sub_topic, _event_receiver)

    return True
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88
bool async_setup(HomeAssistant hass, ConfigType config)
Definition: __init__.py:59