1 """Helpers to setup multi-factor auth module."""
3 from __future__
import annotations
8 import voluptuous
as vol
9 import voluptuous_serialize
11 from homeassistant
import data_entry_flow
18 WS_TYPE_SETUP_MFA =
"auth/setup_mfa"
19 SCHEMA_WS_SETUP_MFA = vol.All(
20 websocket_api.BASE_COMMAND_MESSAGE_SCHEMA.extend(
22 vol.Required(
"type"): WS_TYPE_SETUP_MFA,
23 vol.Exclusive(
"mfa_module_id",
"module_or_flow_id"): str,
24 vol.Exclusive(
"flow_id",
"module_or_flow_id"): str,
25 vol.Optional(
"user_input"): object,
28 cv.has_at_least_one_key(
"mfa_module_id",
"flow_id"),
31 WS_TYPE_DEPOSE_MFA =
"auth/depose_mfa"
32 SCHEMA_WS_DEPOSE_MFA = websocket_api.BASE_COMMAND_MESSAGE_SCHEMA.extend(
33 {vol.Required(
"type"): WS_TYPE_DEPOSE_MFA, vol.Required(
"mfa_module_id"): str}
36 DATA_SETUP_FLOW_MGR: HassKey[MfaFlowManager] =
HassKey(
"auth_mfa_setup_flow_manager")
38 _LOGGER = logging.getLogger(__name__)
42 """Manage multi factor authentication flows."""
48 context: FlowContext |
None,
51 """Create a setup flow. handler is a mfa module."""
52 mfa_module = self.
hasshass.auth.get_auth_mfa_module(handler_key)
53 if mfa_module
is None:
54 raise ValueError(f
"Mfa module {handler_key} is not found")
56 user_id = data.pop(
"user_id")
57 return await mfa_module.async_setup_flow(user_id)
62 """Complete an mfa setup flow.
64 This method is called when a flow step returns FlowResultType.ABORT or
65 FlowResultType.CREATE_ENTRY.
67 _LOGGER.debug(
"flow_result: %s", result)
73 """Init mfa setup flow manager."""
76 websocket_api.async_register_command(
77 hass, WS_TYPE_SETUP_MFA, websocket_setup_mfa, SCHEMA_WS_SETUP_MFA
80 websocket_api.async_register_command(
81 hass, WS_TYPE_DEPOSE_MFA, websocket_depose_mfa, SCHEMA_WS_DEPOSE_MFA
86 @websocket_api.ws_require_user(allow_system_user=False)
90 """Return a setup flow for mfa auth module."""
92 async
def async_setup_flow(msg: dict[str, Any]) ->
None:
93 """Return a setup flow for mfa auth module."""
94 flow_manager = hass.data[DATA_SETUP_FLOW_MGR]
96 if (flow_id := msg.get(
"flow_id"))
is not None:
97 result = await flow_manager.async_configure(flow_id, msg.get(
"user_input"))
98 connection.send_message(
103 mfa_module_id = msg[
"mfa_module_id"]
104 if hass.auth.get_auth_mfa_module(mfa_module_id)
is None:
105 connection.send_message(
106 websocket_api.error_message(
107 msg[
"id"],
"no_module", f
"MFA module {mfa_module_id} is not found"
112 result = await flow_manager.async_init(
113 mfa_module_id, data={
"user_id": connection.user.id}
116 connection.send_message(
120 hass.async_create_task(async_setup_flow(msg))
124 @websocket_api.ws_require_user(allow_system_user=False)
128 """Remove user from mfa module."""
130 async
def async_depose(msg: dict[str, Any]) ->
None:
131 """Remove user from mfa auth module."""
132 mfa_module_id = msg[
"mfa_module_id"]
134 await hass.auth.async_disable_user_mfa(
135 connection.user, msg[
"mfa_module_id"]
137 except ValueError
as err:
138 connection.send_message(
139 websocket_api.error_message(
142 f
"Cannot disable MFA Module {mfa_module_id}: {err}",
147 connection.send_message(websocket_api.result_message(msg[
"id"],
"done"))
149 hass.async_create_task(async_depose(msg))
153 result: data_entry_flow.FlowResult,
155 """Convert result to JSON."""
156 if result[
"type"] == data_entry_flow.FlowResultType.CREATE_ENTRY:
159 if result[
"type"] != data_entry_flow.FlowResultType.FORM:
164 if (schema := data[
"data_schema"])
is None:
165 data[
"data_schema"] = []
167 data[
"data_schema"] = voluptuous_serialize.convert(schema)
data_entry_flow.FlowHandler async_create_flow(self, str handler_key, *FlowContext|None context, dict[str, Any] data)
_FlowResultT async_finish_flow(self, FlowHandler[_FlowContextT, _FlowResultT, _HandlerT] flow, _FlowResultT result)
None websocket_setup_mfa(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
data_entry_flow.FlowResult _prepare_result_json(data_entry_flow.FlowResult result)
None websocket_depose_mfa(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
None async_setup(HomeAssistant hass)