Home Assistant Unofficial Reference 2024.12.1
config_flow.py
Go to the documentation of this file.
1 """Config flow for kraken integration."""
2 
3 from __future__ import annotations
4 
5 from typing import Any
6 
7 import krakenex
8 from pykrakenapi.pykrakenapi import KrakenAPI
9 import voluptuous as vol
10 
11 from homeassistant.config_entries import (
12  ConfigEntry,
13  ConfigFlow,
14  ConfigFlowResult,
15  OptionsFlow,
16 )
17 from homeassistant.const import CONF_SCAN_INTERVAL
18 from homeassistant.core import callback
19 from homeassistant.helpers import config_validation as cv
20 
21 from .const import CONF_TRACKED_ASSET_PAIRS, DEFAULT_SCAN_INTERVAL, DOMAIN
22 from .utils import get_tradable_asset_pairs
23 
24 
class KrakenConfigFlow(ConfigFlow, domain=DOMAIN):
    """Handle a config flow for kraken.

    The flow has a single ``user`` step and permits only one config entry
    for the integration; the per-entry settings are edited through the
    options flow returned by ``async_get_options_flow``.
    """

    VERSION = 1

    @staticmethod
    @callback
    def async_get_options_flow(
        config_entry: ConfigEntry,
    ) -> KrakenOptionsFlowHandler:
        """Get the options flow for this handler.

        ``config_entry`` is accepted to satisfy the expected signature but is
        not forwarded to the handler.
        """
        # NOTE(review): the return statement was lost from the listing this
        # block was recovered from. The handler in this file defines no
        # __init__, so it is constructed without arguments — confirm against
        # the upstream source.
        return KrakenOptionsFlowHandler()

    async def async_step_user(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Handle the initial step.

        Aborts with reason ``already_configured`` when an entry already
        exists. When ``user_input`` is provided, creates the entry titled
        with ``DOMAIN`` and stores the input as its data. Otherwise shows the
        ``user`` form, which has no data schema (nothing to fill in).
        """
        # Only a single entry is allowed for this integration.
        if self._async_current_entries():
            return self.async_abort(reason="already_configured")
        if user_input is not None:
            return self.async_create_entry(title=DOMAIN, data=user_input)
        return self.async_show_form(
            step_id="user",
            data_schema=None,
            errors={},
        )
51 
52 
class KrakenOptionsFlowHandler(OptionsFlow):
    """Handle Kraken client options."""

    async def async_step_init(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Manage the Kraken options.

        When ``user_input`` is provided it is stored as the new options
        (entry created with an empty title). Otherwise the ``init`` form is
        shown with two optional fields: the scan interval (an ``int``) and a
        multi-select of asset pairs to track, both pre-filled from the
        current options.
        """
        if user_input is not None:
            return self.async_create_entry(title="", data=user_input)

        api = KrakenAPI(krakenex.API(), retry=0, crl_sleep=0)
        # The lookup is dispatched to the executor rather than awaited
        # directly (presumably a blocking network call — verify in .utils).
        tradable_asset_pairs = await self.hass.async_add_executor_job(
            get_tradable_asset_pairs, api
        )
        # The multi-select takes a mapping; each pair maps to itself.
        tradable_asset_pairs_for_multi_select = {v: v for v in tradable_asset_pairs}

        # Ensure that a previously selected tracked asset pair is still
        # available in multiselect even if it is not tradable anymore
        tracked_asset_pairs = self.config_entry.options.get(
            CONF_TRACKED_ASSET_PAIRS, []
        )
        tradable_asset_pairs_for_multi_select.update(
            {
                tracked_asset_pair: tracked_asset_pair
                for tracked_asset_pair in tracked_asset_pairs
            }
        )

        options = {
            vol.Optional(
                CONF_SCAN_INTERVAL,
                default=self.config_entry.options.get(
                    CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL
                ),
            ): int,
            vol.Optional(
                CONF_TRACKED_ASSET_PAIRS,
                default=tracked_asset_pairs,
            ): cv.multi_select(tradable_asset_pairs_for_multi_select),
        }

        return self.async_show_form(step_id="init", data_schema=vol.Schema(options))
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
Definition: config_flow.py:40
KrakenOptionsFlowHandler async_get_options_flow(ConfigEntry config_entry)
Definition: config_flow.py:34
ConfigFlowResult async_step_init(self, dict[str, Any]|None user_input=None)
Definition: config_flow.py:58
ConfigFlowResult async_create_entry(self, *str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
list[ConfigEntry] _async_current_entries(self, bool|None include_ignore=None)
ConfigFlowResult async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)
ConfigFlowResult async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
None config_entry(self, ConfigEntry value)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)
_FlowResultT async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)