Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """Coordinator for the Lektrico Charging Station integration."""
2 
3 from __future__ import annotations
4 
5 from datetime import timedelta
6 from typing import Any
7 
8 from lektricowifi import Device, DeviceConnectionError
9 
10 from homeassistant.config_entries import ConfigEntry
11 from homeassistant.const import (
12  ATTR_HW_VERSION,
13  ATTR_SERIAL_NUMBER,
14  CONF_HOST,
15  CONF_TYPE,
16 )
17 from homeassistant.core import HomeAssistant
18 from homeassistant.helpers.httpx_client import get_async_client
19 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
20 
21 from .const import LOGGER
22 
23 SCAN_INTERVAL = timedelta(seconds=10)
24 
25 
27  """Data update coordinator for Lektrico device."""
28 
29  config_entry: ConfigEntry
30 
31  def __init__(self, hass: HomeAssistant, device_name: str) -> None:
32  """Initialize a Lektrico Device."""
33  super().__init__(
34  hass,
35  LOGGER,
36  name=device_name,
37  update_interval=SCAN_INTERVAL,
38  )
39  self.devicedevice = Device(
40  self.config_entryconfig_entry.data[CONF_HOST],
41  asyncClient=get_async_client(hass),
42  )
43  self.serial_number: str = self.config_entryconfig_entry.data[ATTR_SERIAL_NUMBER]
44  self.board_revision: str = self.config_entryconfig_entry.data[ATTR_HW_VERSION]
45  self.device_type: str = self.config_entryconfig_entry.data[CONF_TYPE]
46 
47  async def _async_update_data(self) -> dict[str, Any]:
48  """Async Update device state."""
49  try:
50  return await self.devicedevice.device_info(self.device_type)
51  except DeviceConnectionError as lek_ex:
52  raise UpdateFailed(lek_ex) from lek_ex
httpx.AsyncClient get_async_client(HomeAssistant hass, bool verify_ssl=True)
Definition: httpx_client.py:41