Home Assistant Unofficial Reference 2024.12.1
config_flow.py
Go to the documentation of this file.
1 """Config flow to configure Nest.
2 
3 This configuration flow supports the following:
4  - SDM API with Web OAuth flow with redirect back to Home Assistant
5  - Legacy Nest API auth flow with where user enters an auth code manually
6 
7 NestFlowHandler is an implementation of AbstractOAuth2FlowHandler with
8 some overrides to custom steps inserted in the middle of the flow.
9 """
10 
11 from __future__ import annotations
12 
13 from collections.abc import Iterable, Mapping
14 import logging
15 from typing import TYPE_CHECKING, Any
16 
17 from google_nest_sdm.admin_client import (
18  AdminClient,
19  EligibleSubscriptions,
20  EligibleTopics,
21 )
22 from google_nest_sdm.exceptions import ApiException
23 from google_nest_sdm.structure import Structure
24 import voluptuous as vol
25 
26 from homeassistant.config_entries import SOURCE_REAUTH, ConfigFlowResult
27 from homeassistant.helpers import config_entry_oauth2_flow
28 from homeassistant.util import get_random_string
29 
30 from . import api
31 from .const import (
32  CONF_CLOUD_PROJECT_ID,
33  CONF_PROJECT_ID,
34  CONF_SUBSCRIBER_ID_IMPORTED,
35  CONF_SUBSCRIPTION_NAME,
36  CONF_TOPIC_NAME,
37  DATA_SDM,
38  DOMAIN,
39  OAUTH2_AUTHORIZE,
40  SDM_SCOPES,
41 )
42 
43 DATA_FLOW_IMPL = "nest_flow_implementation"
44 SUBSCRIPTION_FORMAT = "projects/{cloud_project_id}/subscriptions/home-assistant-{rnd}"
45 SUBSCRIPTION_RAND_LENGTH = 10
46 
47 MORE_INFO_URL = "https://www.home-assistant.io/integrations/nest/#configuration"
48 
49 # URLs for Configure Cloud Project step
50 CLOUD_CONSOLE_URL = "https://console.cloud.google.com/home/dashboard"
51 SDM_API_URL = (
52  "https://console.cloud.google.com/apis/library/smartdevicemanagement.googleapis.com"
53 )
54 PUBSUB_API_URL = "https://console.cloud.google.com/apis/library/pubsub.googleapis.com"
55 
56 # URLs for Configure Device Access Project step
57 DEVICE_ACCESS_CONSOLE_URL = "https://console.nest.google.com/device-access/"
58 
59 DEVICE_ACCESS_CONSOLE_EDIT_URL = (
60  "https://console.nest.google.com/device-access/project/{project_id}/information"
61 )
62 CREATE_NEW_SUBSCRIPTION_KEY = "create_new_subscription"
63 
64 _LOGGER = logging.getLogger(__name__)
65 
66 
67 def _generate_subscription_id(cloud_project_id: str) -> str:
68  """Create a new subscription id."""
69  rnd = get_random_string(SUBSCRIPTION_RAND_LENGTH)
70  return SUBSCRIPTION_FORMAT.format(cloud_project_id=cloud_project_id, rnd=rnd)
71 
72 
73 def generate_config_title(structures: Iterable[Structure]) -> str | None:
74  """Pick a user friendly config title based on the Google Home name(s)."""
75  names: list[str] = [
76  structure.info.custom_name
77  for structure in structures
78  if structure.info and structure.info.custom_name
79  ]
80  if not names:
81  return None
82  return ", ".join(names)
83 
84 
86  config_entry_oauth2_flow.AbstractOAuth2FlowHandler, domain=DOMAIN
87 ):
88  """Config flow to handle authentication for both APIs."""
89 
90  DOMAIN = DOMAIN
91  VERSION = 1
92 
93  def __init__(self) -> None:
94  """Initialize NestFlowHandler."""
95  super().__init__()
96  self._data: dict[str, Any] = {DATA_SDM: {}}
97  # Possible name to use for config entry based on the Google Home name
98  self._structure_config_title_structure_config_title: str | None = None
99  self._admin_client_admin_client: AdminClient | None = None
100  self._eligible_topics_eligible_topics: EligibleTopics | None = None
101  self._eligible_subscriptions: EligibleSubscriptions | None = None
102 
103  @property
104  def logger(self) -> logging.Logger:
105  """Return logger."""
106  return logging.getLogger(__name__)
107 
108  @property
109  def extra_authorize_data(self) -> dict[str, str]:
110  """Extra data that needs to be appended to the authorize url."""
111  return {
112  "scope": " ".join(SDM_SCOPES),
113  # Add params to ensure we get back a refresh token
114  "access_type": "offline",
115  "prompt": "consent",
116  }
117 
118  async def async_generate_authorize_url(self) -> str:
119  """Generate a url for the user to authorize based on user input."""
120  project_id = self._data.get(CONF_PROJECT_ID)
121  query = await super().async_generate_authorize_url()
122  authorize_url = OAUTH2_AUTHORIZE.format(project_id=project_id)
123  return f"{authorize_url}{query}"
124 
125  async def async_oauth_create_entry(self, data: dict[str, Any]) -> ConfigFlowResult:
126  """Complete OAuth setup and finish pubsub or finish."""
127  _LOGGER.debug("Finishing post-oauth configuration")
128  self._data.update(data)
129  _LOGGER.debug("self.source=%s", self.sourcesource)
130  if self.sourcesource == SOURCE_REAUTH:
131  _LOGGER.debug("Skipping Pub/Sub configuration")
132  return await self._async_finish_async_finish()
133  return await self.async_step_pubsubasync_step_pubsub()
134 
135  async def async_step_reauth(
136  self, entry_data: Mapping[str, Any]
137  ) -> ConfigFlowResult:
138  """Perform reauth upon an API authentication error."""
139  _LOGGER.debug("async_step_reauth %s", self.sourcesource)
140  self._data.update(entry_data)
141 
142  return await self.async_step_reauth_confirmasync_step_reauth_confirm()
143 
145  self, user_input: dict[str, Any] | None = None
146  ) -> ConfigFlowResult:
147  """Confirm reauth dialog."""
148  if user_input is None:
149  return self.async_show_form(step_id="reauth_confirm")
150  return await self.async_step_userasync_step_user()
151 
152  async def async_step_user(
153  self, user_input: dict[str, Any] | None = None
154  ) -> ConfigFlowResult:
155  """Handle a flow initialized by the user."""
156  self._data[DATA_SDM] = {}
157  if self.sourcesource == SOURCE_REAUTH:
158  return await super().async_step_user(user_input)
159  # Application Credentials setup needs information from the user
160  # before creating the OAuth URL
161  return await self.async_step_create_cloud_projectasync_step_create_cloud_project()
162 
164  self, user_input: dict[str, Any] | None = None
165  ) -> ConfigFlowResult:
166  """Handle initial step in app credentials flow."""
167  implementations = await config_entry_oauth2_flow.async_get_implementations(
168  self.hass, self.DOMAINDOMAIN
169  )
170  if implementations:
171  return await self.async_step_cloud_projectasync_step_cloud_project()
172  # This informational step explains to the user how to setup the
173  # cloud console and other pre-requisites needed before setting up
174  # an application credential. This extra step also allows discovery
175  # to start the config flow rather than aborting. The abort step will
176  # redirect the user to the right panel in the UI then return with a
177  # valid auth implementation.
178  if user_input is not None:
179  return self.async_abort(reason="missing_credentials")
180  return self.async_show_form(
181  step_id="create_cloud_project",
182  description_placeholders={
183  "cloud_console_url": CLOUD_CONSOLE_URL,
184  "sdm_api_url": SDM_API_URL,
185  "pubsub_api_url": PUBSUB_API_URL,
186  "more_info_url": MORE_INFO_URL,
187  },
188  )
189 
191  self, user_input: dict | None = None
192  ) -> ConfigFlowResult:
193  """Handle cloud project in user input."""
194  if user_input is not None:
195  self._data.update(user_input)
196  return await self.async_step_device_projectasync_step_device_project()
197  return self.async_show_form(
198  step_id="cloud_project",
199  data_schema=vol.Schema(
200  {
201  vol.Required(CONF_CLOUD_PROJECT_ID): str,
202  }
203  ),
204  description_placeholders={
205  "cloud_console_url": CLOUD_CONSOLE_URL,
206  "more_info_url": MORE_INFO_URL,
207  },
208  )
209 
211  self, user_input: dict | None = None
212  ) -> ConfigFlowResult:
213  """Collect device access project from user input."""
214  errors = {}
215  if user_input is not None:
216  project_id = user_input[CONF_PROJECT_ID]
217  if project_id == self._data[CONF_CLOUD_PROJECT_ID]:
218  _LOGGER.error(
219  "Device Access Project ID and Cloud Project ID must not be the"
220  " same, see documentation"
221  )
222  errors[CONF_PROJECT_ID] = "wrong_project_id"
223  else:
224  self._data.update(user_input)
225  await self.async_set_unique_id(project_id)
226  self._abort_if_unique_id_configured()
227  return await super().async_step_user()
228 
229  return self.async_show_form(
230  step_id="device_project",
231  data_schema=vol.Schema(
232  {
233  vol.Required(CONF_PROJECT_ID): str,
234  }
235  ),
236  description_placeholders={
237  "device_access_console_url": DEVICE_ACCESS_CONSOLE_URL,
238  "more_info_url": MORE_INFO_URL,
239  },
240  errors=errors,
241  )
242 
243  async def async_step_pubsub(
244  self, user_input: dict[str, Any] | None = None
245  ) -> ConfigFlowResult:
246  """Configure and the pre-requisites to configure Pub/Sub topics and subscriptions."""
247  data = {
248  **self._data,
249  **(user_input if user_input is not None else {}),
250  }
251  cloud_project_id = data.get(CONF_CLOUD_PROJECT_ID, "").strip()
252  device_access_project_id = data[CONF_PROJECT_ID]
253 
254  errors: dict[str, str] = {}
255  if cloud_project_id:
256  access_token = self._data["token"]["access_token"]
257  self._admin_client_admin_client = api.new_pubsub_admin_client(
258  self.hass, access_token=access_token, cloud_project_id=cloud_project_id
259  )
260  try:
261  eligible_topics = await self._admin_client_admin_client.list_eligible_topics(
262  device_access_project_id=device_access_project_id
263  )
264  except ApiException as err:
265  _LOGGER.error("Error listing eligible Pub/Sub topics: %s", err)
266  errors["base"] = "pubsub_api_error"
267  else:
268  if not eligible_topics.topic_names:
269  errors["base"] = "no_pubsub_topics"
270  if not errors:
271  self._data[CONF_CLOUD_PROJECT_ID] = cloud_project_id
272  self._eligible_topics_eligible_topics = eligible_topics
273  return await self.async_step_pubsub_topicasync_step_pubsub_topic()
274 
275  return self.async_show_form(
276  step_id="pubsub",
277  data_schema=vol.Schema(
278  {
279  vol.Required(CONF_CLOUD_PROJECT_ID, default=cloud_project_id): str,
280  }
281  ),
282  description_placeholders={
283  "url": CLOUD_CONSOLE_URL,
284  "device_access_console_url": DEVICE_ACCESS_CONSOLE_URL,
285  "more_info_url": MORE_INFO_URL,
286  },
287  errors=errors,
288  )
289 
291  self, user_input: dict[str, Any] | None = None
292  ) -> ConfigFlowResult:
293  """Configure and create Pub/Sub topic."""
294  if TYPE_CHECKING:
295  assert self._eligible_topics_eligible_topics
296  if user_input is not None:
297  self._data.update(user_input)
298  return await self.async_step_pubsub_subscriptionasync_step_pubsub_subscription()
299  topics = list(self._eligible_topics_eligible_topics.topic_names)
300  return self.async_show_form(
301  step_id="pubsub_topic",
302  data_schema=vol.Schema(
303  {
304  vol.Optional(CONF_TOPIC_NAME, default=topics[0]): vol.In(topics),
305  }
306  ),
307  description_placeholders={
308  "device_access_console_url": DEVICE_ACCESS_CONSOLE_URL,
309  "more_info_url": MORE_INFO_URL,
310  },
311  )
312 
314  self, user_input: dict[str, Any] | None = None
315  ) -> ConfigFlowResult:
316  """Configure and create Pub/Sub subscription."""
317  if TYPE_CHECKING:
318  assert self._admin_client_admin_client
319  errors = {}
320  if user_input is not None:
321  subscription_name = user_input[CONF_SUBSCRIPTION_NAME]
322  if subscription_name == CREATE_NEW_SUBSCRIPTION_KEY:
323  topic_name = self._data[CONF_TOPIC_NAME]
324  subscription_name = _generate_subscription_id(
325  self._data[CONF_CLOUD_PROJECT_ID]
326  )
327  _LOGGER.debug(
328  "Creating subscription %s on topic %s",
329  subscription_name,
330  topic_name,
331  )
332  try:
333  await self._admin_client_admin_client.create_subscription(
334  topic_name,
335  subscription_name,
336  )
337  except ApiException as err:
338  _LOGGER.error("Error creatingPub/Sub subscription: %s", err)
339  errors["base"] = "pubsub_api_error"
340  else:
341  user_input[CONF_SUBSCRIPTION_NAME] = subscription_name
342  else:
343  # The user created this subscription themselves so do not delete when removing the integration.
344  user_input[CONF_SUBSCRIBER_ID_IMPORTED] = True
345 
346  if not errors:
347  self._data.update(user_input)
348  subscriber = api.new_subscriber_with_token(
349  self.hass,
350  self._data["token"]["access_token"],
351  self._data[CONF_PROJECT_ID],
352  subscription_name,
353  )
354  try:
355  device_manager = await subscriber.async_get_device_manager()
356  except ApiException as err:
357  # Generating a user friendly home name is best effort
358  _LOGGER.debug("Error fetching structures: %s", err)
359  else:
360  self._structure_config_title_structure_config_title = generate_config_title(
361  device_manager.structures.values()
362  )
363  return await self._async_finish_async_finish()
364 
365  subscriptions = {}
366  try:
367  eligible_subscriptions = (
368  await self._admin_client_admin_client.list_eligible_subscriptions(
369  expected_topic_name=self._data[CONF_TOPIC_NAME],
370  )
371  )
372  except ApiException as err:
373  _LOGGER.error(
374  "Error talking to API to list eligible Pub/Sub subscriptions: %s", err
375  )
376  errors["base"] = "pubsub_api_error"
377  else:
378  subscriptions.update(
379  {name: name for name in eligible_subscriptions.subscription_names}
380  )
381  subscriptions[CREATE_NEW_SUBSCRIPTION_KEY] = "Create New"
382  return self.async_show_form(
383  step_id="pubsub_subscription",
384  data_schema=vol.Schema(
385  {
386  vol.Optional(
387  CONF_SUBSCRIPTION_NAME,
388  default=next(iter(subscriptions)),
389  ): vol.In(subscriptions),
390  }
391  ),
392  description_placeholders={
393  "topic": self._data[CONF_TOPIC_NAME],
394  "more_info_url": MORE_INFO_URL,
395  },
396  errors=errors,
397  )
398 
399  async def _async_finish(self) -> ConfigFlowResult:
400  """Create an entry for the SDM flow."""
401  _LOGGER.debug("Creating/updating configuration entry")
402  # Update existing config entry when in the reauth flow.
403  if self.sourcesource == SOURCE_REAUTH:
404  return self.async_update_reload_and_abort(
405  self._get_reauth_entry(),
406  data=self._data,
407  )
408  title = self.flow_impl.name
409  if self._structure_config_title_structure_config_title:
410  title = self._structure_config_title_structure_config_title
411  return self.async_create_entry(title=title, data=self._data)
ConfigFlowResult async_step_pubsub(self, dict[str, Any]|None user_input=None)
Definition: config_flow.py:245
ConfigFlowResult async_step_device_project(self, dict|None user_input=None)
Definition: config_flow.py:212
ConfigFlowResult async_step_pubsub_topic(self, dict[str, Any]|None user_input=None)
Definition: config_flow.py:292
ConfigFlowResult async_step_create_cloud_project(self, dict[str, Any]|None user_input=None)
Definition: config_flow.py:165
ConfigFlowResult async_step_cloud_project(self, dict|None user_input=None)
Definition: config_flow.py:192
ConfigFlowResult async_step_reauth(self, Mapping[str, Any] entry_data)
Definition: config_flow.py:137
ConfigFlowResult async_step_pubsub_subscription(self, dict[str, Any]|None user_input=None)
Definition: config_flow.py:315
ConfigFlowResult async_step_reauth_confirm(self, dict[str, Any]|None user_input=None)
Definition: config_flow.py:146
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
Definition: config_flow.py:154
ConfigFlowResult async_oauth_create_entry(self, dict[str, Any] data)
Definition: config_flow.py:125
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88
IssData update(pyiss.ISS iss)
Definition: __init__.py:33
str _generate_subscription_id(str cloud_project_id)
Definition: config_flow.py:67
str|None generate_config_title(Iterable[Structure] structures)
Definition: config_flow.py:73
str get_random_string(int length=10)
Definition: __init__.py:92