1 """Support for Roborock image."""
4 from datetime
import datetime
6 from itertools
import chain
8 from roborock
import RoborockCommand
9 from vacuum_map_parser_base.config.color
import ColorsPalette
10 from vacuum_map_parser_base.config.drawable
import Drawable
11 from vacuum_map_parser_base.config.image_config
import ImageConfig
12 from vacuum_map_parser_base.config.size
import Sizes
13 from vacuum_map_parser_roborock.map_data_parser
import RoborockMapDataParser
23 from .
import RoborockConfigEntry
24 from .const
import DEFAULT_DRAWABLES, DOMAIN, DRAWABLES, IMAGE_CACHE_INTERVAL, MAP_SLEEP
25 from .coordinator
import RoborockDataUpdateCoordinator
26 from .entity
import RoborockCoordinatedEntityV1
31 config_entry: RoborockConfigEntry,
32 async_add_entities: AddEntitiesCallback,
34 """Set up Roborock image platform."""
38 for drawable, default_value
in DEFAULT_DRAWABLES.items()
39 if config_entry.options.get(DRAWABLES, {}).
get(drawable, default_value)
46 for coord
in config_entry.runtime_data.v1
55 """A class to let you visualize the map."""
57 _attr_has_entity_name =
True
58 image_last_updated: datetime
63 coordinator: RoborockDataUpdateCoordinator,
67 drawables: list[Drawable],
69 """Initialize a Roborock map."""
70 RoborockCoordinatedEntityV1.__init__(self, unique_id, coordinator)
71 ImageEntity.__init__(self, coordinator.hass)
73 self.
parserparser = RoborockMapDataParser(
74 ColorsPalette(), Sizes(), drawables, ImageConfig(), []
80 except HomeAssistantError:
89 """Determines if the entity is available."""
94 """Return if this map is the currently selected map."""
95 return self.
map_flagmap_flag == self.coordinator.current_map
98 """Update the map if it is valid.
100 Update this map if it is the currently active map, and the
101 vacuum is cleaning, or if it has never been set at all.
106 and self.coordinator.roborock_device_info.props.status
is not None
107 and bool(self.coordinator.roborock_device_info.props.status.in_cleaning)
116 ).total_seconds() > IMAGE_CACHE_INTERVAL
and self.
is_map_validis_map_valid():
121 """Update the image if it is not cached."""
123 response = await asyncio.gather(
125 return_exceptions=
True,
127 if not isinstance(response[0], bytes):
129 translation_domain=DOMAIN,
130 translation_key=
"map_failure",
132 map_data = response[0]
137 """Create an image using the map parser."""
138 parsed_map = self.
parserparser.parse(map_bytes)
139 if parsed_map.image
is None:
141 translation_domain=DOMAIN,
142 translation_key=
"map_failure",
144 img_byte_arr = io.BytesIO()
145 parsed_map.image.data.save(img_byte_arr, format=
"PNG")
146 return img_byte_arr.getvalue()
150 coord: RoborockDataUpdateCoordinator, drawables: list[Drawable]
151 ) -> list[RoborockMap]:
152 """Get the starting map information for all maps for this device.
154 The following steps must be done synchronously.
155 Only one map can be loaded at a time per device.
158 cur_map = coord.current_map
160 assert cur_map
is not None
164 coord.maps.items(), key=
lambda data: data[0] == cur_map, reverse=
True
166 for map_flag, map_info
in maps_info:
168 if map_flag != cur_map:
170 await coord.api.send_command(RoborockCommand.LOAD_MULTI_MAP, [map_flag])
171 coord.current_map = map_flag
174 await asyncio.sleep(MAP_SLEEP)
176 map_update = await asyncio.gather(
177 *[coord.cloud_api.get_map_v1(), coord.get_rooms()], return_exceptions=
True
181 api_data: bytes = map_update[0]
if isinstance(map_update[0], bytes)
else b
""
184 f
"{slugify(coord.duid)}_map_{map_info.name}",
192 if len(coord.maps) != 1:
196 await coord.cloud_api.send_command(RoborockCommand.LOAD_MULTI_MAP, [cur_map])
197 coord.current_map = cur_map
datetime|None image_last_updated(self)
RoborockMqttClientV1 cloud_api(self)
None _handle_coordinator_update(self)
bytes|None async_image(self)
None __init__(self, str unique_id, RoborockDataUpdateCoordinator coordinator, int map_flag, bytes starting_map, str map_name, list[Drawable] drawables)
bytes _create_image(self, bytes map_bytes)
web.Response get(self, web.Request request, str config_key)
list[RoborockMap] create_coordinator_maps(RoborockDataUpdateCoordinator coord, list[Drawable] drawables)
None async_setup_entry(HomeAssistant hass, RoborockConfigEntry config_entry, AddEntitiesCallback async_add_entities)