1 """Support managing StatesMeta."""
3 from __future__
import annotations
7 from typing
import TYPE_CHECKING, Final, Literal
10 from sqlalchemy
import lambda_stmt, select
11 from sqlalchemy.orm.session
import Session
12 from sqlalchemy.sql.expression
import true
13 from sqlalchemy.sql.lambdas
import StatementLambdaElement
15 from ..db_schema
import StatisticsMeta
16 from ..models
import StatisticMetaData
17 from ..util
import execute_stmt_lambda_element
20 from ..core
import Recorder
24 _LOGGER = logging.getLogger(__name__)
26 QUERY_STATISTIC_META = (
28 StatisticsMeta.statistic_id,
29 StatisticsMeta.source,
30 StatisticsMeta.unit_of_measurement,
31 StatisticsMeta.has_mean,
32 StatisticsMeta.has_sum,
37 INDEX_STATISTIC_ID: Final = 1
38 INDEX_SOURCE: Final = 2
39 INDEX_UNIT_OF_MEASUREMENT: Final = 3
40 INDEX_HAS_MEAN: Final = 4
41 INDEX_HAS_SUM: Final = 5
46 statistic_ids: set[str] |
None =
None,
47 statistic_type: Literal[
"mean",
"sum"] |
None =
None,
48 statistic_source: str |
None =
None,
49 ) -> StatementLambdaElement:
50 """Generate a statement to fetch metadata."""
51 stmt = lambda_stmt(
lambda: select(*QUERY_STATISTIC_META))
53 stmt +=
lambda q: q.where(StatisticsMeta.statistic_id.in_(statistic_ids))
54 if statistic_source
is not None:
55 stmt +=
lambda q: q.where(StatisticsMeta.source == statistic_source)
56 if statistic_type ==
"mean":
57 stmt +=
lambda q: q.where(StatisticsMeta.has_mean == true())
58 elif statistic_type ==
"sum":
59 stmt +=
lambda q: q.where(StatisticsMeta.has_sum == true())
64 """Manage the StatisticsMeta table."""
66 def __init__(self, recorder: Recorder) ->
None:
67 """Initialize the statistics meta manager."""
69 self._stat_id_to_id_meta: LRU[str, tuple[int, StatisticMetaData]] = LRU(
74 """Clear the cache."""
75 for statistic_id
in statistic_ids:
76 self._stat_id_to_id_meta.pop(statistic_id,
None)
81 statistic_ids: set[str] |
None =
None,
82 statistic_type: Literal[
"mean",
"sum"] |
None =
None,
83 statistic_source: str |
None =
None,
84 ) -> dict[str, tuple[int, StatisticMetaData]]:
85 """Fetch meta data and process it into results and/or cache."""
91 and self.
recorderrecorder.thread_id == threading.get_ident()
93 results: dict[str, tuple[int, StatisticMetaData]] = {}
94 id_meta: tuple[int, StatisticMetaData]
95 meta: StatisticMetaData
98 with session.no_autoflush:
99 stat_id_to_id_meta = self._stat_id_to_id_meta
103 statistic_ids, statistic_type, statistic_source
107 statistic_id = row[INDEX_STATISTIC_ID]
108 row_id = row[INDEX_ID]
110 "has_mean": row[INDEX_HAS_MEAN],
111 "has_sum": row[INDEX_HAS_SUM],
112 "name": row[INDEX_NAME],
113 "source": row[INDEX_SOURCE],
114 "statistic_id": statistic_id,
115 "unit_of_measurement": row[INDEX_UNIT_OF_MEASUREMENT],
117 id_meta = (row_id, meta)
118 results[statistic_id] = id_meta
120 stat_id_to_id_meta[statistic_id] = id_meta
124 """Assert that we are in the recorder thread."""
125 if self.
recorderrecorder.thread_id != threading.get_ident():
126 raise RuntimeError(
"Detected unsafe call not in recorder thread")
129 self, session: Session, statistic_id: str, new_metadata: StatisticMetaData
131 """Add metadata to the database.
133 This call is not thread-safe and must be called from the
137 meta = StatisticsMeta.from_meta(new_metadata)
142 "Added new statistics metadata for %s, new_metadata: %s",
152 new_metadata: StatisticMetaData,
153 old_metadata_dict: dict[str, tuple[int, StatisticMetaData]],
154 ) -> tuple[str |
None, int]:
155 """Update metadata in the database.
157 This call is not thread-safe and must be called from the
160 metadata_id, old_metadata = old_metadata_dict[statistic_id]
162 old_metadata[
"has_mean"] != new_metadata[
"has_mean"]
163 or old_metadata[
"has_sum"] != new_metadata[
"has_sum"]
164 or old_metadata[
"name"] != new_metadata[
"name"]
165 or old_metadata[
"unit_of_measurement"]
166 != new_metadata[
"unit_of_measurement"]
168 return None, metadata_id
171 session.query(StatisticsMeta).filter_by(statistic_id=statistic_id).
update(
173 StatisticsMeta.has_mean: new_metadata[
"has_mean"],
174 StatisticsMeta.has_sum: new_metadata[
"has_sum"],
175 StatisticsMeta.name: new_metadata[
"name"],
176 StatisticsMeta.unit_of_measurement: new_metadata[
"unit_of_measurement"],
178 synchronize_session=
False,
182 "Updated statistics metadata for %s, old_metadata: %s, new_metadata: %s",
187 return statistic_id, metadata_id
189 def load(self, session: Session) ->
None:
190 """Load the statistic_id to metadata_id mapping into memory.
192 This call is not thread-safe and must be called from the
198 self, session: Session, statistic_id: str
199 ) -> tuple[int, StatisticMetaData] |
None:
200 """Resolve statistic_id to the metadata_id."""
201 return self.
get_manyget_many(session, {statistic_id}).
get(statistic_id)
206 statistic_ids: set[str] |
None =
None,
207 statistic_type: Literal[
"mean",
"sum"] |
None =
None,
208 statistic_source: str |
None =
None,
209 ) -> dict[str, tuple[int, StatisticMetaData]]:
212 Returns a dict of (metadata_id, StatisticMetaData) tuples indexed by statistic_id.
214 If statistic_ids is given, fetch metadata only for the listed statistics_ids.
215 If statistic_type is given, fetch metadata only for statistic_ids supporting it.
217 if statistic_ids
is None:
221 statistic_type=statistic_type,
222 statistic_source=statistic_source,
225 if statistic_type
is not None or statistic_source
is not None:
230 "Providing statistic_type and statistic_source is mutually exclusive of statistic_ids"
234 if not (missing_statistic_id := statistic_ids.difference(results)):
239 session, statistic_ids=missing_statistic_id
243 self, statistic_ids: set[str]
244 ) -> dict[str, tuple[int, StatisticMetaData]]:
245 """Get metadata from cache.
247 This call is thread safe and can be run in the event loop,
248 the database executor, or the recorder thread.
251 statistic_id: id_meta
252 for statistic_id
in statistic_ids
256 if (id_meta := self._stat_id_to_id_meta.
get(statistic_id))
262 new_metadata: StatisticMetaData,
263 old_metadata_dict: dict[str, tuple[int, StatisticMetaData]],
264 ) -> tuple[str |
None, int]:
265 """Get metadata_id for a statistic_id.
267 If the statistic_id is previously unknown, add it. If it's already known, update
270 Updating metadata source is not possible.
272 Returns a tuple of (statistic_id | None, metadata_id).
274 statistic_id is None if the metadata was not updated
276 This call is not thread-safe and must be called from the
279 statistic_id = new_metadata[
"statistic_id"]
280 if statistic_id
not in old_metadata_dict:
281 return statistic_id, self.
_add_metadata_add_metadata(session, statistic_id, new_metadata)
283 session, statistic_id, new_metadata, old_metadata_dict
287 self, session: Session, statistic_id: str, new_unit: str |
None
289 """Update the unit of measurement for a statistic_id.
291 This call is not thread-safe and must be called from the
295 session.query(StatisticsMeta).filter(
296 StatisticsMeta.statistic_id == statistic_id
297 ).
update({StatisticsMeta.unit_of_measurement: new_unit})
304 old_statistic_id: str,
305 new_statistic_id: str,
307 """Update the statistic_id for a statistic_id.
309 This call is not thread-safe and must be called from the
313 if self.
getget(session, new_statistic_id):
315 "Cannot rename statistic_id `%s` to `%s` because the new statistic_id is already in use",
320 session.query(StatisticsMeta).filter(
321 (StatisticsMeta.statistic_id == old_statistic_id)
322 & (StatisticsMeta.source == source)
323 ).
update({StatisticsMeta.statistic_id: new_statistic_id})
326 def delete(self, session: Session, statistic_ids: list[str]) ->
None:
327 """Clear statistics for a list of statistic_ids.
329 This call is not thread-safe and must be called from the
333 session.query(StatisticsMeta).filter(
334 StatisticsMeta.statistic_id.in_(statistic_ids)
335 ).
delete(synchronize_session=
False)
339 """Reset the cache."""
340 self._stat_id_to_id_meta.clear()
343 """Adjust the LRU cache size.
345 This call is not thread-safe and must be called from the
348 lru: LRU = self._stat_id_to_id_meta
349 if new_size > lru.get_size():
350 lru.set_size(new_size)
IssData update(pyiss.ISS iss)
Sequence[Row]|Result execute_stmt_lambda_element(Session session, StatementLambdaElement stmt, datetime|None start_time=None, datetime|None end_time=None, int yield_per=DEFAULT_YIELD_STATES_ROWS, bool orm_rows=True)