Home Assistant Unofficial Reference 2024.12.1
trigger.py
Go to the documentation of this file.
1 """Offer calendar automation rules."""
2 
3 from __future__ import annotations
4 
5 from collections.abc import Awaitable, Callable, Coroutine
6 from dataclasses import dataclass
7 import datetime
8 import logging
9 from typing import Any
10 
11 import voluptuous as vol
12 
13 from homeassistant.const import CONF_ENTITY_ID, CONF_EVENT, CONF_OFFSET, CONF_PLATFORM
14 from homeassistant.core import CALLBACK_TYPE, HassJob, HomeAssistant, callback
15 from homeassistant.exceptions import HomeAssistantError
16 from homeassistant.helpers import config_validation as cv
17 from homeassistant.helpers.entity_component import EntityComponent
18 from homeassistant.helpers.event import (
19  async_track_point_in_time,
20  async_track_time_interval,
21 )
22 from homeassistant.helpers.trigger import TriggerActionType, TriggerInfo
23 from homeassistant.helpers.typing import ConfigType
24 from homeassistant.util import dt as dt_util
25 
26 from . import CalendarEntity, CalendarEvent
27 from .const import DATA_COMPONENT, DOMAIN
28 
_LOGGER = logging.getLogger(__name__)

# Values accepted for the trigger's ``event`` option: fire on the start or on
# the end of a calendar event.
EVENT_START = "start"
EVENT_END = "end"
# Width of each event-scan window, and the period of the refresh poll.
UPDATE_INTERVAL = datetime.timedelta(minutes=15)

# Trigger configuration: platform and entity id are mandatory, the event
# boundary defaults to "start", and the offset defaults to zero.
TRIGGER_SCHEMA = cv.TRIGGER_BASE_SCHEMA.extend(
    {
        vol.Required(CONF_PLATFORM): DOMAIN,
        vol.Required(CONF_ENTITY_ID): cv.entity_id,
        vol.Optional(CONF_EVENT, default=EVENT_START): vol.In({EVENT_START, EVENT_END}),
        vol.Optional(CONF_OFFSET, default=datetime.timedelta(0)): cv.time_period,
    }
)

# mypy: disallow-any-generics
45 
46 
@dataclass
class QueuedCalendarEvent:
    """An event that is queued to be fired in the future."""

    # Moment at which the trigger should fire (event boundary plus offset).
    trigger_time: datetime.datetime
    # The calendar event that is passed to the automation when it fires.
    event: CalendarEvent
53 
54 
@dataclass
class Timespan:
    """A time range part of start/end dates, used for considering active events.

    The range is half open: ``start`` is included and ``end`` is excluded.
    """

    start: datetime.datetime
    """The start datetime of the interval."""

    end: datetime.datetime
    """The end datetime (exclusive) of the interval."""

    def with_offset(self, offset: datetime.timedelta) -> Timespan:
        """Return a new interval shifted by the specified offset."""
        return Timespan(self.start + offset, self.end + offset)

    def __contains__(self, trigger: datetime.datetime) -> bool:
        """Return true if the trigger time is within the time span."""
        return self.start <= trigger < self.end

    def next_upcoming(
        self, now: datetime.datetime, interval: datetime.timedelta
    ) -> Timespan:
        """Return a subsequent time span following the current time span.

        This effectively gives us a cursor like interface for advancing through
        time using the interval as a hint. The returned span may have a
        different interval than the one specified. For example, time span may
        be longer during a daylight saving time transition, or may extend due to
        drift if the current interval is old. The returned time span is
        adjacent and non-overlapping.
        """
        # Start exactly where this span ends so nothing is skipped; measure the
        # new end from whichever is later, this span's end or the current time.
        return Timespan(self.end, max(self.end, now) + interval)

    def __str__(self) -> str:
        """Return a string representing the half open interval time span."""
        return f"[{self.start}, {self.end})"
90 
91 
92 type EventFetcher = Callable[[Timespan], Awaitable[list[CalendarEvent]]]
93 type QueuedEventFetcher = Callable[[Timespan], Awaitable[list[QueuedCalendarEvent]]]
94 
95 
def get_entity(hass: HomeAssistant, entity_id: str) -> CalendarEntity:
    """Get the calendar entity for the provided entity_id.

    Raises HomeAssistantError when the component has no entity with that id,
    or when the entity found is not a CalendarEntity.
    """
    component: EntityComponent[CalendarEntity] = hass.data[DATA_COMPONENT]
    if not (entity := component.get_entity(entity_id)) or not isinstance(
        entity, CalendarEntity
    ):
        raise HomeAssistantError(
            f"Entity does not exist {entity_id} or is not a calendar entity"
        )
    return entity
106 
107 
def event_fetcher(hass: HomeAssistant, entity_id: str) -> EventFetcher:
    """Build an async_get_events wrapper to fetch events during a time span."""

    async def async_get_events(timespan: Timespan) -> list[CalendarEvent]:
        """Return events active in the specified time span."""
        # The entity is looked up on every call rather than captured once, so
        # a missing entity raises HomeAssistantError at fetch time.
        entity = get_entity(hass, entity_id)
        # Expand by one second to make the end time exclusive
        end_time = timespan.end + datetime.timedelta(seconds=1)
        return await entity.async_get_events(hass, timespan.start, end_time)

    return async_get_events
119 
120 
def queued_event_fetcher(
    fetcher: EventFetcher, event_type: str, offset: datetime.timedelta
) -> QueuedEventFetcher:
    """Build a fetcher that produces a schedule of upcoming trigger events.

    The returned coroutine function wraps ``fetcher``. It keeps only the events
    whose start (for EVENT_START) or end (otherwise) falls inside the requested
    window once ``offset`` is accounted for, and returns them sorted by the
    time they should fire.
    """

    def get_trigger_time(event: CalendarEvent) -> datetime.datetime:
        """Return the event boundary (start or end) that this trigger uses."""
        if event_type == EVENT_START:
            return event.start_datetime_local
        return event.end_datetime_local

    async def async_get_events(timespan: Timespan) -> list[QueuedCalendarEvent]:
        """Get calendar event triggers eligible to fire in the time span."""
        # Shift the window by the inverse of the offset so it is expressed in
        # event time; the offset is added back when the trigger is queued.
        offset_timespan = timespan.with_offset(-offset)
        active_events = await fetcher(offset_timespan)

        # Determine the trigger eligibility of events during this time span.
        # Example: For an EVENT_END trigger the event may start during this
        # time span, but need to be triggered later when the end happens.
        results = []
        for event in active_events:
            trigger_time = get_trigger_time(event)
            if trigger_time in offset_timespan:
                results.append(QueuedCalendarEvent(trigger_time + offset, event))

        _LOGGER.debug(
            "Scan events @ %s%s found %s eligible of %s active",
            offset_timespan,
            f" (offset={offset})" if offset else "",
            len(results),
            len(active_events),
        )
        results.sort(key=lambda x: x.trigger_time)
        return results

    return async_get_events
158 
159 
class CalendarEventListener:
    """Helper class to listen to calendar events.

    This listener will poll every UPDATE_INTERVAL to fetch a set of upcoming
    calendar events in the upcoming window of time, putting them into a queue.
    The queue is drained by scheduling an alarm for the next upcoming event
    trigger time, one event at a time.
    """

    def __init__(
        self,
        hass: HomeAssistant,
        job: HassJob[..., Coroutine[Any, Any, None]],
        trigger_data: dict[str, Any],
        fetcher: QueuedEventFetcher,
    ) -> None:
        """Initialize CalendarEventListener."""
        self._hass = hass
        self._job = job
        self._trigger_data = trigger_data
        # Cancel handles for the pending event alarm and the periodic refresh.
        self._unsub_event: CALLBACK_TYPE | None = None
        self._unsub_refresh: CALLBACK_TYPE | None = None
        self._fetcher = fetcher
        now = dt_util.now()
        # The first scan window starts now and is one update interval wide.
        self._timespan = Timespan(now, now + UPDATE_INTERVAL)
        self._events: list[QueuedCalendarEvent] = []

    async def async_attach(self) -> None:
        """Attach a calendar event listener.

        Fetches the first window of events, starts the periodic refresh and
        schedules the alarm for the first queued event. Errors raised by the
        initial fetch are not caught here.
        """
        self._events.extend(await self._fetcher(self._timespan))
        self._unsub_refresh = async_track_time_interval(
            self._hass, self._handle_refresh, UPDATE_INTERVAL
        )
        self._listen_next_calendar_event()

    @callback
    def async_detach(self) -> None:
        """Detach the calendar event listener."""
        self._clear_event_listener()
        if self._unsub_refresh:
            self._unsub_refresh()
        self._unsub_refresh = None

    @callback
    def _listen_next_calendar_event(self) -> None:
        """Set up the calendar event listener."""
        if not self._events:
            return

        _LOGGER.debug(
            "Scheduled next event trigger for %s", self._events[0].trigger_time
        )
        # Only the head of the queue gets an alarm; the handler re-arms it for
        # the following event once this one has fired.
        self._unsub_event = async_track_point_in_time(
            self._hass,
            self._handle_calendar_event,
            self._events[0].trigger_time,
        )

    def _clear_event_listener(self) -> None:
        """Reset the event listener."""
        if self._unsub_event:
            self._unsub_event()
        self._unsub_event = None

    async def _handle_calendar_event(self, now: datetime.datetime) -> None:
        """Handle calendar event."""
        _LOGGER.debug("Calendar event @ %s", dt_util.as_local(now))
        self._dispatch_events(now)
        self._clear_event_listener()
        self._listen_next_calendar_event()

    def _dispatch_events(self, now: datetime.datetime) -> None:
        """Dispatch all events that are eligible to fire."""
        while self._events and self._events[0].trigger_time <= now:
            queued_event = self._events.pop(0)
            _LOGGER.debug("Dispatching event: %s", queued_event.event)
            self._hass.async_run_hass_job(
                self._job,
                {
                    "trigger": {
                        **self._trigger_data,
                        "calendar_event": queued_event.event.as_dict(),
                    }
                },
            )

    async def _handle_refresh(self, now_utc: datetime.datetime) -> None:
        """Fetch the next window of events and re-arm the event alarm.

        A failed fetch is logged and leaves the queue unchanged; the alarm is
        still re-armed for whatever is already queued.
        """
        now = dt_util.as_local(now_utc)
        _LOGGER.debug("Refresh events @ %s", now)
        # Dispatch any eligible events in the boundary case where refresh
        # fires before the calendar event.
        self._dispatch_events(now)
        self._clear_event_listener()
        self._timespan = self._timespan.next_upcoming(now, UPDATE_INTERVAL)
        try:
            self._events.extend(await self._fetcher(self._timespan))
        except HomeAssistantError as ex:
            _LOGGER.error("Calendar trigger failed to fetch events: %s", ex)
        self._listen_next_calendar_event()
260 
261 
async def async_attach_trigger(
    hass: HomeAssistant,
    config: ConfigType,
    action: TriggerActionType,
    trigger_info: TriggerInfo,
) -> CALLBACK_TYPE:
    """Attach trigger for the specified calendar.

    Returns the listener's detach callback. Raises HomeAssistantError when the
    configured entity id does not resolve to a calendar entity.
    """
    entity_id = config[CONF_ENTITY_ID]
    event_type = config[CONF_EVENT]
    offset = config[CONF_OFFSET]

    # Validate the entity id is valid
    get_entity(hass, entity_id)

    # Data exposed to the automation under the "trigger" key, alongside the
    # calendar event added at dispatch time.
    trigger_data = {
        **trigger_info["trigger_data"],
        "platform": DOMAIN,
        "event": event_type,
        "offset": offset,
    }
    listener = CalendarEventListener(
        hass,
        HassJob(action),
        trigger_data,
        queued_event_fetcher(event_fetcher(hass, entity_id), event_type, offset),
    )
    await listener.async_attach()
    return listener.async_detach
None _handle_calendar_event(self, datetime.datetime now)
Definition: trigger.py:224
None __init__(self, HomeAssistant hass, HassJob[..., Coroutine[Any, Any, None]] job, dict[str, Any] trigger_data, QueuedEventFetcher fetcher)
Definition: trigger.py:175
None _handle_refresh(self, datetime.datetime now_utc)
Definition: trigger.py:246
Timespan with_offset(self, datetime.timedelta offset)
Definition: trigger.py:65
Timespan next_upcoming(self, datetime.datetime now, datetime.timedelta interval)
Definition: trigger.py:75
bool __contains__(self, datetime.datetime trigger)
Definition: trigger.py:69
EventFetcher event_fetcher(HomeAssistant hass, str entity_id)
Definition: trigger.py:108
CalendarEntity get_entity(HomeAssistant hass, str entity_id)
Definition: trigger.py:96
CALLBACK_TYPE async_attach_trigger(HomeAssistant hass, ConfigType config, TriggerActionType action, TriggerInfo trigger_info)
Definition: trigger.py:267
QueuedEventFetcher queued_event_fetcher(EventFetcher fetcher, str event_type, datetime.timedelta offset)
Definition: trigger.py:123
CALLBACK_TYPE async_track_point_in_time(HomeAssistant hass, HassJob[[datetime], Coroutine[Any, Any, None]|None]|Callable[[datetime], Coroutine[Any, Any, None]|None] action, datetime point_in_time)
Definition: event.py:1462
CALLBACK_TYPE async_track_time_interval(HomeAssistant hass, Callable[[datetime], Coroutine[Any, Any, None]|None] action, timedelta interval, *str|None name=None, bool|None cancel_on_shutdown=None)
Definition: event.py:1679