4 from collections
import deque
5 from contextlib
import suppress
7 from tempfile
import NamedTemporaryFile
9 from go2rtc_client
import Go2RtcRestClient
15 from .const
import HA_MANAGED_API_PORT, HA_MANAGED_URL
17 _LOGGER = logging.getLogger(__name__)
18 _TERMINATE_TIMEOUT = 5
20 _SUCCESSFUL_BOOT_MESSAGE =
"INF [api] listen addr="
21 _LOCALHOST_IP =
"127.0.0.1"
22 _LOG_BUFFER_SIZE = 512
29 _GO2RTC_CONFIG_FORMAT =
r"""# This file is managed by Home Assistant
30 # Do not edit it manually
33 listen: "{api_ip}:{api_port}"
36 listen: "127.0.0.1:18554"
47 "WRN": logging.WARNING,
48 "ERR": logging.WARNING,
55 """Raised when server does not start."""
57 _message =
"Go2rtc server didn't start correctly"
61 """Raised on watchdog error."""
65 """Create temporary config file."""
68 with NamedTemporaryFile(prefix=
"go2rtc_", suffix=
".yaml", delete=
False)
as file:
70 _GO2RTC_CONFIG_FORMAT.format(
71 api_ip=api_ip, api_port=HA_MANAGED_API_PORT
81 self, hass: HomeAssistant, binary: str, *, enable_ui: bool =
False
83 """Initialize the server."""
86 self._log_buffer: deque[str] = deque(maxlen=_LOG_BUFFER_SIZE)
87 self.
_process_process: asyncio.subprocess.Process |
None =
None
94 self._watchdog_tasks: list[asyncio.Task] = []
97 """Start the server."""
100 self.
_watchdog_watchdog(), name=
"Go2rtc respawn"
104 """Start the server."""
105 _LOGGER.debug(
"Starting go2rtc server")
106 config_file = await self.
_hass_hass.async_add_executor_job(
107 _create_temp_file, self.
_api_ip_api_ip
112 self.
_process_process = await asyncio.create_subprocess_exec(
116 stdout=asyncio.subprocess.PIPE,
117 stderr=asyncio.subprocess.STDOUT,
121 self.
_hass_hass.async_create_background_task(
126 async
with asyncio.timeout(_SETUP_TIMEOUT):
128 except TimeoutError
as err:
129 msg =
"Go2rtc server didn't start correctly"
130 _LOGGER.exception(msg)
132 await self.
_stop_stop()
133 raise Go2RTCServerStartError
from err
137 await client.validate_server_version()
139 async
def _log_output(self, process: asyncio.subprocess.Process) ->
None:
140 """Log the output of the process."""
141 assert process.stdout
is not None
143 async
for line
in process.stdout:
144 msg = line[:-1].decode().strip()
145 self._log_buffer.append(msg)
146 loglevel = logging.WARNING
147 if len(split_msg := msg.split(
" ", 2)) == 3:
148 loglevel = _LOG_LEVEL_MAP.get(split_msg[1], loglevel)
149 _LOGGER.log(loglevel, msg)
150 if not self.
_startup_complete_startup_complete.is_set()
and _SUCCESSFUL_BOOT_MESSAGE
in msg:
154 """Log captured process output, then clear the log buffer."""
155 for line
in list(self._log_buffer):
156 _LOGGER.log(loglevel, line)
157 self._log_buffer.clear()
160 """Keep respawning go2rtc servers.
162 A new go2rtc server is spawned if the process terminates or the API
167 monitor_process_task = asyncio.create_task(self.
_monitor_process_monitor_process())
168 self._watchdog_tasks.append(monitor_process_task)
169 monitor_process_task.add_done_callback(self._watchdog_tasks.remove)
170 monitor_api_task = asyncio.create_task(self.
_monitor_api_monitor_api())
171 self._watchdog_tasks.append(monitor_api_task)
172 monitor_api_task.add_done_callback(self._watchdog_tasks.remove)
174 await asyncio.gather(monitor_process_task, monitor_api_task)
175 except Go2RTCWatchdogError:
176 _LOGGER.debug(
"Caught Go2RTCWatchdogError")
177 for task
in self._watchdog_tasks:
179 if not task.cancelled():
183 await asyncio.sleep(_RESPAWN_COOLDOWN)
185 await self.
_stop_stop()
186 _LOGGER.warning(
"Go2rtc unexpectedly stopped, server log:")
188 _LOGGER.debug(
"Spawning new go2rtc server")
189 with suppress(Go2RTCServerStartError):
193 "Unexpected error when restarting go2rtc server"
196 _LOGGER.exception(
"Unexpected error in go2rtc server watchdog")
199 """Raise if the go2rtc process terminates."""
200 _LOGGER.debug(
"Monitoring go2rtc server process")
203 _LOGGER.debug(
"go2rtc server terminated")
207 """Raise if the go2rtc process terminates."""
210 _LOGGER.debug(
"Monitoring go2rtc API")
213 await client.validate_server_version()
214 await asyncio.sleep(10)
215 except Exception
as err:
216 _LOGGER.debug(
"go2rtc API did not reply", exc_info=
True)
220 """Handle watchdog stop request."""
221 tasks: list[asyncio.Task] = []
224 tasks.append(watchdog_task)
225 watchdog_task.cancel()
226 for task
in self._watchdog_tasks:
229 await asyncio.gather(*tasks, return_exceptions=
True)
232 """Stop the server and abort the watchdog task."""
233 _LOGGER.debug(
"Server stop requested")
235 await self.
_stop_stop()
238 """Stop the server."""
240 _LOGGER.debug(
"Stopping go2rtc server")
243 with suppress(ProcessLookupError):
246 await asyncio.wait_for(process.wait(), timeout=_TERMINATE_TIMEOUT)
248 _LOGGER.warning(
"Go2rtc server didn't terminate gracefully. Killing it")
249 with suppress(ProcessLookupError):
252 _LOGGER.debug(
"Go2rtc server has been stopped")
None _log_output(self, asyncio.subprocess.Process process)
None _log_server_output(self, int loglevel)
None _monitor_process(self)
None __init__(self, HomeAssistant hass, str binary, *bool enable_ui=False)
None _stop_watchdog(self)
str _create_temp_file(str api_ip)
aiohttp.ClientSession async_get_clientsession(HomeAssistant hass, bool verify_ssl=True, socket.AddressFamily family=socket.AF_UNSPEC, ssl_util.SSLCipherList ssl_cipher=ssl_util.SSLCipherList.PYTHON_DEFAULT)