1 """Config flow for foscam integration."""
5 from libpyfoscam
import FoscamCamera
6 from libpyfoscam.foscam
import (
8 ERROR_FOSCAM_UNAVAILABLE,
11 import voluptuous
as vol
24 from .const
import CONF_RTSP_PORT, CONF_STREAM, DOMAIN, LOGGER
# Stream profiles offered in the config form; the first entry is the schema
# default for CONF_STREAM.
STREAMS = ["Main", "Sub"]

# RTSP port pre-filled as the schema default for CONF_RTSP_PORT.
DEFAULT_RTSP_PORT = 554
32 DATA_SCHEMA = vol.Schema(
34 vol.Required(CONF_HOST): str,
35 vol.Required(CONF_PORT, default=DEFAULT_PORT): int,
36 vol.Required(CONF_USERNAME): str,
37 vol.Required(CONF_PASSWORD): str,
38 vol.Required(CONF_STREAM, default=STREAMS[0]): vol.In(STREAMS),
39 vol.Required(CONF_RTSP_PORT, default=DEFAULT_RTSP_PORT): int,
45 """Handle a config flow for foscam."""
50 """Validate the user input allows us to connect.
52 Data has the keys from DATA_SCHEMA with values provided by the user.
55 {CONF_HOST: data[CONF_HOST], CONF_PORT: data[CONF_PORT]}
58 camera = FoscamCamera(
67 ret, _ = await self.hass.async_add_executor_job(camera.get_product_all_info)
69 if ret == ERROR_FOSCAM_UNAVAILABLE:
72 if ret == ERROR_FOSCAM_AUTH:
75 if ret != FOSCAM_SUCCESS:
77 "Unexpected error code from camera %s:%s: %s",
85 ret, response = await self.hass.async_add_executor_job(camera.get_dev_info)
87 dev_name = response.get(
88 "devName", f
"Foscam {data[CONF_HOST]}:{data[CONF_PORT]}"
91 name = data.pop(CONF_NAME, dev_name)
96 self, user_input: dict[str, Any] |
None =
None
97 ) -> ConfigFlowResult:
98 """Handle the initial step."""
101 if user_input
is not None:
105 except CannotConnect:
106 errors[
"base"] =
"cannot_connect"
109 errors[
"base"] =
"invalid_auth"
111 except InvalidResponse:
112 errors[
"base"] =
"invalid_response"
118 LOGGER.exception(
"Unexpected exception")
119 errors[
"base"] =
"unknown"
122 step_id=
"user", data_schema=DATA_SCHEMA, errors=errors
127 """Error to indicate we cannot connect."""
class InvalidAuth(HomeAssistantError):
    """Error to indicate that authentication is invalid."""
135 """Error to indicate there is invalid response."""
def _validate_and_create(self, data)
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_create_entry(self, *str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
None _async_abort_entries_match(self, dict[str, Any]|None match_dict=None)
ConfigFlowResult async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)