1 """Data update coordinator for the Steam integration."""
3 from __future__
import annotations
5 from datetime
import timedelta
6 from typing
import TYPE_CHECKING
9 from steam.api
import _interface_method
as INTMethod
16 from .const
import CONF_ACCOUNTS, DOMAIN, LOGGER
19 from .
import SteamConfigEntry
23 DataUpdateCoordinator[dict[str, dict[str, str | int]]]
25 """Data update coordinator for the Steam integration."""
27 config_entry: SteamConfigEntry
29 def __init__(self, hass: HomeAssistant) ->
None:
30 """Initialize the coordinator."""
40 steam.api.key.set(self.
config_entryconfig_entry.data[CONF_API_KEY])
42 def _update(self) -> dict[str, dict[str, str | int]]:
43 """Fetch data from API endpoint."""
44 accounts = self.
config_entryconfig_entry.options[CONF_ACCOUNTS]
52 steamid=_id, include_appinfo=1
55 game[
"appid"]: game[
"img_icon_url"]
for game
in res.get(
"games", [])
57 response = self.
user_interfaceuser_interface.GetPlayerSummaries(steamids=_ids)
59 player[
"steamid"]: player
60 for player
in response[
"response"][
"players"][
"player"]
61 if player[
"steamid"]
in _ids
63 for value
in players.values():
64 data = self.
player_interfaceplayer_interface.GetSteamLevel(steamid=value[
"steamid"])
65 value[
"level"] = data[
"response"].
get(
"player_level")
69 """Send request to the executor."""
71 return await self.
hasshass.async_add_executor_job(self.
_update_update)
73 except (steam.api.HTTPError, steam.api.HTTPTimeoutError)
as ex:
75 raise ConfigEntryAuthFailed
from ex
dict[str, dict[str, str|int]] _update(self)
dict[str, dict[str, str|int]] _async_update_data(self)
None __init__(self, HomeAssistant hass)
web.Response get(self, web.Request request, str config_key)