Home Assistant Unofficial Reference 2024.12.1
notify.py
Go to the documentation of this file.
1 """AWS platform for notify component."""
2 
3 from __future__ import annotations
4 
5 import asyncio
6 import base64
7 import json
8 import logging
9 from typing import Any
10 
11 from aiobotocore.session import AioSession
12 
14  ATTR_DATA,
15  ATTR_TARGET,
16  ATTR_TITLE,
17  ATTR_TITLE_DEFAULT,
18  BaseNotificationService,
19 )
20 from homeassistant.const import (
21  CONF_NAME,
22  CONF_PLATFORM,
23  CONF_PROFILE_NAME,
24  CONF_SERVICE,
25 )
26 from homeassistant.core import HomeAssistant
27 from homeassistant.helpers.json import JSONEncoder
28 from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
29 
30 from .const import CONF_CONTEXT, CONF_CREDENTIAL_NAME, CONF_REGION, DATA_SESSIONS
31 
32 _LOGGER = logging.getLogger(__name__)
33 
34 
35 async def get_available_regions(hass, service):
36  """Get available regions for a service."""
37  session = AioSession()
38  return await session.get_available_regions(service)
39 
40 
42  hass: HomeAssistant,
43  config: ConfigType,
44  discovery_info: DiscoveryInfoType | None = None,
45 ) -> AWSNotify | None:
46  """Get the AWS notification service."""
47  if discovery_info is None:
48  _LOGGER.error("Please config aws notify platform in aws component")
49  return None
50 
51  session = None
52 
53  conf = discovery_info
54 
55  service = conf[CONF_SERVICE]
56  region_name = conf[CONF_REGION]
57 
58  available_regions = await get_available_regions(hass, service)
59  if region_name not in available_regions:
60  _LOGGER.error(
61  "Region %s is not available for %s service, must in %s",
62  region_name,
63  service,
64  available_regions,
65  )
66  return None
67 
68  aws_config = conf.copy()
69 
70  del aws_config[CONF_SERVICE]
71  del aws_config[CONF_REGION]
72  if CONF_PLATFORM in aws_config:
73  del aws_config[CONF_PLATFORM]
74  if CONF_NAME in aws_config:
75  del aws_config[CONF_NAME]
76  if CONF_CONTEXT in aws_config:
77  del aws_config[CONF_CONTEXT]
78 
79  if not aws_config:
80  # no platform config, use the first aws component credential instead
81  if hass.data[DATA_SESSIONS]:
82  session = next(iter(hass.data[DATA_SESSIONS].values()))
83  else:
84  _LOGGER.error("Missing aws credential for %s", config[CONF_NAME])
85  return None
86 
87  if session is None:
88  credential_name = aws_config.get(CONF_CREDENTIAL_NAME)
89  if credential_name is not None:
90  session = hass.data[DATA_SESSIONS].get(credential_name)
91  if session is None:
92  _LOGGER.warning("No available aws session for %s", credential_name)
93  del aws_config[CONF_CREDENTIAL_NAME]
94 
95  if session is None:
96  if (profile := aws_config.get(CONF_PROFILE_NAME)) is not None:
97  session = AioSession(profile=profile)
98  del aws_config[CONF_PROFILE_NAME]
99  else:
100  session = AioSession()
101 
102  aws_config[CONF_REGION] = region_name
103 
104  if service == "lambda":
105  context_str = json.dumps(
106  {"custom": conf.get(CONF_CONTEXT, {})}, cls=JSONEncoder
107  )
108  context_b64 = base64.b64encode(context_str.encode("utf-8"))
109  context = context_b64.decode("utf-8")
110  return AWSLambda(session, aws_config, context)
111 
112  if service == "sns":
113  return AWSSNS(session, aws_config)
114 
115  if service == "sqs":
116  return AWSSQS(session, aws_config)
117 
118  if service == "events":
119  return AWSEventBridge(session, aws_config)
120 
121  # should not reach here since service was checked in schema
122  return None
123 
124 
125 class AWSNotify(BaseNotificationService):
126  """Implement the notification service for the AWS service."""
127 
128  def __init__(self, session, aws_config):
129  """Initialize the service."""
130  self.sessionsession = session
131  self.aws_configaws_config = aws_config
132 
133 
135  """Implement the notification service for the AWS Lambda service."""
136 
137  service = "lambda"
138 
139  def __init__(self, session, aws_config, context):
140  """Initialize the service."""
141  super().__init__(session, aws_config)
142  self.contextcontext = context
143 
144  async def async_send_message(self, message: str = "", **kwargs: Any) -> None:
145  """Send notification to specified LAMBDA ARN."""
146  if not kwargs.get(ATTR_TARGET):
147  _LOGGER.error("At least one target is required")
148  return
149 
150  cleaned_kwargs = {k: v for k, v in kwargs.items() if v is not None}
151  payload = {"message": message}
152  payload.update(cleaned_kwargs)
153  json_payload = json.dumps(payload)
154 
155  async with self.sessionsession.create_client(
156  self.serviceservice, **self.aws_configaws_config
157  ) as client:
158  tasks = [
159  client.invoke(
160  FunctionName=target,
161  Payload=json_payload,
162  ClientContext=self.contextcontext,
163  )
164  for target in kwargs.get(ATTR_TARGET, [])
165  ]
166 
167  if tasks:
168  await asyncio.gather(*tasks)
169 
170 
172  """Implement the notification service for the AWS SNS service."""
173 
174  service = "sns"
175 
176  async def async_send_message(self, message: str = "", **kwargs: Any) -> None:
177  """Send notification to specified SNS ARN."""
178  if not kwargs.get(ATTR_TARGET):
179  _LOGGER.error("At least one target is required")
180  return
181 
182  message_attributes = {}
183  if data := kwargs.get(ATTR_DATA):
184  message_attributes = {
185  k: {"StringValue": v, "DataType": "String"}
186  for k, v in data.items()
187  if v is not None
188  }
189  subject = kwargs.get(ATTR_TITLE, ATTR_TITLE_DEFAULT)
190 
191  async with self.sessionsession.create_client(
192  self.serviceservice, **self.aws_configaws_config
193  ) as client:
194  tasks = [
195  client.publish(
196  TargetArn=target,
197  Message=message,
198  Subject=subject,
199  MessageAttributes=message_attributes,
200  )
201  for target in kwargs.get(ATTR_TARGET, [])
202  ]
203 
204  if tasks:
205  await asyncio.gather(*tasks)
206 
207 
209  """Implement the notification service for the AWS SQS service."""
210 
211  service = "sqs"
212 
213  async def async_send_message(self, message: str = "", **kwargs: Any) -> None:
214  """Send notification to specified SQS ARN."""
215  if not kwargs.get(ATTR_TARGET):
216  _LOGGER.error("At least one target is required")
217  return
218 
219  cleaned_kwargs = {k: v for k, v in kwargs.items() if v is not None}
220  message_body = {"message": message}
221  message_body.update(cleaned_kwargs)
222  json_body = json.dumps(message_body)
223  message_attributes = {}
224  for key, val in cleaned_kwargs.items():
225  message_attributes[key] = {
226  "StringValue": json.dumps(val),
227  "DataType": "String",
228  }
229 
230  async with self.sessionsession.create_client(
231  self.serviceservice, **self.aws_configaws_config
232  ) as client:
233  tasks = [
234  client.send_message(
235  QueueUrl=target,
236  MessageBody=json_body,
237  MessageAttributes=message_attributes,
238  )
239  for target in kwargs.get(ATTR_TARGET, [])
240  ]
241 
242  if tasks:
243  await asyncio.gather(*tasks)
244 
245 
247  """Implement the notification service for the AWS EventBridge service."""
248 
249  service = "events"
250 
251  async def async_send_message(self, message: str = "", **kwargs: Any) -> None:
252  """Send notification to specified EventBus."""
253 
254  cleaned_kwargs = {k: v for k, v in kwargs.items() if v is not None}
255  data = cleaned_kwargs.get(ATTR_DATA, {})
256  detail = (
257  json.dumps(data["detail"])
258  if "detail" in data
259  else json.dumps({"message": message})
260  )
261 
262  async with self.sessionsession.create_client(
263  self.serviceservice, **self.aws_configaws_config
264  ) as client:
265  entries = []
266  for target in kwargs.get(ATTR_TARGET, [None]):
267  entry = {
268  "Source": data.get("source", "homeassistant"),
269  "Resources": data.get("resources", []),
270  "Detail": detail,
271  "DetailType": data.get("detail_type", ""),
272  }
273  if target:
274  entry["EventBusName"] = target
275 
276  entries.append(entry)
277  tasks = [
278  client.put_events(Entries=entries[i : min(i + 10, len(entries))])
279  for i in range(0, len(entries), 10)
280  ]
281 
282  if tasks:
283  results = await asyncio.gather(*tasks)
284  for result in results:
285  for entry in result["Entries"]:
286  if len(entry.get("EventId", "")) == 0:
287  _LOGGER.error(
288  "Failed to send event: ErrorCode=%s ErrorMessage=%s",
289  entry["ErrorCode"],
290  entry["ErrorMessage"],
291  )
None async_send_message(self, str message="", **Any kwargs)
Definition: notify.py:251
None async_send_message(self, str message="", **Any kwargs)
Definition: notify.py:144
def __init__(self, session, aws_config, context)
Definition: notify.py:139
def __init__(self, session, aws_config)
Definition: notify.py:128
None async_send_message(self, str message="", **Any kwargs)
Definition: notify.py:176
None async_send_message(self, str message="", **Any kwargs)
Definition: notify.py:213
def get_available_regions(hass, service)
Definition: notify.py:35
AWSNotify|None async_get_service(HomeAssistant hass, ConfigType config, DiscoveryInfoType|None discovery_info=None)
Definition: notify.py:45
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88
JellyfinClient create_client(str device_id, str|None device_name=None)