#!/usr/bin/env python3
"""
Flask dashboard for CM4 eMMC provisioning.
Monitors deployment status, shows device connection steps, backup/restore.
Supports USB boot mode (ask Backup/Deploy) and network-booted devices (register, then Backup/Deploy).
"""

import http.client
import json
import os
import re
import shutil
import subprocess
import time
import urllib.request
from collections import deque
from pathlib import Path
from urllib.parse import quote

from flask import Flask, render_template, jsonify, request, send_file, Response

app = Flask(__name__)


@app.after_request
def no_cache(response):
    """Prevent browser from caching the dashboard so deploys are visible immediately."""
    if request.path == "/" or request.path.startswith("/api/"):
        response.headers["Cache-Control"] = "no-store, no-cache, must-revalidate, max-age=0"
        response.headers["Pragma"] = "no-cache"
    return response


# All paths default to files under BASE_DIR and can be overridden individually via environment.
BASE_DIR = Path(os.environ.get("CM4_PROVISIONING_DIR", "/var/lib/cm4-provisioning"))
STATUS_FILE = os.environ.get("CM4_STATUS_FILE", str(BASE_DIR / "status.json"))
LOG_FILE = os.environ.get("CM4_LOG_FILE", str(BASE_DIR / "flash.log"))
ACTION_REQUEST_FILE = os.environ.get("CM4_ACTION_REQUEST_FILE", str(BASE_DIR / "action_request"))
DEVICE_SOURCE_FILE = os.environ.get("CM4_DEVICE_SOURCE_FILE", str(BASE_DIR / "device_source"))
BACKUPS_DIR = Path(os.environ.get("CM4_BACKUPS_DIR", str(BASE_DIR / "backups")))
GOLDEN_IMAGE = Path(os.environ.get("CM4_GOLDEN_IMAGE", str(BASE_DIR / "golden.img")))
NETWORK_DEVICES_FILE = Path(os.environ.get("CM4_NETWORK_DEVICES_FILE", str(BASE_DIR / "network_devices.json")))
BUILD_STATUS_FILE = Path(os.environ.get("CM4_BUILD_STATUS_FILE", str(BASE_DIR / "build_cloudinit_status.json")))
BUILD_REQUEST_FILE = Path(os.environ.get("CM4_BUILD_REQUEST_FILE", str(BASE_DIR / "build_cloudinit_request.json")))
SHRINK_REQUEST_FILE = Path(os.environ.get("CM4_SHRINK_REQUEST_FILE", str(BASE_DIR / "shrink_request.json")))
SHRINK_STATUS_FILE = Path(os.environ.get("CM4_SHRINK_STATUS_FILE", str(BASE_DIR / "shrink_status.json")))
CLOUDINIT_TEMPLATES_FILE = Path(os.environ.get("CM4_CLOUDINIT_TEMPLATES_FILE", str(BASE_DIR / "cloudinit_templates.json")))

# File name suffixes recognised as backup images (longest first so compound suffixes win).
BACKUP_EXTENSIONS = (".img.xz", ".img.gz", ".img")

# Default cloud-init user-data for Raspberry Pi OS (NoCloud on boot partition)
DEFAULT_USER_DATA = """#cloud-config
package_update: true
package_upgrade: false
packages:
  - curl
runcmd:
  - curl -fsSL "http://YOUR_FILE_SERVER/provisioning/bootstrap.sh" -o /tmp/bootstrap.sh
  - chmod +x /tmp/bootstrap.sh
  - /tmp/bootstrap.sh
"""

DEFAULT_META_DATA = """instance-id: raspios-cloudinit-001
local-hostname: reterminal
"""

DEFAULT_NETWORK_CONFIG = """version: 2
ethernets:
  eth0:
    dhcp4: true
"""

DEFAULT_STATUS = {
    "phase": "idle",
    "message": "Waiting for reTerminal in boot mode or network.",
    "progress": None,
    "updated": None,
}


# ---------------------------------------------------------------------------
# Small shared helpers
# ---------------------------------------------------------------------------

def _read_json_dict(path):
    """Return the JSON object stored at *path*.

    Returns None when the file is missing, unreadable, not valid JSON, or
    holds something other than a JSON object, so callers can substitute
    their own default without crashing on ``.get``.
    """
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError):
        return None
    return data if isinstance(data, dict) else None


def _write_json(path, data, indent=2):
    """Write *data* as JSON to *path*, creating the parent directory.

    Returns True on success and False on any OS-level failure. The file is
    written in place (not via rename) so ownership and permissions of a
    pre-existing file are kept.
    """
    try:
        os.makedirs(os.path.dirname(str(path)) or ".", exist_ok=True)
        with open(path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=indent)
        return True
    except OSError:
        return False


def _json_body():
    """Return the request's JSON body as a dict ({} when absent, invalid or not an object)."""
    body = request.get_json(force=True, silent=True)
    return body if isinstance(body, dict) else {}


def _text(value):
    """Return *value* as a stripped string; falsy values become ''."""
    return str(value).strip() if value else ""


def _discard(path):
    """Best-effort removal of a partially written file."""
    try:
        path.unlink(missing_ok=True)
    except OSError:
        pass


# ---------------------------------------------------------------------------
# Status / log
# ---------------------------------------------------------------------------

def read_status():
    """Return the current provisioning status merged over DEFAULT_STATUS.

    While the phase is ``waiting_choice`` the result also carries
    ``device_source`` read from DEVICE_SOURCE_FILE (``"usb"`` when that file
    is missing or empty). A fresh dict is returned on every call.
    """
    data = _read_json_dict(STATUS_FILE)
    if data is None:
        # Copy so callers can never mutate the module-level default.
        return dict(DEFAULT_STATUS)
    out = {**DEFAULT_STATUS, **data}
    if out.get("phase") == "waiting_choice":
        try:
            with open(DEVICE_SOURCE_FILE, "r", encoding="utf-8") as sf:
                out["device_source"] = sf.read().strip() or "usb"
        except (OSError, ValueError):
            out["device_source"] = "usb"
    return out


def read_log_tail(lines=50):
    """Return the last *lines* lines of LOG_FILE as one stripped string ('' if unreadable)."""
    if lines <= 0:
        return ""
    try:
        with open(LOG_FILE, "r", encoding="utf-8", errors="replace") as f:
            # A bounded deque keeps only the tail instead of the whole log in memory.
            tail = deque(f, maxlen=lines)
    except OSError:
        return ""
    return "".join(tail).strip()


# ---------------------------------------------------------------------------
# Network device registry
# ---------------------------------------------------------------------------

def _load_network_devices():
    """Return the network device registry, ``{"devices": []}`` when unavailable."""
    data = _read_json_dict(NETWORK_DEVICES_FILE)
    return data if data is not None else {"devices": []}


def _save_network_devices(data):
    """Persist the network device registry; returns True on success."""
    return _write_json(NETWORK_DEVICES_FILE, data)


def _device_list(data):
    """Return the well-formed (dict) device entries of a registry dict."""
    devices = data.get("devices")
    if not isinstance(devices, list):
        return []
    return [d for d in devices if isinstance(d, dict)]


def _find_device(devices, mac):
    """Return the device entry whose MAC equals *mac* case-insensitively, or None."""
    wanted = mac.lower()
    for d in devices:
        if _text(d.get("mac")).lower() == wanted:
            return d
    return None


# ---------------------------------------------------------------------------
# Backups
# ---------------------------------------------------------------------------

BACKUPS_META_FILE = BACKUPS_DIR / "backups_meta.json"


def _load_backups_meta():
    """Return the backup metadata mapping (file name -> {name, description}); {} when unavailable."""
    data = _read_json_dict(BACKUPS_META_FILE)
    return data if data is not None else {}


def _save_backups_meta(data):
    """Persist the backup metadata mapping; returns True on success."""
    return _write_json(BACKUPS_META_FILE, data)


def _safe_backup_name(name):
    """Reject path traversal and ensure it's a backup filename we manage."""
    if not name or ".." in name or "/" in name or "\\" in name:
        return False
    return name.endswith(BACKUP_EXTENSIONS)


def list_backups():
    """Return backup images in BACKUPS_DIR, newest first.

    Each entry has ``name``, ``display_name``, ``description``, ``size`` and
    ``mtime``. Entries that cannot be stat'ed (deleted meanwhile, dangling
    symlink) are skipped instead of failing the whole listing.
    """
    if not BACKUPS_DIR.is_dir():
        return []
    meta = _load_backups_meta()
    try:
        candidates = list(BACKUPS_DIR.iterdir())
    except OSError:
        return []
    out = []
    for p in candidates:
        if not p.name.endswith(BACKUP_EXTENSIONS):
            continue
        try:
            if not p.is_file():
                continue
            st = p.stat()
        except OSError:
            continue
        m = meta.get(p.name)
        if not isinstance(m, dict):
            m = {}
        out.append({
            "name": p.name,
            "display_name": m.get("name") or p.name,
            "description": m.get("description") or "",
            "size": st.st_size,
            "mtime": st.st_mtime,
        })
    out.sort(key=lambda entry: entry["mtime"], reverse=True)
    return out


def _golden_links_to(path):
    """Return True when GOLDEN_IMAGE is a symlink that resolves to *path*."""
    try:
        return GOLDEN_IMAGE.is_symlink() and GOLDEN_IMAGE.resolve() == path.resolve()
    except (OSError, RuntimeError):
        return False


# ---------------------------------------------------------------------------
# Basic routes
# ---------------------------------------------------------------------------

@app.route("/")
def index():
    """Serve the dashboard page."""
    return render_template("index.html")


@app.route("/api/status")
def api_status():
    """Return the current provisioning status."""
    return jsonify(read_status())


@app.route("/api/status-clear", methods=["POST"])
def api_status_clear():
    """Reset status to idle (e.g. to dismiss a 'Golden image not found' error so you can try again)."""
    idle = {
        "phase": "idle",
        "message": DEFAULT_STATUS["message"],
        "progress": None,
        "updated": None,
    }
    # indent=None keeps the compact single-line format this file has always had.
    if _write_json(STATUS_FILE, idle, indent=None):
        return jsonify({"ok": True})
    return jsonify({"ok": False, "error": "Could not write status"}), 500


@app.route("/api/log")
def api_log():
    """Return the tail of the flash log."""
    return jsonify({"log": read_log_tail()})


# ---------------------------------------------------------------------------
# Device actions
# ---------------------------------------------------------------------------

@app.route("/api/pending-devices")
def api_pending_devices():
    """Returns USB (if waiting_choice) and registered network devices so the UI can show Backup/Deploy."""
    st = read_status()
    usb = None
    if st.get("phase") == "waiting_choice":
        usb = {"source": "usb", "message": st.get("message", "Device connected (USB). Choose action.")}
    devices = _device_list(_load_network_devices())
    network = [d for d in devices if d.get("action") in (None, "wait")]
    return jsonify({"usb": usb, "network": network})


@app.route("/api/device-action", methods=["POST"])
def api_device_action():
    """User chose Backup or Deploy for a device. source=usb | network; for network pass mac=."""
    body = _json_body()
    source = _text(body.get("source")).lower()
    action = _text(body.get("action")).lower()
    if action not in ("backup", "deploy"):
        return jsonify({"ok": False, "error": "action must be 'backup' or 'deploy'"}), 400

    if source == "usb":
        try:
            os.makedirs(os.path.dirname(ACTION_REQUEST_FILE) or ".", exist_ok=True)
            # If user requested "shrink after backup", create flag so host runs PiShrink after dd
            if action == "backup" and body.get("shrink"):
                try:
                    (BASE_DIR / "shrink_next_backup").write_text("1")
                except OSError:
                    pass  # host may still have SHRINK_BACKUP=1
            with open(ACTION_REQUEST_FILE, "w") as f:
                f.write(action)
            return jsonify({"ok": True})
        except OSError:
            return jsonify({"ok": False, "error": "Could not write action file"}), 500

    if source == "network":
        mac = _text(body.get("mac"))
        if not mac:
            return jsonify({"ok": False, "error": "mac required for network device"}), 400
        data = _load_network_devices()
        devices = _device_list(data)
        device = _find_device(devices, mac)
        if device is None:
            return jsonify({"ok": False, "error": "Device not found"}), 404
        device["action"] = action
        device["action_at"] = time.time()
        data["devices"] = devices
        # Only report success when the choice was actually persisted for the poller.
        if not _save_network_devices(data):
            return jsonify({"ok": False, "error": "Could not save device list"}), 500
        return jsonify({"ok": True})

    return jsonify({"ok": False, "error": "source must be 'usb' or 'network'"}), 400


@app.route("/api/register-device", methods=["POST"])
def api_register_device():
    """Called by a network-booted device to register (mac, ip)."""
    # Devices may post JSON or a plain form; fall back to the form when no JSON object is sent.
    body = _json_body() or request.form
    mac = _text(body.get("mac"))
    ip = _text(body.get("ip") or request.remote_addr)
    if not mac:
        return jsonify({"ok": False, "error": "mac required"}), 400

    data = _load_network_devices()
    devices = _device_list(data)
    device = _find_device(devices, mac)
    now = time.time()
    if device is not None:
        device["ip"] = ip
        device["registered_at"] = now
        device["action"] = device.get("action") or "wait"
    else:
        devices.append({"mac": mac, "ip": ip, "registered_at": now, "action": "wait"})
    data["devices"] = devices
    if not _save_network_devices(data):
        return jsonify({"ok": False, "error": "Could not save device list"}), 500
    return jsonify({"ok": True, "message": "registered"})


@app.route("/api/device-action-poll")
def api_device_action_poll():
    """Network device polls this to get its assigned action (deploy/backup) and URL."""
    mac = _text(request.args.get("mac"))
    if not mac:
        return jsonify({"action": "wait"}), 200
    base = request.host_url.rstrip("/")
    device = _find_device(_device_list(_load_network_devices()), mac)
    action = (device.get("action") if device else None) or "wait"
    if action == "deploy":
        return jsonify({"action": "deploy", "url": f"{base}/api/golden-image"})
    if action == "backup":
        # Quote the MAC so unusual characters cannot break the query string (':' stays readable).
        upload_url = f"{base}/api/backup-upload?mac={quote(mac, safe=':')}"
        return jsonify({"action": "backup", "upload_url": upload_url})
    return jsonify({"action": "wait"})


@app.route("/api/golden-image")
def api_golden_image():
    """Stream the golden image for network deploy (device pulls and writes to eMMC)."""
    if not GOLDEN_IMAGE.is_file():
        return jsonify({"error": "Golden image not found"}), 404
    return send_file(
        GOLDEN_IMAGE,
        mimetype="application/octet-stream",
        as_attachment=True,
        download_name="golden.img",
    )


@app.route("/api/backup-upload", methods=["POST"])
def api_backup_upload():
    """Network device uploads its eMMC backup (raw body)."""
    raw_mac = _text(request.args.get("mac")).replace(":", "-")
    # The MAC comes from the client and ends up in a file name: keep only safe characters.
    mac = re.sub(r"[^0-9A-Za-z_-]", "_", raw_mac)[:20]
    if not mac:
        return jsonify({"error": "mac query param required"}), 400
    name = f"backup-net-{mac}-{int(time.time())}.img"
    path = BACKUPS_DIR / name
    try:
        BACKUPS_DIR.mkdir(parents=True, exist_ok=True)
        with open(path, "wb") as f:
            while chunk := request.stream.read(1024 * 1024):
                f.write(chunk)
        return jsonify({"ok": True, "file": name})
    except OSError as e:
        _discard(path)
        return jsonify({"error": str(e)}), 500


@app.route("/api/backups")
def api_backups():
    """List available backups and the directory they live in."""
    return jsonify({"backups": list_backups(), "backups_dir": str(BACKUPS_DIR)})


@app.route("/api/backups/upload", methods=["POST"])
def api_backups_upload():
    """Upload an image file from the dashboard (multipart form)."""
    if "file" not in request.files and "image" not in request.files:
        return jsonify({"ok": False, "error": "no file in request (use field 'file' or 'image')"}), 400
    f = request.files.get("file") or request.files.get("image")
    if not f or not f.filename:
        return jsonify({"ok": False, "error": "no file selected"}), 400

    filename = f.filename
    lowered = filename.lower()
    ext = next((e for e in BACKUP_EXTENSIONS if lowered.endswith(e)), None)
    if ext is not None:
        # Strip the whole (possibly compound) suffix so "a.img.xz" does not become "a.img-<ts>.img.xz".
        base = filename[: -len(ext)]
    else:
        base = filename.rsplit(".", 1)[0] if "." in filename else filename
        ext = ".img"
    base = base.strip() or "upload"
    safe_base = re.sub(r"[^\w\-.]", "_", base)[:80] or "upload"

    name = f"{safe_base}-{int(time.time())}{ext}"
    if not _safe_backup_name(name):
        name = f"upload-{int(time.time())}.img"
    path = BACKUPS_DIR / name
    try:
        BACKUPS_DIR.mkdir(parents=True, exist_ok=True)
        f.save(str(path))
        return jsonify({"ok": True, "name": name, "message": f"Uploaded {name}"})
    except OSError as e:
        _discard(path)
        return jsonify({"ok": False, "error": str(e)}), 500


@app.route("/api/backups/<name>/set-as-golden", methods=["POST"])
def api_backup_set_as_golden(name):
    """Use this backup as the golden image (symlink when in backups dir to avoid copying)."""
    if not _safe_backup_name(name):
        return jsonify({"ok": False, "error": "invalid backup name"}), 400
    path = BACKUPS_DIR / name
    if not path.is_file():
        return jsonify({"ok": False, "error": "backup not found"}), 404
    try:
        BACKUPS_DIR.mkdir(parents=True, exist_ok=True)
        GOLDEN_IMAGE.parent.mkdir(parents=True, exist_ok=True)
        # exists() is False for a dangling symlink, which would make os.symlink fail below.
        if GOLDEN_IMAGE.is_symlink() or GOLDEN_IMAGE.exists():
            GOLDEN_IMAGE.unlink()
        # Symlink to the backup so we use the same file instead of duplicating
        path_resolved = path.resolve()
        try:
            path_resolved.relative_to(BACKUPS_DIR.resolve())
            os.symlink(path_resolved, GOLDEN_IMAGE)
        except ValueError:
            # Backup not under BACKUPS_DIR (shouldn't happen for list); fall back to copy
            shutil.copy2(path, GOLDEN_IMAGE)
        return jsonify({"ok": True, "message": f"Golden image set from {name}"})
    except OSError as e:
        return jsonify({"ok": False, "error": str(e)}), 500


def _request_host_shrink(name, action="shrink", format="xz"):
    """Write shrink request for host and poll shrink_status.json. Returns (ok, message_or_error).

    *action* is ``"shrink"`` or ``"compress"``; *format* (``"gz"`` or
    ``"xz"``) is only sent for ``"compress"``. Blocks for up to 35 minutes.
    A status file last modified before this request was written is ignored,
    so a result left over from an earlier run for the same image is not
    mistaken for the outcome of this one.
    """
    req = {"name": name, "action": action}
    if action == "compress":
        req["format"] = "gz" if format == "gz" else "xz"
    try:
        SHRINK_REQUEST_FILE.write_text(json.dumps(req))
    except OSError as e:
        return False, str(e)
    try:
        requested_at = SHRINK_REQUEST_FILE.stat().st_mtime
    except OSError:
        # Request file already gone; stale and fresh status can no longer be told apart.
        requested_at = None

    deadline = time.monotonic() + 2100  # 35 min
    while time.monotonic() < deadline:
        time.sleep(5)
        try:
            if requested_at is not None and SHRINK_STATUS_FILE.stat().st_mtime < requested_at:
                continue
            data = json.loads(SHRINK_STATUS_FILE.read_text())
        except (OSError, ValueError):
            continue
        if not isinstance(data, dict) or data.get("name") != name:
            continue
        phase = data.get("phase")
        if phase == "done":
            return True, data.get("message") or f"Shrunk {name}"
        if phase == "error":
            return False, data.get("error") or "PiShrink failed"
    return False, "Shrink timed out (run on host may still be in progress)"


def _shrink_failure_status(msg):
    """Map a shrink/compress failure message to an HTTP status code."""
    text = str(msg)
    if "not installed" in text:
        return 503
    if "timed out" in text:
        return 504
    return 500


@app.route("/api/backups/<name>/shrink", methods=["POST"])
def api_backup_shrink(name):
    """Request PiShrink on host for a raw .img backup (shrinks in place). Dashboard polls host status."""
    if not _safe_backup_name(name):
        return jsonify({"ok": False, "error": "invalid backup name"}), 400
    # ".img.gz" / ".img.xz" do not end with ".img", so this single test rejects them too.
    if not name.endswith(".img"):
        return jsonify({"ok": False, "error": "only raw .img files can be shrunk (not .img.gz / .img.xz)"}), 400
    path = BACKUPS_DIR / name
    if not path.is_file():
        return jsonify({"ok": False, "error": "backup not found"}), 404
    ok, msg = _request_host_shrink(name, action="shrink")
    if ok:
        return jsonify({"ok": True, "message": msg})
    return jsonify({"ok": False, "error": msg}), _shrink_failure_status(msg)


@app.route("/api/backups/<name>/compress", methods=["POST"])
def api_backup_compress(name):
    """Request PiShrink with compression on host. Produces .img.gz or .img.xz. Dashboard polls host status."""
    if not _safe_backup_name(name):
        return jsonify({"ok": False, "error": "invalid backup name"}), 400
    if not name.endswith(".img"):
        return jsonify({"ok": False, "error": "only raw .img files can be compressed (not .img.gz / .img.xz)"}), 400
    path = BACKUPS_DIR / name
    if not path.is_file():
        return jsonify({"ok": False, "error": "backup not found"}), 404
    body = _json_body()
    fmt = _text(body.get("format") or request.args.get("format") or "xz").lower()
    if fmt == "gzip":
        fmt = "gz"
    if fmt not in ("gz", "xz"):
        fmt = "xz"
    ok, msg = _request_host_shrink(name, action="compress", format=fmt)
    if ok:
        ext = ".xz" if fmt == "xz" else ".gz"
        return jsonify({"ok": True, "message": msg or f"Compressed to {name}{ext}"})
    return jsonify({"ok": False, "error": msg}), _shrink_failure_status(msg)


@app.route("/api/backups/<name>", methods=["PATCH"])
def api_backup_update(name):
    """Update backup metadata (display name, description) or rename the file.

    JSON body keys: ``filename`` (new file name), ``name`` (display name),
    ``description``. When the renamed file is the current golden image
    symlink target, the link is re-pointed at the new name.
    """
    if not _safe_backup_name(name):
        return jsonify({"ok": False, "error": "invalid backup name"}), 400
    path = BACKUPS_DIR / name
    if not path.is_file():
        return jsonify({"ok": False, "error": "backup not found"}), 404
    body = _json_body()
    meta = _load_backups_meta()
    entry = meta.get(name)
    if not isinstance(entry, dict):
        entry = {}

    new_filename = _text(body.get("filename"))
    # Renaming to the current name is a no-op; treating it as a rename used to drop the metadata.
    if new_filename and new_filename != name:
        if not _safe_backup_name(new_filename):
            return jsonify({"ok": False, "error": "invalid new filename"}), 400
        new_path = BACKUPS_DIR / new_filename
        if new_path.exists():
            return jsonify({"ok": False, "error": "target filename already exists"}), 409
        was_golden = _golden_links_to(path)
        try:
            path.rename(new_path)
        except OSError as e:
            return jsonify({"ok": False, "error": str(e)}), 500
        if was_golden:
            try:
                GOLDEN_IMAGE.unlink()
                os.symlink(new_path.resolve(), GOLDEN_IMAGE)
            except OSError:
                pass  # best effort: the rename itself succeeded; golden can be set again from the UI
        # Keep the previous display name (defaulting to the old file name) across the rename.
        entry = {"name": entry.get("name") or name, "description": entry.get("description") or ""}
        meta.pop(name, None)
        name = new_filename
        path = new_path

    if "name" in body:
        entry["name"] = _text(body.get("name")) or path.name
    if "description" in body:
        entry["description"] = _text(body.get("description"))
    meta[name] = entry
    if not _save_backups_meta(meta):
        return jsonify({"ok": False, "error": "could not save metadata"}), 500
    return jsonify({"ok": True, "name": name})


@app.route("/api/backups/<name>", methods=["DELETE"])
def api_backup_delete(name):
    """Delete a backup file. If it is the current golden image (symlink), the golden link is removed."""
    if not _safe_backup_name(name):
        return jsonify({"ok": False, "error": "invalid backup name"}), 400
    path = BACKUPS_DIR / name
    if not path.is_file():
        return jsonify({"ok": False, "error": "backup not found"}), 404
    try:
        if _golden_links_to(path):
            try:
                GOLDEN_IMAGE.unlink()
            except OSError:
                pass  # failing to drop the link must not block deleting the backup
        path.unlink()
        meta = _load_backups_meta()
        meta.pop(name, None)
        _save_backups_meta(meta)
        return jsonify({"ok": True, "message": f"Deleted {name}"})
    except OSError as e:
        return jsonify({"ok": False, "error": str(e)}), 500


@app.route("/api/backups/<name>", methods=["GET"])
def api_backup_download(name):
    """Download a backup image as an attachment."""
    if not _safe_backup_name(name):
        return jsonify({"error": "invalid name"}), 400
    path = BACKUPS_DIR / name
    if not path.is_file():
        return jsonify({"error": "not found"}), 404
    return send_file(path, as_attachment=True, download_name=name)


# ---------------------------------------------------------------------------
# Cloud-init image build
# ---------------------------------------------------------------------------

def _build_status_read():
    """Return the cloud-init build status dict (an idle default when unavailable)."""
    data = _read_json_dict(BUILD_STATUS_FILE)
    if data is None:
        return {"phase": "idle", "message": "", "output_name": None, "error": None}
    return data


def _build_status_write(phase, message, output_name=None, error=None):
    """Record the cloud-init build status; write failures are ignored (best effort)."""
    _write_json(BUILD_STATUS_FILE, {
        "phase": phase,
        "message": message,
        "output_name": output_name,
        "error": error,
        "updated": time.time(),
    })


def _raspios_latest_url(variant="lite"):
    """Resolve latest Raspberry Pi OS (arm64) .img.xz URL. variant=lite|full.

    Scrapes the download index for the newest dated folder and returns the
    first ``.img.xz`` link in it, or None when nothing could be resolved
    (network error, unexpected page layout).
    """
    slug = "raspios_lite_arm64" if variant == "lite" else "raspios_full_arm64"
    base = f"https://downloads.raspberrypi.com/{slug}/images"
    headers = {"User-Agent": "Mozilla/5.0 (compatible; CM4-Provisioning/1.0)"}
    try:
        req = urllib.request.Request(base + "/", headers=headers)
        with urllib.request.urlopen(req, timeout=20) as r:
            html = r.read().decode("utf-8", errors="ignore")
        folders = re.findall(rf"{re.escape(slug)}-(\d{{4}}-\d{{2}}-\d{{2}})/", html)
        if not folders:
            return None
        # ISO dates sort lexicographically, so max() is the newest folder.
        latest = max(folders)
        folder_url = f"{base}/{slug}-{latest}/"
        req2 = urllib.request.Request(folder_url, headers=headers)
        with urllib.request.urlopen(req2, timeout=20) as r:
            folder_html = r.read().decode("utf-8", errors="ignore")
    except (OSError, ValueError, http.client.HTTPException):
        # URLError/HTTPError/timeouts are OSError subclasses; HTTPException covers protocol errors.
        return None
    m = re.search(r'href="([^"]+\.img\.xz)"', folder_html)
    if not m:
        return None
    href = m.group(1)
    if href.startswith(("http://", "https://")):
        return href
    return folder_url.rstrip("/") + "/" + href.lstrip("/")


def _load_cloudinit_templates():
    """Return the saved cloud-init templates document, ``{"templates": []}`` when unavailable."""
    data = _read_json_dict(CLOUDINIT_TEMPLATES_FILE)
    return data if data is not None else {"templates": []}


def _save_cloudinit_templates(data):
    """Persist the cloud-init templates document; returns True on success."""
    return _write_json(CLOUDINIT_TEMPLATES_FILE, data)


def _template_list(data):
    """Return the well-formed (dict) template entries of a templates document."""
    templates = data.get("templates")
    if not isinstance(templates, list):
        return []
    return [t for t in templates if isinstance(t, dict)]


@app.route("/api/build-cloudinit-status")
def api_build_cloudinit_status():
    """Return current build status (phase, message, output_name, error)."""
    return jsonify(_build_status_read())


@app.route("/api/build-cloudinit", methods=["POST"])
def api_build_cloudinit():
    """Start building a cloud-init image: write request file; host runs build (has loop devices)."""
    st = _build_status_read()
    if st.get("phase") in ("downloading", "decompressing", "injecting", "finalizing", "resolving"):
        return jsonify({"ok": False, "error": "A build is already in progress"}), 409
    body = _json_body()
    variant = _text(body.get("variant") or "lite").lower()
    if variant not in ("lite", "full"):
        variant = "lite"

    _build_status_write("resolving", "Resolving latest Raspberry Pi OS image URL…")
    url = _raspios_latest_url(variant)
    if not url:
        _build_status_write("idle", "", error="Could not resolve latest Raspios URL")
        return jsonify({"ok": False, "error": "Could not resolve latest image URL"}), 503

    build_request = {
        "url": url,
        "variant": variant,
        "user_data": body.get("user_data") or DEFAULT_USER_DATA,
        "meta_data": body.get("meta_data") or DEFAULT_META_DATA,
        "network_config": body.get("network_config") or DEFAULT_NETWORK_CONFIG,
        "set_as_golden_after": bool(body.get("set_as_golden_after")),
    }
    try:
        BUILD_REQUEST_FILE.parent.mkdir(parents=True, exist_ok=True)
        with open(BUILD_REQUEST_FILE, "w") as f:
            json.dump(build_request, f, indent=2)
    except OSError as e:
        # Leave "resolving", otherwise every later request is rejected as "already in progress".
        _build_status_write("idle", "", error=f"Could not write build request: {e}")
        return jsonify({"ok": False, "error": str(e)}), 500

    _build_status_write("resolving", "Build requested; host will run download and inject (check status).")
    return jsonify({"ok": True, "message": "Build started on host. Download and inject may take 15–45 min. Poll status or refresh the page."}), 202


@app.route("/api/raspios-latest-url")
def api_raspios_latest_url():
    """Return the URL of the latest Raspberry Pi OS (arm64) image. Query: variant=lite|full."""
    variant = _text(request.args.get("variant") or "lite").lower()
    if variant not in ("lite", "full"):
        variant = "lite"
    url = _raspios_latest_url(variant)
    if not url:
        return jsonify({"ok": False, "url": None, "error": "Could not resolve latest image URL"}), 503
    return jsonify({"ok": True, "url": url, "filename": url.split("/")[-1], "variant": variant})


# ---------------------------------------------------------------------------
# Cloud-init templates
# ---------------------------------------------------------------------------

@app.route("/api/cloudinit-templates", methods=["GET"])
def api_cloudinit_templates_list():
    """List saved cloud-init templates."""
    return jsonify({"templates": _template_list(_load_cloudinit_templates())})


@app.route("/api/cloudinit-templates", methods=["POST"])
def api_cloudinit_templates_create():
    """Save a new cloud-init template."""
    body = _json_body()
    name = _text(body.get("name"))
    if not name:
        return jsonify({"ok": False, "error": "name required"}), 400
    data = _load_cloudinit_templates()
    templates = _template_list(data)
    tid = str(int(time.time() * 1000))
    templates.append({
        "id": tid,
        "name": name,
        "user_data": body.get("user_data", ""),
        "meta_data": body.get("meta_data", ""),
        "network_config": body.get("network_config", ""),
    })
    data["templates"] = templates
    if not _save_cloudinit_templates(data):
        return jsonify({"ok": False, "error": "Failed to save"}), 500
    return jsonify({"ok": True, "id": tid, "name": name})


@app.route("/api/cloudinit-templates/<tid>")
def api_cloudinit_templates_get(tid):
    """Get one template by id."""
    for t in _template_list(_load_cloudinit_templates()):
        if t.get("id") == tid:
            return jsonify(t)
    return jsonify({"error": "not found"}), 404


@app.route("/api/cloudinit-templates/<tid>", methods=["DELETE"])
def api_cloudinit_templates_delete(tid):
    """Delete a template."""
    data = _load_cloudinit_templates()
    existing = _template_list(data)
    remaining = [t for t in existing if t.get("id") != tid]
    if len(remaining) == len(existing):
        return jsonify({"ok": False, "error": "not found"}), 404
    data["templates"] = remaining
    if not _save_cloudinit_templates(data):
        return jsonify({"ok": False, "error": "Failed to save"}), 500
    return jsonify({"ok": True})


@app.route("/api/golden-info")
def api_golden_info():
    """Return whether golden image exists and its size/mtime for UI."""
    if not GOLDEN_IMAGE.is_file():
        return jsonify({"present": False})
    try:
        st = GOLDEN_IMAGE.stat()
    except OSError:
        return jsonify({"present": False})
    return jsonify({"present": True, "size": st.st_size, "mtime": st.st_mtime})


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000, debug=False)