1 """Support for monitoring energy usage using the DTE energy bridge."""
3 from __future__
import annotations
5 from http
import HTTPStatus
9 import voluptuous
as vol
12 PLATFORM_SCHEMA
as SENSOR_PLATFORM_SCHEMA,
24 _LOGGER = logging.getLogger(__name__)
26 CONF_IP_ADDRESS =
"ip"
27 CONF_VERSION =
"version"
29 DEFAULT_NAME =
"Current Energy Usage"
31 DOMAIN =
"dte_energy_bridge"
33 PLATFORM_SCHEMA = SENSOR_PLATFORM_SCHEMA.extend(
35 vol.Required(CONF_IP_ADDRESS): cv.string,
36 vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
37 vol.Optional(CONF_VERSION, default=DEFAULT_VERSION): vol.All(
38 vol.Coerce(int), vol.Any(1, 2)
47 add_entities: AddEntitiesCallback,
48 discovery_info: DiscoveryInfoType |
None =
None,
50 """Set up the DTE energy bridge sensor."""
54 "deprecated_integration",
55 breaks_in_ha_version=
"2025.1.0",
58 severity=IssueSeverity.WARNING,
59 translation_key=
"deprecated_integration",
60 translation_placeholders={
"domain": DOMAIN},
63 name = config[CONF_NAME]
64 ip_address = config[CONF_IP_ADDRESS]
65 version = config[CONF_VERSION]
71 """Implementation of the DTE Energy Bridge sensors."""
73 _attr_device_class = SensorDeviceClass.POWER
74 _attr_native_unit_of_measurement = UnitOfPower.KILO_WATT
75 _attr_state_class = SensorStateClass.MEASUREMENT
78 """Initialize the sensor."""
82 self.
_url_url = f
"http://{ip_address}/instantaneousdemand"
84 self.
_url_url = f
"http://{ip_address}:8888/zigbee/se/instantaneousdemand"
89 """Get the energy usage data from the DTE energy bridge."""
91 response = requests.get(self.
_url_url, timeout=5)
92 except (requests.exceptions.RequestException, ValueError):
94 "Could not update status for DTE Energy Bridge (%s)", self.
_attr_name_attr_name
98 if response.status_code != HTTPStatus.OK:
100 "Invalid status_code from DTE Energy Bridge: %s (%s)",
101 response.status_code,
106 response_split = response.text.split()
108 if len(response_split) != 2:
110 'Invalid response from DTE Energy Bridge: "%s" (%s)',
116 val =
float(response_split[0])
124 if self.
_version_version == 1
and "." in response_split[0]:
def __init__(self, ip_address, name, version)
None setup_platform(HomeAssistant hass, ConfigType config, AddEntitiesCallback add_entities, DiscoveryInfoType|None discovery_info=None)
def add_entities(account, async_add_entities, tracked)
None create_issue(HomeAssistant hass, str domain, str issue_id, *str|None breaks_in_ha_version=None, dict[str, str|int|float|None]|None data=None, bool is_fixable, bool is_persistent=False, str|None issue_domain=None, str|None learn_more_url=None, IssueSeverity severity, str translation_key, dict[str, str]|None translation_placeholders=None)