1 """Helper functions for Homematicip Cloud Integration."""
3 from __future__
import annotations
5 from collections.abc
import Callable, Coroutine
6 from functools
import wraps
9 from typing
import Any, Concatenate, TypeGuard
11 from homematicip.base.enums
import FunctionalChannelType
12 from homematicip.device
import Device
16 from .entity
import HomematicipGenericEntity
18 _LOGGER = logging.getLogger(__name__)
22 """Response from async call contains errors or not."""
23 if isinstance(response, dict):
24 return response.get(
"errorCode")
not in (
"",
None)
29 def handle_errors[_HomematicipGenericEntityT: HomematicipGenericEntity, **_P](
31 Concatenate[_HomematicipGenericEntityT, _P], Coroutine[Any, Any, Any]
33 ) -> Callable[Concatenate[_HomematicipGenericEntityT, _P], Coroutine[Any, Any, Any]]:
34 """Handle async errors."""
38 self: _HomematicipGenericEntityT, *args: _P.args, **kwargs: _P.kwargs
40 """Handle errors from async call."""
41 result = await func(self, *args, **kwargs)
44 "Error while execute function %s: %s",
49 f
"Error while execute function {func.__name__}: {result.get('errorCode')}. See log for more information."
56 """Get all channels matching with channel_type from device."""
59 for ch
in device.functionalChannels
60 if ch.functionalChannelType == channel_type
TypeGuard[dict[str, Any]] is_error_response(Any response)
def get_channels_from_device(Device device, FunctionalChannelType channel_type)