1 """Support for controlling GPIO pins of a Numato Labs USB GPIO expander."""
3 from collections.abc
import Callable
6 import numato_gpio
as gpio
7 import voluptuous
as vol
15 EVENT_HOMEASSISTANT_START,
16 EVENT_HOMEASSISTANT_STOP,
_LOGGER = logging.getLogger(__name__)

# NOTE(review): DOMAIN, CONF_PORTS and DATA_API are used elsewhere in this
# module but their definitions are not visible in this part of the file --
# confirm they are defined or imported.

# Configuration keys specific to this integration.
CONF_INVERT_LOGIC = "invert_logic"
CONF_DISCOVER = "discover"
CONF_DEVICES = "devices"
CONF_SRC_RANGE = "source_range"
CONF_DST_RANGE = "destination_range"
CONF_DST_UNIT = "unit"

# Defaults used for optional configuration keys.
DEFAULT_INVERT_LOGIC = False
DEFAULT_SRC_RANGE = [0, 1024]
DEFAULT_DST_RANGE = [0.0, 100.0]
# Serial device nodes probed when no explicit discovery list is configured.
DEFAULT_DEV = [f"/dev/ttyACM{i}" for i in range(10)]

PORT_RANGE = range(1, 8)

DATA_PORTS_IN_USE = "ports_in_use"
def int_range(rng):
    """Validate the input array to describe a range by two integers.

    Args:
        rng: Sequence expected to hold exactly two integers, lower bound
            first.

    Returns:
        The unchanged ``rng`` when it is valid.

    Raises:
        vol.Invalid: If ``rng`` does not hold exactly two items, if either
            item is not an ``int``, or if the lower bound exceeds the upper.
    """
    # Check the length before indexing so that a too-short list is reported
    # as a validation error instead of escaping as an IndexError.
    if len(rng) != 2:
        raise vol.Invalid(f"Only two numbers allowed in a range: {rng}")
    if not (isinstance(rng[0], int) and isinstance(rng[1], int)):
        raise vol.Invalid(f"Only integers are allowed: {rng}")
    if rng[0] > rng[1]:
        raise vol.Invalid(f"Lower range bound must come first: {rng}")
    return rng
def float_range(rng):
    """Validate the input array to describe a range by two floats.

    Args:
        rng: Sequence expected to hold exactly two values that can be
            coerced to ``float``, lower bound first.

    Returns:
        The unchanged ``rng`` when it is valid.

    Raises:
        vol.Invalid: If ``rng`` does not hold exactly two items, if either
            item cannot be coerced to ``float``, or if the lower bound
            exceeds the upper.
    """
    # Check the length before indexing so that a too-short list is reported
    # as a validation error instead of escaping as an IndexError.
    if len(rng) != 2:
        raise vol.Invalid(f"Only two numbers allowed in a range: {rng}")
    coe = vol.Coerce(float)
    try:
        lower = coe(rng[0])
        upper = coe(rng[1])
    except vol.CoerceInvalid as err:
        raise vol.Invalid(f"Only int or float values are allowed: {rng}") from err
    # Compare the coerced values so the ordering check is always numeric.
    if lower > upper:
        raise vol.Invalid(f"Lower range bound must come first: {rng}")
    return rng
def adc_port_number(num):
    """Validate input number to be in the range of ADC enabled ports.

    Args:
        num: Port number, as an ``int`` or anything ``int()`` can convert.

    Returns:
        The port number converted to ``int``.

    Raises:
        vol.Invalid: If ``num`` cannot be converted to an integer or is
            outside the ADC capable ports 1 to 7.
    """
    try:
        port = int(num)
    # int() raises TypeError (not ValueError) for None, lists and dicts.
    except (TypeError, ValueError) as err:
        raise vol.Invalid(f"Port numbers must be integers: {num}") from err
    if port not in range(1, 8):
        raise vol.Invalid(f"Only port numbers from 1 to 7 are ADC capable: {port}")
    return port
# Options of a single ADC sensor port.
ADC_SCHEMA = vol.Schema(
    {
        vol.Required(CONF_NAME): cv.string,
        vol.Optional(CONF_SRC_RANGE, default=DEFAULT_SRC_RANGE): int_range,
        vol.Optional(CONF_DST_RANGE, default=DEFAULT_DST_RANGE): float_range,
        vol.Optional(CONF_DST_UNIT, default=PERCENTAGE): cv.string,
    }
)

# Maps a port number to a string value.
PORTS_SCHEMA = vol.Schema({cv.positive_int: cv.string})

# Shared by the binary sensor and switch sections of a device.
IO_PORTS_SCHEMA = vol.Schema(
    {
        vol.Required(CONF_PORTS): PORTS_SCHEMA,
        vol.Optional(CONF_INVERT_LOGIC, default=DEFAULT_INVERT_LOGIC): cv.boolean,
    }
)

DEVICE_SCHEMA = vol.Schema(
    {
        vol.Required(CONF_ID): cv.positive_int,
        CONF_BINARY_SENSORS: IO_PORTS_SCHEMA,
        CONF_SWITCHES: IO_PORTS_SCHEMA,
        CONF_SENSORS: {CONF_PORTS: {adc_port_number: ADC_SCHEMA}},
    }
)

# NOTE(review): the DOMAIN nesting is inferred from setup() reading
# config[DOMAIN][CONF_DISCOVER]; the original lines are not visible -- confirm.
CONFIG_SCHEMA = vol.Schema(
    {
        DOMAIN: {
            CONF_DEVICES: vol.All(cv.ensure_list, [DEVICE_SCHEMA]),
            vol.Optional(CONF_DISCOVER, default=DEFAULT_DEV): vol.All(
                cv.ensure_list, [cv.string]
            ),
        },
    },
    extra=vol.ALLOW_EXTRA,
)
def setup(hass: HomeAssistant, config: ConfigType) -> bool:
    """Initialize the numato integration.

    Discovers available Numato devices and loads the binary_sensor, sensor and
    switch platforms.

    Returns False on error during device discovery (e.g. duplicate ID),
    otherwise returns True.

    No exceptions should occur, since the platforms are initialized on a best
    effort basis, which means, errors are handled locally.
    """
    hass.data[DOMAIN] = config[DOMAIN]

    try:
        gpio.discover(config[DOMAIN][CONF_DISCOVER])
    except gpio.NumatoGpioError as err:
        _LOGGER.error("Error discovering Numato devices: %s", err)
        # NOTE(review): this call is not visible in the damaged source; it is
        # restored on the assumption that numato_gpio exposes cleanup() --
        # confirm against the library.
        gpio.cleanup()
        return False

    # NOTE(review): the log level of this message is not visible in the
    # damaged source -- confirm.
    _LOGGER.debug(
        "Initializing Numato 32 port USB GPIO expanders with IDs: %s",
        ", ".join(str(d) for d in gpio.devices),
    )

    hass.data[DOMAIN][DATA_API] = NumatoAPI()

    def cleanup_gpio(event: Event) -> None:
        """Stuff to do before stopping."""
        _LOGGER.debug("Clean up Numato GPIO")
        # NOTE(review): restored call, see the note in the discovery error
        # path above -- confirm.
        gpio.cleanup()
        if DATA_API in hass.data[DOMAIN]:
            hass.data[DOMAIN][DATA_API].ports_registered.clear()

    def prepare_gpio(event: Event) -> None:
        """Stuff to do when home assistant starts."""
        _LOGGER.debug("Setup cleanup at stop for Numato GPIO")
        hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, cleanup_gpio)

    # The stop handler is only registered once Home Assistant has started.
    hass.bus.listen_once(EVENT_HOMEASSISTANT_START, prepare_gpio)

    load_platform(hass, Platform.BINARY_SENSOR, DOMAIN, {}, config)
    load_platform(hass, Platform.SENSOR, DOMAIN, {}, config)
    load_platform(hass, Platform.SWITCH, DOMAIN, {}, config)
    return True
class NumatoAPI:
    """Home-Assistant specific API for numato device access.

    Keeps a registry of which (device, port) pairs have been set up and in
    which direction, and guards every device access with that registry.

    NOTE(review): the class header, most method headers and several body
    lines are not visible in the damaged source. Signatures follow the
    signature index at the end of the file; the check_device_id() and
    check_port_free() calls inside check_port(), setup_output(),
    setup_input() and read_adc_input() are inferred -- confirm.
    """

    def __init__(self) -> None:
        """Initialize API state."""
        # Maps (device_id, port) to the direction the port was set up with.
        self.ports_registered: dict[tuple[int, int], int] = {}

    def check_port_free(self, device_id: int, port: int, direction: int) -> None:
        """Check whether a port is still free to be set up.

        Registers the port with the given direction when it is free.
        Fail with exception if it has already been registered.
        """
        key = (device_id, port)
        if key not in self.ports_registered:
            self.ports_registered[key] = direction
            return
        io = "input" if self.ports_registered[key] == gpio.IN else "output"
        raise gpio.NumatoGpioError(
            f"Device {device_id} port {port} already in use as {io}."
        )

    def check_device_id(self, device_id: int) -> None:
        """Check whether a device has been discovered.

        Fail with exception if it has not.
        """
        if device_id not in gpio.devices:
            raise gpio.NumatoGpioError(f"Device {device_id} not available.")

    def check_port(self, device_id: int, port: int, direction: int) -> None:
        """Raise an error if the port setup doesn't match the direction."""
        self.check_device_id(device_id)
        if (device_id, port) not in self.ports_registered:
            raise gpio.NumatoGpioError(
                f"Port {port} is not set up for numato device {device_id}."
            )
        # Keyed by the direction the caller asked for.
        msg = {
            gpio.OUT: (
                f"Trying to write to device {device_id} port {port} set up as input."
            ),
            gpio.IN: (
                f"Trying to read from device {device_id} port {port} set up as output."
            ),
        }
        if self.ports_registered[(device_id, port)] != direction:
            raise gpio.NumatoGpioError(msg[direction])

    def setup_output(self, device_id: int, port: int) -> None:
        """Set up a GPIO as output."""
        self.check_device_id(device_id)
        # Registering first makes a conflicting setup fail before the
        # hardware is touched, and lets check_port() accept the port later.
        self.check_port_free(device_id, port, gpio.OUT)
        gpio.devices[device_id].setup(port, gpio.OUT)

    def setup_input(self, device_id: int, port: int) -> None:
        """Set up a GPIO as input."""
        self.check_device_id(device_id)
        self.check_port_free(device_id, port, gpio.IN)
        gpio.devices[device_id].setup(port, gpio.IN)

    def write_output(self, device_id: int, port: int, value: int) -> None:
        """Write a value to a GPIO."""
        self.check_port(device_id, port, gpio.OUT)
        gpio.devices[device_id].write(port, value)

    def read_input(self, device_id: int, port: int) -> int:
        """Read a value from a GPIO."""
        self.check_port(device_id, port, gpio.IN)
        return gpio.devices[device_id].read(port)

    def read_adc_input(self, device_id: int, port: int) -> int:
        """Read an ADC value from a GPIO ADC port."""
        self.check_port(device_id, port, gpio.IN)
        return gpio.devices[device_id].adc_read(port)

    def edge_detect(
        self, device_id: int, port: int, event_callback: Callable[[int, bool], None]
    ) -> None:
        """Add detection for RISING and FALLING events."""
        self.check_port(device_id, port, gpio.IN)
        gpio.devices[device_id].add_event_detect(port, event_callback, gpio.BOTH)
        gpio.devices[device_id].notify = True
int read_adc_input(self, int device_id, int port)
None check_port_free(self, int device_id, int port, int direction)
int read_input(self, int device_id, int port)
None setup_input(self, int device_id, int port)
None edge_detect(self, int device_id, int port, Callable[[int, bool], None] event_callback)
None check_device_id(self, int device_id)
None write_output(self, int device_id, int port, int value)
None setup_output(self, int device_id, int port)
None check_port(self, int device_id, int port, int direction)
bool setup(HomeAssistant hass, ConfigType config)
None load_platform(core.HomeAssistant hass, Platform|str component, str platform, DiscoveryInfoType|None discovered, ConfigType hass_config)