1 """Support for sending data to a Graphite installation."""
3 from contextlib
import suppress
10 import voluptuous
as vol
17 EVENT_HOMEASSISTANT_START,
18 EVENT_HOMEASSISTANT_STOP,
26 _LOGGER = logging.getLogger(__name__)
30 DEFAULT_HOST =
"localhost"
32 DEFAULT_PROTOCOL = PROTOCOL_TCP
36 CONFIG_SCHEMA = vol.Schema(
40 vol.Optional(CONF_HOST, default=DEFAULT_HOST): cv.string,
41 vol.Optional(CONF_PORT, default=DEFAULT_PORT): cv.port,
42 vol.Optional(CONF_PROTOCOL, default=DEFAULT_PROTOCOL): vol.Any(
43 PROTOCOL_TCP, PROTOCOL_UDP
45 vol.Optional(CONF_PREFIX, default=DEFAULT_PREFIX): cv.string,
49 extra=vol.ALLOW_EXTRA,
53 def setup(hass: HomeAssistant, config: ConfigType) -> bool:
54 """Set up the Graphite feeder."""
56 host = conf.get(CONF_HOST)
57 prefix = conf.get(CONF_PREFIX)
58 port = conf.get(CONF_PORT)
59 protocol = conf.get(CONF_PROTOCOL)
61 if protocol == PROTOCOL_TCP:
62 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
64 sock.connect((host, port))
66 _LOGGER.debug(
"Connection to Graphite possible")
68 _LOGGER.error(
"Not able to connect to Graphite")
71 _LOGGER.debug(
"No connection check for UDP possible")
73 hass.data[DOMAIN] =
GraphiteFeeder(hass, host, port, protocol, prefix)
78 """Feed data to Graphite."""
80 def __init__(self, hass, host, port, protocol, prefix):
81 """Initialize the feeder."""
93 hass.bus.listen_once(EVENT_HOMEASSISTANT_START, self.
start_listenstart_listen)
94 _LOGGER.debug(
"Graphite feeding to %s:%i initialized", self.
_host_host, self.
_port_port)
97 """Start event-processing thread."""
98 _LOGGER.debug(
"Event processing thread started")
99 self.
_hass_hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, self.
shutdownshutdown)
106 """Signal shutdown of processing event."""
107 _LOGGER.debug(
"Event processing signaled exit")
114 """Queue an event for processing."""
116 _LOGGER.debug(
"Received event")
117 self.
_queue_queue.put(event)
119 _LOGGER.error(
"Graphite feeder thread has died, not queuing event")
122 """Send data to Graphite."""
123 if self.
_protocol_protocol == PROTOCOL_TCP:
124 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
126 sock.connect((self.
_host_host, self.
_port_port))
127 sock.sendall(data.encode(
"ascii"))
131 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
132 sock.sendto(data.encode(
"ascii") + b
"\n", (self.
_host_host, self.
_port_port))
135 """Report the attributes."""
137 things =
dict(new_state.attributes)
138 with suppress(ValueError):
139 things[
"state"] = state.state_as_number(new_state)
141 f
"{self._prefix}.{entity_id}.{key.replace(' ', '_')} {value:f} {now}"
142 for key, value
in things.items()
143 if isinstance(value, (float, int))
147 _LOGGER.debug(
"Sending to graphite: %s", lines)
150 except socket.gaierror:
151 _LOGGER.error(
"Unable to connect to host %s", self.
_host_host)
153 _LOGGER.exception(
"Failed to send data to graphite")
156 """Run the process to export the data."""
159 _LOGGER.debug(
"Event processing thread stopped")
160 self.
_queue_queue.task_done()
162 if event.event_type == EVENT_STATE_CHANGED:
163 if not event.data.get(
"new_state"):
165 "Skipping %s without new_state for %s",
167 event.data[
"entity_id"],
169 self.
_queue_queue.task_done()
173 "Processing STATE_CHANGED event for %s", event.data[
"entity_id"]
177 event.data[
"entity_id"], event.data[
"new_state"]
182 _LOGGER.exception(
"Failed to process STATE_CHANGED event")
184 _LOGGER.warning(
"Processing unexpected event type %s", event.event_type)
186 self.
_queue_queue.task_done()
def _report_attributes(self, entity_id, new_state)
def shutdown(self, event)
def _send_to_graphite(self, data)
def __init__(self, hass, host, port, protocol, prefix)
def event_listener(self, event)
def start_listen(self, event)
web.Response get(self, web.Request request, str config_key)
bool setup(HomeAssistant hass, ConfigType config)