Home Assistant Unofficial Reference 2024.12.1
dt.py
Go to the documentation of this file.
1 """Helper methods to handle the time in Home Assistant."""
2 
3 from __future__ import annotations
4 
5 import bisect
6 from contextlib import suppress
7 import datetime as dt
8 from functools import lru_cache, partial
9 import re
10 from typing import Any, Literal, overload
11 import zoneinfo
12 
13 from aiozoneinfo import async_get_time_zone as _async_get_time_zone
14 import ciso8601
15 
16 DATE_STR_FORMAT = "%Y-%m-%d"
17 UTC = dt.UTC
18 DEFAULT_TIME_ZONE: dt.tzinfo = dt.UTC
19 
20 # EPOCHORDINAL is not exposed as a constant
21 # https://github.com/python/cpython/blob/3.10/Lib/zoneinfo/_zoneinfo.py#L12
22 EPOCHORDINAL = dt.datetime(1970, 1, 1).toordinal()
23 
24 # Copyright (c) Django Software Foundation and individual contributors.
25 # All rights reserved.
26 # https://github.com/django/django/blob/main/LICENSE
# Pattern layout: "YYYY-MM-DD", a "T" or space separator, "HH:MM", an optional
# ":SS" with optional fractional digits, then an optional "Z" or numeric UTC
# offset. Only the first six fractional digits are captured; up to six more
# are consumed without being captured.
DATETIME_RE = re.compile(
    r"(?P<year>\d{4})-(?P<month>\d{1,2})-(?P<day>\d{1,2})"
    r"[T ](?P<hour>\d{1,2}):(?P<minute>\d{1,2})"
    r"(?::(?P<second>\d{1,2})(?:\.(?P<microsecond>\d{1,6})\d{0,6})?)?"
    r"(?P<tzinfo>Z|[+-]\d{2}(?::?\d{2})?)?$"
)
33 
34 # Copyright (c) Django Software Foundation and individual contributors.
35 # All rights reserved.
36 # https://github.com/django/django/blob/main/LICENSE
# Pattern layout: optional "<days> " or "<days> day(s), " prefix, optional "-"
# sign, optional "HH:" (only when followed by "MM:SS"), optional "MM:",
# mandatory seconds, and an optional fraction introduced by "." or ",".
STANDARD_DURATION_RE = re.compile(
    r"^"
    r"(?:(?P<days>-?\d+) (days?, )?)?"
    r"(?P<sign>-?)"
    r"((?:(?P<hours>\d+):)(?=\d+:\d+))?"
    r"(?:(?P<minutes>\d+):)?"
    r"(?P<seconds>\d+)"
    r"(?:[\.,](?P<microseconds>\d{1,6})\d{0,6})?"
    r"$"
)
47 
48 # Copyright (c) Django Software Foundation and individual contributors.
49 # All rights reserved.
50 # https://github.com/django/django/blob/main/LICENSE
# Pattern layout: optional sign, "P", optional "<n>D", then an optional "T"
# section holding optional "<n>H", "<n>M" and "<n>S" parts. Each number may
# carry a fraction written with "." or ",".
ISO8601_DURATION_RE = re.compile(
    r"^(?P<sign>[-+]?)"
    r"P"
    r"(?:(?P<days>\d+([\.,]\d+)?)D)?"
    r"(?:T"
    r"(?:(?P<hours>\d+([\.,]\d+)?)H)?"
    r"(?:(?P<minutes>\d+([\.,]\d+)?)M)?"
    r"(?:(?P<seconds>\d+([\.,]\d+)?)S)?"
    r")?"
    r"$"
)
62 
63 # Copyright (c) Django Software Foundation and individual contributors.
64 # All rights reserved.
65 # https://github.com/django/django/blob/main/LICENSE
# Pattern layout: optional "<days> day(s) " prefix followed by an optional
# "[+-]H:MM:SS[.ffffff]" time part (minutes and seconds are exactly two digits).
POSTGRES_INTERVAL_RE = re.compile(
    r"^"
    r"(?:(?P<days>-?\d+) (days? ?))?"
    r"(?:(?P<sign>[-+])?"
    r"(?P<hours>\d+):"
    r"(?P<minutes>\d\d):"
    r"(?P<seconds>\d\d)"
    r"(?:\.(?P<microseconds>\d{1,6}))?"
    r")?$"
)
76 
77 
@lru_cache(maxsize=1)
def get_default_time_zone() -> dt.tzinfo:
    """Return the module-wide default time zone.

    The value is read from DEFAULT_TIME_ZONE and memoized by lru_cache;
    set_default_time_zone clears that cache whenever it changes the zone.
    """
    return DEFAULT_TIME_ZONE
82 
83 
def set_default_time_zone(time_zone: dt.tzinfo) -> None:
    """Replace the time zone that is used when no zone is specified.

    Async friendly.
    """
    assert isinstance(time_zone, dt.tzinfo)

    # pylint: disable-next=global-statement
    global DEFAULT_TIME_ZONE  # noqa: PLW0603

    DEFAULT_TIME_ZONE = time_zone
    # Drop the memoized value so get_default_time_zone returns the new zone.
    get_default_time_zone.cache_clear()
96 
97 
98 def get_time_zone(time_zone_str: str) -> zoneinfo.ZoneInfo | None:
99  """Get time zone from string. Return None if unable to determine.
100 
101  Must be run in the executor if the ZoneInfo is not already
102  in the cache. If you are not sure, use async_get_time_zone.
103  """
104  try:
105  return zoneinfo.ZoneInfo(time_zone_str)
106  except zoneinfo.ZoneInfoNotFoundError:
107  return None
108 
109 
async def async_get_time_zone(time_zone_str: str) -> zoneinfo.ZoneInfo | None:
    """Look up a ZoneInfo by key, returning None when the key is unknown.

    Async friendly.
    """
    # Only the "zone not found" error is swallowed; anything else propagates.
    with suppress(zoneinfo.ZoneInfoNotFoundError):
        return await _async_get_time_zone(time_zone_str)
    return None
119 
120 
121 # We use a partial here since it is implemented in native code
122 # and avoids the global lookup of UTC
123 utcnow = partial(dt.datetime.now, UTC)
124 utcnow.__doc__ = "Get now in UTC time."
125 
126 
def now(time_zone: dt.tzinfo | None = None) -> dt.datetime:
    """Return the current time in time_zone, or in DEFAULT_TIME_ZONE if omitted."""
    zone = time_zone or DEFAULT_TIME_ZONE
    return dt.datetime.now(zone)
130 
131 
def as_utc(dattim: dt.datetime) -> dt.datetime:
    """Convert a datetime to UTC.

    A datetime whose tzinfo already equals UTC is returned unchanged. A naive
    datetime is interpreted as being in DEFAULT_TIME_ZONE before conversion.
    """
    zone = dattim.tzinfo
    if zone == UTC:
        return dattim

    aware = dattim.replace(tzinfo=DEFAULT_TIME_ZONE) if zone is None else dattim
    return aware.astimezone(UTC)
143 
144 
145 def as_timestamp(dt_value: dt.datetime | str) -> float:
146  """Convert a date/time into a unix time (seconds since 1970)."""
147  parsed_dt: dt.datetime | None
148  if isinstance(dt_value, dt.datetime):
149  parsed_dt = dt_value
150  else:
151  parsed_dt = parse_datetime(str(dt_value))
152  if parsed_dt is None:
153  raise ValueError("not a valid date/time.")
154  return parsed_dt.timestamp()
155 
156 
def as_local(dattim: dt.datetime) -> dt.datetime:
    """Convert a datetime to DEFAULT_TIME_ZONE.

    A datetime whose tzinfo already equals DEFAULT_TIME_ZONE is returned
    unchanged. A naive datetime is treated as already being local time.
    """
    zone = dattim.tzinfo
    if zone == DEFAULT_TIME_ZONE:
        return dattim

    aware = dattim.replace(tzinfo=DEFAULT_TIME_ZONE) if zone is None else dattim
    return aware.astimezone(DEFAULT_TIME_ZONE)
165 
166 
167 # We use a partial here to improve performance by avoiding the global lookup
168 # of UTC and the function call overhead.
169 utc_from_timestamp = partial(dt.datetime.fromtimestamp, tz=UTC)
170 """Return a UTC time from a timestamp."""
171 
172 
def utc_to_timestamp(utc_dt: dt.datetime) -> float:
    """Fast conversion of a datetime in UTC to a timestamp.

    Only the calendar and clock fields are read; tzinfo is never consulted,
    so the caller must pass a value that is already expressed in UTC.
    """
    # Taken from
    # https://github.com/python/cpython/blob/3.10/Lib/zoneinfo/_zoneinfo.py#L185
    days_since_epoch = utc_dt.toordinal() - EPOCHORDINAL
    whole_seconds = (
        days_since_epoch * 86400
        + utc_dt.hour * 3600
        + utc_dt.minute * 60
        + utc_dt.second
    )
    return whole_seconds + (utc_dt.microsecond / 1000000)
184 
185 
def start_of_local_day(dt_or_d: dt.date | dt.datetime | None = None) -> dt.datetime:
    """Return midnight in DEFAULT_TIME_ZONE for the given date or datetime.

    With no argument, the date of now() is used. For a datetime, only its
    date part is used.
    """
    if dt_or_d is None:
        day: dt.date = now().date()
    elif isinstance(dt_or_d, dt.datetime):
        # datetime is a subclass of date, so it must be checked first.
        day = dt_or_d.date()
    else:
        day = dt_or_d

    midnight = dt.time()
    return dt.datetime.combine(day, midnight, tzinfo=DEFAULT_TIME_ZONE)
196 
197 
198 # Copyright (c) Django Software Foundation and individual contributors.
199 # All rights reserved.
200 # https://github.com/django/django/blob/main/LICENSE
@overload
def parse_datetime(dt_str: str) -> dt.datetime | None: ...


@overload
def parse_datetime(dt_str: str, *, raise_on_error: Literal[True]) -> dt.datetime: ...


@overload
def parse_datetime(
    dt_str: str, *, raise_on_error: Literal[False]
) -> dt.datetime | None: ...


def parse_datetime(dt_str: str, *, raise_on_error: bool = False) -> dt.datetime | None:
    """Parse a string and return a datetime.datetime.

    This function supports time zone offsets. When the input contains one,
    the output uses a timezone with a fixed offset from UTC.
    Raises ValueError if the input is well formatted but not a valid datetime.

    If the input isn't well formatted, returns None if raise_on_error is False
    or raises ValueError if it's True.
    """
    # First try if the string can be parsed by the fast ciso8601 library
    with suppress(ValueError, IndexError):
        return ciso8601.parse_datetime(dt_str)

    # ciso8601 failed to parse the string, fall back to regex
    if not (match := DATETIME_RE.match(dt_str)):
        if raise_on_error:
            raise ValueError
        return None
    kws: dict[str, Any] = match.groupdict()
    if kws["microsecond"]:
        # Right-pad the fraction so e.g. ".5" becomes 500000 microseconds.
        kws["microsecond"] = kws["microsecond"].ljust(6, "0")
    tzinfo_str = kws.pop("tzinfo")

    tzinfo: dt.tzinfo | None = None
    if tzinfo_str == "Z":
        tzinfo = UTC
    elif tzinfo_str is not None:
        # tzinfo_str is "+HH", "+HHMM" or "+HH:MM" (or the "-" variants);
        # the minutes are the last two characters when present.
        offset_mins = int(tzinfo_str[-2:]) if len(tzinfo_str) > 3 else 0
        offset_hours = int(tzinfo_str[1:3])
        offset = dt.timedelta(hours=offset_hours, minutes=offset_mins)
        if tzinfo_str[0] == "-":
            offset = -offset
        tzinfo = dt.timezone(offset)
    # Groups that did not participate in the match are left to the
    # dt.datetime constructor defaults.
    kws = {k: int(v) for k, v in kws.items() if v is not None}
    kws["tzinfo"] = tzinfo
    return dt.datetime(**kws)
252 
253 
def parse_date(dt_str: str) -> dt.date | None:
    """Parse a DATE_STR_FORMAT string into a date, or return None on mismatch."""
    # strptime raises ValueError when dt_str does not match the format.
    with suppress(ValueError):
        return dt.datetime.strptime(dt_str, DATE_STR_FORMAT).date()
    return None
260 
261 
262 # Copyright (c) Django Software Foundation and individual contributors.
263 # All rights reserved.
264 # https://github.com/django/django/blob/master/LICENSE
def parse_duration(value: str) -> dt.timedelta | None:
    """Parse a duration string and return a datetime.timedelta.

    The standard "[D day(s), ][H:][M:]S[.f]" form is tried first, then the
    ISO 8601 representation, then PostgreSQL's day-time interval format.
    Returns None when none of them matches.
    """
    match = (
        STANDARD_DURATION_RE.match(value)
        or ISO8601_DURATION_RE.match(value)
        or POSTGRES_INTERVAL_RE.match(value)
    )
    if not match:
        return None

    groups = match.groupdict()
    sign = -1 if groups.pop("sign", "+") == "-" else 1
    if groups.get("microseconds"):
        # Right-pad the fraction so e.g. ".5" becomes 500000 microseconds.
        groups["microseconds"] = groups["microseconds"].ljust(6, "0")

    # Accept "," as a decimal separator; skip groups that did not match.
    time_delta_args: dict[str, float] = {
        unit: float(text.replace(",", "."))
        for unit, text in groups.items()
        if text is not None
    }

    days = dt.timedelta(float(time_delta_args.pop("days", 0.0) or 0.0))
    if match.re == ISO8601_DURATION_RE:
        # In the ISO 8601 form the leading sign covers the day part as well;
        # the other formats carry the sign of the days in the number itself.
        days *= sign
    return days + sign * dt.timedelta(**time_delta_args)
289 
290 
291 def parse_time(time_str: str) -> dt.time | None:
292  """Parse a time string (00:20:00) into Time object.
293 
294  Return None if invalid.
295  """
296  parts = str(time_str).split(":")
297  if len(parts) < 2:
298  return None
299  try:
300  hour = int(parts[0])
301  minute = int(parts[1])
302  second = int(parts[2]) if len(parts) > 2 else 0
303  return dt.time(hour, minute, second)
304  except ValueError:
305  # ValueError if value cannot be converted to an int or not in range
306  return None
307 
308 
def _get_timestring(timediff: float, precision: int = 1) -> str:
    """Render a number of seconds as text such as "5 hours".

    At most `precision` units are emitted, starting with the largest unit that
    fits. The last emitted unit is rounded, the ones before it are truncated.
    A year is counted as 365 days and a month as 30 days.
    """

    def _label(count: int, unit: str) -> str:
        """Format "<count> <unit>" with a plural "s" unless count is 1."""
        plural = "" if count == 1 else "s"
        return f"{count:d} {unit}{plural} "

    if timediff == 0.0:
        return "0 seconds"

    units = ("year", "month", "day", "hour", "minute", "second")
    factors = (365 * 24 * 60 * 60, 30 * 24 * 60 * 60, 24 * 60 * 60, 60 * 60, 60, 1)

    pieces: list[str] = []
    units_used = 0

    for unit, span in zip(units, factors):
        if timediff < span:
            continue
        units_used += 1
        if units_used == precision:
            # Last requested unit: round instead of truncating, then stop.
            pieces.append(_label(round(timediff / span), unit))
            break
        whole = int(timediff // span)
        pieces.append(_label(whole, unit))
        timediff -= whole * span

    return "".join(pieces).rstrip()
342 
343 
def get_age(date: dt.datetime, precision: int = 1) -> str:
    """Take a datetime and return its "age" as a string.

    The age can be in second, minute, hour, day, month and year.

    `precision` units are returned at most, with the last unit rounded.

    The date must be in the past or a ValueError will be raised.
    """
    elapsed = round((now() - date).total_seconds())
    if elapsed < 0:
        raise ValueError("Time value is in the future")
    return _get_timestring(elapsed, precision)
361 
362 
def get_time_remaining(date: dt.datetime, precision: int = 1) -> str:
    """Take a datetime and return the time remaining until it as a string.

    The remaining time can be in second, minute, hour, day, month and year.

    `precision` units are returned at most, with the last unit rounded.

    The date must be in the future or a ValueError will be raised.
    """
    remaining = round((date - now()).total_seconds())
    if remaining < 0:
        raise ValueError("Time value is in the past")
    return _get_timestring(remaining, precision)
381 
382 
def parse_time_expression(parameter: Any, min_value: int, max_value: int) -> list[int]:
    """Parse the time expression part and return a list of times to match.

    Accepted forms of `parameter`:
    - None or "*": every value from min_value to max_value inclusive.
    - "/N": every value in that range that is divisible by N.
    - any other string, or a non-iterable value: the single value int(parameter).
    - an iterable: its items converted to int and sorted.

    Raises ValueError if a resulting value lies outside
    [min_value, max_value] or if the step in "/N" is zero.
    """
    if parameter is None or parameter == "*":
        res = list(range(min_value, max_value + 1))
    elif isinstance(parameter, str):
        if parameter.startswith("/"):
            step = int(parameter[1:])
            if step == 0:
                # Without this guard `x % step` raises ZeroDivisionError,
                # unlike the ValueError used for other invalid input.
                raise ValueError(
                    f"Time expression '{parameter}': step must not be zero"
                )
            res = [x for x in range(min_value, max_value + 1) if x % step == 0]
        else:
            res = [int(parameter)]

    elif not hasattr(parameter, "__iter__"):
        res = [int(parameter)]
    else:
        res = sorted(int(x) for x in parameter)

    for val in res:
        if val < min_value or val > max_value:
            raise ValueError(
                f"Time expression '{parameter}': parameter {val} out of range "
                f"({min_value} to {max_value})"
            )

    return res
407 
408 
def _dst_offset_diff(dattim: dt.datetime) -> dt.timedelta:
    """Return the UTC offset 24 hours after dattim minus the one 24 hours before."""
    one_day = dt.timedelta(hours=24)
    offset_after = (dattim + one_day).utcoffset()
    offset_before = (dattim - one_day).utcoffset()
    return offset_after - offset_before  # type: ignore[operator]
415 
416 
417 def _lower_bound(arr: list[int], cmp: int) -> int | None:
418  """Return the first value in arr greater or equal to cmp.
419 
420  Return None if no such value exists.
421  """
422  if (left := bisect.bisect_left(arr, cmp)) == len(arr):
423  return None
424  return arr[left]
425 
426 
def find_next_time_expression_time(
    now: dt.datetime,  # pylint: disable=redefined-outer-name
    seconds: list[int],
    minutes: list[int],
    hours: list[int],
) -> dt.datetime:
    """Find the next datetime from now for which the time expression matches.

    The algorithm looks at each time unit separately and tries to find the
    next one that matches for each. If any of them would roll over, all
    time units below that are reset to the first matching value.

    Timezones are also handled (the tzinfo of the now object is used),
    including daylight saving time.

    Raises ValueError if seconds, minutes or hours is empty.
    """
    if not seconds or not minutes or not hours:
        raise ValueError("Cannot find a next time: Time expression never matches!")

    while True:
        # Reset microseconds and fold; fold (for ambiguous DST times) will be
        # handled later.
        result = now.replace(microsecond=0, fold=0)

        # Match next second
        if (next_second := _lower_bound(seconds, result.second)) is None:
            # No second to match in this minute. Roll-over to next minute.
            next_second = seconds[0]
            result += dt.timedelta(minutes=1)

        if result.second != next_second:
            result = result.replace(second=next_second)

        # Match next minute
        next_minute = _lower_bound(minutes, result.minute)
        if next_minute != result.minute:
            # We're in the next minute. Seconds needs to be reset.
            result = result.replace(second=seconds[0])

        if next_minute is None:
            # No minute to match in this hour. Roll-over to next hour.
            next_minute = minutes[0]
            result += dt.timedelta(hours=1)

        if result.minute != next_minute:
            result = result.replace(minute=next_minute)

        # Match next hour
        next_hour = _lower_bound(hours, result.hour)
        if next_hour != result.hour:
            # We're in the next hour. Seconds+minutes needs to be reset.
            result = result.replace(second=seconds[0], minute=minutes[0])

        if next_hour is None:
            # No hour to match in this day. Roll-over to next day.
            next_hour = hours[0]
            result += dt.timedelta(days=1)

        if result.hour != next_hour:
            result = result.replace(hour=next_hour)

        if result.tzinfo in (None, UTC):
            # Using UTC, no DST checking needed
            return result

        if not _datetime_exists(result):
            # When entering DST and clocks are turned forward.
            # There are wall clock times that don't "exist" (an hour is skipped).

            # -> trigger on the next time that 1. matches the pattern and 2. does exist
            # for example:
            #   on 2021.03.28 02:00:00 in CET timezone clocks are turned forward an hour
            #   with pattern "02:30", don't run on 28 mar (such a wall time does not
            #   exist on this day) instead run at 02:30 the next day

            # We solve this edge case by just iterating one second until the result
            # exists (max. 3600 operations, which should be fine for an edge case that
            # happens once a year)
            now += dt.timedelta(seconds=1)
            continue

        if not _datetime_ambiguous(now):
            return result

        # When leaving DST and clocks are turned backward.
        # Then there are wall clock times that are ambiguous i.e. exist with DST and
        # without DST. The logic above does not take into account if a given pattern
        # matches _twice_ in a day.
        # Example: on 2021.10.31 02:00:00 in CET timezone clocks are turned
        # backward an hour.

        if _datetime_ambiguous(result):
            # `now` and `result` are both ambiguous, so the next match happens
            # _within_ the current fold.

            # Examples:
            #  1. 2021.10.31 02:00:00+02:00 with pattern 02:30
            #       -> 2021.10.31 02:30:00+02:00
            #  2. 2021.10.31 02:00:00+01:00 with pattern 02:30
            #       -> 2021.10.31 02:30:00+01:00
            return result.replace(fold=now.fold)

        if now.fold == 0:
            # `now` is in the first fold, but result is not ambiguous (meaning it no
            # longer matches within the fold).
            # -> Check if result matches in the next fold. If so, emit that match

            # Turn back the time by the DST offset, effectively run the algorithm on
            # the first fold. If it matches on the first fold, that means it will also
            # match on the second one.

            # Example: 2021.10.31 02:45:00+02:00 with pattern 02:30
            #   -> 2021.10.31 02:30:00+01:00

            check_result = find_next_time_expression_time(
                now + _dst_offset_diff(now), seconds, minutes, hours
            )
            if _datetime_ambiguous(check_result):
                return check_result.replace(fold=1)

        return result
547 
548 
549 def _datetime_exists(dattim: dt.datetime) -> bool:
550  """Check if a datetime exists."""
551  assert dattim.tzinfo is not None
552  original_tzinfo = dattim.tzinfo
553  # Check if we can round trip to UTC
554  return dattim == dattim.astimezone(UTC).astimezone(original_tzinfo)
555 
556 
def _datetime_ambiguous(dattim: dt.datetime) -> bool:
    """Check whether an aware datetime is ambiguous.

    It is ambiguous when it exists and flipping its fold attribute yields a
    different UTC offset.
    """
    assert dattim.tzinfo is not None
    if not _datetime_exists(dattim):
        return False
    flipped = dattim.replace(fold=not dattim.fold)
    return dattim.utcoffset() != flipped.utcoffset()
dt.datetime as_utc(dt.datetime dattim)
Definition: dt.py:132
zoneinfo.ZoneInfo|None get_time_zone(str time_zone_str)
Definition: dt.py:98
dt.date|None parse_date(str dt_str)
Definition: dt.py:254
dt.datetime|None parse_datetime(str dt_str)
Definition: dt.py:202
None set_default_time_zone(dt.tzinfo time_zone)
Definition: dt.py:84
str get_age(dt.datetime date, int precision=1)
Definition: dt.py:344
float utc_to_timestamp(dt.datetime utc_dt)
Definition: dt.py:173
int|None _lower_bound(list[int] arr, int cmp)
Definition: dt.py:417
dt.datetime start_of_local_day(dt.date|dt.datetime|None dt_or_d=None)
Definition: dt.py:186
dt.datetime find_next_time_expression_time(dt.datetime now, list[int] seconds, list[int] minutes, list[int] hours)
Definition: dt.py:432
dt.time|None parse_time(str time_str)
Definition: dt.py:291
bool _datetime_exists(dt.datetime dattim)
Definition: dt.py:549
dt.timedelta _dst_offset_diff(dt.datetime dattim)
Definition: dt.py:409
float as_timestamp(dt.datetime|str dt_value)
Definition: dt.py:145
bool _datetime_ambiguous(dt.datetime dattim)
Definition: dt.py:557
dt.datetime as_local(dt.datetime dattim)
Definition: dt.py:157
dt.tzinfo get_default_time_zone()
Definition: dt.py:79
str _get_timestring(float timediff, int precision=1)
Definition: dt.py:309
str get_time_remaining(dt.datetime date, int precision=1)
Definition: dt.py:363
list[int] parse_time_expression(Any parameter, int min_value, int max_value)
Definition: dt.py:383
dt.timedelta|None parse_duration(str value)
Definition: dt.py:265
zoneinfo.ZoneInfo|None async_get_time_zone(str time_zone_str)
Definition: dt.py:110
dt.datetime now(dt.tzinfo|None time_zone=None)
Definition: dt.py:127