1 """Config flow for Google Mail integration."""
3 from __future__
import annotations
5 from collections.abc
import Mapping
7 from typing
import Any, cast
9 from google.oauth2.credentials
import Credentials
10 from googleapiclient.discovery
import build
16 from .const
import DEFAULT_ACCESS, DOMAIN
20 config_entry_oauth2_flow.AbstractOAuth2FlowHandler, domain=DOMAIN
22 """Config flow to handle Google Mail OAuth2 authentication."""
29 return logging.getLogger(__name__)
33 """Extra data that needs to be appended to the authorize url."""
35 "scope":
" ".join(DEFAULT_ACCESS),
37 "access_type":
"offline",
42 self, entry_data: Mapping[str, Any]
43 ) -> ConfigFlowResult:
44 """Perform reauth upon an API authentication error."""
48 self, user_input: dict[str, Any] |
None =
None
49 ) -> ConfigFlowResult:
50 """Confirm reauth dialog."""
51 if user_input
is None:
52 return self.async_show_form(step_id=
"reauth_confirm")
53 return await self.async_step_user()
56 """Create an entry for the flow, or update existing entry."""
def _get_profile() -> str:
    """Return the email address of the authenticated Gmail account.

    Builds a Gmail v1 client from the ``credentials`` of the enclosing
    scope and fetches the profile of the "me" user. The call to
    ``execute()`` is synchronous, so this helper is handed to the
    executor rather than being called directly.
    """
    service = build("gmail", "v1", credentials=credentials)
    profile = service.users().getProfile(userId="me").execute()
    return profile["emailAddress"]
63 credentials = Credentials(data[CONF_TOKEN][CONF_ACCESS_TOKEN])
64 email = await self.hass.async_add_executor_job(_get_profile)
66 await self.async_set_unique_id(email)
67 if self.source != SOURCE_REAUTH:
68 self._abort_if_unique_id_configured()
70 return self.async_create_entry(title=email, data=data)
72 reauth_entry = self._get_reauth_entry()
73 self._abort_if_unique_id_mismatch(
74 reason=
"wrong_account",
75 description_placeholders={
"email": cast(str, reauth_entry.unique_id)},
77 return self.async_update_reload_and_abort(reauth_entry, data=data)
ConfigFlowResult async_step_reauth(self, Mapping[str, Any] entry_data)
ConfigFlowResult async_oauth_create_entry(self, dict[str, Any] data)
logging.Logger logger(self)
dict[str, Any] extra_authorize_data(self)
ConfigFlowResult async_step_reauth_confirm(self, dict[str, Any]|None user_input=None)
def execute(hass, filename, source, data=None, return_response=False)