1 """Support managing EventTypes."""
3 from __future__
import annotations
5 from collections.abc
import Iterable
6 from typing
import TYPE_CHECKING, Any, cast
9 from sqlalchemy.orm.session
import Session
15 from ..db_schema
import EventTypes
16 from ..queries
import find_event_type_ids
17 from ..tasks
import RefreshEventTypesTask
18 from ..util
import execute_stmt_lambda_element
19 from .
import BaseLRUTableManager
22 from ..core
import Recorder
29 """Manage the EventTypes table."""
33 def __init__(self, recorder: Recorder) ->
None:
34 """Initialize the event type manager."""
35 super().
__init__(recorder, CACHE_SIZE)
36 self._non_existent_event_types: LRU[EventType[Any] | str,
None] = LRU(
40 def load(self, events: list[Event], session: Session) ->
None:
41 """Load the event_type to event_type_ids mapping into memory.
43 This call is not thread-safe and must be called from the
47 {event.event_type
for event
in events
if event.event_type
is not None},
54 event_type: EventType[Any] | str,
56 from_recorder: bool =
False,
58 """Resolve event_type to the event_type_id.
60 This call is not thread-safe and must be called from the
63 return self.
get_manyget_many((event_type,), session)[event_type]
67 event_types: Iterable[EventType[Any] | str],
69 from_recorder: bool =
False,
70 ) -> dict[EventType[Any] | str, int |
None]:
71 """Resolve event_types to event_type_ids.
73 This call is not thread-safe and must be called from the
76 results: dict[EventType[Any] | str, int |
None] = {}
77 missing: list[EventType[Any] | str] = []
78 non_existent: list[EventType[Any] | str] = []
80 for event_type
in event_types:
81 if (event_type_id := self._id_map.
get(event_type))
is None:
82 if event_type
in self._non_existent_event_types:
83 results[event_type] =
None
85 missing.append(event_type)
87 results[event_type] = event_type_id
92 with session.no_autoflush:
93 for missing_chunk
in chunked_or_all(missing, self.recorder.max_bind_vars):
97 results[event_type] = self._id_map[event_type] = cast(
102 event_type
for event_type
in missing
if results[event_type]
is None
107 for event_type
in non_existent:
108 self._non_existent_event_types[event_type] =
None
118 """Add a pending EventTypes that will be committed at the next interval.
120 This call is not thread-safe and must be called from the
123 assert db_event_type.event_type
is not None
124 event_type: str = db_event_type.event_type
125 self._pending[event_type] = db_event_type
128 """Call after commit to load the event_type_ids of the new EventTypes into the LRU.
130 This call is not thread-safe and must be called from the
133 for event_type, db_event_types
in self._pending.items():
134 self._id_map[event_type] = db_event_types.event_type_id
136 self._pending.clear()
139 """Clear a non-existent event type from the cache.
141 This call is not thread-safe and must be called from the
144 self._non_existent_event_types.pop(event_type,
None)
147 """Evict purged event_types from the cache when they are no longer used.
149 This call is not thread-safe and must be called from the
152 for event_type
in event_types:
153 self._id_map.pop(event_type,
None)
None load(self, list[Event] events, Session session)
None __init__(self, Recorder recorder)
None add_pending(self, EventTypes db_event_type)
dict[EventType[Any]|str, int|None] get_many(self, Iterable[EventType[Any]|str] event_types, Session session, bool from_recorder=False)
None clear_non_existent(self, EventType[Any]|str event_type)
None post_commit_pending(self)
None evict_purged(self, Iterable[str] event_types)
int|None get(self, EventType[Any]|str event_type, Session session, bool from_recorder=False)
StatementLambdaElement find_event_type_ids(Iterable[str] event_types)
Sequence[Row]|Result execute_stmt_lambda_element(Session session, StatementLambdaElement stmt, datetime|None start_time=None, datetime|None end_time=None, int yield_per=DEFAULT_YIELD_STATES_ROWS, bool orm_rows=True)
Iterable[Any] chunked_or_all(Collection[Any] iterable, int chunked_num)