1 """DataUpdateCoordinator for Motionblinds integration."""
4 from datetime
import timedelta
8 from motionblinds
import DEVICE_TYPES_WIFI, ParseException
22 _LOGGER = logging.getLogger(__name__)
26 """Class to manage fetching data from single endpoint."""
31 logger: logging.Logger,
32 coordinator_info: dict[str, Any],
35 update_interval: timedelta,
37 """Initialize global data updater."""
42 update_interval=update_interval,
45 self.
api_lockapi_lock = coordinator_info[KEY_API_LOCK]
46 self.
_gateway_gateway = coordinator_info[KEY_GATEWAY]
50 """Fetch data from gateway."""
53 except (TimeoutError, ParseException):
55 return {ATTR_AVAILABLE:
False}
57 return {ATTR_AVAILABLE:
True}
60 """Fetch data from a blind."""
62 if blind.device_type
in DEVICE_TYPES_WIFI:
63 blind.Update_from_cache()
67 blind.Update_trigger()
68 except (TimeoutError, ParseException):
70 return {ATTR_AVAILABLE:
False}
72 return {ATTR_AVAILABLE:
True}
75 """Fetch the latest data from the gateway and blinds."""
79 data[KEY_GATEWAY] = await self.
hasshass.async_add_executor_job(
83 for blind
in self.
_gateway_gateway.device_list.values():
84 await asyncio.sleep(1.5)
86 data[blind.mac] = await self.
hasshass.async_add_executor_job(
90 all_available = all(device[ATTR_AVAILABLE]
for device
in data.values())
def _async_update_data(self)
def update_blind(self, blind)
None __init__(self, HomeAssistant hass, logging.Logger logger, dict[str, Any] coordinator_info, *str name, timedelta update_interval)
None update_interval(self, timedelta|None value)
timedelta|None update_interval(self)