# Home Assistant Unofficial Reference 2024.12.1
# ban.py
# (Go to the documentation of this file.)
1 """Ban logic for HTTP component."""
2 
from __future__ import annotations

from collections import defaultdict
from collections.abc import Awaitable, Callable, Coroutine
from contextlib import suppress
from datetime import datetime
from http import HTTPStatus
from ipaddress import IPv4Address, IPv6Address, ip_address
import logging
from socket import gethostbyaddr, herror
from typing import Any, Concatenate, Final

from aiohttp.web import (
    AppKey,
    Application,
    Request,
    Response,
    StreamResponse,
    middleware,
)
from aiohttp.web_exceptions import HTTPForbidden, HTTPUnauthorized
import voluptuous as vol

from homeassistant.config import load_yaml_config_file
from homeassistant.core import HomeAssistant, callback
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.hassio import get_supervisor_ip, is_hassio
from homeassistant.util import dt as dt_util, yaml

from .const import KEY_HASS
from .view import HomeAssistantView
35 
# Logger shared by every helper in this module.
_LOGGER: Final = logging.getLogger(__name__)

# Keys under which the ban state is stored on the aiohttp application
# (populated by setup_bans).
KEY_BAN_MANAGER = AppKey["IpBanManager"]("ha_banned_ips_manager")
KEY_FAILED_LOGIN_ATTEMPTS = AppKey[defaultdict[IPv4Address | IPv6Address, int]](
    "ha_failed_login_attempts"
)
KEY_LOGIN_THRESHOLD = AppKey[int]("ban_manager.ip_bans_lookup")

# Identifiers of the persistent notifications raised by this module.
NOTIFICATION_ID_BAN: Final = "ip-ban"
NOTIFICATION_ID_LOGIN: Final = "http-login"

# Name of the YAML file (relative to the config directory) holding the bans,
# and the per-entry attribute recording when a ban was created.
IP_BANS_FILE: Final = "ip_bans.yaml"
ATTR_BANNED_AT: Final = "banned_at"

# Validation schema for one entry of the bans file: the timestamp is optional
# and may be either None or a datetime.
SCHEMA_IP_BAN_ENTRY: Final = vol.Schema(
    {vol.Optional(ATTR_BANNED_AT): vol.Any(None, cv.datetime)}
)
53 
54 
@callback
def setup_bans(hass: HomeAssistant, app: Application, login_threshold: int) -> None:
    """Install the IP ban middleware and its supporting state on the app.

    Registers ``ban_middleware``, stores a fresh failed-login counter, the
    login threshold and a new ``IpBanManager`` on ``app``, and schedules the
    manager to load its bans when the application starts.
    """

    async def ban_startup(app: Application) -> None:
        """Load the stored bans once the application starts up."""
        manager = app[KEY_BAN_MANAGER]
        await manager.async_load()

    app.middlewares.append(ban_middleware)

    # Per-address counter of failed logins; missing addresses start at 0.
    app[KEY_FAILED_LOGIN_ATTEMPTS] = defaultdict[IPv4Address | IPv6Address, int](int)
    app[KEY_LOGIN_THRESHOLD] = login_threshold
    app[KEY_BAN_MANAGER] = IpBanManager(hass)

    app.on_startup.append(ban_startup)
68 
69 
@middleware
async def ban_middleware(
    request: Request, handler: Callable[[Request], Awaitable[StreamResponse]]
) -> StreamResponse:
    """Reject requests from banned addresses and track unauthorized ones.

    Raises HTTPForbidden when the remote address is in the ban lookup.
    An HTTPUnauthorized raised by the handler is recorded as a failed login
    attempt and then re-raised unchanged.
    """
    manager = request.app.get(KEY_BAN_MANAGER)
    if manager is None:
        _LOGGER.error("IP Ban middleware loaded but banned IPs not loaded")
        return await handler(request)

    banned = manager.ip_bans_lookup
    # The remote address is only parsed when at least one ban exists.
    if banned and ip_address(request.remote) in banned:  # type: ignore[arg-type]
        raise HTTPForbidden

    try:
        response = await handler(request)
    except HTTPUnauthorized:
        await process_wrong_login(request)
        raise
    return response
90 
91 
92 def log_invalid_auth[_HassViewT: HomeAssistantView, **_P](
93  func: Callable[Concatenate[_HassViewT, Request, _P], Awaitable[Response]],
94 ) -> Callable[Concatenate[_HassViewT, Request, _P], Coroutine[Any, Any, Response]]:
95  """Decorate function to handle invalid auth or failed login attempts."""
96 
97  async def handle_req(
98  view: _HassViewT, request: Request, *args: _P.args, **kwargs: _P.kwargs
99  ) -> Response:
100  """Try to log failed login attempts if response status >= BAD_REQUEST."""
101  resp = await func(view, request, *args, **kwargs)
102  if resp.status >= HTTPStatus.BAD_REQUEST:
103  await process_wrong_login(request)
104  return resp
105 
106  return handle_req
107 
108 
async def process_wrong_login(request: Request) -> None:
    """Handle a failed login or a request with invalid authentication.

    Logs the attempt, raises a persistent notification, and increments the
    failed-attempt counter for the remote address. When the counter reaches
    the configured threshold the address is banned and a second notification
    is raised. Nothing is counted when the ban manager is not installed or
    the threshold is below 1, and the Supervisor address is never banned.
    """
    hass = request.app[KEY_HASS]

    assert request.remote
    client_ip = ip_address(request.remote)

    # Prefer the reverse-DNS host name; keep the raw address when the lookup
    # reports a host error.
    try:
        client_host, _, _ = await hass.async_add_executor_job(
            gethostbyaddr, request.remote
        )
    except herror:
        client_host = request.remote

    base_msg = (
        "Login attempt or request with invalid authentication from "
        f"{client_host} ({client_ip})."
    )

    # The user-agent is unsanitized input so we only include it in the log
    user_agent = request.headers.get("user-agent")
    log_msg = f"{base_msg} Requested URL: '{request.rel_url}'. ({user_agent})"
    notification_msg = f"{base_msg} See the log for details."

    _LOGGER.warning(log_msg)

    # Circular import with websocket_api
    # pylint: disable=import-outside-toplevel
    from homeassistant.components import persistent_notification

    persistent_notification.async_create(
        hass, notification_msg, "Login attempt failed", NOTIFICATION_ID_LOGIN
    )

    # Counting and banning only happen when the ban middleware is loaded and
    # the threshold is at least 1.
    app = request.app
    banning_enabled = KEY_BAN_MANAGER in app and app[KEY_LOGIN_THRESHOLD] >= 1
    if not banning_enabled:
        return

    failed_attempts = app[KEY_FAILED_LOGIN_ATTEMPTS]
    failed_attempts[client_ip] += 1

    # Supervisor IP should never be banned
    if is_hassio(hass) and str(client_ip) == get_supervisor_ip():
        return

    if failed_attempts[client_ip] < app[KEY_LOGIN_THRESHOLD]:
        return

    ban_manager = app[KEY_BAN_MANAGER]
    _LOGGER.warning("Banned IP %s for too many login attempts", client_ip)
    await ban_manager.async_add_ban(client_ip)

    persistent_notification.async_create(
        hass,
        f"Too many login attempts from {client_ip}",
        "Banning IP address",
        NOTIFICATION_ID_BAN,
    )
170 
171 
@callback
def process_success_login(request: Request) -> None:
    """Handle a successful login.

    Clears the failed-attempt counter for the remote address. This does not
    lift an existing ban; bans can only be removed by manually editing the
    IP bans config file.
    """
    app = request.app

    # Nothing to reset unless the ban middleware is loaded and the threshold
    # is at least 1.
    banning_enabled = KEY_BAN_MANAGER in app and app[KEY_LOGIN_THRESHOLD] >= 1
    if not banning_enabled:
        return

    client_ip = ip_address(request.remote)  # type: ignore[arg-type]
    failed_attempts = app[KEY_FAILED_LOGIN_ATTEMPTS]

    # .get() avoids creating a counter entry for addresses never seen before.
    if failed_attempts.get(client_ip, 0) <= 0:
        return

    _LOGGER.debug(
        "Login success, reset failed login attempts counter from %s", client_ip
    )
    del failed_attempts[client_ip]
192 
193 
194 class IpBan:
195  """Represents banned IP address."""
196 
197  def __init__(
198  self,
199  ip_ban: str | IPv4Address | IPv6Address,
200  banned_at: datetime | None = None,
201  ) -> None:
202  """Initialize IP Ban object."""
203  self.ip_addressip_address = ip_address(ip_ban)
204  self.banned_atbanned_at = banned_at or dt_util.utcnow()
205 
206 
class IpBanManager:
    """Manage IP bans."""

    def __init__(self, hass: HomeAssistant) -> None:
        """Init the ban manager.

        Resolves the location of the bans file inside the configuration
        directory and starts with an empty in-memory lookup.
        """
        self.hass = hass
        self.path = hass.config.path(IP_BANS_FILE)
        # Maps each banned address to its IpBan record; read by the ban
        # middleware on every request.
        self.ip_bans_lookup: dict[IPv4Address | IPv6Address, IpBan] = {}

    async def async_load(self) -> None:
        """Load the existing IP bans.

        Reads the bans file in the executor and replaces the in-memory
        lookup with the valid entries found. A missing or unreadable file
        leaves the current lookup untouched; an invalid entry is logged and
        skipped without aborting the load.
        """
        try:
            list_ = await self.hass.async_add_executor_job(
                load_yaml_config_file, self.path
            )
        except FileNotFoundError:
            return
        except HomeAssistantError as err:
            _LOGGER.error("Unable to load %s: %s", self.path, str(err))
            return

        ip_bans_lookup: dict[IPv4Address | IPv6Address, IpBan] = {}
        for ip_ban, ip_info in list_.items():
            try:
                ip_info = SCHEMA_IP_BAN_ENTRY(ip_info)
                # The timestamp is optional in the schema, so it may be
                # absent from the validated entry; IpBan then uses "now".
                ban = IpBan(ip_ban, ip_info.get(ATTR_BANNED_AT))
            except (vol.Invalid, ValueError) as err:
                # ValueError: the key is not a valid IP address.
                _LOGGER.error("Failed to load IP ban %s: %s", ip_info, err)
                continue
            ip_bans_lookup[ban.ip_address] = ban

        self.ip_bans_lookup = ip_bans_lookup

    def _add_ban(self, ip_ban: IpBan) -> None:
        """Update config file with new banned IP address.

        Blocking file I/O: appends one YAML mapping for ``ip_ban`` to the
        bans file.
        """
        with open(self.path, "a", encoding="utf8") as out:
            ip_ = {
                str(ip_ban.ip_address): {ATTR_BANNED_AT: ip_ban.banned_at.isoformat()}
            }
            # Write in a single write call to avoid interleaved writes
            out.write("\n" + yaml.dump(ip_))

    async def async_add_ban(self, remote_addr: IPv4Address | IPv6Address) -> None:
        """Add a new IP address to the banned list.

        Does nothing when the address is already banned. Otherwise the ban
        is recorded in memory first and then appended to the bans file in
        the executor.
        """
        if remote_addr not in self.ip_bans_lookup:
            new_ban = self.ip_bans_lookup[remote_addr] = IpBan(remote_addr)
            await self.hass.async_add_executor_job(self._add_ban, new_ban)
# Cross-reference index from the generated reference page
# (signature, followed by the location of its definition):
#
# None __init__(self, HomeAssistant hass)
#   Definition: ban.py:210
# None _add_ban(self, IpBan ip_ban)
#   Definition: ban.py:240
# None async_add_ban(self, IPv4Address|IPv6Address remote_addr)
#   Definition: ban.py:249
# None __init__(self, str|IPv4Address|IPv6Address ip_ban, datetime|None banned_at=None)
#   Definition: ban.py:201
# bool is_hassio(HomeAssistant hass)
#   Definition: __init__.py:302
# None process_wrong_login(Request request)
#   Definition: ban.py:109
# StreamResponse ban_middleware(Request request, Callable[[Request], Awaitable[StreamResponse]] handler)
#   Definition: ban.py:73
# None setup_bans(HomeAssistant hass, Application app, int login_threshold)
#   Definition: ban.py:56
# None process_success_login(Request request)
#   Definition: ban.py:173
# None open(self, **Any kwargs)
#   Definition: lock.py:86
# None async_load(HomeAssistant hass)