1 """AWS platform for notify component."""
3 from __future__
import annotations
11 from aiobotocore.session
import AioSession
18 BaseNotificationService,
30 from .const
import CONF_CONTEXT, CONF_CREDENTIAL_NAME, CONF_REGION, DATA_SESSIONS
32 _LOGGER = logging.getLogger(__name__)
36 """Get available regions for a service."""
37 session = AioSession()
38 return await session.get_available_regions(service)
44 discovery_info: DiscoveryInfoType |
None =
None,
45 ) -> AWSNotify |
None:
46 """Get the AWS notification service."""
47 if discovery_info
is None:
48 _LOGGER.error(
"Please config aws notify platform in aws component")
55 service = conf[CONF_SERVICE]
56 region_name = conf[CONF_REGION]
59 if region_name
not in available_regions:
61 "Region %s is not available for %s service, must in %s",
68 aws_config = conf.copy()
70 del aws_config[CONF_SERVICE]
71 del aws_config[CONF_REGION]
72 if CONF_PLATFORM
in aws_config:
73 del aws_config[CONF_PLATFORM]
74 if CONF_NAME
in aws_config:
75 del aws_config[CONF_NAME]
76 if CONF_CONTEXT
in aws_config:
77 del aws_config[CONF_CONTEXT]
81 if hass.data[DATA_SESSIONS]:
82 session = next(iter(hass.data[DATA_SESSIONS].values()))
84 _LOGGER.error(
"Missing aws credential for %s", config[CONF_NAME])
88 credential_name = aws_config.get(CONF_CREDENTIAL_NAME)
89 if credential_name
is not None:
90 session = hass.data[DATA_SESSIONS].
get(credential_name)
92 _LOGGER.warning(
"No available aws session for %s", credential_name)
93 del aws_config[CONF_CREDENTIAL_NAME]
96 if (profile := aws_config.get(CONF_PROFILE_NAME))
is not None:
97 session = AioSession(profile=profile)
98 del aws_config[CONF_PROFILE_NAME]
100 session = AioSession()
102 aws_config[CONF_REGION] = region_name
104 if service ==
"lambda":
105 context_str = json.dumps(
106 {
"custom": conf.get(CONF_CONTEXT, {})}, cls=JSONEncoder
108 context_b64 = base64.b64encode(context_str.encode(
"utf-8"))
109 context = context_b64.decode(
"utf-8")
110 return AWSLambda(session, aws_config, context)
113 return AWSSNS(session, aws_config)
116 return AWSSQS(session, aws_config)
118 if service ==
"events":
126 """Implement the notification service for the AWS service."""
129 """Initialize the service."""
135 """Implement the notification service for the AWS Lambda service."""
140 """Initialize the service."""
141 super().
__init__(session, aws_config)
145 """Send notification to specified LAMBDA ARN."""
146 if not kwargs.get(ATTR_TARGET):
147 _LOGGER.error(
"At least one target is required")
150 cleaned_kwargs = {k: v
for k, v
in kwargs.items()
if v
is not None}
151 payload = {
"message": message}
152 payload.update(cleaned_kwargs)
153 json_payload = json.dumps(payload)
161 Payload=json_payload,
162 ClientContext=self.
contextcontext,
164 for target
in kwargs.get(ATTR_TARGET, [])
168 await asyncio.gather(*tasks)
172 """Implement the notification service for the AWS SNS service."""
177 """Send notification to specified SNS ARN."""
178 if not kwargs.get(ATTR_TARGET):
179 _LOGGER.error(
"At least one target is required")
182 message_attributes = {}
183 if data := kwargs.get(ATTR_DATA):
184 message_attributes = {
185 k: {
"StringValue": v,
"DataType":
"String"}
186 for k, v
in data.items()
189 subject = kwargs.get(ATTR_TITLE, ATTR_TITLE_DEFAULT)
199 MessageAttributes=message_attributes,
201 for target
in kwargs.get(ATTR_TARGET, [])
205 await asyncio.gather(*tasks)
209 """Implement the notification service for the AWS SQS service."""
214 """Send notification to specified SQS ARN."""
215 if not kwargs.get(ATTR_TARGET):
216 _LOGGER.error(
"At least one target is required")
219 cleaned_kwargs = {k: v
for k, v
in kwargs.items()
if v
is not None}
220 message_body = {
"message": message}
221 message_body.update(cleaned_kwargs)
222 json_body = json.dumps(message_body)
223 message_attributes = {}
224 for key, val
in cleaned_kwargs.items():
225 message_attributes[key] = {
226 "StringValue": json.dumps(val),
227 "DataType":
"String",
236 MessageBody=json_body,
237 MessageAttributes=message_attributes,
239 for target
in kwargs.get(ATTR_TARGET, [])
243 await asyncio.gather(*tasks)
247 """Implement the notification service for the AWS EventBridge service."""
252 """Send notification to specified EventBus."""
254 cleaned_kwargs = {k: v
for k, v
in kwargs.items()
if v
is not None}
255 data = cleaned_kwargs.get(ATTR_DATA, {})
257 json.dumps(data[
"detail"])
259 else json.dumps({
"message": message})
266 for target
in kwargs.get(ATTR_TARGET, [
None]):
268 "Source": data.get(
"source",
"homeassistant"),
269 "Resources": data.get(
"resources", []),
271 "DetailType": data.get(
"detail_type",
""),
274 entry[
"EventBusName"] = target
276 entries.append(entry)
278 client.put_events(Entries=entries[i :
min(i + 10, len(entries))])
279 for i
in range(0, len(entries), 10)
283 results = await asyncio.gather(*tasks)
284 for result
in results:
285 for entry
in result[
"Entries"]:
286 if len(entry.get(
"EventId",
"")) == 0:
288 "Failed to send event: ErrorCode=%s ErrorMessage=%s",
290 entry[
"ErrorMessage"],
None async_send_message(self, str message="", **Any kwargs)
None async_send_message(self, str message="", **Any kwargs)
def __init__(self, session, aws_config, context)
def __init__(self, session, aws_config)
None async_send_message(self, str message="", **Any kwargs)
None async_send_message(self, str message="", **Any kwargs)
def get_available_regions(hass, service)
AWSNotify|None async_get_service(HomeAssistant hass, ConfigType config, DiscoveryInfoType|None discovery_info=None)
web.Response get(self, web.Request request, str config_key)
JellyfinClient create_client(str device_id, str|None device_name=None)