1 """Camera that loads a picture from an MQTT topic."""
3 from __future__
import annotations
5 from base64
import b64decode
7 from typing
import TYPE_CHECKING
9 import voluptuous
as vol
20 from .
import subscription
21 from .config
import MQTT_BASE_SCHEMA
22 from .const
import CONF_TOPIC
23 from .entity
import MqttEntity, async_setup_entity_entry_helper
24 from .models
import ReceiveMessage
25 from .schemas
import MQTT_ENTITY_COMMON_SCHEMA
26 from .util
import valid_subscribe_topic
28 _LOGGER = logging.getLogger(__name__)
32 CONF_IMAGE_ENCODING =
"image_encoding"
34 DEFAULT_NAME =
"MQTT Camera"
36 MQTT_CAMERA_ATTRIBUTES_BLOCKED = frozenset(
45 PLATFORM_SCHEMA_BASE = MQTT_BASE_SCHEMA.extend(
47 vol.Optional(CONF_NAME): vol.Any(cv.string,
None),
48 vol.Required(CONF_TOPIC): valid_subscribe_topic,
49 vol.Optional(CONF_IMAGE_ENCODING):
"b64",
51 ).extend(MQTT_ENTITY_COMMON_SCHEMA.schema)
53 PLATFORM_SCHEMA_MODERN = vol.All(
54 PLATFORM_SCHEMA_BASE.schema,
57 DISCOVERY_SCHEMA = PLATFORM_SCHEMA_BASE.extend({}, extra=vol.REMOVE_EXTRA)
62 config_entry: ConfigEntry,
63 async_add_entities: AddEntitiesCallback,
65 """Set up MQTT camera through YAML and through MQTT discovery."""
73 PLATFORM_SCHEMA_MODERN,
78 """Representation of an MQTT camera."""
80 _default_name = DEFAULT_NAME
81 _entity_id_format: str = camera.ENTITY_ID_FORMAT
82 _attributes_extra_blocked: frozenset[str] = MQTT_CAMERA_ATTRIBUTES_BLOCKED
83 _last_image: bytes |
None =
None
89 config_entry: ConfigEntry,
90 discovery_data: DiscoveryInfoType |
None,
92 """Initialize the MQTT Camera."""
94 MqttEntity.__init__(self, hass, config, config_entry, discovery_data)
98 """Return the config schema."""
99 return DISCOVERY_SCHEMA
103 """Handle new MQTT messages."""
104 if CONF_IMAGE_ENCODING
in self.
_config_config:
108 assert isinstance(msg.payload, bytes)
113 """(Re)Subscribe to topics."""
116 CONF_TOPIC, self.
_image_received_image_received,
None, disable_encoding=
True
120 """(Re)Subscribe to topics."""
121 subscription.async_subscribe_topics_internal(self.
hasshasshass, self.
_sub_state_sub_state)
124 self, width: int |
None =
None, height: int |
None =
None
126 """Return image response."""
bytes|None async_camera_image(self, int|None width=None, int|None height=None)
None _image_received(self, ReceiveMessage msg)
None _subscribe_topics(self)
vol.Schema config_schema()
None __init__(self, HomeAssistant hass, ConfigType config, ConfigEntry config_entry, DiscoveryInfoType|None discovery_data)
None _prepare_subscribe_topics(self)
bool add_subscription(self, str state_topic_config_key, Callable[[ReceiveMessage], None] msg_callback, set[str]|None tracked_attributes, bool disable_encoding=False)
None async_setup_entry(HomeAssistant hass, ConfigEntry config_entry, AddEntitiesCallback async_add_entities)
None async_setup_entity_entry_helper(HomeAssistant hass, ConfigEntry entry, type[MqttEntity]|None entity_class, str domain, AddEntitiesCallback async_add_entities, VolSchemaType discovery_schema, VolSchemaType platform_schema_modern, dict[str, type[MqttEntity]]|None schema_class_mapping=None)