Home Assistant Unofficial Reference 2024.12.1
__init__.py
Go to the documentation of this file.
1 """Publish simple item state changes via MQTT."""
2 
3 import json
4 import logging
5 
6 import voluptuous as vol
7 
8 from homeassistant.components import mqtt
9 from homeassistant.components.mqtt import valid_publish_topic
10 from homeassistant.const import EVENT_HOMEASSISTANT_STOP, EVENT_STATE_CHANGED
11 from homeassistant.core import Event, EventStateChangedData, HomeAssistant, callback
14  INCLUDE_EXCLUDE_BASE_FILTER_SCHEMA,
15  convert_include_exclude_filter,
16 )
17 from homeassistant.helpers.json import JSONEncoder
18 from homeassistant.helpers.start import async_at_start
19 from homeassistant.helpers.typing import ConfigType
20 
# Keys accepted under the integration's configuration section.
CONF_BASE_TOPIC = "base_topic"
CONF_PUBLISH_ATTRIBUTES = "publish_attributes"
CONF_PUBLISH_TIMESTAMPS = "publish_timestamps"

# Integration domain; also the top-level key that CONFIG_SCHEMA validates.
DOMAIN = "mqtt_statestream"

# Validation schema for the DOMAIN section of the configuration.
# It extends the shared include/exclude base filter schema with:
#   * base_topic         - required, validated by valid_publish_topic
#   * publish_attributes - optional boolean, defaults to False
#   * publish_timestamps - optional boolean, defaults to False
# Keys outside DOMAIN are tolerated at the top level (extra=vol.ALLOW_EXTRA).
# NOTE(review): `cv` is not imported in the visible part of this file -
# presumably homeassistant.helpers.config_validation; confirm the import exists.
CONFIG_SCHEMA = vol.Schema(
    {
        DOMAIN: INCLUDE_EXCLUDE_BASE_FILTER_SCHEMA.extend(
            {
                vol.Required(CONF_BASE_TOPIC): valid_publish_topic,
                vol.Optional(CONF_PUBLISH_ATTRIBUTES, default=False): cv.boolean,
                vol.Optional(CONF_PUBLISH_TIMESTAMPS, default=False): cv.boolean,
            }
        ),
    },
    extra=vol.ALLOW_EXTRA,
)

# Module-level logger named after this module.
_LOGGER = logging.getLogger(__name__)
41 
42 
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
    """Set up mirroring of entity state changes onto MQTT topics.

    Waits for the MQTT client first; if it is not available an error is
    logged and False is returned. Otherwise the state-change listener is
    registered through ``async_at_start`` and True is returned.

    For every state change that passes the include/exclude filter the new
    state is published to ``<base_topic>/<domain>/<object_id>/state``.
    Depending on the configuration, ``last_updated`` / ``last_changed``
    (ISO formatted) and each attribute (JSON encoded) are published to
    sibling topics under the same entity prefix.
    """
    # Nothing can be published without a working MQTT client.
    mqtt_ready = await mqtt.async_wait_for_mqtt_client(hass)
    if not mqtt_ready:
        _LOGGER.error("MQTT integration is not available")
        return False

    conf: ConfigType = config[DOMAIN]
    publish_filter = convert_include_exclude_filter(conf)
    configured_topic: str = conf[CONF_BASE_TOPIC]
    publish_attributes: bool = conf[CONF_PUBLISH_ATTRIBUTES]
    publish_timestamps: bool = conf[CONF_PUBLISH_TIMESTAMPS]

    # Guarantee exactly one trailing slash so sub-topics can simply be appended.
    topic_root = (
        configured_topic
        if configured_topic.endswith("/")
        else f"{configured_topic}/"
    )

    async def _state_publisher(evt: Event[EventStateChangedData]) -> None:
        """Publish the new state, then the optional timestamps and attributes."""
        entity_id = evt.data["entity_id"]
        new_state = evt.data["new_state"]
        # The event filter below already rejects events without a new state.
        assert new_state

        # "sensor.kitchen" -> "<root>sensor/kitchen/"
        entity_topic = f"{topic_root}{entity_id.replace('.', '/')}/"

        async def _send(leaf: str, payload: str) -> None:
            # The trailing `1, True` positional arguments are presumably
            # qos and retain - confirm against mqtt.async_publish.
            await mqtt.async_publish(hass, entity_topic + leaf, payload, 1, True)

        await _send("state", new_state.state)

        if publish_timestamps:
            # Same order as before: last_updated first, then last_changed.
            for stamp_name in ("last_updated", "last_changed"):
                stamp = getattr(new_state, stamp_name)
                if stamp:
                    await _send(stamp_name, stamp.isoformat())

        if publish_attributes:
            # One topic per attribute, value serialised as JSON.
            for attr_name, attr_value in new_state.attributes.items():
                await _send(attr_name, json.dumps(attr_value, cls=JSONEncoder))

    @callback
    def _ha_started(hass: HomeAssistant) -> None:
        """Attach the state-change listener and arrange its removal on stop."""

        @callback
        def _event_filter(event_data: EventStateChangedData) -> bool:
            """Accept only events that carry a new state for an allowed entity."""
            entity_id = event_data["entity_id"]
            return event_data["new_state"] is not None and bool(
                publish_filter(entity_id)
            )

        unsubscribe = hass.bus.async_listen(
            EVENT_STATE_CHANGED, _state_publisher, _event_filter
        )

        @callback
        def _ha_stopping(_: Event) -> None:
            """Detach the state-change listener on the stop event."""
            unsubscribe()

        hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, _ha_stopping)

    async_at_start(hass, _ha_started)

    return True
bool async_setup(HomeAssistant hass, ConfigType config)
Definition: __init__.py:43
EntityFilter convert_include_exclude_filter(dict[str, dict[str, list[str]]] config)
CALLBACK_TYPE async_at_start(HomeAssistant hass, Callable[[HomeAssistant], Coroutine[Any, Any, None]|None] at_start_cb)
Definition: start.py:61