1 """Repairs implementation for the cloud integration."""
3 from __future__
import annotations
8 import voluptuous
as vol
19 from .const
import DATA_CLOUD, DOMAIN
20 from .subscription
import async_migrate_paypal_agreement, async_subscription_info
29 subscription_info: dict[str, Any],
31 """Manage the legacy subscription issue.
33 If the provider is "legacy" create an issue,
34 in all other cases remove the issue.
36 if subscription_info.get(
"provider") ==
"legacy":
37 ir.async_create_issue(
40 issue_id=
"legacy_subscription",
42 severity=ir.IssueSeverity.WARNING,
43 translation_key=
"legacy_subscription",
46 ir.async_delete_issue(hass=hass, domain=DOMAIN, issue_id=
"legacy_subscription")
50 """Handler for an issue fixing flow."""
52 wait_task: asyncio.Task |
None =
None
53 _data: dict[str, Any] |
None =
None
56 """Handle the first step of a fix flow."""
57 return await self.async_step_confirm_change_plan()
59 async
def async_step_confirm_change_plan(
61 user_input: dict[str, str] |
None =
None,
63 """Handle the confirm step of a fix flow."""
64 if user_input
is not None:
65 return await self.async_step_change_plan()
67 return self.async_show_form(
68 step_id=
"confirm_change_plan", data_schema=vol.Schema({})
71 async
def async_step_change_plan(self, _: None =
None) -> FlowResult:
72 """Wait for the user to authorize the app installation."""
74 cloud = self.hass.data[DATA_CLOUD]
76 async
def _async_wait_for_plan_change() -> None:
79 assert flow_manager
is not None
82 while retries < MAX_RETRIES:
84 if self.
_data_data
is not None and self.
_data_data[
"provider"] !=
"legacy":
88 await asyncio.sleep(BACKOFF_TIME)
90 self.hass.async_create_task(
91 flow_manager.async_configure(flow_id=self.flow_id)
95 self.
wait_taskwait_task = self.hass.async_create_task(
96 _async_wait_for_plan_change(), eager_start=
False
99 return self.async_external_step(
100 step_id=
"change_plan",
101 url=migration[
"url"]
if migration
else "https://account.nabucasa.com/",
106 if self.
_data_data
is None or self.
_data_data[
"provider"] ==
"legacy":
108 return self.async_external_step_done(next_step_id=
"timeout")
110 return self.async_external_step_done(next_step_id=
"complete")
async def async_step_complete(self, _: None = None) -> FlowResult:
    """Finish the fix flow by creating an entry that carries no data.

    The second positional argument is accepted for signature
    compatibility with the other step handlers and is not used.
    """
    # The entry payload is intentionally empty.
    entry_result = self.async_create_entry(data={})
    return entry_result
async def async_step_timeout(self, _: None = None) -> FlowResult:
    """Abort the fix flow with the reason "operation_took_too_long".

    The second positional argument is accepted for signature
    compatibility with the other step handlers and is not used.
    """
    abort_result = self.async_abort(reason="operation_took_too_long")
    return abort_result
124 data: dict[str, str | int | float |
None] |
None,
127 if issue_id ==
"legacy_subscription":
129 return ConfirmRepairFlow()
FlowResult async_step_init(self, None _=None)
RepairsFlow async_create_fix_flow(HomeAssistant hass, str issue_id, dict[str, str|int|float|None]|None data)
None async_manage_legacy_subscription_issue(HomeAssistant hass, dict[str, Any] subscription_info)
dict[str, Any]|None async_migrate_paypal_agreement(Cloud[CloudClient] cloud)
dict[str, Any]|None async_subscription_info(Cloud[CloudClient] cloud)
RepairsFlowManager|None repairs_flow_manager(HomeAssistant hass)