1 """Support for Google Calendar Search binary sensors."""
3 from __future__
import annotations
5 from collections.abc
import Iterable
6 from datetime
import datetime, timedelta
10 from gcal_sync.api
import GoogleCalendarService, ListEventsRequest
11 from gcal_sync.exceptions
import ApiException
12 from gcal_sync.model
import Event
13 from gcal_sync.sync
import CalendarEventSyncManager
14 from gcal_sync.timeline
import Timeline
15 from ical.iter
import SortableItemValue
23 _LOGGER = logging.getLogger(__name__)
28 MAX_UPCOMING_EVENTS = 20
32 """Truncate the timeline to a maximum number of events.
34 This is used to avoid repeated expansion of recurring events during
35 state machine updates.
37 upcoming = timeline.active_after(dt_util.now())
38 truncated =
list(itertools.islice(upcoming, max_events))
41 SortableItemValue(event.timespan_of(dt_util.get_default_time_zone()), event)
42 for event
in truncated
48 """Coordinator for calendar RPC calls that use an efficient sync."""
50 config_entry: ConfigEntry
55 sync: CalendarEventSyncManager,
58 """Create the CalendarSyncUpdateCoordinator."""
63 update_interval=MIN_TIME_BETWEEN_UPDATES,
69 """Fetch data from API endpoint."""
72 except ApiException
as err:
73 raise UpdateFailed(f
"Error communicating with API: {err}")
from err
75 timeline = await self.
syncsync.store_service.async_get_timeline(
76 dt_util.get_default_time_zone()
82 self, start_date: datetime, end_date: datetime
84 """Get all events in a specific time frame."""
87 "Unable to get events: Sync from server has not completed"
89 return self.
datadata.overlapping(
96 """Return upcoming events if any."""
103 """Coordinator for calendar RPC calls.
105 This sends a polling RPC, not using sync, as a workaround
106 for limitations in the calendar API for supporting search.
109 config_entry: ConfigEntry
114 calendar_service: GoogleCalendarService,
119 """Create the CalendarQueryUpdateCoordinator."""
124 update_interval=MIN_TIME_BETWEEN_UPDATES,
131 self, start_date: datetime, end_date: datetime
132 ) -> Iterable[Event]:
133 """Get all events in a specific time frame."""
134 request = ListEventsRequest(
136 start_time=start_date,
142 result = await self.
calendar_servicecalendar_service.async_list_events(request)
143 async
for result_page
in result:
144 result_items.extend(result_page.items)
145 except ApiException
as err:
151 """Fetch data from API endpoint."""
152 request = ListEventsRequest(calendar_id=self.
calendar_idcalendar_id, search=self.
_search_search)
154 result = await self.
calendar_servicecalendar_service.async_list_events(request)
155 except ApiException
as err:
156 raise UpdateFailed(f
"Error communicating with API: {err}")
from err
161 """Return the next upcoming event if any."""
Iterable[Event]|None upcoming(self)
list[Event] _async_update_data(self)
None __init__(self, HomeAssistant hass, GoogleCalendarService calendar_service, str name, str calendar_id, str|None search)
Iterable[Event] async_get_events(self, datetime start_date, datetime end_date)
None __init__(self, HomeAssistant hass, CalendarEventSyncManager sync, str name)
Iterable[Event] async_get_events(self, datetime start_date, datetime end_date)
Timeline _async_update_data(self)
Iterable[Event]|None upcoming(self)
None async_set_update_error(self, Exception err)
Timeline _truncate_timeline(Timeline timeline, int max_events)
int run(RuntimeConfig runtime_config)