Home Assistant Unofficial Reference 2024.12.1
gateway.py
Go to the documentation of this file.
1 """The sms gateway to interact with a GSM modem."""
2 
3 import logging
4 
5 import gammu
6 from gammu.asyncworker import GammuAsyncWorker
7 
8 from homeassistant.core import callback
9 
10 from .const import DOMAIN, SMS_STATE_UNREAD
11 
12 _LOGGER = logging.getLogger(__name__)
13 
14 
15 class Gateway:
16  """SMS gateway to interact with a GSM modem."""
17 
18  def __init__(self, config, hass):
19  """Initialize the sms gateway."""
20  _LOGGER.debug("Init with connection mode:%s", config["Connection"])
21  self._worker_worker = GammuAsyncWorker(self.sms_pullsms_pull)
22  self._worker_worker.configure(config)
23  self._hass_hass = hass
24  self._first_pull_first_pull = True
25  self.manufacturermanufacturer = None
26  self.modelmodel = None
27  self.firmwarefirmware = None
28 
29  async def init_async(self):
30  """Initialize the sms gateway asynchronously. This method is also called in config flow to verify connection."""
31  await self._worker_worker.init_async()
32  self.manufacturermanufacturer = await self.get_manufacturer_asyncget_manufacturer_async()
33  self.modelmodel = await self.get_model_asyncget_model_async()
34  self.firmwarefirmware = await self.get_firmware_asyncget_firmware_async()
35 
36  def sms_pull(self, state_machine):
37  """Pull device.
38 
39  @param state_machine: state machine
40  @type state_machine: gammu.StateMachine
41  """
42  state_machine.ReadDevice()
43 
44  _LOGGER.debug("Pulling modem")
45  self.sms_read_messagessms_read_messages(state_machine, self._first_pull_first_pull)
46  self._first_pull_first_pull = False
47 
48  def sms_read_messages(self, state_machine, force=False):
49  """Read all received SMS messages.
50 
51  @param state_machine: state machine which invoked action
52  @type state_machine: gammu.StateMachine
53  """
54  entries = self.get_and_delete_all_smsget_and_delete_all_sms(state_machine, force)
55  _LOGGER.debug("SMS entries:%s", entries)
56  data = []
57 
58  for entry in entries:
59  decoded_entry = gammu.DecodeSMS(entry)
60  message = entry[0]
61  _LOGGER.debug("Processing sms:%s,decoded:%s", message, decoded_entry)
62  sms_state = message["State"]
63  _LOGGER.debug("SMS state:%s", sms_state)
64  if sms_state == SMS_STATE_UNREAD:
65  if decoded_entry is None:
66  text = message["Text"]
67  else:
68  text = ""
69  for inner_entry in decoded_entry["Entries"]:
70  if inner_entry["Buffer"] is not None:
71  text += inner_entry["Buffer"]
72 
73  event_data = {
74  "phone": message["Number"],
75  "date": str(message["DateTime"]),
76  "message": text,
77  }
78 
79  _LOGGER.debug("Append event data:%s", event_data)
80  data.append(event_data)
81 
82  self._hass_hass.add_job(self._notify_incoming_sms_notify_incoming_sms, data)
83 
84  def get_and_delete_all_sms(self, state_machine, force=False):
85  """Read and delete all SMS in the modem."""
86  # Read SMS memory status ...
87  memory = state_machine.GetSMSStatus()
88  # ... and calculate number of messages
89  remaining = memory["SIMUsed"] + memory["PhoneUsed"]
90  start_remaining = remaining
91  # Get all sms
92  start = True
93  entries = []
94  all_parts = -1
95  _LOGGER.debug("Start remaining:%i", start_remaining)
96 
97  try:
98  while remaining > 0:
99  if start:
100  entry = state_machine.GetNextSMS(Folder=0, Start=True)
101  all_parts = entry[0]["UDH"]["AllParts"]
102  part_number = entry[0]["UDH"]["PartNumber"]
103  part_is_missing = all_parts > start_remaining
104  _LOGGER.debug("All parts:%i", all_parts)
105  _LOGGER.debug("Part Number:%i", part_number)
106  _LOGGER.debug("Remaining:%i", remaining)
107  _LOGGER.debug("Start is_part_missing:%s", part_is_missing)
108  start = False
109  else:
110  entry = state_machine.GetNextSMS(
111  Folder=0, Location=entry[0]["Location"]
112  )
113 
114  if part_is_missing and not force:
115  _LOGGER.debug("Not all parts have arrived")
116  break
117 
118  remaining = remaining - 1
119  entries.append(entry)
120 
121  # delete retrieved sms
122  _LOGGER.debug("Deleting message")
123  try:
124  state_machine.DeleteSMS(Folder=0, Location=entry[0]["Location"])
125  except gammu.ERR_MEMORY_NOT_AVAILABLE:
126  _LOGGER.error("Error deleting SMS, memory not available")
127 
128  except gammu.ERR_EMPTY:
129  # error is raised if memory is empty (this induces wrong reported
130  # memory status)
131  _LOGGER.warning("Failed to read messages!")
132 
133  # Link all SMS when there are concatenated messages
134  return gammu.LinkSMS(entries)
135 
136  @callback
137  def _notify_incoming_sms(self, messages):
138  """Notify hass when an incoming SMS message is received."""
139  for message in messages:
140  event_data = {
141  "phone": message["phone"],
142  "date": message["date"],
143  "text": message["message"],
144  }
145  self._hass_hass.bus.async_fire(f"{DOMAIN}.incoming_sms", event_data)
146 
147  async def send_sms_async(self, message):
148  """Send sms message via the worker."""
149  return await self._worker_worker.send_sms_async(message)
150 
151  async def get_imei_async(self):
152  """Get the IMEI of the device."""
153  return await self._worker_worker.get_imei_async()
154 
155  async def get_signal_quality_async(self):
156  """Get the current signal level of the modem."""
157  return await self._worker_worker.get_signal_quality_async()
158 
159  async def get_network_info_async(self):
160  """Get the current network info of the modem."""
161  network_info = await self._worker_worker.get_network_info_async()
162  # Looks like there is a bug and it's empty for any modem https://github.com/gammu/python-gammu/issues/31, so try workaround
163  if not network_info["NetworkName"]:
164  network_info["NetworkName"] = gammu.GSMNetworks.get(
165  network_info["NetworkCode"]
166  )
167  return network_info
168 
169  async def get_manufacturer_async(self):
170  """Get the manufacturer of the modem."""
171  return await self._worker_worker.get_manufacturer_async()
172 
173  async def get_model_async(self):
174  """Get the model of the modem."""
175  model = await self._worker_worker.get_model_async()
176  if not model or not model[0]:
177  return None
178  display = model[0] # Identification model
179  if model[1]: # Real model
180  display = f"{display} ({model[1]})"
181  return display
182 
183  async def get_firmware_async(self):
184  """Get the firmware information of the modem."""
185  firmware = await self._worker_worker.get_firmware_async()
186  if not firmware or not firmware[0]:
187  return None
188  display = firmware[0] # Version
189  if firmware[1]: # Date
190  display = f"{display} ({firmware[1]})"
191  return display
192 
193  async def terminate_async(self):
194  """Terminate modem connection."""
195  return await self._worker_worker.terminate_async()
196 
197 
198 async def create_sms_gateway(config, hass):
199  """Create the sms gateway."""
200  try:
201  gateway = Gateway(config, hass)
202  try:
203  await gateway.init_async()
204  except gammu.GSMError as exc:
205  _LOGGER.error("Failed to initialize, error %s", exc)
206  await gateway.terminate_async()
207  return None
208  except gammu.GSMError as exc:
209  _LOGGER.error("Failed to create async worker, error %s", exc)
210  return None
211  return gateway
def get_and_delete_all_sms(self, state_machine, force=False)
Definition: gateway.py:84
def sms_read_messages(self, state_machine, force=False)
Definition: gateway.py:48
def sms_pull(self, state_machine)
Definition: gateway.py:36
def __init__(self, config, hass)
Definition: gateway.py:18
def create_sms_gateway(config, hass)
Definition: gateway.py:198