Home Assistant Unofficial Reference 2024.12.1
mfa_setup_flow.py
Go to the documentation of this file.
1 """Helpers to setup multi-factor auth module."""
2 
3 from __future__ import annotations
4 
5 import logging
6 from typing import Any
7 
8 import voluptuous as vol
9 import voluptuous_serialize
10 
11 from homeassistant import data_entry_flow
12 from homeassistant.components import websocket_api
13 from homeassistant.core import HomeAssistant, callback
14 from homeassistant.data_entry_flow import FlowContext
16 from homeassistant.util.hass_dict import HassKey
17 
18 WS_TYPE_SETUP_MFA = "auth/setup_mfa"
19 SCHEMA_WS_SETUP_MFA = vol.All(
20  websocket_api.BASE_COMMAND_MESSAGE_SCHEMA.extend(
21  {
22  vol.Required("type"): WS_TYPE_SETUP_MFA,
23  vol.Exclusive("mfa_module_id", "module_or_flow_id"): str,
24  vol.Exclusive("flow_id", "module_or_flow_id"): str,
25  vol.Optional("user_input"): object,
26  }
27  ),
28  cv.has_at_least_one_key("mfa_module_id", "flow_id"),
29 )
30 
31 WS_TYPE_DEPOSE_MFA = "auth/depose_mfa"
32 SCHEMA_WS_DEPOSE_MFA = websocket_api.BASE_COMMAND_MESSAGE_SCHEMA.extend(
33  {vol.Required("type"): WS_TYPE_DEPOSE_MFA, vol.Required("mfa_module_id"): str}
34 )
35 
36 DATA_SETUP_FLOW_MGR: HassKey[MfaFlowManager] = HassKey("auth_mfa_setup_flow_manager")
37 
38 _LOGGER = logging.getLogger(__name__)
39 
40 
42  """Manage multi factor authentication flows."""
43 
44  async def async_create_flow( # type: ignore[override]
45  self,
46  handler_key: str,
47  *,
48  context: FlowContext | None,
49  data: dict[str, Any],
51  """Create a setup flow. handler is a mfa module."""
52  mfa_module = self.hasshass.auth.get_auth_mfa_module(handler_key)
53  if mfa_module is None:
54  raise ValueError(f"Mfa module {handler_key} is not found")
55 
56  user_id = data.pop("user_id")
57  return await mfa_module.async_setup_flow(user_id)
58 
59  async def async_finish_flow(
60  self, flow: data_entry_flow.FlowHandler, result: data_entry_flow.FlowResult
62  """Complete an mfa setup flow.
63 
64  This method is called when a flow step returns FlowResultType.ABORT or
65  FlowResultType.CREATE_ENTRY.
66  """
67  _LOGGER.debug("flow_result: %s", result)
68  return result
69 
70 
71 @callback
72 def async_setup(hass: HomeAssistant) -> None:
73  """Init mfa setup flow manager."""
74  hass.data[DATA_SETUP_FLOW_MGR] = MfaFlowManager(hass)
75 
76  websocket_api.async_register_command(
77  hass, WS_TYPE_SETUP_MFA, websocket_setup_mfa, SCHEMA_WS_SETUP_MFA
78  )
79 
80  websocket_api.async_register_command(
81  hass, WS_TYPE_DEPOSE_MFA, websocket_depose_mfa, SCHEMA_WS_DEPOSE_MFA
82  )
83 
84 
85 @callback
86 @websocket_api.ws_require_user(allow_system_user=False)
88  hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict[str, Any]
89 ) -> None:
90  """Return a setup flow for mfa auth module."""
91 
92  async def async_setup_flow(msg: dict[str, Any]) -> None:
93  """Return a setup flow for mfa auth module."""
94  flow_manager = hass.data[DATA_SETUP_FLOW_MGR]
95 
96  if (flow_id := msg.get("flow_id")) is not None:
97  result = await flow_manager.async_configure(flow_id, msg.get("user_input"))
98  connection.send_message(
99  websocket_api.result_message(msg["id"], _prepare_result_json(result))
100  )
101  return
102 
103  mfa_module_id = msg["mfa_module_id"]
104  if hass.auth.get_auth_mfa_module(mfa_module_id) is None:
105  connection.send_message(
106  websocket_api.error_message(
107  msg["id"], "no_module", f"MFA module {mfa_module_id} is not found"
108  )
109  )
110  return
111 
112  result = await flow_manager.async_init(
113  mfa_module_id, data={"user_id": connection.user.id}
114  )
115 
116  connection.send_message(
117  websocket_api.result_message(msg["id"], _prepare_result_json(result))
118  )
119 
120  hass.async_create_task(async_setup_flow(msg))
121 
122 
123 @callback
124 @websocket_api.ws_require_user(allow_system_user=False)
126  hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict[str, Any]
127 ) -> None:
128  """Remove user from mfa module."""
129 
130  async def async_depose(msg: dict[str, Any]) -> None:
131  """Remove user from mfa auth module."""
132  mfa_module_id = msg["mfa_module_id"]
133  try:
134  await hass.auth.async_disable_user_mfa(
135  connection.user, msg["mfa_module_id"]
136  )
137  except ValueError as err:
138  connection.send_message(
139  websocket_api.error_message(
140  msg["id"],
141  "disable_failed",
142  f"Cannot disable MFA Module {mfa_module_id}: {err}",
143  )
144  )
145  return
146 
147  connection.send_message(websocket_api.result_message(msg["id"], "done"))
148 
149  hass.async_create_task(async_depose(msg))
150 
151 
153  result: data_entry_flow.FlowResult,
155  """Convert result to JSON."""
156  if result["type"] == data_entry_flow.FlowResultType.CREATE_ENTRY:
157  return result.copy()
158 
159  if result["type"] != data_entry_flow.FlowResultType.FORM:
160  return result
161 
162  data = result.copy()
163 
164  if (schema := data["data_schema"]) is None:
165  data["data_schema"] = [] # type: ignore[typeddict-item] # json result type
166  else:
167  data["data_schema"] = voluptuous_serialize.convert(schema)
168 
169  return data
data_entry_flow.FlowHandler async_create_flow(self, str handler_key, *FlowContext|None context, dict[str, Any] data)
_FlowResultT async_finish_flow(self, FlowHandler[_FlowContextT, _FlowResultT, _HandlerT] flow, _FlowResultT result)
hass
None websocket_setup_mfa(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
data_entry_flow.FlowResult _prepare_result_json(data_entry_flow.FlowResult result)
None websocket_depose_mfa(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)