1 """Support for the IBM Watson IoT Platform."""
8 from ibmiotf
import MissingMessageEncoderException
9 from ibmiotf.gateway
import Client
10 import voluptuous
as vol
20 EVENT_HOMEASSISTANT_STOP,
30 _LOGGER = logging.getLogger(__name__)
32 CONF_ORG =
"organization"
40 CONFIG_SCHEMA = vol.Schema(
45 vol.Required(CONF_ORG): cv.string,
46 vol.Required(CONF_TYPE): cv.string,
47 vol.Required(CONF_ID): cv.string,
48 vol.Required(CONF_TOKEN): cv.string,
49 vol.Optional(CONF_EXCLUDE, default={}): vol.Schema(
51 vol.Optional(CONF_ENTITIES, default=[]): cv.entity_ids,
52 vol.Optional(CONF_DOMAINS, default=[]): vol.All(
53 cv.ensure_list, [cv.string]
57 vol.Optional(CONF_INCLUDE, default={}): vol.Schema(
59 vol.Optional(CONF_ENTITIES, default=[]): cv.entity_ids,
60 vol.Optional(CONF_DOMAINS, default=[]): vol.All(
61 cv.ensure_list, [cv.string]
69 extra=vol.ALLOW_EXTRA,
73 def setup(hass: HomeAssistant, config: ConfigType) -> bool:
74 """Set up the Watson IoT Platform component."""
78 include = conf[CONF_INCLUDE]
79 exclude = conf[CONF_EXCLUDE]
80 include_e = set(include[CONF_ENTITIES])
81 include_d = set(include[CONF_DOMAINS])
82 exclude_e = set(exclude[CONF_ENTITIES])
83 exclude_d = set(exclude[CONF_DOMAINS])
86 "org": conf[CONF_ORG],
87 "type": conf[CONF_TYPE],
89 "auth-method":
"token",
90 "auth-token": conf[CONF_TOKEN],
92 watson_gateway = Client(client_args)
94 def event_to_json(event):
95 """Add an event to the outgoing list."""
96 state = event.data.get(
"new_state")
99 or state.state
in (STATE_UNKNOWN,
"", STATE_UNAVAILABLE)
100 or state.entity_id
in exclude_e
101 or state.domain
in exclude_d
105 if (include_e
and state.entity_id
not in include_e)
or (
106 include_d
and state.domain
not in include_d
111 _state_as_value =
float(state.state)
113 _state_as_value =
None
115 if _state_as_value
is None:
117 _state_as_value =
float(state_helper.state_as_number(state))
119 _state_as_value =
None
122 "tags": {
"domain": state.domain,
"entity_id": state.object_id},
123 "time": event.time_fired.isoformat(),
124 "fields": {
"state": state.state},
126 if _state_as_value
is not None:
127 out_event[
"fields"][
"state_value"] = _state_as_value
129 for key, value
in state.attributes.items():
130 if key !=
"unit_of_measurement":
132 if key
in out_event[
"fields"]:
138 out_event[
"fields"][key] =
float(value)
139 except (ValueError, TypeError):
140 out_event[
"fields"][key] =
str(value)
144 instance = hass.data[DOMAIN] =
WatsonIOTThread(hass, watson_gateway, event_to_json)
148 """Shut down the thread."""
149 instance.queue.put(
None)
152 hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, shutdown)
158 """A threaded event handler class."""
161 """Initialize the listener."""
162 threading.Thread.__init__(self, name=
"WatsonIOT")
169 hass.bus.listen(EVENT_STATE_CHANGED, self.
_event_listener_event_listener)
173 """Listen for new messages on the bus and queue them for Watson IoT."""
174 item = (time.monotonic(), event)
175 self.
queuequeue.put(item)
178 """Return an event formatted for writing."""
182 if (item := self.
queuequeue.
get())
is None:
187 events.append(event_json)
195 """Write preprocessed events to watson."""
198 for retry
in range(MAX_TRIES + 1):
200 for field
in event[
"fields"]:
201 value = event[
"fields"][field]
202 device_success = self.
gatewaygateway.publishDeviceEvent(
203 event[
"tags"][
"domain"],
204 event[
"tags"][
"entity_id"],
209 if not device_success:
210 _LOGGER.error(
"Failed to publish message to Watson IoT")
213 except (MissingMessageEncoderException, OSError):
214 if retry < MAX_TRIES:
215 time.sleep(RETRY_DELAY)
217 _LOGGER.exception(
"Failed to publish message to Watson IoT")
220 """Process incoming events."""
224 self.
queuequeue.task_done()
227 """Block till all events processed."""
228 self.
queuequeue.join()
def _event_listener(self, event)
def __init__(self, hass, gateway, event_to_json)
def write_to_watson(self, events)
def get_events_json(self)
def block_till_done(self)
web.Response get(self, web.Request request, str config_key)
bool setup(HomeAssistant hass, ConfigType config)