Home Assistant Unofficial Reference 2024.12.1
upnp.py
Go to the documentation of this file.
1 """Support UPNP discovery method that mimics Hue hubs."""
2 
3 from __future__ import annotations
4 
5 import asyncio
6 from contextlib import suppress
7 import logging
8 import socket
9 from typing import cast
10 
11 from aiohttp import web
12 
13 from homeassistant import core
14 from homeassistant.components.http import HomeAssistantView
15 
16 from .config import Config
17 from .const import HUE_SERIAL_NUMBER, HUE_UUID
18 
19 _LOGGER = logging.getLogger(__name__)
20 
21 BROADCAST_PORT = 1900
22 BROADCAST_ADDR = "239.255.255.250"
23 
24 
class DescriptionXmlView(HomeAssistantView):
    """Handles requests for the description.xml file.

    The returned document is a UPnP device description that presents this
    instance as a Philips hue bridge. The SSDP responses built by
    ``UPNPResponderProtocol`` point their LOCATION header at this URL.
    """

    url = "/description.xml"
    name = "description:xml"
    requires_auth = False

    def __init__(self, config: Config) -> None:
        """Initialize the instance of the view.

        Args:
            config: Bridge configuration; ``advertise_ip`` and
                ``advertise_port`` are read from it on every request.
        """
        # Must be stored as ``config``: get() reads ``self.config``.
        self.config = config

    @core.callback
    def get(self, request: web.Request) -> web.Response:
        """Handle a GET request.

        Args:
            request: The incoming request; it is not inspected.

        Returns:
            A ``text/xml`` response containing the device description.
        """
        resp_text = f"""<?xml version="1.0" encoding="UTF-8" ?>
<root xmlns="urn:schemas-upnp-org:device-1-0">
<specVersion>
<major>1</major>
<minor>0</minor>
</specVersion>
<URLBase>http://{self.config.advertise_ip}:{self.config.advertise_port}/</URLBase>
<device>
<deviceType>urn:schemas-upnp-org:device:Basic:1</deviceType>
<friendlyName>Home Assistant Bridge ({self.config.advertise_ip})</friendlyName>
<manufacturer>Royal Philips Electronics</manufacturer>
<manufacturerURL>http://www.philips.com</manufacturerURL>
<modelDescription>Philips hue Personal Wireless Lighting</modelDescription>
<modelName>Philips hue bridge 2015</modelName>
<modelNumber>BSB002</modelNumber>
<modelURL>http://www.meethue.com</modelURL>
<serialNumber>{HUE_SERIAL_NUMBER}</serialNumber>
<UDN>uuid:{HUE_UUID}</UDN>
</device>
</root>
"""

        return web.Response(text=resp_text, content_type="text/xml")
62 
63 
64 class UPNPResponderProtocol(asyncio.Protocol):
65  """Handle responding to UPNP/SSDP discovery requests."""
66 
67  def __init__(
68  self,
69  loop: asyncio.AbstractEventLoop,
70  ssdp_socket: socket.socket,
71  advertise_ip: str,
72  advertise_port: int,
73  ) -> None:
74  """Initialize the class."""
75  self.transporttransport: asyncio.DatagramTransport | None = None
76  self._loop_loop = loop
77  self._sock_sock = ssdp_socket
78  self.advertise_ipadvertise_ip = advertise_ip
79  self.advertise_portadvertise_port = advertise_port
80  self._upnp_root_response_upnp_root_response = self._prepare_response_prepare_response(
81  "upnp:rootdevice", f"uuid:{HUE_UUID}::upnp:rootdevice"
82  )
83  self._upnp_device_response_upnp_device_response = self._prepare_response_prepare_response(
84  "urn:schemas-upnp-org:device:basic:1", f"uuid:{HUE_UUID}"
85  )
86 
87  def connection_made(self, transport: asyncio.BaseTransport) -> None:
88  """Set the transport."""
89  self.transporttransport = cast(asyncio.DatagramTransport, transport)
90 
91  def connection_lost(self, exc: Exception | None) -> None:
92  """Handle connection lost."""
93 
94  def datagram_received(self, data: bytes, addr: tuple[str, int]) -> None:
95  """Respond to msearch packets."""
96  decoded_data = data.decode("utf-8", errors="ignore")
97 
98  if "M-SEARCH" not in decoded_data:
99  return
100 
101  _LOGGER.debug("UPNP Responder M-SEARCH method received: %s", data)
102  # SSDP M-SEARCH method received, respond to it with our info
103  response = self._handle_request_handle_request(decoded_data)
104  _LOGGER.debug("UPNP Responder responding with: %s", response)
105  assert self.transporttransport is not None
106  self.transporttransport.sendto(response, addr)
107 
108  def error_received(self, exc: Exception) -> None:
109  """Log UPNP errors."""
110  _LOGGER.error("UPNP Error received: %s", exc)
111 
112  def close(self) -> None:
113  """Stop the server."""
114  _LOGGER.info("UPNP responder shutting down")
115  if self.transporttransport:
116  self.transporttransport.close()
117  self._loop_loop.remove_writer(self._sock_sock.fileno())
118  self._loop_loop.remove_reader(self._sock_sock.fileno())
119  self._sock_sock.close()
120 
121  def _handle_request(self, decoded_data: str) -> bytes:
122  if "upnp:rootdevice" in decoded_data:
123  return self._upnp_root_response_upnp_root_response
124 
125  return self._upnp_device_response_upnp_device_response
126 
127  def _prepare_response(self, search_target: str, unique_service_name: str) -> bytes:
128  # Note that the double newline at the end of
129  # this string is required per the SSDP spec
130  response = f"""HTTP/1.1 200 OK
131 CACHE-CONTROL: max-age=60
132 EXT:
133 LOCATION: http://{self.advertise_ip}:{self.advertise_port}/description.xml
134 SERVER: FreeRTOS/6.0.5, UPnP/1.0, IpBridge/1.16.0
135 hue-bridgeid: {HUE_SERIAL_NUMBER}
136 ST: {search_target}
137 USN: {unique_service_name}
138 
139 """
140  return response.replace("\n", "\r\n").encode("utf-8")
141 
142 
async def async_create_upnp_datagram_endpoint(
    host_ip_addr: str,
    upnp_bind_multicast: bool,
    advertise_ip: str,
    advertise_port: int,
) -> UPNPResponderProtocol:
    """Create the UPNP socket and protocol.

    Args:
        host_ip_addr: Local IPv4 address used as the multicast interface and
            for the group membership; also the bind address unless
            ``upnp_bind_multicast`` is set.
        upnp_bind_multicast: When true, bind to all addresses (``""``)
            instead of ``host_ip_addr``.
        advertise_ip: IP address passed on to the responder protocol.
        advertise_port: Port passed on to the responder protocol.

    Returns:
        The protocol instance attached to the new datagram endpoint.

    Raises:
        OSError: If configuring or binding the socket fails; the socket is
            closed before the error propagates.
    """
    # Listen for UDP port 1900 packets sent to SSDP multicast address
    ssdp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        ssdp_socket.setblocking(False)

        # Required for receiving multicast
        # Note: some code duplication from async_upnp_client/ssdp.py here.
        ssdp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # SO_REUSEPORT is not defined on every platform.
        with suppress(AttributeError):
            ssdp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)

        ssdp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)

        ssdp_socket.setsockopt(
            socket.SOL_IP, socket.IP_MULTICAST_IF, socket.inet_aton(host_ip_addr)
        )

        # Membership request: packed group address followed by the packed
        # local interface address.
        ssdp_socket.setsockopt(
            socket.SOL_IP,
            socket.IP_ADD_MEMBERSHIP,
            socket.inet_aton(BROADCAST_ADDR) + socket.inet_aton(host_ip_addr),
        )

        ssdp_socket.bind(("" if upnp_bind_multicast else host_ip_addr, BROADCAST_PORT))
    except OSError:
        # Do not leak the file descriptor when setup fails part-way.
        ssdp_socket.close()
        raise

    # We are inside a coroutine, so a loop is guaranteed to be running.
    loop = asyncio.get_running_loop()

    _, protocol = await loop.create_datagram_endpoint(
        lambda: UPNPResponderProtocol(loop, ssdp_socket, advertise_ip, advertise_port),
        sock=ssdp_socket,
    )
    return protocol
web.Response get(self, web.Request request)
Definition: upnp.py:37
bytes _prepare_response(self, str search_target, str unique_service_name)
Definition: upnp.py:127
None datagram_received(self, bytes data, tuple[str, int] addr)
Definition: upnp.py:94
None __init__(self, asyncio.AbstractEventLoop loop, socket.socket ssdp_socket, str advertise_ip, int advertise_port)
Definition: upnp.py:73
None connection_made(self, asyncio.BaseTransport transport)
Definition: upnp.py:87
UPNPResponderProtocol async_create_upnp_datagram_endpoint(str host_ip_addr, bool upnp_bind_multicast, str advertise_ip, int advertise_port)
Definition: upnp.py:148