Home Assistant Unofficial Reference 2024.12.1
__init__.py
Go to the documentation of this file.
1 """Support for Zabbix."""
2 
from collections.abc import Callable
from contextlib import suppress
import json
import logging
import math
import queue
import threading
import time
from urllib.error import HTTPError
from urllib.parse import urljoin

from pyzabbix import ZabbixAPI, ZabbixAPIException, ZabbixMetric, ZabbixSender
import voluptuous as vol

from homeassistant.const import (
    CONF_HOST,
    CONF_PASSWORD,
    CONF_PATH,
    CONF_SSL,
    CONF_USERNAME,
    EVENT_HOMEASSISTANT_STOP,
    EVENT_STATE_CHANGED,
    STATE_UNAVAILABLE,
    STATE_UNKNOWN,
)
from homeassistant.core import Event, EventStateChangedData, HomeAssistant, callback
from homeassistant.helpers import event as event_helper, state as state_helper
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entityfilter import (
    INCLUDE_EXCLUDE_BASE_FILTER_SCHEMA,
    convert_include_exclude_filter,
)
from homeassistant.helpers.typing import ConfigType

from .const import DOMAIN
38 
_LOGGER = logging.getLogger(__name__)

# Optional configuration key; setup() reads it and only starts the sender
# thread when a value is configured.
CONF_PUBLISH_STATES_HOST = "publish_states_host"

# Defaults applied by the schema when the option is omitted.
DEFAULT_SSL = False
DEFAULT_PATH = "zabbix"

TIMEOUT = 5
# Seconds slept between failed send attempts.
RETRY_DELAY = 20
# Base age allowance (seconds) for queued events before they are dropped.
QUEUE_BACKLOG_SECONDS = 30
# Seconds to wait before re-running setup() after an HTTPError.
RETRY_INTERVAL = 60
RETRY_MESSAGE = f"%s Retrying in {RETRY_INTERVAL} seconds."

# Seconds to wait for a further queued event once a batch has been started.
BATCH_TIMEOUT = 1
# Batch collection stops once at least this many metrics were gathered.
BATCH_BUFFER_SIZE = 100

# Options accepted under the integration's own key, on top of the shared
# include/exclude filter options.
_DOMAIN_SCHEMA = INCLUDE_EXCLUDE_BASE_FILTER_SCHEMA.extend(
    {
        vol.Required(CONF_HOST): cv.string,
        vol.Optional(CONF_PASSWORD): cv.string,
        vol.Optional(CONF_PATH, default=DEFAULT_PATH): cv.string,
        vol.Optional(CONF_SSL, default=DEFAULT_SSL): cv.boolean,
        vol.Optional(CONF_USERNAME): cv.string,
        vol.Optional(CONF_PUBLISH_STATES_HOST): cv.string,
    }
)

CONFIG_SCHEMA = vol.Schema({DOMAIN: _DOMAIN_SCHEMA}, extra=vol.ALLOW_EXTRA)
70 
71 
def setup(hass: HomeAssistant, config: ConfigType) -> bool:
    """Set up the Zabbix component.

    Connects to the Zabbix API, stores the client in ``hass.data[DOMAIN]``
    and, when ``publish_states_host`` is configured, starts a ZabbixThread
    that forwards state changes to Zabbix.

    Returns False when the API login fails. Returns True on success and also
    when an HTTPError occurred, in which case setup is scheduled to run again
    after RETRY_INTERVAL seconds.
    """

    conf = config[DOMAIN]
    protocol = "https" if conf[CONF_SSL] else "http"

    url = urljoin(f"{protocol}://{conf[CONF_HOST]}", conf[CONF_PATH])
    username = conf.get(CONF_USERNAME)
    password = conf.get(CONF_PASSWORD)

    publish_states_host = conf.get(CONF_PUBLISH_STATES_HOST)

    entities_filter = convert_include_exclude_filter(conf)

    try:
        zapi = ZabbixAPI(url=url, user=username, password=password)
        _LOGGER.debug("Connected to Zabbix API Version %s", zapi.api_version())
    except ZabbixAPIException as login_exception:
        _LOGGER.error("Unable to login to the Zabbix API: %s", login_exception)
        return False
    except HTTPError as http_error:
        _LOGGER.error("HTTPError when connecting to Zabbix API: %s", http_error)
        _LOGGER.error(RETRY_MESSAGE, http_error)
        # Try the whole setup again later instead of failing permanently.
        event_helper.call_later(
            hass,
            RETRY_INTERVAL,
            lambda _: setup(hass, config),  # type: ignore[arg-type,return-value]
        )
        return True

    hass.data[DOMAIN] = zapi

    def event_to_metrics(
        event: Event, float_keys: set[str], string_keys: set[str]
    ) -> list[ZabbixMetric] | None:
        """Convert a state-changed event into Zabbix metrics.

        Returns None when the event has no usable new state or the entity is
        filtered out. Otherwise returns one metric per finite numeric value
        (the state and each attribute), preceded by a discovery metric when
        a previously unseen float key appeared. ``float_keys`` and
        ``string_keys`` are updated in place with the keys seen so far.
        """
        state = event.data.get("new_state")
        if state is None or state.state in (STATE_UNKNOWN, "", STATE_UNAVAILABLE):
            return None

        entity_id = state.entity_id
        if not entities_filter(entity_id):
            return None

        floats: dict[str, float] = {}
        strings: dict[str, str] = {}

        # Prefer a direct float cast of the state; fall back to the helper
        # conversion before treating the state as a string.
        state_value: float | None
        try:
            state_value = float(state.state)
        except ValueError:
            try:
                state_value = float(state_helper.state_as_number(state))
            except ValueError:
                state_value = None
        # float() accepts "nan"/"inf"; treat those like the attributes below
        # so non-finite values are never published as float metrics.
        if state_value is None or not math.isfinite(state_value):
            strings[entity_id] = state.state
        else:
            floats[entity_id] = state_value

        for key, value in state.attributes.items():
            # For each value we try to cast it as float
            # But if we cannot do it we store the value
            # as string
            attribute_id = f"{entity_id}/{key}"
            try:
                float_value = float(value)
            except (ValueError, TypeError):
                float_value = None
            if float_value is None or not math.isfinite(float_value):
                strings[attribute_id] = str(value)
            else:
                floats[attribute_id] = float_value

        metrics: list[ZabbixMetric] = []
        float_keys_count = len(float_keys)
        float_keys.update(floats)
        if len(float_keys) != float_keys_count:
            # At least one new float key: publish the full key list again.
            floats_discovery = [{"{#KEY}": float_key} for float_key in float_keys]
            metric = ZabbixMetric(
                publish_states_host,
                "homeassistant.floats_discovery",
                json.dumps(floats_discovery),
            )
            metrics.append(metric)
        for key, value in floats.items():
            metric = ZabbixMetric(
                publish_states_host, f"homeassistant.float[{key}]", value
            )
            metrics.append(metric)

        # String values are only tracked by key; no metric is emitted for them.
        string_keys.update(strings)
        return metrics

    if publish_states_host:
        zabbix_sender = ZabbixSender(zabbix_server=conf[CONF_HOST])
        instance = ZabbixThread(zabbix_sender, event_to_metrics)
        instance.setup(hass)

    return True
169 
170 
class ZabbixThread(threading.Thread):
    """A threaded event handler class.

    State-changed events are queued by the bus listener; the thread drains
    the queue in batches, converts the events to metrics with the supplied
    ``event_to_metrics`` callable and sends them through ``zabbix_sender``.
    """

    # Number of retries after the first failed send attempt.
    MAX_TRIES = 3

    def __init__(
        self,
        zabbix_sender: ZabbixSender,
        event_to_metrics: Callable[
            [Event, set[str], set[str]], list[ZabbixMetric] | None
        ],
    ) -> None:
        """Initialize the listener."""
        threading.Thread.__init__(self, name="Zabbix")
        self.queue: queue.Queue = queue.Queue()
        self.zabbix_sender = zabbix_sender
        self.event_to_metrics = event_to_metrics
        # Number of metrics lost since the last successful write.
        self.write_errors = 0
        self.shutdown = False
        # Keys seen so far; passed to event_to_metrics, which updates them.
        self.float_keys: set[str] = set()
        self.string_keys: set[str] = set()

    def setup(self, hass: HomeAssistant) -> None:
        """Set up the thread and start it."""
        hass.bus.listen(EVENT_STATE_CHANGED, self._event_listener)
        hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, self._shutdown)
        self.start()
        _LOGGER.debug("Started publishing state changes to Zabbix")

    def _shutdown(self, event: Event) -> None:
        """Shut down the thread."""
        # None is the sentinel that makes get_metrics() flag shutdown.
        self.queue.put(None)
        self.join()

    @callback
    def _event_listener(self, event: Event[EventStateChangedData]) -> None:
        """Listen for new messages on the bus and queue them for Zabbix."""
        # The enqueue time lets get_metrics() drop events that got too old.
        item = (time.monotonic(), event)
        self.queue.put(item)

    def get_metrics(self) -> tuple[int, list[ZabbixMetric]]:
        """Return a batch of events formatted for writing.

        Returns the number of queue items consumed (including dropped events
        and the shutdown sentinel) and the metrics built from them.
        """
        # Maximum age of a queued event before it is dropped.
        queue_seconds = QUEUE_BACKLOG_SECONDS + self.MAX_TRIES * RETRY_DELAY

        count = 0
        metrics: list[ZabbixMetric] = []

        dropped = 0

        # queue.Empty (raised when the timeout expires) simply ends the batch.
        with suppress(queue.Empty):
            while len(metrics) < BATCH_BUFFER_SIZE and not self.shutdown:
                # Block without limit for the first item, then wait at most
                # BATCH_TIMEOUT seconds for each further one.
                timeout = None if count == 0 else BATCH_TIMEOUT
                item = self.queue.get(timeout=timeout)
                count += 1

                if item is None:
                    self.shutdown = True
                else:
                    timestamp, event = item
                    age = time.monotonic() - timestamp

                    if age < queue_seconds:
                        event_metrics = self.event_to_metrics(
                            event, self.float_keys, self.string_keys
                        )
                        if event_metrics:
                            metrics += event_metrics
                    else:
                        dropped += 1

        if dropped:
            _LOGGER.warning("Catching up, dropped %d old events", dropped)

        return count, metrics

    def write_to_zabbix(self, metrics: list[ZabbixMetric]) -> None:
        """Write preprocessed events to zabbix, with retry.

        An OSError triggers up to MAX_TRIES retries, RETRY_DELAY seconds
        apart. If the last attempt also fails, the batch is counted in
        ``write_errors`` and the error is logged once per failure streak.
        """

        for retry in range(self.MAX_TRIES + 1):
            try:
                self.zabbix_sender.send(metrics)

                if self.write_errors:
                    _LOGGER.error("Resumed, lost %d events", self.write_errors)
                    self.write_errors = 0

                _LOGGER.debug("Wrote %d metrics", len(metrics))
                break
            except OSError as err:
                if retry < self.MAX_TRIES:
                    time.sleep(RETRY_DELAY)
                else:
                    # Only log the first failure of a streak.
                    if not self.write_errors:
                        _LOGGER.error("Write error: %s", err)
                    self.write_errors += len(metrics)

    def run(self) -> None:
        """Process incoming events."""
        while not self.shutdown:
            count, metrics = self.get_metrics()
            if metrics:
                self.write_to_zabbix(metrics)
            # Mark every consumed queue item as done, sent or not.
            for _ in range(count):
                self.queue.task_done()
None write_to_zabbix(self, list[ZabbixMetric] metrics)
Definition: __init__.py:246
None _event_listener(self, Event[EventStateChangedData] event)
Definition: __init__.py:206
None __init__(self, ZabbixSender zabbix_sender, Callable[[Event, set[str], set[str]], list[ZabbixMetric]|None] event_to_metrics)
Definition: __init__.py:182
tuple[int, list[ZabbixMetric]] get_metrics(self)
Definition: __init__.py:211
None setup(self, HomeAssistant hass)
Definition: __init__.py:193
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88
bool setup(HomeAssistant hass, ConfigType config)
Definition: __init__.py:72
EntityFilter convert_include_exclude_filter(dict[str, dict[str, list[str]]] config)