Home Assistant Unofficial Reference 2024.12.1
__init__.py
Go to the documentation of this file.
1 """Support for Google Cloud Pub/Sub."""
2 
from __future__ import annotations

import datetime
import json
import logging
import os

from google.cloud.pubsub_v1 import PublisherClient
import voluptuous as vol

from homeassistant.const import EVENT_STATE_CHANGED, STATE_UNAVAILABLE, STATE_UNKNOWN
from homeassistant.core import Event, EventStateChangedData, HomeAssistant
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entityfilter import FILTER_SCHEMA
from homeassistant.helpers.typing import ConfigType
18 
_LOGGER = logging.getLogger(__name__)

# Integration domain; also the key of this component's section in the YAML config.
DOMAIN = "google_pubsub"

# Option names accepted inside the component's configuration section.
CONF_PROJECT_ID = "project_id"
CONF_TOPIC_NAME = "topic_name"
CONF_SERVICE_PRINCIPAL = "credentials_json"
CONF_FILTER = "filter"

# Schema for the component's own section: every option is required.
_COMPONENT_SCHEMA = vol.Schema(
    {
        vol.Required(CONF_PROJECT_ID): cv.string,
        vol.Required(CONF_TOPIC_NAME): cv.string,
        vol.Required(CONF_SERVICE_PRINCIPAL): cv.string,
        vol.Required(CONF_FILTER): FILTER_SCHEMA,
    }
)

# Top-level schema: validates the component's section and lets any other
# top-level keys pass through untouched.
CONFIG_SCHEMA = vol.Schema({DOMAIN: _COMPONENT_SCHEMA}, extra=vol.ALLOW_EXTRA)
41 
42 
def setup(hass: HomeAssistant, yaml_config: ConfigType) -> bool:
    """Activate Google Pub/Sub component.

    Reads this component's section of the YAML configuration, creates a
    Pub/Sub publisher from the service-account credentials file, and
    registers a listener on state-changed events that publishes each
    accepted state to the configured topic as UTF-8 encoded JSON.

    Args:
        hass: The Home Assistant instance; used to resolve the credentials
            path and to register the event listener.
        yaml_config: The full configuration mapping; only the ``DOMAIN``
            section is read.

    Returns:
        ``True`` once the listener has been registered, ``False`` when the
        credentials file does not exist.
    """
    config = yaml_config[DOMAIN]
    project_id = config[CONF_PROJECT_ID]
    topic_name = config[CONF_TOPIC_NAME]
    # The configured credentials file name is resolved through hass.config.path.
    service_principal_path = hass.config.path(config[CONF_SERVICE_PRINCIPAL])

    if not os.path.isfile(service_principal_path):
        _LOGGER.error("Path to credentials file cannot be found")
        return False

    entities_filter = config[CONF_FILTER]

    publisher = PublisherClient.from_service_account_json(service_principal_path)

    topic_path = publisher.topic_path(project_id, topic_name)

    encoder = DateTimeJSONEncoder()

    def send_to_pubsub(event: Event[EventStateChangedData]) -> None:
        """Send states to Pub/Sub."""
        state = event.data["new_state"]
        # Skip removed entities, states without a usable value, and entities
        # rejected by the configured filter.
        if (
            state is None
            or state.state in (STATE_UNKNOWN, "", STATE_UNAVAILABLE)
            or not entities_filter(state.entity_id)
        ):
            return

        as_dict = state.as_dict()
        # ``default`` must be the per-object fallback hook. Passing
        # ``encoder.encode`` here would return an already-quoted JSON string
        # that json.dumps then serializes a second time, so datetimes would
        # arrive wrapped in escaped quotes.
        data = json.dumps(obj=as_dict, default=encoder.default).encode("utf-8")

        publisher.publish(topic_path, data=data)

    hass.bus.listen(EVENT_STATE_CHANGED, send_to_pubsub)

    return True
80 
81 
class DateTimeJSONEncoder(json.JSONEncoder):
    """JSON encoder that additionally understands ``datetime.datetime``.

    Datetime values are emitted as their ISO 8601 string; anything else the
    base encoder cannot handle is delegated to the base class.
    """

    def default(self, o):
        """Return a JSON-serializable replacement for *o*."""
        if not isinstance(o, datetime.datetime):
            # Not ours to handle: defer to the base implementation.
            return super().default(o)
        return o.isoformat()
bool setup(HomeAssistant hass, ConfigType yaml_config)
Definition: __init__.py:43