1 """The devolo Home Network integration."""
3 from __future__
import annotations
5 from asyncio
import Semaphore
6 from dataclasses
import dataclass
10 from devolo_plc_api
import Device
11 from devolo_plc_api.device_api
import (
17 from devolo_plc_api.exceptions.device
import (
19 DevicePasswordProtected,
22 from devolo_plc_api.plcnet_api
import LogicalNetwork
29 EVENT_HOMEASSISTANT_STOP,
39 CONNECTED_PLC_DEVICES,
40 CONNECTED_WIFI_CLIENTS,
42 FIRMWARE_UPDATE_INTERVAL,
45 NEIGHBORING_WIFI_NETWORKS,
47 SHORT_UPDATE_INTERVAL,
51 from .coordinator
import DevoloDataUpdateCoordinator
# Module-level logger named after this module.
53 _LOGGER = logging.getLogger(__name__)
# Alias for a ConfigEntry parameterised with DevoloHomeNetworkData; the setup
# code in this file reads and writes that object through `entry.runtime_data`.
55 type DevoloHomeNetworkConfigEntry = ConfigEntry[DevoloHomeNetworkData]
# NOTE(review): the class header of the data container is not visible in this
# excerpt; the `dataclass` import suggests it is a dataclass - confirm.
60 """The devolo Home Network data."""
# Maps a string key to a data update coordinator.
63 coordinators: dict[str, DevoloDataUpdateCoordinator[Any]]
# NOTE(review): lossy excerpt - the `async def async_setup_entry(` header, the
# `try:` statement and the exception raised in the handler are not visible.
67 hass: HomeAssistant, entry: DevoloHomeNetworkConfigEntry
69 """Set up devolo Home Network from a config entry."""
70 zeroconf_instance = await zeroconf.async_get_async_instance(hass)
72 device_registry = dr.async_get(hass)
# Single-permit asyncio semaphore; where it is acquired is not visible here.
73 semaphore = Semaphore(1)
# The device is addressed by the IP stored in the config entry data.
77 ip=entry.data[CONF_IP_ADDRESS], zeroconf_instance=zeroconf_instance
79 await device.async_connect(session_instance=async_client)
# The password comes from the entry data via `.get(...)`; the arguments of
# that call are not visible in this excerpt.
80 device.password = entry.data.get(
# DeviceNotFound is handled with the "connection_failed" translation key and
# the configured IP address as placeholder.
84 except DeviceNotFound
as err:
86 translation_domain=DOMAIN,
87 translation_key=
"connection_failed",
88 translation_placeholders={
"ip_address": entry.data[CONF_IP_ADDRESS]},
# Coordinator update callback: returns the awaited result of
# `device.device.async_check_firmware_available()`.
# It is passed as `update_method=` next to `name=REGULAR_FIRMWARE` further down.
# NOTE(review): lines between the docstring and the `return` (including the
# `try:`) and the `raise` in the handler are missing from this excerpt.
93 async
def async_update_firmware_available() -> UpdateFirmwareCheck:
94 """Fetch data from API endpoint."""
98 return await device.device.async_check_firmware_available()
# DeviceUnavailable is handled with the "update_failed" translation key and
# the stringified error as the "error" placeholder.
99 except DeviceUnavailable
as err:
101 translation_domain=DOMAIN,
102 translation_key=
"update_failed",
103 translation_placeholders={
"error":
str(err)},
# Coordinator update callback: returns the awaited result of
# `device.plcnet.async_get_network_overview()`.
# It is passed as `update_method=` next to `name=CONNECTED_PLC_DEVICES` further down.
# NOTE(review): the `try:` and the `raise` in the handler are missing from
# this excerpt.
106 async
def async_update_connected_plc_devices() -> LogicalNetwork:
107 """Fetch data from API endpoint."""
111 return await device.plcnet.async_get_network_overview()
# DeviceUnavailable is handled with the "update_failed" translation key and
# the stringified error as the "error" placeholder.
112 except DeviceUnavailable
as err:
114 translation_domain=DOMAIN,
115 translation_key=
"update_failed",
116 translation_placeholders={
"error":
str(err)},
# Coordinator update callback: returns the awaited result of
# `device.device.async_get_wifi_guest_access()`.
# It is passed as `update_method=` next to `name=SWITCH_GUEST_WIFI` further down.
# NOTE(review): the `try:` and the `raise` statements in both handlers are
# missing from this excerpt.
119 async
def async_update_guest_wifi_status() -> WifiGuestAccessGet:
120 """Fetch data from API endpoint."""
124 return await device.device.async_get_wifi_guest_access()
# DeviceUnavailable is handled with the "update_failed" translation key and
# the stringified error as the "error" placeholder.
125 except DeviceUnavailable
as err:
127 translation_domain=DOMAIN,
128 translation_key=
"update_failed",
129 translation_placeholders={
"error":
str(err)},
# DevicePasswordProtected is handled separately with the "password_wrong"
# translation key and no placeholders.
131 except DevicePasswordProtected
as err:
133 translation_domain=DOMAIN, translation_key=
"password_wrong"
# Coordinator update callback: returns the awaited result of
# `device.device.async_get_led_setting()`, annotated as bool.
# It is passed as `update_method=` after the `"led" in device.device.features`
# check further down.
# NOTE(review): the `try:` and the `raise` in the handler are missing from
# this excerpt.
136 async
def async_update_led_status() -> bool:
137 """Fetch data from API endpoint."""
141 return await device.device.async_get_led_setting()
# DeviceUnavailable is handled with the "update_failed" translation key and
# the stringified error as the "error" placeholder.
142 except DeviceUnavailable
as err:
144 translation_domain=DOMAIN,
145 translation_key=
"update_failed",
146 translation_placeholders={
"error":
str(err)},
# Coordinator update callback: returns the awaited result of
# `device.device.async_uptime()`, annotated as int.
# It is passed as `update_method=` after the `"restart" in device.device.features`
# check further down.
# NOTE(review): the `try:` and the `raise` statements in both handlers are
# missing from this excerpt.
149 async
def async_update_last_restart() -> int:
150 """Fetch data from API endpoint."""
154 return await device.device.async_uptime()
# DeviceUnavailable is handled with the "update_failed" translation key and
# the stringified error as the "error" placeholder.
155 except DeviceUnavailable
as err:
157 translation_domain=DOMAIN,
158 translation_key=
"update_failed",
159 translation_placeholders={
"error":
str(err)},
# DevicePasswordProtected is handled separately with the "password_wrong"
# translation key and no placeholders.
161 except DevicePasswordProtected
as err:
163 translation_domain=DOMAIN, translation_key=
"password_wrong"
# Coordinator update callback: returns the awaited result of
# `device.device.async_get_wifi_connected_station()`.
# It is passed as `update_method=` next to `name=CONNECTED_WIFI_CLIENTS` further down.
# NOTE(review): the `try:` and the `raise` in the handler are missing from
# this excerpt.
166 async
def async_update_wifi_connected_station() -> list[ConnectedStationInfo]:
167 """Fetch data from API endpoint."""
171 return await device.device.async_get_wifi_connected_station()
# DeviceUnavailable is handled with the "update_failed" translation key and
# the stringified error as the "error" placeholder.
172 except DeviceUnavailable
as err:
174 translation_domain=DOMAIN,
175 translation_key=
"update_failed",
176 translation_placeholders={
"error":
str(err)},
# Coordinator update callback: returns the awaited result of
# `device.device.async_get_wifi_neighbor_access_points()`.
# It is passed as `update_method=` next to `name=NEIGHBORING_WIFI_NETWORKS`
# further down.
# NOTE(review): the `try:` and the `raise` in the handler are missing from
# this excerpt.
179 async
def async_update_wifi_neighbor_access_points() -> list[NeighborAPInfo]:
180 """Fetch data from API endpoint."""
184 return await device.device.async_get_wifi_neighbor_access_points()
# DeviceUnavailable is handled with the "update_failed" translation key and
# the stringified error as the "error" placeholder.
185 except DeviceUnavailable
as err:
187 translation_domain=DOMAIN,
188 translation_key=
"update_failed",
189 translation_placeholders={
"error":
str(err)},
# Event listener: awaits `device.async_disconnect()`. It is registered further
# down via `hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, disconnect)`.
# The `event` argument is not used in the visible body.
192 async
def disconnect(event: Event) ->
None:
193 """Disconnect from device."""
194 await device.async_disconnect()
# Registry of coordinators built below, later stored on `entry.runtime_data`.
# NOTE(review): the coordinator constructor calls and dictionary assignments
# are not visible in this excerpt; only some of their keyword arguments are.
# Indentation is lost, so which coordinators sit under which `if` cannot be
# confirmed from here.
196 coordinators: dict[str, DevoloDataUpdateCoordinator[Any]] = {}
202 name=CONNECTED_PLC_DEVICES,
204 update_method=async_update_connected_plc_devices,
205 update_interval=LONG_UPDATE_INTERVAL,
# Feature checks: `device.device` must be truthy and list the feature name.
207 if device.device
and "led" in device.device.features:
214 update_method=async_update_led_status,
215 update_interval=SHORT_UPDATE_INTERVAL,
217 if device.device
and "restart" in device.device.features:
224 update_method=async_update_last_restart,
225 update_interval=SHORT_UPDATE_INTERVAL,
227 if device.device
and "update" in device.device.features:
232 name=REGULAR_FIRMWARE,
234 update_method=async_update_firmware_available,
235 update_interval=FIRMWARE_UPDATE_INTERVAL,
237 if device.device
and "wifi1" in device.device.features:
242 name=CONNECTED_WIFI_CLIENTS,
244 update_method=async_update_wifi_connected_station,
245 update_interval=SHORT_UPDATE_INTERVAL,
251 name=NEIGHBORING_WIFI_NETWORKS,
253 update_method=async_update_wifi_neighbor_access_points,
254 update_interval=LONG_UPDATE_INTERVAL,
260 name=SWITCH_GUEST_WIFI,
262 update_method=async_update_guest_wifi_status,
263 update_interval=SHORT_UPDATE_INTERVAL,
# Each coordinator's first refresh is awaited one after another.
266 for coordinator
in coordinators.values():
267 await coordinator.async_config_entry_first_refresh()
269 entry.runtime_data.coordinators = coordinators
# Only the platforms reported by `platforms(device)` are forwarded for setup.
271 await hass.config_entries.async_forward_entry_setups(entry,
platforms(device))
# The stop-event listener is passed to `entry.async_on_unload`.
273 entry.async_on_unload(
274 hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, disconnect)
# NOTE(review): lossy excerpt - the `async def async_unload_entry(` header, the
# arguments of `async_unload_platforms(`, any condition guarding the
# disconnect, and the return statement are not visible.
281 hass: HomeAssistant, entry: DevoloHomeNetworkConfigEntry
283 """Unload a config entry."""
# The device handle is read from the entry's runtime data.
284 device = entry.runtime_data.device
285 unload_ok = await hass.config_entries.async_unload_platforms(
289 await device.async_disconnect()
# NOTE(review): lossy excerpt - the `def platforms(` header and the condition
# guarding the BINARY_SENSOR addition are not visible; indentation is lost, so
# the extent of each `if` body cannot be confirmed from here.
296 """Assemble supported platforms."""
# BUTTON, SENSOR and SWITCH are always part of the result set.
297 supported_platforms = {Platform.BUTTON, Platform.SENSOR, Platform.SWITCH}
299 supported_platforms.add(Platform.BINARY_SENSOR)
# Feature checks: `device.device` must be truthy and list the feature name.
300 if device.device
and "wifi1" in device.device.features:
301 supported_platforms.add(Platform.DEVICE_TRACKER)
302 supported_platforms.add(Platform.IMAGE)
303 if device.device
and "update" in device.device.features:
304 supported_platforms.add(Platform.UPDATE)
305 return supported_platforms
# NOTE(review): lossy excerpt - the `def update_sw_version(` header, the `if (`
# opening the condition and the closing bracket of the update call are not
# visible.
310 """Update device registry with new firmware version."""
# The registry entry is looked up by the identifier pair
# (DOMAIN, str(device.serial_number)) and bound with a walrus expression.
312 device_entry := device_registry.async_get_device(
313 identifiers={(DOMAIN,
str(device.serial_number))}
# The condition also requires the stored sw_version to differ from the
# device's firmware_version.
315 )
and device_entry.sw_version != device.firmware_version:
316 device_registry.async_update_device(
317 device_id=device_entry.id, sw_version=device.firmware_version
set[Platform] platforms(Device device)
None update_sw_version(dr.DeviceRegistry device_registry, Device device)
bool async_setup_entry(HomeAssistant hass, DevoloHomeNetworkConfigEntry entry)
bool async_unload_entry(HomeAssistant hass, DevoloHomeNetworkConfigEntry entry)
httpx.AsyncClient get_async_client(HomeAssistant hass, bool verify_ssl=True)