Home Assistant Unofficial Reference 2024.12.1
notify.py
Go to the documentation of this file.
1 """Kodi notification service."""
2 
3 from __future__ import annotations
4 
5 import logging
6 
7 import aiohttp
8 import jsonrpc_async
9 import voluptuous as vol
10 
12  ATTR_DATA,
13  ATTR_TITLE,
14  ATTR_TITLE_DEFAULT,
15  PLATFORM_SCHEMA as NOTIFY_PLATFORM_SCHEMA,
16  BaseNotificationService,
17 )
18 from homeassistant.const import (
19  ATTR_ICON,
20  CONF_HOST,
21  CONF_PASSWORD,
22  CONF_PORT,
23  CONF_PROXY_SSL,
24  CONF_USERNAME,
25 )
26 from homeassistant.core import HomeAssistant
27 from homeassistant.helpers.aiohttp_client import async_get_clientsession
29 from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
30 
31 _LOGGER = logging.getLogger(__name__)
32 
33 DEFAULT_PORT = 8080
34 DEFAULT_PROXY_SSL = False
35 DEFAULT_TIMEOUT = 5
36 
37 PLATFORM_SCHEMA = NOTIFY_PLATFORM_SCHEMA.extend(
38  {
39  vol.Required(CONF_HOST): cv.string,
40  vol.Optional(CONF_PORT, default=DEFAULT_PORT): cv.port,
41  vol.Optional(CONF_PROXY_SSL, default=DEFAULT_PROXY_SSL): cv.boolean,
42  vol.Inclusive(CONF_USERNAME, "auth"): cv.string,
43  vol.Inclusive(CONF_PASSWORD, "auth"): cv.string,
44  }
45 )
46 
47 ATTR_DISPLAYTIME = "displaytime"
48 
49 
51  hass: HomeAssistant,
52  config: ConfigType,
53  discovery_info: DiscoveryInfoType | None = None,
54 ) -> KodiNotificationService:
55  """Return the notify service."""
56  username: str | None = config.get(CONF_USERNAME)
57  password: str | None = config.get(CONF_PASSWORD)
58 
59  host: str = config[CONF_HOST]
60  port: int = config[CONF_PORT]
61  encryption = config.get(CONF_PROXY_SSL)
62 
63  if host.startswith(("http://", "https://")):
64  host = host[host.index("://") + 3 :]
65  _LOGGER.warning(
66  "Kodi host name should no longer contain http:// See updated "
67  "definitions here: "
68  "https://www.home-assistant.io/integrations/kodi/"
69  )
70 
71  http_protocol = "https" if encryption else "http"
72  url = f"{http_protocol}://{host}:{port}/jsonrpc"
73 
74  if username is not None and password is not None:
75  auth = aiohttp.BasicAuth(username, password)
76  else:
77  auth = None
78 
79  return KodiNotificationService(hass, url, auth)
80 
81 
82 class KodiNotificationService(BaseNotificationService):
83  """Implement the notification service for Kodi."""
84 
85  def __init__(self, hass, url, auth=None):
86  """Initialize the service."""
87  self._url_url = url
88 
89  kwargs = {"timeout": DEFAULT_TIMEOUT, "session": async_get_clientsession(hass)}
90 
91  if auth is not None:
92  kwargs["auth"] = auth
93 
94  self._server_server = jsonrpc_async.Server(self._url_url, **kwargs)
95 
96  async def async_send_message(self, message="", **kwargs):
97  """Send a message to Kodi."""
98  try:
99  data = kwargs.get(ATTR_DATA) or {}
100 
101  displaytime = int(data.get(ATTR_DISPLAYTIME, 10000))
102  icon = data.get(ATTR_ICON, "info")
103  title = kwargs.get(ATTR_TITLE, ATTR_TITLE_DEFAULT)
104  await self._server_server.GUI.ShowNotification(title, message, icon, displaytime)
105 
106  except jsonrpc_async.TransportError:
107  _LOGGER.warning("Unable to fetch Kodi data. Is Kodi online?")
def async_send_message(self, message="", **kwargs)
Definition: notify.py:96
KodiNotificationService async_get_service(HomeAssistant hass, ConfigType config, DiscoveryInfoType|None discovery_info=None)
Definition: notify.py:54
aiohttp.ClientSession async_get_clientsession(HomeAssistant hass, bool verify_ssl=True, socket.AddressFamily family=socket.AF_UNSPEC, ssl_util.SSLCipherList ssl_cipher=ssl_util.SSLCipherList.PYTHON_DEFAULT)