1 """Support for displaying IPs banned by fail2ban."""
3 from __future__
import annotations
5 from datetime
import timedelta
10 import voluptuous
as vol
13 PLATFORM_SCHEMA
as SENSOR_PLATFORM_SCHEMA,
22 _LOGGER = logging.getLogger(__name__)
26 DEFAULT_NAME =
"fail2ban"
27 DEFAULT_LOG =
"/var/log/fail2ban.log"
29 STATE_CURRENT_BANS =
"current_bans"
30 STATE_ALL_BANS =
"total_bans"
33 PLATFORM_SCHEMA = SENSOR_PLATFORM_SCHEMA.extend(
35 vol.Required(CONF_JAILS): vol.All(cv.ensure_list, vol.Length(min=1)),
36 vol.Optional(CONF_FILE_PATH): cv.isfile,
37 vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
45 async_add_entities: AddEntitiesCallback,
46 discovery_info: DiscoveryInfoType |
None =
None,
48 """Set up the fail2ban sensor."""
49 name = config[CONF_NAME]
50 jails = config[CONF_JAILS]
51 log_file = config.get(CONF_FILE_PATH, DEFAULT_LOG)
59 """Implementation of a fail2ban sensor."""
62 """Initialize the sensor."""
63 self.
_name_name = f
"{name} {jail}"
65 self.
ban_dictban_dict = {STATE_CURRENT_BANS: [], STATE_ALL_BANS: []}
69 rf
"\[{re.escape(self.jail)}\]\s*(Ban|Unban) (.*)"
71 _LOGGER.debug(
"Setting up jail %s", self.
jailjail)
75 """Return the name of the sensor."""
76 return self.
_name_name
80 """Return the state attributes of the fail2ban sensor."""
85 """Return the most recently banned IP Address."""
89 """Update the list of banned ips."""
97 if current_ip
not in self.
ban_dictban_dict[STATE_CURRENT_BANS]:
98 self.
ban_dictban_dict[STATE_CURRENT_BANS].append(current_ip)
99 if current_ip
not in self.
ban_dictban_dict[STATE_ALL_BANS]:
100 self.
ban_dictban_dict[STATE_ALL_BANS].append(current_ip)
101 if len(self.
ban_dictban_dict[STATE_ALL_BANS]) > 10:
102 self.
ban_dictban_dict[STATE_ALL_BANS].pop(0)
106 and current_ip
in self.
ban_dictban_dict[STATE_CURRENT_BANS]
110 if self.
ban_dictban_dict[STATE_CURRENT_BANS]:
117 """Class to parse fail2ban logs."""
120 """Initialize the parser."""
126 """Read the fail2ban log and find entries for jail."""
129 with open(self.
log_filelog_file, encoding=
"utf-8")
as file_data:
130 self.
datadata = self.
ip_regexip_regex[jail].findall(file_data.read())
132 except (IndexError, FileNotFoundError, IsADirectoryError, UnboundLocalError):
133 _LOGGER.warning(
"File not present: %s", os.path.basename(self.
log_filelog_file))
def __init__(self, log_file)
def extra_state_attributes(self)
def __init__(self, name, jail, log_parser)
bool remove(self, _T matcher)
None async_setup_platform(HomeAssistant hass, ConfigType config, AddEntitiesCallback async_add_entities, DiscoveryInfoType|None discovery_info=None)
None open(self, **Any kwargs)