1 """Support for Automation Device Specification (ADS)."""
3 from collections
import namedtuple
11 _LOGGER = logging.getLogger(__name__)
14 NotificationItem = namedtuple(
15 "NotificationItem",
"hnotify huser name plc_datatype callback"
20 """Representation of an ADS connection."""
23 """Initialize the ADS hub."""
30 self.
_lock_lock = threading.Lock()
33 """Shutdown ADS connection."""
35 _LOGGER.debug(
"Shutting down ADS")
38 "Deleting device notification %d, %d",
39 notification_item.hnotify,
40 notification_item.huser,
43 self.
_client_client.del_device_notification(
44 notification_item.hnotify, notification_item.huser
46 except pyads.ADSError
as err:
50 except pyads.ADSError
as err:
54 """Register a new device."""
58 """Write a value to the device."""
63 except pyads.ADSError
as err:
64 _LOGGER.error(
"Error writing %s: %s", name, err)
67 """Read a value from the device."""
72 except pyads.ADSError
as err:
73 _LOGGER.error(
"Error reading %s: %s", name, err)
76 """Add a notification to the ADS devices."""
78 attr = pyads.NotificationAttrib(ctypes.sizeof(plc_datatype))
85 except pyads.ADSError
as err:
86 _LOGGER.error(
"Error subscribing to %s: %s", name, err)
88 hnotify =
int(hnotify)
90 hnotify, huser, name, plc_datatype, callback
94 "Added device notification %d for variable %s", hnotify, name
98 """Handle device notifications."""
99 contents = notification.contents
100 hnotify =
int(contents.hNotification)
101 _LOGGER.debug(
"Received notification %d", hnotify)
104 data_size = contents.cbSampleSize
106 ctypes.addressof(contents)
107 + pyads.structs.SAdsNotificationHeader.data.offset
109 data = (ctypes.c_ubyte * data_size).from_address(data_address)
112 with self.
_lock_lock:
115 if not notification_item:
116 _LOGGER.error(
"Unknown device notification handle: %d", hnotify)
120 plc_datatype = notification_item.plc_datatype
122 pyads.PLCTYPE_BYTE:
"<b",
123 pyads.PLCTYPE_INT:
"<h",
124 pyads.PLCTYPE_UINT:
"<H",
125 pyads.PLCTYPE_SINT:
"<b",
126 pyads.PLCTYPE_USINT:
"<B",
127 pyads.PLCTYPE_DINT:
"<i",
128 pyads.PLCTYPE_UDINT:
"<I",
129 pyads.PLCTYPE_WORD:
"<H",
130 pyads.PLCTYPE_DWORD:
"<I",
131 pyads.PLCTYPE_LREAL:
"<d",
132 pyads.PLCTYPE_REAL:
"<f",
133 pyads.PLCTYPE_TOD:
"<i",
134 pyads.PLCTYPE_DATE:
"<i",
135 pyads.PLCTYPE_DT:
"<i",
136 pyads.PLCTYPE_TIME:
"<i",
139 if plc_datatype == pyads.PLCTYPE_BOOL:
140 value = bool(struct.unpack(
"<?", bytearray(data))[0])
141 elif plc_datatype == pyads.PLCTYPE_STRING:
143 bytearray(data).split(b
"\x00", 1)[0].decode(
"utf-8", errors=
"ignore")
145 elif plc_datatype
in unpack_formats:
146 value = struct.unpack(unpack_formats[plc_datatype], bytearray(data))[0]
148 value = bytearray(data)
149 _LOGGER.warning(
"No callback available for this datatype")
151 notification_item.callback(notification_item.name, value)
def register_device(self, device)
def add_device_notification(self, name, plc_datatype, callback)
def read_by_name(self, name, plc_datatype)
def shutdown(self, *args, **kwargs)
def __init__(self, ads_client)
def _device_notification_callback(self, notification, name)
def write_by_name(self, name, value, plc_datatype)
web.Response get(self, web.Request request, str config_key)
None open(self, **Any kwargs)