1 """ScreenlogicDataUpdateCoordinator definition."""
3 from datetime
import timedelta
5 from typing
import TYPE_CHECKING
7 from screenlogicpy
import ScreenLogicGateway
8 from screenlogicpy.const.common
import (
12 ScreenLogicCommunicationError,
14 from screenlogicpy.device_const.system
import EQUIPMENT_FLAG
22 from .config_flow
import async_discover_gateways_by_unique_id, name_for_mac
23 from .const
import DEFAULT_SCAN_INTERVAL, DOMAIN
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# Passed as the `cooldown` argument when the coordinator builds its
# refresh-request helper (presumably seconds — TODO confirm the unit).
REQUEST_REFRESH_DELAY = 2
# NOTE(review): not referenced in the visible part of this module; presumably
# a delay (in seconds) used by other modules of the integration — confirm.
HEATER_COOLDOWN_DELAY = 6
32 hass: HomeAssistant, entry: ConfigEntry
33 ) -> dict[str, str | int]:
34 """Construct connect_info from configuration entry and returns it to caller."""
38 if mac
in discovered_gateways:
39 return discovered_gateways[mac]
41 _LOGGER.debug(
"Gateway rediscovery failed for %s", entry.title)
43 assert mac
is not None
47 SL_GATEWAY_IP: entry.data[CONF_IP_ADDRESS],
48 SL_GATEWAY_PORT: entry.data[CONF_PORT],
53 """Class to manage the data update for the Screenlogic component."""
59 config_entry: ConfigEntry,
60 gateway: ScreenLogicGateway,
62 """Initialize the Screenlogic Data Update Coordinator."""
67 seconds=config_entry.options.get(CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL)
73 update_interval=interval,
77 hass, _LOGGER, cooldown=REQUEST_REFRESH_DELAY, immediate=
False
async def _async_update_configured_data(self) -> None:
    """Update data sets based on equipment config.

    Requests from ``self.gateway`` only the data sets that apply:

    * status, when ``gateway.is_client`` is false, followed by chemistry
      data when the ``INTELLICHEM`` equipment flag is set;
    * pump data, unconditionally;
    * salt chlorine generator (SCG) data when the ``CHLORINATOR``
      equipment flag is set.
    """
    # NOTE(review): indentation was lost in the extracted source; the
    # IntelliChem check is assumed to be nested under the `is_client` guard
    # (it directly follows the status request and precedes the blank line
    # that separates the pump/SCG group) — confirm against the original file.
    if not self.gateway.is_client:
        await self.gateway.async_get_status()
        if EQUIPMENT_FLAG.INTELLICHEM in self.gateway.equipment_flags:
            await self.gateway.async_get_chemistry()

    await self.gateway.async_get_pumps()
    if EQUIPMENT_FLAG.CHLORINATOR in self.gateway.equipment_flags:
        await self.gateway.async_get_scg()
93 """Fetch data from the Screenlogic gateway."""
96 if not self.
gatewaygateway.is_connected:
100 await self.
gatewaygateway.async_connect(**connect_info)
103 except ScreenLogicCommunicationError
as sle:
104 if self.
gatewaygateway.is_connected:
105 await self.
gatewaygateway.async_disconnect()
None _async_update_configured_data(self)
None __init__(self, HomeAssistant hass, *ConfigEntry config_entry, ScreenLogicGateway gateway)
None _async_update_data(self)
str name_for_mac(str mac)
dict[str, dict[str, Any]] async_discover_gateways_by_unique_id()
dict[str, str|int] async_get_connect_info(HomeAssistant hass, ConfigEntry entry)