Home Assistant Unofficial Reference 2024.12.1
__init__.py
Go to the documentation of this file.
1 """Support for Amcrest IP cameras."""
2 
3 from __future__ import annotations
4 
5 import asyncio
6 from collections.abc import AsyncIterator, Callable
7 from contextlib import asynccontextmanager, suppress
8 from dataclasses import dataclass
9 from datetime import datetime, timedelta
10 import logging
11 import threading
12 from typing import Any
13 
14 import aiohttp
15 from amcrest import AmcrestError, ApiWrapper, LoginError
16 import httpx
17 import voluptuous as vol
18 
19 from homeassistant.auth.models import User
20 from homeassistant.auth.permissions.const import POLICY_CONTROL
21 from homeassistant.const import (
22  ATTR_ENTITY_ID,
23  CONF_AUTHENTICATION,
24  CONF_BINARY_SENSORS,
25  CONF_HOST,
26  CONF_NAME,
27  CONF_PASSWORD,
28  CONF_PORT,
29  CONF_SCAN_INTERVAL,
30  CONF_SENSORS,
31  CONF_SWITCHES,
32  CONF_USERNAME,
33  ENTITY_MATCH_ALL,
34  ENTITY_MATCH_NONE,
35  HTTP_BASIC_AUTHENTICATION,
36  Platform,
37 )
38 from homeassistant.core import HomeAssistant, ServiceCall, callback
39 from homeassistant.exceptions import Unauthorized, UnknownUser
40 from homeassistant.helpers import discovery
42 from homeassistant.helpers.dispatcher import async_dispatcher_send, dispatcher_send
43 from homeassistant.helpers.event import async_track_time_interval
44 from homeassistant.helpers.service import async_extract_entity_ids
45 from homeassistant.helpers.typing import ConfigType
46 
47 from .binary_sensor import BINARY_SENSOR_KEYS, BINARY_SENSORS, check_binary_sensors
48 from .camera import CAMERA_SERVICES, STREAM_SOURCE_LIST
49 from .const import (
50  CAMERAS,
51  COMM_RETRIES,
52  COMM_TIMEOUT,
53  DATA_AMCREST,
54  DEVICES,
55  DOMAIN,
56  RESOLUTION_LIST,
57  SERVICE_EVENT,
58  SERVICE_UPDATE,
59 )
60 from .helpers import service_signal
61 from .sensor import SENSOR_KEYS
62 from .switch import SWITCH_KEYS
63 
_LOGGER = logging.getLogger(__name__)

# Keys of the per-camera YAML options that are specific to this integration.
CONF_RESOLUTION = "resolution"
CONF_STREAM_SOURCE = "stream_source"
CONF_FFMPEG_ARGUMENTS = "ffmpeg_arguments"
CONF_CONTROL_LIGHT = "control_light"

# Defaults substituted by AMCREST_SCHEMA when an option is omitted.
DEFAULT_NAME = "Amcrest Camera"
DEFAULT_PORT = 80
DEFAULT_RESOLUTION = "high"
DEFAULT_ARGUMENTS = "-pred 1"

# AmcrestChecker.available turns False once the error count exceeds this.
MAX_ERRORS = 5
# Period of the "is the camera back online?" probe scheduled while offline.
RECHECK_INTERVAL = timedelta(minutes=1)

NOTIFICATION_ID = "amcrest_notification"
NOTIFICATION_TITLE = "Amcrest Camera Setup"

# Default for the CONF_SCAN_INTERVAL option.
SCAN_INTERVAL = timedelta(seconds=10)

# Accepted values for the CONF_AUTHENTICATION option.
AUTHENTICATION_LIST = {"basic": "basic"}
84 
85 
def _has_unique_names(devices: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Validate that no two configured cameras share the same name.

    The ``CONF_NAME`` value of every device is collected and run through
    ``vol.Unique()``; that validator raises a voluptuous error when a
    duplicate is present. On success ``devices`` is returned unchanged so
    the function can be chained inside ``vol.All``.
    """
    names = [device[CONF_NAME] for device in devices]
    vol.Schema(vol.Unique())(names)
    return devices
90 
91 
# Validation schema for a single camera entry.
AMCREST_SCHEMA = vol.Schema(
    {
        vol.Required(CONF_HOST): cv.string,
        vol.Required(CONF_USERNAME): cv.string,
        vol.Required(CONF_PASSWORD): cv.string,
        vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
        vol.Optional(CONF_PORT, default=DEFAULT_PORT): cv.port,
        vol.Optional(CONF_AUTHENTICATION, default=HTTP_BASIC_AUTHENTICATION): vol.All(
            vol.In(AUTHENTICATION_LIST)
        ),
        vol.Optional(CONF_RESOLUTION, default=DEFAULT_RESOLUTION): vol.All(
            vol.In(RESOLUTION_LIST)
        ),
        vol.Optional(CONF_STREAM_SOURCE, default=STREAM_SOURCE_LIST[0]): vol.All(
            vol.In(STREAM_SOURCE_LIST)
        ),
        vol.Optional(CONF_FFMPEG_ARGUMENTS, default=DEFAULT_ARGUMENTS): cv.string,
        vol.Optional(CONF_SCAN_INTERVAL, default=SCAN_INTERVAL): cv.time_period,
        vol.Optional(CONF_BINARY_SENSORS): vol.All(
            cv.ensure_list,
            [vol.In(BINARY_SENSOR_KEYS)],
            vol.Unique(),
            check_binary_sensors,
        ),
        vol.Optional(CONF_SWITCHES): vol.All(
            cv.ensure_list, [vol.In(SWITCH_KEYS)], vol.Unique()
        ),
        vol.Optional(CONF_SENSORS): vol.All(
            cv.ensure_list, [vol.In(SENSOR_KEYS)], vol.Unique()
        ),
        vol.Optional(CONF_CONTROL_LIGHT, default=True): cv.boolean,
    }
)

# Top-level schema: the DOMAIN key holds a list of camera entries whose names
# must be unique; keys belonging to other integrations are allowed through.
CONFIG_SCHEMA = vol.Schema(
    {DOMAIN: vol.All(cv.ensure_list, [AMCREST_SCHEMA], _has_unique_names)},
    extra=vol.ALLOW_EXTRA,
)
130 
131 
class AmcrestChecker(ApiWrapper):
    """amcrest.ApiWrapper wrapper for catching errors.

    Every command issued through this class updates an error counter and a
    login-error flag. When the camera transitions to unavailable, the
    availability flags are cleared, an update signal is dispatched and a
    periodic probe is scheduled; the first successful command afterwards
    cancels the probe and sets the flags again.
    """

    def __init__(
        self,
        hass: HomeAssistant,
        name: str,
        host: str,
        port: int,
        user: str,
        password: str,
    ) -> None:
        """Initialize."""
        self._hass = hass
        self._wrap_name = name
        self._wrap_errors = 0
        # Guards the error counter / login-error flag, which are touched
        # both from worker threads and from the event loop.
        self._wrap_lock = threading.Lock()
        self._async_wrap_lock = asyncio.Lock()
        self._wrap_login_err = False
        # Both flags start out set, i.e. the camera is assumed available.
        self._wrap_event_flag = threading.Event()
        self._wrap_event_flag.set()
        self._async_wrap_event_flag = asyncio.Event()
        self._async_wrap_event_flag.set()
        # Cancel callback of the periodic online probe; None when no probe
        # is scheduled.
        self._unsub_recheck: Callable[[], None] | None = None
        super().__init__(
            host,
            port,
            user,
            password,
            retries_connection=COMM_RETRIES,
            timeout_protocol=COMM_TIMEOUT,
        )

    @property
    def available(self) -> bool:
        """Return if camera's API is responding."""
        return self._wrap_errors <= MAX_ERRORS and not self._wrap_login_err

    @property
    def available_flag(self) -> threading.Event:
        """Return event flag that indicates if camera's API is responding."""
        return self._wrap_event_flag

    @property
    def async_available_flag(self) -> asyncio.Event:
        """Return event flag that indicates if camera's API is responding."""
        return self._async_wrap_event_flag

    @callback
    def _async_start_recovery(self) -> None:
        """Mark the camera unavailable and schedule the online probe."""
        self.available_flag.clear()
        self.async_available_flag.clear()
        async_dispatcher_send(
            self._hass, service_signal(SERVICE_UPDATE, self._wrap_name)
        )
        self._unsub_recheck = async_track_time_interval(
            self._hass, self._wrap_test_online, RECHECK_INTERVAL
        )

    def command(self, *args: Any, **kwargs: Any) -> Any:
        """amcrest.ApiWrapper.command wrapper to catch errors."""
        try:
            ret = super().command(*args, **kwargs)
        except LoginError as ex:
            self._handle_offline(ex)
            raise
        except AmcrestError:
            self._handle_error()
            raise
        self._set_online()
        return ret

    async def async_command(self, *args: Any, **kwargs: Any) -> httpx.Response:
        """amcrest.ApiWrapper.async_command wrapper to catch errors."""
        async with self._async_command_wrapper():
            return await super().async_command(*args, **kwargs)

    @asynccontextmanager
    async def async_stream_command(
        self, *args: Any, **kwargs: Any
    ) -> AsyncIterator[httpx.Response]:
        """amcrest.ApiWrapper.async_stream_command wrapper to catch errors."""
        async with (
            self._async_command_wrapper(),
            super().async_stream_command(*args, **kwargs) as ret,
        ):
            yield ret

    @asynccontextmanager
    async def _async_command_wrapper(self) -> AsyncIterator[None]:
        """Track the outcome of the wrapped async command.

        A LoginError or any other AmcrestError raised inside the ``with``
        body is recorded and re-raised; a body that completes without one
        of those marks the camera online.
        """
        try:
            yield
        except LoginError as ex:
            async with self._async_wrap_lock:
                self._async_handle_offline(ex)
            raise
        except AmcrestError:
            async with self._async_wrap_lock:
                self._async_handle_error()
            raise
        async with self._async_wrap_lock:
            self._async_set_online()

    def _handle_offline_thread_safe(self, ex: Exception) -> bool:
        """Handle camera offline status shared between threads and event loop.

        Returns if the camera was online as a bool.
        """
        with self._wrap_lock:
            was_online = self.available
            was_login_err = self._wrap_login_err
            self._wrap_login_err = True
        # Log only on the first login failure, not on every retry.
        if not was_login_err:
            _LOGGER.error("%s camera offline: Login error: %s", self._wrap_name, ex)
        return was_online

    def _handle_offline(self, ex: Exception) -> None:
        """Handle camera offline status from a thread."""
        if self._handle_offline_thread_safe(ex):
            self._hass.loop.call_soon_threadsafe(self._async_start_recovery)

    @callback
    def _async_handle_offline(self, ex: Exception) -> None:
        """Handle camera offline status from the event loop."""
        if self._handle_offline_thread_safe(ex):
            self._async_start_recovery()

    def _handle_error_thread_safe(self) -> bool:
        """Handle camera error status shared between threads and event loop.

        Returns if the camera was online and is now offline as
        a bool.
        """
        with self._wrap_lock:
            was_online = self.available
            errs = self._wrap_errors = self._wrap_errors + 1
            offline = not self.available
        _LOGGER.debug("%s camera errs: %i", self._wrap_name, errs)
        return was_online and offline

    def _handle_error(self) -> None:
        """Handle camera error status from a thread."""
        if self._handle_error_thread_safe():
            _LOGGER.error("%s camera offline: Too many errors", self._wrap_name)
            self._hass.loop.call_soon_threadsafe(self._async_start_recovery)

    @callback
    def _async_handle_error(self) -> None:
        """Handle camera error status from the event loop."""
        if self._handle_error_thread_safe():
            _LOGGER.error("%s camera offline: Too many errors", self._wrap_name)
            self._async_start_recovery()

    def _set_online_thread_safe(self) -> bool:
        """Set camera online status shared between threads and event loop.

        Returns if the camera was offline as a bool.
        """
        with self._wrap_lock:
            was_offline = not self.available
            self._wrap_errors = 0
            self._wrap_login_err = False
        return was_offline

    def _set_online(self) -> None:
        """Set camera online status from a thread."""
        if self._set_online_thread_safe():
            self._hass.loop.call_soon_threadsafe(self._async_signal_online)

    @callback
    def _async_set_online(self) -> None:
        """Set camera online status from the event loop."""
        if self._set_online_thread_safe():
            self._async_signal_online()

    @callback
    def _async_signal_online(self) -> None:
        """Signal that camera is back online."""
        assert self._unsub_recheck is not None
        # Stop the periodic probe scheduled by _async_start_recovery.
        self._unsub_recheck()
        self._unsub_recheck = None
        _LOGGER.error("%s camera back online", self._wrap_name)
        self.available_flag.set()
        self.async_available_flag.set()
        async_dispatcher_send(
            self._hass, service_signal(SERVICE_UPDATE, self._wrap_name)
        )

    async def _wrap_test_online(self, now: datetime) -> None:
        """Test if camera is back online."""
        _LOGGER.debug("Testing if %s back online", self._wrap_name)
        # The awaited value is discarded and AmcrestError is suppressed here.
        with suppress(AmcrestError):
            await self.async_current_time
324 
325 
def _monitor_events(
    hass: HomeAssistant,
    name: str,
    api: AmcrestChecker,
    event_codes: set[str],
) -> None:
    """Forward camera events to Home Assistant; loops forever.

    Every event is fired on the bus as an ``"amcrest"`` event. Events whose
    code is in ``event_codes`` are additionally dispatched on the matching
    ``SERVICE_EVENT`` signal with a bool telling whether the payload carries
    an ``action == start`` pair (compared case-insensitively).
    """
    while True:
        # Block while the camera is flagged unavailable.
        api.available_flag.wait()
        try:
            for code, payload in api.event_actions("All"):
                event_data = {"camera": name, "event": code, "payload": payload}
                hass.bus.fire("amcrest", event_data)
                if code in event_codes:
                    signal = service_signal(SERVICE_EVENT, name, code)
                    start = any(
                        str(key).lower() == "action" and str(val).lower() == "start"
                        for key, val in payload.items()
                    )
                    _LOGGER.debug("Sending signal: '%s': %s", signal, start)
                    dispatcher_send(hass, signal, start)
        except AmcrestError as error:
            # Log and loop again; the wait above gates the next attempt.
            _LOGGER.warning(
                "Error while processing events from %s camera: %r", name, error
            )
350 
351 
def _start_event_monitor(
    hass: HomeAssistant,
    name: str,
    api: AmcrestChecker,
    event_codes: set[str],
) -> None:
    """Start ``_monitor_events`` for one camera in a daemon thread.

    The thread is named ``"Amcrest <name>"`` and is started immediately; no
    handle to it is returned.
    """
    thread = threading.Thread(
        target=_monitor_events,
        name=f"Amcrest {name}",
        args=(hass, name, api, event_codes),
        daemon=True,
    )
    thread.start()
365 
366 
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
    """Set up the Amcrest IP Camera component.

    For every configured camera an ``AmcrestChecker`` and an
    ``AmcrestDevice`` are created, the camera platform (plus any configured
    binary sensor, sensor and switch platforms) is loaded through discovery,
    and an event monitor thread is started. Afterwards the camera services
    from ``CAMERA_SERVICES`` are registered.

    Returns False when no device ended up registered, True otherwise.
    """
    hass.data.setdefault(DATA_AMCREST, {DEVICES: {}, CAMERAS: []})

    for device in config[DOMAIN]:
        name: str = device[CONF_NAME]
        username: str = device[CONF_USERNAME]
        password: str = device[CONF_PASSWORD]

        api = AmcrestChecker(
            hass, name, device[CONF_HOST], device[CONF_PORT], username, password
        )

        ffmpeg_arguments = device[CONF_FFMPEG_ARGUMENTS]
        resolution = RESOLUTION_LIST[device[CONF_RESOLUTION]]
        binary_sensors = device.get(CONF_BINARY_SENSORS)
        sensors = device.get(CONF_SENSORS)
        switches = device.get(CONF_SWITCHES)
        stream_source = device[CONF_STREAM_SOURCE]
        control_light = device.get(CONF_CONTROL_LIGHT)

        # currently aiohttp only works with basic authentication
        # only valid for mjpeg streaming
        if device[CONF_AUTHENTICATION] == HTTP_BASIC_AUTHENTICATION:
            authentication: aiohttp.BasicAuth | None = aiohttp.BasicAuth(
                username, password
            )
        else:
            authentication = None

        hass.data[DATA_AMCREST][DEVICES][name] = AmcrestDevice(
            api,
            authentication,
            ffmpeg_arguments,
            stream_source,
            resolution,
            control_light,
        )

        hass.async_create_task(
            discovery.async_load_platform(
                hass, Platform.CAMERA, DOMAIN, {CONF_NAME: name}, config
            )
        )

        event_codes: set[str] = set()
        if binary_sensors:
            hass.async_create_task(
                discovery.async_load_platform(
                    hass,
                    Platform.BINARY_SENSOR,
                    DOMAIN,
                    {CONF_NAME: name, CONF_BINARY_SENSORS: binary_sensors},
                    config,
                )
            )
            # Only non-polled binary sensors are driven by camera events.
            event_codes = {
                event_code
                for sensor in BINARY_SENSORS
                if sensor.key in binary_sensors
                and not sensor.should_poll
                and sensor.event_codes is not None
                for event_code in sensor.event_codes
            }

        # Started even with an empty code set: every event is still fired
        # on the bus by the monitor.
        _start_event_monitor(hass, name, api, event_codes)

        if sensors:
            hass.async_create_task(
                discovery.async_load_platform(
                    hass,
                    Platform.SENSOR,
                    DOMAIN,
                    {CONF_NAME: name, CONF_SENSORS: sensors},
                    config,
                )
            )

        if switches:
            hass.async_create_task(
                discovery.async_load_platform(
                    hass,
                    Platform.SWITCH,
                    DOMAIN,
                    {CONF_NAME: name, CONF_SWITCHES: switches},
                    config,
                )
            )

    if not hass.data[DATA_AMCREST][DEVICES]:
        return False

    def have_permission(user: User | None, entity_id: str) -> bool:
        """Return True if there is no user or the user may control the entity."""
        return not user or user.permissions.check_entity(entity_id, POLICY_CONTROL)

    async def async_extract_from_service(call: ServiceCall) -> list[str]:
        """Resolve a service call to the Amcrest camera entity ids it targets.

        Raises UnknownUser when the calling user id cannot be resolved and
        Unauthorized when an explicitly targeted camera may not be
        controlled by the user.
        """
        if call.context.user_id:
            user = await hass.auth.async_get_user(call.context.user_id)
            if user is None:
                raise UnknownUser(context=call.context)
        else:
            user = None

        if call.data.get(ATTR_ENTITY_ID) == ENTITY_MATCH_ALL:
            # Return all entity_ids user has permission to control.
            return [
                entity_id
                for entity_id in hass.data[DATA_AMCREST][CAMERAS]
                if have_permission(user, entity_id)
            ]

        if call.data.get(ATTR_ENTITY_ID) == ENTITY_MATCH_NONE:
            return []

        call_ids = await async_extract_entity_ids(hass, call)
        entity_ids = []
        for entity_id in hass.data[DATA_AMCREST][CAMERAS]:
            if entity_id not in call_ids:
                continue
            if not have_permission(user, entity_id):
                raise Unauthorized(
                    context=call.context, entity_id=entity_id, permission=POLICY_CONTROL
                )
            entity_ids.append(entity_id)
        return entity_ids

    async def async_service_handler(call: ServiceCall) -> None:
        """Dispatch the called service to every targeted camera entity."""
        args = [call.data[arg] for arg in CAMERA_SERVICES[call.service][2]]
        for entity_id in await async_extract_from_service(call):
            async_dispatcher_send(hass, service_signal(call.service, entity_id), *args)

    for service, params in CAMERA_SERVICES.items():
        hass.services.async_register(DOMAIN, service, async_service_handler, params[0])

    return True
502 
503 
@dataclass
class AmcrestDevice:
    """Representation of a base Amcrest discovery device."""

    # Error-tracking API wrapper for the camera.
    api: AmcrestChecker
    # Basic-auth credentials, or None when basic authentication is not used.
    authentication: aiohttp.BasicAuth | None
    # Validated as a plain string by the config schema (default "-pred 1").
    ffmpeg_arguments: str
    stream_source: str
    # Value looked up in RESOLUTION_LIST for the configured resolution name.
    resolution: int
    control_light: bool
    channel: int = 0
httpx.Response async_command(self, *Any args, **Any kwargs)
Definition: __init__.py:204
None __init__(self, HomeAssistant hass, str name, str host, int port, str user, str password)
Definition: __init__.py:143
None _async_handle_offline(self, Exception ex)
Definition: __init__.py:254
AsyncIterator[None] _async_command_wrapper(self)
Definition: __init__.py:221
None _handle_offline(self, Exception ex)
Definition: __init__.py:248
AsyncIterator[httpx.Response] async_stream_command(self, *Any args, **Any kwargs)
Definition: __init__.py:212
bool _handle_offline_thread_safe(self, Exception ex)
Definition: __init__.py:235
Any command(self, *Any args, **Any kwargs)
Definition: __init__.py:191
None _wrap_test_online(self, datetime now)
Definition: __init__.py:319
str service_signal(str service, *str args)
Definition: helpers.py:12
bool async_setup(HomeAssistant hass, ConfigType config)
Definition: __init__.py:367
None _start_event_monitor(HomeAssistant hass, str name, AmcrestChecker api, set[str] event_codes)
Definition: __init__.py:357
list[dict[str, Any]] _has_unique_names(list[dict[str, Any]] devices)
Definition: __init__.py:86
None _monitor_events(HomeAssistant hass, str name, AmcrestChecker api, set[str] event_codes)
Definition: __init__.py:331
None dispatcher_send(HomeAssistant hass, str signal, *Any args)
Definition: dispatcher.py:137
None async_dispatcher_send(HomeAssistant hass, str signal, *Any args)
Definition: dispatcher.py:193
CALLBACK_TYPE async_track_time_interval(HomeAssistant hass, Callable[[datetime], Coroutine[Any, Any, None]|None] action, timedelta interval, *str|None name=None, bool|None cancel_on_shutdown=None)
Definition: event.py:1679
set[str] async_extract_entity_ids(HomeAssistant hass, ServiceCall service_call, bool expand_group=True)
Definition: service.py:490