1 """Listeners for updating data in the Crownstone integration.
3 For data updates, Cloud Push is used in form of an SSE server that sends out events.
4 For fast device switching Local Push is used in form of a USB dongle that hooks into a BLE mesh.
7 from __future__
import annotations
9 from functools
import partial
10 from typing
import TYPE_CHECKING, cast
12 from crownstone_cloud.exceptions
import CrownstoneNotFoundError
13 from crownstone_core.packets.serviceDataParsers.containers.AdvExternalCrownstoneState
import (
14 AdvExternalCrownstoneState,
16 from crownstone_core.packets.serviceDataParsers.containers.elements.AdvTypes
import (
19 from crownstone_core.protocol.SwitchState
import SwitchState
20 from crownstone_sse.const
import (
22 EVENT_ABILITY_CHANGE_DIMMING,
23 EVENT_SWITCH_STATE_UPDATE,
25 from crownstone_sse.events
import AbilityChangeEvent, SwitchStateUpdateEvent
26 from crownstone_uart
import UartEventBus, UartTopics
27 from crownstone_uart.topics.SystemTopics
import SystemTopics
31 async_dispatcher_connect,
32 async_dispatcher_send,
38 SIG_CROWNSTONE_STATE_UPDATE,
39 SIG_UART_STATE_CHANGE,
45 from .entry_manager
import CrownstoneEntryManager
50 manager: CrownstoneEntryManager, switch_event: SwitchStateUpdateEvent
52 """Update the state of a Crownstone when switched externally."""
54 updated_crownstone = manager.cloud.get_crownstone_by_id(switch_event.cloud_id)
55 except CrownstoneNotFoundError:
59 if updated_crownstone.state != switch_event.switch_state:
60 updated_crownstone.state = switch_event.switch_state
66 manager: CrownstoneEntryManager, ability_event: AbilityChangeEvent
68 """Update the ability information of a Crownstone."""
70 updated_crownstone = manager.cloud.get_crownstone_by_id(ability_event.cloud_id)
71 except CrownstoneNotFoundError:
74 ability_type = ability_event.ability_type
75 ability_enabled = ability_event.ability_enabled
77 if updated_crownstone.abilities[ability_type].is_enabled == ability_enabled:
81 updated_crownstone.abilities[ability_type].is_enabled = ability_enabled
83 if ability_event.sub_type == EVENT_ABILITY_CHANGE_DIMMING:
85 manager.hass.async_create_task(
86 manager.hass.config_entries.async_reload(manager.config_entry.entry_id)
93 """Update the uart ready state for entities that use USB."""
99 manager: CrownstoneEntryManager, data: AdvExternalCrownstoneState
101 """Update the state of a Crownstone when switched externally."""
102 if data.type != AdvType.EXTERNAL_STATE:
105 updated_crownstone = manager.cloud.get_crownstone_by_uid(
106 data.crownstoneId, manager.usb_sphere_id
108 except CrownstoneNotFoundError:
111 if data.switchState
is None:
114 updated_state = cast(SwitchState, data.switchState)
115 if updated_crownstone.state != updated_state.intensity:
116 updated_crownstone.state = updated_state.intensity
122 """Set up SSE listeners."""
124 manager.listeners[SSE_LISTENERS] = [
127 f
"{DOMAIN}_{EVENT_SWITCH_STATE_UPDATE}",
128 partial(async_update_crwn_state_sse, manager),
132 f
"{DOMAIN}_{EVENT_ABILITY_CHANGE}",
133 partial(async_update_crwn_ability, manager),
139 """Set up UART listeners."""
141 manager.listeners[UART_LISTENERS] = [
142 UartEventBus.subscribe(
143 SystemTopics.connectionEstablished,
144 partial(update_uart_state, manager),
146 UartEventBus.subscribe(
147 SystemTopics.connectionClosed,
148 partial(update_uart_state, manager),
150 UartEventBus.subscribe(
151 UartTopics.newDataAvailable,
152 partial(update_crwn_state_uart, manager),
None async_update_crwn_state_sse(CrownstoneEntryManager manager, SwitchStateUpdateEvent switch_event)
None async_update_crwn_ability(CrownstoneEntryManager manager, AbilityChangeEvent ability_event)
None update_uart_state(CrownstoneEntryManager manager, bool|None _)
None setup_sse_listeners(CrownstoneEntryManager manager)
None setup_uart_listeners(CrownstoneEntryManager manager)
None update_crwn_state_uart(CrownstoneEntryManager manager, AdvExternalCrownstoneState data)
Callable[[], None] async_dispatcher_connect(HomeAssistant hass, str signal, Callable[..., Any] target)
None dispatcher_send(HomeAssistant hass, str signal, *Any args)
None async_dispatcher_send(HomeAssistant hass, str signal, *Any args)