Home Assistant Unofficial Reference 2024.12.1
__init__.py
Go to the documentation of this file.
1 """Support for sending data to a Graphite installation."""
2 
3 from contextlib import suppress
4 import logging
5 import queue
6 import socket
7 import threading
8 import time
9 
10 import voluptuous as vol
11 
12 from homeassistant.const import (
13  CONF_HOST,
14  CONF_PORT,
15  CONF_PREFIX,
16  CONF_PROTOCOL,
17  EVENT_HOMEASSISTANT_START,
18  EVENT_HOMEASSISTANT_STOP,
19  EVENT_STATE_CHANGED,
20 )
21 from homeassistant.core import HomeAssistant
22 from homeassistant.helpers import state
24 from homeassistant.helpers.typing import ConfigType
25 
26 _LOGGER = logging.getLogger(__name__)
27 
28 PROTOCOL_TCP = "tcp"
29 PROTOCOL_UDP = "udp"
30 DEFAULT_HOST = "localhost"
31 DEFAULT_PORT = 2003
32 DEFAULT_PROTOCOL = PROTOCOL_TCP
33 DEFAULT_PREFIX = "ha"
34 DOMAIN = "graphite"
35 
36 CONFIG_SCHEMA = vol.Schema(
37  {
38  DOMAIN: vol.Schema(
39  {
40  vol.Optional(CONF_HOST, default=DEFAULT_HOST): cv.string,
41  vol.Optional(CONF_PORT, default=DEFAULT_PORT): cv.port,
42  vol.Optional(CONF_PROTOCOL, default=DEFAULT_PROTOCOL): vol.Any(
43  PROTOCOL_TCP, PROTOCOL_UDP
44  ),
45  vol.Optional(CONF_PREFIX, default=DEFAULT_PREFIX): cv.string,
46  }
47  )
48  },
49  extra=vol.ALLOW_EXTRA,
50 )
51 
52 
53 def setup(hass: HomeAssistant, config: ConfigType) -> bool:
54  """Set up the Graphite feeder."""
55  conf = config[DOMAIN]
56  host = conf.get(CONF_HOST)
57  prefix = conf.get(CONF_PREFIX)
58  port = conf.get(CONF_PORT)
59  protocol = conf.get(CONF_PROTOCOL)
60 
61  if protocol == PROTOCOL_TCP:
62  sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
63  try:
64  sock.connect((host, port))
65  sock.shutdown(2)
66  _LOGGER.debug("Connection to Graphite possible")
67  except OSError:
68  _LOGGER.error("Not able to connect to Graphite")
69  return False
70  else:
71  _LOGGER.debug("No connection check for UDP possible")
72 
73  hass.data[DOMAIN] = GraphiteFeeder(hass, host, port, protocol, prefix)
74  return True
75 
76 
77 class GraphiteFeeder(threading.Thread):
78  """Feed data to Graphite."""
79 
80  def __init__(self, hass, host, port, protocol, prefix):
81  """Initialize the feeder."""
82  super().__init__(daemon=True)
83  self._hass_hass = hass
84  self._host_host = host
85  self._port_port = port
86  self._protocol_protocol = protocol
87  # rstrip any trailing dots in case they think they need it
88  self._prefix_prefix = prefix.rstrip(".")
89  self._queue_queue = queue.Queue()
90  self._quit_object_quit_object = object()
91  self._unsub_state_changed_unsub_state_changed = None
92 
93  hass.bus.listen_once(EVENT_HOMEASSISTANT_START, self.start_listenstart_listen)
94  _LOGGER.debug("Graphite feeding to %s:%i initialized", self._host_host, self._port_port)
95 
96  def start_listen(self, event):
97  """Start event-processing thread."""
98  _LOGGER.debug("Event processing thread started")
99  self._hass_hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, self.shutdownshutdown)
100  self._unsub_state_changed_unsub_state_changed = self._hass_hass.bus.listen(
101  EVENT_STATE_CHANGED, self.event_listenerevent_listener
102  )
103  self.start()
104 
105  def shutdown(self, event):
106  """Signal shutdown of processing event."""
107  _LOGGER.debug("Event processing signaled exit")
108  self._unsub_state_changed_unsub_state_changed()
109  self._unsub_state_changed_unsub_state_changed = None
110  self._queue_queue.put(self._quit_object_quit_object)
111  self._queue_queue.join()
112 
113  def event_listener(self, event):
114  """Queue an event for processing."""
115  if self._unsub_state_changed_unsub_state_changed is not None:
116  _LOGGER.debug("Received event")
117  self._queue_queue.put(event)
118  else:
119  _LOGGER.error("Graphite feeder thread has died, not queuing event")
120 
121  def _send_to_graphite(self, data):
122  """Send data to Graphite."""
123  if self._protocol_protocol == PROTOCOL_TCP:
124  sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
125  sock.settimeout(10)
126  sock.connect((self._host_host, self._port_port))
127  sock.sendall(data.encode("ascii"))
128  sock.send(b"\n")
129  sock.close()
130  else:
131  sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
132  sock.sendto(data.encode("ascii") + b"\n", (self._host_host, self._port_port))
133 
134  def _report_attributes(self, entity_id, new_state):
135  """Report the attributes."""
136  now = time.time()
137  things = dict(new_state.attributes)
138  with suppress(ValueError):
139  things["state"] = state.state_as_number(new_state)
140  lines = [
141  f"{self._prefix}.{entity_id}.{key.replace(' ', '_')} {value:f} {now}"
142  for key, value in things.items()
143  if isinstance(value, (float, int))
144  ]
145  if not lines:
146  return
147  _LOGGER.debug("Sending to graphite: %s", lines)
148  try:
149  self._send_to_graphite_send_to_graphite("\n".join(lines))
150  except socket.gaierror:
151  _LOGGER.error("Unable to connect to host %s", self._host_host)
152  except OSError:
153  _LOGGER.exception("Failed to send data to graphite")
154 
155  def run(self):
156  """Run the process to export the data."""
157  while True:
158  if (event := self._queue_queue.get()) == self._quit_object_quit_object:
159  _LOGGER.debug("Event processing thread stopped")
160  self._queue_queue.task_done()
161  return
162  if event.event_type == EVENT_STATE_CHANGED:
163  if not event.data.get("new_state"):
164  _LOGGER.debug(
165  "Skipping %s without new_state for %s",
166  event.event_type,
167  event.data["entity_id"],
168  )
169  self._queue_queue.task_done()
170  continue
171 
172  _LOGGER.debug(
173  "Processing STATE_CHANGED event for %s", event.data["entity_id"]
174  )
175  try:
176  self._report_attributes_report_attributes(
177  event.data["entity_id"], event.data["new_state"]
178  )
179  except Exception:
180  # Catch this so we can avoid the thread dying and
181  # make it visible.
182  _LOGGER.exception("Failed to process STATE_CHANGED event")
183  else:
184  _LOGGER.warning("Processing unexpected event type %s", event.event_type)
185 
186  self._queue_queue.task_done()
def _report_attributes(self, entity_id, new_state)
Definition: __init__.py:134
def __init__(self, hass, host, port, protocol, prefix)
Definition: __init__.py:80
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88
bool setup(HomeAssistant hass, ConfigType config)
Definition: __init__.py:53