Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """Coordinator for the surepetcare integration."""
2 
3 from __future__ import annotations
4 
5 from datetime import timedelta
6 import logging
7 
8 from surepy import Surepy, SurepyEntity
9 from surepy.enums import EntityType, Location, LockState
10 from surepy.exceptions import SurePetcareAuthenticationError, SurePetcareError
11 
12 from homeassistant.config_entries import ConfigEntry
13 from homeassistant.const import CONF_PASSWORD, CONF_TOKEN, CONF_USERNAME
14 from homeassistant.core import HomeAssistant, ServiceCall
15 from homeassistant.exceptions import ConfigEntryAuthFailed
16 from homeassistant.helpers.aiohttp_client import async_get_clientsession
17 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
18 
19 from .const import (
20  ATTR_FLAP_ID,
21  ATTR_LOCATION,
22  ATTR_LOCK_STATE,
23  ATTR_PET_NAME,
24  DOMAIN,
25  SURE_API_TIMEOUT,
26 )
27 
28 _LOGGER = logging.getLogger(__name__)
29 
30 SCAN_INTERVAL = timedelta(minutes=3)
31 
32 
33 class SurePetcareDataCoordinator(DataUpdateCoordinator[dict[int, SurepyEntity]]):
34  """Handle Surepetcare data."""
35 
36  def __init__(self, entry: ConfigEntry, hass: HomeAssistant) -> None:
37  """Initialize the data handler."""
38  self.surepysurepy = Surepy(
39  entry.data[CONF_USERNAME],
40  entry.data[CONF_PASSWORD],
41  auth_token=entry.data[CONF_TOKEN],
42  api_timeout=SURE_API_TIMEOUT,
43  session=async_get_clientsession(hass),
44  )
45  self.lock_states_callbackslock_states_callbacks = {
46  LockState.UNLOCKED.name.lower(): self.surepysurepy.sac.unlock,
47  LockState.LOCKED_IN.name.lower(): self.surepysurepy.sac.lock_in,
48  LockState.LOCKED_OUT.name.lower(): self.surepysurepy.sac.lock_out,
49  LockState.LOCKED_ALL.name.lower(): self.surepysurepy.sac.lock,
50  }
51  super().__init__(
52  hass,
53  _LOGGER,
54  name=DOMAIN,
55  update_interval=SCAN_INTERVAL,
56  )
57 
58  async def _async_update_data(self) -> dict[int, SurepyEntity]:
59  """Get the latest data from Sure Petcare."""
60  try:
61  return await self.surepysurepy.get_entities(refresh=True)
62  except SurePetcareAuthenticationError as err:
63  raise ConfigEntryAuthFailed("Invalid username/password") from err
64  except SurePetcareError as err:
65  raise UpdateFailed(f"Unable to fetch data: {err}") from err
66 
67  async def handle_set_lock_state(self, call: ServiceCall) -> None:
68  """Call when setting the lock state."""
69  flap_id = call.data[ATTR_FLAP_ID]
70  state = call.data[ATTR_LOCK_STATE]
71  await self.lock_states_callbackslock_states_callbacks[state](flap_id)
72  await self.async_request_refreshasync_request_refresh()
73 
74  def get_pets(self) -> dict[str, int]:
75  """Get pets."""
76  pets = {}
77  for surepy_entity in self.datadata.values():
78  if surepy_entity.type == EntityType.PET and surepy_entity.name:
79  pets[surepy_entity.name] = surepy_entity.id
80  return pets
81 
82  async def handle_set_pet_location(self, call: ServiceCall) -> None:
83  """Call when setting the pet location."""
84  pet_name = call.data[ATTR_PET_NAME]
85  location = call.data[ATTR_LOCATION]
86  device_id = self.get_petsget_pets()[pet_name]
87  await self.surepysurepy.sac.set_pet_location(device_id, Location[location.upper()])
88  await self.async_request_refreshasync_request_refresh()
None __init__(self, ConfigEntry entry, HomeAssistant hass)
Definition: coordinator.py:36
list[BaseAprilaireSensor] get_entities(type[BaseAprilaireSensor] entity_class, AprilaireCoordinator coordinator, str unique_id, tuple[AprilaireSensorDescription,...] descriptions)
Definition: sensor.py:64
aiohttp.ClientSession async_get_clientsession(HomeAssistant hass, bool verify_ssl=True, socket.AddressFamily family=socket.AF_UNSPEC, ssl_util.SSLCipherList ssl_cipher=ssl_util.SSLCipherList.PYTHON_DEFAULT)