1 """Support for Zabbix."""
3 from collections.abc
import Callable
4 from contextlib
import suppress
11 from urllib.error
import HTTPError
12 from urllib.parse
import urljoin
14 from pyzabbix
import ZabbixAPI, ZabbixAPIException, ZabbixMetric, ZabbixSender
15 import voluptuous
as vol
23 EVENT_HOMEASSISTANT_STOP,
32 INCLUDE_EXCLUDE_BASE_FILTER_SCHEMA,
33 convert_include_exclude_filter,
37 from .const
import DOMAIN
39 _LOGGER = logging.getLogger(__name__)
41 CONF_PUBLISH_STATES_HOST =
"publish_states_host"
44 DEFAULT_PATH =
"zabbix"
48 QUEUE_BACKLOG_SECONDS = 30
50 RETRY_MESSAGE = f
"%s Retrying in {RETRY_INTERVAL} seconds."
53 BATCH_BUFFER_SIZE = 100
55 CONFIG_SCHEMA = vol.Schema(
57 DOMAIN: INCLUDE_EXCLUDE_BASE_FILTER_SCHEMA.extend(
59 vol.Required(CONF_HOST): cv.string,
60 vol.Optional(CONF_PASSWORD): cv.string,
61 vol.Optional(CONF_PATH, default=DEFAULT_PATH): cv.string,
62 vol.Optional(CONF_SSL, default=DEFAULT_SSL): cv.boolean,
63 vol.Optional(CONF_USERNAME): cv.string,
64 vol.Optional(CONF_PUBLISH_STATES_HOST): cv.string,
68 extra=vol.ALLOW_EXTRA,
72 def setup(hass: HomeAssistant, config: ConfigType) -> bool:
73 """Set up the Zabbix component."""
76 protocol =
"https" if conf[CONF_SSL]
else "http"
78 url = urljoin(f
"{protocol}://{conf[CONF_HOST]}", conf[CONF_PATH])
79 username = conf.get(CONF_USERNAME)
80 password = conf.get(CONF_PASSWORD)
82 publish_states_host = conf.get(CONF_PUBLISH_STATES_HOST)
87 zapi = ZabbixAPI(url=url, user=username, password=password)
88 _LOGGER.debug(
"Connected to Zabbix API Version %s", zapi.api_version())
89 except ZabbixAPIException
as login_exception:
90 _LOGGER.error(
"Unable to login to the Zabbix API: %s", login_exception)
92 except HTTPError
as http_error:
93 _LOGGER.error(
"HTTPError when connecting to Zabbix API: %s", http_error)
95 _LOGGER.error(RETRY_MESSAGE, http_error)
96 event_helper.call_later(
99 lambda _:
setup(hass, config),
103 hass.data[DOMAIN] = zapi
105 def event_to_metrics(
106 event: Event, float_keys: set[str], string_keys: set[str]
107 ) -> list[ZabbixMetric] |
None:
108 """Add an event to the outgoing Zabbix list."""
109 state = event.data.get(
"new_state")
110 if state
is None or state.state
in (STATE_UNKNOWN,
"", STATE_UNAVAILABLE):
113 entity_id = state.entity_id
114 if not entities_filter(entity_id):
120 _state_as_value =
float(state.state)
121 floats[entity_id] = _state_as_value
124 _state_as_value =
float(state_helper.state_as_number(state))
125 floats[entity_id] = _state_as_value
127 strings[entity_id] = state.state
129 for key, value
in state.attributes.items():
133 attribute_id = f
"{entity_id}/{key}"
135 float_value =
float(value)
136 except (ValueError, TypeError):
138 if float_value
is None or not math.isfinite(float_value):
139 strings[attribute_id] =
str(value)
141 floats[attribute_id] = float_value
144 float_keys_count = len(float_keys)
145 float_keys.update(floats)
146 if len(float_keys) != float_keys_count:
147 floats_discovery = [{
"{#KEY}": float_key}
for float_key
in float_keys]
148 metric = ZabbixMetric(
150 "homeassistant.floats_discovery",
151 json.dumps(floats_discovery),
153 metrics.append(metric)
154 for key, value
in floats.items():
155 metric = ZabbixMetric(
156 publish_states_host, f
"homeassistant.float[{key}]", value
158 metrics.append(metric)
160 string_keys.update(strings)
163 if publish_states_host:
164 zabbix_sender = ZabbixSender(zabbix_server=conf[CONF_HOST])
165 instance =
ZabbixThread(zabbix_sender, event_to_metrics)
172 """A threaded event handler class."""
178 zabbix_sender: ZabbixSender,
179 event_to_metrics: Callable[
180 [Event, set[str], set[str]], list[ZabbixMetric] |
None
183 """Initialize the listener."""
184 threading.Thread.__init__(self, name=
"Zabbix")
185 self.queue: queue.Queue = queue.Queue()
190 self.float_keys: set[str] = set()
191 self.string_keys: set[str] = set()
193 def setup(self, hass: HomeAssistant) ->
None:
194 """Set up the thread and start it."""
195 hass.bus.listen(EVENT_STATE_CHANGED, self.
_event_listener_event_listener)
196 hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, self.
_shutdown_shutdown)
198 _LOGGER.debug(
"Started publishing state changes to Zabbix")
201 """Shut down the thread."""
207 """Listen for new messages on the bus and queue them for Zabbix."""
208 item = (time.monotonic(), event)
212 """Return a batch of events formatted for writing."""
213 queue_seconds = QUEUE_BACKLOG_SECONDS + self.
MAX_TRIESMAX_TRIES * RETRY_DELAY
216 metrics: list[ZabbixMetric] = []
220 with suppress(queue.Empty):
221 while len(metrics) < BATCH_BUFFER_SIZE
and not self.
shutdownshutdown:
222 timeout =
None if count == 0
else BATCH_TIMEOUT
223 item = self.queue.
get(timeout=timeout)
229 timestamp, event = item
230 age = time.monotonic() - timestamp
232 if age < queue_seconds:
234 event, self.float_keys, self.string_keys
237 metrics += event_metrics
242 _LOGGER.warning(
"Catching up, dropped %d old events", dropped)
244 return count, metrics
247 """Write preprocessed events to zabbix, with retry."""
249 for retry
in range(self.
MAX_TRIESMAX_TRIES + 1):
254 _LOGGER.error(
"Resumed, lost %d events", self.
write_errorswrite_errors)
257 _LOGGER.debug(
"Wrote %d metrics", len(metrics))
259 except OSError
as err:
261 time.sleep(RETRY_DELAY)
264 _LOGGER.error(
"Write error: %s", err)
268 """Process incoming events."""
273 for _
in range(count):
274 self.queue.task_done()
None write_to_zabbix(self, list[ZabbixMetric] metrics)
None _event_listener(self, Event[EventStateChangedData] event)
None _shutdown(self, Event event)
None __init__(self, ZabbixSender zabbix_sender, Callable[[Event, set[str], set[str]], list[ZabbixMetric]|None] event_to_metrics)
tuple[int, list[ZabbixMetric]] get_metrics(self)
None setup(self, HomeAssistant hass)
web.Response get(self, web.Request request, str config_key)
bool setup(HomeAssistant hass, ConfigType config)
EntityFilter convert_include_exclude_filter(dict[str, dict[str, list[str]]] config)