3 from __future__
import annotations
6 from datetime
import timedelta
10 from eagle100
import Eagle
as Eagle100Reader
20 CONF_HARDWARE_ADDRESS,
24 from .data
import UPDATE_100_ERRORS
26 _LOGGER = logging.getLogger(__name__)
30 """Get the latest data from the Eagle device."""
32 eagle100_reader: Eagle100Reader |
None =
None
33 eagle200_meter: aioeagle.ElectricMeter |
None =
None
35 def __init__(self, hass: HomeAssistant, entry: ConfigEntry) ->
None:
36 """Initialize the data object."""
42 self.
modelmodel =
"EAGLE-200"
48 name=entry.data[CONF_CLOUD_ID],
50 update_method=update_method,
55 """Return the cloud ID."""
56 return self.
entryentry.data[CONF_CLOUD_ID]
60 """Return entry type."""
61 return self.
entryentry.data[CONF_TYPE]
65 """Return hardware address of meter."""
66 return self.
entryentry.data[CONF_HARDWARE_ADDRESS]
70 """Return if the hub is connected to the electric meter."""
77 """Get the latest data from the Eagle-200 device."""
79 hub = aioeagle.EagleHub(
80 aiohttp_client.async_get_clientsession(self.
hasshass),
82 self.
entryentry.data[CONF_INSTALL_CODE],
83 host=self.
entryentry.data[CONF_HOST],
85 eagle200_meter = aioeagle.ElectricMeter.create_instance(
90 is_connected = eagle200_meter.is_connected
92 async
with asyncio.timeout(30):
93 data = await eagle200_meter.get_device_query()
97 elif is_connected
and not eagle200_meter.is_connected:
98 _LOGGER.warning(
"Lost connection with electricity meter")
100 _LOGGER.debug(
"API data: %s", data)
101 return {var[
"Name"]: var[
"Value"]
for var
in data.values()}
104 """Get the latest data from the Eagle-100 device."""
107 except UPDATE_100_ERRORS
as error:
108 raise UpdateFailed
from error
110 _LOGGER.debug(
"API data: %s", data)
114 """Fetch and return the four sensor values in a dict."""
118 self.
entryentry.data[CONF_INSTALL_CODE],
119 self.
entryentry.data[CONF_HOST],
124 resp = self.
eagle100_readereagle100_reader.get_instantaneous_demand()[
"InstantaneousDemand"]
125 out[
"zigbee:InstantaneousDemand"] = resp[
"Demand"]
127 resp = self.
eagle100_readereagle100_reader.get_current_summation()[
"CurrentSummation"]
128 out[
"zigbee:CurrentSummationDelivered"] = resp[
"SummationDelivered"]
129 out[
"zigbee:CurrentSummationReceived"] = resp[
"SummationReceived"]
def _async_update_data_200(self)
None __init__(self, HomeAssistant hass, ConfigEntry entry)
def _fetch_data_100(self)
def hardware_address(self)
def _async_update_data_100(self)