1 """Data update coordinator for RSS/Atom feeds."""
3 from __future__
import annotations
5 from calendar
import timegm
6 from datetime
import datetime
8 from logging
import getLogger
9 from time
import gmtime, struct_time
10 from typing
import TYPE_CHECKING
11 from urllib.error
import URLError
21 from .const
import DEFAULT_SCAN_INTERVAL, DOMAIN, EVENT_FEEDREADER
27 _LOGGER = getLogger(__name__)
31 DataUpdateCoordinator[list[feedparser.FeedParserDict] |
None]
33 """Abstraction over Feedparser module."""
35 config_entry: ConfigEntry
44 """Initialize the FeedManager object, poll as per scan interval."""
48 name=f
"{DOMAIN} {url}",
49 update_interval=DEFAULT_SCAN_INTERVAL,
58 self.
_feed_feed: feedparser.FeedParserDict |
None =
None
63 """Send no entries log at debug level."""
64 _LOGGER.debug(
"No new entries to be published in feed %s", self.
urlurl)
67 """Fetch the feed data."""
68 _LOGGER.debug(
"Fetching new data from feed %s", self.
urlurl)
70 def _parse_feed() -> feedparser.FeedParserDict:
71 return feedparser.parse(
73 etag=
None if not self.
_feed_feed
else self.
_feed_feed.
get(
"etag"),
74 modified=
None if not self.
_feed_feed
else self.
_feed_feed.
get(
"modified"),
77 feed = await self.
hasshass.async_add_executor_job(_parse_feed)
80 raise UpdateFailed(f
"Error fetching feed data from {self.url}")
89 if isinstance(feed.bozo_exception, URLError):
91 f
"Error fetching feed data from {self.url} : {feed.bozo_exception}"
96 "Possible issue parsing feed %s: %s",
103 """Set up the feed manager."""
105 self.
loggerlogger.debug(
"Feed data fetched from %s : %s", self.
urlurl, feed[
"feed"])
106 if feed_author := feed[
"feed"].
get(
"author"):
108 self.
feed_versionfeed_version = feedparser.api.SUPPORTED_VERSIONS.get(feed[
"version"])
112 """Update the feed and publish new entries to the event bus."""
113 assert self.
_feed_feed
is not None
122 "%s entri(es) available in feed %s",
123 len(self.
_feed_feed.entries),
126 if not self.
_feed_feed.entries:
131 assert isinstance(self.
_feed_feed.entries, list)
136 _LOGGER.debug(
"Fetch from feed %s completed", self.
urlurl)
141 return self.
_feed_feed.entries
145 """Filter the entries provided and return the ones to keep."""
146 assert self.
_feed_feed
is not None
149 "Processing only the first %s entries in feed %s",
157 """Update last_entry_timestamp and fire entry."""
160 if time_stamp := entry.get(
"updated_parsed")
or entry.get(
"published_parsed"):
164 "No updated_parsed or published_parsed info available for entry %s",
167 entry[
"feed_url"] = self.
urlurl
169 _LOGGER.debug(
"New event fired for entry %s", entry.get(
"link"))
173 """Publish new entries to the event bus."""
174 assert self.
_feed_feed
is not None
184 for entry
in self.
_feed_feed.entries:
187 time_stamp := entry.get(
"updated_parsed")
188 or entry.get(
"published_parsed")
190 and time_stamp > last_entry_timestamp
195 _LOGGER.debug(
"Already processed entry %s", entry.get(
"link"))
196 if new_entry_count == 0:
199 _LOGGER.debug(
"%d entries published in feed %s", new_entry_count, self.
urlurl)
203 """Represent a data storage."""
206 """Initialize data storage."""
207 self.
_data_data: dict[str, struct_time] = {}
209 self._store: Store[dict[str, str]] =
Store(hass, STORAGE_VERSION, DOMAIN)
213 """Set up storage."""
214 if (store_data := await self._store.
async_load())
is not None:
217 feed_id: gmtime(datetime.fromisoformat(timestamp_string).timestamp())
218 for feed_id, timestamp_string
in store_data.items()
223 """Return stored timestamp for given feed id."""
228 """Update timestamp for given feed id."""
229 self.
_data_data[feed_id] = timestamp
234 """Save feed data to storage."""
236 feed_id: dt_util.utc_from_timestamp(timegm(struct_utc)).isoformat()
237 for feed_id, struct_utc
in self.
_data_data.items()
None _publish_new_entries(self)
None _filter_entries(self)
feedparser.FeedParserDict _async_fetch_feed(self)
None _update_and_fire_entry(self, feedparser.FeedParserDict entry)
None _log_no_entries(self)
None __init__(self, HomeAssistant hass, str url, int max_entries, StoredData storage)
list[feedparser.FeedParserDict]|None _async_update_data(self)
None __init__(self, HomeAssistant hass)
None async_put_timestamp(self, str feed_id, struct_time timestamp)
dict[str, str] _async_save_data(self)
struct_time|None get_timestamp(self, str feed_id)
web.Response get(self, web.Request request, str config_key)
None async_load(HomeAssistant hass)
None async_delay_save(self, Callable[[], _T] data_func, float delay=0)