1 """Coordinator for speedtestdotnet."""
3 from datetime
import timedelta
5 from typing
import Any, cast
13 from .const
import CONF_SERVER_ID, DEFAULT_SCAN_INTERVAL, DEFAULT_SERVER, DOMAIN
15 _LOGGER = logging.getLogger(__name__)
19 """Get the latest data from speedtest.net."""
21 config_entry: ConfigEntry
24 self, hass: HomeAssistant, config_entry: ConfigEntry, api: speedtest.Speedtest
26 """Initialize the data object."""
30 self.servers: dict[str, dict] = {DEFAULT_SERVER: {}}
35 update_interval=
timedelta(minutes=DEFAULT_SCAN_INTERVAL),
39 """Update list of test servers."""
40 test_servers = self.
apiapi.get_servers()
42 server
for servers
in test_servers.values()
for server
in servers
53 f
"{server['country']} - {server['sponsor']} - {server['name']}"
57 """Get the latest data from speedtest.net."""
59 self.
apiapi.closest.clear()
62 self.
apiapi.get_servers(servers=[server_id])
64 best_server = self.
apiapi.get_best_server()
66 "Executing speedtest.net speed test with server_id: %s",
69 self.
apiapi.download()
71 return cast(dict[str, Any], self.
apiapi.results.dict())
74 """Update Speedtest data."""
77 except speedtest.NoMatchedServers
as err:
78 raise UpdateFailed(
"Selected server is not found.")
from err
79 except speedtest.SpeedtestException
as err:
None update_servers(self)
dict[str, Any] _async_update_data(self)
None __init__(self, HomeAssistant hass, ConfigEntry config_entry, speedtest.Speedtest api)
dict[str, Any] update_data(self)