1 """Support for ESPHome cameras."""
3 from __future__
import annotations
6 from collections.abc
import Callable
7 from functools
import partial
10 from aioesphomeapi
import CameraInfo, CameraState
11 from aiohttp
import web
17 from .entity
import EsphomeEntity, platform_async_setup_entry
21 """A camera implementation for ESPHome."""
23 def __init__(self, *args: Any, **kwargs: Any) ->
None:
26 EsphomeEntity.__init__(self, *args, **kwargs)
27 self.
_loop_loop = asyncio.get_running_loop()
28 self._image_futures: list[asyncio.Future[bool |
None]] = []
32 """Set futures to done."""
33 for future
in self._image_futures:
35 future.set_result(result)
36 self._image_futures.clear()
40 """Handle device going available or unavailable."""
47 """Notify listeners of new image when update arrives."""
52 self, width: int |
None =
None, height: int |
None =
None
54 """Return single camera image bytes."""
58 self, request_method: Callable[[],
None]
60 """Wait for an image to be available and return it."""
63 image_future = self.
_loop_loop.create_future()
64 self._image_futures.append(image_future)
66 if not await image_future:
68 return self.
_state_state.data
71 self, request: web.Request
72 ) -> web.StreamResponse:
73 """Serve an HTTP MJPEG stream from the camera."""
74 stream_request = partial(
77 return await camera.async_get_still_stream(
78 request, stream_request, camera.DEFAULT_CONTENT_TYPE, 0.0
82 async_setup_entry = partial(
83 platform_async_setup_entry,
85 entity_type=EsphomeCamera,
86 state_type=CameraState,
None __init__(self, *Any args, **Any kwargs)
bytes|None async_camera_image(self, int|None width=None, int|None height=None)
None _set_futures(self, bool result)
None _on_state_update(self)
None _on_device_update(self)
bytes|None _async_request_image(self, Callable[[], None] request_method)
web.StreamResponse handle_async_mjpeg_stream(self, web.Request request)