1 """Ban logic for HTTP component."""
3 from __future__
import annotations
5 from collections
import defaultdict
6 from collections.abc
import Awaitable, Callable, Coroutine
7 from contextlib
import suppress
8 from datetime
import datetime
9 from http
import HTTPStatus
10 from ipaddress
import IPv4Address, IPv6Address, ip_address
12 from socket
import gethostbyaddr, herror
13 from typing
import Any, Concatenate, Final
15 from aiohttp.web
import (
23 from aiohttp.web_exceptions
import HTTPForbidden, HTTPUnauthorized
24 import voluptuous
as vol
33 from .const
import KEY_HASS
34 from .view
import HomeAssistantView
# Module-level logger, named after this module; used for every ban-related
# log message in this file.
_LOGGER: Final = logging.getLogger(__name__)
# Typed keys under which the ban state is stored on the aiohttp application.

# Key for the ban manager instance; the middleware reads it with
# ``request.app.get(KEY_BAN_MANAGER)``.
KEY_BAN_MANAGER = AppKey["IpBanManager"]("ha_banned_ips_manager")

# Key for the per-IP counter of failed login attempts (a ``defaultdict`` that
# maps an IP address to an ``int``).
KEY_FAILED_LOGIN_ATTEMPTS = AppKey[defaultdict[IPv4Address | IPv6Address, int]](
    "ha_failed_login_attempts"
)

# Key for the configured login threshold; values below 1 make the login
# processing helpers return early without counting attempts.
# NOTE(review): the key's name string reads "ban_manager.ip_bans_lookup", which
# does not mention the threshold. It is kept byte-for-byte here — confirm
# whether that name is intentional before changing it.
KEY_LOGIN_THRESHOLD = AppKey[int]("ban_manager.ip_bans_lookup")
# Identifiers for the persistent notifications raised by this module.
# NOTIFICATION_ID_LOGIN is passed with the "Login attempt failed" notification.
# NOTIFICATION_ID_BAN is presumably used for the "Banning IP address" one;
# that call's id argument is not visible here, so verify against the call site.
NOTIFICATION_ID_BAN: Final = "ip-ban"
NOTIFICATION_ID_LOGIN: Final = "http-login"

# File name of the IP bans YAML file; resolved to a full path with
# ``hass.config.path(IP_BANS_FILE)``.
IP_BANS_FILE: Final = "ip_bans.yaml"
# Key under which the ban timestamp (ISO format) is written for each banned IP.
ATTR_BANNED_AT: Final = "banned_at"
# Validation schema applied to each per-IP entry loaded from the bans file:
# an optional "banned_at" field that is either ``None`` or a value accepted
# by ``cv.datetime``.
SCHEMA_IP_BAN_ENTRY: Final = vol.Schema(
    {vol.Optional("banned_at"): vol.Any(None, cv.datetime)}
)
56 def setup_bans(hass: HomeAssistant, app: Application, login_threshold: int) ->
None:
57 """Create IP Ban middleware for the app."""
58 app.middlewares.append(ban_middleware)
59 app[KEY_FAILED_LOGIN_ATTEMPTS] = defaultdict[IPv4Address | IPv6Address, int](int)
60 app[KEY_LOGIN_THRESHOLD] = login_threshold
63 async
def ban_startup(app: Application) ->
None:
64 """Initialize bans when app starts up."""
67 app.on_startup.append(ban_startup)
72 request: Request, handler: Callable[[Request], Awaitable[StreamResponse]]
74 """IP Ban middleware."""
75 if (ban_manager := request.app.get(KEY_BAN_MANAGER))
is None:
76 _LOGGER.error(
"IP Ban middleware loaded but banned IPs not loaded")
77 return await handler(request)
79 if ip_bans_lookup := ban_manager.ip_bans_lookup:
81 ip_address_ = ip_address(request.remote)
82 if ip_address_
in ip_bans_lookup:
86 return await handler(request)
87 except HTTPUnauthorized:
92 def log_invalid_auth[_HassViewT: HomeAssistantView, **_P](
93 func: Callable[Concatenate[_HassViewT, Request, _P], Awaitable[Response]],
94 ) -> Callable[Concatenate[_HassViewT, Request, _P], Coroutine[Any, Any, Response]]:
95 """Decorate function to handle invalid auth or failed login attempts."""
98 view: _HassViewT, request: Request, *args: _P.args, **kwargs: _P.kwargs
100 """Try to log failed login attempts if response status >= BAD_REQUEST."""
101 resp = await func(view, request, *args, **kwargs)
102 if resp.status >= HTTPStatus.BAD_REQUEST:
110 """Process a wrong login attempt.
112 Increase failed login attempts counter for remote IP address.
113 Add ip ban entry if failed login attempts exceeds threshold.
115 hass = request.app[KEY_HASS]
117 assert request.remote
118 remote_addr = ip_address(request.remote)
119 remote_host = request.remote
120 with suppress(herror):
121 remote_host, _, _ = await hass.async_add_executor_job(
122 gethostbyaddr, request.remote
126 "Login attempt or request with invalid authentication from"
127 f
" {remote_host} ({remote_addr})."
131 user_agent = request.headers.get(
"user-agent")
132 log_msg = f
"{base_msg} Requested URL: '{request.rel_url}'. ({user_agent})"
134 notification_msg = f
"{base_msg} See the log for details."
136 _LOGGER.warning(log_msg)
142 persistent_notification.async_create(
143 hass, notification_msg,
"Login attempt failed", NOTIFICATION_ID_LOGIN
147 if KEY_BAN_MANAGER
not in request.app
or request.app[KEY_LOGIN_THRESHOLD] < 1:
150 request.app[KEY_FAILED_LOGIN_ATTEMPTS][remote_addr] += 1
157 request.app[KEY_FAILED_LOGIN_ATTEMPTS][remote_addr]
158 >= request.app[KEY_LOGIN_THRESHOLD]
160 ban_manager = request.app[KEY_BAN_MANAGER]
161 _LOGGER.warning(
"Banned IP %s for too many login attempts", remote_addr)
162 await ban_manager.async_add_ban(remote_addr)
164 persistent_notification.async_create(
166 f
"Too many login attempts from {remote_addr}",
167 "Banning IP address",
174 """Process a success login attempt.
176 Reset failed login attempts counter for remote IP address.
177 No release IP address from banned list function, it can only be done by
178 manual modify ip bans config file.
182 if KEY_BAN_MANAGER
not in app
or app[KEY_LOGIN_THRESHOLD] < 1:
185 remote_addr = ip_address(request.remote)
186 login_attempt_history = app[KEY_FAILED_LOGIN_ATTEMPTS]
187 if remote_addr
in login_attempt_history
and login_attempt_history[remote_addr] > 0:
189 "Login success, reset failed login attempts counter from %s", remote_addr
191 login_attempt_history.pop(remote_addr)
195 """Represents banned IP address."""
199 ip_ban: str | IPv4Address | IPv6Address,
200 banned_at: datetime |
None =
None,
202 """Initialize IP Ban object."""
208 """Manage IP bans."""
211 """Init the ban manager."""
213 self.
pathpath = hass.config.path(IP_BANS_FILE)
214 self.
ip_bans_lookupip_bans_lookup: dict[IPv4Address | IPv6Address, IpBan] = {}
217 """Load the existing IP bans."""
219 list_ = await self.
hasshass.async_add_executor_job(
220 load_yaml_config_file, self.
pathpath
222 except FileNotFoundError:
224 except HomeAssistantError
as err:
225 _LOGGER.error(
"Unable to load %s: %s", self.
pathpath,
str(err))
228 ip_bans_lookup: dict[IPv4Address | IPv6Address, IpBan] = {}
229 for ip_ban, ip_info
in list_.items():
231 ip_info = SCHEMA_IP_BAN_ENTRY(ip_info)
232 ban =
IpBan(ip_ban, ip_info[
"banned_at"])
233 ip_bans_lookup[ban.ip_address] = ban
234 except vol.Invalid
as err:
235 _LOGGER.error(
"Failed to load IP ban %s: %s", ip_info, err)
241 """Update config file with new banned IP address."""
242 with open(self.
pathpath,
"a", encoding=
"utf8")
as out:
244 str(ip_ban.ip_address): {ATTR_BANNED_AT: ip_ban.banned_at.isoformat()}
247 out.write(
"\n" + yaml.dump(ip_))
249 async
def async_add_ban(self, remote_addr: IPv4Address | IPv6Address) ->
None:
250 """Add a new IP address to the banned list."""
253 await self.
hasshass.async_add_executor_job(self.
_add_ban_add_ban, new_ban)
None __init__(self, HomeAssistant hass)
None _add_ban(self, IpBan ip_ban)
None async_add_ban(self, IPv4Address|IPv6Address remote_addr)
None __init__(self, str|IPv4Address|IPv6Address ip_ban, datetime|None banned_at=None)
bool is_hassio(HomeAssistant hass)
None process_wrong_login(Request request)
StreamResponse ban_middleware(Request request, Callable[[Request], Awaitable[StreamResponse]] handler)
None setup_bans(HomeAssistant hass, Application app, int login_threshold)
None process_success_login(Request request)
None open(self, **Any kwargs)
None async_load(HomeAssistant hass)