1 """Auth provider that validates credentials via an external command."""
3 from __future__
import annotations
6 from collections.abc
import Mapping
9 from typing
import Any, cast
11 import voluptuous
as vol
16 from ..models
import AuthFlowContext, AuthFlowResult, Credentials, UserMeta
17 from .
import AUTH_PROVIDER_SCHEMA, AUTH_PROVIDERS, AuthProvider, LoginFlow
22 CONFIG_SCHEMA = AUTH_PROVIDER_SCHEMA.extend(
24 vol.Required(CONF_COMMAND): vol.All(
25 str, os.path.normpath, msg=
"must be an absolute path"
27 vol.Optional(CONF_ARGS, default=
None): vol.Any(vol.DefaultTo(list), [str]),
28 vol.Optional(CONF_META, default=
False): bool,
30 extra=vol.PREVENT_EXTRA,
33 _LOGGER = logging.getLogger(__name__)
37 """Raised when authentication with given credentials fails."""
40 @AUTH_PROVIDERS.register(
"command_line")
42 """Auth provider validating credentials by calling a command."""
44 DEFAULT_TITLE =
"Command Line Authentication"
53 def __init__(self, *args: Any, **kwargs: Any) ->
None:
54 """Extend parent's __init__.
56 Adds self._user_meta dictionary to hold the user-specific
57 attributes provided by external programs.
60 self._user_meta: dict[str, dict[str, Any]] = {}
63 """Return a flow to login."""
67 """Validate a username and password."""
68 env = {
"username": username,
"password": password}
70 process = await asyncio.create_subprocess_exec(
71 self.config[CONF_COMMAND],
72 *self.config[CONF_ARGS],
74 stdout=asyncio.subprocess.PIPE
if self.config[CONF_META]
else None,
77 stdout, _ = await process.communicate()
78 except OSError
as err:
80 _LOGGER.error(
"Error while authenticating %r: %s", username, err)
81 raise InvalidAuthError
from err
83 if process.returncode != 0:
85 "User %r failed to authenticate, command exited with code %d",
89 raise InvalidAuthError
91 if self.config[CONF_META]:
92 meta: dict[str, str] = {}
93 for _line
in stdout.splitlines():
95 line = _line.decode().lstrip()
99 if line.startswith(
"#")
or "=" not in line:
101 key, _, value = line.partition(
"=")
103 value = value.strip()
106 self._user_meta[username] = meta
109 self, flow_result: Mapping[str, str]
111 """Get credentials based on the flow result."""
112 username = flow_result[
"username"]
113 for credential
in await self.async_credentials():
114 if credential.data[
"username"] == username:
118 return self.async_create_credentials({
"username": username})
121 self, credentials: Credentials
123 """Return extra user metadata for credentials.
125 Currently, supports name, group and local_only.
127 meta = self._user_meta.
get(credentials.data[
"username"], {})
129 name=meta.get(
"name"),
131 group=meta.get(
"group"),
132 local_only=meta.get(
"local_only") ==
"true",
137 """Handler for the login flow."""
140 self, user_input: dict[str, str] |
None =
None
142 """Handle the step of the form."""
145 if user_input
is not None:
146 user_input[
"username"] = user_input[
"username"].strip()
149 CommandLineAuthProvider, self._auth_provider
150 ).async_validate_login(user_input[
"username"], user_input[
"password"])
151 except InvalidAuthError:
152 errors[
"base"] =
"invalid_auth"
155 user_input.pop(
"password")
156 return await self.async_finish(user_input)
158 return self.async_show_form(
160 data_schema=vol.Schema(
162 vol.Required(
"username"): str,
163 vol.Required(
"password"): str,
None async_validate_login(self, str username, str password)
UserMeta async_user_meta_for_credentials(self, Credentials credentials)
None __init__(self, *Any args, **Any kwargs)
Credentials async_get_or_create_credentials(self, Mapping[str, str] flow_result)
LoginFlow async_login_flow(self, AuthFlowContext|None context)
AuthFlowResult async_step_init(self, dict[str, str]|None user_input=None)
web.Response get(self, web.Request request, str config_key)