1 """Support for Amcrest IP cameras."""
3 from __future__
import annotations
6 from collections.abc
import AsyncIterator, Callable
7 from contextlib
import asynccontextmanager, suppress
8 from dataclasses
import dataclass
9 from datetime
import datetime, timedelta
12 from typing
import Any
15 from amcrest
import AmcrestError, ApiWrapper, LoginError
17 import voluptuous
as vol
35 HTTP_BASIC_AUTHENTICATION,
47 from .binary_sensor
import BINARY_SENSOR_KEYS, BINARY_SENSORS, check_binary_sensors
48 from .camera
import CAMERA_SERVICES, STREAM_SOURCE_LIST
60 from .helpers
import service_signal
61 from .sensor
import SENSOR_KEYS
62 from .switch
import SWITCH_KEYS
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# Option keys specific to this integration; each is used as an optional key
# in the per-camera configuration schema.
CONF_RESOLUTION = "resolution"
CONF_STREAM_SOURCE = "stream_source"
CONF_FFMPEG_ARGUMENTS = "ffmpeg_arguments"
CONF_CONTROL_LIGHT = "control_light"

# Fallback values the schema applies when the matching option is omitted.
DEFAULT_NAME = "Amcrest Camera"
DEFAULT_RESOLUTION = "high"
DEFAULT_ARGUMENTS = "-pred 1"

# Notification identifiers. NOTE(review): not referenced in the visible part
# of the file; presumably used for a setup-failure notification -- confirm.
NOTIFICATION_ID = "amcrest_notification"
NOTIFICATION_TITLE = "Amcrest Camera Setup"

# Accepted values for the authentication option (validated with vol.In).
AUTHENTICATION_LIST = {"basic": "basic"}
87 names = [device[CONF_NAME]
for device
in devices]
88 vol.Schema(vol.Unique())(names)
92 AMCREST_SCHEMA = vol.Schema(
94 vol.Required(CONF_HOST): cv.string,
95 vol.Required(CONF_USERNAME): cv.string,
96 vol.Required(CONF_PASSWORD): cv.string,
97 vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
98 vol.Optional(CONF_PORT, default=DEFAULT_PORT): cv.port,
99 vol.Optional(CONF_AUTHENTICATION, default=HTTP_BASIC_AUTHENTICATION): vol.All(
100 vol.In(AUTHENTICATION_LIST)
102 vol.Optional(CONF_RESOLUTION, default=DEFAULT_RESOLUTION): vol.All(
103 vol.In(RESOLUTION_LIST)
105 vol.Optional(CONF_STREAM_SOURCE, default=STREAM_SOURCE_LIST[0]): vol.All(
106 vol.In(STREAM_SOURCE_LIST)
108 vol.Optional(CONF_FFMPEG_ARGUMENTS, default=DEFAULT_ARGUMENTS): cv.string,
109 vol.Optional(CONF_SCAN_INTERVAL, default=SCAN_INTERVAL): cv.time_period,
110 vol.Optional(CONF_BINARY_SENSORS): vol.All(
112 [vol.In(BINARY_SENSOR_KEYS)],
114 check_binary_sensors,
116 vol.Optional(CONF_SWITCHES): vol.All(
117 cv.ensure_list, [vol.In(SWITCH_KEYS)], vol.Unique()
119 vol.Optional(CONF_SENSORS): vol.All(
120 cv.ensure_list, [vol.In(SENSOR_KEYS)], vol.Unique()
122 vol.Optional(CONF_CONTROL_LIGHT, default=
True): cv.boolean,
126 CONFIG_SCHEMA = vol.Schema(
127 {DOMAIN: vol.All(cv.ensure_list, [AMCREST_SCHEMA], _has_unique_names)},
128 extra=vol.ALLOW_EXTRA,
133 """amcrest.ApiWrapper wrapper for catching errors."""
155 self.
_unsub_recheck_unsub_recheck: Callable[[],
None] |
None =
None
161 retries_connection=COMM_RETRIES,
162 timeout_protocol=COMM_TIMEOUT,
167 """Return if camera's API is responding."""
172 """Return event flag that indicates if camera's API is responding."""
177 """Return event flag that indicates if camera's API is responding."""
191 def command(self, *args: Any, **kwargs: Any) -> Any:
192 """amcrest.ApiWrapper.command wrapper to catch errors."""
194 ret = super().
command(*args, **kwargs)
195 except LoginError
as ex:
205 """amcrest.ApiWrapper.command wrapper to catch errors."""
211 self, *args: Any, **kwargs: Any
212 ) -> AsyncIterator[httpx.Response]:
213 """amcrest.ApiWrapper.command wrapper to catch errors."""
224 except LoginError
as ex:
236 """Handle camera offline status shared between threads and event loop.
238 Returns if the camera was online as a bool.
244 if not was_login_err:
245 _LOGGER.error(
"%s camera offline: Login error: %s", self.
_wrap_name_wrap_name, ex)
249 """Handle camera offline status from a thread."""
259 """Handle camera error status shared between threads and event loop.
261 Returns if the camera was online and is now offline as
268 _LOGGER.debug(
"%s camera errs: %i", self.
_wrap_name_wrap_name, errs)
269 return was_online
and offline
272 """Handle camera error status from a thread."""
274 _LOGGER.error(
"%s camera offline: Too many errors", self.
_wrap_name_wrap_name)
279 """Handle camera error status from the event loop."""
281 _LOGGER.error(
"%s camera offline: Too many errors", self.
_wrap_name_wrap_name)
285 """Set camera online status shared between threads and event loop.
287 Returns if the camera was offline as a bool.
290 was_offline =
not self.
availableavailable
296 """Set camera online status from a thread."""
302 """Set camera online status from the event loop."""
308 """Signal that camera is back online."""
312 _LOGGER.error(
"%s camera back online", self.
_wrap_name_wrap_name)
320 """Test if camera is back online."""
321 _LOGGER.debug(
"Testing if %s back online", self.
_wrap_name_wrap_name)
322 with suppress(AmcrestError):
323 await self.async_current_time
330 event_codes: set[str],
333 api.available_flag.wait()
335 for code, payload
in api.event_actions(
"All"):
336 event_data = {
"camera": name,
"event": code,
"payload": payload}
337 hass.bus.fire(
"amcrest", event_data)
338 if code
in event_codes:
341 str(key).lower() ==
"action" and str(val).lower() ==
"start"
342 for key, val
in payload.items()
344 _LOGGER.debug(
"Sending signal: '%s': %s", signal, start)
346 except AmcrestError
as error:
348 "Error while processing events from %s camera: %r", name, error
356 event_codes: set[str],
358 thread = threading.Thread(
359 target=_monitor_events,
360 name=f
"Amcrest {name}",
361 args=(hass, name, api, event_codes),
367 async
def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
368 """Set up the Amcrest IP Camera component."""
369 hass.data.setdefault(DATA_AMCREST, {DEVICES: {}, CAMERAS: []})
371 for device
in config[DOMAIN]:
372 name: str = device[CONF_NAME]
373 username: str = device[CONF_USERNAME]
374 password: str = device[CONF_PASSWORD]
377 hass, name, device[CONF_HOST], device[CONF_PORT], username, password
380 ffmpeg_arguments = device[CONF_FFMPEG_ARGUMENTS]
381 resolution = RESOLUTION_LIST[device[CONF_RESOLUTION]]
382 binary_sensors = device.get(CONF_BINARY_SENSORS)
383 sensors = device.get(CONF_SENSORS)
384 switches = device.get(CONF_SWITCHES)
385 stream_source = device[CONF_STREAM_SOURCE]
386 control_light = device.get(CONF_CONTROL_LIGHT)
390 if device[CONF_AUTHENTICATION] == HTTP_BASIC_AUTHENTICATION:
391 authentication: aiohttp.BasicAuth |
None = aiohttp.BasicAuth(
395 authentication =
None
406 hass.async_create_task(
407 discovery.async_load_platform(
408 hass, Platform.CAMERA, DOMAIN, {CONF_NAME: name}, config
414 hass.async_create_task(
415 discovery.async_load_platform(
417 Platform.BINARY_SENSOR,
419 {CONF_NAME: name, CONF_BINARY_SENSORS: binary_sensors},
425 for sensor
in BINARY_SENSORS
426 if sensor.key
in binary_sensors
427 and not sensor.should_poll
428 and sensor.event_codes
is not None
429 for event_code
in sensor.event_codes
435 hass.async_create_task(
436 discovery.async_load_platform(
440 {CONF_NAME: name, CONF_SENSORS: sensors},
446 hass.async_create_task(
447 discovery.async_load_platform(
451 {CONF_NAME: name, CONF_SWITCHES: switches},
456 if not hass.data[DATA_AMCREST][DEVICES]:
def have_permission(user: User | None, entity_id: str) -> bool:
    """Return whether ``user`` is allowed to control ``entity_id``.

    A falsy ``user`` (e.g. ``None``) is always permitted. Otherwise the
    result of the user's ``permissions.check_entity`` lookup for
    ``POLICY_CONTROL`` is returned unchanged.
    """
    # No user to check against: allow.
    if not user:
        return True
    return user.permissions.check_entity(entity_id, POLICY_CONTROL)
462 async
def async_extract_from_service(call: ServiceCall) -> list[str]:
463 if call.context.user_id:
464 user = await hass.auth.async_get_user(call.context.user_id)
470 if call.data.get(ATTR_ENTITY_ID) == ENTITY_MATCH_ALL:
474 for entity_id
in hass.data[DATA_AMCREST][CAMERAS]
475 if have_permission(user, entity_id)
478 if call.data.get(ATTR_ENTITY_ID) == ENTITY_MATCH_NONE:
483 for entity_id
in hass.data[DATA_AMCREST][CAMERAS]:
484 if entity_id
not in call_ids:
486 if not have_permission(user, entity_id):
488 context=call.context, entity_id=entity_id, permission=POLICY_CONTROL
490 entity_ids.append(entity_id)
493 async
def async_service_handler(call: ServiceCall) ->
None:
494 args = [call.data[arg]
for arg
in CAMERA_SERVICES[call.service][2]]
495 for entity_id
in await async_extract_from_service(call):
498 for service, params
in CAMERA_SERVICES.items():
499 hass.services.async_register(DOMAIN, service, async_service_handler, params[0])
506 """Representation of a base Amcrest discovery device."""
509 authentication: aiohttp.BasicAuth |
None
510 ffmpeg_arguments: list[str]
httpx.Response async_command(self, *Any args, **Any kwargs)
None __init__(self, HomeAssistant hass, str name, str host, int port, str user, str password)
bool _set_online_thread_safe(self)
None _async_handle_offline(self, Exception ex)
AsyncIterator[None] _async_command_wrapper(self)
asyncio.Event async_available_flag(self)
None _handle_offline(self, Exception ex)
None _async_set_online(self)
AsyncIterator[httpx.Response] async_stream_command(self, *Any args, **Any kwargs)
bool _handle_offline_thread_safe(self, Exception ex)
bool _handle_error_thread_safe(self)
Any command(self, *Any args, **Any kwargs)
None _async_start_recovery(self)
threading.Event available_flag(self)
None _async_handle_error(self)
None _async_signal_online(self)
None _wrap_test_online(self, datetime now)
str service_signal(str service, *str args)
bool async_setup(HomeAssistant hass, ConfigType config)
None _start_event_monitor(HomeAssistant hass, str name, AmcrestChecker api, set[str] event_codes)
list[dict[str, Any]] _has_unique_names(list[dict[str, Any]] devices)
None _monitor_events(HomeAssistant hass, str name, AmcrestChecker api, set[str] event_codes)
None dispatcher_send(HomeAssistant hass, str signal, *Any args)
None async_dispatcher_send(HomeAssistant hass, str signal, *Any args)
CALLBACK_TYPE async_track_time_interval(HomeAssistant hass, Callable[[datetime], Coroutine[Any, Any, None]|None] action, timedelta interval, *str|None name=None, bool|None cancel_on_shutdown=None)
set[str] async_extract_entity_ids(HomeAssistant hass, ServiceCall service_call, bool expand_group=True)