1 """Publish simple item state changes via MQTT."""
6 import voluptuous
as vol
14 INCLUDE_EXCLUDE_BASE_FILTER_SCHEMA,
15 convert_include_exclude_filter,
# Option names looked up in this integration's configuration section.
21 CONF_BASE_TOPIC =
"base_topic"
22 CONF_PUBLISH_ATTRIBUTES =
"publish_attributes"
23 CONF_PUBLISH_TIMESTAMPS =
"publish_timestamps"
# Key under which this integration's options live in the configuration.
25 DOMAIN =
"mqtt_statestream"
# Schema for the DOMAIN section: extends INCLUDE_EXCLUDE_BASE_FILTER_SCHEMA with
# a required base topic (checked by valid_publish_topic) and two optional
# boolean flags that default to False. Extra top-level keys are allowed.
# NOTE(review): the bracket/brace-only lines of this expression are not present
# in this excerpt (the embedded line numbers skip) - confirm against the file.
27 CONFIG_SCHEMA = vol.Schema(
29 DOMAIN: INCLUDE_EXCLUDE_BASE_FILTER_SCHEMA.extend(
31 vol.Required(CONF_BASE_TOPIC): valid_publish_topic,
32 vol.Optional(CONF_PUBLISH_ATTRIBUTES, default=
False): cv.boolean,
33 vol.Optional(CONF_PUBLISH_TIMESTAMPS, default=
False): cv.boolean,
37 extra=vol.ALLOW_EXTRA,
# Module-level logger named after this module.
40 _LOGGER = logging.getLogger(__name__)
# Integration entry point: reads the DOMAIN section of the configuration and
# defines the callbacks that republish entity state changes over MQTT.
# NOTE(review): this excerpt omits several original lines (the embedded line
# numbers skip). The statement following the error log, the definition of
# `publish_filter`, and the end of this function (including its bool return
# value) are not visible here - confirm against the full file.
43 async
def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
44 """Set up the MQTT state feed."""
# Log an error when mqtt.async_wait_for_mqtt_client() returns a falsy result.
46 if not await mqtt.async_wait_for_mqtt_client(hass):
47 _LOGGER.error(
"MQTT integration is not available")
# Options for this integration.
50 conf: ConfigType = config[DOMAIN]
52 base_topic: str = conf[CONF_BASE_TOPIC]
53 publish_attributes: bool = conf[CONF_PUBLISH_ATTRIBUTES]
54 publish_timestamps: bool = conf[CONF_PUBLISH_TIMESTAMPS]
# Normalize the base topic so that it always ends with "/".
55 if not base_topic.endswith(
"/"):
56 base_topic = f
"{base_topic}/"
# Publishes the new state carried by a state-changed event. Topics are built
# as "<base_topic><entity_id with '.' replaced by '/'>/<suffix>".
58 async
def _state_publisher(evt: Event[EventStateChangedData]) ->
None:
59 entity_id = evt.data[
"entity_id"]
60 new_state = evt.data[
"new_state"]
# The state value goes to "<mybase>state".
# NOTE(review): the trailing positional arguments 1 and True are presumably
# the QoS level and the retain flag - confirm against mqtt.async_publish.
63 payload = new_state.state
65 mybase = f
"{base_topic}{entity_id.replace('.', '/')}/"
66 await mqtt.async_publish(hass, f
"{mybase}state", payload, 1,
True)
# When enabled, publish last_updated / last_changed as isoformat() strings,
# each only if the corresponding value is truthy.
# NOTE(review): the remaining arguments of these two calls are on lines that
# are not visible in this excerpt.
68 if publish_timestamps:
69 if new_state.last_updated:
70 await mqtt.async_publish(
72 f
"{mybase}last_updated",
73 new_state.last_updated.isoformat(),
77 if new_state.last_changed:
78 await mqtt.async_publish(
80 f
"{mybase}last_changed",
81 new_state.last_changed.isoformat(),
# When enabled, publish every attribute, JSON-encoded with JSONEncoder, to
# "<mybase><attribute key>".
86 if publish_attributes:
87 for key, val
in new_state.attributes.items():
88 encoded_val = json.dumps(val, cls=JSONEncoder)
89 await mqtt.async_publish(hass, mybase + key, encoded_val, 1,
True)
# Registers the state-change listener and a listener for the stop event.
# NOTE(review): the place where _ha_started is invoked is not visible in this
# excerpt - presumably it runs once Home Assistant has started; confirm.
92 def _ha_started(hass: HomeAssistant) ->
None:
# Filter handed to async_listen together with _state_publisher; it checks the
# entity_id against publish_filter.
# NOTE(review): the return statements and any check on new_state are on lines
# missing from this excerpt.
94 def _event_filter(event_data: EventStateChangedData) -> bool:
95 entity_id = event_data[
"entity_id"]
96 new_state = event_data[
"new_state"]
99 if not publish_filter(entity_id):
# Subscribe _state_publisher to EVENT_STATE_CHANGED, passing _event_filter as
# the third argument; the value returned by async_listen is kept.
103 callback_handler = hass.bus.async_listen(
104 EVENT_STATE_CHANGED, _state_publisher, _event_filter
# Handler registered for EVENT_HOMEASSISTANT_STOP.
# NOTE(review): its body is not visible here; presumably it calls
# callback_handler to remove the state listener - confirm.
108 def _ha_stopping(_: Event) ->
None:
111 hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, _ha_stopping)
bool async_setup(HomeAssistant hass, ConfigType config)
EntityFilter convert_include_exclude_filter(dict[str, dict[str, list[str]]] config)
CALLBACK_TYPE async_at_start(HomeAssistant hass, Callable[[HomeAssistant], Coroutine[Any, Any, None]|None] at_start_cb)