Home Assistant Unofficial Reference 2024.12.1
pin.py
Go to the documentation of this file.
1 """Code to handle pins on a Firmata board."""
2 
3 from __future__ import annotations
4 
5 from collections.abc import Callable
6 import logging
7 from typing import cast
8 
9 from .board import FirmataBoard, FirmataPinType
10 from .const import PIN_MODE_INPUT, PIN_MODE_PULLUP, PIN_TYPE_ANALOG
11 
12 _LOGGER = logging.getLogger(__name__)
13 
14 
15 class FirmataPinUsedException(Exception):
16  """Represents an exception when a pin is already in use."""
17 
18 
20  """Manages a single Firmata board pin."""
21 
22  def __init__(self, board: FirmataBoard, pin: FirmataPinType, pin_mode: str) -> None:
23  """Initialize the pin."""
24  self.boardboard = board
25  self._pin_pin = pin
26  self._pin_mode_pin_mode = pin_mode
27  self._pin_type_pin_type, self._firmata_pin_firmata_pin = self.boardboard.get_pin_type(self._pin_pin)
28  self._state: bool | int | None = None
29  self._analog_pin_analog_pin: int | None = None
30 
31  if self._pin_type_pin_type == PIN_TYPE_ANALOG:
32  # Pymata wants the analog pin formatted as the # from "A#"
33  self._analog_pin_analog_pin = int(cast(str, self._pin_pin)[1:])
34 
35  def setup(self):
36  """Set up a pin and make sure it is valid."""
37  if not self.boardboard.mark_pin_used(self._pin_pin):
38  raise FirmataPinUsedException(f"Pin {self._pin} already used!")
39 
40 
42  """Representation of a Firmata Digital Output Pin."""
43 
44  _state: bool
45 
46  def __init__(
47  self,
48  board: FirmataBoard,
49  pin: FirmataPinType,
50  pin_mode: str,
51  initial: bool,
52  negate: bool,
53  ) -> None:
54  """Initialize the digital output pin."""
55  self._initial_initial = initial
56  self._negate_negate = negate
57  super().__init__(board, pin, pin_mode)
58 
59  async def start_pin(self) -> None:
60  """Set initial state on a pin."""
61  _LOGGER.debug(
62  "Setting initial state for digital output pin %s on board %s",
63  self._pin_pin,
64  self.boardboard.name,
65  )
66  api = self.boardboard.api
67  # Only PIN_MODE_OUTPUT mode is supported as binary digital output
68  await api.set_pin_mode_digital_output(self._firmata_pin_firmata_pin)
69 
70  if self._initial_initial:
71  new_pin_state = not self._negate_negate
72  else:
73  new_pin_state = self._negate_negate
74  await api.digital_pin_write(self._firmata_pin_firmata_pin, int(new_pin_state))
75  self._state_state = self._initial_initial
76 
77  @property
78  def is_on(self) -> bool:
79  """Return true if digital output is on."""
80  return self._state_state
81 
82  async def turn_on(self) -> None:
83  """Turn on digital output."""
84  _LOGGER.debug("Turning digital output on pin %s on", self._pin_pin)
85  new_pin_state = not self._negate_negate
86  await self.boardboard.api.digital_pin_write(self._firmata_pin_firmata_pin, int(new_pin_state))
87  self._state_state = True
88 
89  async def turn_off(self) -> None:
90  """Turn off digital output."""
91  _LOGGER.debug("Turning digital output on pin %s off", self._pin_pin)
92  new_pin_state = self._negate_negate
93  await self.boardboard.api.digital_pin_write(self._firmata_pin_firmata_pin, int(new_pin_state))
94  self._state_state = False
95 
96 
98  """Representation of a Firmata PWM/analog Output Pin."""
99 
100  _state: int
101 
102  def __init__(
103  self,
104  board: FirmataBoard,
105  pin: FirmataPinType,
106  pin_mode: str,
107  initial: bool,
108  minimum: int,
109  maximum: int,
110  ) -> None:
111  """Initialize the PWM/analog output pin."""
112  self._initial_initial = initial
113  self._min_min = minimum
114  self._max_max = maximum
115  self._range_range = self._max_max - self._min_min
116  super().__init__(board, pin, pin_mode)
117 
118  async def start_pin(self) -> None:
119  """Set initial state on a pin."""
120  _LOGGER.debug(
121  "Setting initial state for PWM/analog output pin %s on board %s to %d",
122  self._pin_pin,
123  self.boardboard.name,
124  self._initial_initial,
125  )
126  api = self.boardboard.api
127  await api.set_pin_mode_pwm_output(self._firmata_pin_firmata_pin)
128 
129  new_pin_state = round((self._initial_initial * self._range_range) / 255) + self._min_min
130  await api.pwm_write(self._firmata_pin_firmata_pin, new_pin_state)
131  self._state_state = self._initial_initial
132 
133  @property
134  def state(self) -> int:
135  """Return PWM/analog state."""
136  return self._state_state
137 
138  async def set_level(self, level: int) -> None:
139  """Set PWM/analog output."""
140  _LOGGER.debug("Setting PWM/analog output on pin %s to %d", self._pin_pin, level)
141  new_pin_state = round((level * self._range_range) / 255) + self._min_min
142  await self.boardboard.api.pwm_write(self._firmata_pin_firmata_pin, new_pin_state)
143  self._state_state = level
144 
145 
147  """Representation of a Firmata Digital Input Pin."""
148 
149  _state: bool
150 
151  def __init__(
152  self, board: FirmataBoard, pin: FirmataPinType, pin_mode: str, negate: bool
153  ) -> None:
154  """Initialize the digital input pin."""
155  self._negate_negate = negate
156  self._forward_callback_forward_callback: Callable[[], None]
157  super().__init__(board, pin, pin_mode)
158 
159  async def start_pin(self, forward_callback: Callable[[], None]) -> None:
160  """Get initial state and start reporting a pin."""
161  _LOGGER.debug(
162  "Starting reporting updates for digital input pin %s on board %s",
163  self._pin_pin,
164  self.boardboard.name,
165  )
166  self._forward_callback_forward_callback = forward_callback
167  api = self.boardboard.api
168  if self._pin_mode_pin_mode_pin_mode == PIN_MODE_INPUT:
169  await api.set_pin_mode_digital_input(self._pin_pin, self.latch_callbacklatch_callback)
170  elif self._pin_mode_pin_mode_pin_mode == PIN_MODE_PULLUP:
171  await api.set_pin_mode_digital_input_pullup(self._pin_pin, self.latch_callbacklatch_callback)
172 
173  new_state = bool((await self.boardboard.api.digital_read(self._firmata_pin_firmata_pin))[0])
174  if self._negate_negate:
175  new_state = not new_state
176  self._state_state = new_state
177 
178  await api.enable_digital_reporting(self._pin_pin)
179  self._forward_callback_forward_callback()
180 
181  async def stop_pin(self) -> None:
182  """Stop reporting digital input pin."""
183  _LOGGER.debug(
184  "Stopping reporting updates for digital input pin %s on board %s",
185  self._pin_pin,
186  self.boardboard.name,
187  )
188  api = self.boardboard.api
189  await api.disable_digital_reporting(self._pin_pin)
190 
191  @property
192  def is_on(self) -> bool:
193  """Return true if digital input is on."""
194  return self._state_state
195 
196  async def latch_callback(self, data: list) -> None:
197  """Update pin state on callback."""
198  if data[1] != self._firmata_pin_firmata_pin:
199  return
200  _LOGGER.debug(
201  "Received latch %d for digital input pin %d on board %s",
202  data[2],
203  self._firmata_pin_firmata_pin,
204  self.boardboard.name,
205  )
206  new_state = bool(data[2])
207  if self._negate_negate:
208  new_state = not new_state
209  if self._state_state == new_state:
210  return
211  self._state_state = new_state
212  self._forward_callback_forward_callback()
213 
214 
216  """Representation of a Firmata Analog Input Pin."""
217 
218  _analog_pin: int
219  _state: int
220 
221  def __init__(
222  self, board: FirmataBoard, pin: FirmataPinType, pin_mode: str, differential: int
223  ) -> None:
224  """Initialize the analog input pin."""
225  self._differential_differential = differential
226  self._forward_callback_forward_callback: Callable[[], None]
227  super().__init__(board, pin, pin_mode)
228 
229  async def start_pin(self, forward_callback: Callable[[], None]) -> None:
230  """Get initial state and start reporting a pin."""
231  _LOGGER.debug(
232  "Starting reporting updates for analog input pin %s on board %s",
233  self._pin_pin,
234  self.boardboard.name,
235  )
236  self._forward_callback_forward_callback = forward_callback
237  api = self.boardboard.api
238  # Only PIN_MODE_ANALOG_INPUT mode is supported as sensor input
239  await api.set_pin_mode_analog_input(
240  self._analog_pin_analog_pin, self.latch_callbacklatch_callback, self._differential_differential
241  )
242 
243  self._state_state = (await self.boardboard.api.analog_read(self._analog_pin_analog_pin))[0]
244 
245  self._forward_callback_forward_callback()
246 
247  async def stop_pin(self) -> None:
248  """Stop reporting analog input pin."""
249  _LOGGER.debug(
250  "Stopping reporting updates for analog input pin %s on board %s",
251  self._pin_pin,
252  self.boardboard.name,
253  )
254  api = self.boardboard.api
255  await api.disable_analog_reporting(self._analog_pin_analog_pin)
256 
257  @property
258  def state(self) -> int:
259  """Return sensor state."""
260  return self._state_state
261 
262  async def latch_callback(self, data: list) -> None:
263  """Update pin state on callback."""
264  if data[1] != self._analog_pin_analog_pin:
265  return
266  _LOGGER.debug(
267  "Received latch %d for analog input pin %s on board %s",
268  data[2],
269  self._pin_pin,
270  self.boardboard.name,
271  )
272  new_state = data[2]
273  if self._state_state == new_state:
274  _LOGGER.debug("stopping")
275  return
276  self._state_state = new_state
277  self._forward_callback_forward_callback()
None start_pin(self, Callable[[], None] forward_callback)
Definition: pin.py:229
None __init__(self, FirmataBoard board, FirmataPinType pin, str pin_mode, int differential)
Definition: pin.py:223
None start_pin(self, Callable[[], None] forward_callback)
Definition: pin.py:159
None __init__(self, FirmataBoard board, FirmataPinType pin, str pin_mode, bool negate)
Definition: pin.py:153
None __init__(self, FirmataBoard board, FirmataPinType pin, str pin_mode, bool initial, bool negate)
Definition: pin.py:53
None __init__(self, FirmataBoard board, FirmataPinType pin, str pin_mode)
Definition: pin.py:22
None __init__(self, FirmataBoard board, FirmataPinType pin, str pin_mode, bool initial, int minimum, int maximum)
Definition: pin.py:110