1 """Support for Velbus devices."""
3 from __future__
import annotations
5 from collections.abc
import Awaitable, Callable, Coroutine
6 from functools
import wraps
7 from typing
import Any, Concatenate
9 from duotecno.unit
import BaseUnit
15 from .const
import DOMAIN
19 """Representation of a Duotecno entity."""
21 _attr_should_poll =
False
24 """Initialize a Duotecno entity."""
29 (DOMAIN,
str(unit.get_node_address())),
31 manufacturer=
"Duotecno",
32 name=unit.get_node_name(),
34 self.
_attr_unique_id_attr_unique_id = f
"{unit.get_node_address()}-{unit.get_number()}"
37 """When added to hass."""
41 """When a unit has an update."""
46 """Available state for the unit."""
50 def api_call[_T: DuotecnoEntity, **_P](
51 func: Callable[Concatenate[_T, _P], Awaitable[
None]],
52 ) -> Callable[Concatenate[_T, _P], Coroutine[Any, Any,
None]]:
53 """Catch command exceptions."""
56 async
def cmd_wrapper(self: _T, *args: _P.args, **kwargs: _P.kwargs) ->
None:
57 """Wrap all command methods."""
59 await func(self, *args, **kwargs)
60 except OSError
as exc:
62 f
"Error calling {func.__name__} on entity {self.entity_id}"
None async_added_to_hass(self)
None __init__(self, BaseUnit unit)
None async_write_ha_state(self)