Home Assistant Unofficial Reference 2024.12.1
__init__.py
Go to the documentation of this file.
1 """Support for Arduino-compatible Microcontrollers through Firmata."""
2 
3 from copy import copy
4 import logging
5 
6 import voluptuous as vol
7 
8 from homeassistant.config_entries import SOURCE_IMPORT, ConfigEntry
9 from homeassistant.const import (
10  CONF_BINARY_SENSORS,
11  CONF_LIGHTS,
12  CONF_MAXIMUM,
13  CONF_MINIMUM,
14  CONF_NAME,
15  CONF_PIN,
16  CONF_SENSORS,
17  CONF_SWITCHES,
18  EVENT_HOMEASSISTANT_STOP,
19 )
20 from homeassistant.core import HomeAssistant
21 from homeassistant.helpers import config_validation as cv, device_registry as dr
22 from homeassistant.helpers.typing import ConfigType
23 
24 from .board import FirmataBoard
25 from .const import (
26  CONF_ARDUINO_INSTANCE_ID,
27  CONF_ARDUINO_WAIT,
28  CONF_DIFFERENTIAL,
29  CONF_INITIAL_STATE,
30  CONF_NEGATE_STATE,
31  CONF_PIN_MODE,
32  CONF_PLATFORM_MAP,
33  CONF_SAMPLING_INTERVAL,
34  CONF_SERIAL_BAUD_RATE,
35  CONF_SERIAL_PORT,
36  CONF_SLEEP_TUNE,
37  DOMAIN,
38  FIRMATA_MANUFACTURER,
39  PIN_MODE_ANALOG,
40  PIN_MODE_INPUT,
41  PIN_MODE_OUTPUT,
42  PIN_MODE_PULLUP,
43  PIN_MODE_PWM,
44 )
45 
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# String constant "board_configs"; not referenced elsewhere in the visible code.
DATA_CONFIGS = "board_configs"

# An analog pin name: a string consisting of "A" followed by one or more digits.
ANALOG_PIN_SCHEMA = vol.All(cv.string, vol.Match(r"^A[0-9]+$"))
51 
# Validation schema for one switch entry of a board.
SWITCH_SCHEMA = vol.Schema(
    {
        vol.Required(CONF_NAME): cv.string,
        # A switch drives a pin as digital output; the pin may be given either
        # as a cv.positive_int or as an analog pin name (ANALOG_PIN_SCHEMA).
        vol.Required(CONF_PIN): vol.Any(cv.positive_int, ANALOG_PIN_SCHEMA),
        # The only accepted pin mode for a switch is the output mode constant.
        vol.Required(CONF_PIN_MODE): PIN_MODE_OUTPUT,
        vol.Optional(CONF_INITIAL_STATE, default=False): cv.boolean,
        vol.Optional(CONF_NEGATE_STATE, default=False): cv.boolean,
    },
    required=True,
)
63 
# Validation schema for one light entry of a board.
LIGHT_SCHEMA = vol.Schema(
    {
        vol.Required(CONF_NAME): cv.string,
        # A light drives a pin as PWM/analog output; the pin may be given
        # either as a cv.positive_int or as an analog pin name.
        vol.Required(CONF_PIN): vol.Any(cv.positive_int, ANALOG_PIN_SCHEMA),
        # The only accepted pin mode for a light is the PWM mode constant.
        vol.Required(CONF_PIN_MODE): PIN_MODE_PWM,
        # Initial state, minimum and maximum default to 0, 0 and 255.
        vol.Optional(CONF_INITIAL_STATE, default=0): cv.positive_int,
        vol.Optional(CONF_MINIMUM, default=0): cv.positive_int,
        vol.Optional(CONF_MAXIMUM, default=255): cv.positive_int,
    },
    required=True,
)
76 
# Validation schema for one binary sensor entry of a board.
BINARY_SENSOR_SCHEMA = vol.Schema(
    {
        vol.Required(CONF_NAME): cv.string,
        # A binary sensor reads a pin as digital input; the pin may be given
        # either as a cv.positive_int or as an analog pin name.
        vol.Required(CONF_PIN): vol.Any(cv.positive_int, ANALOG_PIN_SCHEMA),
        # Two pin modes are accepted: plain input, or input with pull-up.
        vol.Required(CONF_PIN_MODE): vol.Any(PIN_MODE_INPUT, PIN_MODE_PULLUP),
        vol.Optional(CONF_NEGATE_STATE, default=False): cv.boolean,
    },
    required=True,
)
87 
# Validation schema for one sensor entry of a board.
SENSOR_SCHEMA = vol.Schema(
    {
        vol.Required(CONF_NAME): cv.string,
        # Only analog input sensors are implemented so far, hence the pin
        # must be an analog pin name and the mode must be the analog mode.
        vol.Required(CONF_PIN): ANALOG_PIN_SCHEMA,
        vol.Required(CONF_PIN_MODE): PIN_MODE_ANALOG,
        # The differential defaults to 40 so that an unplugged pin does not
        # cause a flood of messages on initial setup (Firmata responds very
        # fast). Values below 1 are rejected.
        vol.Optional(CONF_DIFFERENTIAL, default=40): vol.All(
            cv.positive_int, vol.Range(min=1)
        ),
    },
    required=True,
)
102 
# Validation schema for a single board: the serial port is the only mandatory
# key, followed by optional connection tuning values and optional lists of
# entities per platform.
BOARD_CONFIG_SCHEMA = vol.Schema(
    {
        vol.Required(CONF_SERIAL_PORT): cv.string,
        vol.Optional(CONF_SERIAL_BAUD_RATE): cv.positive_int,
        vol.Optional(CONF_ARDUINO_INSTANCE_ID): cv.positive_int,
        vol.Optional(CONF_ARDUINO_WAIT): cv.positive_int,
        # Coerced to float; values below 0.0001 are rejected.
        vol.Optional(CONF_SLEEP_TUNE): vol.All(
            vol.Coerce(float), vol.Range(min=0.0001)
        ),
        vol.Optional(CONF_SAMPLING_INTERVAL): cv.positive_int,
        # Each platform key holds a list of entries validated by its schema.
        vol.Optional(CONF_SWITCHES): [SWITCH_SCHEMA],
        vol.Optional(CONF_LIGHTS): [LIGHT_SCHEMA],
        vol.Optional(CONF_BINARY_SENSORS): [BINARY_SENSOR_SCHEMA],
        vol.Optional(CONF_SENSORS): [SENSOR_SCHEMA],
    },
    required=True,
)
120 
# Top-level schema: the domain key holds a list of board configurations,
# passed through cv.ensure_list first (presumably so a single board mapping is
# accepted as well - confirm against the helper). Keys belonging to other
# domains are allowed and left alone.
CONFIG_SCHEMA = vol.Schema(
    {DOMAIN: vol.All(cv.ensure_list, [BOARD_CONFIG_SCHEMA])},
    extra=vol.ALLOW_EXTRA,
)
124 
125 
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
    """Set up the Firmata domain.

    Synchronizes the config entries of this domain with the boards listed
    under the domain key of ``config``. Boards are matched to entries by
    their serial port:

    * entries whose serial port is no longer configured are removed,
    * entries whose serial port is still configured get their data replaced
      by the current board configuration (the entry's stored name is kept),
    * boards without an entry start an import config flow.

    The domain key is not marked as required in ``CONFIG_SCHEMA``, so it may
    be absent from ``config``. That case is treated as "no boards configured"
    instead of failing with ``KeyError``.

    Returns:
        Always ``True``.
    """
    boards = config.get(DOMAIN, [])
    configured_ports = {board[CONF_SERIAL_PORT] for board in boards}

    # Delete specific entries that no longer exist in the config
    for entry in hass.config_entries.async_entries(DOMAIN):
        if entry.data[CONF_SERIAL_PORT] not in configured_ports:
            await hass.config_entries.async_remove(entry.entry_id)

    # Setup new entries and update old entries
    for board in boards:
        # Shallow copy so that adding the name does not touch the validated
        # configuration that was passed in.
        firmata_config = copy(board)
        port = board[CONF_SERIAL_PORT]
        existing_entry = next(
            (
                entry
                for entry in hass.config_entries.async_entries(DOMAIN)
                if entry.data[CONF_SERIAL_PORT] == port
            ),
            None,
        )

        if existing_entry is None:
            # Unknown board: hand it to the import step of the config flow.
            hass.async_create_task(
                hass.config_entries.flow.async_init(
                    DOMAIN,
                    context={"source": SOURCE_IMPORT},
                    data=firmata_config,
                )
            )
            continue

        # Known board: carry the stored name over into the refreshed data.
        firmata_config[CONF_NAME] = existing_entry.data[CONF_NAME]
        hass.config_entries.async_update_entry(existing_entry, data=firmata_config)

    return True
159 
160 
async def async_setup_entry(hass: HomeAssistant, config_entry: ConfigEntry) -> bool:
    """Set up a Firmata board for a config entry.

    Creates a ``FirmataBoard`` from the entry data, stores it in
    ``hass.data[DOMAIN]`` under the entry id, registers a shutdown listener
    and a device registry entry, and forwards the setup to every platform
    whose configuration key is present in the entry data.

    Returns:
        ``False`` if the board's ``async_setup`` reports failure (nothing is
        stored or registered in that case), otherwise ``True``.
    """
    hass.data.setdefault(DOMAIN, {})

    entry_id = config_entry.entry_id
    _LOGGER.debug(
        "Setting up Firmata id %s, name %s, config %s",
        entry_id,
        config_entry.data[CONF_NAME],
        config_entry.data,
    )

    board = FirmataBoard(config_entry.data)
    setup_ok = await board.async_setup()
    if not setup_ok:
        return False

    hass.data[DOMAIN][entry_id] = board

    async def handle_shutdown(event) -> None:
        """Handle shutdown of board when Home Assistant shuts down."""
        # Nothing to do if the board was already removed before shutdown
        if config_entry.entry_id not in hass.data[DOMAIN]:
            return
        await board.async_reset()

    # Passing the listener's return value to async_on_unload ties it to the
    # entry's unload.
    stop_listener = hass.bus.async_listen_once(
        EVENT_HOMEASSISTANT_STOP, handle_shutdown
    )
    config_entry.async_on_unload(stop_listener)

    dr.async_get(hass).async_get_or_create(
        config_entry_id=config_entry.entry_id,
        connections=set(),
        identifiers={(DOMAIN, board.name)},
        manufacturer=FIRMATA_MANUFACTURER,
        name=board.name,
        sw_version=board.firmware_version,
    )

    # Only platforms that actually have configuration in this entry are set up
    configured_platforms = [
        platform
        for conf, platform in CONF_PLATFORM_MAP.items()
        if conf in config_entry.data
    ]
    await hass.config_entries.async_forward_entry_setups(
        config_entry, configured_platforms
    )
    return True
209 
210 
async def async_unload_entry(hass: HomeAssistant, config_entry: ConfigEntry) -> bool:
    """Shutdown and close a Firmata board for a config entry.

    Unloads every platform whose configuration key is present in the entry
    data, then removes the board from ``hass.data[DOMAIN]`` and resets it.
    The board is removed and reset even if unloading the platforms failed.

    Returns:
        ``True`` unless the platform unload or the board reset returned
        ``False``.
    """
    _LOGGER.debug("Closing Firmata board %s", config_entry.data[CONF_NAME])

    configured_platforms = [
        platform
        for conf, platform in CONF_PLATFORM_MAP.items()
        if conf in config_entry.data
    ]

    # With no platforms configured there is nothing to unload, which counts
    # as success.
    platforms_result = True
    if configured_platforms:
        platforms_result = await hass.config_entries.async_unload_platforms(
            config_entry, configured_platforms
        )

    board = hass.data[DOMAIN].pop(config_entry.entry_id)
    reset_result = await board.async_reset()

    return False not in (platforms_result, reset_result)
bool async_setup(HomeAssistant hass, ConfigType config)
Definition: __init__.py:126
bool async_setup_entry(HomeAssistant hass, ConfigEntry config_entry)
Definition: __init__.py:161
bool async_unload_entry(HomeAssistant hass, ConfigEntry config_entry)
Definition: __init__.py:211