1 """API for Google Nest Device Access bound to Home Assistant OAuth."""
3 from __future__
import annotations
7 from typing
import cast
9 from aiohttp
import ClientSession
10 from google.oauth2.credentials
import Credentials
11 from google_nest_sdm.admin_client
import PUBSUB_API_HOST, AdminClient
12 from google_nest_sdm.auth
import AbstractAuth
13 from google_nest_sdm.google_nest_subscriber
import GoogleNestSubscriber
23 CONF_SUBSCRIPTION_NAME,
28 _LOGGER = logging.getLogger(__name__)
32 """Provide Google Nest Device Access authentication tied to an OAuth2 based config entry."""
36 websession: ClientSession,
37 oauth_session: config_entry_oauth2_flow.OAuth2Session,
41 """Initialize Google Nest Device Access auth."""
42 super().
__init__(websession, API_URL)
48 """Return a valid access token for SDM API."""
50 return cast(str, self.
_oauth_session_oauth_session.token[
"access_token"])
53 """Return an OAuth credential for Pub/Sub Subscriber."""
62 token=token[
"access_token"],
63 refresh_token=token[
"refresh_token"],
64 token_uri=OAUTH2_TOKEN,
69 creds.expiry = datetime.datetime.fromtimestamp(token[
"expires_at"])
74 """Authentication implementation used during config flow, without refresh.
76 This exists to allow the config flow to use the API before it has fully
77 created a config entry required by OAuth2Session. This does not support
78 refreshing tokens, which is fine since it should have been just created.
83 websession: ClientSession,
87 """Init the Nest client library auth implementation."""
92 """Return the access token."""
96 """Return an OAuth credential for Pub/Sub Subscriber."""
99 token_uri=OAUTH2_TOKEN,
105 hass: HomeAssistant, entry: ConfigEntry
106 ) -> GoogleNestSubscriber |
None:
107 """Create a GoogleNestSubscriber."""
109 await config_entry_oauth2_flow.async_get_config_entry_implementation(
114 implementation, config_entry_oauth2_flow.LocalOAuth2Implementation
116 raise TypeError(f
"Unexpected auth implementation {implementation}")
117 if (subscription_name := entry.data.get(CONF_SUBSCRIPTION_NAME))
is None:
118 subscription_name = entry.data[CONF_SUBSCRIBER_ID]
120 aiohttp_client.async_get_clientsession(hass),
121 config_entry_oauth2_flow.OAuth2Session(hass, entry, implementation),
122 implementation.client_id,
123 implementation.client_secret,
125 return GoogleNestSubscriber(auth, entry.data[CONF_PROJECT_ID], subscription_name)
132 subscription_name: str,
133 ) -> GoogleNestSubscriber:
134 """Create a GoogleNestSubscriber with an access token."""
135 return GoogleNestSubscriber(
137 aiohttp_client.async_get_clientsession(hass),
149 cloud_project_id: str,
151 """Create a Nest AdminClient with an access token."""
154 aiohttp_client.async_get_clientsession(hass),
158 cloud_project_id=cloud_project_id,
None __init__(self, ClientSession websession, str access_token, str host)
str async_get_access_token(self)
Credentials async_get_creds(self)
Credentials async_get_creds(self)
str async_get_access_token(self)
None __init__(self, ClientSession websession, config_entry_oauth2_flow.OAuth2Session oauth_session, str client_id, str client_secret)
GoogleNestSubscriber new_subscriber_with_token(HomeAssistant hass, str access_token, str project_id, str subscription_name)
AdminClient new_pubsub_admin_client(HomeAssistant hass, str access_token, str cloud_project_id)
GoogleNestSubscriber|None new_subscriber(HomeAssistant hass, ConfigEntry entry)