1 """API for Google Tasks bound to Home Assistant OAuth."""
3 from functools
import partial
8 from google.oauth2.credentials
import Credentials
9 from googleapiclient.discovery
import Resource, build
10 from googleapiclient.errors
import HttpError
11 from googleapiclient.http
import BatchHttpRequest, HttpRequest
17 from .exceptions
import GoogleTasksApiError
19 _LOGGER = logging.getLogger(__name__)
21 MAX_TASK_RESULTS = 100
25 """Raise a GoogleTasksApiError if the response contains an error."""
26 if not isinstance(result, dict):
28 f
"Google Tasks API replied with unexpected response: {result}"
30 if error := result.get(
"error"):
31 message = error.get(
"message",
"Unknown Error")
36 """Provide Google Tasks authentication tied to an OAuth2 based config entry."""
41 oauth2_session: config_entry_oauth2_flow.OAuth2Session,
43 """Initialize Google Tasks Auth."""
48 """Return a valid access token."""
53 """Get current resource."""
55 return await self.
_hass_hass.async_add_executor_job(
56 partial(build,
"tasks",
"v1", credentials=Credentials(token=token))
60 """Get all TaskList resources."""
62 cmd: HttpRequest = service.tasklists().
list()
63 result = await self.
_execute_execute(cmd)
64 return result[
"items"]
66 async
def list_tasks(self, task_list_id: str) -> list[dict[str, Any]]:
67 """Get all Task resources for the task list."""
69 cmd: HttpRequest = service.tasks().
list(
70 tasklist=task_list_id,
71 maxResults=MAX_TASK_RESULTS,
75 result = await self.
_execute_execute(cmd)
76 return result[
"items"]
83 """Create a new Task resource on the task list."""
85 cmd: HttpRequest = service.tasks().
insert(
86 tasklist=task_list_id,
97 """Update a task resource."""
99 cmd: HttpRequest = service.tasks().
patch(
100 tasklist=task_list_id,
111 """Delete a task resources."""
113 batch: BatchHttpRequest = service.new_batch_http_request()
115 def response_handler(_, response, exception: HttpError) ->
None:
116 if exception
is not None:
118 f
"Google Tasks API responded with error ({exception.status_code})"
121 data = json.loads(response)
124 for task_id
in task_ids:
127 tasklist=task_list_id,
131 callback=response_handler,
139 previous: str |
None,
141 """Move a task resource to a specific position within the task list."""
143 cmd: HttpRequest = service.tasks().
move(
144 tasklist=task_list_id,
150 async
def _execute(self, request: HttpRequest | BatchHttpRequest) -> Any:
152 result = await self.
_hass_hass.async_add_executor_job(request.execute)
153 except HttpError
as err:
155 f
"Google Tasks API responded with error ({err.status_code})"
Any _execute(self, HttpRequest|BatchHttpRequest request)
None __init__(self, HomeAssistant hass, config_entry_oauth2_flow.OAuth2Session oauth2_session)
None move(self, str task_list_id, str task_id, str|None previous)
list[dict[str, Any]] list_task_lists(self)
list[dict[str, Any]] list_tasks(self, str task_list_id)
Resource _get_service(self)
None insert(self, str task_list_id, dict[str, Any] task)
None delete(self, str task_list_id, list[str] task_ids)
None patch(self, str task_list_id, str task_id, dict[str, Any] task)
str async_get_access_token(self)
None _raise_if_error(Any|dict[str, Any] result)