1 """Monitors home energy use for the ELIQ Online service."""
3 from __future__
import annotations
5 from datetime
import timedelta
9 import voluptuous
as vol
12 PLATFORM_SCHEMA
as SENSOR_PLATFORM_SCHEMA,
24 _LOGGER = logging.getLogger(__name__)
26 CONF_CHANNEL_ID =
"channel_id"
28 DEFAULT_NAME =
"ELIQ Online"
32 PLATFORM_SCHEMA = SENSOR_PLATFORM_SCHEMA.extend(
34 vol.Required(CONF_ACCESS_TOKEN): cv.string,
35 vol.Required(CONF_CHANNEL_ID): cv.positive_int,
36 vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
44 async_add_entities: AddEntitiesCallback,
45 discovery_info: DiscoveryInfoType |
None =
None,
47 """Set up the ELIQ Online sensor."""
48 access_token = config.get(CONF_ACCESS_TOKEN)
49 name = config.get(CONF_NAME, DEFAULT_NAME)
50 channel_id = config.get(CONF_CHANNEL_ID)
53 api = eliqonline.API(session=session, access_token=access_token)
56 _LOGGER.debug(
"Probing for access to ELIQ Online API")
57 await api.get_data_now(channelid=channel_id)
58 except OSError
as error:
59 _LOGGER.error(
"Could not access the ELIQ Online API: %s", error)
66 """Implementation of an ELIQ Online sensor."""
68 _attr_device_class = SensorDeviceClass.POWER
69 _attr_native_unit_of_measurement = UnitOfPower.WATT
70 _attr_state_class = SensorStateClass.MEASUREMENT
73 """Initialize the sensor."""
79 """Get the latest data."""
81 response = await self.
_api_api.get_data_now(channelid=self.
_channel_id_channel_id)
83 _LOGGER.debug(
"Updated power from server %d W", self.
native_valuenative_value)
85 _LOGGER.warning(
"Invalid response from ELIQ Online API")
86 except (OSError, TimeoutError)
as error:
87 _LOGGER.warning(
"Could not connect to the ELIQ Online API: %s", error)
def __init__(self, api, channel_id, name)
StateType|date|datetime|Decimal native_value(self)
None async_setup_platform(HomeAssistant hass, ConfigType config, AddEntitiesCallback async_add_entities, DiscoveryInfoType|None discovery_info=None)
aiohttp.ClientSession async_get_clientsession(HomeAssistant hass, bool verify_ssl=True, socket.AddressFamily family=socket.AF_UNSPEC, ssl_util.SSLCipherList ssl_cipher=ssl_util.SSLCipherList.PYTHON_DEFAULT)