1 """Support for system log."""
3 from __future__
import annotations
5 from collections
import OrderedDict, deque
10 from types
import FrameType
11 from typing
import Any, cast
13 import voluptuous
as vol
15 from homeassistant
import __path__
as HOMEASSISTANT_PATH
22 type KeyType = tuple[str, tuple[str, int], tuple[str, int, str] |
None]
24 CONF_MAX_ENTRIES =
"max_entries"
25 CONF_FIRE_EVENT =
"fire_event"
26 CONF_MESSAGE =
"message"
28 CONF_LOGGER =
"logger"
30 DATA_SYSTEM_LOG =
"system_log"
31 DEFAULT_MAX_ENTRIES = 50
32 DEFAULT_FIRE_EVENT =
False
35 EVENT_SYSTEM_LOG =
"system_log_event"
37 SERVICE_CLEAR =
"clear"
38 SERVICE_WRITE =
"write"
40 CONFIG_SCHEMA = vol.Schema(
45 CONF_MAX_ENTRIES, default=DEFAULT_MAX_ENTRIES
47 vol.Optional(CONF_FIRE_EVENT, default=DEFAULT_FIRE_EVENT): cv.boolean,
51 extra=vol.ALLOW_EXTRA,
54 SERVICE_CLEAR_SCHEMA = vol.Schema({})
55 SERVICE_WRITE_SCHEMA = vol.Schema(
57 vol.Required(CONF_MESSAGE): cv.string,
58 vol.Optional(CONF_LEVEL, default=
"error"): vol.In(
59 [
"debug",
"info",
"warning",
"error",
"critical"]
61 vol.Optional(CONF_LOGGER): cv.string,
67 record: logging.LogRecord,
68 paths_re: re.Pattern[str],
69 extracted_tb: list[tuple[FrameType, int]] |
None =
None,
71 """Figure out where a log message came from."""
76 source: list[tuple[FrameType, int]] = extracted_tb
or list(
77 traceback.walk_tb(record.exc_info[2])
80 (tb_frame.f_code.co_filename, tb_line_no)
for tb_frame, tb_line_no
in source
82 for i, (filename, _)
in enumerate(stack):
85 if filename == record.pathname:
86 stack = stack[0 : i + 1]
90 for path, line_number
in reversed(stack):
92 if match := paths_re.match(path):
93 return (cast(str, match.group(1)), line_number)
109 frame = sys._getframe(4)
128 while back := frame.f_back:
129 if match := paths_re.match(frame.f_code.co_filename):
130 return (cast(str, match.group(1)), frame.f_lineno)
134 return (record.pathname, record.lineno)
138 """Get message from record and handle exceptions.
140 This code will be unreachable during a pytest run
141 because pytest installs a logging handler that
142 will prevent this code from being reached.
144 Calling record.getMessage() can raise an exception
145 if the log message does not contain sufficient arguments.
147 As there is no guarantees about which exceptions
148 that can be raised, we catch all exceptions and
149 return a generic message.
151 This must be manually tested when changing the code.
154 return record.getMessage()
155 except Exception
as ex:
157 return f
"Bad logger message: {record.msg} ({record.args})"
159 return f
"Bad logger message: {ex}"
163 """Store HA log entries."""
180 record: logging.LogRecord,
181 paths_re: re.Pattern,
182 formatter: logging.Formatter |
None =
None,
183 figure_out_source: bool =
False,
185 """Initialize a log entry."""
193 self.
root_causeroot_cause: tuple[str, int, str] |
None =
None
194 extracted_tb: list[tuple[FrameType, int]] |
None =
None
196 if formatter
and record.exc_text
is None:
197 record.exc_text = formatter.formatException(record.exc_info)
198 self.
exceptionexception = record.exc_text
or ""
199 if extracted :=
list(traceback.walk_tb(record.exc_info[2])):
201 extracted_tb = extracted
202 tb_frame, tb_line_no = extracted[-1]
204 tb_frame.f_code.co_filename,
206 tb_frame.f_code.co_name,
208 if figure_out_source:
211 self.
sourcesource = (record.pathname, record.lineno)
216 """Convert object into dict to maintain backward compatibility."""
218 "name": self.
namename,
220 "level": self.
levellevel,
221 "source": self.
sourcesource,
224 "count": self.
countcount,
230 """Data store to hold max amount of deduped entries."""
233 """Initialize a new DedupStore."""
238 """Add a new entry."""
245 existing.timestamp = entry.timestamp
247 if entry.message[0]
not in existing.message:
248 existing.message.append(entry.message[0])
250 self.move_to_end(key)
254 if len(self) > self.
maxlenmaxlen:
256 self.popitem(last=
False)
259 """Return reversed list of log entries - LIFO."""
260 return [value.to_dict()
for value
in reversed(self.values())]
264 """Log handler for error messages."""
271 paths_re: re.Pattern[str],
273 """Initialize a new LogErrorHandler."""
280 def emit(self, record: logging.LogRecord) ->
None:
281 """Save error and warning logs.
283 Everything logged with error or warning is saved in local buffer. A
284 default upper limit is set to 50 (older entries are discarded) but can
285 be changed if needed.
288 record, self.
paths_repaths_re, formatter=self.formatter, figure_out_source=
True
290 self.
recordsrecords.add_entry(entry)
292 self.
hasshass.bus.fire(EVENT_SYSTEM_LOG, entry.to_dict())
295 async
def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
296 """Set up the logger component."""
297 if (conf := config.get(DOMAIN))
is None:
300 hass_path: str = HOMEASSISTANT_PATH[0]
301 config_dir = hass.config.config_dir
302 paths_re = re.compile(rf
"(?:{re.escape(hass_path)}|{re.escape(config_dir)})/(.*)")
304 hass, conf[CONF_MAX_ENTRIES], conf[CONF_FIRE_EVENT], paths_re
306 handler.setLevel(logging.WARNING)
308 hass.data[DOMAIN] = handler
311 def _async_stop_handler(_: Event) ->
None:
312 """Cleanup handler."""
313 logging.root.removeHandler(handler)
314 del hass.data[DOMAIN]
316 hass.bus.async_listen_once(EVENT_HOMEASSISTANT_CLOSE, _async_stop_handler)
318 logging.root.addHandler(handler)
320 websocket_api.async_register_command(hass, list_errors)
323 def _async_clear_service_handler(service: ServiceCall) ->
None:
324 handler.records.clear()
327 def _async_write_service_handler(service: ServiceCall) ->
None:
328 name = service.data.get(CONF_LOGGER, f
"{__name__}.external")
329 logger = logging.getLogger(name)
330 level = service.data[CONF_LEVEL]
331 getattr(logger, level)(service.data[CONF_MESSAGE])
333 hass.services.async_register(
334 DOMAIN, SERVICE_CLEAR, _async_clear_service_handler, schema=SERVICE_CLEAR_SCHEMA
336 hass.services.async_register(
337 DOMAIN, SERVICE_WRITE, _async_write_service_handler, schema=SERVICE_WRITE_SCHEMA
343 @websocket_api.require_admin
344 @websocket_api.websocket_command({vol.Required("type"):
"system_log/list"})
349 """List all possible diagnostic handlers."""
350 connection.send_result(
352 hass.data[DOMAIN].records.to_list(),
list[dict[str, Any]] to_list(self)
None add_entry(self, LogEntry entry)
None __init__(self, int maxlen=50)
None __init__(self, logging.LogRecord record, re.Pattern paths_re, logging.Formatter|None formatter=None, bool figure_out_source=False)
dict[str, Any] to_dict(self)
None emit(self, logging.LogRecord record)
None __init__(self, HomeAssistant hass, int maxlen, bool fire_event, re.Pattern[str] paths_re)
str _safe_get_message(logging.LogRecord record)
None list_errors(HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
bool async_setup(HomeAssistant hass, ConfigType config)
tuple[str, int] _figure_out_source(logging.LogRecord record, re.Pattern[str] paths_re, list[tuple[FrameType, int]]|None extracted_tb=None)