Home Assistant Unofficial Reference 2024.12.1
__init__.py
Go to the documentation of this file.
1 """Support for the IBM Watson IoT Platform."""
2 
3 import logging
4 import queue
5 import threading
6 import time
7 
8 from ibmiotf import MissingMessageEncoderException
9 from ibmiotf.gateway import Client
10 import voluptuous as vol
11 
12 from homeassistant.const import (
13  CONF_DOMAINS,
14  CONF_ENTITIES,
15  CONF_EXCLUDE,
16  CONF_ID,
17  CONF_INCLUDE,
18  CONF_TOKEN,
19  CONF_TYPE,
20  EVENT_HOMEASSISTANT_STOP,
21  EVENT_STATE_CHANGED,
22  STATE_UNAVAILABLE,
23  STATE_UNKNOWN,
24 )
25 from homeassistant.core import HomeAssistant, callback
26 from homeassistant.helpers import state as state_helper
28 from homeassistant.helpers.typing import ConfigType
29 
30 _LOGGER = logging.getLogger(__name__)
31 
32 CONF_ORG = "organization"
33 
34 DOMAIN = "watson_iot"
35 
36 MAX_TRIES = 3
37 
38 RETRY_DELAY = 20
39 
40 CONFIG_SCHEMA = vol.Schema(
41  {
42  DOMAIN: vol.All(
43  vol.Schema(
44  {
45  vol.Required(CONF_ORG): cv.string,
46  vol.Required(CONF_TYPE): cv.string,
47  vol.Required(CONF_ID): cv.string,
48  vol.Required(CONF_TOKEN): cv.string,
49  vol.Optional(CONF_EXCLUDE, default={}): vol.Schema(
50  {
51  vol.Optional(CONF_ENTITIES, default=[]): cv.entity_ids,
52  vol.Optional(CONF_DOMAINS, default=[]): vol.All(
53  cv.ensure_list, [cv.string]
54  ),
55  }
56  ),
57  vol.Optional(CONF_INCLUDE, default={}): vol.Schema(
58  {
59  vol.Optional(CONF_ENTITIES, default=[]): cv.entity_ids,
60  vol.Optional(CONF_DOMAINS, default=[]): vol.All(
61  cv.ensure_list, [cv.string]
62  ),
63  }
64  ),
65  }
66  )
67  )
68  },
69  extra=vol.ALLOW_EXTRA,
70 )
71 
72 
73 def setup(hass: HomeAssistant, config: ConfigType) -> bool:
74  """Set up the Watson IoT Platform component."""
75 
76  conf = config[DOMAIN]
77 
78  include = conf[CONF_INCLUDE]
79  exclude = conf[CONF_EXCLUDE]
80  include_e = set(include[CONF_ENTITIES])
81  include_d = set(include[CONF_DOMAINS])
82  exclude_e = set(exclude[CONF_ENTITIES])
83  exclude_d = set(exclude[CONF_DOMAINS])
84 
85  client_args = {
86  "org": conf[CONF_ORG],
87  "type": conf[CONF_TYPE],
88  "id": conf[CONF_ID],
89  "auth-method": "token",
90  "auth-token": conf[CONF_TOKEN],
91  }
92  watson_gateway = Client(client_args)
93 
94  def event_to_json(event):
95  """Add an event to the outgoing list."""
96  state = event.data.get("new_state")
97  if (
98  state is None
99  or state.state in (STATE_UNKNOWN, "", STATE_UNAVAILABLE)
100  or state.entity_id in exclude_e
101  or state.domain in exclude_d
102  ):
103  return None
104 
105  if (include_e and state.entity_id not in include_e) or (
106  include_d and state.domain not in include_d
107  ):
108  return None
109 
110  try:
111  _state_as_value = float(state.state)
112  except ValueError:
113  _state_as_value = None
114 
115  if _state_as_value is None:
116  try:
117  _state_as_value = float(state_helper.state_as_number(state))
118  except ValueError:
119  _state_as_value = None
120 
121  out_event = {
122  "tags": {"domain": state.domain, "entity_id": state.object_id},
123  "time": event.time_fired.isoformat(),
124  "fields": {"state": state.state},
125  }
126  if _state_as_value is not None:
127  out_event["fields"]["state_value"] = _state_as_value
128 
129  for key, value in state.attributes.items():
130  if key != "unit_of_measurement":
131  # If the key is already in fields
132  if key in out_event["fields"]:
133  key = f"{key}_"
134  # For each value we try to cast it as float
135  # But if we cannot do it we store the value
136  # as string
137  try:
138  out_event["fields"][key] = float(value)
139  except (ValueError, TypeError):
140  out_event["fields"][key] = str(value)
141 
142  return out_event
143 
144  instance = hass.data[DOMAIN] = WatsonIOTThread(hass, watson_gateway, event_to_json)
145  instance.start()
146 
147  def shutdown(event):
148  """Shut down the thread."""
149  instance.queue.put(None)
150  instance.join()
151 
152  hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, shutdown)
153 
154  return True
155 
156 
157 class WatsonIOTThread(threading.Thread):
158  """A threaded event handler class."""
159 
160  def __init__(self, hass, gateway, event_to_json):
161  """Initialize the listener."""
162  threading.Thread.__init__(self, name="WatsonIOT")
163  self.queuequeue = queue.Queue()
164  self.gatewaygateway = gateway
165  self.gatewaygateway.connect()
166  self.event_to_jsonevent_to_json = event_to_json
167  self.write_errorswrite_errors = 0
168  self.shutdownshutdown = False
169  hass.bus.listen(EVENT_STATE_CHANGED, self._event_listener_event_listener)
170 
171  @callback
172  def _event_listener(self, event):
173  """Listen for new messages on the bus and queue them for Watson IoT."""
174  item = (time.monotonic(), event)
175  self.queuequeue.put(item)
176 
177  def get_events_json(self):
178  """Return an event formatted for writing."""
179  events = []
180 
181  try:
182  if (item := self.queuequeue.get()) is None:
183  self.shutdownshutdown = True
184  else:
185  event_json = self.event_to_jsonevent_to_json(item[1])
186  if event_json:
187  events.append(event_json)
188 
189  except queue.Empty:
190  pass
191 
192  return events
193 
194  def write_to_watson(self, events):
195  """Write preprocessed events to watson."""
196 
197  for event in events:
198  for retry in range(MAX_TRIES + 1):
199  try:
200  for field in event["fields"]:
201  value = event["fields"][field]
202  device_success = self.gatewaygateway.publishDeviceEvent(
203  event["tags"]["domain"],
204  event["tags"]["entity_id"],
205  field,
206  "json",
207  value,
208  )
209  if not device_success:
210  _LOGGER.error("Failed to publish message to Watson IoT")
211  continue
212  break
213  except (MissingMessageEncoderException, OSError):
214  if retry < MAX_TRIES:
215  time.sleep(RETRY_DELAY)
216  else:
217  _LOGGER.exception("Failed to publish message to Watson IoT")
218 
219  def run(self):
220  """Process incoming events."""
221  while not self.shutdownshutdown:
222  if event := self.get_events_jsonget_events_json():
223  self.write_to_watsonwrite_to_watson(event)
224  self.queuequeue.task_done()
225 
226  def block_till_done(self):
227  """Block till all events processed."""
228  self.queuequeue.join()
def __init__(self, hass, gateway, event_to_json)
Definition: __init__.py:160
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88
bool setup(HomeAssistant hass, ConfigType config)
Definition: __init__.py:73