1 """Base class for Cambridge Audio entities."""
3 from collections.abc
import Awaitable, Callable, Coroutine
4 from functools
import wraps
5 from typing
import Any, Concatenate
7 from aiostreammagic
import StreamMagicClient
8 from aiostreammagic.models
import CallbackType
14 from .const
import DOMAIN, STREAM_MAGIC_EXCEPTIONS
17 def command[_EntityT: CambridgeAudioEntity, **_P](
18 func: Callable[Concatenate[_EntityT, _P], Awaitable[
None]],
19 ) -> Callable[Concatenate[_EntityT, _P], Coroutine[Any, Any,
None]]:
20 """Wrap async calls to raise on request error."""
23 async
def decorator(self: _EntityT, *args: _P.args, **kwargs: _P.kwargs) ->
None:
24 """Wrap all command methods."""
26 await func(self, *args, **kwargs)
27 except STREAM_MAGIC_EXCEPTIONS
as exc:
29 translation_domain=DOMAIN,
30 translation_key=
"command_error",
31 translation_placeholders={
32 "function_name": func.__name__,
33 "entity_id": self.entity_id,
41 """Defines a base Cambridge Audio entity."""
43 _attr_has_entity_name =
True
45 def __init__(self, client: StreamMagicClient) ->
None:
46 """Initialize Cambridge Audio entity."""
49 identifiers={(DOMAIN, client.info.unit_id)},
50 name=client.info.name,
51 manufacturer=
"Cambridge Audio",
52 model=client.info.model,
53 serial_number=client.info.unit_id,
54 configuration_url=f
"http://{client.host}",
58 self, _client: StreamMagicClient, _callback_type: CallbackType
60 """Call when the device is notified of changes."""
65 """Register callback handlers."""
69 """Remove callbacks."""
None async_will_remove_from_hass(self)
None __init__(self, StreamMagicClient client)
None async_added_to_hass(self)
None _state_update_callback(self, StreamMagicClient _client, CallbackType _callback_type)
None async_write_ha_state(self)