Home Assistant Unofficial Reference 2024.12.1
__init__.py
Go to the documentation of this file.
1 """Support for controlling GPIO pins of a Numato Labs USB GPIO expander."""
2 
3 from collections.abc import Callable
4 import logging
5 
6 import numato_gpio as gpio
7 import voluptuous as vol
8 
9 from homeassistant.const import (
10  CONF_BINARY_SENSORS,
11  CONF_ID,
12  CONF_NAME,
13  CONF_SENSORS,
14  CONF_SWITCHES,
15  EVENT_HOMEASSISTANT_START,
16  EVENT_HOMEASSISTANT_STOP,
17  PERCENTAGE,
18  Platform,
19 )
20 from homeassistant.core import Event, HomeAssistant
22 from homeassistant.helpers.discovery import load_platform
23 from homeassistant.helpers.typing import ConfigType
24 
25 _LOGGER = logging.getLogger(__name__)
26 
27 DOMAIN = "numato"
28 
29 CONF_INVERT_LOGIC = "invert_logic"
30 CONF_DISCOVER = "discover"
31 CONF_DEVICES = "devices"
32 CONF_DEVICE_ID = "id"
33 CONF_PORTS = "ports"
34 CONF_SRC_RANGE = "source_range"
35 CONF_DST_RANGE = "destination_range"
36 CONF_DST_UNIT = "unit"
37 DEFAULT_INVERT_LOGIC = False
38 DEFAULT_SRC_RANGE = [0, 1024]
39 DEFAULT_DST_RANGE = [0.0, 100.0]
40 DEFAULT_DEV = [f"/dev/ttyACM{i}" for i in range(10)]
41 
42 PORT_RANGE = range(1, 8) # ports 0-7 are ADC capable
43 
44 DATA_PORTS_IN_USE = "ports_in_use"
45 DATA_API = "api"
46 
47 
48 def int_range(rng):
49  """Validate the input array to describe a range by two integers."""
50  if not (isinstance(rng[0], int) and isinstance(rng[1], int)):
51  raise vol.Invalid(f"Only integers are allowed: {rng}")
52  if len(rng) != 2:
53  raise vol.Invalid(f"Only two numbers allowed in a range: {rng}")
54  if rng[0] > rng[1]:
55  raise vol.Invalid(f"Lower range bound must come first: {rng}")
56  return rng
57 
58 
59 def float_range(rng):
60  """Validate the input array to describe a range by two floats."""
61  try:
62  coe = vol.Coerce(float)
63  coe(rng[0])
64  coe(rng[1])
65  except vol.CoerceInvalid as err:
66  raise vol.Invalid(f"Only int or float values are allowed: {rng}") from err
67  if len(rng) != 2:
68  raise vol.Invalid(f"Only two numbers allowed in a range: {rng}")
69  if rng[0] > rng[1]:
70  raise vol.Invalid(f"Lower range bound must come first: {rng}")
71  return rng
72 
73 
74 def adc_port_number(num):
75  """Validate input number to be in the range of ADC enabled ports."""
76  try:
77  num = int(num)
78  except ValueError as err:
79  raise vol.Invalid(f"Port numbers must be integers: {num}") from err
80  if num not in range(1, 8):
81  raise vol.Invalid(f"Only port numbers from 1 to 7 are ADC capable: {num}")
82  return num
83 
84 
85 ADC_SCHEMA = vol.Schema(
86  {
87  vol.Required(CONF_NAME): cv.string,
88  vol.Optional(CONF_SRC_RANGE, default=DEFAULT_SRC_RANGE): int_range,
89  vol.Optional(CONF_DST_RANGE, default=DEFAULT_DST_RANGE): float_range,
90  vol.Optional(CONF_DST_UNIT, default=PERCENTAGE): cv.string,
91  }
92 )
93 
94 PORTS_SCHEMA = vol.Schema({cv.positive_int: cv.string})
95 
96 IO_PORTS_SCHEMA = vol.Schema(
97  {
98  vol.Required(CONF_PORTS): PORTS_SCHEMA,
99  vol.Optional(CONF_INVERT_LOGIC, default=DEFAULT_INVERT_LOGIC): cv.boolean,
100  }
101 )
102 
103 DEVICE_SCHEMA = vol.Schema(
104  {
105  vol.Required(CONF_ID): cv.positive_int,
106  CONF_BINARY_SENSORS: IO_PORTS_SCHEMA,
107  CONF_SWITCHES: IO_PORTS_SCHEMA,
108  CONF_SENSORS: {CONF_PORTS: {adc_port_number: ADC_SCHEMA}},
109  }
110 )
111 
112 CONFIG_SCHEMA = vol.Schema(
113  {
114  DOMAIN: {
115  CONF_DEVICES: vol.All(cv.ensure_list, [DEVICE_SCHEMA]),
116  vol.Optional(CONF_DISCOVER, default=DEFAULT_DEV): vol.All(
117  cv.ensure_list, [cv.string]
118  ),
119  },
120  },
121  extra=vol.ALLOW_EXTRA,
122 )
123 
124 
125 def setup(hass: HomeAssistant, config: ConfigType) -> bool:
126  """Initialize the numato integration.
127 
128  Discovers available Numato devices and loads the binary_sensor, sensor and
129  switch platforms.
130 
131  Returns False on error during device discovery (e.g. duplicate ID),
132  otherwise returns True.
133 
134  No exceptions should occur, since the platforms are initialized on a best
135  effort basis, which means, errors are handled locally.
136  """
137  hass.data[DOMAIN] = config[DOMAIN]
138 
139  try:
140  gpio.discover(config[DOMAIN][CONF_DISCOVER])
141  except gpio.NumatoGpioError as err:
142  _LOGGER.error("Error discovering Numato devices: %s", err)
143  gpio.cleanup()
144  return False
145 
146  _LOGGER.debug(
147  "Initializing Numato 32 port USB GPIO expanders with IDs: %s",
148  ", ".join(str(d) for d in gpio.devices),
149  )
150 
151  hass.data[DOMAIN][DATA_API] = NumatoAPI()
152 
153  def cleanup_gpio(event: Event) -> None:
154  """Stuff to do before stopping."""
155  _LOGGER.debug("Clean up Numato GPIO")
156  gpio.cleanup()
157  if DATA_API in hass.data[DOMAIN]:
158  hass.data[DOMAIN][DATA_API].ports_registered.clear()
159 
160  def prepare_gpio(event: Event) -> None:
161  """Stuff to do when home assistant starts."""
162  _LOGGER.debug("Setup cleanup at stop for Numato GPIO")
163  hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, cleanup_gpio)
164 
165  hass.bus.listen_once(EVENT_HOMEASSISTANT_START, prepare_gpio)
166 
167  load_platform(hass, Platform.BINARY_SENSOR, DOMAIN, {}, config)
168  load_platform(hass, Platform.SENSOR, DOMAIN, {}, config)
169  load_platform(hass, Platform.SWITCH, DOMAIN, {}, config)
170  return True
171 
172 
173 class NumatoAPI:
174  """Home-Assistant specific API for numato device access."""
175 
176  def __init__(self) -> None:
177  """Initialize API state."""
178  self.ports_registered: dict[tuple[int, int], int] = {}
179 
180  def check_port_free(self, device_id: int, port: int, direction: int) -> None:
181  """Check whether a port is still free set up.
182 
183  Fail with exception if it has already been registered.
184  """
185  if (device_id, port) not in self.ports_registered:
186  self.ports_registered[(device_id, port)] = direction
187  else:
188  io = (
189  "input"
190  if self.ports_registered[(device_id, port)] == gpio.IN
191  else "output"
192  )
193  raise gpio.NumatoGpioError(
194  f"Device {device_id} port {port} already in use as {io}."
195  )
196 
197  def check_device_id(self, device_id: int) -> None:
198  """Check whether a device has been discovered.
199 
200  Fail with exception.
201  """
202  if device_id not in gpio.devices:
203  raise gpio.NumatoGpioError(f"Device {device_id} not available.")
204 
205  def check_port(self, device_id: int, port: int, direction: int) -> None:
206  """Raise an error if the port setup doesn't match the direction."""
207  self.check_device_idcheck_device_id(device_id)
208  if (device_id, port) not in self.ports_registered:
209  raise gpio.NumatoGpioError(
210  f"Port {port} is not set up for numato device {device_id}."
211  )
212  msg = {
213  gpio.OUT: (
214  f"Trying to write to device {device_id} port {port} set up as input."
215  ),
216  gpio.IN: (
217  f"Trying to read from device {device_id} port {port} set up as output."
218  ),
219  }
220  if self.ports_registered[(device_id, port)] != direction:
221  raise gpio.NumatoGpioError(msg[direction])
222 
223  def setup_output(self, device_id: int, port: int) -> None:
224  """Set up a GPIO as output."""
225  self.check_device_idcheck_device_id(device_id)
226  self.check_port_freecheck_port_free(device_id, port, gpio.OUT)
227  gpio.devices[device_id].setup(port, gpio.OUT)
228 
229  def setup_input(self, device_id: int, port: int) -> None:
230  """Set up a GPIO as input."""
231  self.check_device_idcheck_device_id(device_id)
232  gpio.devices[device_id].setup(port, gpio.IN)
233  self.check_port_freecheck_port_free(device_id, port, gpio.IN)
234 
235  def write_output(self, device_id: int, port: int, value: int) -> None:
236  """Write a value to a GPIO."""
237  self.check_portcheck_port(device_id, port, gpio.OUT)
238  gpio.devices[device_id].write(port, value)
239 
240  def read_input(self, device_id: int, port: int) -> int:
241  """Read a value from a GPIO."""
242  self.check_portcheck_port(device_id, port, gpio.IN)
243  return gpio.devices[device_id].read(port)
244 
245  def read_adc_input(self, device_id: int, port: int) -> int:
246  """Read an ADC value from a GPIO ADC port."""
247  self.check_portcheck_port(device_id, port, gpio.IN)
248  self.check_device_idcheck_device_id(device_id)
249  return gpio.devices[device_id].adc_read(port)
250 
252  self, device_id: int, port: int, event_callback: Callable[[int, bool], None]
253  ) -> None:
254  """Add detection for RISING and FALLING events."""
255  self.check_portcheck_port(device_id, port, gpio.IN)
256  gpio.devices[device_id].add_event_detect(port, event_callback, gpio.BOTH)
257  gpio.devices[device_id].notify = True
int read_adc_input(self, int device_id, int port)
Definition: __init__.py:245
None check_port_free(self, int device_id, int port, int direction)
Definition: __init__.py:180
int read_input(self, int device_id, int port)
Definition: __init__.py:240
None setup_input(self, int device_id, int port)
Definition: __init__.py:229
None edge_detect(self, int device_id, int port, Callable[[int, bool], None] event_callback)
Definition: __init__.py:253
None check_device_id(self, int device_id)
Definition: __init__.py:197
None write_output(self, int device_id, int port, int value)
Definition: __init__.py:235
None setup_output(self, int device_id, int port)
Definition: __init__.py:223
None check_port(self, int device_id, int port, int direction)
Definition: __init__.py:205
bool setup(HomeAssistant hass, ConfigType config)
Definition: __init__.py:125
None load_platform(core.HomeAssistant hass, Platform|str component, str platform, DiscoveryInfoType|None discovered, ConfigType hass_config)
Definition: discovery.py:137