1 """Collection of useful functions for the HomeKit component."""
3 from __future__
import annotations
12 from typing
import Any, cast
14 from pyhap.accessory
import Accessory
16 import voluptuous
as vol
21 persistent_notification,
28 DOMAIN
as MEDIA_PLAYER_DOMAIN,
29 MediaPlayerDeviceClass,
30 MediaPlayerEntityFeature,
36 ATTR_SUPPORTED_FEATURES,
44 EventStateChangedData,
59 CONF_AUDIO_PACKET_SIZE,
62 CONF_LINKED_BATTERY_CHARGING_SENSOR,
63 CONF_LINKED_BATTERY_SENSOR,
64 CONF_LINKED_DOORBELL_SENSOR,
65 CONF_LINKED_HUMIDITY_SENSOR,
66 CONF_LINKED_MOTION_SENSOR,
67 CONF_LINKED_OBSTRUCTION_SENSOR,
68 CONF_LOW_BATTERY_THRESHOLD,
80 CONF_VIDEO_PACKET_SIZE,
83 DEFAULT_AUDIO_PACKET_SIZE,
84 DEFAULT_LOW_BATTERY_THRESHOLD,
89 DEFAULT_SUPPORT_AUDIO,
92 DEFAULT_VIDEO_PACKET_SIZE,
106 VIDEO_CODEC_H264_OMX,
107 VIDEO_CODEC_H264_V4L2M2M,
110 from .models
import HomeKitConfigEntry
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# Matches every run of characters that is neither a digit nor a dot.
NUMBERS_ONLY_RE = re.compile(r"[^\d.]+")
# Matches one to three dot-separated groups of digits.
VERSION_RE = re.compile(r"([0-9]+)(\.[0-9]+)?(\.[0-9]+)?")
# Characters stripped from both ends of a cleaned-up name.
INVALID_END_CHARS = "-_ "
# Upper bound applied to each numeric version part (largest unsigned 32-bit value).
MAX_VERSION_PART = (1 << 32) - 1
122 VALID_VIDEO_CODECS = [
124 VIDEO_CODEC_H264_OMX,
125 VIDEO_CODEC_H264_V4L2M2M,
# Audio codec names collected into one list.
# NOTE(review): this audio list references VIDEO_CODEC_COPY rather than an
# audio-specific "copy" constant — confirm this is intentional.
VALID_AUDIO_CODECS = [AUDIO_CODEC_OPUS, VIDEO_CODEC_COPY]
130 BASIC_INFO_SCHEMA = vol.Schema(
132 vol.Optional(CONF_NAME): cv.string,
133 vol.Optional(CONF_LINKED_BATTERY_SENSOR): cv.entity_domain(sensor.DOMAIN),
134 vol.Optional(CONF_LINKED_BATTERY_CHARGING_SENSOR): cv.entity_domain(
138 CONF_LOW_BATTERY_THRESHOLD, default=DEFAULT_LOW_BATTERY_THRESHOLD
143 FEATURE_SCHEMA = BASIC_INFO_SCHEMA.extend(
144 {vol.Optional(CONF_FEATURE_LIST, default=
None): cv.ensure_list}
147 CAMERA_SCHEMA = BASIC_INFO_SCHEMA.extend(
149 vol.Optional(CONF_STREAM_ADDRESS): vol.All(ipaddress.ip_address, cv.string),
150 vol.Optional(CONF_STREAM_SOURCE): cv.string,
151 vol.Optional(CONF_AUDIO_CODEC, default=DEFAULT_AUDIO_CODEC): vol.In(
154 vol.Optional(CONF_SUPPORT_AUDIO, default=DEFAULT_SUPPORT_AUDIO): cv.boolean,
155 vol.Optional(CONF_MAX_WIDTH, default=DEFAULT_MAX_WIDTH): cv.positive_int,
156 vol.Optional(CONF_MAX_HEIGHT, default=DEFAULT_MAX_HEIGHT): cv.positive_int,
157 vol.Optional(CONF_MAX_FPS, default=DEFAULT_MAX_FPS): cv.positive_int,
158 vol.Optional(CONF_AUDIO_MAP, default=DEFAULT_AUDIO_MAP): cv.string,
159 vol.Optional(CONF_VIDEO_MAP, default=DEFAULT_VIDEO_MAP): cv.string,
160 vol.Optional(CONF_STREAM_COUNT, default=DEFAULT_STREAM_COUNT): vol.All(
161 vol.Coerce(int), vol.Range(min=1, max=10)
163 vol.Optional(CONF_VIDEO_CODEC, default=DEFAULT_VIDEO_CODEC): vol.In(
167 CONF_AUDIO_PACKET_SIZE, default=DEFAULT_AUDIO_PACKET_SIZE
170 CONF_VIDEO_PACKET_SIZE, default=DEFAULT_VIDEO_PACKET_SIZE
172 vol.Optional(CONF_LINKED_MOTION_SENSOR): cv.entity_domain(
173 [binary_sensor.DOMAIN, EVENT_DOMAIN]
175 vol.Optional(CONF_LINKED_DOORBELL_SENSOR): cv.entity_domain(
176 [binary_sensor.DOMAIN, EVENT_DOMAIN]
181 HUMIDIFIER_SCHEMA = BASIC_INFO_SCHEMA.extend(
182 {vol.Optional(CONF_LINKED_HUMIDITY_SENSOR): cv.entity_domain(sensor.DOMAIN)}
186 COVER_SCHEMA = BASIC_INFO_SCHEMA.extend(
188 vol.Optional(CONF_LINKED_OBSTRUCTION_SENSOR): cv.entity_domain(
194 CODE_SCHEMA = BASIC_INFO_SCHEMA.extend(
195 {vol.Optional(ATTR_CODE, default=
None): vol.Any(
None, cv.string)}
198 MEDIA_PLAYER_SCHEMA = vol.Schema(
200 vol.Required(CONF_FEATURE): vol.All(
214 SWITCH_TYPE_SCHEMA = BASIC_INFO_SCHEMA.extend(
216 vol.Optional(CONF_TYPE, default=TYPE_SWITCH): vol.All(
232 SENSOR_SCHEMA = BASIC_INFO_SCHEMA.extend(
234 vol.Optional(CONF_THRESHOLD_CO): vol.Any(
None, cv.positive_int),
235 vol.Optional(CONF_THRESHOLD_CO2): vol.Any(
None, cv.positive_int),
240 HOMEKIT_CHAR_TRANSLATIONS = {
274 def validate_entity_config(values: dict) -> dict[str, dict]:
275 """Validate config entry for CONF_ENTITY."""
276 if not isinstance(values, dict):
277 raise vol.Invalid(
"expected a dictionary")
280 for entity_id, config
in values.items():
281 entity = cv.entity_id(entity_id)
284 if not isinstance(config, dict):
285 raise vol.Invalid(f
"The configuration for {entity} must be a dictionary.")
287 if domain
in (
"alarm_control_panel",
"lock"):
290 elif domain == media_player.const.DOMAIN:
293 for feature
in config[CONF_FEATURE_LIST]:
295 key = params.pop(CONF_FEATURE)
296 if key
in feature_list:
297 raise vol.Invalid(f
"A feature can be added only once for {entity}")
298 feature_list[key] = params
299 config[CONF_FEATURE_LIST] = feature_list
301 elif domain ==
"camera":
304 elif domain ==
"switch":
307 elif domain ==
"humidifier":
310 elif domain ==
"cover":
313 elif domain ==
"sensor":
319 entities[entity] = config
323 def get_media_player_features(state: State) -> list[str]:
324 """Determine features for media players."""
325 features = state.attributes.get(ATTR_SUPPORTED_FEATURES, 0)
329 MediaPlayerEntityFeature.TURN_ON | MediaPlayerEntityFeature.TURN_OFF
331 supported_modes.append(FEATURE_ON_OFF)
332 if features & (MediaPlayerEntityFeature.PLAY | MediaPlayerEntityFeature.PAUSE):
333 supported_modes.append(FEATURE_PLAY_PAUSE)
334 if features & (MediaPlayerEntityFeature.PLAY | MediaPlayerEntityFeature.STOP):
335 supported_modes.append(FEATURE_PLAY_STOP)
336 if features & MediaPlayerEntityFeature.VOLUME_MUTE:
337 supported_modes.append(FEATURE_TOGGLE_MUTE)
338 return supported_modes
341 def validate_media_player_features(state: State, feature_list: str) -> bool:
342 """Validate features for media players."""
343 if not (supported_modes := get_media_player_features(state)):
344 _LOGGER.error(
"%s does not support any media_player features", state.entity_id)
351 error_list = [feature
for feature
in feature_list
if feature
not in supported_modes]
355 "%s does not support media_player features: %s", state.entity_id, error_list
361 def async_show_setup_message(
362 hass: HomeAssistant, entry_id: str, bridge_name: str, pincode: bytes, uri: str
364 """Display persistent notification with setup information."""
365 pin = pincode.decode()
366 _LOGGER.info(
"Pincode: %s", pin)
368 buffer = io.BytesIO()
369 url = pyqrcode.create(uri)
370 url.svg(buffer, scale=5, module_color=
"#000", background=
"#FFF")
371 pairing_secret = secrets.token_hex(32)
373 entry = cast(HomeKitConfigEntry, hass.config_entries.async_get_entry(entry_id))
374 entry_data = entry.runtime_data
376 entry_data.pairing_qr = buffer.getvalue()
377 entry_data.pairing_qr_secret = pairing_secret
380 f
"To set up {bridge_name} in the Home App, "
381 "scan the QR code or enter the following code:\n"
383 f
""
385 persistent_notification.async_create(hass, message,
"HomeKit Pairing", entry_id)
def async_dismiss_setup_message(hass: HomeAssistant, entry_id: str) -> None:
    """Dismiss the persistent notification registered under ``entry_id``.

    NOTE(review): earlier documentation also mentioned removing the QR
    code; this function itself only dismisses the notification — confirm
    where the QR data is cleared.
    """
    notification_id = entry_id
    persistent_notification.async_dismiss(hass, notification_id)
393 def convert_to_float(state: Any) -> float |
None:
394 """Return float of state, catch errors."""
397 except (ValueError, TypeError):
401 def coerce_int(state: str) -> int:
405 except (ValueError, TypeError):
409 def cleanup_name_for_homekit(name: str |
None) -> str:
410 """Ensure the name of the device will not crash homekit."""
419 name.translate(HOMEKIT_CHAR_TRANSLATIONS)
420 .lstrip(INVALID_END_CHARS)[:MAX_NAME_LENGTH]
421 .rstrip(INVALID_END_CHARS)
def temperature_to_homekit(temperature: float, unit: str) -> float:
    """Return ``temperature`` converted from ``unit`` to Celsius for HomeKit."""
    celsius = TemperatureConverter.convert(
        temperature, unit, UnitOfTemperature.CELSIUS
    )
    return celsius
def temperature_to_states(temperature: float, unit: str) -> float:
    """Return a Celsius ``temperature`` converted to the Home Assistant ``unit``."""
    converted = TemperatureConverter.convert(
        temperature, UnitOfTemperature.CELSIUS, unit
    )
    return converted
435 def density_to_air_quality(density: float) -> int:
436 """Map PM2.5 µg/m3 density to HomeKit AirQuality level."""
448 def density_to_air_quality_pm10(density: float) -> int:
449 """Map PM10 µg/m3 density to HomeKit AirQuality level."""
461 def density_to_air_quality_nitrogen_dioxide(density: float) -> int:
462 """Map nitrogen dioxide µg/m3 to HomeKit AirQuality level."""
474 def density_to_air_quality_voc(density: float) -> int:
475 """Map VOCs µg/m3 to HomeKit AirQuality level.
477 The VOC mappings use the IAQ guidelines for Europe released by the WHO (World Health Organization).
478 Referenced from Sensirion_Gas_Sensors_SGP3x_TVOC_Concept.pdf
479 https://github.com/paulvha/svm30/blob/master/extras/Sensirion_Gas_Sensors_SGP3x_TVOC_Concept.pdf
def get_persist_filename_for_entry_id(entry_id: str) -> str:
    """Build the HomeKit state file name for a config entry.

    The name has the form ``<DOMAIN>.<entry_id>.state``.
    """
    filename = f"{DOMAIN}.{entry_id}.state"
    return filename
def get_aid_storage_filename_for_entry_id(entry_id: str) -> str:
    """Build the HomeKit aid storage file name for a config entry.

    The name has the form ``<DOMAIN>.<entry_id>.aids``.
    """
    filename = f"{DOMAIN}.{entry_id}.aids"
    return filename
def get_iid_storage_filename_for_entry_id(entry_id: str) -> str:
    """Build the HomeKit iid storage file name for a config entry.

    The name has the form ``<DOMAIN>.<entry_id>.iids``.
    """
    filename = f"{DOMAIN}.{entry_id}.iids"
    return filename
def get_persist_fullpath_for_entry_id(hass: HomeAssistant, entry_id: str) -> str:
    """Return the path of the HomeKit state file for a config entry.

    The file name comes from ``get_persist_filename_for_entry_id`` and is
    resolved below ``STORAGE_DIR`` through ``hass.config.path``.
    """
    state_filename = get_persist_filename_for_entry_id(entry_id)
    return hass.config.path(STORAGE_DIR, state_filename)
512 def get_aid_storage_fullpath_for_entry_id(hass: HomeAssistant, entry_id: str) -> str:
513 """Determine the path to the homekit aid storage file."""
514 return hass.config.path(
515 STORAGE_DIR, get_aid_storage_filename_for_entry_id(entry_id)
519 def get_iid_storage_fullpath_for_entry_id(hass: HomeAssistant, entry_id: str) -> str:
520 """Determine the path to the homekit iid storage file."""
521 return hass.config.path(
522 STORAGE_DIR, get_iid_storage_filename_for_entry_id(entry_id)
def _format_version_part(version_part: str) -> str:
    """Return one version component as a string clamped to 0..MAX_VERSION_PART."""
    # Cap at the upper bound first, then floor at zero.
    capped = min(MAX_VERSION_PART, coerce_int(version_part))
    clamped = max(0, capped)
    return str(clamped)
530 def format_version(version: str) -> str |
None:
531 """Extract the version string in a format homekit can consume."""
532 split_ver =
str(version).replace(
"-",
".").replace(
" ",
".")
533 num_only = NUMBERS_ONLY_RE.sub(
"", split_ver)
534 if (match := VERSION_RE.search(num_only))
is None:
536 value =
".".join(map(_format_version_part, match.group(0).split(
".")))
537 return None if _is_zero_but_true(value)
else value
def _is_zero_but_true(value: Any) -> bool:
    """Return True when ``value`` converts to a float equal to zero.

    Zero but true values can crash apple watches, so callers use this to
    detect them.
    """
    as_float = convert_to_float(value)
    return as_float == 0
545 def remove_state_files_for_entry_id(hass: HomeAssistant, entry_id: str) ->
None:
546 """Remove the state files from disk."""
548 get_persist_fullpath_for_entry_id(hass, entry_id),
549 get_aid_storage_fullpath_for_entry_id(hass, entry_id),
550 get_iid_storage_fullpath_for_entry_id(hass, entry_id),
552 if os.path.exists(path):
556 def _get_test_socket() -> socket.socket:
557 """Create a socket to test binding ports."""
558 test_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
559 test_socket.setblocking(
False)
560 test_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
565 def async_port_is_available(port: int) -> bool:
566 """Check to see if a port is available."""
568 _get_test_socket().bind((
"", port))
575 def async_find_next_available_port(hass: HomeAssistant, start_port: int) -> int:
576 """Find the next available port not assigned to a config entry."""
578 entry.data[CONF_PORT]
579 for entry
in hass.config_entries.async_entries(DOMAIN)
580 if CONF_PORT
in entry.data
587 """Find the next available port starting with the given port."""
588 test_socket = _get_test_socket()
589 for port
in range(start_port, MAX_PORT + 1):
590 if port
in exclude_ports:
593 test_socket.bind((
"", port))
600 raise RuntimeError(
"unreachable")
603 def pid_is_alive(pid: int) -> bool:
604 """Check to see if a process is alive."""
612 def accessory_friendly_name(hass_name: str, accessory: Accessory) -> str:
613 """Return the combined name for the accessory.
615 The mDNS name and the Home Assistant config entry
616 name are usually different which means they need to
617 see both to identify the accessory.
619 accessory_mdns_name = cast(str, accessory.display_name)
620 if hass_name.casefold().startswith(accessory_mdns_name.casefold()):
622 if accessory_mdns_name.casefold().startswith(hass_name.casefold()):
623 return accessory_mdns_name
624 return f
"{hass_name} ({accessory_mdns_name})"
627 def state_needs_accessory_mode(state: State) -> bool:
628 """Return if the entity represented by the state must be paired in accessory mode."""
629 if state.domain
in (CAMERA_DOMAIN, LOCK_DOMAIN):
633 state.domain == MEDIA_PLAYER_DOMAIN
634 and state.attributes.get(ATTR_DEVICE_CLASS)
635 in (MediaPlayerDeviceClass.TV, MediaPlayerDeviceClass.RECEIVER)
636 or state.domain == REMOTE_DOMAIN
637 and state.attributes.get(ATTR_SUPPORTED_FEATURES, 0)
638 & RemoteEntityFeature.ACTIVITY
def state_changed_event_is_same_state(event: Event[EventStateChangedData]) -> bool:
    """Report whether a state changed event kept the same state value.

    Returns True only when both the old and the new state are present
    (truthy) and their ``state`` attributes compare equal.
    """
    payload = event.data
    previous = payload["old_state"]
    current = payload["new_state"]
    # A missing side means there is nothing to compare against.
    if not current or not previous:
        return False
    return bool(current.state == previous.state)
int _async_find_next_available_port(AddressTupleVXType source)
tuple[str, str] split_entity_id(str entity_id)