1 """Helper methods to handle the time in Home Assistant."""
3 from __future__
import annotations
6 from contextlib
import suppress
8 from functools
import lru_cache, partial
10 from typing
import Any, Literal, overload
13 from aiozoneinfo
import async_get_time_zone
as _async_get_time_zone
# strptime format used when parsing plain dates (year-month-day).
DATE_STR_FORMAT = "%Y-%m-%d"
# Zone applied when a caller does not supply one. Starts out as UTC and is
# rebound at runtime by set_default_time_zone().
DEFAULT_TIME_ZONE: dt.tzinfo = dt.UTC
# Ordinal of 1970-01-01; subtracted from a date's ordinal to get the number of
# days since the Unix epoch.
EPOCHORDINAL = dt.datetime(1970, 1, 1).toordinal()
27 DATETIME_RE = re.compile(
28 r"(?P<year>\d{4})-(?P<month>\d{1,2})-(?P<day>\d{1,2})"
29 r"[T ](?P<hour>\d{1,2}):(?P<minute>\d{1,2})"
30 r"(?::(?P<second>\d{1,2})(?:\.(?P<microsecond>\d{1,6})\d{0,6})?)?"
31 r"(?P<tzinfo>Z|[+-]\d{2}(?::?\d{2})?)?$"
37 STANDARD_DURATION_RE = re.compile(
39 r"(?:(?P<days>-?\d+) (days?, )?)?"
41 r"((?:(?P<hours>\d+):)(?=\d+:\d+))?"
42 r"(?:(?P<minutes>\d+):)?"
44 r"(?:[\.,](?P<microseconds>\d{1,6})\d{0,6})?"
51 ISO8601_DURATION_RE = re.compile(
54 r"(?:(?P<days>\d+([\.,]\d+)?)D)?"
56 r"(?:(?P<hours>\d+([\.,]\d+)?)H)?"
57 r"(?:(?P<minutes>\d+([\.,]\d+)?)M)?"
58 r"(?:(?P<seconds>\d+([\.,]\d+)?)S)?"
66 POSTGRES_INTERVAL_RE = re.compile(
68 r"(?:(?P<days>-?\d+) (days? ?))?"
73 r"(?:\.(?P<microseconds>\d{1,6}))?"
80 """Get the default time zone."""
81 return DEFAULT_TIME_ZONE
85 """Set a default time zone to be used when none is specified.
90 global DEFAULT_TIME_ZONE
92 assert isinstance(time_zone, dt.tzinfo)
94 DEFAULT_TIME_ZONE = time_zone
95 get_default_time_zone.cache_clear()
99 """Get time zone from string. Return None if unable to determine.
101 Must be run in the executor if the ZoneInfo is not already
102 in the cache. If you are not sure, use async_get_time_zone.
105 return zoneinfo.ZoneInfo(time_zone_str)
106 except zoneinfo.ZoneInfoNotFoundError:
111 """Get time zone from string. Return None if unable to determine.
116 return await _async_get_time_zone(time_zone_str)
117 except zoneinfo.ZoneInfoNotFoundError:
# partial pre-binds the UTC argument, so utcnow() is dt.datetime.now(UTC).
utcnow = partial(dt.datetime.now, UTC)
# A partial object has no def to carry a docstring, so it is assigned directly.
utcnow.__doc__ = "Get now in UTC time."
def now(time_zone: dt.tzinfo | None = None) -> dt.datetime:
    """Return the current time as an aware datetime.

    The result is expressed in ``time_zone`` when one is given, otherwise in
    DEFAULT_TIME_ZONE.
    """
    # Truthiness (not an ``is None`` test) decides whether the fallback is used.
    if time_zone:
        return dt.datetime.now(time_zone)
    return dt.datetime.now(DEFAULT_TIME_ZONE)
def as_utc(dattim: dt.datetime) -> dt.datetime:
    """Return a datetime as UTC time.

    Assumes datetime without tzinfo to be in the DEFAULT_TIME_ZONE.

    A datetime whose tzinfo already equals UTC is returned as-is; any other
    aware datetime is converted with ``astimezone``.
    """
    if dattim.tzinfo == UTC:
        # Already in UTC: nothing to convert.
        return dattim
    if dattim.tzinfo is None:
        # Naive input is stamped with the default zone before converting.
        dattim = dattim.replace(tzinfo=DEFAULT_TIME_ZONE)

    return dattim.astimezone(UTC)
146 """Convert a date/time into a unix time (seconds since 1970)."""
147 parsed_dt: dt.datetime |
None
148 if isinstance(dt_value, dt.datetime):
152 if parsed_dt
is None:
153 raise ValueError(
"not a valid date/time.")
154 return parsed_dt.timestamp()
def as_local(dattim: dt.datetime) -> dt.datetime:
    """Convert a datetime object to the default (local) time zone.

    A datetime whose tzinfo already equals DEFAULT_TIME_ZONE is returned
    as-is. A datetime without tzinfo is assumed to already be in
    DEFAULT_TIME_ZONE; any other aware datetime is converted with
    ``astimezone``.
    """
    if dattim.tzinfo == DEFAULT_TIME_ZONE:
        # Already local: nothing to convert.
        return dattim
    if dattim.tzinfo is None:
        # Naive input is stamped with the default zone, not shifted.
        dattim = dattim.replace(tzinfo=DEFAULT_TIME_ZONE)

    return dattim.astimezone(DEFAULT_TIME_ZONE)
# partial pre-binds tz=UTC, so callers only pass the timestamp.
utc_from_timestamp = partial(dt.datetime.fromtimestamp, tz=UTC)
# Assign __doc__ explicitly, as is done for utcnow: a bare string literal
# placed after an assignment is discarded at runtime and never reaches help().
utc_from_timestamp.__doc__ = "Return a UTC time from a timestamp."
174 """Fast conversion of a datetime in UTC to a timestamp."""
178 (utc_dt.toordinal() - EPOCHORDINAL) * 86400
182 + (utc_dt.microsecond / 1000000)
187 """Return local datetime object of start of day from date or datetime."""
190 elif isinstance(dt_or_d, dt.datetime):
191 date = dt_or_d.date()
195 return dt.datetime.combine(date, dt.time(), tzinfo=DEFAULT_TIME_ZONE)
206 def parse_datetime(dt_str: str, *, raise_on_error: Literal[
True]) -> dt.datetime: ...
211 dt_str: str, *, raise_on_error: Literal[
False]
212 ) -> dt.datetime |
None: ...
def parse_datetime(dt_str: str, *, raise_on_error: bool = False) -> dt.datetime | None:
    """Parse a string and return a datetime.datetime.

    This function supports time zone offsets. When the input contains one,
    the output uses a timezone with a fixed offset from UTC.
    Raises ValueError if the input is well formatted but not a valid datetime.

    If the input isn't well formatted, returns None if raise_on_error is False
    or raises ValueError if it's True.
    """
    # Try ciso8601 first; if it rejects the string, fall through to the regex.
    with suppress(ValueError, IndexError):
        return ciso8601.parse_datetime(dt_str)

    if not (match := DATETIME_RE.match(dt_str)):
        # Not well formatted: the caller chooses between an exception and None.
        if raise_on_error:
            raise ValueError
        return None
    kws: dict[str, Any] = match.groupdict()
    if kws["microsecond"]:
        # Right-pad the fraction so that e.g. ".5" means 500000 microseconds.
        kws["microsecond"] = kws["microsecond"].ljust(6, "0")
    tzinfo_str = kws.pop("tzinfo")

    tzinfo: dt.tzinfo | None = None
    if tzinfo_str == "Z":
        tzinfo = UTC
    elif tzinfo_str is not None:
        # tzinfo_str is sign + "HH", "HHMM" or "HH:MM"; minutes are optional.
        offset_mins = int(tzinfo_str[-2:]) if len(tzinfo_str) > 3 else 0
        offset_hours = int(tzinfo_str[1:3])
        offset = dt.timedelta(hours=offset_hours, minutes=offset_mins)
        if tzinfo_str[0] == "-":
            offset = -offset
        tzinfo = dt.timezone(offset)
    # Unmatched optional groups are None and are left to the datetime defaults.
    kws = {k: int(v) for k, v in kws.items() if v is not None}
    kws["tzinfo"] = tzinfo
    return dt.datetime(**kws)
255 """Convert a date string to a date object."""
257 return dt.datetime.strptime(dt_str, DATE_STR_FORMAT).
date()
266 """Parse a duration string and return a datetime.timedelta.
268 Also supports ISO 8601 representation and PostgreSQL's day-time interval
272 STANDARD_DURATION_RE.match(value)
273 or ISO8601_DURATION_RE.match(value)
274 or POSTGRES_INTERVAL_RE.match(value)
277 kws = match.groupdict()
278 sign = -1
if kws.pop(
"sign",
"+") ==
"-" else 1
279 if kws.get(
"microseconds"):
280 kws[
"microseconds"] = kws[
"microseconds"].ljust(6,
"0")
281 time_delta_args: dict[str, float] = {
282 k:
float(v.replace(
",",
"."))
for k, v
in kws.items()
if v
is not None
284 days = dt.timedelta(
float(time_delta_args.pop(
"days", 0.0)
or 0.0))
285 if match.re == ISO8601_DURATION_RE:
287 return days + sign * dt.timedelta(**time_delta_args)
292 """Parse a time string (00:20:00) into Time object.
294 Return None if invalid.
296 parts =
str(time_str).split(
":")
301 minute =
int(parts[1])
302 second =
int(parts[2])
if len(parts) > 2
else 0
303 return dt.time(hour, minute, second)
310 """Return a string representation of a time diff."""
312 def formatn(number: int, unit: str) -> str:
313 """Add "unit" if it's plural."""
316 return f
"{number:d} {unit}s "
321 units = (
"year",
"month",
"day",
"hour",
"minute",
"second")
323 factors = (365 * 24 * 60 * 60, 30 * 24 * 60 * 60, 24 * 60 * 60, 60 * 60, 60, 1)
325 result_string: str =
""
326 current_precision = 0
328 for i, current_factor
in enumerate(factors):
329 selected_unit = units[i]
330 if timediff < current_factor:
332 current_precision = current_precision + 1
333 if current_precision == precision:
335 result_string + formatn(round(timediff / current_factor), selected_unit)
337 curr_diff =
int(timediff // current_factor)
338 result_string += formatn(curr_diff, selected_unit)
339 timediff -= (curr_diff) * current_factor
341 return result_string.rstrip()
def get_age(date: dt.datetime, precision: int = 1) -> str:
    """Take a datetime and return its "age" as a string.

    The age can be in second, minute, hour, day, month and year.

    ``precision`` is the number of units requested, with the last unit
    rounded.

    The date must be in the past or a ValueError will be raised. The check is
    made on the difference rounded to whole seconds.
    """
    delta = (now() - date).total_seconds()

    rounded_delta = round(delta)

    if rounded_delta < 0:
        raise ValueError("Time value is in the future")
    return _get_timestring(rounded_delta, precision)
def get_time_remaining(date: dt.datetime, precision: int = 1) -> str:
    """Take a datetime and return the time remaining until it as a string.

    The time remaining can be in second, minute, hour, day, month and year.

    ``precision`` is the number of units requested, with the last unit
    rounded.

    The date must be in the future or a ValueError will be raised. The check
    is made on the difference rounded to whole seconds.
    """
    delta = (date - now()).total_seconds()

    rounded_delta = round(delta)

    if rounded_delta < 0:
        raise ValueError("Time value is in the past")
    return _get_timestring(rounded_delta, precision)
384 """Parse the time expression part and return a list of times to match."""
385 if parameter
is None or parameter ==
"*":
386 res =
list(range(min_value, max_value + 1))
387 elif isinstance(parameter, str):
388 if parameter.startswith(
"/"):
389 parameter =
int(parameter[1:])
390 res = [x
for x
in range(min_value, max_value + 1)
if x % parameter == 0]
392 res = [
int(parameter)]
394 elif not hasattr(parameter,
"__iter__"):
395 res = [
int(parameter)]
397 res = sorted(
int(x)
for x
in parameter)
400 if val < min_value
or val > max_value:
402 f
"Time expression '{parameter}': parameter {val} out of range "
403 f
"({min_value} to {max_value})"
410 """Return the offset when crossing the DST barrier."""
411 delta = dt.timedelta(hours=24)
412 return (dattim + delta).utcoffset() - (
def _lower_bound(arr: list[int], cmp: int) -> int | None:
    """Return the first value in arr greater or equal to cmp.

    Return None if no such value exists.

    ``arr`` must be sorted in ascending order, because the position is found
    by bisection.
    """
    if (left := bisect.bisect_left(arr, cmp)) == len(arr):
        # Every element is smaller than cmp (or arr is empty).
        return None
    return arr[left]
433 """Find the next datetime from now for which the time expression matches.
435 The algorithm looks at each time unit separately and tries to find the
436 next one that matches for each. If any of them would roll over, all
437 time units below that are reset to the first matching value.
439 Timezones are also handled (the tzinfo of the now object is used),
440 including daylight saving time.
442 if not seconds
or not minutes
or not hours:
443 raise ValueError(
"Cannot find a next time: Time expression never matches!")
448 result = now.replace(microsecond=0, fold=0)
451 if (next_second :=
_lower_bound(seconds, result.second))
is None:
453 next_second = seconds[0]
454 result += dt.timedelta(minutes=1)
456 if result.second != next_second:
457 result = result.replace(second=next_second)
461 if next_minute != result.minute:
463 result = result.replace(second=seconds[0])
465 if next_minute
is None:
467 next_minute = minutes[0]
468 result += dt.timedelta(hours=1)
470 if result.minute != next_minute:
471 result = result.replace(minute=next_minute)
475 if next_hour != result.hour:
477 result = result.replace(second=seconds[0], minute=minutes[0])
479 if next_hour
is None:
482 result += dt.timedelta(days=1)
484 if result.hour != next_hour:
485 result = result.replace(hour=next_hour)
487 if result.tzinfo
in (
None, UTC):
504 now += dt.timedelta(seconds=1)
526 return result.replace(fold=now.fold)
544 return check_result.replace(fold=1)
def _datetime_exists(dattim: dt.datetime) -> bool:
    """Report whether an aware datetime survives a round trip through UTC.

    The datetime counts as existing when converting it to UTC and back to its
    own tzinfo yields an equal value. Naive datetimes are not supported.
    """
    assert dattim.tzinfo is not None
    zone = dattim.tzinfo
    round_tripped = dattim.astimezone(UTC).astimezone(zone)
    return dattim == round_tripped
def _datetime_ambiguous(dattim: dt.datetime) -> bool:
    """Report whether an aware datetime is ambiguous.

    It is ambiguous when it exists (see _datetime_exists) and flipping its
    ``fold`` attribute gives a different UTC offset. Naive datetimes are not
    supported.
    """
    assert dattim.tzinfo is not None
    if not _datetime_exists(dattim):
        return False
    flipped = dattim.replace(fold=not dattim.fold)
    return dattim.utcoffset() != flipped.utcoffset()
dt.datetime as_utc(dt.datetime dattim)
zoneinfo.ZoneInfo|None get_time_zone(str time_zone_str)
dt.date|None parse_date(str dt_str)
dt.datetime|None parse_datetime(str dt_str)
None set_default_time_zone(dt.tzinfo time_zone)
str get_age(dt.datetime date, int precision=1)
float utc_to_timestamp(dt.datetime utc_dt)
int|None _lower_bound(list[int] arr, int cmp)
dt.datetime start_of_local_day(dt.date|dt.datetime|None dt_or_d=None)
dt.datetime find_next_time_expression_time(dt.datetime now, list[int] seconds, list[int] minutes, list[int] hours)
dt.time|None parse_time(str time_str)
bool _datetime_exists(dt.datetime dattim)
dt.timedelta _dst_offset_diff(dt.datetime dattim)
float as_timestamp(dt.datetime|str dt_value)
bool _datetime_ambiguous(dt.datetime dattim)
dt.datetime as_local(dt.datetime dattim)
dt.tzinfo get_default_time_zone()
str _get_timestring(float timediff, int precision=1)
str get_time_remaining(dt.datetime date, int precision=1)
list[int] parse_time_expression(Any parameter, int min_value, int max_value)
dt.timedelta|None parse_duration(str value)
zoneinfo.ZoneInfo|None async_get_time_zone(str time_zone_str)
dt.datetime now(dt.tzinfo|None time_zone=None)