Home Assistant Unofficial Reference 2024.12.1
hub.py
Go to the documentation of this file.
1 """Support for Automation Device Specification (ADS)."""
2 
3 from collections import namedtuple
4 import ctypes
5 import logging
6 import struct
7 import threading
8 
9 import pyads
10 
_LOGGER = logging.getLogger(__name__)

# Record describing one active device notification: the two handles
# returned on subscription plus what is needed to decode and dispatch it.
NotificationItem = namedtuple(  # noqa: PYI024
    "NotificationItem",
    ["hnotify", "huser", "name", "plc_datatype", "callback"],
)
17 
18 
class AdsHub:
    """Representation of an ADS connection.

    Wraps the client object handed to the constructor, holds one lock around
    reads, writes and subscriptions, and forwards incoming device
    notifications to the callbacks registered through
    ``add_device_notification``.
    """

    def __init__(self, ads_client):
        """Initialize the ADS hub and open the client connection.

        ``ads_client`` must provide ``open``, ``close``, ``read_by_name``,
        ``write_by_name``, ``add_device_notification`` and
        ``del_device_notification``.
        """
        self._client = ads_client
        self._client.open()

        # All ADS devices are registered here
        self._devices = []
        # Active subscriptions, keyed by the integer notification handle
        self._notification_items = {}
        # Held around read/write/subscribe calls and notification lookups
        self._lock = threading.Lock()

    def shutdown(self, *args, **kwargs):
        """Shutdown ADS connection.

        Deletes every registered device notification and then closes the
        client. ADS errors are logged and do not stop the remaining cleanup.
        Extra positional and keyword arguments are accepted and ignored.
        """
        _LOGGER.debug("Shutting down ADS")
        for notification_item in self._notification_items.values():
            _LOGGER.debug(
                "Deleting device notification %d, %d",
                notification_item.hnotify,
                notification_item.huser,
            )
            try:
                self._client.del_device_notification(
                    notification_item.hnotify, notification_item.huser
                )
            except pyads.ADSError as err:
                _LOGGER.error(err)
        try:
            self._client.close()
        except pyads.ADSError as err:
            _LOGGER.error(err)

    def register_device(self, device):
        """Register a new device."""
        self._devices.append(device)

    def write_by_name(self, name, value, plc_datatype):
        """Write a value to the device.

        Returns whatever the client's ``write_by_name`` returns, or ``None``
        when the client raises an ADS error (the error is logged).
        """
        with self._lock:
            try:
                return self._client.write_by_name(name, value, plc_datatype)
            except pyads.ADSError as err:
                _LOGGER.error("Error writing %s: %s", name, err)
                return None

    def read_by_name(self, name, plc_datatype):
        """Read a value from the device.

        Returns the value read by the client, or ``None`` when the client
        raises an ADS error (the error is logged).
        """
        with self._lock:
            try:
                return self._client.read_by_name(name, plc_datatype)
            except pyads.ADSError as err:
                _LOGGER.error("Error reading %s: %s", name, err)
                return None

    def add_device_notification(self, name, plc_datatype, callback):
        """Add a notification to the ADS devices.

        Subscribes to the variable ``name`` and remembers ``callback`` so it
        can be invoked as ``callback(name, value)`` for each notification.
        A failed subscription is logged and nothing is stored.
        """
        attr = pyads.NotificationAttrib(ctypes.sizeof(plc_datatype))

        with self._lock:
            try:
                # Every subscription shares one handler; the handler looks
                # up the per-variable callback by notification handle.
                hnotify, huser = self._client.add_device_notification(
                    name, attr, self._device_notification_callback
                )
            except pyads.ADSError as err:
                _LOGGER.error("Error subscribing to %s: %s", name, err)
            else:
                hnotify = int(hnotify)
                self._notification_items[hnotify] = NotificationItem(
                    hnotify, huser, name, plc_datatype, callback
                )

                _LOGGER.debug(
                    "Added device notification %d for variable %s", hnotify, name
                )

    def _device_notification_callback(self, notification, name):
        """Handle device notifications.

        Copies the sample bytes out of the notification, decodes them
        according to the data type stored for the notification handle and
        passes the result to the registered callback. ``name`` is not used
        here; the variable name comes from the stored ``NotificationItem``.
        """
        contents = notification.contents
        hnotify = int(contents.hNotification)
        _LOGGER.debug("Received notification %d", hnotify)

        # Get dynamically sized data array
        data_size = contents.cbSampleSize
        data_address = (
            ctypes.addressof(contents)
            + pyads.structs.SAdsNotificationHeader.data.offset
        )
        data = (ctypes.c_ubyte * data_size).from_address(data_address)

        # Acquire notification item
        with self._lock:
            notification_item = self._notification_items.get(hnotify)

        if notification_item is None:
            _LOGGER.error("Unknown device notification handle: %d", hnotify)
            return

        try:
            value = self._parse_notification_data(
                notification_item.plc_datatype, bytearray(data)
            )
        except struct.error as err:
            # Payload size does not match the expected type; report it
            # instead of letting the exception escape from the callback.
            _LOGGER.error(
                "Error parsing notification %d for %s: %s",
                hnotify,
                notification_item.name,
                err,
            )
            return

        notification_item.callback(notification_item.name, value)

    @staticmethod
    def _parse_notification_data(plc_datatype, raw):
        """Decode the raw notification bytes ``raw`` for ``plc_datatype``.

        Returns a ``bool`` for BOOL, a ``str`` (cut at the first NUL byte)
        for STRING, a number for the types in the format table, and the
        unmodified ``bytearray`` for any other type. Raises ``struct.error``
        when the payload size does not match the format.
        """
        if plc_datatype == pyads.PLCTYPE_BOOL:
            return bool(struct.unpack("<?", raw)[0])
        if plc_datatype == pyads.PLCTYPE_STRING:
            return raw.split(b"\x00", 1)[0].decode("utf-8", errors="ignore")

        # Built on each call so pyads is only touched when data arrives.
        unpack_formats = {
            pyads.PLCTYPE_BYTE: "<B",  # BYTE is unsigned, same as USINT
            pyads.PLCTYPE_INT: "<h",
            pyads.PLCTYPE_UINT: "<H",
            pyads.PLCTYPE_SINT: "<b",
            pyads.PLCTYPE_USINT: "<B",
            pyads.PLCTYPE_DINT: "<i",
            pyads.PLCTYPE_UDINT: "<I",
            pyads.PLCTYPE_WORD: "<H",
            pyads.PLCTYPE_DWORD: "<I",
            pyads.PLCTYPE_LREAL: "<d",
            pyads.PLCTYPE_REAL: "<f",
            pyads.PLCTYPE_TOD: "<i",  # Treat as DINT
            pyads.PLCTYPE_DATE: "<i",  # Treat as DINT
            pyads.PLCTYPE_DT: "<i",  # Treat as DINT
            pyads.PLCTYPE_TIME: "<i",  # Treat as DINT
        }
        unpack_format = unpack_formats.get(plc_datatype)
        if unpack_format is not None:
            return struct.unpack(unpack_format, raw)[0]

        # Unknown type: hand the raw bytes to the callback unchanged
        _LOGGER.warning("No callback available for this datatype")
        return raw
def register_device(self, device)
Definition: hub.py:53
def add_device_notification(self, name, plc_datatype, callback)
Definition: hub.py:75
def read_by_name(self, name, plc_datatype)
Definition: hub.py:66
def shutdown(self, *args, **kwargs)
Definition: hub.py:32
def __init__(self, ads_client)
Definition: hub.py:22
def _device_notification_callback(self, notification, name)
Definition: hub.py:97
def write_by_name(self, name, value, plc_datatype)
Definition: hub.py:57
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88
None open(self, **Any kwargs)
Definition: lock.py:86