1 """The repairs integration."""
3 from __future__
import annotations
7 import voluptuous
as vol
9 from homeassistant
import data_entry_flow
14 async_process_integration_platforms,
17 from .const
import DOMAIN
18 from .models
import RepairsFlow, RepairsProtocol
22 """Handler for an issue fixing flow without any side effects."""
25 self, user_input: dict[str, str] |
None =
None
27 """Handle the first step of a fix flow."""
28 return await self.async_step_confirm()
31 self, user_input: dict[str, str] |
None =
None
33 """Handle the confirm step of a fix flow."""
34 if user_input
is not None:
37 issue_registry = ir.async_get(self.hass)
38 description_placeholders =
None
39 if issue := issue_registry.async_get_issue(self.handler, self.issue_id):
40 description_placeholders = issue.translation_placeholders
44 data_schema=vol.Schema({}),
45 description_placeholders=description_placeholders,
50 """Manage repairs flows."""
57 data: dict[str, Any] |
None =
None,
59 """Create a flow. platform is a repairs module."""
60 assert data
and "issue_id" in data
61 issue_id = data[
"issue_id"]
63 issue_registry = ir.async_get(self.
hasshass)
64 issue = issue_registry.async_get_issue(handler_key, issue_id)
65 if issue
is None or not issue.is_fixable:
68 if "platforms" not in self.
hasshass.data[DOMAIN]:
71 platforms: dict[str, RepairsProtocol] = self.
hasshass.data[DOMAIN][
"platforms"]
72 if handler_key
not in platforms:
75 platform = platforms[handler_key]
76 flow = await platform.async_create_fix_flow(self.
hasshass, issue_id, issue.data)
78 flow.issue_id = issue_id
79 flow.data = issue.data
85 """Complete a fix flow.
87 This method is called when a flow step returns FlowResultType.ABORT or
88 FlowResultType.CREATE_ENTRY.
90 if result.get(
"type") != data_entry_flow.FlowResultType.ABORT:
91 ir.async_delete_issue(self.
hasshass, flow.handler, flow.init_data[
"issue_id"])
92 if "result" not in result:
93 result[
"result"] =
None
99 """Initialize repairs."""
104 """Start processing repairs platforms."""
105 hass.data[DOMAIN][
"platforms"] = {}
108 hass, DOMAIN, _register_repairs_platform, wait_for_platforms=
True
114 hass: HomeAssistant, integration_domain: str, platform: RepairsProtocol
116 """Register a repairs platform."""
117 if not hasattr(platform,
"async_create_fix_flow"):
119 hass.data[DOMAIN][
"platforms"][integration_domain] = platform
data_entry_flow.FlowResult async_step_init(self, dict[str, str]|None user_input=None)
RepairsFlow async_create_flow(self, str handler_key, *data_entry_flow.FlowContext|None context=None, dict[str, Any]|None data=None)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)
_FlowResultT async_finish_flow(self, FlowHandler[_FlowContextT, _FlowResultT, _HandlerT] flow, _FlowResultT result)
None _register_repairs_platform(HomeAssistant hass, str integration_domain, RepairsProtocol platform)
None async_process_repairs_platforms(HomeAssistant hass)
None async_setup(HomeAssistant hass)
config_entries.ConfigFlowResult async_step_confirm(self, dict[str, Any]|None user_input=None)