Home Assistant Unofficial Reference 2024.12.1
__init__.py
Go to the documentation of this file.
1 """Support for Apache Kafka."""
2 
from __future__ import annotations

from datetime import datetime
import json
from typing import Any, Literal

from aiokafka import AIOKafkaProducer
import voluptuous as vol

from homeassistant.const import (
    CONF_IP_ADDRESS,
    CONF_PASSWORD,
    CONF_PORT,
    CONF_USERNAME,
    EVENT_HOMEASSISTANT_STOP,
    EVENT_STATE_CHANGED,
)
from homeassistant.core import Event, EventStateChangedData, HomeAssistant
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entityfilter import FILTER_SCHEMA, EntityFilter
from homeassistant.helpers.typing import ConfigType
from homeassistant.util import ssl as ssl_util

25 
26 DOMAIN = "apache_kafka"
27 
28 CONF_FILTER = "filter"
29 CONF_TOPIC = "topic"
30 CONF_SECURITY_PROTOCOL = "security_protocol"
31 
# Options accepted under this integration's own key in the configuration.
_KAFKA_OPTIONS_SCHEMA = vol.Schema(
    {
        # Broker address, broker port and destination topic are mandatory.
        vol.Required(CONF_IP_ADDRESS): cv.string,
        vol.Required(CONF_PORT): cv.port,
        vol.Required(CONF_TOPIC): cv.string,
        # An omitted filter falls back to an empty filter definition.
        vol.Optional(CONF_FILTER, default={}): FILTER_SCHEMA,
        # Only these three protocol names are accepted; plain text by default.
        vol.Optional(CONF_SECURITY_PROTOCOL, default="PLAINTEXT"): vol.In(
            ["PLAINTEXT", "SSL", "SASL_SSL"]
        ),
        # Credentials are optional.
        vol.Optional(CONF_USERNAME): cv.string,
        vol.Optional(CONF_PASSWORD): cv.string,
    }
)

# Top-level schema: validate this integration's section and tolerate any
# other keys present in the configuration.
CONFIG_SCHEMA = vol.Schema(
    {DOMAIN: _KAFKA_OPTIONS_SCHEMA},
    extra=vol.ALLOW_EXTRA,
)
50 
51 
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
    """Set up the Apache Kafka integration from its configuration section.

    Builds a ``KafkaManager`` from the validated options, arranges for the
    manager to be shut down once when Home Assistant stops, and then starts
    the manager.

    Returns:
        Always ``True`` once the manager has been started.
    """
    options = config[DOMAIN]

    manager = KafkaManager(
        hass=hass,
        ip_address=options[CONF_IP_ADDRESS],
        port=options[CONF_PORT],
        topic=options[CONF_TOPIC],
        entities_filter=options[CONF_FILTER],
        security_protocol=options[CONF_SECURITY_PROTOCOL],
        # Credentials are optional in the schema, so they may be None here.
        username=options.get(CONF_USERNAME),
        password=options.get(CONF_PASSWORD),
    )

    # Register the stop hook before starting, mirroring start/stop pairing.
    hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, manager.shutdown)
    await manager.start()

    return True
72 
73 
class DateTimeJSONEncoder(json.JSONEncoder):
    """JSON encoder that additionally understands ``datetime`` objects.

    Datetime values are rendered with ``isoformat()``; anything else the
    standard encoder cannot handle is delegated to the base class.
    """

    def default(self, o: Any) -> str:
        """Return a JSON-serializable stand-in for ``o``."""
        if not isinstance(o, datetime):
            # Not ours to handle: defer to the base implementation, which
            # raises TypeError for unsupported objects.
            return super().default(o)  # type: ignore[no-any-return]
        return o.isoformat()
85 
86 
class KafkaManager:
    """Define a manager to buffer events to Kafka.

    The manager subscribes to Home Assistant state-change events, serializes
    each accepted new state to JSON and publishes it to a single Kafka topic,
    using the entity id as the message key.
    """

    def __init__(
        self,
        hass: HomeAssistant,
        ip_address: str,
        port: int,
        topic: str,
        entities_filter: EntityFilter,
        security_protocol: Literal["PLAINTEXT", "SSL", "SASL_SSL"],
        username: str | None,
        password: str | None,
    ) -> None:
        """Initialize the manager and build (but do not start) the producer.

        Args:
            hass: Home Assistant instance whose event bus is listened to.
            ip_address: Host part of the Kafka bootstrap server address.
            port: Port part of the Kafka bootstrap server address.
            topic: Kafka topic every payload is published to.
            entities_filter: Callable deciding, per entity id, whether the
                entity's state changes are forwarded.
            security_protocol: Protocol name handed to the producer.
            username: SASL PLAIN username, or None when not configured.
            password: SASL PLAIN password, or None when not configured.
        """
        self._encoder = DateTimeJSONEncoder()
        self._entities_filter = entities_filter
        self._hass = hass
        # The client SSL context is always created and passed along,
        # whichever security protocol was selected.
        ssl_context = ssl_util.client_context()
        self._producer = AIOKafkaProducer(
            bootstrap_servers=f"{ip_address}:{port}",
            compression_type="gzip",
            security_protocol=security_protocol,
            ssl_context=ssl_context,
            sasl_mechanism="PLAIN",
            sasl_plain_username=username,
            sasl_plain_password=password,
        )
        self._topic = topic

    def _encode_event(self, event: Event[EventStateChangedData]) -> bytes | None:
        """Translate events into a binary JSON payload.

        Returns:
            The UTF-8 encoded JSON form of the event's new state, or None
            when there is no new state, the state string is empty, or the
            entity is rejected by the configured filter.
        """
        state = event.data["new_state"]
        if (
            state is None
            or state.state == ""
            or not self._entities_filter(state.entity_id)
        ):
            return None

        # ``json.dumps`` expects its ``default`` hook to return a serializable
        # object. Passing the encoder's ``encode`` method instead would return
        # already-encoded JSON text, which would then be encoded a second time
        # and end up wrapped in escaped quotes. Use the ``default`` hook.
        serialized = json.dumps(state.as_dict(), default=self._encoder.default)
        return serialized.encode("utf-8")

    async def start(self) -> None:
        """Start the Kafka manager.

        The producer is started first; the state-change listener is only
        registered afterwards.
        """
        # Subscribing only after a successful start avoids leaving a listener
        # behind that would write to a producer that never came up.
        await self._producer.start()
        self._hass.bus.async_listen(EVENT_STATE_CHANGED, self.write)

    async def shutdown(self, _: Event) -> None:
        """Shut the manager down by stopping the producer.

        The event argument is unused; it exists so the method can be used
        directly as an event listener.
        """
        await self._producer.stop()

    async def write(self, event: Event[EventStateChangedData]) -> None:
        """Write a binary payload to Kafka.

        Events that do not produce a payload (see ``_encode_event``) are
        dropped without contacting the producer.
        """
        payload = self._encode_event(event)
        if not payload:
            return

        # Key messages by entity id.
        key = event.data["entity_id"].encode("utf-8")
        await self._producer.send_and_wait(self._topic, payload, key)
bytes|None _encode_event(self, Event[EventStateChangedData] event)
Definition: __init__.py:117
None write(self, Event[EventStateChangedData] event)
Definition: __init__.py:140
None __init__(self, HomeAssistant hass, str ip_address, int port, str topic, EntityFilter entities_filter, Literal["PLAINTEXT", "SSL", "SASL_SSL"] security_protocol, str|None username, str|None password)
Definition: __init__.py:100
bool async_setup(HomeAssistant hass, ConfigType config)
Definition: __init__.py:52