1 """Config flow for Local Calendar integration."""
3 from __future__
import annotations
6 from pathlib
import Path
10 from ical.calendar_stream
import CalendarStream
11 from ical.exceptions
import CalendarParseError
12 import voluptuous
as vol
32 _LOGGER = logging.getLogger(__name__)
34 STEP_USER_DATA_SCHEMA = vol.Schema(
36 vol.Required(CONF_CALENDAR_NAME): str,
37 vol.Optional(CONF_IMPORT, default=ATTR_CREATE_EMPTY): selector.SelectSelector(
38 selector.SelectSelectorConfig(
43 translation_key=CONF_IMPORT,
49 STEP_IMPORT_DATA_SCHEMA = vol.Schema(
51 vol.Required(CONF_ICS_FILE): selector.FileSelector(
52 config=selector.FileSelectorConfig(accept=
".ics")
59 """Handle a config flow for Local Calendar."""
64 """Initialize the config flow."""
65 self.
datadata: dict[str, Any] = {}
68 self, user_input: dict[str, Any] |
None =
None
69 ) -> ConfigFlowResult:
70 """Handle the initial step."""
71 if user_input
is None:
73 step_id=
"user", data_schema=STEP_USER_DATA_SCHEMA
76 key =
slugify(user_input[CONF_CALENDAR_NAME])
78 user_input[CONF_STORAGE_KEY] = key
79 if user_input.get(CONF_IMPORT) == ATTR_IMPORT_ICS_FILE:
83 title=user_input[CONF_CALENDAR_NAME],
88 self, user_input: dict[str, Any] |
None =
None
89 ) -> ConfigFlowResult:
90 """Handle optional iCal (.ics) import."""
92 if user_input
is not None:
94 await self.hass.async_add_executor_job(
95 save_uploaded_ics_file,
97 user_input[CONF_ICS_FILE],
98 self.
datadata[CONF_STORAGE_KEY],
100 except HomeAssistantError
as err:
101 _LOGGER.debug(
"Error saving uploaded file: %s", err)
102 errors[CONF_ICS_FILE] =
"invalid_ics_file"
105 title=self.
datadata[CONF_CALENDAR_NAME], data=self.
datadata
109 step_id=
"import_ics_file",
110 data_schema=STEP_IMPORT_DATA_SCHEMA,
116 hass: HomeAssistant, uploaded_file_id: str, storage_key: str
118 """Validate the uploaded file and move it to the storage directory."""
121 ics = file.read_text(encoding=
"utf8")
123 CalendarStream.from_ics(ics)
124 except CalendarParseError
as err:
126 dest_path = Path(hass.config.path(STORAGE_PATH.format(key=storage_key)))
127 shutil.move(file, dest_path)
ConfigFlowResult async_step_user(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_step_import_ics_file(self, dict[str, Any]|None user_input=None)
ConfigFlowResult async_create_entry(self, *str title, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None, Mapping[str, Any]|None options=None)
None _async_abort_entries_match(self, dict[str, Any]|None match_dict=None)
ConfigFlowResult async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_show_form(self, *str|None step_id=None, vol.Schema|None data_schema=None, dict[str, str]|None errors=None, Mapping[str, str]|None description_placeholders=None, bool|None last_step=None, str|None preview=None)
_FlowResultT async_create_entry(self, *str|None title=None, Mapping[str, Any] data, str|None description=None, Mapping[str, str]|None description_placeholders=None)
Iterator[Path] process_uploaded_file(HomeAssistant hass, str file_id)
def save_uploaded_ics_file(HomeAssistant hass, str uploaded_file_id, str storage_key)