1 """Flock platform for notify component."""
3 from __future__
import annotations
6 from http
import HTTPStatus
9 import voluptuous
as vol
12 PLATFORM_SCHEMA
as NOTIFY_PLATFORM_SCHEMA,
13 BaseNotificationService,
21 _LOGGER = logging.getLogger(__name__)
22 _RESOURCE =
"https://api.flock.com/hooks/sendMessage/"
24 PLATFORM_SCHEMA = NOTIFY_PLATFORM_SCHEMA.extend(
25 {vol.Required(CONF_ACCESS_TOKEN): cv.string}
32 discovery_info: DiscoveryInfoType |
None =
None,
33 ) -> FlockNotificationService:
34 """Get the Flock notification service."""
35 access_token = config.get(CONF_ACCESS_TOKEN)
36 url = f
"{_RESOURCE}{access_token}"
43 """Implement the notification service for Flock."""
46 """Initialize the Flock notification service."""
51 """Send the message to the user."""
52 payload = {
"text": message}
54 _LOGGER.debug(
"Attempting to call Flock at %s", self.
_url_url)
57 async
with asyncio.timeout(10):
59 result = await response.json()
61 if response.status != HTTPStatus.OK
or "error" in result:
63 "Flock service returned HTTP status %d, response %s",
68 _LOGGER.error(
"Timeout accessing Flock at %s", self.
_url_url)
def async_send_message(self, message, **kwargs)
def __init__(self, url, session)
web.Response post(self, web.Request request, str config_key)
FlockNotificationService async_get_service(HomeAssistant hass, ConfigType config, DiscoveryInfoType|None discovery_info=None)
aiohttp.ClientSession async_get_clientsession(HomeAssistant hass, bool verify_ssl=True, socket.AddressFamily family=socket.AF_UNSPEC, ssl_util.SSLCipherList ssl_cipher=ssl_util.SSLCipherList.PYTHON_DEFAULT)