1 """DataUpdateCoordinator for the LG ThinQ device."""
3 from __future__ import annotations
8 from thinqconnect import ThinQAPIException
9 from thinqconnect.integration import HABridge
14 from .const import DOMAIN
16 _LOGGER = logging.getLogger(__name__)
20 """LG Device's Data Update Coordinator."""
22 def __init__(self, hass: HomeAssistant, ha_bridge: HABridge) -> None:
23 """Initialize data coordinator."""
27 name=f"{DOMAIN}_{ha_bridge.device.device_id}",
31 self.api = ha_bridge
32 self.device_id = ha_bridge.device.device_id
35 alias = ha_bridge.device.alias
46 f"{self.device_id}_{self.sub_id}" if self.sub_id else self.device_id
50 """Request to the server to update the status from full response data."""
53 except ThinQAPIException as e:
57 """Refresh current status."""
61 """Handle the status received from the mqtt connection."""
62 data = self.api.update_status(status)
67 """Handle the notification message received from the mqtt connection."""
68 data = self.api.update_notification(message)
74 hass: HomeAssistant, ha_bridge: HABridge
75 ) -> DeviceDataUpdateCoordinator:
76 """Create DeviceDataUpdateCoordinator and device_api per device."""
78 await coordinator.async_refresh()
81 "Setup device's coordinator: %s, model:%s",
82 coordinator.device_name,
83 coordinator.api.device.model_name,
None handle_update_status(self, dict[str, Any] status)
None refresh_status(self)
None handle_notification_message(self, str|None message)
dict[str, Any] _async_update_data(self)
None __init__(self, HomeAssistant hass, HABridge ha_bridge)
None async_set_updated_data(self, _DataT data)
DeviceDataUpdateCoordinator async_setup_device_coordinator(HomeAssistant hass, HABridge ha_bridge)
MetOfficeData fetch_data(datapoint.Manager connection, Site site, str mode)