Home Assistant Unofficial Reference 2024.12.1
command_line.py
Go to the documentation of this file.
1 """Auth provider that validates credentials via an external command."""
2 
3 from __future__ import annotations
4 
5 import asyncio
6 from collections.abc import Mapping
7 import logging
8 import os
9 from typing import Any, cast
10 
11 import voluptuous as vol
12 
13 from homeassistant.const import CONF_COMMAND
14 from homeassistant.exceptions import HomeAssistantError
15 
16 from ..models import AuthFlowContext, AuthFlowResult, Credentials, UserMeta
17 from . import AUTH_PROVIDER_SCHEMA, AUTH_PROVIDERS, AuthProvider, LoginFlow
18 
# Configuration keys specific to this provider (CONF_COMMAND comes from
# homeassistant.const).
CONF_ARGS = "args"
CONF_META = "meta"

# Schema for the provider's configuration entry; unknown keys are rejected.
CONFIG_SCHEMA = AUTH_PROVIDER_SCHEMA.extend(
    {
        # The command is coerced through os.path.normpath.
        # NOTE(review): normpath only normalizes the path string; it does not
        # by itself reject a relative path, despite the error message.
        vol.Required(CONF_COMMAND): vol.All(
            str, os.path.normpath, msg="must be an absolute path"
        ),
        # Extra command-line arguments; an omitted value becomes an empty list.
        vol.Optional(CONF_ARGS, default=None): vol.Any(vol.DefaultTo(list), [str]),
        # Whether the command's stdout should be parsed for user metadata.
        vol.Optional(CONF_META, default=False): bool,
    },
    extra=vol.PREVENT_EXTRA,
)

_LOGGER = logging.getLogger(__name__)

34 
35 
class InvalidAuthError(HomeAssistantError):
    """Raised when authentication with given credentials fails."""
38 
39 
@AUTH_PROVIDERS.register("command_line")
class CommandLineAuthProvider(AuthProvider):
    """Auth provider validating credentials by calling a command.

    The configured command is executed once per login attempt with the
    username and password supplied through its environment. An exit code of
    0 accepts the credentials; anything else rejects them. When the ``meta``
    option is enabled, the command's stdout is parsed for ``key = value``
    lines describing the user.
    """

    DEFAULT_TITLE = "Command Line Authentication"

    # which keys to accept from a program's stdout
    ALLOWED_META_KEYS = (
        "name",
        "group",
        "local_only",
    )

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Extend parent's __init__.

        Adds self._user_meta dictionary to hold the user-specific
        attributes provided by external programs.
        """
        super().__init__(*args, **kwargs)
        # Maps username -> metadata parsed from the command's stdout.
        self._user_meta: dict[str, dict[str, Any]] = {}

    async def async_login_flow(self, context: AuthFlowContext | None) -> LoginFlow:
        """Return a flow to login."""
        return CommandLineLoginFlow(self)

    async def async_validate_login(self, username: str, password: str) -> None:
        """Validate a username and password.

        Runs the configured command with ``username`` and ``password`` as
        its only environment variables and waits for it to finish.

        Raises:
            InvalidAuthError: if the command cannot be started or exits
                with a non-zero return code.
        """
        env = {"username": username, "password": password}
        try:
            process = await asyncio.create_subprocess_exec(
                self.config[CONF_COMMAND],
                *self.config[CONF_ARGS],
                env=env,
                # stdout is only captured when metadata parsing is enabled
                stdout=asyncio.subprocess.PIPE if self.config[CONF_META] else None,
                close_fds=False,  # required for posix_spawn
            )
            stdout, _ = await process.communicate()
        except OSError as err:
            # happens when command doesn't exist or permission is denied
            _LOGGER.error("Error while authenticating %r: %s", username, err)
            raise InvalidAuthError from err

        if process.returncode != 0:
            _LOGGER.error(
                "User %r failed to authenticate, command exited with code %d",
                username,
                process.returncode,
            )
            raise InvalidAuthError

        if self.config[CONF_META]:
            meta: dict[str, str] = {}
            for _line in stdout.splitlines():
                try:
                    line = _line.decode().lstrip()
                except ValueError:
                    # malformed line
                    continue
                # skip comments and lines that are not key=value pairs
                if line.startswith("#") or "=" not in line:
                    continue
                key, _, value = line.partition("=")
                key = key.strip()
                value = value.strip()
                if key in self.ALLOWED_META_KEYS:
                    meta[key] = value
            self._user_meta[username] = meta

    async def async_get_or_create_credentials(
        self, flow_result: Mapping[str, str]
    ) -> Credentials:
        """Get credentials based on the flow result.

        Returns the existing credential whose stored username matches
        ``flow_result["username"]``, or creates a new one if none matches.
        """
        username = flow_result["username"]
        for credential in await self.async_credentials():
            if credential.data["username"] == username:
                return credential

        # Create new credentials.
        return self.async_create_credentials({"username": username})

    async def async_user_meta_for_credentials(
        self, credentials: Credentials
    ) -> UserMeta:
        """Return extra user metadata for credentials.

        Currently, supports name, group and local_only. Values come from the
        metadata stored by the last successful login for that username;
        ``local_only`` is True only when the stored value is exactly "true".
        """
        meta = self._user_meta.get(credentials.data["username"], {})
        return UserMeta(
            name=meta.get("name"),
            is_active=True,
            group=meta.get("group"),
            local_only=meta.get("local_only") == "true",
        )
134 
135 
class CommandLineLoginFlow(LoginFlow):
    """Login flow that asks for a username and password."""

    async def async_step_init(
        self, user_input: dict[str, str] | None = None
    ) -> AuthFlowResult:
        """Handle the single form step of the login flow.

        Without input the username/password form is shown. With input the
        credentials are checked through the provider: on success the flow
        finishes with the password removed from the result, on failure the
        form is shown again with an ``invalid_auth`` error.
        """
        errors = {}

        if user_input is not None:
            # Surrounding whitespace in the username is not significant.
            username = user_input["username"].strip()
            user_input["username"] = username
            provider = cast(CommandLineAuthProvider, self._auth_provider)
            try:
                await provider.async_validate_login(username, user_input["password"])
            except InvalidAuthError:
                errors["base"] = "invalid_auth"
            else:
                # Never carry the password into the finished flow result.
                user_input.pop("password")
                return await self.async_finish(user_input)

        form_schema = vol.Schema(
            {
                vol.Required("username"): str,
                vol.Required("password"): str,
            }
        )
        return self.async_show_form(
            step_id="init",
            data_schema=form_schema,
            errors=errors,
        )
None async_validate_login(self, str username, str password)
Definition: command_line.py:66
UserMeta async_user_meta_for_credentials(self, Credentials credentials)
Credentials async_get_or_create_credentials(self, Mapping[str, str] flow_result)
LoginFlow async_login_flow(self, AuthFlowContext|None context)
Definition: command_line.py:62
AuthFlowResult async_step_init(self, dict[str, str]|None user_input=None)
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88