1 """Support for Apache Kafka."""
3 from __future__
import annotations
5 from datetime
import datetime
7 from typing
import Any, Literal
9 from aiokafka
import AIOKafkaProducer
10 import voluptuous
as vol
17 EVENT_HOMEASSISTANT_STOP,
26 DOMAIN =
"apache_kafka"
28 CONF_FILTER =
"filter"
30 CONF_SECURITY_PROTOCOL =
"security_protocol"
32 CONFIG_SCHEMA = vol.Schema(
36 vol.Required(CONF_IP_ADDRESS): cv.string,
37 vol.Required(CONF_PORT): cv.port,
38 vol.Required(CONF_TOPIC): cv.string,
39 vol.Optional(CONF_FILTER, default={}): FILTER_SCHEMA,
40 vol.Optional(CONF_SECURITY_PROTOCOL, default=
"PLAINTEXT"): vol.In(
41 [
"PLAINTEXT",
"SSL",
"SASL_SSL"]
43 vol.Optional(CONF_USERNAME): cv.string,
44 vol.Optional(CONF_PASSWORD): cv.string,
48 extra=vol.ALLOW_EXTRA,
52 async
def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
53 """Activate the Apache Kafka integration."""
58 conf[CONF_IP_ADDRESS],
62 conf[CONF_SECURITY_PROTOCOL],
63 conf.get(CONF_USERNAME),
64 conf.get(CONF_PASSWORD),
67 hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, kafka.shutdown)
75 """Encode python objects.
77 Additionally add encoding for datetime objects as isoformat.
81 """Implement encoding logic."""
82 if isinstance(o, datetime):
88 """Define a manager to buffer events to Kafka."""
96 entities_filter: EntityFilter,
97 security_protocol: Literal[
"PLAINTEXT",
"SSL",
"SASL_SSL"],
105 ssl_context = ssl_util.client_context()
107 bootstrap_servers=f
"{ip_address}:{port}",
108 compression_type=
"gzip",
109 security_protocol=security_protocol,
110 ssl_context=ssl_context,
111 sasl_mechanism=
"PLAIN",
112 sasl_plain_username=username,
113 sasl_plain_password=password,
117 def _encode_event(self, event: Event[EventStateChangedData]) -> bytes |
None:
118 """Translate events into a binary JSON payload."""
119 state = event.data[
"new_state"]
127 return json.dumps(obj=state.as_dict(), default=self.
_encoder_encoder.encode).encode(
132 """Start the Kafka manager."""
133 self.
_hass_hass.bus.async_listen(EVENT_STATE_CHANGED, self.
writewrite)
137 """Shut the manager down."""
140 async
def write(self, event: Event[EventStateChangedData]) ->
None:
141 """Write a binary payload to Kafka."""
142 key = event.data[
"entity_id"].encode(
"utf-8")
146 await self.
_producer_producer.send_and_wait(self.
_topic_topic, payload, key)
None shutdown(self, Event _)
bytes|None _encode_event(self, Event[EventStateChangedData] event)
None write(self, Event[EventStateChangedData] event)
None __init__(self, HomeAssistant hass, str ip_address, int port, str topic, EntityFilter entities_filter, Literal["PLAINTEXT", "SSL", "SASL_SSL"] security_protocol, str|None username, str|None password)
bool async_setup(HomeAssistant hass, ConfigType config)