Home Assistant Unofficial Reference 2024.12.1
__init__.py
Go to the documentation of this file.
1 """Support for system log."""
2 
from __future__ import annotations

from collections import OrderedDict, deque
import logging
import re
import sys
import traceback
from types import FrameType
from typing import Any, cast

import voluptuous as vol

from homeassistant import __path__ as HOMEASSISTANT_PATH
from homeassistant.components import websocket_api
from homeassistant.const import EVENT_HOMEASSISTANT_CLOSE
from homeassistant.core import Event, HomeAssistant, ServiceCall, callback
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.typing import ConfigType

21 
22 type KeyType = tuple[str, tuple[str, int], tuple[str, int, str] | None]
23 
24 CONF_MAX_ENTRIES = "max_entries"
25 CONF_FIRE_EVENT = "fire_event"
26 CONF_MESSAGE = "message"
27 CONF_LEVEL = "level"
28 CONF_LOGGER = "logger"
29 
30 DATA_SYSTEM_LOG = "system_log"
31 DEFAULT_MAX_ENTRIES = 50
32 DEFAULT_FIRE_EVENT = False
33 DOMAIN = "system_log"
34 
35 EVENT_SYSTEM_LOG = "system_log_event"
36 
37 SERVICE_CLEAR = "clear"
38 SERVICE_WRITE = "write"
39 
40 CONFIG_SCHEMA = vol.Schema(
41  {
42  DOMAIN: vol.Schema(
43  {
44  vol.Optional(
45  CONF_MAX_ENTRIES, default=DEFAULT_MAX_ENTRIES
46  ): cv.positive_int,
47  vol.Optional(CONF_FIRE_EVENT, default=DEFAULT_FIRE_EVENT): cv.boolean,
48  }
49  )
50  },
51  extra=vol.ALLOW_EXTRA,
52 )
53 
54 SERVICE_CLEAR_SCHEMA = vol.Schema({})
55 SERVICE_WRITE_SCHEMA = vol.Schema(
56  {
57  vol.Required(CONF_MESSAGE): cv.string,
58  vol.Optional(CONF_LEVEL, default="error"): vol.In(
59  ["debug", "info", "warning", "error", "critical"]
60  ),
61  vol.Optional(CONF_LOGGER): cv.string,
62  }
63 )
64 
65 
def _figure_out_source(
    record: logging.LogRecord,
    paths_re: re.Pattern[str],
    extracted_tb: list[tuple[FrameType, int]] | None = None,
) -> tuple[str, int]:
    """Figure out where a log message came from.

    Args:
        record: The log record whose origin should be determined.
        paths_re: Compiled pattern matched against frame file names; group 1
            of a match is returned as the source file.
        extracted_tb: Optional already-walked traceback as
            ``(frame, line number)`` pairs, used instead of walking
            ``record.exc_info`` again.

    Returns:
        ``(file, line number)`` of the matching frame, or
        ``(record.pathname, record.lineno)`` when no frame matches.
    """
    # If a stack trace exists, extract file names from the entire call stack.
    # The other case is when a regular "log" is made (without an attached
    # exception). In that case, just use the file where the log was made from.
    if record.exc_info:
        source: list[tuple[FrameType, int]] = extracted_tb or list(
            traceback.walk_tb(record.exc_info[2])
        )
        stack = [
            (tb_frame.f_code.co_filename, tb_line_no) for tb_frame, tb_line_no in source
        ]
        for i, (filename, _) in enumerate(stack):
            # Slice the stack to the first frame that matches
            # the record pathname.
            if filename == record.pathname:
                stack = stack[0 : i + 1]
                break
        # Iterate through the call stack (in reverse) and find the last call
        # from a file matched by paths_re. Try to figure out where the error
        # happened.
        for path, line_number in reversed(stack):
            # Try to match with a file within Home Assistant
            if match := paths_re.match(path):
                return (cast(str, match.group(1)), line_number)
    else:
        #
        # We need to figure out where the log call came from if we
        # don't have an exception.
        #
        # We do this by walking up the stack, starting a few frames above
        # this function, until we find the first frame whose file name
        # matches paths_re.
        #
        # We do not call traceback.extract_stack() because it makes many
        # stat() syscalls which do blocking I/O, and since this code is
        # running in the event loop, we need to avoid blocking I/O.

        frame = sys._getframe(4)  # noqa: SLF001
        #
        # We use _getframe with 4 to skip the following frames:
        #
        # Jump 2 frames up to get to the actual caller
        # since we are in a function, and always called from another function
        # that is never the original source of the log message.
        #
        # Next try to skip any frames that are from the logging module.
        # We know that the logger module typically has 5 frames itself
        # but it may change in the future so we are conservative and
        # only skip 2.
        #
        # _getframe is CPython only but we are already using CPython specific
        # code everywhere in HA so it's fine as it's unlikely we will ever
        # support other Python implementations.
        #
        # Walk outwards and return the first frame whose file matches.
        # The loop stops on the outermost frame (the one without f_back)
        # without testing it.
        while back := frame.f_back:
            if match := paths_re.match(frame.f_code.co_filename):
                return (cast(str, match.group(1)), frame.f_lineno)
            frame = back

    # Ok, we don't know what this is
    return (record.pathname, record.lineno)
135 
136 
def _safe_get_message(record: logging.LogRecord) -> str:
    """Get message from record and handle exceptions.

    This code will be unreachable during a pytest run
    because pytest installs a logging handler that
    will prevent this code from being reached.

    Calling record.getMessage() can raise an exception
    if the log message does not contain sufficient arguments.

    As there are no guarantees about which exceptions
    can be raised, we catch all exceptions and
    return a generic message.

    This must be manually tested when changing the code.
    """
    try:
        return record.getMessage()
    except Exception as ex:  # noqa: BLE001
        try:
            # Formatting msg/args can itself raise (e.g. a broken __repr__).
            return f"Bad logger message: {record.msg} ({record.args})"
        except Exception:  # noqa: BLE001
            # Last resort: report the original formatting error only.
            return f"Bad logger message: {ex}"
160 
161 
class LogEntry:
    """Store HA log entries."""

    __slots__ = (
        "first_occurred",
        "timestamp",
        "name",
        "level",
        "message",
        "exception",
        "root_cause",
        "source",
        "count",
        "key",
    )

    def __init__(
        self,
        record: logging.LogRecord,
        paths_re: re.Pattern[str],
        formatter: logging.Formatter | None = None,
        figure_out_source: bool = False,
    ) -> None:
        """Initialize a log entry.

        Args:
            record: The log record to store.
            paths_re: Pattern handed to _figure_out_source to locate the
                source file.
            formatter: Used to format the exception text when the record
                carries exc_info but no cached exc_text.
            figure_out_source: When true, derive the source via
                _figure_out_source; otherwise use the record's own
                pathname and line number.
        """
        self.first_occurred = self.timestamp = record.created
        self.name = record.name
        self.level = record.levelname
        # See the docstring of _safe_get_message for why we need to do this.
        # This must be manually tested when changing the code.
        # At most 5 distinct messages are kept per entry.
        self.message = deque([_safe_get_message(record)], maxlen=5)
        self.exception = ""
        self.root_cause: tuple[str, int, str] | None = None
        extracted_tb: list[tuple[FrameType, int]] | None = None
        if record.exc_info:
            if formatter and record.exc_text is None:
                record.exc_text = formatter.formatException(record.exc_info)
            self.exception = record.exc_text or ""
            if extracted := list(traceback.walk_tb(record.exc_info[2])):
                # Last line of traceback contains the root cause of the exception
                extracted_tb = extracted
                tb_frame, tb_line_no = extracted[-1]
                self.root_cause = (
                    tb_frame.f_code.co_filename,
                    tb_line_no,
                    tb_frame.f_code.co_name,
                )
        if figure_out_source:
            self.source = _figure_out_source(record, paths_re, extracted_tb)
        else:
            self.source = (record.pathname, record.lineno)
        self.count = 1
        # Entries sharing this key are treated as the same entry.
        self.key = (self.name, self.source, self.root_cause)

    def to_dict(self) -> dict[str, Any]:
        """Convert object into dict to maintain backward compatibility."""
        return {
            "name": self.name,
            "message": list(self.message),
            "level": self.level,
            "source": self.source,
            "timestamp": self.timestamp,
            "exception": self.exception,
            "count": self.count,
            "first_occurred": self.first_occurred,
        }
227 
228 
class DedupStore(OrderedDict[KeyType, LogEntry]):
    """Data store to hold max amount of deduped entries."""

    def __init__(self, maxlen: int = 50) -> None:
        """Initialize a new DedupStore.

        Args:
            maxlen: Maximum number of distinct entries kept in the store.
        """
        super().__init__()
        self.maxlen = maxlen

    def add_entry(self, entry: LogEntry) -> None:
        """Add a new entry.

        An entry whose key is already stored updates the stored entry
        (count, timestamp, messages) and moves it to the end; otherwise the
        entry is inserted and the oldest entry is dropped once the store
        exceeds maxlen.
        """
        key = entry.key

        if key in self:
            # Update stored entry
            existing = self[key]
            existing.count += 1
            existing.timestamp = entry.timestamp

            # Only keep messages that are not already recorded.
            if entry.message[0] not in existing.message:
                existing.message.append(entry.message[0])

            self.move_to_end(key)
        else:
            self[key] = entry

            if len(self) > self.maxlen:
                # Removes the first record which should also be the oldest
                self.popitem(last=False)

    def to_list(self) -> list[dict[str, Any]]:
        """Return reversed list of log entries - LIFO."""
        return [value.to_dict() for value in reversed(self.values())]
261 
262 
class LogErrorHandler(logging.Handler):
    """Log handler for error messages."""

    def __init__(
        self,
        hass: HomeAssistant,
        maxlen: int,
        fire_event: bool,
        paths_re: re.Pattern[str],
    ) -> None:
        """Initialize a new LogErrorHandler.

        Args:
            hass: Home Assistant instance whose bus is used to fire events.
            maxlen: Maximum number of deduplicated entries to keep.
            fire_event: Whether to fire EVENT_SYSTEM_LOG for each record.
            paths_re: Pattern used to determine the source of a record.
        """
        super().__init__()
        self.hass = hass
        self.records = DedupStore(maxlen=maxlen)
        self.fire_event = fire_event
        self.paths_re = paths_re

    def emit(self, record: logging.LogRecord) -> None:
        """Save error and warning logs.

        Everything logged with error or warning is saved in local buffer. A
        default upper limit is set to 50 (older entries are discarded) but can
        be changed if needed.
        """
        entry = LogEntry(
            record, self.paths_re, formatter=self.formatter, figure_out_source=True
        )
        self.records.add_entry(entry)
        if self.fire_event:
            self.hass.bus.fire(EVENT_SYSTEM_LOG, entry.to_dict())
293 
294 
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
    """Set up the system_log component.

    Installs a LogErrorHandler on the root logger, stores it in
    ``hass.data[DOMAIN]``, and registers the websocket command and the
    ``clear`` / ``write`` services. Always returns True.
    """
    if (conf := config.get(DOMAIN)) is None:
        # No explicit configuration: run the schema to obtain the defaults.
        conf = CONFIG_SCHEMA({DOMAIN: {}})[DOMAIN]

    hass_path: str = HOMEASSISTANT_PATH[0]
    config_dir = hass.config.config_dir
    # Matches files below the Home Assistant package or the config directory;
    # group 1 is the path relative to that root.
    paths_re = re.compile(rf"(?:{re.escape(hass_path)}|{re.escape(config_dir)})/(.*)")
    handler = LogErrorHandler(
        hass, conf[CONF_MAX_ENTRIES], conf[CONF_FIRE_EVENT], paths_re
    )
    handler.setLevel(logging.WARNING)

    hass.data[DOMAIN] = handler

    @callback
    def _async_stop_handler(_: Event) -> None:
        """Cleanup handler."""
        logging.root.removeHandler(handler)
        del hass.data[DOMAIN]

    hass.bus.async_listen_once(EVENT_HOMEASSISTANT_CLOSE, _async_stop_handler)

    logging.root.addHandler(handler)

    websocket_api.async_register_command(hass, list_errors)

    @callback
    def _async_clear_service_handler(service: ServiceCall) -> None:
        """Drop all stored log entries."""
        handler.records.clear()

    @callback
    def _async_write_service_handler(service: ServiceCall) -> None:
        """Write the given message to the requested logger at the given level."""
        name = service.data.get(CONF_LOGGER, f"{__name__}.external")
        logger = logging.getLogger(name)
        level = service.data[CONF_LEVEL]
        # level is restricted by SERVICE_WRITE_SCHEMA to logger method names.
        getattr(logger, level)(service.data[CONF_MESSAGE])

    hass.services.async_register(
        DOMAIN, SERVICE_CLEAR, _async_clear_service_handler, schema=SERVICE_CLEAR_SCHEMA
    )
    hass.services.async_register(
        DOMAIN, SERVICE_WRITE, _async_write_service_handler, schema=SERVICE_WRITE_SCHEMA
    )

    return True
341 
342 
@websocket_api.require_admin
@websocket_api.websocket_command({vol.Required("type"): "system_log/list"})
@callback
def list_errors(
    hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict[str, Any]
) -> None:
    """Send the stored log entries as the result of a system_log/list command."""
    connection.send_result(
        msg["id"],
        # hass.data[DOMAIN] holds the handler stored by async_setup.
        hass.data[DOMAIN].records.to_list(),
    )
list[dict[str, Any]] to_list(self)
Definition: __init__.py:258
None add_entry(self, LogEntry entry)
Definition: __init__.py:237
None __init__(self, logging.LogRecord record, re.Pattern paths_re, logging.Formatter|None formatter=None, bool figure_out_source=False)
Definition: __init__.py:184
None emit(self, logging.LogRecord record)
Definition: __init__.py:280
None __init__(self, HomeAssistant hass, int maxlen, bool fire_event, re.Pattern[str] paths_re)
Definition: __init__.py:272
str _safe_get_message(logging.LogRecord record)
Definition: __init__.py:137
None list_errors(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
Definition: __init__.py:348
bool async_setup(HomeAssistant hass, ConfigType config)
Definition: __init__.py:295
tuple[str, int] _figure_out_source(logging.LogRecord record, re.Pattern[str] paths_re, list[tuple[FrameType, int]]|None extracted_tb=None)
Definition: __init__.py:70