#!/usr/bin/env python3
"""
Flask dashboard for CM4 eMMC provisioning.
Monitors deployment status, shows device connection steps, backup/restore.
Supports USB boot mode (ask Backup/Deploy) and network-booted devices (register, then Backup/Deploy).
"""

import json
import os
import re
import shutil
import subprocess
import tempfile
import threading
import time
import urllib.parse
import urllib.request
from collections import deque
from pathlib import Path

from flask import Flask, render_template, jsonify, request, send_file, Response

app = Flask(__name__)


@app.after_request
def no_cache(response):
    """Prevent browser from caching the dashboard so deploys are visible immediately."""
    if request.path == "/" or request.path.startswith("/api/"):
        response.headers["Cache-Control"] = "no-store, no-cache, must-revalidate, max-age=0"
        response.headers["Pragma"] = "no-cache"
    return response


# All state lives under BASE_DIR; each path can be overridden through its own environment variable.
BASE_DIR = Path(os.environ.get("CM4_PROVISIONING_DIR", "/var/lib/cm4-provisioning"))
STATUS_FILE = os.environ.get("CM4_STATUS_FILE", str(BASE_DIR / "status.json"))
LOG_FILE = os.environ.get("CM4_LOG_FILE", str(BASE_DIR / "flash.log"))
ACTION_REQUEST_FILE = os.environ.get("CM4_ACTION_REQUEST_FILE", str(BASE_DIR / "action_request"))
DEVICE_SOURCE_FILE = os.environ.get("CM4_DEVICE_SOURCE_FILE", str(BASE_DIR / "device_source"))
BACKUPS_DIR = Path(os.environ.get("CM4_BACKUPS_DIR", str(BASE_DIR / "backups")))
GOLDEN_IMAGE = Path(os.environ.get("CM4_GOLDEN_IMAGE", str(BASE_DIR / "golden.img")))
NETWORK_DEVICES_FILE = Path(os.environ.get("CM4_NETWORK_DEVICES_FILE", str(BASE_DIR / "network_devices.json")))
BUILD_STATUS_FILE = Path(os.environ.get("CM4_BUILD_STATUS_FILE", str(BASE_DIR / "build_cloudinit_status.json")))

# Filename suffixes that identify an image file managed in BACKUPS_DIR.
BACKUP_SUFFIXES = (".img", ".img.gz", ".img.xz")

# Default cloud-init user-data for Raspberry Pi OS (NoCloud on boot partition)
DEFAULT_USER_DATA = """#cloud-config
package_update: true
package_upgrade: false
packages:
  - curl
runcmd:
  - curl -fsSL "http://YOUR_FILE_SERVER/provisioning/bootstrap.sh" -o /tmp/bootstrap.sh
  - chmod +x /tmp/bootstrap.sh
  - /tmp/bootstrap.sh
"""

DEFAULT_META_DATA = """instance-id: raspios-cloudinit-001
local-hostname: reterminal
"""

DEFAULT_NETWORK_CONFIG = """version: 2
ethernets:
  eth0:
    dhcp4: true
"""

DEFAULT_STATUS = {
    "phase": "idle",
    "message": "Waiting for reTerminal in boot mode or network.",
    "progress": None,
    "updated": None,
}


def _text(value):
    """Coerce a request field to a stripped string (None and other falsy values become "")."""
    return str(value or "").strip()


def _request_dict(force=True):
    """Return the request's JSON body if it is a JSON object, otherwise an empty dict.

    Guards the handlers against bodies such as ``[1, 2]`` or ``"x"`` on which
    ``.get`` would raise.
    """
    body = request.get_json(force=force, silent=True)
    return body if isinstance(body, dict) else {}


def _read_json_dict(path):
    """Load *path* as JSON and return it if it is an object; return None when missing or invalid."""
    try:
        if Path(path).is_file():
            with open(path, "r") as f:
                data = json.load(f)
            if isinstance(data, dict):
                return data
    except (ValueError, OSError):
        # ValueError covers json.JSONDecodeError and undecodable bytes.
        pass
    return None


def _write_json(path, data):
    """Write *data* as indented JSON to *path*, creating parent directories. Returns True on success."""
    try:
        Path(path).parent.mkdir(parents=True, exist_ok=True)
        with open(path, "w") as f:
            json.dump(data, f, indent=2)
        return True
    except OSError:
        return False


def read_status():
    """Return the current provisioning status merged over DEFAULT_STATUS.

    Always returns a fresh dict, never the shared DEFAULT_STATUS object. While
    the phase is "waiting_choice" the result also carries "device_source",
    read from DEVICE_SOURCE_FILE and defaulting to "usb".
    """
    try:
        with open(STATUS_FILE, "r") as f:
            data = json.load(f)
    except (OSError, ValueError):
        return dict(DEFAULT_STATUS)
    if not isinstance(data, dict):
        return dict(DEFAULT_STATUS)
    out = {**DEFAULT_STATUS, **data}
    if out.get("phase") == "waiting_choice":
        try:
            with open(DEVICE_SOURCE_FILE, "r") as sf:
                out["device_source"] = (sf.read() or "").strip() or "usb"
        except OSError:
            out["device_source"] = "usb"
    return out


def read_log_tail(lines=50):
    """Return the last *lines* lines of LOG_FILE as one stripped string ("" if unreadable).

    Only the tail is held in memory; a non-positive *lines* yields "".
    """
    if lines <= 0:
        return ""
    try:
        with open(LOG_FILE, "r", errors="replace") as f:
            tail = deque(f, maxlen=lines)
    except OSError:
        return ""
    return "".join(tail).strip()


def _load_network_devices():
    """Return the network device registry, or ``{"devices": []}`` when absent or invalid."""
    data = _read_json_dict(NETWORK_DEVICES_FILE)
    return data if data is not None else {"devices": []}


def _save_network_devices(data):
    """Persist the network device registry. Returns True on success, False on I/O failure."""
    return _write_json(NETWORK_DEVICES_FILE, data)


def _find_device(data, mac):
    """Return the registry entry whose MAC equals *mac* (case-insensitive), or None."""
    wanted = mac.lower()
    for d in data.get("devices", []):
        if (d.get("mac") or "").lower() == wanted:
            return d
    return None


BACKUPS_META_FILE = BACKUPS_DIR / "backups_meta.json"


def _load_backups_meta():
    """Return backup metadata keyed by filename, or {} when absent or invalid."""
    data = _read_json_dict(BACKUPS_META_FILE)
    return data if data is not None else {}


def _save_backups_meta(data):
    """Persist backup metadata. Returns True on success, False on I/O failure."""
    return _write_json(BACKUPS_META_FILE, data)


def _safe_backup_name(name):
    """Reject path traversal and ensure it's a backup filename we manage."""
    if not name or not isinstance(name, str):
        return False
    if ".." in name or "/" in name or "\\" in name or "\x00" in name:
        return False
    return name.endswith(BACKUP_SUFFIXES)


def list_backups():
    """Return the image files in BACKUPS_DIR, newest first, with their display metadata.

    Each entry has "name", "display_name", "description", "size" and "mtime".
    Files that disappear while the directory is scanned are skipped.
    """
    if not BACKUPS_DIR.is_dir():
        return []
    meta = _load_backups_meta()
    out = []
    try:
        candidates = list(BACKUPS_DIR.iterdir())
    except OSError:
        return []
    for p in candidates:
        if p.name == "backups_meta.json" or not p.name.endswith(BACKUP_SUFFIXES):
            continue
        try:
            if not p.is_file():
                continue
            st = p.stat()
        except OSError:
            continue
        m = meta.get(p.name)
        if not isinstance(m, dict):
            m = {}
        out.append({
            "name": p.name,
            "display_name": m.get("name") or p.name,
            "description": m.get("description") or "",
            "size": st.st_size,
            "mtime": st.st_mtime,
        })
    # Sort on the mtime already collected so a racing delete cannot raise here.
    out.sort(key=lambda b: b["mtime"], reverse=True)
    return out


@app.route("/")
def index():
    """Serve the dashboard page."""
    return render_template("index.html")


@app.route("/api/status")
def api_status():
    """Return the current provisioning status as JSON."""
    return jsonify(read_status())


@app.route("/api/status-clear", methods=["POST"])
def api_status_clear():
    """Reset status to idle (e.g. to dismiss a 'Golden image not found' error so you can try again)."""
    try:
        with open(STATUS_FILE, "w") as f:
            json.dump({
                "phase": "idle",
                "message": DEFAULT_STATUS["message"],
                "progress": None,
                "updated": None,
            }, f)
        return jsonify({"ok": True})
    except OSError:
        return jsonify({"ok": False, "error": "Could not write status"}), 500


@app.route("/api/log")
def api_log():
    """Return the tail of the flash log as JSON."""
    return jsonify({"log": read_log_tail()})


@app.route("/api/pending-devices")
def api_pending_devices():
    """Returns USB (if waiting_choice) and registered network devices so the UI can show Backup/Deploy."""
    st = read_status()
    usb = None
    if st.get("phase") == "waiting_choice":
        usb = {"source": "usb", "message": st.get("message", "Device connected (USB). Choose action.")}
    data = _load_network_devices()
    network = [d for d in data.get("devices", []) if d.get("action") in (None, "wait")]
    return jsonify({"usb": usb, "network": network})


@app.route("/api/device-action", methods=["POST"])
def api_device_action():
    """User chose Backup or Deploy for a device. source=usb | network; for network pass mac=."""
    body = _request_dict()
    source = _text(body.get("source")).lower()
    action = _text(body.get("action")).lower()
    if action not in ("backup", "deploy"):
        return jsonify({"ok": False, "error": "action must be 'backup' or 'deploy'"}), 400

    if source == "usb":
        try:
            os.makedirs(os.path.dirname(ACTION_REQUEST_FILE) or ".", exist_ok=True)
            # If user requested "shrink after backup", create flag so host runs PiShrink after dd
            if action == "backup" and body.get("shrink"):
                try:
                    (BASE_DIR / "shrink_next_backup").write_text("1")
                except OSError:
                    pass  # host may still have SHRINK_BACKUP=1
            with open(ACTION_REQUEST_FILE, "w") as f:
                f.write(action)
            return jsonify({"ok": True})
        except OSError:
            return jsonify({"ok": False, "error": "Could not write action file"}), 500

    if source == "network":
        mac = _text(body.get("mac"))
        if not mac:
            return jsonify({"ok": False, "error": "mac required for network device"}), 400
        data = _load_network_devices()
        device = _find_device(data, mac)
        if device is None:
            return jsonify({"ok": False, "error": "Device not found"}), 404
        device["action"] = action
        device["action_at"] = time.time()
        # Report a failed save instead of claiming success for an action that was never stored.
        if not _save_network_devices(data):
            return jsonify({"ok": False, "error": "Could not save device action"}), 500
        return jsonify({"ok": True})

    return jsonify({"ok": False, "error": "source must be 'usb' or 'network'"}), 400


@app.route("/api/register-device", methods=["POST"])
def api_register_device():
    """Called by a network-booted device to register (mac, ip)."""
    body = _request_dict() or request.form
    mac = _text(body.get("mac"))
    ip = _text(body.get("ip") or request.remote_addr)
    if not mac:
        return jsonify({"ok": False, "error": "mac required"}), 400
    data = _load_network_devices()
    device = _find_device(data, mac)
    if device is not None:
        # Re-registration refreshes the address but keeps an already assigned action.
        device["ip"] = ip
        device["registered_at"] = time.time()
        device["action"] = device.get("action") or "wait"
    else:
        devices = data.get("devices", [])
        devices.append({"mac": mac, "ip": ip, "registered_at": time.time(), "action": "wait"})
        data["devices"] = devices
    if not _save_network_devices(data):
        return jsonify({"ok": False, "error": "Could not save device registry"}), 500
    return jsonify({"ok": True, "message": "registered"})


@app.route("/api/device-action-poll")
def api_device_action_poll():
    """Network device polls this to get its assigned action (deploy/backup) and URL."""
    mac = (request.args.get("mac") or "").strip()
    if not mac:
        return jsonify({"action": "wait"}), 200
    data = _load_network_devices()
    base = request.host_url.rstrip("/")
    device = _find_device(data, mac)
    action = (device.get("action") or "wait") if device is not None else "wait"
    if action == "deploy":
        return jsonify({"action": "deploy", "url": f"{base}/api/golden-image"})
    if action == "backup":
        # Keep ":" literal so ordinary MACs give the same URL as before; escape anything else.
        quoted_mac = urllib.parse.quote(mac, safe=":")
        return jsonify({"action": "backup", "upload_url": f"{base}/api/backup-upload?mac={quoted_mac}"})
    return jsonify({"action": "wait"})


@app.route("/api/golden-image")
def api_golden_image():
    """Stream the golden image for network deploy (device pulls and writes to eMMC)."""
    if not GOLDEN_IMAGE.is_file():
        return jsonify({"error": "Golden image not found"}), 404
    return send_file(
        GOLDEN_IMAGE,
        mimetype="application/octet-stream",
        as_attachment=True,
        download_name="golden.img",
    )


@app.route("/api/backup-upload", methods=["POST"])
def api_backup_upload():
    """Network device uploads its eMMC backup (raw body).

    The ``mac`` query parameter is untrusted and becomes part of the filename,
    so it is reduced to ``[0-9A-Za-z-]`` (":" is mapped to "-") and truncated
    to 20 characters. This prevents path traversal out of BACKUPS_DIR.
    """
    raw_mac = (request.args.get("mac") or "").strip().replace(":", "-")
    mac = re.sub(r"[^0-9A-Za-z-]", "", raw_mac)[:20]
    if not mac:
        return jsonify({"error": "mac query param required"}), 400
    name = f"backup-net-{mac}-{int(time.time())}.img"
    path = BACKUPS_DIR / name
    try:
        BACKUPS_DIR.mkdir(parents=True, exist_ok=True)
        with open(path, "wb") as f:
            shutil.copyfileobj(request.stream, f, 1024 * 1024)
        return jsonify({"ok": True, "file": name})
    except OSError as e:
        # Do not leave a truncated image that looks like a valid backup.
        try:
            path.unlink(missing_ok=True)
        except OSError:
            pass
        return jsonify({"error": str(e)}), 500


@app.route("/api/backups")
def api_backups():
    """Return the list of backups as JSON."""
    return jsonify({"backups": list_backups()})


@app.route("/api/backups/<name>/set-as-golden", methods=["POST"])
def api_backup_set_as_golden(name):
    """Copy this backup to golden.img so it becomes the image used for Deploy."""
    if not _safe_backup_name(name):
        return jsonify({"ok": False, "error": "invalid backup name"}), 400
    path = BACKUPS_DIR / name
    if not path.is_file():
        return jsonify({"ok": False, "error": "backup not found"}), 404
    try:
        # The golden image may live outside BACKUPS_DIR; make sure its own directory exists.
        GOLDEN_IMAGE.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(path, GOLDEN_IMAGE)
        return jsonify({"ok": True, "message": f"Golden image set from {name}"})
    except OSError as e:
        return jsonify({"ok": False, "error": str(e)}), 500


def _find_pishrink():
    """Return the path of pishrink.sh (PATH first, then /usr/local/bin), or None if not installed."""
    candidate = shutil.which("pishrink.sh") or "/usr/local/bin/pishrink.sh"
    return candidate if os.path.isfile(candidate) else None


def _run_pishrink(pishrink, opts, name):
    """Run PiShrink on backup *name* inside BACKUPS_DIR.

    Returns None on success, or a ``(response, status)`` pair describing the
    failure (504 on timeout, 500 otherwise).
    """
    try:
        proc = subprocess.run(
            # "./" keeps a filename that starts with "-" from being parsed as an option.
            [pishrink, *opts, f"./{name}"],
            cwd=str(BACKUPS_DIR),
            capture_output=True,
            text=True,
            timeout=3600,
        )
    except subprocess.TimeoutExpired:
        return jsonify({"ok": False, "error": "PiShrink timed out"}), 504
    except OSError as e:
        return jsonify({"ok": False, "error": str(e)}), 500
    if proc.returncode != 0:
        return jsonify({
            "ok": False,
            "error": "PiShrink failed",
            "detail": (proc.stderr or proc.stdout or "").strip() or f"exit code {proc.returncode}",
        }), 500
    return None


@app.route("/api/backups/<name>/shrink", methods=["POST"])
def api_backup_shrink(name):
    """Run PiShrink on a raw .img backup (shrinks in place). Requires PiShrink in LXC/host."""
    if not _safe_backup_name(name):
        return jsonify({"ok": False, "error": "invalid backup name"}), 400
    if not name.endswith(".img"):
        return jsonify({"ok": False, "error": "only raw .img files can be shrunk (not .img.gz / .img.xz)"}), 400
    path = BACKUPS_DIR / name
    if not path.is_file():
        return jsonify({"ok": False, "error": "backup not found"}), 404
    pishrink = _find_pishrink()
    if pishrink is None:
        return jsonify({
            "ok": False,
            "error": "PiShrink not installed. Install on the host/LXC (e.g. scripts/install-pishrink-on-host.sh)",
        }), 503
    failure = _run_pishrink(pishrink, ["-n"], name)
    if failure is not None:
        return failure
    return jsonify({"ok": True, "message": f"Shrunk {name}"})


@app.route("/api/backups/<name>/compress", methods=["POST"])
def api_backup_compress(name):
    """Run PiShrink with compression (shrink + gz or xz) on a raw .img backup. Produces .img.gz or .img.xz."""
    if not _safe_backup_name(name):
        return jsonify({"ok": False, "error": "invalid backup name"}), 400
    if not name.endswith(".img"):
        return jsonify({"ok": False, "error": "only raw .img files can be compressed (not .img.gz / .img.xz)"}), 400
    path = BACKUPS_DIR / name
    if not path.is_file():
        return jsonify({"ok": False, "error": "backup not found"}), 404
    body = _request_dict()
    fmt = _text(body.get("format") or request.args.get("format") or "xz").lower()
    # Unknown formats fall back to xz; "gzip" is accepted as an alias for "gz".
    if fmt not in ("gz", "gzip", "xz"):
        fmt = "xz"
    if fmt == "gzip":
        fmt = "gz"
    pishrink = _find_pishrink()
    if pishrink is None:
        return jsonify({"ok": False, "error": "PiShrink not installed"}), 503
    opts = ["-n", "-Z", "-a"] if fmt == "xz" else ["-n", "-z", "-a"]
    failure = _run_pishrink(pishrink, opts, name)
    if failure is not None:
        return failure
    ext = ".xz" if fmt == "xz" else ".gz"
    return jsonify({"ok": True, "message": f"Compressed to {name}{ext}"})


@app.route("/api/backups/<name>", methods=["PATCH"])
def api_backup_update(name):
    """Update backup metadata (display name, description) or rename the file.

    JSON body fields (all optional): "filename" renames the file on disk,
    "name" sets the display name, "description" sets the description. The
    metadata follows the file on rename, and "name"/"description" supplied in
    the same request are applied to the renamed entry.
    """
    if not _safe_backup_name(name):
        return jsonify({"ok": False, "error": "invalid backup name"}), 400
    path = BACKUPS_DIR / name
    if not path.is_file():
        return jsonify({"ok": False, "error": "backup not found"}), 404
    body = _request_dict()
    meta = _load_backups_meta()
    stored = meta.get(name)
    entry = dict(stored) if isinstance(stored, dict) else {}

    new_filename = _text(body.get("filename"))
    if new_filename:
        if not _safe_backup_name(new_filename):
            return jsonify({"ok": False, "error": "invalid new filename"}), 400
        new_path = BACKUPS_DIR / new_filename
        if new_path.exists() and new_path != path:
            return jsonify({"ok": False, "error": "target filename already exists"}), 409
        try:
            path.rename(new_path)
        except OSError as e:
            return jsonify({"ok": False, "error": str(e)}), 500
        # Remove the old key BEFORE storing the new one: when the filename is
        # unchanged both keys are equal and the entry must survive.
        meta.pop(name, None)
        entry = {"name": entry.get("name") or name, "description": entry.get("description") or ""}
        name = new_filename

    if "name" in body:
        entry["name"] = _text(body.get("name")) or name
    if "description" in body:
        entry["description"] = _text(body.get("description"))
    meta[name] = entry

    if not _save_backups_meta(meta):
        return jsonify({"ok": False, "error": "could not save metadata"}), 500
    return jsonify({"ok": True, "name": name})


@app.route("/api/backups/<name>", methods=["GET"])
def api_backup_download(name):
    """Download a backup image as an attachment."""
    if not _safe_backup_name(name):
        return jsonify({"error": "invalid name"}), 400
    path = BACKUPS_DIR / name
    if not path.is_file():
        return jsonify({"error": "not found"}), 404
    return send_file(path, as_attachment=True, download_name=name)


# Phases during which a cloud-init build is considered to be running.
_BUILD_ACTIVE_PHASES = ("downloading", "decompressing", "injecting", "finalizing", "resolving")


def _build_status_read():
    """Return the cloud-init build status, or an idle placeholder when absent or invalid."""
    data = _read_json_dict(BUILD_STATUS_FILE)
    if data is not None:
        return data
    return {"phase": "idle", "message": "", "output_name": None, "error": None}


def _build_status_write(phase, message, output_name=None, error=None):
    """Record the cloud-init build status; write failures are ignored (best effort)."""
    _write_json(BUILD_STATUS_FILE, {
        "phase": phase,
        "message": message,
        "output_name": output_name,
        "error": error,
        "updated": time.time(),
    })


def _raspios_latest_lite_url():
    """Resolve latest Raspberry Pi OS Lite (arm64) .img.xz URL from official index.

    Returns the URL, or None when the index cannot be fetched or parsed.
    """
    base = "https://downloads.raspberrypi.com/raspios_lite_arm64/images"
    try:
        with urllib.request.urlopen(base + "/", timeout=15) as r:
            html = r.read().decode("utf-8", errors="ignore")
        # Match raspios_lite_arm64-YYYY-MM-DD/
        folders = re.findall(r"raspios_lite_arm64-(\d{4}-\d{2}-\d{2})/", html)
        if not folders:
            return None
        latest = max(folders)  # ISO dates sort chronologically as strings
        folder_url = f"{base}/raspios_lite_arm64-{latest}/"
        with urllib.request.urlopen(folder_url, timeout=15) as r:
            folder_html = r.read().decode("utf-8", errors="ignore")
        # Match .img.xz link (skip .sha256, .torrent, etc.)
        m = re.search(r'href="([^"]+\.img\.xz)"', folder_html)
        if not m:
            return None
        # urljoin handles both a relative filename and an absolute href.
        return urllib.parse.urljoin(folder_url, m.group(1))
    except Exception:
        # Best-effort lookup: any network or parsing failure means "unknown".
        return None


def _download_image(url, dest):
    """Download *url* to *dest*, updating the build status roughly every 50 MB."""
    chunk_size = 1024 * 1024
    report_step = 50 * 1024 * 1024
    with urllib.request.urlopen(url, timeout=3600) as resp, open(dest, "wb") as f:
        done = 0
        next_report = report_step
        while True:
            chunk = resp.read(chunk_size)
            if not chunk:
                break
            f.write(chunk)
            done += len(chunk)
            # Threshold-based so progress still shows if reads return short chunks.
            if done >= next_report:
                _build_status_write("downloading", f"Downloaded {done // (1024*1024)} MB…")
                next_report = done + report_step


def _build_cloudinit_worker(variant, user_data, meta_data, network_config):
    """Background: download Raspios, inject cloud-init, save to BACKUPS_DIR.

    Progress and the final result are reported only through the build status
    file; the function never raises. Empty *user_data* / *meta_data* /
    *network_config* fall back to the module defaults. *variant* is used only
    in the output filename.
    """
    out_name = f"raspios-{variant}-cloudinit-{time.strftime('%Y%m%d-%H%M%S')}.img"
    out_path = BACKUPS_DIR / out_name
    mount_point = None
    mount_attempted = False
    loop_dev = None
    temp_dir = None
    try:
        _build_status_write("resolving", "Resolving latest Raspberry Pi OS image URL…")
        url = _raspios_latest_lite_url()
        if not url:
            _build_status_write("error", "", output_name=None, error="Could not resolve latest Raspios Lite URL")
            return

        _build_status_write("downloading", f"Downloading {url.split('/')[-1]}… (this may take several minutes)")
        # mkdtemp needs its parent directory to exist.
        BACKUPS_DIR.mkdir(parents=True, exist_ok=True)
        temp_dir = Path(tempfile.mkdtemp(prefix="cloudinit-build-", dir=str(BACKUPS_DIR)))
        xz_path = temp_dir / "image.img.xz"
        _download_image(url, xz_path)

        _build_status_write("decompressing", "Decompressing image…")
        img_path = temp_dir / "image.img"
        subprocess.run(
            ["xz", "-d", "-k", "-f", str(xz_path)],
            check=True, capture_output=True, text=True, timeout=1800, cwd=str(temp_dir),
        )
        if not img_path.exists():
            _build_status_write("error", "", output_name=None, error="Decompress failed: image.img not found")
            return

        _build_status_write("injecting", "Mounting boot partition and injecting cloud-init…")
        loop_out = subprocess.run(
            ["losetup", "-f", "--show", "-P", str(img_path)],
            capture_output=True, text=True, timeout=10,
        )
        if loop_out.returncode != 0:
            _build_status_write(
                "error", "", output_name=None,
                error="losetup failed (loop device may not be available in this container)",
            )
            return
        loop_dev = loop_out.stdout.strip()
        # First partition is usually boot (FAT32)
        boot_part = f"{loop_dev}p1"
        if not Path(boot_part).exists():
            boot_part = f"{loop_dev}p2"
        if not Path(boot_part).exists():
            _build_status_write("error", "", output_name=None, error="Boot partition not found")
            return

        mount_point = temp_dir / "mnt"
        mount_point.mkdir(exist_ok=True)
        # Set before calling mount so cleanup still unmounts if mount times out after succeeding.
        mount_attempted = True
        subprocess.run(
            ["mount", boot_part, str(mount_point)],
            check=True, capture_output=True, text=True, timeout=10,
        )
        (mount_point / "user-data").write_text(user_data or DEFAULT_USER_DATA)
        (mount_point / "meta-data").write_text(meta_data or DEFAULT_META_DATA)
        (mount_point / "network-config").write_text(network_config or DEFAULT_NETWORK_CONFIG)

        # Unmount (checked) BEFORE publishing the image so the injected files are
        # flushed into it; a failed unmount must not produce a "done" build.
        subprocess.run(
            ["umount", str(mount_point)],
            check=True, capture_output=True, text=True, timeout=10,
        )
        mount_attempted = False
        subprocess.run(["losetup", "-d", loop_dev], capture_output=True, timeout=10)
        loop_dev = None

        _build_status_write("finalizing", "Copying image to backups…")
        # temp_dir is inside BACKUPS_DIR, so this is a rename rather than a second full copy.
        shutil.move(str(img_path), str(out_path))
        _build_status_write("done", f"Built {out_name}", output_name=out_name)
    except subprocess.TimeoutExpired as e:
        _build_status_write("error", "", output_name=None, error=f"Timeout: {e}")
    except subprocess.CalledProcessError as e:
        _build_status_write("error", "", output_name=None, error=f"Command failed: {e.stderr or e}")
    except Exception as e:
        # Top-level boundary of the background thread: report instead of dying silently.
        _build_status_write("error", "", output_name=None, error=str(e))
    finally:
        # Cleanup order matters: unmount first, then detach the loop device, then delete files.
        if mount_attempted and mount_point is not None:
            try:
                subprocess.run(["umount", str(mount_point)], capture_output=True, timeout=5)
            except (OSError, subprocess.SubprocessError):
                pass
        if loop_dev:
            try:
                subprocess.run(["losetup", "-d", loop_dev], capture_output=True, timeout=5)
            except (OSError, subprocess.SubprocessError):
                pass
        if temp_dir is not None:
            shutil.rmtree(temp_dir, ignore_errors=True)


_build_lock = threading.Lock()
_build_thread = None


@app.route("/api/build-cloudinit-status")
def api_build_cloudinit_status():
    """Return current build status (phase, message, output_name, error)."""
    return jsonify(_build_status_read())


@app.route("/api/build-cloudinit", methods=["POST"])
def api_build_cloudinit():
    """Start building a cloud-init ready Raspberry Pi OS image (download latest Lite, inject NoCloud). Runs in background."""
    global _build_thread
    body = _request_dict(force=False)
    with _build_lock:
        st = _build_status_read()
        if st.get("phase") in _BUILD_ACTIVE_PHASES:
            return jsonify({"ok": False, "error": "A build is already in progress"}), 409
        # Write the initial status BEFORE starting the thread; written afterwards it
        # could overwrite a status the worker had already reported and leave the
        # build stuck in "resolving".
        _build_status_write("resolving", "Starting…")
        _build_thread = threading.Thread(
            target=_build_cloudinit_worker,
            kwargs={
                "variant": "lite",
                "user_data": body.get("user_data") or DEFAULT_USER_DATA,
                "meta_data": body.get("meta_data") or DEFAULT_META_DATA,
                "network_config": body.get("network_config") or DEFAULT_NETWORK_CONFIG,
            },
            daemon=True,
        )
        _build_thread.start()
    return jsonify({
        "ok": True,
        "message": "Build started. Download and inject may take 15–30 min. Poll /api/build-cloudinit-status or refresh the page.",
    }), 202


@app.route("/api/raspios-latest-url")
def api_raspios_latest_url():
    """Return the URL of the latest Raspberry Pi OS Lite arm64 image (for display only)."""
    url = _raspios_latest_lite_url()
    if not url:
        return jsonify({"ok": False, "url": None, "error": "Could not resolve latest image URL"}), 503
    return jsonify({"ok": True, "url": url, "filename": url.split("/")[-1]})


@app.route("/api/golden-info")
def api_golden_info():
    """Return whether golden image exists and its size/mtime for UI."""
    if not GOLDEN_IMAGE.is_file():
        return jsonify({"present": False})
    try:
        st = GOLDEN_IMAGE.stat()
        return jsonify({"present": True, "size": st.st_size, "mtime": st.st_mtime})
    except OSError:
        return jsonify({"present": False})


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000, debug=False)