1 """Support for recording details."""
3 from __future__
import annotations
6 from collections.abc
import Callable, Iterable
7 from concurrent.futures
import CancelledError
9 from datetime
import datetime, timedelta
15 from typing
import TYPE_CHECKING, Any, cast
17 from propcache
import cached_property
18 import psutil_home_assistant
as ha_psutil
19 from sqlalchemy
import create_engine, event
as sqlalchemy_event, exc, select, update
20 from sqlalchemy.engine
import Engine
21 from sqlalchemy.engine.interfaces
import DBAPIConnection
22 from sqlalchemy.exc
import SQLAlchemyError
23 from sqlalchemy.orm
import scoped_session, sessionmaker
24 from sqlalchemy.orm.session
import Session
29 EVENT_HOMEASSISTANT_CLOSE,
30 EVENT_HOMEASSISTANT_FINAL_WRITE,
37 EventStateChangedData,
42 async_track_time_change,
43 async_track_time_interval,
44 async_track_utc_time_change,
52 from .
import migration, statistics
57 LAST_REPORTED_SCHEMA_VERSION,
58 MARIADB_PYMYSQL_URL_PREFIX,
60 MAX_QUEUE_BACKLOG_MIN_VALUE,
61 MIN_AVAILABLE_MEMORY_FOR_QUEUE_BACKLOG,
62 MYSQLDB_PYMYSQL_URL_PREFIX,
68 from .db_schema
import (
80 from .executor
import DBInterruptibleThreadPoolExecutor
81 from .models
import DatabaseEngine, StatisticData, StatisticMetaData, UnsupportedDialect
82 from .pool
import POOL_SIZE, MutexPool, RecorderPool
83 from .table_managers.event_data
import EventDataManager
84 from .table_managers.event_types
import EventTypeManager
85 from .table_managers.recorder_runs
import RecorderRunsManager
86 from .table_managers.state_attributes
import StateAttributesManager
87 from .table_managers.states
import StatesManager
88 from .table_managers.states_meta
import StatesMetaManager
89 from .table_managers.statistics_meta
import StatisticsMetaManager
93 ChangeStatisticsUnitTask,
96 CompileMissingStatisticsTask,
106 UpdateStatesMetadataTask,
107 UpdateStatisticsMetadataTask,
111 async_create_backup_failure_issue,
116 move_away_broken_database,
118 setup_connection_for_dialect,
119 validate_or_move_away_sqlite_database,
120 write_lock_db_sqlite,
123 _LOGGER = logging.getLogger(__name__)
125 DEFAULT_URL =
"sqlite:///{hass_config_path}"
129 EXPIRE_AFTER_COMMITS = 120
131 SHUTDOWN_TASK = object()
139 DB_LOCK_QUEUE_CHECK_TIMEOUT = 10
143 INVALIDATED_ERR =
"Database connection invalidated"
144 CONNECTIVITY_ERR =
"Error in database connectivity during commit"
147 MAX_DB_EXECUTOR_WORKERS = POOL_SIZE - 1
151 """A threaded recorder class."""
161 commit_interval: int,
165 entity_filter: Callable[[str], bool] |
None,
166 exclude_event_types: set[EventType[Any] | str],
168 """Initialize the recorder."""
169 threading.Thread.__init__(self, name=
"Recorder")
172 self.
thread_idthread_id: int |
None =
None
173 self.recorder_and_worker_thread_ids: set[int] = set()
178 self._hass_started: asyncio.Future[object] = hass.loop.create_future()
180 self._queue: queue.SimpleQueue[RecorderTask | Event] = queue.SimpleQueue()
186 db_connected: asyncio.Future[bool] = hass.data[DOMAIN].db_connected
187 self.async_db_connected: asyncio.Future[bool] = db_connected
189 self.async_db_ready: asyncio.Future[bool] = hass.loop.create_future()
193 self.
engineengine: Engine |
None =
None
194 self.max_backlog: int = MAX_QUEUE_BACKLOG_MIN_VALUE
195 self.
_psutil_psutil: ha_psutil.PsutilWrapper |
None =
None
216 self.
_get_session_get_session: Callable[[], Session] |
None =
None
222 self.
_db_executor_db_executor: DBInterruptibleThreadPoolExecutor |
None =
None
230 self.
_dialect_name_dialect_name: SupportedDialect |
None =
None
242 """Return the number of items in the recorder backlog."""
243 return self._queue.qsize()
247 """Return the dialect the recorder uses."""
252 """Short version to check if we are using sqlite3 as a file."""
253 return self.
db_urldb_url != SQLITE_URL_PREFIX
and self.
db_urldb_url.startswith(
259 """Return if the recorder is recording."""
263 """Get a new sqlalchemy session."""
265 raise RuntimeError(
"The database connection has not been established")
269 """Add a task to the recorder queue."""
270 self._queue.put(task)
273 """Enable or disable recording events and states."""
278 """Start the executor."""
280 self.recorder_and_worker_thread_ids,
281 thread_name_prefix=DB_WORKER_PREFIX,
282 max_workers=MAX_DB_EXECUTOR_WORKERS,
287 """Close the dbpool connections in the current thread."""
288 if self.
engineengine
and hasattr(self.
engineengine.pool,
"shutdown"):
289 self.
engineengine.pool.shutdown()
293 """Initialize the recorder."""
296 queue_put = self._queue.put_nowait
300 """Listen for new events and put them in the process queue."""
301 if event.event_type
in exclude_event_types:
304 if entity_filter
is None or not (
305 entity_id := event.data.get(ATTR_ENTITY_ID)
310 if isinstance(entity_id, str):
315 if isinstance(entity_id, list):
316 for eid
in entity_id:
332 QUEUE_CHECK_INTERVAL,
333 name=
"Recorder queue watcher",
338 """Queue a keep alive."""
344 """Queue a commit."""
353 def async_add_executor_job[_T](
354 self, target: Callable[..., _T], *args: Any
355 ) -> asyncio.Future[_T]:
356 """Add an executor job from within the event loop."""
357 return self.
hasshass.loop.run_in_executor(self.
_db_executor_db_executor, target, *args)
361 """Periodic check of the queue size to ensure we do not exhaust memory.
363 The queue grows during migration or if something really goes wrong.
365 _LOGGER.debug(
"Recorder queue size is: %s", self.
backlogbacklog)
370 "The recorder backlog queue reached the maximum size of %s events; "
371 "usually, the system is CPU bound, I/O bound, or the database "
372 "is corrupt due to a disk problem; The recorder will stop "
373 "recording events to avoid running out of memory"
380 """Return the available memory in bytes."""
382 self.
_psutil_psutil = ha_psutil.PsutilWrapper()
383 return cast(int, self.
_psutil_psutil.psutil.virtual_memory().available)
386 """Check if the system has reached the max queue backlog and return True if it has."""
388 if self.
backlogbacklog < MAX_QUEUE_BACKLOG_MIN_VALUE:
393 return self.
_available_memory_available_memory() < MIN_AVAILABLE_MEMORY_FOR_QUEUE_BACKLOG
397 """Stop watching the queue and listening for events."""
407 """Stop listeners."""
423 """Empty the queue if its still present at close."""
434 self._queue.get_nowait()
438 await self.
hasshass.async_add_executor_job(self.join)
441 """Shut down the Recorder at final write."""
442 if not self._hass_started.done():
443 self._hass_started.set_result(SHUTDOWN_TASK)
446 await self.
hasshass.async_add_executor_job(self.join)
450 """Notify that hass has started."""
451 self._hass_started.set_result(
None)
455 """Post connection initialize."""
456 bus = self.
hasshass.bus
457 bus.async_listen_once(EVENT_HOMEASSISTANT_CLOSE, self.
_async_close_async_close)
458 bus.async_listen_once(EVENT_HOMEASSISTANT_FINAL_WRITE, self.
_async_shutdown_async_shutdown)
463 """Report startup failure."""
469 if not self.async_db_connected.done():
470 self.async_db_connected.set_result(
False)
471 if not self.async_db_ready.done():
472 self.async_db_ready.set_result(
False)
474 persistent_notification.async_create(
476 "The recorder could not start, check [the logs](/config/logs)",
483 """Connect to the database succeeded, schema version and migration need known.
485 The database may not yet be ready for use in case of a non-live migration.
487 self.async_db_connected.set_result(
True)
491 """Database live and ready for use.
493 Called after non-live migration steps are finished.
495 if self.async_db_ready.done():
497 self.async_db_ready.set_result(
True)
502 """Finish start and mark recorder ready.
504 Called after all migration steps are finished.
511 """Trigger the purge."""
524 """Run tasks every five minutes."""
525 self.
queue_taskqueue_task(ADJUST_LRU_SIZE_TASK)
529 """Trigger the LRU adjustment.
531 If the number of entities has increased, increase the size of the LRU
532 cache to avoid thrashing.
534 if new_size := self.
hasshass.states.async_entity_ids_count() * 2:
541 """Trigger the statistics run.
543 Short term statistics run every 5 minutes
545 start = statistics.get_start_time()
552 start_time: datetime,
553 sum_adjustment: float,
554 adjustment_unit: str,
556 """Adjust statistics."""
559 statistic_id, start_time, sum_adjustment, adjustment_unit
565 self, statistic_ids: list[str], *, on_done: Callable[[],
None] |
None =
None
567 """Clear statistics for a list of statistic_ids."""
575 new_statistic_id: str | UndefinedType = UNDEFINED,
576 new_unit_of_measurement: str |
None | UndefinedType = UNDEFINED,
577 on_done: Callable[[],
None] |
None =
None,
579 """Update statistics metadata for a statistic_id."""
582 on_done, statistic_id, new_statistic_id, new_unit_of_measurement
592 """Update states metadata for an entity_id."""
600 new_unit_of_measurement: str,
601 old_unit_of_measurement: str,
603 """Change statistics unit for a statistic_id."""
606 statistic_id, new_unit_of_measurement, old_unit_of_measurement
613 metadata: StatisticMetaData,
614 stats: Iterable[StatisticData],
615 table: type[Statistics | StatisticsShortTerm],
617 """Schedule import of statistics."""
622 """Prepare periodic tasks."""
629 if self.
dialect_namedialect_name != SupportedDialect.SQLITE:
634 name=
"Recorder keep alive",
643 name=
"Recorder commit",
657 """Wait for the hass started future."""
658 return await self._hass_started
661 """Wait for startup or shutdown before starting."""
663 return asyncio.run_coroutine_threadsafe(
666 except CancelledError
as ex:
668 "Recorder startup was externally canceled before it could complete: %s",
674 """Run the recorder thread."""
680 "Recorder._run threw unexpected exception, recorder shutting down"
689 """Add an object to the session."""
694 """Notify the user schema migration failed."""
695 persistent_notification.create(
697 "The database migration failed, check [the logs](/config/logs).",
698 "Database Migration Failed",
699 "recorder_database_migration",
703 """Dismiss notification about migration in progress."""
704 persistent_notification.dismiss(self.
hasshass,
"recorder_database_migration")
707 """Start processing events to save."""
708 thread_id = threading.get_ident()
710 self.recorder_and_worker_thread_ids.
add(thread_id)
718 schema_status = migration.validate_db_schema(self.
hasshass, self, self.
get_sessionget_session)
719 if schema_status
is None:
724 if not schema_status.migration_needed
and not schema_status.schema_errors:
728 self.
migration_is_livemigration_is_live = migration.live_migration(schema_status)
733 if schema_status.migration_needed:
761 if schema_status.migration_needed
or schema_status.schema_errors:
784 _LOGGER.debug(
"Recorder processing the queue")
790 self, schema_status: migration.SchemaValidationStatus
792 """Activate the table managers or schedule migrations and mark the db as ready."""
800 migration.migrate_data_live(self, self.
get_sessionget_session, schema_status)
810 """Run the event loop for the recorder."""
815 startup_task_or_events: list[RecorderTask | Event] = []
816 while not queue_.empty()
and (task_or_event := queue_.get_nowait()):
817 startup_task_or_events.append(task_or_event)
819 for task
in startup_task_or_events:
824 del startup_task_or_events
831 self, startup_task_or_events: list[RecorderTask | Event[Any]]
833 """Pre process startup events."""
836 state_change_events: list[Event[EventStateChangedData]] = []
837 non_state_change_events: list[Event] = []
839 for task_or_event
in startup_task_or_events:
842 if type(task_or_event)
is Event:
843 event_ = task_or_event
844 if event_.event_type == EVENT_STATE_CHANGED:
845 state_change_events.append(event_)
847 non_state_change_events.append(event_)
857 self, task: RecorderTask | Event
859 """Process a task, guarding against exceptions to ensure the loop does not collapse."""
860 _LOGGER.debug(
"Processing task: %s", task)
864 _LOGGER.exception(
"Error while processing event %s", task)
867 """Process a task or event, reconnect, or recover a malformed database."""
873 if type(task)
is Event:
879 assert isinstance(task, RecorderTask)
880 if task.commit_before:
883 except exc.DatabaseError
as err:
886 _LOGGER.exception(
"Unhandled database error while processing task %s", task)
887 except SQLAlchemyError:
888 _LOGGER.exception(
"SQLAlchemyError error processing task %s", task)
897 """Create a connection to the database."""
903 return migration.initialize_database(self.
get_sessionget_session)
904 except UnsupportedDialect:
908 "Error during connection setup: (retrying in %s seconds)",
920 self, schema_status: migration.SchemaValidationStatus
923 with self.
hasshass.timeout.freeze(DOMAIN):
924 migration.migrate_data_non_live(self, self.
get_sessionget_session, schema_status)
927 self, schema_status: migration.SchemaValidationStatus
929 """Migrate schema to the latest version."""
930 with self.
hasshass.timeout.freeze(DOMAIN):
934 self, schema_status: migration.SchemaValidationStatus
936 """Migrate schema to the latest version."""
937 persistent_notification.create(
940 "System performance will temporarily degrade during the database"
941 " upgrade. Do not power down or restart the system until the upgrade"
942 " completes. Integrations that read the database, such as logbook,"
943 " history, and statistics may return inconsistent results until the"
944 " upgrade completes. This notification will be automatically dismissed"
945 " when the upgrade completes."
947 "Database upgrade in progress",
948 "recorder_database_migration",
954 schema_status: migration.SchemaValidationStatus,
957 """Migrate schema to the latest version."""
958 assert self.
engineengine
is not None
961 migrator = migration.migrate_schema_live
963 migrator = migration.migrate_schema_non_live
964 new_schema_status = migrator(
967 except exc.DatabaseError
as err:
972 current_version=SCHEMA_VERSION,
973 migration_needed=
False,
974 non_live_data_migration_needed=
False,
976 start_version=SCHEMA_VERSION,
978 return (
True, new_schema_status)
979 _LOGGER.exception(
"Database error during schema migration")
980 return (
False, schema_status)
982 _LOGGER.exception(
"Error during schema migration")
983 return (
False, schema_status)
985 return (
True, new_schema_status)
989 def _async_set_database_locked(task: DatabaseLockTask) ->
None:
990 task.database_locked.set()
992 local_start_time = dt_util.now()
996 hass.add_job(_async_set_database_locked, task)
997 while not task.database_unlock.wait(timeout=DB_LOCK_QUEUE_CHECK_TIMEOUT):
1000 "Database queue backlog reached more than %s events "
1001 "while waiting for backup to finish; recorder will now "
1002 "resume writing to database. The backup cannot be trusted and "
1003 "must be restarted",
1006 task.queue_overflow =
True
1008 async_create_backup_failure_issue, self.
hasshass, local_start_time
1012 "Database queue backlog reached %d entries during backup",
1019 if event.event_type == EVENT_STATE_CHANGED:
1028 """Process any event into the session except state changed."""
1030 assert session
is not None
1031 dbevent = Events.from_event(event)
1035 if pending_event_types := event_type_manager.get_pending(event.event_type):
1036 dbevent.event_type_rel = pending_event_types
1037 elif event_type_id := event_type_manager.get(event.event_type, session,
True):
1038 dbevent.event_type_id = event_type_id
1040 event_types =
EventTypes(event_type=event.event_type)
1041 event_type_manager.add_pending(event_types)
1043 dbevent.event_type_rel = event_types
1050 if not (shared_data_bytes := event_data_manager.serialize_from_event(event)):
1054 shared_data = shared_data_bytes.decode(
"utf-8")
1056 if pending_event_data := event_data_manager.get_pending(shared_data):
1057 dbevent.event_data_rel = pending_event_data
1059 elif (data_id := event_data_manager.get_from_cache(shared_data))
or (
1060 (hash_ := EventData.hash_shared_data_bytes(shared_data_bytes))
1061 and (data_id := event_data_manager.get(shared_data, hash_, session))
1063 dbevent.data_id = data_id
1066 dbevent_data =
EventData(shared_data=shared_data, hash=hash_)
1067 event_data_manager.add_pending(dbevent_data)
1069 dbevent.event_data_rel = dbevent_data
1074 self, event: Event[EventStateChangedData]
1076 """Process a state_changed event into the session."""
1079 entity_removed =
not event.data.get(
"new_state")
1080 entity_id = event.data[
"entity_id"]
1082 dbstate = States.from_event(event)
1083 old_state = event.data[
"old_state"]
1089 if pending_state := states_manager.pop_pending(entity_id):
1090 dbstate.old_state = pending_state
1092 pending_state.last_reported_ts = old_state.last_reported_timestamp
1093 elif old_state_id := states_manager.pop_committed(entity_id):
1094 dbstate.old_state_id = old_state_id
1096 states_manager.update_pending_last_reported(
1097 old_state_id, old_state.last_reported_timestamp
1100 dbstate.state =
None
1102 states_manager.add_pending(entity_id, dbstate)
1104 if states_meta_manager.active:
1105 dbstate.entity_id =
None
1107 if entity_id
is None or not (
1108 shared_attrs_bytes := state_attributes_manager.serialize_from_event(event)
1113 if pending_states_meta := states_meta_manager.get_pending(entity_id):
1114 dbstate.states_meta_rel = pending_states_meta
1115 elif metadata_id := states_meta_manager.get(entity_id, session,
True):
1116 dbstate.metadata_id = metadata_id
1117 elif states_meta_manager.active
and entity_removed:
1124 states_meta =
StatesMeta(entity_id=entity_id)
1125 states_meta_manager.add_pending(states_meta)
1127 dbstate.states_meta_rel = states_meta
1130 shared_attrs = shared_attrs_bytes.decode(
"utf-8")
1131 dbstate.attributes =
None
1133 if pending_event_data := state_attributes_manager.get_pending(shared_attrs):
1134 dbstate.state_attributes = pending_event_data
1137 attributes_id := state_attributes_manager.get_from_cache(shared_attrs)
1139 (hash_ := StateAttributes.hash_shared_attrs_bytes(shared_attrs_bytes))
1141 attributes_id := state_attributes_manager.get(
1142 shared_attrs, hash_, session
1146 dbstate.attributes_id = attributes_id
1149 dbstate_attributes =
StateAttributes(shared_attrs=shared_attrs, hash=hash_)
1150 state_attributes_manager.add_pending(dbstate_attributes)
1152 dbstate.state_attributes = dbstate_attributes
1157 """Handle a database error that may result in moving away the corrupt db."""
1159 (cause := err.__cause__)
1160 and isinstance(cause, sqlite3.DatabaseError)
1161 and (cause_str :=
str(cause))
1165 and (
"malformed" in cause_str
or "not a database" in cause_str)
1168 "Unrecoverable sqlite3 database corruption detected: %s", err
1175 """Commit the event session if there is work to do."""
1182 except (exc.InternalError, exc.OperationalError)
as err:
1184 "%s: Error executing query: %s. (retrying in %s seconds)",
1185 INVALIDATED_ERR
if err.connection_invalidated
else CONNECTIVITY_ERR,
1203 pending_last_reported
1204 := self.
states_managerstates_manager.get_pending_last_reported_timestamp()
1205 )
and self.
schema_versionschema_version >= LAST_REPORTED_SCHEMA_VERSION:
1206 with session.no_autoflush:
1211 "state_id": state_id,
1212 "last_reported_ts": last_reported_timestamp,
1214 for state_id, last_reported_timestamp
in pending_last_reported.items()
1235 session.expire_all()
1238 """Handle the sqlite3 database being corrupt."""
1250 """Close the event session."""
1264 except SQLAlchemyError:
1265 _LOGGER.exception(
"Error while rolling back and closing the event session")
1268 """Rollback the event session and reopen it after a failure."""
1273 """Open the event session."""
1278 """Send a keep alive to keep the db connection open."""
1280 _LOGGER.debug(
"Sending keepalive")
1281 self.
event_sessionevent_session.connection().scalar(select(1))
1284 """Async version of block_till_done."""
1287 event = asyncio.Event()
1292 """Block till all events processed.
1294 This is only called in tests.
1296 This only blocks until the queue is empty
1297 which does not mean the recorder is done.
1299 Call tests.common's wait_recording_done
1300 after calling this to ensure the data
1308 """Lock database so it can be backed up safely."""
1309 if self.
dialect_namedialect_name != SupportedDialect.SQLITE:
1311 "Not a SQLite database or not connected, locking not necessary"
1316 _LOGGER.warning(
"Database already locked")
1319 database_locked = asyncio.Event()
1323 async
with asyncio.timeout(DB_LOCK_TIMEOUT):
1324 await database_locked.wait()
1325 except TimeoutError
as err:
1326 task.database_unlock.set()
1328 f
"Could not lock database within {DB_LOCK_TIMEOUT} seconds."
1337 Returns true if database lock has been held throughout the process.
1339 if self.
dialect_namedialect_name != SupportedDialect.SQLITE:
1341 "Not a SQLite database or not connected, unlocking not necessary"
1346 _LOGGER.warning(
"Database currently not locked")
1357 self, dbapi_connection: DBAPIConnection, connection_record: Any
1359 """Dbapi specific connection settings."""
1360 assert self.
engineengine
is not None
1363 self.
engineengine.dialect.name,
1368 self.
max_bind_varsmax_bind_vars = database_engine.max_bind_vars
1372 """Ensure database is ready to fly."""
1373 kwargs: dict[str, Any] = {}
1376 if self.
db_urldb_url == SQLITE_URL_PREFIX
or ":memory:" in self.
db_urldb_url:
1377 kwargs[
"connect_args"] = {
"check_same_thread":
False}
1378 kwargs[
"poolclass"] = MutexPool
1379 MutexPool.pool_lock = threading.RLock()
1380 kwargs[
"pool_reset_on_return"] =
None
1381 elif self.
db_urldb_url.startswith(SQLITE_URL_PREFIX):
1382 kwargs[
"poolclass"] = RecorderPool
1383 kwargs[
"recorder_and_worker_thread_ids"] = (
1384 self.recorder_and_worker_thread_ids
1386 elif self.
db_urldb_url.startswith(
1389 MARIADB_PYMYSQL_URL_PREFIX,
1391 MYSQLDB_PYMYSQL_URL_PREFIX,
1394 kwargs[
"connect_args"] = {
"charset":
"utf8mb4"}
1395 if self.
db_urldb_url.startswith((MARIADB_URL_PREFIX, MYSQLDB_URL_PREFIX)):
1401 with contextlib.suppress(ImportError):
1405 if not self.
db_urldb_url.startswith(SQLITE_URL_PREFIX):
1406 kwargs[
"echo"] =
False
1411 assert not self.
engineengine
1412 self.
engineengine = create_engine(self.
db_urldb_url, **kwargs, future=
True)
1414 self.__dict__.pop(
"dialect_name",
None)
1417 migration.pre_migrate_schema(self.
engineengine)
1418 Base.metadata.create_all(self.
engineengine)
1420 _LOGGER.debug(
"Connected to recorder database")
1423 """Close the connection."""
1430 """Log the start of the current run and schedule any needed jobs."""
1439 """Add tasks for missing statistics runs."""
1443 """End the recorder session."""
1453 _LOGGER.exception(
"Error saving the event session during shutdown")
1459 """Save end time for current run."""
1460 _LOGGER.debug(
"Shutting down recorder")
1483 self.
_db_executor_db_executor.join_threads_or_timeout()
bool _setup_recorder(self)
None _async_set_recorder_ready_migration_done(self)
tuple[bool, migration.SchemaValidationStatus] _migrate_schema(self, migration.SchemaValidationStatus schema_status, bool live)
None _dismiss_migration_in_progress(self)
tuple[bool, migration.SchemaValidationStatus] _migrate_schema_live(self, migration.SchemaValidationStatus schema_status)
None _activate_and_set_db_ready(self, migration.SchemaValidationStatus schema_status)
SupportedDialect|None dialect_name(self)
None _async_hass_started(self, HomeAssistant hass)
int _available_memory(self)
None _migrate_data_offline(self, migration.SchemaValidationStatus schema_status)
None async_periodic_statistics(self)
bool unlock_database(self)
bool _using_file_sqlite(self)
None _setup_recorder_connection(self, DBAPIConnection dbapi_connection, Any connection_record)
None _guarded_process_one_task_or_event_or_recover(self, RecorderTask|Event task)
None _process_non_state_changed_event_into_session(self, Event event)
None _process_state_changed_event_into_session(self, Event[EventStateChangedData] event)
None async_register(self)
None queue_task(self, RecorderTask|Event task)
None async_clear_statistics(self, list[str] statistic_ids, *Callable[[], None]|None on_done=None)
None _reopen_event_session(self)
None _close_event_session(self)
object|None _async_wait_for_started(self)
None block_till_done(self)
None _close_connection(self)
None async_nightly_tasks(self, datetime now)
None _async_shutdown(self, Event event)
bool _handle_database_error(self, Exception err, *bool setup_run)
bool _reached_max_backlog(self)
None _adjust_lru_size(self)
Session get_session(self)
None _shutdown_pool(self)
None _add_to_session(self, Session session, object obj)
None _process_one_event(self, Event[Any] event)
None async_adjust_statistics(self, str statistic_id, datetime start_time, float sum_adjustment, str adjustment_unit)
None __init__(self, HomeAssistant hass, bool auto_purge, bool auto_repack, int keep_days, int commit_interval, str uri, int db_max_retries, int db_retry_wait, Callable[[str], bool]|None entity_filter, set[EventType[Any]|str] exclude_event_types)
None async_change_statistics_unit(self, str statistic_id, *str new_unit_of_measurement, str old_unit_of_measurement)
None _async_setup_periodic_tasks(self)
None set_enable(self, bool enable)
_event_session_has_pending_writes
None _async_startup_done(self, bool startup_failed)
None _notify_migration_failed(self)
None _setup_connection(self)
None _send_keep_alive(self)
None async_set_db_ready(self)
None _async_keep_alive(self, datetime now)
None _lock_database(self, DatabaseLockTask task)
None async_update_states_metadata(self, str entity_id, str new_entity_id)
_completed_first_database_setup
None _async_stop_queue_watcher_and_event_listener(self)
None async_import_statistics(self, StatisticMetaData metadata, Iterable[StatisticData] stats, type[Statistics|StatisticsShortTerm] table)
None _commit_event_session(self)
None _process_one_task_or_event_or_recover(self, RecorderTask|Event task)
None _async_commit(self, datetime now)
None _run_event_loop(self)
None _handle_sqlite_corruption(self, bool setup_run)
None async_block_till_done(self)
None _async_stop_listeners(self)
tuple[bool, migration.SchemaValidationStatus] _migrate_schema_offline(self, migration.SchemaValidationStatus schema_status)
None async_start_executor(self)
None async_update_statistics_metadata(self, str statistic_id, *str|UndefinedType new_statistic_id=UNDEFINED, str|None|UndefinedType new_unit_of_measurement=UNDEFINED, Callable[[], None]|None on_done=None)
None _async_five_minute_tasks(self, datetime now)
None _async_close(self, Event event)
None _open_event_session(self)
None async_connection_success(self)
None _schedule_compile_missing_statistics(self)
None async_initialize(self)
None _pre_process_startup_events(self, list[RecorderTask|Event[Any]] startup_task_or_events)
object|None _wait_startup_or_shutdown(self)
None _async_check_queue(self, *Any _)
None _commit_event_session_or_retry(self)
bool add(self, _T matcher)
IssData update(pyiss.ISS iss)
DatabaseEngine|None setup_connection_for_dialect(Recorder instance, str dialect_name, DBAPIConnection dbapi_connection, bool first_connection)
None end_incomplete_runs(Session session, datetime start_time)
bool validate_or_move_away_sqlite_database(str dburl)
Generator[None] write_lock_db_sqlite(Recorder instance)
str dburl_to_path(str dburl)
dict build_mysqldb_conv()
None move_away_broken_database(str dbfile)
bool is_second_sunday(datetime date_time)
CALLBACK_TYPE async_track_time_change(HomeAssistant hass, Callable[[datetime], Coroutine[Any, Any, None]|None] action, Any|None hour=None, Any|None minute=None, Any|None second=None)
CALLBACK_TYPE async_track_utc_time_change(HomeAssistant hass, Callable[[datetime], Coroutine[Any, Any, None]|None] action, Any|None hour=None, Any|None minute=None, Any|None second=None, bool local=False)
CALLBACK_TYPE async_track_time_interval(HomeAssistant hass, Callable[[datetime], Coroutine[Any, Any, None]|None] action, timedelta interval, *str|None name=None, bool|None cancel_on_shutdown=None)
Generator[Session] session_scope(*HomeAssistant|None hass=None, Session|None session=None, Callable[[Exception], bool]|None exception_filter=None, bool read_only=False)
CALLBACK_TYPE async_at_started(HomeAssistant hass, Callable[[HomeAssistant], Coroutine[Any, Any, None]|None] at_start_cb)