1 """Config flow for RSS/Atom feeds."""
3 from __future__
import annotations
11 import voluptuous
as vol
31 from .const
import CONF_MAX_ENTRIES, DEFAULT_MAX_ENTRIES, DOMAIN
33 LOGGER = logging.getLogger(__name__)
36 async
def async_fetch_feed(hass: HomeAssistant, url: str) -> feedparser.FeedParserDict:
38 return await hass.async_add_executor_job(feedparser.parse, url)
42 """Handle a config flow."""
45 _max_entries: int |
None =
None
50 config_entry: ConfigEntry,
52 """Get the options flow for this handler."""
57 user_input: dict[str, Any] |
None =
None,
58 errors: dict[str, str] |
None =
None,
59 description_placeholders: dict[str, str] |
None =
None,
60 step_id: str =
"user",
61 ) -> ConfigFlowResult:
62 """Show the user form."""
63 if user_input
is None:
67 data_schema=vol.Schema(
70 CONF_URL, default=user_input.get(CONF_URL,
"")
74 description_placeholders=description_placeholders,
79 """Abort import flow on error."""
83 f
"import_yaml_error_{DOMAIN}_{error}_{slugify(url)}",
84 breaks_in_ha_version=
"2025.1.0",
87 severity=IssueSeverity.WARNING,
88 translation_key=f
"import_yaml_error_{error}",
89 translation_placeholders={
"url": url},
94 self, user_input: dict[str, Any] |
None =
None
95 ) -> ConfigFlowResult:
96 """Handle a flow initialized by the user."""
105 LOGGER.debug(
"feed bozo_exception: %s", feed.bozo_exception)
106 if isinstance(feed.bozo_exception, urllib.error.URLError):
107 if self.context[
"source"] == SOURCE_IMPORT:
109 return self.
show_user_formshow_user_form(user_input, {
"base":
"url_error"})
111 feed_title = html.unescape(feed[
"feed"][
"title"])
116 options={CONF_MAX_ENTRIES: self.
_max_entries_max_entries
or DEFAULT_MAX_ENTRIES},
120 """Handle an import flow."""
125 self, user_input: dict[str, Any] |
None =
None
126 ) -> ConfigFlowResult:
127 """Handle a reconfiguration flow initialized by the user."""
131 user_input={**reconfigure_entry.data},
132 description_placeholders={
"name": reconfigure_entry.title},
133 step_id=
"reconfigure",
139 LOGGER.debug(
"feed bozo_exception: %s", feed.bozo_exception)
140 if isinstance(feed.bozo_exception, urllib.error.URLError):
142 user_input=user_input,
143 description_placeholders={
"name": reconfigure_entry.title},
144 step_id=
"reconfigure",
145 errors={
"base":
"url_error"},
148 self.hass.config_entries.async_update_entry(reconfigure_entry, data=user_input)
153 """Handle an options flow."""
156 self, user_input: dict[str, Any] |
None =
None
157 ) -> ConfigFlowResult:
158 """Handle options flow."""
160 if user_input
is not None:
163 data_schema = vol.Schema(
168 CONF_MAX_ENTRIES, DEFAULT_MAX_ENTRIES
173 return self.
async_show_formasync_show_form(step_id=
"init", data_schema=data_schema)
OptionsFlow async_get_options_flow(ConfigEntry config_entry)
ConfigFlowResult abort_on_import_error(self, str url, str error)
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_step_reconfigure(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_step_import(self, dict[str, Any] import_data)
ConfigFlowResult show_user_form(self, dict[str, Any]|None user_input=None, dict[str, str]|None errors=None, dict[str, str]|None description_placeholders=None, str step_id="user")
ConfigFlowResult async_step_init(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_create_entry(self, *str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)
None _async_abort_entries_match(self, dict[str, Any]|None match_dict=None)
ConfigEntry _get_reconfigure_entry(self)
ConfigFlowResult async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
ConfigEntry config_entry(self)
None config_entry(self, ConfigEntry value)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)
_FlowResultT async_abort(self, *str reason, Mapping[str, str]|None description_placeholders=None)
feedparser.FeedParserDict async_fetch_feed(HomeAssistant hass, str url)
None async_create_issue(HomeAssistant hass, str entry_id)