1 """Support ezviz camera devices."""
3 from __future__
import annotations
7 from pyezviz.exceptions
import HTTPError, InvalidHost, PyEzvizError
15 SOURCE_INTEGRATION_DISCOVERY,
23 async_get_current_platform,
28 CONF_FFMPEG_ARGUMENTS,
30 DEFAULT_CAMERA_USERNAME,
31 DEFAULT_FFMPEG_ARGUMENTS,
35 from .coordinator
import EzvizDataUpdateCoordinator
36 from .entity
import EzvizEntity
38 _LOGGER = logging.getLogger(__name__)
42 hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback
44 """Set up EZVIZ cameras based on a config entry."""
46 coordinator: EzvizDataUpdateCoordinator = hass.data[DOMAIN][entry.entry_id][
52 for camera, value
in coordinator.data.items():
55 for item
in hass.config_entries.async_entries(DOMAIN)
56 if item.unique_id == camera
and item.source != SOURCE_IGNORE
60 ffmpeg_arguments = camera_rtsp_entry[0].options[CONF_FFMPEG_ARGUMENTS]
61 camera_username = camera_rtsp_entry[0].data[CONF_USERNAME]
62 camera_password = camera_rtsp_entry[0].data[CONF_PASSWORD]
64 camera_rtsp_stream = f
"rtsp://{camera_username}:{camera_password}@{value['local_ip']}:{value['local_rtsp_port']}{ffmpeg_arguments}"
66 "Configuring Camera %s with ip: %s rtsp port: %s ffmpeg arguments: %s",
69 value[
"local_rtsp_port"],
74 discovery_flow.async_create_flow(
77 context={
"source": SOURCE_INTEGRATION_DISCOVERY},
80 CONF_IP_ADDRESS: value[
"local_ip"],
86 "Found camera with serial %s without configuration. Please go to"
87 " integration to complete setup"
92 ffmpeg_arguments = DEFAULT_FFMPEG_ARGUMENTS
93 camera_username = DEFAULT_CAMERA_USERNAME
94 camera_password =
None
95 camera_rtsp_stream =
""
97 camera_entities.append(
105 value[
"local_rtsp_port"],
114 platform.async_register_entity_service(
115 SERVICE_WAKE_DEVICE,
None,
"perform_wake_device"
120 """An implementation of a EZVIZ security camera."""
127 coordinator: EzvizDataUpdateCoordinator,
129 camera_username: str,
130 camera_password: str |
None,
131 camera_rtsp_stream: str |
None,
132 local_rtsp_port: int,
133 ffmpeg_arguments: str |
None,
135 """Initialize a EZVIZ security camera."""
136 super().
__init__(coordinator, serial)
137 Camera.__init__(self)
138 self.stream_options[CONF_USE_WALLCLOCK_AS_TIMESTAMPS] =
True
151 """Return True if entity is available."""
152 return self.
datadatadata[
"status"] != 2
156 """Return true if on."""
161 """Return true if the device is recording."""
162 return self.
datadatadata[
"alarm_notify"]
166 """Camera Motion Detection Status."""
167 return self.
datadatadata[
"alarm_notify"]
170 """Enable motion detection in camera."""
172 self.coordinator.ezviz_client.set_camera_defence(self.
_serial_serial, 1)
174 except InvalidHost
as err:
175 raise InvalidHost(
"Error enabling motion detection")
from err
178 """Disable motion detection."""
180 self.coordinator.ezviz_client.set_camera_defence(self.
_serial_serial, 0)
182 except InvalidHost
as err:
183 raise InvalidHost(
"Error disabling motion detection")
from err
186 self, width: int |
None =
None, height: int |
None =
None
188 """Return a frame from the camera stream."""
191 return await ffmpeg.async_get_image(
196 """Return the stream source."""
199 local_ip = self.
datadatadata[
"local_ip"]
201 f
"rtsp://{self._username}:{self._password}@"
202 f
"{local_ip}:{self._local_rtsp_port}{self._ffmpeg_arguments}"
205 "Configuring Camera %s with ip: %s rtsp port: %s ffmpeg arguments: %s",
215 """Basically wakes the camera by querying the device."""
217 self.coordinator.ezviz_client.get_detection_sensibility(self.
_serial_serial)
218 except (HTTPError, PyEzvizError)
as err:
219 raise PyEzvizError(
"Cannot wake device")
from err
str|None stream_source(self)
bytes|None async_camera_image(self, int|None width=None, int|None height=None)
None disable_motion_detection(self)
bool motion_detection_enabled(self)
None perform_wake_device(self)
None __init__(self, HomeAssistant hass, EzvizDataUpdateCoordinator coordinator, str serial, str camera_username, str|None camera_password, str|None camera_rtsp_stream, int local_rtsp_port, str|None ffmpeg_arguments)
None enable_motion_detection(self)
dict[str, Any] data(self)
None async_setup_entry(HomeAssistant hass, ConfigEntry entry, AddEntitiesCallback async_add_entities)
FFmpegManager get_ffmpeg_manager(HomeAssistant hass)