1 """Support for Cameras with FFmpeg as decoder."""
3 from __future__
import annotations
7 from aiohttp
import web
8 from haffmpeg.camera
import CameraMjpeg
9 from haffmpeg.tools
import IMAGE_JPEG
10 import voluptuous
as vol
13 PLATFORM_SCHEMA
as CAMERA_PLATFORM_SCHEMA,
# Fallback camera name: the platform schema uses it as the default for
# CONF_NAME when the configuration does not provide one.
32 DEFAULT_NAME =
"FFmpeg"
# Fallback extra arguments: the platform schema uses this as the default for
# CONF_EXTRA_ARGUMENTS; the camera later forwards the configured value as
# `extra_cmd` to the still-image and MJPEG stream calls.
# NOTE(review): presumably an FFmpeg command-line option string — confirm the
# exact meaning of "-pred 1" against the FFmpeg documentation.
33 DEFAULT_ARGUMENTS =
"-pred 1"
35 PLATFORM_SCHEMA = CAMERA_PLATFORM_SCHEMA.extend(
37 vol.Required(CONF_INPUT): cv.string,
38 vol.Optional(CONF_EXTRA_ARGUMENTS, default=DEFAULT_ARGUMENTS): cv.string,
39 vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
47 async_add_entities: AddEntitiesCallback,
48 discovery_info: DiscoveryInfoType |
None =
None,
50 """Set up a FFmpeg camera."""
55 """An implementation of an FFmpeg camera."""
57 _attr_supported_features = CameraEntityFeature.STREAM
59 def __init__(self, hass: HomeAssistant, config: dict[str, Any]) ->
None:
60 """Initialize a FFmpeg camera."""
63 self._manager: FFmpegManager = hass.data[DATA_FFMPEG]
64 self._name: str = config[CONF_NAME]
65 self._input: str = config[CONF_INPUT]
66 self._extra_arguments: str = config[CONF_EXTRA_ARGUMENTS]
69 """Return the stream source."""
70 return self._input.split(
" ")[-1]
73 self, width: int |
None =
None, height: int |
None =
None
75 """Return a still image response from the camera."""
79 output_format=IMAGE_JPEG,
80 extra_cmd=self._extra_arguments,
84 self, request: web.Request
85 ) -> web.StreamResponse:
86 """Generate an HTTP MJPEG stream from the camera."""
88 stream = CameraMjpeg(self._manager.binary)
89 await stream.open_camera(self._input, extra_cmd=self._extra_arguments)
92 stream_reader = await stream.get_reader()
97 self._manager.ffmpeg_stream_content_type,
104 """Return the name of this camera."""
None __init__(self, HomeAssistant hass, dict[str, Any] config)
web.StreamResponse handle_async_mjpeg_stream(self, web.Request request)
bytes|None async_camera_image(self, int|None width=None, int|None height=None)
None async_setup_platform(HomeAssistant hass, ConfigType config, AddEntitiesCallback async_add_entities, DiscoveryInfoType|None discovery_info=None)
bytes|None async_get_image(HomeAssistant hass, str input_source, str output_format=IMAGE_JPEG, str|None extra_cmd=None, int|None width=None, int|None height=None)
web.StreamResponse async_aiohttp_proxy_stream(HomeAssistant hass, web.BaseRequest request, aiohttp.StreamReader stream, str|None content_type, int buffer_size=102400, int timeout=10)