1 """Support for Google Cloud Pub/Sub."""
3 from __future__
import annotations
10 from google.cloud.pubsub_v1
import PublisherClient
11 import voluptuous
as vol
19 _LOGGER = logging.getLogger(__name__)
21 DOMAIN =
"google_pubsub"
23 CONF_PROJECT_ID =
"project_id"
24 CONF_TOPIC_NAME =
"topic_name"
25 CONF_SERVICE_PRINCIPAL =
"credentials_json"
26 CONF_FILTER =
"filter"
28 CONFIG_SCHEMA = vol.Schema(
32 vol.Required(CONF_PROJECT_ID): cv.string,
33 vol.Required(CONF_TOPIC_NAME): cv.string,
34 vol.Required(CONF_SERVICE_PRINCIPAL): cv.string,
35 vol.Required(CONF_FILTER): FILTER_SCHEMA,
39 extra=vol.ALLOW_EXTRA,
43 def setup(hass: HomeAssistant, yaml_config: ConfigType) -> bool:
44 """Activate Google Pub/Sub component."""
45 config = yaml_config[DOMAIN]
46 project_id = config[CONF_PROJECT_ID]
47 topic_name = config[CONF_TOPIC_NAME]
48 service_principal_path = hass.config.path(config[CONF_SERVICE_PRINCIPAL])
50 if not os.path.isfile(service_principal_path):
51 _LOGGER.error(
"Path to credentials file cannot be found")
54 entities_filter = config[CONF_FILTER]
56 publisher = PublisherClient.from_service_account_json(service_principal_path)
58 topic_path = publisher.topic_path(project_id, topic_name)
62 def send_to_pubsub(event: Event[EventStateChangedData]):
63 """Send states to Pub/Sub."""
64 state = event.data[
"new_state"]
67 or state.state
in (STATE_UNKNOWN,
"", STATE_UNAVAILABLE)
68 or not entities_filter(state.entity_id)
72 as_dict = state.as_dict()
73 data = json.dumps(obj=as_dict, default=encoder.encode).encode(
"utf-8")
75 publisher.publish(topic_path, data=data)
77 hass.bus.listen(EVENT_STATE_CHANGED, send_to_pubsub)
83 """Encode python objects.
85 Additionally add encoding for datetime objects as isoformat.
89 """Implement encoding logic."""
90 if isinstance(o, datetime.datetime):
bool setup(HomeAssistant hass, ConfigType yaml_config)