1 """Config flow for Google Tasks."""
3 from collections.abc
import Mapping
7 from google.oauth2.credentials
import Credentials
8 from googleapiclient.discovery
import build
9 from googleapiclient.errors
import HttpError
10 from googleapiclient.http
import HttpRequest
16 from .const
import DOMAIN, OAUTH2_SCOPES
20 config_entry_oauth2_flow.AbstractOAuth2FlowHandler, domain=DOMAIN
22 """Config flow to handle Google Tasks OAuth2 authentication."""
29 return logging.getLogger(__name__)
33 """Extra data that needs to be appended to the authorize url."""
35 "scope":
" ".join(OAUTH2_SCOPES),
37 "access_type":
"offline",
42 """Create an entry for the flow."""
43 credentials = Credentials(token=data[CONF_TOKEN][CONF_ACCESS_TOKEN])
45 user_resource =
build(
48 credentials=credentials,
50 user_resource_cmd: HttpRequest = user_resource.userinfo().
get()
51 user_resource_info = await self.hass.async_add_executor_job(
52 user_resource_cmd.execute
57 credentials=credentials,
59 cmd: HttpRequest = resource.tasklists().
list()
60 await self.hass.async_add_executor_job(cmd.execute)
61 except HttpError
as ex:
63 return self.async_abort(
64 reason=
"access_not_configured",
65 description_placeholders={
"message": error},
68 self.
loggerlogger.exception(
"Unknown error occurred")
69 return self.async_abort(reason=
"unknown")
70 user_id = user_resource_info[
"id"]
71 await self.async_set_unique_id(user_id)
73 if self.source != SOURCE_REAUTH:
74 self._abort_if_unique_id_configured()
75 return self.async_create_entry(title=user_resource_info[
"name"], data=data)
77 reauth_entry = self._get_reauth_entry()
78 if reauth_entry.unique_id:
79 self._abort_if_unique_id_mismatch(reason=
"wrong_account")
81 return self.async_update_reload_and_abort(
82 reauth_entry, unique_id=user_id, data=data
86 self, entry_data: Mapping[str, Any]
87 ) -> ConfigFlowResult:
88 """Perform reauth upon an API authentication error."""
92 self, user_input: dict[str, Any] |
None =
None
93 ) -> ConfigFlowResult:
94 """Confirm reauth dialog."""
95 if user_input
is None:
96 return self.async_show_form(step_id=
"reauth_confirm")
97 return await self.async_step_user()
dict[str, Any] extra_authorize_data(self)
ConfigFlowResult async_step_reauth(self, Mapping[str, Any] entry_data)
ConfigFlowResult async_step_reauth_confirm(self, dict[str, Any]|None user_input=None)
logging.Logger logger(self)
ConfigFlowResult async_oauth_create_entry(self, dict[str, Any] data)
web.Response get(self, web.Request request, str config_key)