Home Assistant Unofficial Reference 2024.12.1
board.py
Go to the documentation of this file.
1 """Code to handle a Firmata board."""
2 
3 from __future__ import annotations
4 
5 from collections.abc import Mapping
6 import logging
7 from typing import Literal
8 
9 from pymata_express.pymata_express import PymataExpress
10 from pymata_express.pymata_express_serial import serial
11 
12 from homeassistant.const import (
13  CONF_BINARY_SENSORS,
14  CONF_LIGHTS,
15  CONF_NAME,
16  CONF_SENSORS,
17  CONF_SWITCHES,
18 )
19 
20 from .const import (
21  CONF_ARDUINO_INSTANCE_ID,
22  CONF_ARDUINO_WAIT,
23  CONF_SAMPLING_INTERVAL,
24  CONF_SERIAL_BAUD_RATE,
25  CONF_SERIAL_PORT,
26  CONF_SLEEP_TUNE,
27  PIN_TYPE_ANALOG,
28  PIN_TYPE_DIGITAL,
29 )
30 
31 _LOGGER = logging.getLogger(__name__)
32 
33 type FirmataPinType = int | str
34 
35 
37  """Manages a single Firmata board."""
38 
def __init__(self, config: Mapping) -> None:
    """Store the board configuration and prepare empty runtime state.

    ``config`` must contain ``CONF_NAME``. The switch, light, binary sensor
    and sensor entries are optional; each one defaults to an empty list
    when its key is absent from ``config``.
    """
    self.configconfig = config

    # Connection state, populated once the board has been set up.
    self.apiapi: PymataExpress | None = None
    self.firmware_versionfirmware_version: str | None = None
    self.protocol_versionprotocol_version = None

    self.namename = config[CONF_NAME]

    # Pins claimed so far through mark_pin_used().
    self.used_pins: list[FirmataPinType] = []

    # Per-platform configuration, falling back to "nothing configured".
    self.switchesswitches = config.get(CONF_SWITCHES, [])
    self.lightslights = config.get(CONF_LIGHTS, [])
    self.binary_sensorsbinary_sensors = config.get(CONF_BINARY_SENSORS, [])
    self.sensorssensors = config.get(CONF_SENSORS, [])
60 
async def async_setup(self, tries=0) -> bool:
    """Connect to the board and apply its configuration.

    Opens the connection through ``get_board()``, reads the firmware
    version and, when ``CONF_SAMPLING_INTERVAL`` is configured, applies the
    sampling interval. Each failure is logged and reported by returning
    ``False``; ``True`` means every step succeeded. The ``tries`` argument
    is accepted but not used by this method.
    """
    board_name = self.namename

    try:
        _LOGGER.debug("Connecting to Firmata %s", board_name)
        self.apiapi = await get_board(self.configconfig)
    except RuntimeError as exc:
        _LOGGER.error("Error connecting to PyMata board %s: %s", board_name, exc)
        return False
    # The timeout handler is listed ahead of the general serial handler so
    # that write timeouts get their own log message.
    except serial.SerialTimeoutException as exc:
        _LOGGER.error(
            "Timeout writing to serial port for PyMata board %s: %s",
            board_name,
            exc,
        )
        return False
    except serial.SerialException as exc:
        _LOGGER.error(
            "Error connecting to serial port for PyMata board %s: %s",
            board_name,
            exc,
        )
        return False

    firmware = await self.apiapi.get_firmware_version()
    self.firmware_versionfirmware_version = firmware
    if not firmware:
        _LOGGER.error(
            "Error retrieving firmware version from Firmata board %s", board_name
        )
        return False

    if CONF_SAMPLING_INTERVAL in self.configconfig:
        interval = self.configconfig[CONF_SAMPLING_INTERVAL]
        try:
            await self.apiapi.set_sampling_interval(interval)
        except RuntimeError as exc:
            _LOGGER.error(
                "Error setting sampling interval for PyMata board %s: %s",
                board_name,
                exc,
            )
            return False

    _LOGGER.debug("Firmata connection successful for %s", board_name)
    return True
104 
async def async_reset(self) -> bool:
    """Shut down the board connection, if there is one.

    Always returns ``True``, including when the board was never set up.
    """
    _LOGGER.debug("Shutting down board %s", self.namename)

    # Only a board that was actually connected needs to be shut down.
    if self.apiapi is not None:
        await self.apiapi.shutdown()
        self.apiapi = None

    return True
116 
def mark_pin_used(self, pin: FirmataPinType) -> bool:
    """Claim a pin on the board.

    Returns ``True`` and records the pin when it was still free, or
    ``False`` (leaving the record untouched) when it was already claimed.
    """
    already_claimed = pin in self.used_pins
    if not already_claimed:
        self.used_pins.append(pin)
    return not already_claimed
123 
def get_pin_type(self, pin: FirmataPinType) -> tuple[Literal[0, 1], int]:
    """Return the pin type and the Firmata pin number for a configured pin.

    A string pin is treated as analog: everything after its first
    character is parsed as an integer and offset by the board's
    ``first_analog_pin``. Any other pin is returned unchanged as digital.
    """
    if not isinstance(pin, str):
        return (PIN_TYPE_DIGITAL, pin)

    # Drop the leading character and keep the numeric part of the name.
    analog_index = int(pin[1:])
    return (PIN_TYPE_ANALOG, analog_index + self.apiapi.first_analog_pin)
136 
137 
async def get_board(data: Mapping) -> PymataExpress:
    """Create and start a PymataExpress instance from configuration data.

    Only the connection options present in ``data`` are forwarded to
    ``PymataExpress``; the start-up and shutdown flags are always fixed.
    """
    # Configuration key -> PymataExpress keyword argument.
    optional_arguments = (
        (CONF_SERIAL_PORT, "com_port"),
        (CONF_SERIAL_BAUD_RATE, "baud_rate"),
        (CONF_ARDUINO_INSTANCE_ID, "arduino_instance_id"),
        (CONF_ARDUINO_WAIT, "arduino_wait"),
        (CONF_SLEEP_TUNE, "sleep_tune"),
    )
    board_kwargs = {
        keyword: data[conf_key]
        for conf_key, keyword in optional_arguments
        if conf_key in data
    }

    # autostart is disabled because the board is started explicitly below.
    board_kwargs.update(
        autostart=False,
        shutdown_on_exception=True,
        close_loop_on_shutdown=False,
    )

    pymata_board = PymataExpress(**board_kwargs)
    await pymata_board.start_aio()
    return pymata_board
tuple[Literal[0, 1], int] get_pin_type(self, FirmataPinType pin)
Definition: board.py:124
bool mark_pin_used(self, FirmataPinType pin)
Definition: board.py:117
PymataExpress get_board(Mapping data)
Definition: board.py:138