Home Assistant Unofficial Reference 2024.12.1
entity.py
Go to the documentation of this file.
1 """Support for Automation Device Specification (ADS)."""
2 
3 import asyncio
4 from asyncio import timeout
5 import logging
6 from typing import Any
7 
8 from homeassistant.helpers.entity import Entity
9 
10 from .const import STATE_KEY_STATE
11 from .hub import AdsHub
12 
13 _LOGGER = logging.getLogger(__name__)
14 
15 
17  """Representation of ADS entity."""
18 
19  _attr_should_poll = False
20 
21  def __init__(self, ads_hub: AdsHub, name: str, ads_var: str) -> None:
22  """Initialize ADS binary sensor."""
23  self._state_dict: dict[str, Any] = {}
24  self._state_dict[STATE_KEY_STATE] = None
25  self._ads_hub_ads_hub = ads_hub
26  self._ads_var_ads_var = ads_var
27  self._event_event: asyncio.Event | None = None
28  self._attr_unique_id_attr_unique_id = ads_var
29  self._attr_name_attr_name = name
30 
32  self,
33  ads_var: str,
34  plctype: type,
35  state_key: str = STATE_KEY_STATE,
36  factor: int | None = None,
37  ) -> None:
38  """Register device notification."""
39 
40  def update(name, value):
41  """Handle device notifications."""
42  _LOGGER.debug("Variable %s changed its value to %d", name, value)
43 
44  if factor is None:
45  self._state_dict[state_key] = value
46  else:
47  self._state_dict[state_key] = value / factor
48 
49  asyncio.run_coroutine_threadsafe(async_event_set(), self.hasshass.loop)
50  self.schedule_update_ha_stateschedule_update_ha_state()
51 
52  async def async_event_set():
53  """Set event in async context."""
54  self._event_event.set()
55 
56  self._event_event = asyncio.Event()
57 
58  await self.hasshass.async_add_executor_job(
59  self._ads_hub_ads_hub.add_device_notification, ads_var, plctype, update
60  )
61  try:
62  async with timeout(10):
63  await self._event_event.wait()
64  except TimeoutError:
65  _LOGGER.debug("Variable %s: Timeout during first update", ads_var)
66 
67  @property
68  def available(self) -> bool:
69  """Return False if state has not been updated yet."""
70  return self._state_dict[STATE_KEY_STATE] is not None
None async_initialize_device(self, str ads_var, type plctype, str state_key=STATE_KEY_STATE, int|None factor=None)
Definition: entity.py:37
None __init__(self, AdsHub ads_hub, str name, str ads_var)
Definition: entity.py:21
None schedule_update_ha_state(self, bool force_refresh=False)
Definition: entity.py:1244