Home Assistant Unofficial Reference 2024.12.1
binary_sensor.py
Go to the documentation of this file.
1 """Sensor to indicate whether the current day is a workday."""
2 
3 from __future__ import annotations
4 
5 from datetime import date, datetime, timedelta
6 from typing import Final
7 
8 from holidays import (
9  PUBLIC,
10  HolidayBase,
11  __version__ as python_holidays_version,
12  country_holidays,
13 )
14 import voluptuous as vol
15 
16 from homeassistant.components.binary_sensor import BinarySensorEntity
17 from homeassistant.config_entries import ConfigEntry
18 from homeassistant.const import CONF_COUNTRY, CONF_LANGUAGE, CONF_NAME
19 from homeassistant.core import (
20  CALLBACK_TYPE,
21  HomeAssistant,
22  ServiceResponse,
23  SupportsResponse,
24  callback,
25 )
27 from homeassistant.helpers.device_registry import DeviceEntryType, DeviceInfo
29  AddEntitiesCallback,
30  async_get_current_platform,
31 )
32 from homeassistant.helpers.event import async_track_point_in_utc_time
33 from homeassistant.helpers.issue_registry import IssueSeverity, async_create_issue
34 from homeassistant.util import dt as dt_util, slugify
35 
36 from .const import (
37  ALLOWED_DAYS,
38  CONF_ADD_HOLIDAYS,
39  CONF_CATEGORY,
40  CONF_EXCLUDES,
41  CONF_OFFSET,
42  CONF_PROVINCE,
43  CONF_REMOVE_HOLIDAYS,
44  CONF_WORKDAYS,
45  DOMAIN,
46  LOGGER,
47 )
48 
# Name under which the date-check entity service is registered on the platform.
SERVICE_CHECK_DATE: Final = "check_date"
# Key of the service-call field that carries the date to be checked.
CHECK_DATE: Final = "check_date"
51 
52 
def validate_dates(holiday_list: list[str]) -> list[str]:
    """Expand date ranges in a list of holiday entries.

    Each entry is either a single value or a ``start,end`` range. Single
    values (anything without a comma, or with a comma only as the very first
    character) are passed through verbatim and are not validated here. Ranges
    are expanded into one ``YYYY-MM-DD`` string per day, both ends inclusive.

    Args:
        holiday_list: Entries to add to or remove from the holiday set.

    Returns:
        A flat list with every range replaced by its individual days. A range
        whose start or end cannot be parsed is logged as an error and
        dropped; a range whose end precedes its start contributes nothing.
    """
    calc_holidays: list[str] = []
    for add_date in holiday_list:
        # ``find`` returning 0 (leading comma) or -1 (no comma) both mean
        # "not a range": keep the entry exactly as given.
        if add_date.find(",") <= 0:
            calc_holidays.append(add_date)
            continue

        # Split on the first comma only; anything after it is the end date.
        start_text, end_text = add_date.split(",", maxsplit=1)
        d1 = dt_util.parse_date(start_text)
        d2 = dt_util.parse_date(end_text)
        if d1 is None or d2 is None:
            LOGGER.error("Incorrect dates in date range: %s", add_date)
            continue

        # ``+ 1`` makes the end date inclusive. A negative span yields an
        # empty range, so reversed ranges silently add no days.
        calc_holidays.extend(
            (d1 + timedelta(days=offset)).strftime("%Y-%m-%d")
            for offset in range((d2 - d1).days + 1)
        )
    return calc_holidays
71 
72 
def _get_obj_holidays(
    country: str | None,
    province: str | None,
    year: int,
    language: str | None,
    categories: list[str] | None,
) -> HolidayBase:
    """Get the object for the requested country and year.

    Holidays are loaded for ``year`` and the year after it.

    Args:
        country: Country code passed to ``country_holidays``; when falsy an
            empty ``HolidayBase`` is returned.
        province: Subdivision passed as ``subdiv``.
        year: First year to load.
        language: Requested language. When it is exactly ``"en"`` and the
            country lists supported languages, the object is rebuilt for each
            supported language starting with ``"en"``; the last match wins.
        categories: Extra holiday categories; ``PUBLIC`` is always included
            when any are given.

    Returns:
        The populated holiday object.
    """
    if not country:
        return HolidayBase()

    set_categories = None
    if categories:
        # PUBLIC always comes first, followed by the configured categories.
        set_categories = (PUBLIC, *categories)

    # Shared by both calls below so a language remap cannot change which
    # years are covered.
    years = [year, year + 1]

    obj_holidays: HolidayBase = country_holidays(
        country,
        subdiv=province,
        years=years,
        language=language,
        categories=set_categories,
    )
    if (supported_languages := obj_holidays.supported_languages) and language == "en":
        for lang in supported_languages:
            if lang.startswith("en"):
                # Previously this rebuild passed only ``year``, which dropped
                # next year's holidays whenever the language was remapped.
                obj_holidays = country_holidays(
                    country,
                    subdiv=province,
                    years=years,
                    language=lang,
                    categories=set_categories,
                )
                LOGGER.debug("Changing language from %s to %s", language, lang)
    return obj_holidays
109 
110 
async def async_setup_entry(
    hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback
) -> None:
    """Set up the Workday sensor.

    Reads the entry options, builds the holiday object in an executor job,
    applies the configured additions and removals, registers the
    ``check_date`` entity service and adds a single sensor entity.

    Args:
        hass: The Home Assistant instance.
        entry: Config entry whose ``options`` hold the sensor configuration.
        async_add_entities: Callback used to register the created entity.
    """
    add_holidays: list[str] = entry.options[CONF_ADD_HOLIDAYS]
    remove_holidays: list[str] = entry.options[CONF_REMOVE_HOLIDAYS]
    country: str | None = entry.options.get(CONF_COUNTRY)
    days_offset: int = int(entry.options[CONF_OFFSET])
    excludes: list[str] = entry.options[CONF_EXCLUDES]
    province: str | None = entry.options.get(CONF_PROVINCE)
    sensor_name: str = entry.options[CONF_NAME]
    workdays: list[str] = entry.options[CONF_WORKDAYS]
    language: str | None = entry.options.get(CONF_LANGUAGE)
    categories: list[str] | None = entry.options.get(CONF_CATEGORY)

    # The year to load is taken from the offset-adjusted date.
    year: int = (dt_util.now() + timedelta(days=days_offset)).year
    obj_holidays: HolidayBase = await hass.async_add_executor_job(
        _get_obj_holidays, country, province, year, language, categories
    )
    calc_add_holidays: list[str] = validate_dates(add_holidays)
    calc_remove_holidays: list[str] = validate_dates(remove_holidays)
    next_year = dt_util.now().year + 1

    # Add custom holidays
    try:
        obj_holidays.append(calc_add_holidays)  # type: ignore[arg-type]
    except ValueError as error:
        LOGGER.error("Could not add custom holidays: %s", error)

    # Remove holidays
    for remove_holiday in calc_remove_holidays:
        try:
            # is this formatted as a date?
            if dt_util.parse_date(remove_holiday):
                # remove holiday by date
                obj_holidays.pop(remove_holiday)
                LOGGER.debug("Removed %s", remove_holiday)
            else:
                # remove holiday by name
                LOGGER.debug("Treating '%s' as named holiday", remove_holiday)
                removed = obj_holidays.pop_named(remove_holiday)
                for holiday in removed:
                    LOGGER.debug("Removed %s by name '%s'", holiday, remove_holiday)
        except KeyError as unmatched:
            LOGGER.warning("No holiday found matching %s", unmatched)
            if _date := dt_util.parse_date(remove_holiday):
                if _date.year > next_year:
                    # No issue is raised for dates later than next year.
                    continue
                issue_key = "bad_date_holiday"
            else:
                issue_key = "bad_named_holiday"
            # The same key is used as issue-id prefix and translation key.
            async_create_issue(
                hass,
                DOMAIN,
                f"{issue_key}-{entry.entry_id}-{slugify(remove_holiday)}",
                is_fixable=True,
                is_persistent=False,
                severity=IssueSeverity.WARNING,
                translation_key=issue_key,
                translation_placeholders={
                    CONF_COUNTRY: country if country else "-",
                    "title": entry.title,
                    CONF_REMOVE_HOLIDAYS: remove_holiday,
                },
                data={
                    "entry_id": entry.entry_id,
                    "country": country,
                    "named_holiday": remove_holiday,
                },
            )

    LOGGER.debug("Found the following holidays for your configuration:")
    for holiday_date, name in sorted(obj_holidays.items()):
        # Make explicit str variable to avoid "Incompatible types in assignment"
        _holiday_string = holiday_date.strftime("%Y-%m-%d")
        LOGGER.debug("%s %s", _holiday_string, name)

    # Expose the entity method ``check_date`` as a response-only service.
    platform = async_get_current_platform()
    platform.async_register_entity_service(
        SERVICE_CHECK_DATE,
        {vol.Required(CHECK_DATE): cv.date},
        "check_date",
        None,
        SupportsResponse.ONLY,
    )

    # NOTE(review): the constructor name was missing from this listing; it is
    # restored as ``IsWorkdaySensor`` to match upstream Home Assistant.
    # Confirm against the class header in this file.
    async_add_entities(
        [
            IsWorkdaySensor(
                obj_holidays,
                workdays,
                excludes,
                days_offset,
                sensor_name,
                entry.entry_id,
            )
        ],
    )
226 
227 
# NOTE(review): the class header line was missing from this listing. The base
# class is taken from the file's BinarySensorEntity import and the name from
# upstream Home Assistant; confirm both.
class IsWorkdaySensor(BinarySensorEntity):
    """Implementation of a Workday sensor."""

    _attr_has_entity_name = True
    _attr_name = None
    _attr_translation_key = DOMAIN
    _attr_should_poll = False
    # Cancel handle for the currently scheduled point-in-time update, if any.
    unsub: CALLBACK_TYPE | None = None

    def __init__(
        self,
        obj_holidays: HolidayBase,
        workdays: list[str],
        excludes: list[str],
        days_offset: int,
        name: str,
        entry_id: str,
    ) -> None:
        """Initialize the Workday sensor.

        Args:
            obj_holidays: Holiday object consulted for the ``"holiday"`` rule.
            workdays: Day names (and optionally ``"holiday"``) counted as
                workdays.
            excludes: Day names (and optionally ``"holiday"``) that are never
                workdays.
            days_offset: Number of days added to a date before evaluating it.
            name: Device name.
            entry_id: Config entry id, used as unique id and device identifier.
        """
        self._obj_holidays = obj_holidays
        self._workdays = workdays
        self._excludes = excludes
        self._days_offset = days_offset
        self._attr_extra_state_attributes = {
            CONF_WORKDAYS: workdays,
            CONF_EXCLUDES: excludes,
            CONF_OFFSET: days_offset,
        }
        self._attr_unique_id = entry_id
        self._attr_device_info = DeviceInfo(
            entry_type=DeviceEntryType.SERVICE,
            identifiers={(DOMAIN, entry_id)},
            manufacturer="python-holidays",
            model=python_holidays_version,
            name=name,
        )

    def is_include(self, day: str, now: date) -> bool:
        """Check if given day is in the includes list.

        True when ``day`` is a configured workday, or when ``"holiday"`` is a
        configured workday and ``now`` is in the holiday object.
        """
        if day in self._workdays:
            return True
        return "holiday" in self._workdays and now in self._obj_holidays

    def is_exclude(self, day: str, now: date) -> bool:
        """Check if given day is in the excludes list.

        True when ``day`` is excluded, or when ``"holiday"`` is excluded and
        ``now`` is in the holiday object.
        """
        if day in self._excludes:
            return True
        return "holiday" in self._excludes and now in self._obj_holidays

    def get_next_interval(self, now: datetime) -> datetime:
        """Compute next time an update should occur.

        Returns the start of the local day following ``now``.
        """
        tomorrow = dt_util.as_local(now) + timedelta(days=1)
        return dt_util.start_of_local_day(tomorrow)

    def _update_state_and_setup_listener(self) -> None:
        """Update state and setup listener for next interval."""
        now = dt_util.now()
        self.update_data(now)
        # Keep the cancel handle so the timer can be dropped on removal.
        self.unsub = async_track_point_in_utc_time(
            self.hass, self.point_in_time_listener, self.get_next_interval(now)
        )

    @callback
    def point_in_time_listener(self, time_date: datetime) -> None:
        """Get the latest data and update state."""
        self._update_state_and_setup_listener()
        self.async_write_ha_state()

    async def async_added_to_hass(self) -> None:
        """Set up first update."""
        self._update_state_and_setup_listener()

    async def async_will_remove_from_hass(self) -> None:
        """Cancel the pending scheduled update when the entity is removed."""
        if self.unsub is not None:
            self.unsub()
            self.unsub = None

    def update_data(self, now: datetime) -> None:
        """Get date and look whether it is a holiday."""
        self._attr_is_on = self.date_is_workday(now)

    def check_date(self, check_date: date) -> ServiceResponse:
        """Service to check if date is workday or not."""
        return {"workday": self.date_is_workday(check_date)}

    def date_is_workday(self, check_date: date) -> bool:
        """Check if date is workday.

        The configured day offset is applied first. The result is True only
        when the adjusted date matches an include rule and no exclude rule;
        excludes take precedence over includes.
        """
        adjusted_date = check_date + timedelta(days=self._days_offset)
        # isoweekday() is 1 (Monday) .. 7 (Sunday); shift to a 0-based index.
        # Presumably ALLOWED_DAYS starts with Monday; verify in const.
        day_of_week = ALLOWED_DAYS[adjusted_date.isoweekday() - 1]

        if self.is_exclude(day_of_week, adjusted_date):
            return False
        return self.is_include(day_of_week, adjusted_date)
None __init__(self, HolidayBase obj_holidays, list[str] workdays, list[str] excludes, int days_offset, str name, str entry_id)
None async_create_issue(HomeAssistant hass, str entry_id)
Definition: repairs.py:69
None async_setup_entry(HomeAssistant hass, ConfigEntry entry, AddEntitiesCallback async_add_entities)
HolidayBase _get_obj_holidays(str|None country, str|None province, int year, str|None language, list[str]|None categories)
list[str] validate_dates(list[str] holiday_list)
CALLBACK_TYPE async_track_point_in_utc_time(HomeAssistant hass, HassJob[[datetime], Coroutine[Any, Any, None]|None]|Callable[[datetime], Coroutine[Any, Any, None]|None] action, datetime point_in_time)
Definition: event.py:1542