1 """Config flow for WeatherFlow."""
3 from __future__
import annotations
6 from asyncio
import Future
7 from asyncio.exceptions
import CancelledError
10 from pyweatherflowudp.client
import EVENT_DEVICE_DISCOVERED, WeatherFlowListener
11 from pyweatherflowudp.errors
import AddressInUseError, EndpointError, ListenerError
18 ERROR_MSG_ADDRESS_IN_USE,
19 ERROR_MSG_CANNOT_CONNECT,
20 ERROR_MSG_NO_DEVICE_FOUND,
25 """Return if there are devices that can be discovered."""
26 future_event: Future[
None] = asyncio.get_running_loop().create_future()
30 """Handle a discovered device - only need to do this once so."""
32 if not future_event.done():
33 future_event.set_result(
None)
35 async
with WeatherFlowListener()
as client, asyncio.timeout(10):
37 client.on(EVENT_DEVICE_DISCOVERED, _async_found)
46 """Handle a config flow for WeatherFlow."""
51 self, user_input: dict[str, Any] |
None =
None
52 ) -> ConfigFlowResult:
53 """Handle a flow initialized by the user."""
64 except AddressInUseError:
65 errors[
"base"] = ERROR_MSG_ADDRESS_IN_USE
66 except (ListenerError, EndpointError, CancelledError):
67 errors[
"base"] = ERROR_MSG_CANNOT_CONNECT
69 if not found
and not errors:
70 errors[
"base"] = ERROR_MSG_NO_DEVICE_FOUND
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_create_entry(self, *str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
list[ConfigEntry] _async_current_entries(self, bool|None include_ignore=None)
ConfigFlowResult async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)
ConfigFlowResult async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)
_FlowResultT async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)
bool _async_can_discover_devices()