Home Assistant Unofficial Reference 2024.12.1
manager.py
Go to the documentation of this file.
1 """Backup manager for the Backup integration."""
2 
3 from __future__ import annotations
4 
5 import abc
6 import asyncio
7 from dataclasses import asdict, dataclass
8 import hashlib
9 import io
10 import json
11 from pathlib import Path
12 from queue import SimpleQueue
13 import shutil
14 import tarfile
15 from tarfile import TarError
16 from tempfile import TemporaryDirectory
17 import time
18 from typing import Any, Protocol, cast
19 
20 import aiohttp
21 from securetar import SecureTarFile, atomic_contents_add
22 
23 from homeassistant.backup_restore import RESTORE_BACKUP_FILE
24 from homeassistant.const import __version__ as HAVERSION
25 from homeassistant.core import HomeAssistant, callback
26 from homeassistant.exceptions import HomeAssistantError
27 from homeassistant.helpers import integration_platform
28 from homeassistant.helpers.json import json_bytes
29 from homeassistant.util import dt as dt_util
30 from homeassistant.util.json import json_loads_object
31 
32 from .const import DOMAIN, EXCLUDE_FROM_BACKUP, LOGGER
33 
# Buffer size in bytes (4 MiB). Used in this module as the tarfile buffer size
# and as the maximum chunk size read from an uploaded backup.
BUF_SIZE = 2**20 * 4
35 
36 
37 @dataclass(slots=True)
38 class Backup:
39  """Backup class."""
40 
41  slug: str
42  name: str
43  date: str
44  path: Path
45  size: float
46 
47  def as_dict(self) -> dict:
48  """Return a dict representation of this backup."""
49  return {**asdict(self), "path": self.path.as_posix()}
50 
51 
class BackupPlatformProtocol(Protocol):
    """Define the format that backup platforms can have.

    A platform must expose both coroutines below; the backup manager awaits
    them around the creation of a backup.
    """

    async def async_pre_backup(self, hass: HomeAssistant) -> None:
        """Perform operations before a backup starts."""

    async def async_post_backup(self, hass: HomeAssistant) -> None:
        """Perform operations after a backup finishes."""
60 
61 
class BaseBackupManager(abc.ABC):
    """Define the format that backup managers can have.

    The base class owns the registry of backup platforms and runs their
    pre/post backup hooks; storage specific behavior is left to the abstract
    methods implemented by subclasses.
    """

    def __init__(self, hass: HomeAssistant) -> None:
        """Initialize the backup manager."""
        self.hass = hass
        # True while a backup is being created.
        self.backing_up = False
        # Known backups keyed by slug.
        self.backups: dict[str, Backup] = {}
        # Platforms are loaded lazily on the first pre/post backup action.
        self.loaded_platforms = False
        # Registered backup platforms keyed by integration domain.
        self.platforms: dict[str, BackupPlatformProtocol] = {}

    @callback
    def _add_platform(
        self,
        hass: HomeAssistant,
        integration_domain: str,
        platform: BackupPlatformProtocol,
    ) -> None:
        """Add a platform to the backup manager.

        Platforms that lack either hook are skipped with a warning instead
        of being registered.
        """
        if not hasattr(platform, "async_pre_backup") or not hasattr(
            platform, "async_post_backup"
        ):
            LOGGER.warning(
                "%s does not implement required functions for the backup platform",
                integration_domain,
            )
            return
        self.platforms[integration_domain] = platform

    async def async_pre_backup_actions(self, **kwargs: Any) -> None:
        """Perform pre backup actions.

        All platform hooks run concurrently and are allowed to finish; the
        first exception found in the results is raised afterwards.
        """
        if not self.loaded_platforms:
            await self.load_platforms()

        pre_backup_results = await asyncio.gather(
            *(
                platform.async_pre_backup(self.hass)
                for platform in self.platforms.values()
            ),
            return_exceptions=True,
        )
        for result in pre_backup_results:
            if isinstance(result, Exception):
                raise result

    async def async_post_backup_actions(self, **kwargs: Any) -> None:
        """Perform post backup actions.

        All platform hooks run concurrently and are allowed to finish; the
        first exception found in the results is raised afterwards.
        """
        if not self.loaded_platforms:
            await self.load_platforms()

        post_backup_results = await asyncio.gather(
            *(
                platform.async_post_backup(self.hass)
                for platform in self.platforms.values()
            ),
            return_exceptions=True,
        )
        for result in post_backup_results:
            if isinstance(result, Exception):
                raise result

    async def load_platforms(self) -> None:
        """Load backup platforms."""
        await integration_platform.async_process_integration_platforms(
            self.hass, DOMAIN, self._add_platform, wait_for_platforms=True
        )
        LOGGER.debug("Loaded %s platforms", len(self.platforms))
        self.loaded_platforms = True

    @abc.abstractmethod
    async def async_restore_backup(self, slug: str, **kwargs: Any) -> None:
        """Restore a backup."""

    @abc.abstractmethod
    async def async_create_backup(self, **kwargs: Any) -> Backup:
        """Generate a backup."""

    @abc.abstractmethod
    async def async_get_backups(self, **kwargs: Any) -> dict[str, Backup]:
        """Get backups.

        Return a dictionary of Backup instances keyed by their slug.
        """

    @abc.abstractmethod
    async def async_get_backup(self, *, slug: str, **kwargs: Any) -> Backup | None:
        """Get a backup."""

    @abc.abstractmethod
    async def async_remove_backup(self, *, slug: str, **kwargs: Any) -> None:
        """Remove a backup."""

    @abc.abstractmethod
    async def async_receive_backup(
        self,
        *,
        contents: aiohttp.BodyPartReader,
        **kwargs: Any,
    ) -> None:
        """Receive and store a backup file from upload."""
162 
163 
165  """Backup manager for the Backup integration."""
166 
167  def __init__(self, hass: HomeAssistant) -> None:
168  """Initialize the backup manager."""
169  super().__init__(hass=hass)
170  self.backup_dirbackup_dir = Path(hass.config.path("backups"))
171  self.loaded_backupsloaded_backups = False
172 
173  async def load_backups(self) -> None:
174  """Load data of stored backup files."""
175  backups = await self.hasshass.async_add_executor_job(self._read_backups_read_backups)
176  LOGGER.debug("Loaded %s backups", len(backups))
177  self.backupsbackups = backups
178  self.loaded_backupsloaded_backups = True
179 
180  def _read_backups(self) -> dict[str, Backup]:
181  """Read backups from disk."""
182  backups: dict[str, Backup] = {}
183  for backup_path in self.backup_dirbackup_dir.glob("*.tar"):
184  try:
185  with tarfile.open(backup_path, "r:", bufsize=BUF_SIZE) as backup_file:
186  if data_file := backup_file.extractfile("./backup.json"):
187  data = json_loads_object(data_file.read())
188  backup = Backup(
189  slug=cast(str, data["slug"]),
190  name=cast(str, data["name"]),
191  date=cast(str, data["date"]),
192  path=backup_path,
193  size=round(backup_path.stat().st_size / 1_048_576, 2),
194  )
195  backups[backup.slug] = backup
196  except (OSError, TarError, json.JSONDecodeError, KeyError) as err:
197  LOGGER.warning("Unable to read backup %s: %s", backup_path, err)
198  return backups
199 
200  async def async_get_backups(self, **kwargs: Any) -> dict[str, Backup]:
201  """Return backups."""
202  if not self.loaded_backupsloaded_backups:
203  await self.load_backupsload_backups()
204 
205  return self.backupsbackups
206 
207  async def async_get_backup(self, *, slug: str, **kwargs: Any) -> Backup | None:
208  """Return a backup."""
209  if not self.loaded_backupsloaded_backups:
210  await self.load_backupsload_backups()
211 
212  if not (backup := self.backupsbackups.get(slug)):
213  return None
214 
215  if not backup.path.exists():
216  LOGGER.debug(
217  (
218  "Removing tracked backup (%s) that does not exists on the expected"
219  " path %s"
220  ),
221  backup.slug,
222  backup.path,
223  )
224  self.backupsbackups.pop(slug)
225  return None
226 
227  return backup
228 
229  async def async_remove_backup(self, *, slug: str, **kwargs: Any) -> None:
230  """Remove a backup."""
231  if (backup := await self.async_get_backupasync_get_backupasync_get_backup(slug=slug)) is None:
232  return
233 
234  await self.hasshass.async_add_executor_job(backup.path.unlink, True)
235  LOGGER.debug("Removed backup located at %s", backup.path)
236  self.backupsbackups.pop(slug)
237 
239  self,
240  *,
241  contents: aiohttp.BodyPartReader,
242  **kwargs: Any,
243  ) -> None:
244  """Receive and store a backup file from upload."""
245  queue: SimpleQueue[tuple[bytes, asyncio.Future[None] | None] | None] = (
246  SimpleQueue()
247  )
248  temp_dir_handler = await self.hasshass.async_add_executor_job(TemporaryDirectory)
249  target_temp_file = Path(
250  temp_dir_handler.name, contents.filename or "backup.tar"
251  )
252 
253  def _sync_queue_consumer() -> None:
254  with target_temp_file.open("wb") as file_handle:
255  while True:
256  if (_chunk_future := queue.get()) is None:
257  break
258  _chunk, _future = _chunk_future
259  if _future is not None:
260  self.hasshass.loop.call_soon_threadsafe(_future.set_result, None)
261  file_handle.write(_chunk)
262 
263  fut: asyncio.Future[None] | None = None
264  try:
265  fut = self.hasshass.async_add_executor_job(_sync_queue_consumer)
266  megabytes_sending = 0
267  while chunk := await contents.read_chunk(BUF_SIZE):
268  megabytes_sending += 1
269  if megabytes_sending % 5 != 0:
270  queue.put_nowait((chunk, None))
271  continue
272 
273  chunk_future = self.hasshass.loop.create_future()
274  queue.put_nowait((chunk, chunk_future))
275  await asyncio.wait(
276  (fut, chunk_future),
277  return_when=asyncio.FIRST_COMPLETED,
278  )
279  if fut.done():
280  # The executor job failed
281  break
282 
283  queue.put_nowait(None) # terminate queue consumer
284  finally:
285  if fut is not None:
286  await fut
287 
288  def _move_and_cleanup() -> None:
289  shutil.move(target_temp_file, self.backup_dirbackup_dir / target_temp_file.name)
290  temp_dir_handler.cleanup()
291 
292  await self.hasshass.async_add_executor_job(_move_and_cleanup)
293  await self.load_backupsload_backups()
294 
295  async def async_create_backup(self, **kwargs: Any) -> Backup:
296  """Generate a backup."""
297  if self.backing_upbacking_upbacking_up:
298  raise HomeAssistantError("Backup already in progress")
299 
300  try:
301  self.backing_upbacking_upbacking_up = True
302  await self.async_pre_backup_actionsasync_pre_backup_actions()
303  backup_name = f"Core {HAVERSION}"
304  date_str = dt_util.now().isoformat()
305  slug = _generate_slug(date_str, backup_name)
306 
307  backup_data = {
308  "slug": slug,
309  "name": backup_name,
310  "date": date_str,
311  "type": "partial",
312  "folders": ["homeassistant"],
313  "homeassistant": {"version": HAVERSION},
314  "compressed": True,
315  }
316  tar_file_path = Path(self.backup_dirbackup_dir, f"{backup_data['slug']}.tar")
317  size_in_bytes = await self.hasshass.async_add_executor_job(
318  self._mkdir_and_generate_backup_contents_mkdir_and_generate_backup_contents,
319  tar_file_path,
320  backup_data,
321  )
322  backup = Backup(
323  slug=slug,
324  name=backup_name,
325  date=date_str,
326  path=tar_file_path,
327  size=round(size_in_bytes / 1_048_576, 2),
328  )
329  if self.loaded_backupsloaded_backups:
330  self.backupsbackups[slug] = backup
331  LOGGER.debug("Generated new backup with slug %s", slug)
332  return backup
333  finally:
334  self.backing_upbacking_upbacking_up = False
335  await self.async_post_backup_actionsasync_post_backup_actions()
336 
338  self,
339  tar_file_path: Path,
340  backup_data: dict[str, Any],
341  ) -> int:
342  """Generate backup contents and return the size."""
343  if not self.backup_dirbackup_dir.exists():
344  LOGGER.debug("Creating backup directory")
345  self.backup_dirbackup_dir.mkdir()
346 
347  outer_secure_tarfile = SecureTarFile(
348  tar_file_path, "w", gzip=False, bufsize=BUF_SIZE
349  )
350  with outer_secure_tarfile as outer_secure_tarfile_tarfile:
351  raw_bytes = json_bytes(backup_data)
352  fileobj = io.BytesIO(raw_bytes)
353  tar_info = tarfile.TarInfo(name="./backup.json")
354  tar_info.size = len(raw_bytes)
355  tar_info.mtime = int(time.time())
356  outer_secure_tarfile_tarfile.addfile(tar_info, fileobj=fileobj)
357  with outer_secure_tarfile.create_inner_tar(
358  "./homeassistant.tar.gz", gzip=True
359  ) as core_tar:
360  atomic_contents_add(
361  tar_file=core_tar,
362  origin_path=Path(self.hasshass.config.path()),
363  excludes=EXCLUDE_FROM_BACKUP,
364  arcname="data",
365  )
366 
367  return tar_file_path.stat().st_size
368 
369  async def async_restore_backup(self, slug: str, **kwargs: Any) -> None:
370  """Restore a backup.
371 
372  This will write the restore information to .HA_RESTORE which
373  will be handled during startup by the restore_backup module.
374  """
375  if (backup := await self.async_get_backupasync_get_backupasync_get_backup(slug=slug)) is None:
376  raise HomeAssistantError(f"Backup {slug} not found")
377 
378  def _write_restore_file() -> None:
379  """Write the restore file."""
380  Path(self.hasshass.config.path(RESTORE_BACKUP_FILE)).write_text(
381  json.dumps({"path": backup.path.as_posix()}),
382  encoding="utf-8",
383  )
384 
385  await self.hasshass.async_add_executor_job(_write_restore_file)
386  await self.hasshass.services.async_call("homeassistant", "restart", {})
387 
388 
def _generate_slug(date: str, name: str) -> str:
    """Generate a backup slug.

    The slug is the first 8 hex characters of the SHA-1 digest of the
    lower-cased string ``"<date> - <name>"``; it is an identifier, not a
    security measure.
    """
    return hashlib.sha1(f"{date} - {name}".lower().encode()).hexdigest()[:8]
int _mkdir_and_generate_backup_contents(self, Path tar_file_path, dict[str, Any] backup_data)
Definition: manager.py:341
Backup async_create_backup(self, **Any kwargs)
Definition: manager.py:295
None async_restore_backup(self, str slug, **Any kwargs)
Definition: manager.py:369
None async_receive_backup(self, *aiohttp.BodyPartReader contents, **Any kwargs)
Definition: manager.py:243
Backup|None async_get_backup(self, *str slug, **Any kwargs)
Definition: manager.py:207
dict[str, Backup] async_get_backups(self, **Any kwargs)
Definition: manager.py:200
None __init__(self, HomeAssistant hass)
Definition: manager.py:167
None async_remove_backup(self, *str slug, **Any kwargs)
Definition: manager.py:229
None _add_platform(self, HomeAssistant hass, str integration_domain, BackupPlatformProtocol platform)
Definition: manager.py:79
Backup|None async_get_backup(self, *str slug, **Any kwargs)
Definition: manager.py:147
None async_remove_backup(self, *str slug, **Any kwargs)
Definition: manager.py:151
None async_restore_backup(self, str slug, **Any kwargs)
Definition: manager.py:132
None async_receive_backup(self, *aiohttp.BodyPartReader contents, **Any kwargs)
Definition: manager.py:160
dict[str, Backup] async_get_backups(self, **Any kwargs)
Definition: manager.py:140
str _generate_slug(str date, str name)
Definition: manager.py:389
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88
JsonObjectType json_loads_object(bytes|bytearray|memoryview|str obj)
Definition: json.py:54