1 """Helper sensor for calculating utility costs."""
3 from __future__
import annotations
6 from collections.abc
import Callable, Mapping
8 from dataclasses
import dataclass
10 from typing
import Any, Final, Literal, cast
36 from .const
import DOMAIN
37 from .data
import EnergyManager, async_get_manager
39 SUPPORTED_STATE_CLASSES = {
40 SensorStateClass.MEASUREMENT,
41 SensorStateClass.TOTAL,
42 SensorStateClass.TOTAL_INCREASING,
44 VALID_ENERGY_UNITS: set[str] = {
45 UnitOfEnergy.GIGA_JOULE,
46 UnitOfEnergy.KILO_WATT_HOUR,
47 UnitOfEnergy.MEGA_JOULE,
48 UnitOfEnergy.MEGA_WATT_HOUR,
49 UnitOfEnergy.WATT_HOUR,
51 VALID_ENERGY_UNITS_GAS = {
52 UnitOfVolume.CENTUM_CUBIC_FEET,
53 UnitOfVolume.CUBIC_FEET,
54 UnitOfVolume.CUBIC_METERS,
57 VALID_VOLUME_UNITS_WATER: set[str] = {
58 UnitOfVolume.CENTUM_CUBIC_FEET,
59 UnitOfVolume.CUBIC_FEET,
60 UnitOfVolume.CUBIC_METERS,
64 _LOGGER = logging.getLogger(__name__)
70 async_add_entities: AddEntitiesCallback,
71 discovery_info: DiscoveryInfoType |
None =
None,
73 """Set up the energy sensors."""
75 await sensor_manager.async_start()
78 @dataclass(slots=True)
80 """Adapter to allow sources and their flows to be used as sensors."""
82 source_type: Literal[
"grid",
"gas",
"water"]
83 flow_type: Literal[
"flow_from",
"flow_to",
None]
84 stat_energy_key: Literal[
"stat_energy_from",
"stat_energy_to"]
85 total_money_key: Literal[
"stat_cost",
"stat_compensation"]
90 SOURCE_ADAPTERS: Final = (
127 """Class to handle creation/removal of sensor data."""
130 self, manager: EnergyManager, async_add_entities: AddEntitiesCallback
132 """Initialize sensor manager."""
135 self.current_entities: dict[tuple[str, str |
None, str], EnergyCostSensor] = {}
145 """Process manager data."""
146 to_add: list[EnergyCostSensor] = []
147 to_remove =
dict(self.current_entities)
149 async
def finish() -> None:
152 await asyncio.wait(ent.add_finished
for ent
in to_add)
154 for key, entity
in to_remove.items():
155 self.current_entities.pop(key)
156 await entity.async_remove()
158 if not self.
managermanager.data:
162 for energy_source
in self.
managermanager.data[
"energy_sources"]:
163 for adapter
in SOURCE_ADAPTERS:
164 if adapter.source_type != energy_source[
"type"]:
167 if adapter.flow_type
is None:
176 for flow
in energy_source[adapter.flow_type]:
189 adapter: SourceAdapter,
190 config: Mapping[str, Any],
191 to_add: list[EnergyCostSensor],
192 to_remove: dict[tuple[str, str |
None, str], EnergyCostSensor],
194 """Process sensor data."""
196 if config.get(adapter.total_money_key)
is not None:
199 key = (adapter.source_type, adapter.flow_type, config[adapter.stat_energy_key])
204 config.get(
"entity_energy_price")
is None
205 and config.get(
"number_energy_price")
is None
209 if current_entity := to_remove.pop(key,
None):
210 current_entity.update_config(config)
217 to_add.append(self.current_entities[key])
221 """Set the result of a future unless it is done."""
222 if not future.done():
223 future.set_result(
None)
227 """Calculate costs incurred by consuming energy.
229 This is intended as a fallback for when no specific cost sensor is available for the
233 _attr_entity_registry_visible_default =
False
234 _attr_should_poll =
False
236 _wrong_state_class_reported =
False
237 _wrong_unit_reported =
False
241 adapter: SourceAdapter,
242 config: Mapping[str, Any],
244 """Initialize the sensor."""
255 self.add_finished: asyncio.Future[
None] = (
256 asyncio.get_running_loop().create_future()
259 def _reset(self, energy_state: State) ->
None:
260 """Reset the cost sensor."""
268 """Update incurred costs."""
269 if self.
_adapter_adapter.source_type ==
"grid":
270 valid_units = VALID_ENERGY_UNITS
271 default_price_unit: str |
None = UnitOfEnergy.KILO_WATT_HOUR
273 elif self.
_adapter_adapter.source_type ==
"gas":
274 valid_units = VALID_ENERGY_UNITS_GAS
276 default_price_unit =
None
278 elif self.
_adapter_adapter.source_type ==
"water":
279 valid_units = VALID_VOLUME_UNITS_WATER
280 if self.
hasshass.config.units
is METRIC_SYSTEM:
281 default_price_unit = UnitOfVolume.CUBIC_METERS
283 default_price_unit = UnitOfVolume.GALLONS
285 energy_state = self.
hasshass.states.get(
289 if energy_state
is None:
292 state_class = energy_state.attributes.get(ATTR_STATE_CLASS)
293 if state_class
not in SUPPORTED_STATE_CLASSES:
297 "Found unexpected state_class %s for %s",
299 energy_state.entity_id,
305 state_class == SensorStateClass.MEASUREMENT
306 and ATTR_LAST_RESET
not in energy_state.attributes
311 energy =
float(energy_state.state)
316 if self.
_config_config[
"entity_energy_price"]
is not None:
317 energy_price_state = self.
hasshass.states.get(
318 self.
_config_config[
"entity_energy_price"]
321 if energy_price_state
is None:
325 energy_price =
float(energy_price_state.state)
331 self.
_reset_reset(energy_state)
334 energy_price_unit: str |
None = energy_price_state.attributes.get(
335 ATTR_UNIT_OF_MEASUREMENT,
""
340 if energy_price_unit
not in valid_units:
341 energy_price_unit = default_price_unit
344 energy_price = cast(float, self.
_config_config[
"number_energy_price"])
345 energy_price_unit = default_price_unit
349 self.
_reset_reset(energy_state)
352 energy_unit: str |
None = energy_state.attributes.get(ATTR_UNIT_OF_MEASUREMENT)
354 if energy_unit
is None or energy_unit
not in valid_units:
358 "Found unexpected unit %s for %s",
359 energy_state.attributes.get(ATTR_UNIT_OF_MEASUREMENT),
360 energy_state.entity_id,
366 state_class != SensorStateClass.TOTAL_INCREASING
367 and energy_state.attributes.get(ATTR_LAST_RESET)
370 or state_class == SensorStateClass.TOTAL_INCREASING
380 energy_state_copy = copy.copy(energy_state)
381 energy_state_copy.state =
"0.0"
382 self.
_reset_reset(energy_state_copy)
388 if energy_price_unit
is None:
389 converted_energy_price = energy_price
391 converter: Callable[[float, str, str], float]
392 if energy_unit
in VALID_ENERGY_UNITS:
393 converter = unit_conversion.EnergyConverter.convert
395 converter = unit_conversion.VolumeConverter.convert
397 converted_energy_price = converter(
404 cur_value + (energy - old_energy_value) * converted_energy_price
410 """Register callbacks."""
411 energy_state = self.
hasshass.states.get(self.
_config_config[self.
_adapter_adapter.stat_energy_key])
413 name = energy_state.name
419 self.
_attr_name_attr_name = f
"{name} {self._adapter.name_suffix}"
424 self.
hasshass.data[DOMAIN][
"cost_sensors"][
439 """Handle child updates."""
445 """Abort adding an entity to a platform."""
450 """Handle removing from hass."""
451 self.
hasshass.data[DOMAIN][
"cost_sensors"].pop(
458 """Update the config."""
463 """Return the units of measurement."""
464 return self.
hasshass.config.currency
468 """Return the unique ID of the sensor."""
469 entity_registry = er.async_get(self.
hasshass)
470 if registry_entry := entity_registry.async_get(
473 prefix = registry_entry.id
477 return f
"{prefix}_{self._adapter.source_type}_{self._adapter.entity_id_suffix}"
bool _wrong_state_class_reported
None add_to_platform_abort(self)
str|None native_unit_of_measurement(self)
None __init__(self, SourceAdapter adapter, Mapping[str, Any] config)
_wrong_state_class_reported
None async_added_to_hass(self)
None async_will_remove_from_hass(self)
None _async_state_changed_listener(self, *Any _)
None _reset(self, State energy_state)
None update_config(self, Mapping[str, Any] config)
_last_energy_sensor_state
bool _wrong_unit_reported
None _process_manager_data(self)
None _process_sensor_data(self, SourceAdapter adapter, Mapping[str, Any] config, list[EnergyCostSensor] to_add, dict[tuple[str, str|None, str], EnergyCostSensor] to_remove)
None __init__(self, EnergyManager manager, AddEntitiesCallback async_add_entities)
None async_write_ha_state(self)
None async_on_remove(self, CALLBACK_TYPE func)
EnergyManager async_get_manager(HomeAssistant hass)
None async_setup_platform(HomeAssistant hass, ConfigType config, AddEntitiesCallback async_add_entities, DiscoveryInfoType|None discovery_info=None)
None _set_result_unless_done(asyncio.Future[None] future)
bool reset_detected(HomeAssistant hass, str entity_id, float fstate, float|None previous_fstate, State state)
bool valid_entity_id(str entity_id)
tuple[str, str] split_entity_id(str entity_id)
CALLBACK_TYPE async_track_state_change_event(HomeAssistant hass, str|Iterable[str] entity_ids, Callable[[Event[EventStateChangedData]], Any] action, HassJobType|None job_type=None)