Home Assistant Unofficial Reference 2024.12.1
__init__.py
Go to the documentation of this file.
1 """The lookin integration."""
2 
3 from __future__ import annotations
4 
5 import asyncio
6 from collections.abc import Callable, Coroutine
7 import logging
8 from typing import Any
9 
10 import aiohttp
11 from aiolookin import (
12  Climate,
13  LookInHttpProtocol,
14  LookinUDPSubscriptions,
15  MeteoSensor,
16  NoUsableService,
17  Remote,
18  start_lookin_udp,
19 )
20 from aiolookin.models import UDPCommandType, UDPEvent
21 
22 from homeassistant.config_entries import ConfigEntry, ConfigEntryState
23 from homeassistant.const import CONF_HOST, Platform
24 from homeassistant.core import HomeAssistant, callback
25 from homeassistant.exceptions import ConfigEntryNotReady
26 from homeassistant.helpers import device_registry as dr
27 from homeassistant.helpers.aiohttp_client import async_get_clientsession
28 
29 from .const import (
30  DOMAIN,
31  METEO_UPDATE_INTERVAL,
32  PLATFORMS,
33  REMOTE_UPDATE_INTERVAL,
34  TYPE_TO_PLATFORM,
35 )
36 from .coordinator import LookinDataUpdateCoordinator, LookinPushCoordinator
37 from .models import LookinData
38 
# Module-level logger for this integration.
LOGGER = logging.getLogger(__name__)

# Key in the integration's domain data under which the shared
# LookinUDPManager instance is stored.
UDP_MANAGER = "udp_manager"
42 
43 
def _async_climate_updater(
    lookin_protocol: LookInHttpProtocol,
    uuid: str,
) -> Callable[[], Coroutine[None, Any, Remote]]:
    """Create a function to capture the cell variable.

    A factory is used so that every returned coroutine function closes over
    its own ``lookin_protocol`` and ``uuid`` rather than a variable that is
    rebound by the caller's loop.

    Args:
        lookin_protocol: HTTP protocol client used to query the device.
        uuid: UUID of the climate device to fetch.

    Returns:
        A zero-argument coroutine function that returns the result of
        ``lookin_protocol.get_conditioner(uuid)``.
    """

    async def _async_update() -> Climate:
        return await lookin_protocol.get_conditioner(uuid)

    return _async_update
54 
55 
def _async_remote_updater(
    lookin_protocol: LookInHttpProtocol,
    uuid: str,
) -> Callable[[], Coroutine[None, Any, Remote]]:
    """Create a function to capture the cell variable.

    A factory is used so that every returned coroutine function closes over
    its own ``lookin_protocol`` and ``uuid`` rather than a variable that is
    rebound by the caller's loop.

    Args:
        lookin_protocol: HTTP protocol client used to query the device.
        uuid: UUID of the remote to fetch.

    Returns:
        A zero-argument coroutine function that returns the result of
        ``lookin_protocol.get_remote(uuid)``.
    """

    async def _async_update() -> Remote:
        return await lookin_protocol.get_remote(uuid)

    return _async_update
66 
67 
class LookinUDPManager:
    """Manage the lookin UDP subscriptions.

    The manager lazily starts a single UDP listener on first use and hands
    out the same ``LookinUDPSubscriptions`` object to every caller until
    ``async_stop`` is called.
    """

    def __init__(self) -> None:
        """Init the manager."""
        # Serializes start/stop so the listener is only created once.
        self._lock = asyncio.Lock()
        # Callable returned by start_lookin_udp; calling it stops the listener.
        self._listener: Callable[[], Any] | None = None
        self._subscriptions: LookinUDPSubscriptions | None = None

    async def async_get_subscriptions(self) -> LookinUDPSubscriptions:
        """Get the shared LookinUDPSubscriptions.

        Starts the UDP listener on the first call; later calls return the
        already created subscriptions object.
        """
        async with self._lock:
            if self._listener is None or self._subscriptions is None:
                subscriptions = LookinUDPSubscriptions()
                # Only publish the state once the listener really started, so
                # a failed start leaves the manager cleanly "not started".
                self._listener = await start_lookin_udp(subscriptions, None)
                self._subscriptions = subscriptions
            return self._subscriptions

    async def async_stop(self) -> None:
        """Stop the listener.

        Does nothing when the listener is not running, so it is safe to call
        more than once.
        """
        async with self._lock:
            if self._listener is None:
                return
            self._listener()
            self._listener = None
            self._subscriptions = None
92 
93 
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
    """Set up lookin from a config entry.

    Queries the device over HTTP, checks that it is the device this entry
    was created for, creates the data update coordinators, subscribes to UDP
    push updates and forwards the entry to the platforms.

    Raises:
        ConfigEntryNotReady: If the device cannot be queried, or the device
            answering at the configured host reports a different UUID.
    """
    domain_data = hass.data.setdefault(DOMAIN, {})
    host = entry.data[CONF_HOST]
    lookin_protocol = LookInHttpProtocol(
        api_uri=f"http://{host}", session=async_get_clientsession(hass)
    )

    try:
        lookin_device = await lookin_protocol.get_info()
        devices = await lookin_protocol.get_devices()
    except (TimeoutError, aiohttp.ClientError, NoUsableService) as ex:
        raise ConfigEntryNotReady from ex

    if entry.unique_id != (found_uuid := lookin_device.id.upper()):
        # If the uuid of the device does not match the unique_id
        # of the config entry, it likely means the DHCP lease has expired
        # and the device has been assigned a new IP address. We need to
        # wait for the next discovery to find the device at its new address
        # and update the config entry so we do not mix up devices.
        raise ConfigEntryNotReady(
            f"Unexpected device found at {host}; expected {entry.unique_id}, "
            f"found {found_uuid}"
        )

    push_coordinator = LookinPushCoordinator(entry.title)

    # A meteo coordinator is only created for model 2 and newer; the check is
    # evaluated once and reused for the push subscription below.
    has_meteo_sensor = lookin_device.model >= 2
    meteo_coordinator: LookinDataUpdateCoordinator[MeteoSensor] | None = None
    if has_meteo_sensor:
        coordinator_class = LookinDataUpdateCoordinator[MeteoSensor]
        meteo_coordinator = coordinator_class(
            hass,
            push_coordinator,
            name=entry.title,
            update_method=lookin_protocol.get_meteo_sensor,
            # Updates are pushed (fallback is polling)
            update_interval=METEO_UPDATE_INTERVAL,
        )
        await meteo_coordinator.async_config_entry_first_refresh()

    device_coordinators: dict[str, LookinDataUpdateCoordinator[Remote]] = {}
    for remote in devices:
        # Skip device types that do not map to a platform.
        if (platform := TYPE_TO_PLATFORM.get(remote["Type"])) is None:
            continue
        uuid = remote["UUID"]
        if platform == Platform.CLIMATE:
            updater = _async_climate_updater(lookin_protocol, uuid)
        else:
            updater = _async_remote_updater(lookin_protocol, uuid)
        coordinator = LookinDataUpdateCoordinator(
            hass,
            push_coordinator,
            name=f"{entry.title} {uuid}",
            update_method=updater,
            # Updates are pushed (fallback is polling)
            update_interval=REMOTE_UPDATE_INTERVAL,
        )
        await coordinator.async_config_entry_first_refresh()
        device_coordinators[uuid] = coordinator

    @callback
    def _async_meteo_push_update(event: UDPEvent) -> None:
        """Process an update pushed via UDP."""
        LOGGER.debug("Processing push message for meteo sensor: %s", event)
        # Only subscribed when has_meteo_sensor is true, so the coordinator
        # exists whenever this callback runs.
        meteo: MeteoSensor = meteo_coordinator.data
        meteo.update_from_value(event.value)
        meteo_coordinator.async_set_updated_data(meteo)

    # All config entries share one UDP manager stored in the domain data.
    if UDP_MANAGER not in domain_data:
        manager = domain_data[UDP_MANAGER] = LookinUDPManager()
    else:
        manager = domain_data[UDP_MANAGER]

    lookin_udp_subs = await manager.async_get_subscriptions()

    if has_meteo_sensor:
        # subscribe_event's return value is registered so the subscription is
        # cleaned up when the entry unloads.
        entry.async_on_unload(
            lookin_udp_subs.subscribe_event(
                lookin_device.id, UDPCommandType.meteo, None, _async_meteo_push_update
            )
        )

    domain_data[entry.entry_id] = LookinData(
        host=host,
        lookin_udp_subs=lookin_udp_subs,
        lookin_device=lookin_device,
        meteo_coordinator=meteo_coordinator,
        devices=devices,
        lookin_protocol=lookin_protocol,
        device_coordinators=device_coordinators,
    )

    await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)

    return True
186 
187 
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
    """Unload a config entry.

    Unloads the platforms, drops this entry's stored data when that succeeds
    and stops the shared UDP listener when exactly one lookin entry is still
    in the loaded state.

    Returns:
        The result of unloading the platforms.
    """
    if unload_ok := await hass.config_entries.async_unload_platforms(entry, PLATFORMS):
        hass.data[DOMAIN].pop(entry.entry_id)

    # NOTE(review): this check runs regardless of unload_ok, and "== 1"
    # presumes the entry being unloaded is itself still counted as loaded
    # here -- confirm against the config entry state machine.
    loaded_entries = [
        loaded_entry
        for loaded_entry in hass.config_entries.async_entries(DOMAIN)
        if loaded_entry.state == ConfigEntryState.LOADED
    ]
    if len(loaded_entries) == 1:
        manager: LookinUDPManager = hass.data[DOMAIN][UDP_MANAGER]
        await manager.async_stop()
    return unload_ok
202 
203 
async def async_remove_config_entry_device(
    hass: HomeAssistant, entry: ConfigEntry, device_entry: dr.DeviceEntry
) -> bool:
    """Remove lookin config entry from a device.

    Returns:
        True when none of the device entry's identifiers belong to the lookin
        device of this config entry or to one of its remotes; False when at
        least one identifier is still in use.
    """
    data: LookinData = hass.data[DOMAIN][entry.entry_id]
    # Every identifier this config entry currently accounts for: the lookin
    # device itself plus each remote it reported.
    all_identifiers: set[tuple[str, str]] = {
        (DOMAIN, data.lookin_device.id),
        *((DOMAIN, remote["UUID"]) for remote in data.devices),
    }
    return all_identifiers.isdisjoint(device_entry.identifiers)
LookinUDPSubscriptions async_get_subscriptions(self)
Definition: __init__.py:77
Callable[[], Coroutine[None, Any, Remote]] _async_climate_updater(LookInHttpProtocol lookin_protocol, str uuid)
Definition: __init__.py:47
bool async_setup_entry(HomeAssistant hass, ConfigEntry entry)
Definition: __init__.py:94
Callable[[], Coroutine[None, Any, Remote]] _async_remote_updater(LookInHttpProtocol lookin_protocol, str uuid)
Definition: __init__.py:59
bool async_unload_entry(HomeAssistant hass, ConfigEntry entry)
Definition: __init__.py:188
bool async_remove_config_entry_device(HomeAssistant hass, ConfigEntry entry, dr.DeviceEntry device_entry)
Definition: __init__.py:206
aiohttp.ClientSession async_get_clientsession(HomeAssistant hass, bool verify_ssl=True, socket.AddressFamily family=socket.AF_UNSPEC, ssl_util.SSLCipherList ssl_cipher=ssl_util.SSLCipherList.PYTHON_DEFAULT)