1 """Support for consuming values for the Volkszaehler API."""
3 from __future__
import annotations
5 from datetime
import timedelta
8 from volkszaehler
import Volkszaehler
9 from volkszaehler.exceptions
import VolkszaehlerApiConnectionError
10 import voluptuous
as vol
13 PLATFORM_SCHEMA
as SENSOR_PLATFORM_SCHEMA,
16 SensorEntityDescription,
20 CONF_MONITORED_CONDITIONS,
35 _LOGGER = logging.getLogger(__name__)
37 DEFAULT_HOST =
"localhost"
38 DEFAULT_NAME =
"Volkszaehler"
43 SENSOR_TYPES: tuple[SensorEntityDescription, ...] = (
47 native_unit_of_measurement=UnitOfPower.WATT,
48 device_class=SensorDeviceClass.POWER,
54 native_unit_of_measurement=UnitOfEnergy.WATT_HOUR,
55 device_class=SensorDeviceClass.ENERGY,
56 icon=
"mdi:power-plug",
61 native_unit_of_measurement=UnitOfPower.WATT,
62 device_class=SensorDeviceClass.POWER,
68 native_unit_of_measurement=UnitOfPower.WATT,
69 device_class=SensorDeviceClass.POWER,
70 icon=
"mdi:arrow-down",
74 SENSOR_KEYS: list[str] = [desc.key
for desc
in SENSOR_TYPES]
76 PLATFORM_SCHEMA = SENSOR_PLATFORM_SCHEMA.extend(
78 vol.Required(CONF_UUID): cv.string,
79 vol.Optional(CONF_HOST, default=DEFAULT_HOST): cv.string,
80 vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
81 vol.Optional(CONF_PORT, default=DEFAULT_PORT): cv.port,
82 vol.Optional(CONF_MONITORED_CONDITIONS, default=[
"average"]): vol.All(
83 cv.ensure_list, [vol.In(SENSOR_KEYS)]
92 async_add_entities: AddEntitiesCallback,
93 discovery_info: DiscoveryInfoType |
None =
None,
95 """Set up the Volkszaehler sensors."""
97 host: str = config[CONF_HOST]
98 name: str = config[CONF_NAME]
99 port: int = config[CONF_PORT]
100 uuid: str = config[CONF_UUID]
101 conditions: list[str] = config[CONF_MONITORED_CONDITIONS]
104 vz_api =
VolkszaehlerData(Volkszaehler(session, uuid, host=host, port=port))
106 await vz_api.async_update()
108 if vz_api.api.data
is None:
109 raise PlatformNotReady
113 for description
in SENSOR_TYPES
114 if description.key
in conditions
121 """Implementation of a Volkszaehler sensor."""
124 self, vz_api: VolkszaehlerData, name: str, description: SensorEntityDescription
126 """Initialize the Volkszaehler sensor."""
134 """Could the device be accessed during the last update call."""
135 return self.
vz_apivz_api.available
138 """Get the latest data from REST API."""
141 if self.
vz_apivz_api.api.data
is not None:
148 """The class for handling the data retrieval from the Volkszaehler API."""
151 """Initialize the data object."""
155 @Throttle(MIN_TIME_BETWEEN_UPDATES)
157 """Get the latest data from the Volkszaehler REST API."""
160 await self.
apiapi.get_data()
162 except VolkszaehlerApiConnectionError:
163 _LOGGER.error(
"Unable to fetch data from the Volkszaehler API")
None __init__(self, Volkszaehler api)
None __init__(self, VolkszaehlerData vz_api, str name, SensorEntityDescription description)
None async_setup_platform(HomeAssistant hass, ConfigType config, AddEntitiesCallback async_add_entities, DiscoveryInfoType|None discovery_info=None)
aiohttp.ClientSession async_get_clientsession(HomeAssistant hass, bool verify_ssl=True, socket.AddressFamily family=socket.AF_UNSPEC, ssl_util.SSLCipherList ssl_cipher=ssl_util.SSLCipherList.PYTHON_DEFAULT)