1 """Adds config flow for MicroBot."""
3 from __future__
import annotations
8 from bleak.backends.device
import BLEDevice
10 MicroBotAdvertisement,
12 parse_advertisement_data,
15 import voluptuous
as vol
18 BluetoothServiceInfoBleak,
19 async_discovered_service_info,
24 from .const
import DOMAIN
26 _LOGGER: logging.Logger = logging.getLogger(__package__)
30 """Convert a Bluetooth address to a short address."""
31 results = address.replace(
"-",
":").split(
":")
32 return f
"{results[0].upper()}{results[1].upper()}"[0:4]
36 """Get the name from a discovery."""
37 return f
'{discovery.data["local_name"]} {short_address(discovery.address)}'
41 """Config flow for MicroBot."""
47 self._errors: dict[str, str] = {}
48 self.
_discovered_adv_discovered_adv: MicroBotAdvertisement |
None =
None
49 self._discovered_advs: dict[str, MicroBotAdvertisement] = {}
50 self.
_client_client: MicroBotApiClient |
None =
None
51 self.
_ble_device_ble_device: BLEDevice |
None =
None
52 self.
_name_name: str |
None =
None
53 self.
_bdaddr_bdaddr: str |
None =
None
56 self, discovery_info: BluetoothServiceInfoBleak
57 ) -> ConfigFlowResult:
58 """Handle the bluetooth discovery step."""
59 _LOGGER.debug(
"Discovered bluetooth device: %s", discovery_info)
63 parsed = parse_advertisement_data(
64 discovery_info.device, discovery_info.advertisement
67 self.context[
"title_placeholders"] = {
73 self, user_input: dict[str, Any] |
None =
None
74 ) -> ConfigFlowResult:
75 """Handle a flow initialized by the user."""
80 self, user_input: dict[str, Any] |
None =
None
81 ) -> ConfigFlowResult:
82 """Check if paired."""
83 errors: dict[str, str] = {}
86 self._discovered_advs[discovery.address] = discovery
91 address = discovery_info.address
92 if address
in current_addresses
or address
in self._discovered_advs:
94 parsed = parse_advertisement_data(
95 discovery_info.device, discovery_info.advertisement
99 self._discovered_advs[address] = parsed
101 if not self._discovered_advs:
104 if user_input
is not None:
106 self.
_bdaddr_bdaddr = user_input[CONF_ADDRESS]
113 data_schema=vol.Schema(
115 vol.Required(CONF_ADDRESS): vol.In(
117 address: f
"{parsed.data['local_name']} ({address})"
118 for address, parsed
in self._discovered_advs.items()
127 self, user_input: dict[str, Any] |
None =
None
128 ) -> ConfigFlowResult:
129 """Given a configured host, will ask the user to press the button to pair."""
130 errors: dict[str, str] = {}
136 assert self.
_client_client
is not None
137 if user_input
is None:
138 await self.
_client_client.connect(init=
True)
142 await self.
_client_client.connect(init=
False)
144 errors[
"base"] =
"linking"
146 await self.
_client_client.disconnect()
151 assert self.
_name_name
is not None
153 title=self.
_name_name,
156 CONF_ADDRESS: self.
_bdaddr_bdaddr,
157 CONF_ACCESS_TOKEN: token,
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_step_init(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_step_bluetooth(self, BluetoothServiceInfoBleak discovery_info)
ConfigFlowResult async_step_link(self, dict[str, Any]|None user_input=None)
None _abort_if_unique_id_configured(self, dict[str, Any]|None updates=None, bool reload_on_update=True, *str error="already_configured")
set[str|None] _async_current_ids(self, bool include_ignore=True)
ConfigEntry|None async_set_unique_id(self, str|None unique_id=None, *bool raise_on_progress=True)
ConfigFlowResult async_create_entry(self, *str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
ConfigFlowResult async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)
ConfigFlowResult async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)
_FlowResultT async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)
Iterable[BluetoothServiceInfoBleak] async_discovered_service_info(HomeAssistant hass, bool connectable=True)
str name_from_discovery(MicroBotAdvertisement discovery)
str short_address(str address)