1 """Support for Mailgun."""
8 from aiohttp
import web
9 import voluptuous
as vol
19 from .const
import DOMAIN
21 _LOGGER = logging.getLogger(__name__)
23 CONF_SANDBOX =
"sandbox"
25 DEFAULT_SANDBOX =
False
27 MESSAGE_RECEIVED = f
"{DOMAIN}_message_received"
29 CONFIG_SCHEMA = vol.Schema(
31 vol.Optional(DOMAIN): vol.Schema(
33 vol.Required(CONF_API_KEY): cv.string,
34 vol.Required(CONF_DOMAIN): cv.string,
35 vol.Optional(CONF_SANDBOX, default=DEFAULT_SANDBOX): cv.boolean,
39 extra=vol.ALLOW_EXTRA,
43 async
def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
44 """Set up the Mailgun component."""
45 if DOMAIN
not in config:
48 hass.data[DOMAIN] = config[DOMAIN]
53 hass: HomeAssistant, webhook_id: str, request: web.Request
55 """Handle incoming webhook with Mailgun inbound messages."""
56 body = await request.text()
58 data = json.loads(body)
if body
else {}
63 isinstance(data, dict)
64 and "signature" in data
67 data[
"webhook_id"] = webhook_id
68 hass.bus.async_fire(MESSAGE_RECEIVED, data)
72 "Mailgun webhook received an unauthenticated message - webhook_id: %s",
78 """Verify webhook was signed by Mailgun."""
79 if DOMAIN
not in hass.data:
80 _LOGGER.warning(
"Cannot validate Mailgun webhook, missing API Key")
83 if not (token
and timestamp
and signature):
86 hmac_digest = hmac.new(
87 key=bytes(hass.data[DOMAIN][CONF_API_KEY],
"utf-8"),
88 msg=bytes(f
"{timestamp}{token}",
"utf-8"),
89 digestmod=hashlib.sha256,
92 return hmac.compare_digest(signature, hmac_digest)
96 """Configure based on config entry."""
97 webhook.async_register(
98 hass, DOMAIN,
"Mailgun", entry.data[CONF_WEBHOOK_ID], handle_webhook
104 """Unload a config entry."""
105 webhook.async_unregister(hass, entry.data[CONF_WEBHOOK_ID])
109 async_remove_entry = config_entry_flow.webhook_async_remove_entry
bool async_setup(HomeAssistant hass, ConfigType config)
None handle_webhook(HomeAssistant hass, str webhook_id, web.Request request)
bool async_unload_entry(HomeAssistant hass, ConfigEntry entry)
bool async_setup_entry(HomeAssistant hass, ConfigEntry entry)
def verify_webhook(hass, token=None, timestamp=None, signature=None)