1 """Repairs implementation for the cloud integration."""
3 from __future__
import annotations
5 from typing
import cast
7 import voluptuous
as vol
# Keys that the issue data handed to the repair flow must contain; the flow's
# constructor raises ValueError when any of them is absent.
REQUIRED_KEYS = ("entity_id", "entity_uuid", "integration_name")
18 """Handler for an issue fixing flow."""
def __init__(self, data: dict[str, str | int | float | None] | None) -> None:
    """Validate the issue data and keep it for the later flow steps.

    Args:
        data: Issue data attached to the repair issue. It must be a
            non-empty mapping containing every key in ``REQUIRED_KEYS``.

    Raises:
        ValueError: If ``data`` is ``None``/empty or lacks a required key.
    """
    # Reject both a missing payload and a payload with any required key absent.
    if not data or not all(key in data for key in REQUIRED_KEYS):
        raise ValueError("Missing data")
    # The confirm step reads the entity and integration identifiers from here.
    self._data = data
27 """Handle the first step of a fix flow."""
async def async_step_confirm_disable_entity(
    self,
    user_input: dict[str, str] | None = None,
) -> FlowResult:
    """Handle the confirm step of a fix flow.

    Args:
        user_input: ``None`` when the step is first shown; the submitted
            form data once the user has confirmed.

    Returns:
        The confirmation form (empty schema plus description placeholders)
        while ``user_input`` is ``None``; otherwise the result of
        ``async_create_entry`` with empty data, produced after the entity
        has been marked as disabled by the user in the entity registry.
    """
    if user_input is not None:
        entity_registry = er.async_get(self.hass)
        entity_entry = entity_registry.async_get(
            cast(str, self._data["entity_uuid"])
        )
        # The registry lookup yields None when the entity is no longer
        # registered; in that case there is nothing left to disable, so
        # skip the update instead of failing on a None attribute access.
        if entity_entry is not None:
            entity_registry.async_update_entity(
                entity_entry.entity_id, disabled_by=er.RegistryEntryDisabler.USER
            )
        return self.async_create_entry(data={})

    # Values substituted into the translated description of the form.
    description_placeholders: dict[str, str] = {
        "assist_satellite_domain": ASSIST_SATELLITE_DOMAIN,
        "entity_id": cast(str, self._data["entity_id"]),
        "integration_name": cast(str, self._data["integration_name"]),
    }
    return self.async_show_form(
        step_id="confirm_disable_entity",
        data_schema=vol.Schema({}),
        description_placeholders=description_placeholders,
    )
FlowResult async_step_confirm_disable_entity(self, dict[str, str]|None user_input=None)
FlowResult async_step_init(self, None _=None)
None __init__(self, dict[str, str|int|float|None]|None data)