1 """Config flow for Govee light local."""
3 from __future__
import annotations
6 from contextlib
import suppress
9 from govee_local_api
import GoveeController
16 CONF_LISTENING_PORT_DEFAULT,
17 CONF_MULTICAST_ADDRESS_DEFAULT,
18 CONF_TARGET_PORT_DEFAULT,
23 _LOGGER = logging.getLogger(__name__)
27 """Return if there are devices that can be discovered."""
# NOTE(review): the embedded line numbers in this extract have gaps, so the
# function header, exception handlers and some call arguments are not visible
# here; the comments below describe only the statements that are shown.
# Resolve the local source IP used to reach network.PUBLIC_TARGET_IP; it is
# passed to the controller below as its listening address.
29 adapter = await network.async_get_source_ip(hass, network.PUBLIC_TARGET_IP)
# Build a controller bound to that address, using the integration's default
# multicast address and ports, with discovery switched on.
31 controller: GoveeController = GoveeController(
34 listening_address=adapter,
35 broadcast_address=CONF_MULTICAST_ADDRESS_DEFAULT,
36 broadcast_port=CONF_TARGET_PORT_DEFAULT,
37 listening_port=CONF_LISTENING_PORT_DEFAULT,
38 discovery_enabled=
True,
# Start the controller.
44 await controller.start()
# NOTE(review): `ex` is not bound in the visible lines; presumably this log
# call sits in an `except ... as ex` handler around `controller.start()` --
# confirm against the full file.
46 _LOGGER.error(
"Start failed, errno: %d", ex.errno)
# Poll once per second until the controller reports at least one device,
# bounded by DISCOVERY_TIMEOUT via asyncio.timeout().
50 async
with asyncio.timeout(delay=DISCOVERY_TIMEOUT):
51 while not controller.devices:
52 await asyncio.sleep(delay=1)
# NOTE(review): presumably logged when the timeout above expires without any
# device being found -- the enclosing handler is not visible; confirm.
54 _LOGGER.debug(
"No devices found")
# Record the device count before asking the controller to clean up.
56 devices_count = len(controller.devices)
# cleanup() hands back an asyncio.Event (per the annotation); wait for it for
# at most 1 second and ignore a TimeoutError if it is not set in time.
57 cleanup_complete: asyncio.Event = controller.cleanup()
58 with suppress(TimeoutError):
59 await asyncio.wait_for(cleanup_complete.wait(), 1)
# True only when at least one device was seen.
61 return devices_count > 0
# Register a discovery-based config flow at import time, passing the
# integration DOMAIN, the string "Govee light local" and the
# _async_has_devices coroutine function.
# NOTE(review): the closing parenthesis of this call is not visible in this
# extract -- confirm against the full file.
64 config_entry_flow.register_discovery_flow(
65 DOMAIN,
"Govee light local", _async_has_devices
bool _async_has_devices(HomeAssistant hass)