Home Assistant Unofficial Reference 2024.12.1
sensor.py
Go to the documentation of this file.
1 """Support for displaying IPs banned by fail2ban."""
2 
3 from __future__ import annotations
4 
5 from datetime import timedelta
6 import logging
7 import os
8 import re
9 
10 import voluptuous as vol
11 
13  PLATFORM_SCHEMA as SENSOR_PLATFORM_SCHEMA,
14  SensorEntity,
15 )
16 from homeassistant.const import CONF_FILE_PATH, CONF_NAME
17 from homeassistant.core import HomeAssistant
19 from homeassistant.helpers.entity_platform import AddEntitiesCallback
20 from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
21 
22 _LOGGER = logging.getLogger(__name__)
23 
24 CONF_JAILS = "jails"
25 
26 DEFAULT_NAME = "fail2ban"
27 DEFAULT_LOG = "/var/log/fail2ban.log"
28 
29 STATE_CURRENT_BANS = "current_bans"
30 STATE_ALL_BANS = "total_bans"
31 SCAN_INTERVAL = timedelta(seconds=120)
32 
33 PLATFORM_SCHEMA = SENSOR_PLATFORM_SCHEMA.extend(
34  {
35  vol.Required(CONF_JAILS): vol.All(cv.ensure_list, vol.Length(min=1)),
36  vol.Optional(CONF_FILE_PATH): cv.isfile,
37  vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
38  }
39 )
40 
41 
43  hass: HomeAssistant,
44  config: ConfigType,
45  async_add_entities: AddEntitiesCallback,
46  discovery_info: DiscoveryInfoType | None = None,
47 ) -> None:
48  """Set up the fail2ban sensor."""
49  name = config[CONF_NAME]
50  jails = config[CONF_JAILS]
51  log_file = config.get(CONF_FILE_PATH, DEFAULT_LOG)
52 
53  log_parser = BanLogParser(log_file)
54 
55  async_add_entities((BanSensor(name, jail, log_parser) for jail in jails), True)
56 
57 
59  """Implementation of a fail2ban sensor."""
60 
61  def __init__(self, name, jail, log_parser):
62  """Initialize the sensor."""
63  self._name_name = f"{name} {jail}"
64  self.jailjail = jail
65  self.ban_dictban_dict = {STATE_CURRENT_BANS: [], STATE_ALL_BANS: []}
66  self.last_banlast_ban = None
67  self.log_parserlog_parser = log_parser
68  self.log_parserlog_parser.ip_regex[self.jailjail] = re.compile(
69  rf"\[{re.escape(self.jail)}\]\s*(Ban|Unban) (.*)"
70  )
71  _LOGGER.debug("Setting up jail %s", self.jailjail)
72 
73  @property
74  def name(self):
75  """Return the name of the sensor."""
76  return self._name_name
77 
78  @property
80  """Return the state attributes of the fail2ban sensor."""
81  return self.ban_dictban_dict
82 
83  @property
84  def native_value(self):
85  """Return the most recently banned IP Address."""
86  return self.last_banlast_ban
87 
88  def update(self) -> None:
89  """Update the list of banned ips."""
90  self.log_parserlog_parser.read_log(self.jailjail)
91 
92  if self.log_parserlog_parser.data:
93  for entry in self.log_parserlog_parser.data:
94  _LOGGER.debug(entry)
95  current_ip = entry[1]
96  if entry[0] == "Ban":
97  if current_ip not in self.ban_dictban_dict[STATE_CURRENT_BANS]:
98  self.ban_dictban_dict[STATE_CURRENT_BANS].append(current_ip)
99  if current_ip not in self.ban_dictban_dict[STATE_ALL_BANS]:
100  self.ban_dictban_dict[STATE_ALL_BANS].append(current_ip)
101  if len(self.ban_dictban_dict[STATE_ALL_BANS]) > 10:
102  self.ban_dictban_dict[STATE_ALL_BANS].pop(0)
103 
104  elif (
105  entry[0] == "Unban"
106  and current_ip in self.ban_dictban_dict[STATE_CURRENT_BANS]
107  ):
108  self.ban_dictban_dict[STATE_CURRENT_BANS].remove(current_ip)
109 
110  if self.ban_dictban_dict[STATE_CURRENT_BANS]:
111  self.last_banlast_ban = self.ban_dictban_dict[STATE_CURRENT_BANS][-1]
112  else:
113  self.last_banlast_ban = "None"
114 
115 
117  """Class to parse fail2ban logs."""
118 
119  def __init__(self, log_file):
120  """Initialize the parser."""
121  self.log_filelog_file = log_file
122  self.datadata = []
123  self.ip_regexip_regex = {}
124 
125  def read_log(self, jail):
126  """Read the fail2ban log and find entries for jail."""
127  self.datadata = []
128  try:
129  with open(self.log_filelog_file, encoding="utf-8") as file_data:
130  self.datadata = self.ip_regexip_regex[jail].findall(file_data.read())
131 
132  except (IndexError, FileNotFoundError, IsADirectoryError, UnboundLocalError):
133  _LOGGER.warning("File not present: %s", os.path.basename(self.log_filelog_file))
def __init__(self, name, jail, log_parser)
Definition: sensor.py:61
bool remove(self, _T matcher)
Definition: match.py:214
None async_setup_platform(HomeAssistant hass, ConfigType config, AddEntitiesCallback async_add_entities, DiscoveryInfoType|None discovery_info=None)
Definition: sensor.py:47
None open(self, **Any kwargs)
Definition: lock.py:86