make meta.yml atomic and add a few things like schema_version, proper base and rsync.command
This commit is contained in:
@@ -3,8 +3,6 @@ from __future__ import annotations
|
|||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
import yaml
|
|
||||||
|
|
||||||
from ..config.load import load_global_config, load_host_config
|
from ..config.load import load_global_config, load_host_config
|
||||||
from ..config.merge import build_effective_config
|
from ..config.merge import build_effective_config
|
||||||
from ..errors import ConfigError
|
from ..errors import ConfigError
|
||||||
@@ -20,7 +18,7 @@ from ..snapshot import (
|
|||||||
snapshot_dir_name,
|
snapshot_dir_name,
|
||||||
utc_now,
|
utc_now,
|
||||||
)
|
)
|
||||||
from ..util import ensure_dir, realpath_startswith, sanitize_host
|
from ..util import ensure_dir, realpath_startswith, sanitize_host, write_yaml_atomic
|
||||||
|
|
||||||
|
|
||||||
def _host_backup_dirs(backup_root: str, host: str) -> HostBackupDirs:
|
def _host_backup_dirs(backup_root: str, host: str) -> HostBackupDirs:
|
||||||
@@ -56,8 +54,21 @@ def select_scheduled_base(dirs: HostBackupDirs) -> Path | None:
|
|||||||
return _find_latest_snapshot(dirs.manual)
|
return _find_latest_snapshot(dirs.manual)
|
||||||
|
|
||||||
|
|
||||||
def write_meta(path: Path, data: dict[str, Any]) -> None:
|
def _base_meta_from_path(base_dir: Path | None) -> dict[str, Any] | None:
|
||||||
path.write_text(yaml.safe_dump(data, sort_keys=False), encoding="utf-8")
|
if base_dir is None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
kind = base_dir.parent.name
|
||||||
|
if kind not in ("scheduled", "manual"):
|
||||||
|
# Should not happen with current selection logic, but keep meta robust.
|
||||||
|
kind = "unknown"
|
||||||
|
|
||||||
|
return {
|
||||||
|
"kind": kind,
|
||||||
|
"dirname": base_dir.name,
|
||||||
|
"id": None,
|
||||||
|
"path": None,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
def run_scheduled(prefix: Path, host: str, dry_run: bool) -> dict[str, Any]:
|
def run_scheduled(prefix: Path, host: str, dry_run: bool) -> dict[str, Any]:
|
||||||
@@ -158,31 +169,15 @@ def run_scheduled(prefix: Path, host: str, dry_run: bool) -> dict[str, Any]:
|
|||||||
incomplete_dir = dirs.incomplete / snap_name
|
incomplete_dir = dirs.incomplete / snap_name
|
||||||
data_dir = incomplete_dir / "data"
|
data_dir = incomplete_dir / "data"
|
||||||
meta_dir = incomplete_dir / "meta"
|
meta_dir = incomplete_dir / "meta"
|
||||||
|
|
||||||
ensure_dir(data_dir)
|
ensure_dir(data_dir)
|
||||||
ensure_dir(meta_dir)
|
ensure_dir(meta_dir)
|
||||||
|
|
||||||
meta_path = meta_dir / "meta.yaml"
|
meta_path = meta_dir / "meta.yaml"
|
||||||
log_path = meta_dir / "rsync.log"
|
log_path = meta_dir / "rsync.log"
|
||||||
|
|
||||||
meta: dict[str, Any] = {
|
# Pre-build command so we can record it in metadata.
|
||||||
"id": snap_id,
|
|
||||||
"host": host,
|
|
||||||
"type": "scheduled",
|
|
||||||
"label": None,
|
|
||||||
"status": "running",
|
|
||||||
"started_at": format_iso_z(ts),
|
|
||||||
"ended_at": None,
|
|
||||||
"base_snapshot": None,
|
|
||||||
"rsync": {"exit_code": None, "stats": {}},
|
|
||||||
"overrides": {"includes": [], "excludes": [], "base": None},
|
|
||||||
}
|
|
||||||
|
|
||||||
log_path.touch(exist_ok=True)
|
|
||||||
write_meta(meta_path, meta)
|
|
||||||
|
|
||||||
dest = str(data_dir) + "/"
|
dest = str(data_dir) + "/"
|
||||||
|
|
||||||
cmd = build_rsync_command(
|
cmd = build_rsync_command(
|
||||||
rsync_binary=str(rsync_binary),
|
rsync_binary=str(rsync_binary),
|
||||||
rsync_args=list(rsync_args),
|
rsync_args=list(rsync_args),
|
||||||
@@ -197,18 +192,38 @@ def run_scheduled(prefix: Path, host: str, dry_run: bool) -> dict[str, Any]:
|
|||||||
extra_includes=list(includes),
|
extra_includes=list(includes),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
meta: dict[str, Any] = {
|
||||||
|
"schema_version": 1,
|
||||||
|
"id": snap_id,
|
||||||
|
"host": host,
|
||||||
|
"type": "scheduled",
|
||||||
|
"label": None,
|
||||||
|
"status": "running",
|
||||||
|
"started_at": format_iso_z(ts),
|
||||||
|
"ended_at": None,
|
||||||
|
"duration_seconds": None,
|
||||||
|
"base": _base_meta_from_path(base_dir),
|
||||||
|
"rsync": {"exit_code": None, "command": cmd, "stats": {}},
|
||||||
|
# Keep existing fields for future expansion / compatibility with current structure.
|
||||||
|
"overrides": {"includes": [], "excludes": [], "base": None},
|
||||||
|
}
|
||||||
|
|
||||||
|
log_path.touch(exist_ok=True)
|
||||||
|
write_yaml_atomic(meta_path, meta)
|
||||||
|
|
||||||
result = run_rsync(cmd, log_path=log_path, timeout_seconds=timeout_seconds)
|
result = run_rsync(cmd, log_path=log_path, timeout_seconds=timeout_seconds)
|
||||||
|
|
||||||
end_ts = utc_now()
|
end_ts = utc_now()
|
||||||
meta["ended_at"] = format_iso_z(end_ts)
|
meta["ended_at"] = format_iso_z(end_ts)
|
||||||
|
meta["duration_seconds"] = int((end_ts - ts).total_seconds())
|
||||||
meta["rsync"]["exit_code"] = result.exit_code
|
meta["rsync"]["exit_code"] = result.exit_code
|
||||||
meta["status"] = "success" if result.exit_code == 0 else "failed"
|
meta["status"] = "success" if result.exit_code == 0 else "failed"
|
||||||
write_meta(meta_path, meta)
|
write_yaml_atomic(meta_path, meta)
|
||||||
|
|
||||||
if not log_path.exists():
|
if not log_path.exists():
|
||||||
meta["status"] = "failed"
|
meta["status"] = "failed"
|
||||||
meta["rsync"]["exit_code"] = 99
|
meta["rsync"]["exit_code"] = 99
|
||||||
write_meta(meta_path, meta)
|
write_yaml_atomic(meta_path, meta)
|
||||||
return {
|
return {
|
||||||
"ok": False,
|
"ok": False,
|
||||||
"dry_run": False,
|
"dry_run": False,
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ from __future__ import annotations
|
|||||||
|
|
||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
|
import tempfile
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
@@ -25,7 +26,6 @@ def sanitize_host(host: str) -> str:
|
|||||||
return host
|
return host
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def ensure_dir(path: Path, mode: int = 0o750) -> None:
|
def ensure_dir(path: Path, mode: int = 0o750) -> None:
|
||||||
path.mkdir(parents=True, exist_ok=True)
|
path.mkdir(parents=True, exist_ok=True)
|
||||||
try:
|
try:
|
||||||
@@ -62,3 +62,60 @@ def to_json_safe(obj: Any) -> Any:
|
|||||||
return obj
|
return obj
|
||||||
return str(obj)
|
return str(obj)
|
||||||
|
|
||||||
|
|
||||||
|
def write_yaml_atomic(path: Path, data: Any) -> None:
|
||||||
|
"""
|
||||||
|
Write YAML to `path` atomically.
|
||||||
|
|
||||||
|
Strategy:
|
||||||
|
- Write to a temp file in the same directory
|
||||||
|
- fsync temp file
|
||||||
|
- os.replace(temp, path) (atomic on POSIX)
|
||||||
|
- fsync directory entry (best-effort)
|
||||||
|
|
||||||
|
This helps avoid partial/corrupt meta files on crashes.
|
||||||
|
"""
|
||||||
|
# Local import to keep module load light; PyYAML is already a project dependency.
|
||||||
|
import yaml # type: ignore[import-not-found]
|
||||||
|
|
||||||
|
parent = path.parent
|
||||||
|
parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
|
tmp_fd: int | None = None
|
||||||
|
tmp_path: Path | None = None
|
||||||
|
|
||||||
|
try:
|
||||||
|
with tempfile.NamedTemporaryFile(
|
||||||
|
mode="w",
|
||||||
|
encoding="utf-8",
|
||||||
|
dir=str(parent),
|
||||||
|
prefix=path.name + ".",
|
||||||
|
suffix=".tmp",
|
||||||
|
delete=False,
|
||||||
|
) as tf:
|
||||||
|
tmp_fd = tf.fileno()
|
||||||
|
tmp_path = Path(tf.name)
|
||||||
|
tf.write(yaml.safe_dump(data, sort_keys=False))
|
||||||
|
tf.flush()
|
||||||
|
os.fsync(tmp_fd)
|
||||||
|
|
||||||
|
os.replace(str(tmp_path), str(path))
|
||||||
|
|
||||||
|
# Best-effort directory fsync (helps durability across power loss on some FS)
|
||||||
|
try:
|
||||||
|
dir_fd = os.open(str(parent), os.O_DIRECTORY)
|
||||||
|
try:
|
||||||
|
os.fsync(dir_fd)
|
||||||
|
finally:
|
||||||
|
os.close(dir_fd)
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
finally:
|
||||||
|
# If anything failed before replace(), try to clean up temp file
|
||||||
|
if tmp_path is not None and tmp_path.exists():
|
||||||
|
try:
|
||||||
|
tmp_path.unlink()
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user