1 """Helper functions for the SIA integration."""
3 from __future__
import annotations
5 from datetime
import datetime, timedelta
8 from pysiaalarm
import SIAEvent
9 from pysiaalarm.utils
import MessageTypes
23 PING_INTERVAL_MARGIN = 30
33 """Return the unique_id and name for an entity."""
36 f
"{entry_id}_{account}_{zone}"
37 if entity_key == KEY_ALARM
38 else f
"{entry_id}_{account}_{zone}_{entity_key}"
41 f
"{port} - {account} - {entity_key}"
42 if zone == SIA_HUB_ZONE
43 else f
"{port} - {account} - zone {zone} - {entity_key}"
49 """Return the interval to the next unavailability check."""
50 return timedelta(minutes=ping, seconds=PING_INTERVAL_MARGIN).total_seconds()
54 """Create the attributes dict from a SIAEvent."""
55 timestamp = event.timestamp
if event.timestamp
else utcnow()
58 ATTR_CODE: event.code,
59 ATTR_MESSAGE: event.message,
61 ATTR_TIMESTAMP: timestamp.isoformat()
62 if isinstance(timestamp, datetime)
68 """Create a dict from the SIA Event for the HA Event."""
70 "message_type": event.message_type.value
71 if isinstance(event.message_type, MessageTypes)
72 else event.message_type,
73 "receiver": event.receiver,
75 "account": event.account,
76 "sequence": event.sequence,
77 "content": event.content,
82 "message": event.message,
83 "x_data": event.x_data,
84 "timestamp": event.timestamp.isoformat()
85 if isinstance(event.timestamp, datetime)
87 "event_qualifier": event.event_qualifier,
88 "event_type": event.event_type,
89 "partition": event.partition,
92 "identifier": xd.identifier,
94 "description": xd.description,
96 "characters": xd.characters,
99 for xd
in event.extended_data
101 if event.extended_data
is not None
104 "code": event.sia_code.code,
105 "type": event.sia_code.type,
106 "description": event.sia_code.description,
107 "concerns": event.sia_code.concerns,
109 if event.sia_code
is not None
dict[str, Any] get_attr_from_sia_event(SIAEvent event)
tuple[str, str] get_unique_id_and_name(str entry_id, int port, str account, int zone, str entity_key)
dict[str, Any] get_event_data_from_sia_event(SIAEvent event)
float get_unavailability_interval(int ping)