1 """Utilities for Plugwise."""
3 from collections.abc
import Awaitable, Callable, Coroutine
4 from typing
import Any, Concatenate
6 from plugwise.exceptions
import PlugwiseException
10 from .entity
import PlugwiseEntity
13 def plugwise_command[_PlugwiseEntityT: PlugwiseEntity, **_P, _R](
14 func: Callable[Concatenate[_PlugwiseEntityT, _P], Awaitable[_R]],
15 ) -> Callable[Concatenate[_PlugwiseEntityT, _P], Coroutine[Any, Any, _R]]:
16 """Decorate Plugwise calls that send commands/make changes to the device.
18 A decorator that wraps the passed in function, catches Plugwise errors,
19 and requests an coordinator update to update status of the devices asap.
23 self: _PlugwiseEntityT, *args: _P.args, **kwargs: _P.kwargs
26 return await func(self, *args, **kwargs)
27 except PlugwiseException
as error:
29 f
"Error communicating with API: {error}"
32 await self.coordinator.async_request_refresh()