1 """Backup manager for the Backup integration."""
3 from __future__ import annotations
7 from dataclasses import asdict, dataclass
11 from pathlib import Path
12 from queue import SimpleQueue
15 from tarfile import TarError
16 from tempfile import TemporaryDirectory
18 from typing import Any, Protocol, cast
21 from securetar import SecureTarFile, atomic_contents_add
32 from .const import DOMAIN, EXCLUDE_FROM_BACKUP, LOGGER
37 @dataclass(slots=True)
48 """Return a dict representation of this backup."""
49 return {**asdict(self), "path": self.path.as_posix()}
53 """Define the format that backup platforms can have."""
56 """Perform operations before a backup starts."""
59 """Perform operations after a backup finishes."""
63 """Define the format that backup managers can have."""
65 def __init__(self, hass: HomeAssistant) -> None:
66 """Initialize the backup manager."""
69 self.backups: dict[str, Backup] = {}
71 self.platforms: dict[str, BackupPlatformProtocol] = {}
77 integration_domain: str,
78 platform: BackupPlatformProtocol,
80 """Add a platform to the backup manager."""
81 if not hasattr(platform, "async_pre_backup") or not hasattr(
82 platform, "async_post_backup"
85 "%s does not implement required functions for the backup platform",
89 self.platforms[integration_domain] = platform
92 """Perform pre backup actions."""
96 pre_backup_results = await asyncio.gather(
98 platform.async_pre_backup(self.hass)
99 for platform in self.platforms.values()
101 return_exceptions=True,
103 for result in pre_backup_results:
104 if isinstance(result, Exception):
108 """Perform post backup actions."""
112 post_backup_results = await asyncio.gather(
114 platform.async_post_backup(self.hass)
115 for platform in self.platforms.values()
117 return_exceptions=True,
119 for result in post_backup_results:
120 if isinstance(result, Exception):
124 """Load backup platforms."""
125 await integration_platform.async_process_integration_platforms(
128 LOGGER.debug("Loaded %s platforms", len(self.platforms))
133 """Restore a backup."""
137 """Generate a backup."""
143 Return a dictionary of Backup instances keyed by their slug.
152 """Remove a backup."""
158 contents: aiohttp.BodyPartReader,
161 """Receive and store a backup file from upload."""
165 """Backup manager for the Backup integration."""
168 """Initialize the backup manager."""
170 self.backup_dir = Path(hass.config.path("backups"))
174 """Load data of stored backup files."""
175 backups = await self.hass.async_add_executor_job(self._read_backups)
176 LOGGER.debug("Loaded %s backups", len(backups))
181 """Read backups from disk."""
182 backups: dict[str, Backup] = {}
183 for backup_path in self.backup_dir.glob("*.tar"):
185 with tarfile.open(backup_path, "r:", bufsize=BUF_SIZE) as backup_file:
186 if data_file := backup_file.extractfile("./backup.json"):
189 slug=cast(str, data["slug"]),
190 name=cast(str, data["name"]),
191 date=cast(str, data["date"]),
193 size=round(backup_path.stat().st_size / 1_048_576, 2),
195 backups[backup.slug] = backup
196 except (OSError, TarError, json.JSONDecodeError, KeyError) as err:
197 LOGGER.warning("Unable to read backup %s: %s", backup_path, err)
201 """Return backups."""
208 """Return a backup."""
212 if not (backup := self.backups.get(slug)):
215 if not backup.path.exists():
218 "Removing tracked backup (%s) that does not exists on the expected"
230 """Remove a backup."""
234 await self.hass.async_add_executor_job(backup.path.unlink, True)
235 LOGGER.debug("Removed backup located at %s", backup.path)
241 contents: aiohttp.BodyPartReader,
244 """Receive and store a backup file from upload."""
245 queue: SimpleQueue[tuple[bytes, asyncio.Future[None] | None] | None] = (
248 temp_dir_handler = await self.hass.async_add_executor_job(TemporaryDirectory)
249 target_temp_file = Path(
250 temp_dir_handler.name, contents.filename or "backup.tar"
253 def _sync_queue_consumer() -> None:
254 with target_temp_file.open("wb") as file_handle:
256 if (_chunk_future := queue.get()) is None:
258 _chunk, _future = _chunk_future
259 if _future is not None:
260 self.hass.loop.call_soon_threadsafe(_future.set_result, None)
261 file_handle.write(_chunk)
263 fut: asyncio.Future[None] | None = None
265 fut = self.hass.async_add_executor_job(_sync_queue_consumer)
266 megabytes_sending = 0
267 while chunk := await contents.read_chunk(BUF_SIZE):
268 megabytes_sending += 1
269 if megabytes_sending % 5 != 0:
270 queue.put_nowait((chunk, None))
273 chunk_future = self.hass.loop.create_future()
274 queue.put_nowait((chunk, chunk_future))
277 return_when=asyncio.FIRST_COMPLETED,
283 queue.put_nowait(None)
288 def _move_and_cleanup() -> None:
289 shutil.move(target_temp_file, self.backup_dir / target_temp_file.name)
290 temp_dir_handler.cleanup()
292 await self.hass.async_add_executor_job(_move_and_cleanup)
296 """Generate a backup."""
303 backup_name = f"Core {HAVERSION}"
304 date_str = dt_util.now().isoformat()
312 "folders": ["homeassistant"],
313 "homeassistant": {"version": HAVERSION},
316 tar_file_path = Path(self.backup_dir, f"{backup_data['slug']}.tar")
317 size_in_bytes = await self.hass.async_add_executor_job(
327 size=round(size_in_bytes / 1_048_576, 2),
330 self.backups[slug] = backup
331 LOGGER.debug("Generated new backup with slug %s", slug)
340 backup_data: dict[str, Any],
342 """Generate backup contents and return the size."""
344 LOGGER.debug("Creating backup directory")
347 outer_secure_tarfile = SecureTarFile(
348 tar_file_path, "w", gzip=False, bufsize=BUF_SIZE
350 with outer_secure_tarfile as outer_secure_tarfile_tarfile:
352 fileobj = io.BytesIO(raw_bytes)
353 tar_info = tarfile.TarInfo(name="./backup.json")
354 tar_info.size = len(raw_bytes)
355 tar_info.mtime = int(time.time())
356 outer_secure_tarfile_tarfile.addfile(tar_info, fileobj=fileobj)
357 with outer_secure_tarfile.create_inner_tar(
358 "./homeassistant.tar.gz", gzip=True
362 origin_path=Path(self.hass.config.path()),
363 excludes=EXCLUDE_FROM_BACKUP,
367 return tar_file_path.stat().st_size
372 This will write the restore information to .HA_RESTORE which
373 will be handled during startup by the restore_backup module.
378 def _write_restore_file() -> None:
379 """Write the restore file."""
380 Path(self.hass.config.path(RESTORE_BACKUP_FILE)).write_text(
381 json.dumps({"path": backup.path.as_posix()}),
385 await self.hass.async_add_executor_job(_write_restore_file)
386 await self.hass.services.async_call("homeassistant", "restart", {})
390 """Generate a backup slug."""
391 return hashlib.sha1(f"{date} - {name}".lower().encode()).hexdigest()[:8]
int _mkdir_and_generate_backup_contents(self, Path tar_file_path, dict[str, Any] backup_data)
Backup async_create_backup(self, **Any kwargs)
None async_restore_backup(self, str slug, **Any kwargs)
None async_receive_backup(self, *aiohttp.BodyPartReader contents, **Any kwargs)
Backup|None async_get_backup(self, *str slug, **Any kwargs)
dict[str, Backup] async_get_backups(self, **Any kwargs)
None __init__(self, HomeAssistant hass)
None async_remove_backup(self, *str slug, **Any kwargs)
dict[str, Backup] _read_backups(self)
None _add_platform(self, HomeAssistant hass, str integration_domain, BackupPlatformProtocol platform)
Backup|None async_get_backup(self, *str slug, **Any kwargs)
Backup async_create_backup(self, **Any kwargs)
None async_pre_backup_actions(self, **Any kwargs)
None __init__(self, HomeAssistant hass)
None async_remove_backup(self, *str slug, **Any kwargs)
None load_platforms(self)
None async_restore_backup(self, str slug, **Any kwargs)
None async_post_backup_actions(self, **Any kwargs)
None async_receive_backup(self, *aiohttp.BodyPartReader contents, **Any kwargs)
dict[str, Backup] async_get_backups(self, **Any kwargs)
str _generate_slug(str date, str name)
web.Response get(self, web.Request request, str config_key)
JsonObjectType json_loads_object(bytes|bytearray|memoryview|str obj)