1 """Adds config flow for System Monitor."""
3 from __future__
import annotations
5 from collections.abc
import Mapping
8 import voluptuous
as vol
15 SchemaCommonFlowHandler,
16 SchemaConfigFlowHandler,
26 from .const
import CONF_PROCESS, DOMAIN
27 from .util
import get_all_running_processes
31 handler: SchemaCommonFlowHandler, user_input: dict[str, Any]
33 """Validate sensor input."""
36 sensors: dict[str, list] = handler.options.setdefault(BINARY_SENSOR_DOMAIN, {})
37 processes = sensors.setdefault(CONF_PROCESS, [])
38 previous_processes = processes.copy()
40 processes.extend(user_input[CONF_PROCESS])
42 entity_registry = er.async_get(handler.parent_handler.hass)
43 for process
in previous_processes:
44 if process
not in processes
and (
45 entity_id := entity_registry.async_get_entity_id(
46 BINARY_SENSOR_DOMAIN, DOMAIN,
slugify(f
"binary_process_{process}")
49 entity_registry.async_remove(entity_id)
55 """Return process sensor setup schema."""
56 hass = handler.parent_handler.hass
57 processes =
list(await hass.async_add_executor_job(get_all_running_processes, hass))
65 mode=SelectSelectorMode.DROPDOWN,
74 """Return suggested values for sensor setup."""
75 sensors: dict[str, list] = handler.options.get(BINARY_SENSOR_DOMAIN, {})
76 processes: list[str] = sensors.get(CONF_PROCESS, [])
77 return {CONF_PROCESS: processes}
85 get_sensor_setup_schema,
86 suggested_values=get_suggested_value,
87 validate_user_input=validate_sensor_setup,
93 """Handle a config flow for System Monitor."""
95 config_flow = CONFIG_FLOW
96 options_flow = OPTIONS_FLOW
101 """Return config entry title."""
102 return "System Monitor"
106 self, data: Mapping[str, Any], **kwargs: Any
107 ) -> ConfigFlowResult:
108 """Finish config flow and create a config entry."""
ConfigFlowResult async_create_entry(self, Mapping[str, Any] data, **Any kwargs)
str async_config_entry_title(self, Mapping[str, Any] options)
list[ConfigEntry] _async_current_entries(self, bool|None include_ignore=None)
ConfigFlowResult async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)
_FlowResultT async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)
dict[str, Any] validate_sensor_setup(SchemaCommonFlowHandler handler, dict[str, Any] user_input)
vol.Schema get_sensor_setup_schema(SchemaCommonFlowHandler handler)
dict[str, Any] get_suggested_value(SchemaCommonFlowHandler handler)