Home Assistant Unofficial Reference 2024.12.1
server.py
Go to the documentation of this file.
1 """Go2rtc server."""
2 
3 import asyncio
4 from collections import deque
5 from contextlib import suppress
6 import logging
7 from tempfile import NamedTemporaryFile
8 
9 from go2rtc_client import Go2RtcRestClient
10 
11 from homeassistant.core import HomeAssistant
12 from homeassistant.exceptions import HomeAssistantError
13 from homeassistant.helpers.aiohttp_client import async_get_clientsession
14 
15 from .const import HA_MANAGED_API_PORT, HA_MANAGED_URL
16 
# Module logger; the go2rtc process output is re-emitted through it.
_LOGGER = logging.getLogger(__name__)

# Timeouts and delays, in seconds.
_SETUP_TIMEOUT = 30  # max wait for the boot message after spawning the process
_TERMINATE_TIMEOUT = 5  # max wait for the process to exit after terminate()
_RESPAWN_COOLDOWN = 1  # pause before the watchdog restarts the server

# Substring of the go2rtc log line that marks a successful start.
_SUCCESSFUL_BOOT_MESSAGE = "INF [api] listen addr="

# Default API bind address (used unless the UI is enabled).
_LOCALHOST_IP = "127.0.0.1"

# Maximum number of go2rtc log lines retained for later replay.
_LOG_BUFFER_SIZE = 512

# Default configuration for HA
# - Api is listening only on localhost
# - Enable rtsp for localhost only as ffmpeg needs it
# - Clear default ice servers
_GO2RTC_CONFIG_FORMAT = r"""# This file is managed by Home Assistant
# Do not edit it manually

api:
 listen: "{api_ip}:{api_port}"

rtsp:
 listen: "127.0.0.1:18554"

webrtc:
 listen: ":18555/tcp"
 ice_servers: []
"""

# Translation from the level tag in a go2rtc log line to the Python logging
# level the line is re-logged at.
_LOG_LEVEL_MAP = {
    # Routine chatter is kept at debug level.
    "TRC": logging.DEBUG,
    "DBG": logging.DEBUG,
    "INF": logging.DEBUG,
    # Warnings and errors are both surfaced as warnings.
    "WRN": logging.WARNING,
    "ERR": logging.WARNING,
    # Fatal and panic messages are surfaced as errors.
    "FTL": logging.ERROR,
    "PNC": logging.ERROR,
}
52 
53 
class Go2RTCServerStartError(HomeAssistantError):
    """Raised when server does not start."""

    # Fixed message for this error; the class is raised without arguments.
    _message = "Go2rtc server didn't start correctly"
58 
59 
class Go2RTCWatchdogError(HomeAssistantError):
    """Raised on watchdog error."""
62 
63 
def _create_temp_file(api_ip: str) -> str:
    """Write the go2rtc configuration to a new temporary file.

    The configuration template is rendered with ``api_ip`` as the API listen
    address and the Home Assistant managed API port, then written UTF-8
    encoded. Returns the path of the created file.
    """
    # delete=False keeps the file on disk after it is closed so the go2rtc
    # process can read it. Linux clears the tmp folder on reboot, so the file
    # is not removed manually.
    with NamedTemporaryFile(
        prefix="go2rtc_", suffix=".yaml", delete=False
    ) as config_file:
        rendered = _GO2RTC_CONFIG_FORMAT.format(
            api_ip=api_ip, api_port=HA_MANAGED_API_PORT
        )
        config_file.write(rendered.encode())
        path = config_file.name
    return path
75 
76 
class Server:
    """Go2rtc server.

    Spawns the go2rtc binary as a subprocess, mirrors its output into the
    Home Assistant log, and runs a watchdog that respawns the process when it
    exits or when its API stops answering.
    """

    def __init__(
        self, hass: HomeAssistant, binary: str, *, enable_ui: bool = False
    ) -> None:
        """Initialize the server.

        ``binary`` is the go2rtc executable to run. With ``enable_ui`` set the
        API listens on all interfaces instead of localhost only.
        """
        self._hass = hass
        self._binary = binary
        # Bounded buffer of recent process output, replayed when the server
        # fails to start or stops unexpectedly.
        self._log_buffer: deque[str] = deque(maxlen=_LOG_BUFFER_SIZE)
        self._process: asyncio.subprocess.Process | None = None
        # Set by the log reader once the boot message has been seen.
        self._startup_complete = asyncio.Event()
        self._api_ip = _LOCALHOST_IP
        if enable_ui:
            # Listen on all interfaces for allowing access from all ips
            self._api_ip = ""
        self._watchdog_task: asyncio.Task | None = None
        # Monitor tasks currently owned by the watchdog loop.
        self._watchdog_tasks: list[asyncio.Task] = []

    async def start(self) -> None:
        """Start the server and the watchdog that keeps it running."""
        await self._start()
        self._watchdog_task = asyncio.create_task(
            self._watchdog(), name="Go2rtc respawn"
        )

    async def _start(self) -> None:
        """Spawn the go2rtc process and wait until it reports a successful boot.

        Raises Go2RTCServerStartError if the boot message is not seen within
        _SETUP_TIMEOUT seconds; the process is stopped before raising.
        """
        _LOGGER.debug("Starting go2rtc server")
        # Writing the config file is blocking I/O, so run it in the executor.
        config_file = await self._hass.async_add_executor_job(
            _create_temp_file, self._api_ip
        )

        self._startup_complete.clear()

        self._process = await asyncio.create_subprocess_exec(
            self._binary,
            "-c",
            config_file,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.STDOUT,
            close_fds=False,  # required for posix_spawn on CPython < 3.13
        )

        self._hass.async_create_background_task(
            self._log_output(self._process), "Go2rtc log output"
        )

        try:
            async with asyncio.timeout(_SETUP_TIMEOUT):
                await self._startup_complete.wait()
        except TimeoutError as err:
            msg = "Go2rtc server didn't start correctly"
            _LOGGER.exception(msg)
            self._log_server_output(logging.WARNING)
            await self._stop()
            raise Go2RTCServerStartError from err

        # Check the server version
        client = Go2RtcRestClient(async_get_clientsession(self._hass), HA_MANAGED_URL)
        await client.validate_server_version()

    async def _log_output(self, process: asyncio.subprocess.Process) -> None:
        """Log the output of the process.

        Every line is stored in the log buffer and re-logged at the level
        mapped from its go2rtc level tag (WARNING when the tag is unknown).
        Seeing the boot message marks startup as complete.
        """
        assert process.stdout is not None

        async for line in process.stdout:
            # strip() removes the trailing newline; slicing off the last byte
            # would drop a real character from a final line without newline.
            # errors="replace" keeps undecodable bytes from raising and ending
            # this reader, which would leave the process output undrained.
            msg = line.decode(errors="replace").strip()
            self._log_buffer.append(msg)
            loglevel = logging.WARNING
            # The level tag is read from the second space-separated field.
            if len(split_msg := msg.split(" ", 2)) == 3:
                loglevel = _LOG_LEVEL_MAP.get(split_msg[1], loglevel)
            _LOGGER.log(loglevel, msg)
            if not self._startup_complete.is_set() and _SUCCESSFUL_BOOT_MESSAGE in msg:
                self._startup_complete.set()

    def _log_server_output(self, loglevel: int) -> None:
        """Log captured process output, then clear the log buffer."""
        for line in list(self._log_buffer):  # Copy the deque to avoid mutation error
            _LOGGER.log(loglevel, line)
        self._log_buffer.clear()

    async def _watchdog(self) -> None:
        """Keep respawning go2rtc servers.

        A new go2rtc server is spawned if the process terminates or the API
        stops responding.
        """
        while True:
            try:
                monitor_process_task = asyncio.create_task(self._monitor_process())
                self._watchdog_tasks.append(monitor_process_task)
                monitor_process_task.add_done_callback(self._watchdog_tasks.remove)
                monitor_api_task = asyncio.create_task(self._monitor_api())
                self._watchdog_tasks.append(monitor_api_task)
                monitor_api_task.add_done_callback(self._watchdog_tasks.remove)
                try:
                    await asyncio.gather(monitor_process_task, monitor_api_task)
                except Go2RTCWatchdogError:
                    _LOGGER.debug("Caught Go2RTCWatchdogError")
                    for task in self._watchdog_tasks:
                        if task.done():
                            if not task.cancelled():
                                # Retrieve the exception so it is not left
                                # unobserved on the finished task.
                                task.exception()
                            continue
                        task.cancel()
                    await asyncio.sleep(_RESPAWN_COOLDOWN)
                    try:
                        await self._stop()
                        _LOGGER.warning("Go2rtc unexpectedly stopped, server log:")
                        self._log_server_output(logging.WARNING)
                        _LOGGER.debug("Spawning new go2rtc server")
                        # A failed start is already logged by _start(); the
                        # next loop iteration will try again.
                        with suppress(Go2RTCServerStartError):
                            await self._start()
                    except Exception:
                        _LOGGER.exception(
                            "Unexpected error when restarting go2rtc server"
                        )
            except Exception:
                _LOGGER.exception("Unexpected error in go2rtc server watchdog")

    async def _monitor_process(self) -> None:
        """Raise if the go2rtc process terminates."""
        _LOGGER.debug("Monitoring go2rtc server process")
        if self._process:
            await self._process.wait()
        _LOGGER.debug("go2rtc server terminated")
        raise Go2RTCWatchdogError("Process ended")

    async def _monitor_api(self) -> None:
        """Raise if the go2rtc API stops responding.

        Polls the server version every 10 seconds; any exception raised by the
        check is re-raised as Go2RTCWatchdogError.
        """
        client = Go2RtcRestClient(async_get_clientsession(self._hass), HA_MANAGED_URL)

        _LOGGER.debug("Monitoring go2rtc API")
        try:
            while True:
                await client.validate_server_version()
                await asyncio.sleep(10)
        except Exception as err:
            _LOGGER.debug("go2rtc API did not reply", exc_info=True)
            raise Go2RTCWatchdogError("API error") from err

    async def _stop_watchdog(self) -> None:
        """Handle watchdog stop request.

        Cancels the watchdog task and its monitor tasks, then waits for all
        of them to finish.
        """
        tasks: list[asyncio.Task] = []
        if watchdog_task := self._watchdog_task:
            self._watchdog_task = None
            tasks.append(watchdog_task)
            watchdog_task.cancel()
        for task in self._watchdog_tasks:
            tasks.append(task)
            task.cancel()
        # return_exceptions=True so the cancellations are collected here
        # instead of being raised into the caller.
        await asyncio.gather(*tasks, return_exceptions=True)

    async def stop(self) -> None:
        """Stop the server and abort the watchdog task."""
        _LOGGER.debug("Server stop requested")
        # Stop the watchdog first so it does not respawn the process.
        await self._stop_watchdog()
        await self._stop()

    async def _stop(self) -> None:
        """Stop the server.

        Terminates the process and waits up to _TERMINATE_TIMEOUT seconds for
        it to exit; kills it if it has not exited by then. Does nothing when
        no process is running.
        """
        if self._process:
            _LOGGER.debug("Stopping go2rtc server")
            process = self._process
            self._process = None
            # The process may already be gone; that is not an error here.
            with suppress(ProcessLookupError):
                process.terminate()
            try:
                await asyncio.wait_for(process.wait(), timeout=_TERMINATE_TIMEOUT)
            except TimeoutError:
                _LOGGER.warning("Go2rtc server didn't terminate gracefully. Killing it")
                with suppress(ProcessLookupError):
                    process.kill()
            else:
                _LOGGER.debug("Go2rtc server has been stopped")
None _log_output(self, asyncio.subprocess.Process process)
Definition: server.py:139
None _log_server_output(self, int loglevel)
Definition: server.py:153
None __init__(self, HomeAssistant hass, str binary, *bool enable_ui=False)
Definition: server.py:82
aiohttp.ClientSession async_get_clientsession(HomeAssistant hass, bool verify_ssl=True, socket.AddressFamily family=socket.AF_UNSPEC, ssl_util.SSLCipherList ssl_cipher=ssl_util.SSLCipherList.PYTHON_DEFAULT)