1 """Support for Automation Device Specification (ADS)."""
4 from asyncio
import timeout
10 from .const
import STATE_KEY_STATE
11 from .hub
import AdsHub
13 _LOGGER = logging.getLogger(__name__)
17 """Representation of ADS entity."""
19 _attr_should_poll =
False
21 def __init__(self, ads_hub: AdsHub, name: str, ads_var: str) ->
None:
22 """Initialize ADS binary sensor."""
23 self._state_dict: dict[str, Any] = {}
24 self._state_dict[STATE_KEY_STATE] =
None
27 self.
_event_event: asyncio.Event |
None =
None
35 state_key: str = STATE_KEY_STATE,
36 factor: int |
None =
None,
38 """Register device notification."""
41 """Handle device notifications."""
42 _LOGGER.debug(
"Variable %s changed its value to %d", name, value)
45 self._state_dict[state_key] = value
47 self._state_dict[state_key] = value / factor
49 asyncio.run_coroutine_threadsafe(async_event_set(), self.
hasshass.loop)
52 async
def async_event_set():
53 """Set event in async context."""
58 await self.
hasshass.async_add_executor_job(
59 self.
_ads_hub_ads_hub.add_device_notification, ads_var, plctype, update
62 async
with timeout(10):
63 await self.
_event_event.wait()
65 _LOGGER.debug(
"Variable %s: Timeout during first update", ads_var)
69 """Return False if state has not been updated yet."""
70 return self._state_dict[STATE_KEY_STATE]
is not None
None async_initialize_device(self, str ads_var, type plctype, str state_key=STATE_KEY_STATE, int|None factor=None)
None __init__(self, AdsHub ads_hub, str name, str ads_var)
None schedule_update_ha_state(self, bool force_refresh=False)