1 """Component for monitoring activity on a folder."""
3 from __future__
import annotations
7 from typing
import cast
9 from watchdog.events
import (
17 PatternMatchingEventHandler,
19 from watchdog.observers
import Observer
27 from .const
import CONF_FOLDER, CONF_PATTERNS, DOMAIN, PLATFORMS
29 _LOGGER = logging.getLogger(__name__)
33 """Set up Folder watcher from a config entry."""
35 path: str = entry.options[CONF_FOLDER]
36 patterns: list[str] = entry.options[CONF_PATTERNS]
37 if not hass.config.is_allowed_path(path):
38 _LOGGER.error(
"Folder %s is not valid or allowed", path)
42 f
"setup_not_allowed_path_{path}",
45 severity=IssueSeverity.ERROR,
46 translation_key=
"setup_not_allowed_path",
47 translation_placeholders={
49 "config_variable":
"allowlist_external_dirs",
51 learn_more_url=
"https://www.home-assistant.io/docs/configuration/basic/#allowlist_external_dirs",
54 await hass.async_add_executor_job(Watcher, path, patterns, hass, entry.entry_id)
55 await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
60 patterns: list[str], hass: HomeAssistant, entry_id: str
62 """Return the Watchdog EventHandler object."""
67 """Class for handling Watcher events."""
69 def __init__(self, patterns: list[str], hass: HomeAssistant, entry_id: str) ->
None:
70 """Initialise the EventHandler."""
75 def process(self, event: FileSystemEvent, moved: bool =
False) ->
None:
76 """On Watcher event, fire HA event."""
77 _LOGGER.debug(
"process(%s)", event)
78 if not event.is_directory:
79 folder, file_name = os.path.split(event.src_path)
81 "event_type": event.event_type,
82 "path": event.src_path,
89 event = cast(FileSystemMovedEvent, event)
90 dest_folder, dest_file_name = os.path.split(event.dest_path)
92 "dest_path": event.dest_path,
93 "dest_file": dest_file_name,
94 "dest_folder": dest_folder,
96 fireable.update(_extra)
97 self.
hass.bus.fire(
101 signal = f
"folder_watcher-{self.entry_id}"
110 self.
process(event, moved=
True)
126 """Class for starting Watchdog."""
129 self, path: str, patterns: list[str], hass: HomeAssistant, entry_id: str
131 """Initialise the watchdog observer."""
136 if not hass.is_running:
137 hass.bus.listen_once(EVENT_HOMEASSISTANT_START, self.
startup)
140 hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, self.
shutdown)
142 def startup(self, event: Event |
None) ->
None:
143 """Start the watcher."""
147 """Shutdown the watcher."""
None __init__(self, list[str] patterns, HomeAssistant hass, str entry_id)
None on_moved(self, FileMovedEvent event)
None process(self, FileSystemEvent event, bool moved=False)
None on_deleted(self, FileDeletedEvent event)
None on_modified(self, FileModifiedEvent event)
None on_created(self, FileCreatedEvent event)
None on_closed(self, FileClosedEvent event)
None shutdown(self, Event|None event)
None __init__(self, str path, list[str] patterns, HomeAssistant hass, str entry_id)
None startup(self, Event|None event)
bool async_setup_entry(HomeAssistant hass, ConfigEntry entry)
EventHandler create_event_handler(list[str] patterns, HomeAssistant hass, str entry_id)
None async_create_issue(HomeAssistant hass, str entry_id)
None dispatcher_send(HomeAssistant hass, str signal, *Any args)