Home Assistant Unofficial Reference 2024.12.1
config_flow.py
Go to the documentation of this file.
1 """Config flow to configure the iCloud integration."""
2 
3 from __future__ import annotations
4 
5 from collections.abc import Mapping
6 import logging
7 import os
8 from typing import TYPE_CHECKING, Any
9 
10 from pyicloud import PyiCloudService
11 from pyicloud.exceptions import (
12  PyiCloudException,
13  PyiCloudFailedLoginException,
14  PyiCloudNoDevicesException,
15  PyiCloudServiceNotActivatedException,
16 )
17 import voluptuous as vol
18 
19 from homeassistant.config_entries import ConfigFlow, ConfigFlowResult
20 from homeassistant.const import CONF_PASSWORD, CONF_USERNAME
21 from homeassistant.helpers.storage import Store
22 
23 from .const import (
24  CONF_GPS_ACCURACY_THRESHOLD,
25  CONF_MAX_INTERVAL,
26  CONF_WITH_FAMILY,
27  DEFAULT_GPS_ACCURACY_THRESHOLD,
28  DEFAULT_MAX_INTERVAL,
29  DEFAULT_WITH_FAMILY,
30  DOMAIN,
31  STORAGE_KEY,
32  STORAGE_VERSION,
33 )
34 
35 CONF_TRUSTED_DEVICE = "trusted_device"
36 CONF_VERIFICATION_CODE = "verification_code"
37 
38 _LOGGER = logging.getLogger(__name__)
39 
40 
41 class IcloudFlowHandler(ConfigFlow, domain=DOMAIN):
42  """Handle a iCloud config flow."""
43 
44  VERSION = 1
45 
46  def __init__(self) -> None:
47  """Initialize iCloud config flow."""
48  self.apiapi = None
49  self._username_username = None
50  self._password_password = None
51  self._with_family_with_family = None
52  self._max_interval_max_interval = None
53  self._gps_accuracy_threshold_gps_accuracy_threshold = None
54 
55  self._trusted_device_trusted_device = None
56  self._verification_code_verification_code = None
57 
58  self._existing_entry_data_existing_entry_data: dict[str, Any] | None = None
59  self._description_placeholders_description_placeholders: dict[str, str] | None = None
60 
61  def _show_setup_form(self, user_input=None, errors=None, step_id="user"):
62  """Show the setup form to the user."""
63 
64  if user_input is None:
65  user_input = {}
66 
67  if step_id == "user":
68  schema = {
69  vol.Required(
70  CONF_USERNAME, default=user_input.get(CONF_USERNAME, "")
71  ): str,
72  vol.Required(
73  CONF_PASSWORD, default=user_input.get(CONF_PASSWORD, "")
74  ): str,
75  vol.Optional(
76  CONF_WITH_FAMILY,
77  default=user_input.get(CONF_WITH_FAMILY, DEFAULT_WITH_FAMILY),
78  ): bool,
79  }
80  else:
81  schema = {
82  vol.Required(
83  CONF_PASSWORD, default=user_input.get(CONF_PASSWORD, "")
84  ): str,
85  }
86 
87  return self.async_show_formasync_show_formasync_show_form(
88  step_id=step_id,
89  data_schema=vol.Schema(schema),
90  errors=errors or {},
91  description_placeholders=self._description_placeholders_description_placeholders,
92  )
93 
94  async def _validate_and_create_entry(self, user_input, step_id):
95  """Check if config is valid and create entry if so."""
96  self._password_password = user_input[CONF_PASSWORD]
97 
98  extra_inputs = user_input
99 
100  # If an existing entry was found, meaning this is a password update attempt,
101  # use those to get config values that aren't changing
102  if self._existing_entry_data_existing_entry_data:
103  extra_inputs = self._existing_entry_data_existing_entry_data
104 
105  self._username_username = extra_inputs[CONF_USERNAME]
106  self._with_family_with_family = extra_inputs.get(CONF_WITH_FAMILY, DEFAULT_WITH_FAMILY)
107  self._max_interval_max_interval = extra_inputs.get(CONF_MAX_INTERVAL, DEFAULT_MAX_INTERVAL)
108  self._gps_accuracy_threshold_gps_accuracy_threshold = extra_inputs.get(
109  CONF_GPS_ACCURACY_THRESHOLD, DEFAULT_GPS_ACCURACY_THRESHOLD
110  )
111 
112  # Check if already configured
113  if self.unique_idunique_id is None:
114  await self.async_set_unique_idasync_set_unique_id(self._username_username)
115  self._abort_if_unique_id_configured_abort_if_unique_id_configured()
116 
117  try:
118  self.apiapi = await self.hass.async_add_executor_job(
119  PyiCloudService,
120  self._username_username,
121  self._password_password,
122  Store(self.hass, STORAGE_VERSION, STORAGE_KEY).path,
123  True,
124  None,
125  self._with_family_with_family,
126  )
127  except PyiCloudFailedLoginException as error:
128  _LOGGER.error("Error logging into iCloud service: %s", error)
129  self.apiapi = None
130  errors = {CONF_PASSWORD: "invalid_auth"}
131  return self._show_setup_form_show_setup_form(user_input, errors, step_id)
132 
133  if self.apiapi.requires_2fa:
134  return await self.async_step_verification_codeasync_step_verification_code()
135 
136  if self.apiapi.requires_2sa:
137  return await self.async_step_trusted_deviceasync_step_trusted_device()
138 
139  try:
140  devices = await self.hass.async_add_executor_job(
141  getattr, self.apiapi, "devices"
142  )
143  if not devices:
144  raise PyiCloudNoDevicesException # noqa: TRY301
145  except (PyiCloudServiceNotActivatedException, PyiCloudNoDevicesException):
146  _LOGGER.error("No device found in the iCloud account: %s", self._username_username)
147  self.apiapi = None
148  return self.async_abortasync_abortasync_abort(reason="no_device")
149 
150  data = {
151  CONF_USERNAME: self._username_username,
152  CONF_PASSWORD: self._password_password,
153  CONF_WITH_FAMILY: self._with_family_with_family,
154  CONF_MAX_INTERVAL: self._max_interval_max_interval,
155  CONF_GPS_ACCURACY_THRESHOLD: self._gps_accuracy_threshold_gps_accuracy_threshold,
156  }
157 
158  # If this is a password update attempt, update the entry instead of creating one
159  if step_id == "user":
160  return self.async_create_entryasync_create_entryasync_create_entry(title=self._username_username, data=data)
161 
162  entry = await self.async_set_unique_idasync_set_unique_id(self.unique_idunique_id)
163  self.hass.config_entries.async_update_entry(entry, data=data)
164  await self.hass.config_entries.async_reload(entry.entry_id)
165  return self.async_abortasync_abortasync_abort(reason="reauth_successful")
166 
167  async def async_step_user(
168  self, user_input: dict[str, Any] | None = None
169  ) -> ConfigFlowResult:
170  """Handle a flow initiated by the user."""
171  errors: dict[str, str] = {}
172 
173  icloud_dir = Store[Any](self.hass, STORAGE_VERSION, STORAGE_KEY)
174 
175  if not os.path.exists(icloud_dir.path):
176  await self.hass.async_add_executor_job(os.makedirs, icloud_dir.path)
177 
178  if user_input is None:
179  return self._show_setup_form_show_setup_form(user_input, errors)
180 
181  return await self._validate_and_create_entry_validate_and_create_entry(user_input, "user")
182 
183  async def async_step_reauth(
184  self, entry_data: Mapping[str, Any]
185  ) -> ConfigFlowResult:
186  """Initialise re-authentication."""
187  # Store existing entry data so it can be used later and set unique ID
188  # so existing config entry can be updated
189  await self.async_set_unique_idasync_set_unique_id(self.context["unique_id"])
190  self._existing_entry_data_existing_entry_data = {**entry_data}
191  self._description_placeholders_description_placeholders = {"username": entry_data[CONF_USERNAME]}
192  return await self.async_step_reauth_confirmasync_step_reauth_confirm()
193 
195  self, user_input: dict[str, Any] | None = None
196  ) -> ConfigFlowResult:
197  """Update password for a config entry that can't authenticate."""
198  if user_input is None:
199  return self._show_setup_form_show_setup_form(step_id="reauth_confirm")
200 
201  return await self._validate_and_create_entry_validate_and_create_entry(user_input, "reauth_confirm")
202 
204  self,
205  user_input: dict[str, Any] | None = None,
206  errors: dict[str, str] | None = None,
207  ) -> ConfigFlowResult:
208  """We need a trusted device."""
209  if errors is None:
210  errors = {}
211 
212  if TYPE_CHECKING:
213  assert self.apiapi is not None
214  trusted_devices = await self.hass.async_add_executor_job(
215  getattr, self.apiapi, "trusted_devices"
216  )
217  trusted_devices_for_form = {}
218  for i, device in enumerate(trusted_devices):
219  trusted_devices_for_form[i] = device.get(
220  "deviceName", f"SMS to {device.get('phoneNumber')}"
221  )
222 
223  if user_input is None:
224  return await self._show_trusted_device_form_show_trusted_device_form(
225  trusted_devices_for_form, errors
226  )
227 
228  self._trusted_device_trusted_device = trusted_devices[int(user_input[CONF_TRUSTED_DEVICE])]
229 
230  if not await self.hass.async_add_executor_job(
231  self.apiapi.send_verification_code, self._trusted_device_trusted_device
232  ):
233  _LOGGER.error("Failed to send verification code")
234  self._trusted_device_trusted_device = None
235  errors[CONF_TRUSTED_DEVICE] = "send_verification_code"
236 
237  return await self._show_trusted_device_form_show_trusted_device_form(
238  trusted_devices_for_form, errors
239  )
240 
241  return await self.async_step_verification_codeasync_step_verification_code()
242 
244  self, trusted_devices, errors: dict[str, str] | None = None
245  ) -> ConfigFlowResult:
246  """Show the trusted_device form to the user."""
247 
248  return self.async_show_formasync_show_formasync_show_form(
249  step_id="trusted_device",
250  data_schema=vol.Schema(
251  {
252  vol.Required(CONF_TRUSTED_DEVICE): vol.All(
253  vol.Coerce(int), vol.In(trusted_devices)
254  )
255  }
256  ),
257  errors=errors or {},
258  )
259 
261  self,
262  user_input: dict[str, Any] | None = None,
263  errors: dict[str, str] | None = None,
264  ) -> ConfigFlowResult:
265  """Ask the verification code to the user."""
266  if errors is None:
267  errors = {}
268 
269  if user_input is None:
270  return await self._show_verification_code_form_show_verification_code_form(errors)
271 
272  if TYPE_CHECKING:
273  assert self.apiapi is not None
274 
275  self._verification_code_verification_code = user_input[CONF_VERIFICATION_CODE]
276 
277  try:
278  if self.apiapi.requires_2fa:
279  if not await self.hass.async_add_executor_job(
280  self.apiapi.validate_2fa_code, self._verification_code_verification_code
281  ):
282  raise PyiCloudException("The code you entered is not valid.") # noqa: TRY301
283  elif not await self.hass.async_add_executor_job(
284  self.apiapi.validate_verification_code,
285  self._trusted_device_trusted_device,
286  self._verification_code_verification_code,
287  ):
288  raise PyiCloudException("The code you entered is not valid.") # noqa: TRY301
289  except PyiCloudException as error:
290  # Reset to the initial 2FA state to allow the user to retry
291  _LOGGER.error("Failed to verify verification code: %s", error)
292  self._trusted_device_trusted_device = None
293  self._verification_code_verification_code = None
294  errors["base"] = "validate_verification_code"
295 
296  if self.apiapi.requires_2fa:
297  try:
298  self.apiapi = await self.hass.async_add_executor_job(
299  PyiCloudService,
300  self._username_username,
301  self._password_password,
302  Store(self.hass, STORAGE_VERSION, STORAGE_KEY).path,
303  True,
304  None,
305  self._with_family_with_family,
306  )
307  return await self.async_step_verification_codeasync_step_verification_code(None, errors)
308  except PyiCloudFailedLoginException as error_login:
309  _LOGGER.error("Error logging into iCloud service: %s", error_login)
310  self.apiapi = None
311  errors = {CONF_PASSWORD: "invalid_auth"}
312  return self._show_setup_form_show_setup_form(user_input, errors, "user")
313  else:
314  return await self.async_step_trusted_deviceasync_step_trusted_device(None, errors)
315 
316  return await self.async_step_userasync_step_userasync_step_user(
317  {
318  CONF_USERNAME: self._username_username,
319  CONF_PASSWORD: self._password_password,
320  CONF_WITH_FAMILY: self._with_family_with_family,
321  CONF_MAX_INTERVAL: self._max_interval_max_interval,
322  CONF_GPS_ACCURACY_THRESHOLD: self._gps_accuracy_threshold_gps_accuracy_threshold,
323  }
324  )
325 
327  self, errors: dict[str, str] | None = None
328  ) -> ConfigFlowResult:
329  """Show the verification_code form to the user."""
330 
331  return self.async_show_formasync_show_formasync_show_form(
332  step_id="verification_code",
333  data_schema=vol.Schema({vol.Required(CONF_VERIFICATION_CODE): str}),
334  errors=errors,
335  )
ConfigFlowResult _show_trusted_device_form(self, trusted_devices, dict[str, str]|None errors=None)
Definition: config_flow.py:245
def _show_setup_form(self, user_input=None, errors=None, step_id="user")
Definition: config_flow.py:61
ConfigFlowResult async_step_reauth(self, Mapping[str, Any] entry_data)
Definition: config_flow.py:185
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
Definition: config_flow.py:169
ConfigFlowResult _show_verification_code_form(self, dict[str, str]|None errors=None)
Definition: config_flow.py:328
ConfigFlowResult async_step_trusted_device(self, dict[str, Any]|None user_input=None, dict[str, str]|None errors=None)
Definition: config_flow.py:207
ConfigFlowResult async_step_verification_code(self, dict[str, Any]|None user_input=None, dict[str, str]|None errors=None)
Definition: config_flow.py:264
ConfigFlowResult async_step_reauth_confirm(self, dict[str, Any]|None user_input=None)
Definition: config_flow.py:196
None _abort_if_unique_id_configured(self, dict[str, Any]|None updates=None, bool reload_on_update=True, *str error="already_configured")
ConfigEntry|None async_set_unique_id(self, str|None unique_id=None, *bool raise_on_progress=True)
ConfigFlowResult async_create_entry(self, *str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)
ConfigFlowResult async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)
_FlowResultT async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)