1 """Support for Ebusd daemon for communication with eBUS heating systems."""
6 import voluptuous
as vol
10 CONF_MONITORED_CONDITIONS,
20 from .const
import DOMAIN, SENSOR_TYPES
22 _LOGGER = logging.getLogger(__name__)
24 DEFAULT_NAME =
"ebusd"
26 CONF_CIRCUIT =
"circuit"
28 SERVICE_EBUSD_WRITE =
"ebusd_write"
32 """Verify eBusd config."""
33 circuit = config[CONF_CIRCUIT]
34 for condition
in config[CONF_MONITORED_CONDITIONS]:
35 if condition
not in SENSOR_TYPES[circuit]:
36 raise vol.Invalid(f
"Condition '{condition}' not in '{circuit}'.")
40 CONFIG_SCHEMA = vol.Schema(
45 vol.Required(CONF_CIRCUIT): cv.string,
46 vol.Required(CONF_HOST): cv.string,
47 vol.Optional(CONF_PORT, default=DEFAULT_PORT): cv.port,
48 vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
49 vol.Optional(CONF_MONITORED_CONDITIONS, default=[]): cv.ensure_list,
55 extra=vol.ALLOW_EXTRA,
59 def setup(hass: HomeAssistant, config: ConfigType) -> bool:
60 """Set up the eBusd component."""
61 _LOGGER.debug(
"Integration setup started")
63 name = conf[CONF_NAME]
64 circuit = conf[CONF_CIRCUIT]
65 monitored_conditions = conf.get(CONF_MONITORED_CONDITIONS)
66 server_address = (conf.get(CONF_HOST), conf.get(CONF_PORT))
69 ebusdpy.init(server_address)
70 except (TimeoutError, OSError):
72 hass.data[DOMAIN] =
EbusdData(server_address, circuit)
74 CONF_MONITORED_CONDITIONS: monitored_conditions,
76 "sensor_types": SENSOR_TYPES[circuit],
78 load_platform(hass, Platform.SENSOR, DOMAIN, sensor_config, config)
80 hass.services.register(DOMAIN, SERVICE_EBUSD_WRITE, hass.data[DOMAIN].write)
82 _LOGGER.debug(
"Ebusd integration setup completed")
87 """Get the latest data from Ebusd."""
90 """Initialize the data object."""
96 """Call the Ebusd API to update the data."""
98 _LOGGER.debug(
"Opening socket to ebusd %s", name)
99 command_result = ebusdpy.read(
102 if command_result
is not None:
103 if "ERR:" in command_result:
104 _LOGGER.warning(command_result)
106 self.
valuevalue[name] = command_result
107 except RuntimeError
as err:
109 raise RuntimeError(err)
from err
111 def write(self, call: ServiceCall) ->
None:
112 """Call write method on ebusd."""
113 name = call.data.get(
"name")
114 value = call.data.get(
"value")
117 _LOGGER.debug(
"Opening socket to ebusd %s", name)
118 command_result = ebusdpy.write(self.
_address_address, self.
_circuit_circuit, name, value)
119 if command_result
is not None and "done" not in command_result:
120 _LOGGER.warning(
"Write command failed: %s", name)
121 except RuntimeError
as err:
def update(self, name, stype)
def __init__(self, address, circuit)
None write(self, ServiceCall call)
bool setup(HomeAssistant hass, ConfigType config)
def verify_ebusd_config(config)
None load_platform(core.HomeAssistant hass, Platform|str component, str platform, DiscoveryInfoType|None discovered, ConfigType hass_config)