#!/usr/bin/env python3 # Revision: 2 """ Flask dashboard for CM4 eMMC provisioning. Public home: deploy only (status, logs, how to connect). No login. Admin: login required — backups, cloud-init, portal files, set golden, users. """ import hashlib import json import os import re import shutil import sqlite3 import subprocess import tempfile import time import urllib.request from functools import wraps from pathlib import Path from flask import Flask, render_template, jsonify, request, send_file, redirect, url_for, session, Response, stream_with_context from werkzeug.security import generate_password_hash, check_password_hash try: from itsdangerous import BadSignature except ImportError: BadSignature = Exception # noqa: disable invalid-name app = Flask(__name__) # Set CM4_DASHBOARD_SECRET_KEY in env (e.g. via /opt/cm4-provisioning/dashboard.env) so sessions persist across restarts app.secret_key = os.environ.get("CM4_DASHBOARD_SECRET_KEY", os.urandom(24).hex()) # --- Paths --- BASE_DIR = Path(os.environ.get("CM4_PROVISIONING_DIR", "/var/lib/cm4-provisioning")) STATUS_FILE = os.environ.get("CM4_STATUS_FILE", str(BASE_DIR / "status.json")) LOG_FILE = os.environ.get("CM4_LOG_FILE", str(BASE_DIR / "flash.log")) ACTION_REQUEST_FILE = os.environ.get("CM4_ACTION_REQUEST_FILE", str(BASE_DIR / "action_request")) DEVICE_SOURCE_FILE = os.environ.get("CM4_DEVICE_SOURCE_FILE", str(BASE_DIR / "device_source")) BACKUPS_DIR = Path(os.environ.get("CM4_BACKUPS_DIR", str(BASE_DIR / "backups"))) CLOUDINIT_IMAGES_DIR = Path(os.environ.get("CM4_CLOUDINIT_IMAGES_DIR", str(BASE_DIR / "cloudinit-images"))) PORTAL_FILES_DIR = Path(os.environ.get("CM4_PORTAL_FILES_DIR", str(BASE_DIR / "portal-files"))) GOLDEN_IMAGE = Path(os.environ.get("CM4_GOLDEN_IMAGE", str(BASE_DIR / "golden.img"))) NETWORK_DEVICES_FILE = Path(os.environ.get("CM4_NETWORK_DEVICES_FILE", str(BASE_DIR / "network_devices.json"))) BUILD_STATUS_FILE = Path(os.environ.get("CM4_BUILD_STATUS_FILE", str(BASE_DIR / "build_cloudinit_status.json"))) BUILD_REQUEST_FILE = Path(os.environ.get("CM4_BUILD_REQUEST_FILE", str(BASE_DIR / "build_cloudinit_request.json"))) BUILD_CANCEL_FILE = BUILD_REQUEST_FILE.parent / "build_cloudinit_cancel" SHRINK_REQUEST_FILE = Path(os.environ.get("CM4_SHRINK_REQUEST_FILE", str(BASE_DIR / "shrink_request.json"))) SHRINK_STATUS_FILE = Path(os.environ.get("CM4_SHRINK_STATUS_FILE", str(BASE_DIR / "shrink_status.json"))) FIRST_BOOT_STATUS_FILE = Path(os.environ.get("CM4_FIRST_BOOT_STATUS_FILE", str(BASE_DIR / "first_boot_status.json"))) CLOUDINIT_TEMPLATES_FILE = Path(os.environ.get("CM4_CLOUDINIT_TEMPLATES_FILE", str(BASE_DIR / "cloudinit_templates.json"))) PORTAL_DESCRIPTIONS_FILE = Path(os.environ.get("CM4_PORTAL_DESCRIPTIONS_FILE", str(BASE_DIR / "portal_descriptions.json"))) DB_PATH = Path(os.environ.get("CM4_DASHBOARD_DB", str(BASE_DIR / "dashboard.db"))) TOGGLE_NETWORK_BOOT_SCRIPT = os.environ.get("CM4_TOGGLE_NETWORK_BOOT_SCRIPT", "/opt/cm4-provisioning/toggle-network-boot-dhcp.sh") DHCP_LEASES_FILE = os.environ.get("CM4_DHCP_LEASES_FILE", "/var/lib/misc/dnsmasq.leases") # EEPROM update (USB boot): tools and output files (shared with host script) EEPROM_DIR = Path(os.environ.get("CM4_EEPROM_DIR", "/opt/cm4-provisioning/eeprom")) EEPROM_CONFIG_TOOL = EEPROM_DIR / "rpi-eeprom-config" EEPROM_FW_FILE = EEPROM_DIR / "pieeprom.bin" EEPROM_UPD_FILE = BASE_DIR / "pieeprom.upd" EEPROM_SIG_FILE = BASE_DIR / "pieeprom.sig" # --- Database (admin users + activity logs) --- def get_db(): conn = sqlite3.connect(str(DB_PATH)) conn.row_factory = sqlite3.Row return conn def init_db(): os.makedirs(DB_PATH.parent, exist_ok=True) conn = get_db() conn.executescript(""" CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT UNIQUE NOT NULL, password_hash TEXT NOT NULL, created_at REAL NOT NULL ); CREATE TABLE IF NOT EXISTS admin_logs ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER, action TEXT NOT NULL, details TEXT, created_at REAL NOT NULL, FOREIGN KEY (user_id) REFERENCES users(id) ); CREATE INDEX IF NOT EXISTS idx_logs_created ON admin_logs(created_at DESC); """) conn.commit() conn.close() def admin_log(action, details=None): user_id = session.get("user_id") conn = get_db() conn.execute( "INSERT INTO admin_logs (user_id, action, details, created_at) VALUES (?, ?, ?, ?)", (user_id, action, details, time.time()), ) conn.commit() conn.close() def require_admin(f): @wraps(f) def wrapped(*args, **kwargs): try: logged_in = session.get("admin_logged_in") except (BadSignature, ValueError, OSError): # Invalid/rotated secret key or corrupted session cookie → treat as not logged in try: session.clear() except Exception: pass logged_in = False if not logged_in: if request.is_json or request.path.startswith("/api/"): return jsonify({"ok": False, "error": "Login required"}), 401 return redirect(url_for("login", next=request.url)) return f(*args, **kwargs) return wrapped def get_user_by_username(username): conn = get_db() row = conn.execute("SELECT id, username, password_hash FROM users WHERE username = ?", (username,)).fetchone() conn.close() return dict(row) if row else None def create_user(username, password): conn = get_db() try: conn.execute( "INSERT INTO users (username, password_hash, created_at) VALUES (?, ?, ?)", (username, generate_password_hash(password), time.time()), ) conn.commit() return True except sqlite3.IntegrityError: return False finally: conn.close() def list_users(): conn = get_db() rows = conn.execute("SELECT id, username, created_at FROM users ORDER BY username").fetchall() conn.close() return [{"id": r["id"], "username": r["username"], "created_at": r["created_at"]} for r in rows] def change_password(user_id, new_password): conn = get_db() conn.execute( "UPDATE users SET password_hash = ? WHERE id = ?", (generate_password_hash(new_password), user_id), ) conn.commit() conn.close() def get_recent_logs(limit=100): conn = get_db() rows = conn.execute( "SELECT l.id, l.action, l.details, l.created_at, u.username FROM admin_logs l LEFT JOIN users u ON l.user_id = u.id ORDER BY l.created_at DESC LIMIT ?", (limit,), ).fetchall() conn.close() return [{"id": r["id"], "action": r["action"], "details": r["details"], "created_at": r["created_at"], "username": r["username"]} for r in rows] @app.after_request def no_cache(response): if request.path == "/" or request.path.startswith("/api/") or request.path.startswith("/admin") or request.path.startswith("/login"): response.headers["Cache-Control"] = "no-store, no-cache, must-revalidate, max-age=0" response.headers["Pragma"] = "no-cache" return response @app.errorhandler(500) def handle_500(err): """Log 500 and return a simple response so the user is not left with a blank error.""" import traceback try: traceback.print_exc() except Exception: pass if request.path.startswith("/api/"): return jsonify({"ok": False, "error": "Internal server error"}), 500 return ( "Error" "

Something went wrong

Try logging in again or going home.

" "", 500, {"Content-Type": "text/html; charset=utf-8"}, ) # Default cloud-init user-data for Raspberry Pi OS (NoCloud on boot partition) DEFAULT_USER_DATA = """#cloud-config package_update: true package_upgrade: false packages: - curl runcmd: - curl -fsSL "http://YOUR_FILE_SERVER/provisioning/bootstrap.sh" -o /tmp/bootstrap.sh - chmod +x /tmp/bootstrap.sh - /tmp/bootstrap.sh """ DEFAULT_META_DATA = """instance-id: raspios-cloudinit-001 local-hostname: gnss.guard """ DEFAULT_NETWORK_CONFIG = """version: 2 ethernets: eth0: dhcp4: true """ DEFAULT_STATUS = { "phase": "idle", "message": "Waiting for reTerminal in boot mode or network.", "progress": None, "updated": None, } def _write_status(phase, message, progress=None): try: os.makedirs(os.path.dirname(STATUS_FILE) or ".", exist_ok=True) with open(STATUS_FILE, "w") as f: json.dump({"phase": phase, "message": message, "progress": progress, "updated": time.time()}, f) except (PermissionError, OSError): pass def _generate_eeprom_update(boot_order, extra_settings=None): """Generate pieeprom.upd and pieeprom.sig in BASE_DIR. Returns (True, None) or (False, error_message).""" extra_settings = extra_settings or {} if not EEPROM_CONFIG_TOOL.is_file() or not EEPROM_FW_FILE.is_file(): return False, "EEPROM tools not installed. Run install-eeprom-tools-on-lxc.sh on the LXC." try: os.makedirs(BASE_DIR, exist_ok=True) # Read current config from firmware image out = subprocess.run( [str(EEPROM_CONFIG_TOOL), str(EEPROM_FW_FILE)], capture_output=True, text=True, timeout=30, cwd=str(EEPROM_DIR), ) if out.returncode != 0: return False, (out.stderr or out.stdout or "rpi-eeprom-config failed").strip()[:500] lines = (out.stdout or "").strip().splitlines() # Strip lines we replace skip_keys = {"BOOT_ORDER", "NET_BOOT_MAX_RETRIES", "DHCP_TIMEOUT", "DHCP_REQ_TIMEOUT", "TFTP_IP", "NET_INSTALL_AT_POWER_ON"} new_lines = [] for line in lines: key = line.split("=", 1)[0].strip() if "=" in line else "" if key in skip_keys: continue new_lines.append(line) # Add our settings new_lines.append(f"BOOT_ORDER={boot_order}") new_lines.append("NET_BOOT_MAX_RETRIES=3") new_lines.append("DHCP_TIMEOUT=1500") new_lines.append("DHCP_REQ_TIMEOUT=500") new_lines.append("NET_INSTALL_AT_POWER_ON=0") for k, v in extra_settings.items(): new_lines.append(f"{k}={v}") with tempfile.NamedTemporaryFile(mode="w", suffix=".conf", delete=False) as f: f.write("\n".join(new_lines) + "\n") conf_path = f.name try: out2 = subprocess.run( [str(EEPROM_CONFIG_TOOL), "--config", conf_path, "--out", str(EEPROM_UPD_FILE), str(EEPROM_FW_FILE)], capture_output=True, text=True, timeout=60, cwd=str(EEPROM_DIR), ) if out2.returncode != 0: return False, (out2.stderr or out2.stdout or "rpi-eeprom-config --config failed").strip()[:500] if not EEPROM_UPD_FILE.is_file() or EEPROM_UPD_FILE.stat().st_size == 0: return False, "Generated pieeprom.upd is missing or empty" # Write signature (sha256 hex) h = hashlib.sha256() with open(EEPROM_UPD_FILE, "rb") as f: h.update(f.read()) EEPROM_SIG_FILE.write_text(h.hexdigest()) return True, None finally: try: os.unlink(conf_path) except OSError: pass except subprocess.TimeoutExpired: return False, "Timeout running rpi-eeprom-config" except (PermissionError, OSError) as e: return False, str(e) def read_status(): try: with open(STATUS_FILE, "r") as f: data = json.load(f) out = {**DEFAULT_STATUS, **data} if out.get("phase") == "waiting_choice": try: with open(DEVICE_SOURCE_FILE, "r") as sf: out["device_source"] = (sf.read() or "").strip() or "usb" except (FileNotFoundError, OSError): out["device_source"] = "usb" return out except (FileNotFoundError, json.JSONDecodeError): return DEFAULT_STATUS def read_log_tail(lines=50): try: with open(LOG_FILE, "r") as f: all_lines = f.readlines() return "".join(all_lines[-lines:]).strip() if all_lines else "" except (FileNotFoundError, PermissionError): return "" def _load_network_devices(): try: if NETWORK_DEVICES_FILE.is_file(): with open(NETWORK_DEVICES_FILE, "r") as f: return json.load(f) except (json.JSONDecodeError, OSError): pass return {"devices": []} def _save_network_devices(data): try: os.makedirs(NETWORK_DEVICES_FILE.parent, exist_ok=True) with open(NETWORK_DEVICES_FILE, "w") as f: json.dump(data, f, indent=2) return True except (PermissionError, OSError): return False BACKUPS_META_FILE = BACKUPS_DIR / "backups_meta.json" CLOUDINIT_META_FILE = CLOUDINIT_IMAGES_DIR / "cloudinit_meta.json" def _load_backups_meta(): try: if BACKUPS_META_FILE.is_file(): with open(BACKUPS_META_FILE, "r") as f: return json.load(f) except (json.JSONDecodeError, OSError): pass return {} def _save_backups_meta(data): try: BACKUPS_DIR.mkdir(parents=True, exist_ok=True) with open(BACKUPS_META_FILE, "w") as f: json.dump(data, f, indent=2) return True except (PermissionError, OSError): return False def _safe_backup_name(name): """Reject path traversal and ensure it's a backup filename we manage.""" if not name or ".." in name or "/" in name or "\\" in name: return False if not name.endswith((".img", ".img.gz", ".img.xz")): return False return True def _load_cloudinit_meta(): try: if CLOUDINIT_META_FILE.is_file(): with open(CLOUDINIT_META_FILE, "r") as f: return json.load(f) except (json.JSONDecodeError, OSError): pass return {} def _save_cloudinit_meta(data): try: CLOUDINIT_IMAGES_DIR.mkdir(parents=True, exist_ok=True) with open(CLOUDINIT_META_FILE, "w") as f: json.dump(data, f, indent=2) return True except (PermissionError, OSError): return False def list_backups(): if not BACKUPS_DIR.is_dir(): return [] meta = _load_backups_meta() out = [] for p in sorted(BACKUPS_DIR.iterdir(), key=lambda x: x.stat().st_mtime, reverse=True): if p.is_file() and p.name != "backups_meta.json" and p.name.endswith((".img", ".img.gz", ".img.xz")): try: st = p.stat() m = meta.get(p.name, {}) out.append({ "name": p.name, "display_name": m.get("name") or p.name, "description": m.get("description") or "", "size": st.st_size, "mtime": st.st_mtime, }) except OSError: pass return out def list_cloudinit_images(): if not CLOUDINIT_IMAGES_DIR.is_dir(): return [] meta = _load_cloudinit_meta() out = [] for p in sorted(CLOUDINIT_IMAGES_DIR.iterdir(), key=lambda x: x.stat().st_mtime, reverse=True): if p.is_file() and p.name != "cloudinit_meta.json" and p.name.endswith((".img", ".img.gz", ".img.xz")): try: st = p.stat() m = meta.get(p.name, {}) out.append({ "name": p.name, "display_name": m.get("name") or p.name, "description": m.get("description") or "", "size": st.st_size, "mtime": st.st_mtime, }) except OSError: pass return out def _golden_current_source(): """Return (source, name) if golden is a symlink to backups or cloudinit, else (None, None).""" if not GOLDEN_IMAGE.exists(): return None, None try: target = GOLDEN_IMAGE.resolve() try: target.relative_to(BACKUPS_DIR.resolve()) return "backups", target.name except ValueError: pass try: target.relative_to(CLOUDINIT_IMAGES_DIR.resolve()) return "cloudinit", target.name except ValueError: pass except OSError: pass return None, None @app.route("/") def index(): return render_template("home.html") @app.route("/login", methods=["GET", "POST"]) def login(): if request.method == "GET": if session.get("admin_logged_in"): return redirect(url_for("admin")) return render_template("login.html") username = (request.form.get("username") or "").strip() password = (request.form.get("password") or "") if not username: return render_template("login.html", error="Username required") user = get_user_by_username(username) if not user: # First user: allow self-registration if not list_users(): if not password or len(password) < 6: return render_template("login.html", error="First user: choose a password (min 6 characters)") create_user(username, password) session["admin_logged_in"] = True session["user_id"] = get_user_by_username(username)["id"] session["username"] = username admin_log("first_admin_created", username) return redirect(request.args.get("next") or url_for("admin")) return render_template("login.html", error="Invalid username or password") if not check_password_hash(user["password_hash"], password): return render_template("login.html", error="Invalid username or password") session["admin_logged_in"] = True session["user_id"] = user["id"] session["username"] = user["username"] admin_log("login", username) return redirect(request.args.get("next") or url_for("admin")) @app.route("/logout") def logout(): if session.get("admin_logged_in"): admin_log("logout", session.get("username")) session.clear() return redirect(url_for("index")) @app.route("/admin") @require_admin def admin(): return render_template("admin.html", username=session.get("username", "Admin")) @app.route("/admin/portal-files") @require_admin def admin_portal_files(): return render_template("portal_files.html", username=session.get("username", "Admin")) @app.route("/admin/cloudinit-build") @require_admin def admin_cloudinit_build(): return render_template("cloudinit_build.html", username=session.get("username", "Admin")) # Serve portal files for wget (e.g. cloud-init first boot). No auth. # Subpaths allowed (e.g. first-boot/splash.png); ".." and "\\" forbidden to prevent traversal. @app.route("/files/") def serve_portal_file(filename): if ".." in filename or "\\" in filename: return jsonify({"error": "invalid path"}), 400 path = (PORTAL_FILES_DIR / filename).resolve() try: path.relative_to(PORTAL_FILES_DIR.resolve()) except ValueError: return jsonify({"error": "invalid path"}), 400 if not path.is_file(): return jsonify({"error": "not found"}), 404 return send_file(path, as_attachment=False, download_name=filename.split("/")[-1]) @app.route("/api/portal-files-debug") def api_portal_files_debug(): """No-auth debug: what PORTAL_FILES_DIR the process sees (for troubleshooting).""" try: names = [] if PORTAL_FILES_DIR.is_dir(): for p in sorted(PORTAL_FILES_DIR.iterdir(), key=lambda x: (not x.is_dir(), x.name.lower())): if ".." in p.name or p.name.startswith("."): continue names.append({"name": p.name, "type": "dir" if p.is_dir() else "file"}) return jsonify({ "portal_files_dir": str(PORTAL_FILES_DIR), "exists": PORTAL_FILES_DIR.exists(), "is_dir": PORTAL_FILES_DIR.is_dir(), "items": names, "CM4_PROVISIONING_DIR": os.environ.get("CM4_PROVISIONING_DIR"), }) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route("/api/status") def api_status(): return jsonify(read_status()) @app.route("/api/status-clear", methods=["POST"]) def api_status_clear(): """Reset status to idle (e.g. to dismiss a 'Golden image not found' error so you can try again).""" try: with open(STATUS_FILE, "w") as f: json.dump({ "phase": "idle", "message": DEFAULT_STATUS["message"], "progress": None, "updated": None, }, f) return jsonify({"ok": True}) except (PermissionError, OSError): return jsonify({"ok": False, "error": "Could not write status"}), 500 def _first_boot_status_read(): """Read first-boot status (device reports progress during first-boot.sh).""" try: with open(FIRST_BOOT_STATUS_FILE, "r") as f: return json.load(f) except (FileNotFoundError, json.JSONDecodeError): return {"phase": "idle", "message": "", "step": "", "step_name": "", "hostname": "", "ip": "", "updated": None} def _first_boot_status_write(phase, message="", step="", step_name="", hostname="", ip="", error=None): """Write first-boot status (called by POST from device).""" try: os.makedirs(os.path.dirname(FIRST_BOOT_STATUS_FILE) or ".", exist_ok=True) data = { "phase": phase, "message": message, "step": step, "step_name": step_name, "hostname": hostname, "ip": ip or "", "updated": time.time(), } if error: data["error"] = error with open(FIRST_BOOT_STATUS_FILE, "w") as f: json.dump(data, f, indent=2) except (PermissionError, OSError): pass @app.route("/api/first-boot-status", methods=["GET"]) def api_first_boot_status_get(): """Return current first-boot progress (for dashboard to poll).""" return jsonify(_first_boot_status_read()) @app.route("/api/first-boot-status", methods=["POST"]) def api_first_boot_status_post(): """Called by device during first-boot.sh to report progress. No auth (device on local net).""" body = request.get_json(silent=True) or {} phase = (body.get("phase") or "running").strip().lower() if phase not in ("started", "running", "done", "error"): phase = "running" message = (body.get("message") or "").strip()[:500] step = (body.get("step") or "").strip()[:10] step_name = (body.get("step_name") or "").strip()[:80] hostname = (body.get("hostname") or "").strip()[:64] ip = (body.get("ip") or "").strip()[:45] error = (body.get("error") or "").strip()[:500] if phase == "error" else None _first_boot_status_write(phase=phase, message=message, step=step, step_name=step_name, hostname=hostname, ip=ip, error=error) return jsonify({"ok": True}) @app.route("/api/log") def api_log(): return jsonify({"log": read_log_tail()}) @app.route("/api/eeprom-presets") def api_eeprom_presets(): """Return available boot order presets for the Update EEPROM dropdown.""" presets = [ {"id": "0x1", "label": "eMMC only"}, {"id": "0xf21", "label": "eMMC first, then network"}, {"id": "0xf12", "label": "Network first, then eMMC"}, ] return jsonify({"presets": presets}) @app.route("/api/pending-devices") def api_pending_devices(): """Returns USB (if waiting_choice) and registered network devices so the UI can show Backup/Deploy.""" st = read_status() usb = None if st.get("phase") == "waiting_choice": usb = {"source": "usb", "message": st.get("message", "Device connected (USB). Choose action.")} data = _load_network_devices() network = [d for d in data.get("devices", []) if d.get("action") in (None, "wait")] return jsonify({"usb": usb, "network": network}) @app.route("/api/device-action", methods=["POST"]) def api_device_action(): """User chose Backup, Deploy, or Update EEPROM for a device. source=usb | network; for network pass mac=.""" body = request.get_json(force=True, silent=True) or {} source = (body.get("source") or "").strip().lower() action = (body.get("action") or "").strip().lower() if action not in ("backup", "deploy", "reboot", "eeprom_update"): return jsonify({"ok": False, "error": "action must be 'backup', 'deploy', 'reboot', or 'eeprom_update'"}), 400 if action == "reboot" and source != "network": return jsonify({"ok": False, "error": "'reboot' is only for network devices"}), 400 if action == "eeprom_update" and source != "usb": return jsonify({"ok": False, "error": "'eeprom_update' is only for USB-connected devices"}), 400 if source == "usb": if action == "eeprom_update": boot_order = (body.get("boot_order") or "0xf21").strip().lower() if boot_order not in ("0x1", "0xf21", "0xf12"): return jsonify({"ok": False, "error": "boot_order must be 0x1, 0xf21, or 0xf12"}), 400 ok, err = _generate_eeprom_update(boot_order) if not ok: return jsonify({"ok": False, "error": err or "Failed to generate EEPROM update"}), 500 try: os.makedirs(os.path.dirname(ACTION_REQUEST_FILE) or ".", exist_ok=True) with open(ACTION_REQUEST_FILE, "w") as f: f.write("eeprom_update") _write_status("eeprom_update", "EEPROM update ready; host will write to device boot partition.") return jsonify({"ok": True}) except (PermissionError, OSError): return jsonify({"ok": False, "error": "Could not write action file"}), 500 try: os.makedirs(os.path.dirname(ACTION_REQUEST_FILE) or ".", exist_ok=True) # If user requested "shrink after backup", create flag so host runs PiShrink after dd if action == "backup" and body.get("shrink"): try: (BASE_DIR / "shrink_next_backup").write_text("1") except (PermissionError, OSError): pass # host may still have SHRINK_BACKUP=1 with open(ACTION_REQUEST_FILE, "w") as f: f.write(action) if action == "deploy": _first_boot_status_write("idle", "", hostname="", ip="") return jsonify({"ok": True}) except (PermissionError, OSError): return jsonify({"ok": False, "error": "Could not write action file"}), 500 if source == "network": mac = (body.get("mac") or "").strip() if not mac: return jsonify({"ok": False, "error": "mac required for network device"}), 400 data = _load_network_devices() for d in data.get("devices", []): if (d.get("mac") or "").lower() == mac.lower(): d["action"] = action d["action_at"] = time.time() _save_network_devices(data) ip = d.get("ip") or mac if action == "deploy": _write_status("flashing", f"Deploying to {ip} (network)...") _first_boot_status_write("idle", "", hostname="", ip="") elif action == "backup": _write_status("backup", f"Backing up {ip} (network)...") return jsonify({"ok": True}) return jsonify({"ok": False, "error": "Device not found"}), 404 return jsonify({"ok": False, "error": "source must be 'usb' or 'network'"}), 400 @app.route("/api/register-device", methods=["POST"]) def api_register_device(): """Called by a network-booted device to register (mac, ip).""" body = request.get_json(force=True, silent=True) or request.form mac = (body.get("mac") or "").strip() ip = (body.get("ip") or request.remote_addr or "").strip() if not mac: return jsonify({"ok": False, "error": "mac required"}), 400 data = _load_network_devices() devices = data.get("devices", []) for d in devices: if (d.get("mac") or "").lower() == mac.lower(): d["ip"] = ip d["registered_at"] = time.time() d["action"] = d.get("action") or "wait" _save_network_devices(data) return jsonify({"ok": True, "message": "registered"}) devices.append({"mac": mac, "ip": ip, "registered_at": time.time(), "action": "wait"}) data["devices"] = devices _save_network_devices(data) return jsonify({"ok": True, "message": "registered"}) def _dhcp_network_boot_run(cmd): """Run toggle script with enable|disable|status. Returns (ok, output_or_error).""" if not os.path.isfile(TOGGLE_NETWORK_BOOT_SCRIPT) or not os.access(TOGGLE_NETWORK_BOOT_SCRIPT, os.X_OK): return False, "Toggle script not installed" try: out = subprocess.run( [TOGGLE_NETWORK_BOOT_SCRIPT, cmd], capture_output=True, text=True, timeout=10, ) if out.returncode != 0: return False, (out.stderr or out.stdout or "script failed").strip() return True, (out.stdout or "").strip() except subprocess.TimeoutExpired: return False, "Timeout" except Exception as e: return False, str(e) @app.route("/api/dhcp-network-boot", methods=["GET"]) def api_dhcp_network_boot_get(): """Return whether DHCP network-boot options (66/67) are enabled.""" ok, out = _dhcp_network_boot_run("status") if not ok: return jsonify({"enabled": None, "error": out}), 200 return jsonify({"enabled": out.strip().lower() == "enabled"}) @app.route("/api/dhcp-network-boot", methods=["POST"]) def api_dhcp_network_boot_post(): """Enable or disable DHCP network-boot options (DHCP server keeps running). Body: { \"enabled\": true|false }.""" body = request.get_json(force=True, silent=True) or {} enabled = body.get("enabled") if enabled is None: return jsonify({"ok": False, "error": "enabled required (true|false)"}), 400 cmd = "enable" if enabled else "disable" ok, out = _dhcp_network_boot_run(cmd) if not ok: return jsonify({"ok": False, "error": out}), 500 return jsonify({"ok": True, "enabled": enabled}) @app.route("/api/action-done", methods=["POST"]) def api_action_done(): """Called by a device when deploy or backup has completed. Disables DHCP network-boot so the device boots from eMMC next time.""" mac = request.args.get("mac") or ((request.get_json(silent=True) or {}).get("mac") or "") # Remove device from network-devices list so it doesn't keep showing if mac: data = _load_network_devices() data["devices"] = [d for d in data.get("devices", []) if (d.get("mac") or "").lower() != mac.lower()] _save_network_devices(data) ok, _ = _dhcp_network_boot_run("disable") _write_status("done", f"Done ({mac or 'network device'}). Network boot disabled; device will boot from eMMC on next boot.") if not ok: return jsonify({"ok": False, "error": "Could not disable DHCP network boot"}), 500 return jsonify({"ok": True, "message": "Network boot disabled; device will boot from eMMC on next boot"}) def _read_dhcp_leases(): """Read dnsmasq lease file. Returns (leases_list, error_string). leases_list items: {expiry, mac, ip, hostname}.""" if not DHCP_LEASES_FILE or not os.path.isfile(DHCP_LEASES_FILE): return [], None try: leases = [] with open(DHCP_LEASES_FILE, "r") as f: for line in f: line = line.strip() if not line or line.startswith("#"): continue parts = line.split() if len(parts) >= 4: leases.append({ "expiry": int(parts[0]) if parts[0].isdigit() else 0, "mac": parts[1], "ip": parts[2], "hostname": parts[3] if len(parts) > 3 else "", }) return leases, None except (OSError, PermissionError) as e: return [], str(e) @app.route("/api/dhcp-leases") def api_dhcp_leases(): """Return current DHCP leases from dnsmasq (when dashboard runs on LXC with dnsmasq).""" leases, err = _read_dhcp_leases() if err: return jsonify({"leases": [], "error": err}) return jsonify({"leases": leases, "error": None}) @app.route("/api/device-action-poll") def api_device_action_poll(): """Network device polls this to get its assigned action (deploy/backup) and URL.""" mac = (request.args.get("mac") or "").strip() if not mac: return jsonify({"action": "wait"}), 200 data = _load_network_devices() base = request.host_url.rstrip("/") for d in data.get("devices", []): if (d.get("mac") or "").lower() == mac.lower(): action = d.get("action") or "wait" if action == "deploy": return jsonify({"action": "deploy", "url": f"{base}/api/golden-image"}) if action == "backup": return jsonify({"action": "backup", "upload_url": f"{base}/api/backup-upload?mac={mac}"}) if action == "reboot": return jsonify({"action": "reboot"}) return jsonify({"action": "wait"}) return jsonify({"action": "wait"}) @app.route("/api/golden-image") def api_golden_image(): """Stream the golden image for network deploy (device pulls and writes to eMMC). Decompresses .img.xz/.img.gz on the fly.""" if not GOLDEN_IMAGE.is_file(): return jsonify({"error": "Golden image not found"}), 404 resolved = GOLDEN_IMAGE.resolve() if str(resolved).endswith(".img.xz"): def stream(): proc = subprocess.Popen( ["xz", "-d", "-c", str(resolved)], stdout=subprocess.PIPE, bufsize=65536, ) try: while True: chunk = proc.stdout.read(65536) if not chunk: break yield chunk finally: proc.wait() return Response( stream_with_context(stream()), mimetype="application/octet-stream", headers={"Content-Disposition": "attachment; filename=golden.img"}, ) if str(resolved).endswith(".img.gz"): def stream(): proc = subprocess.Popen( ["gzip", "-c", "-d", str(resolved)], stdout=subprocess.PIPE, bufsize=65536, ) try: while True: chunk = proc.stdout.read(65536) if not chunk: break yield chunk finally: proc.wait() return Response( stream_with_context(stream()), mimetype="application/octet-stream", headers={"Content-Disposition": "attachment; filename=golden.img"}, ) return send_file( GOLDEN_IMAGE, mimetype="application/octet-stream", as_attachment=True, download_name="golden.img", ) @app.route("/api/backup-upload", methods=["POST"]) def api_backup_upload(): """Network device uploads its eMMC backup (raw body).""" mac = (request.args.get("mac") or "").strip().replace(":", "-")[:20] if not mac: return jsonify({"error": "mac query param required"}), 400 BACKUPS_DIR.mkdir(parents=True, exist_ok=True) name = f"backup-net-{mac}-{int(time.time())}.img" path = BACKUPS_DIR / name try: with open(path, "wb") as f: while True: chunk = request.stream.read(1024 * 1024) if not chunk: break f.write(chunk) return jsonify({"ok": True, "file": name}) except (OSError, IOError) as e: if path.exists(): path.unlink(missing_ok=True) return jsonify({"error": str(e)}), 500 @app.route("/api/backups") @require_admin def api_backups(): return jsonify({"backups": list_backups(), "backups_dir": str(BACKUPS_DIR)}) @app.route("/api/cloudinit-images") @require_admin def api_cloudinit_images(): return jsonify({"images": list_cloudinit_images(), "cloudinit_dir": str(CLOUDINIT_IMAGES_DIR)}) def _load_portal_descriptions(): """Return dict mapping path -> description (path can be file or folder).""" if not PORTAL_DESCRIPTIONS_FILE.is_file(): return {} try: with open(PORTAL_DESCRIPTIONS_FILE, "r", encoding="utf-8") as f: return json.load(f) except (json.JSONDecodeError, OSError): return {} def _save_portal_descriptions(descriptions): try: PORTAL_DESCRIPTIONS_FILE.parent.mkdir(parents=True, exist_ok=True) with open(PORTAL_DESCRIPTIONS_FILE, "w", encoding="utf-8") as f: json.dump(descriptions, f, indent=2) return True except OSError: return False def _portal_files_list_impl(subpath): """Shared impl for listing portal files. Returns (items, descriptions, base_url, current_path).""" base_url = request.host_url.rstrip("/") + "/files/" empty_items = [] if ".." in subpath or "\\" in subpath: return empty_items, {}, base_url, subpath if not PORTAL_FILES_DIR.is_dir(): try: PORTAL_FILES_DIR.mkdir(parents=True, exist_ok=True) except OSError: pass if not PORTAL_FILES_DIR.is_dir(): return empty_items, {}, base_url, subpath list_dir = (PORTAL_FILES_DIR / subpath).resolve() if subpath else PORTAL_FILES_DIR try: list_dir.relative_to(PORTAL_FILES_DIR.resolve()) except ValueError: return empty_items, {}, base_url, subpath if not list_dir.is_dir(): return empty_items, {}, base_url, subpath items = [] for p in sorted(list_dir.iterdir(), key=lambda x: (not x.is_dir(), x.name.lower())): if ".." in p.name or p.name.startswith("."): continue rel = (subpath + "/" + p.name) if subpath else p.name try: if p.is_dir(): items.append({"type": "folder", "path": rel, "name": p.name}) else: items.append({"type": "file", "path": rel, "name": p.name, "size": p.stat().st_size, "mtime": p.stat().st_mtime}) except OSError: pass descriptions = _load_portal_descriptions() return items, descriptions, base_url, subpath @app.route("/api/portal-files") def api_portal_files_list(): """List one level: root or contents of path=... (folders and files). ?debug=1 allows unauthenticated read-only list.""" subpath = request.args.get("path", "").strip().strip("/") debug = request.args.get("debug") == "1" if not debug and not session.get("admin_logged_in"): if request.is_json or request.path.startswith("/api/"): return jsonify({"ok": False, "error": "Login required"}), 401 return redirect(url_for("login", next=request.url)) items, descriptions, base_url, subpath = _portal_files_list_impl(subpath) resp = jsonify({ "items": items, "base_url": base_url, "descriptions": descriptions, "current_path": subpath, "portal_files_dir": str(PORTAL_FILES_DIR), }) resp.headers["Cache-Control"] = "no-store, no-cache, must-revalidate" return resp @app.route("/api/portal-files/descriptions", methods=["GET", "PATCH"]) @require_admin def api_portal_descriptions(): if request.method == "PATCH": data = request.get_json(force=True, silent=True) or {} desc = data.get("descriptions") if not isinstance(desc, dict): return jsonify({"ok": False, "error": "descriptions must be a dict"}), 400 if not _save_portal_descriptions(desc): return jsonify({"ok": False, "error": "save failed"}), 500 admin_log("portal_descriptions", "updated") return jsonify({"ok": True}) return jsonify({"descriptions": _load_portal_descriptions()}) @app.route("/api/portal-files/folder", methods=["POST"]) @require_admin def api_portal_folder_create(): data = request.get_json(force=True, silent=True) or {} path = (data.get("path") or "").strip().strip("/") if not path: return jsonify({"ok": False, "error": "path required"}), 400 if ".." in path or "\\" in path: return jsonify({"ok": False, "error": "invalid path"}), 400 full = (PORTAL_FILES_DIR / path).resolve() try: full.relative_to(PORTAL_FILES_DIR.resolve()) except ValueError: return jsonify({"ok": False, "error": "invalid path"}), 400 try: full.mkdir(parents=True, exist_ok=True) admin_log("portal_folder_create", path) return jsonify({"ok": True, "path": path}) except OSError as e: return jsonify({"ok": False, "error": str(e)}), 500 @app.route("/api/portal-files/upload", methods=["POST"]) @require_admin def api_portal_files_upload(): if "file" not in request.files and "upload" not in request.files: return jsonify({"ok": False, "error": "no file (use field 'file' or 'upload')"}), 400 f = request.files.get("file") or request.files.get("upload") if not f or not f.filename: return jsonify({"ok": False, "error": "no file selected"}), 400 base_name = re.sub(r"[^\w\-./]", "_", f.filename)[:120] or "upload" if ".." in base_name or base_name.startswith("/"): return jsonify({"ok": False, "error": "invalid filename"}), 400 subpath = (request.form.get("path") or "").strip().strip("/") if subpath and (".." in subpath or "\\" in subpath): return jsonify({"ok": False, "error": "invalid path"}), 400 name = (subpath + "/" + base_name) if subpath else base_name path = (PORTAL_FILES_DIR / name).resolve() try: path.relative_to(PORTAL_FILES_DIR.resolve()) except ValueError: return jsonify({"ok": False, "error": "invalid path"}), 400 try: path.parent.mkdir(parents=True, exist_ok=True) f.save(str(path)) admin_log("portal_upload", name) return jsonify({"ok": True, "name": name, "url": request.host_url.rstrip("/") + "/files/" + name}) except (OSError, IOError) as e: if path.exists(): path.unlink(missing_ok=True) return jsonify({"ok": False, "error": str(e)}), 500 # Text extensions allowed for editor (others are read-only or binary) _PORTAL_EDITABLE_EXTENSIONS = frozenset( ".sh .bash .conf .cfg .ini .desktop .json .py .yml .yaml .md .txt .plymouth .script".split() ) def _portal_language_for_path(name): """Return Ace/editor language mode from filename.""" n = (name or "").lower() if n.endswith(".sh") or n.endswith(".bash"): return "sh" if n.endswith(".json"): return "json" if n.endswith(".py"): return "python" if n.endswith(".yml") or n.endswith(".yaml"): return "yaml" if n.endswith(".md"): return "markdown" if n.endswith(".conf") or n.endswith(".cfg") or n.endswith(".ini") or n.endswith(".desktop") or n.endswith(".plymouth"): return "ini" if n.endswith(".script"): return "sh" return "plain_text" @app.route("/api/portal-files/content", methods=["GET"]) @require_admin def api_portal_file_content_get(): """Return file content as text for the editor. path= query param. Rejects binary.""" path_arg = (request.args.get("path") or "").strip() if not path_arg or ".." in path_arg or "\\" in path_arg: return jsonify({"ok": False, "error": "invalid path"}), 400 path = (PORTAL_FILES_DIR / path_arg).resolve() try: path.relative_to(PORTAL_FILES_DIR.resolve()) except ValueError: return jsonify({"ok": False, "error": "invalid path"}), 400 if not path.is_file(): return jsonify({"ok": False, "error": "not found"}), 404 ext = path.suffix.lower() if path.suffix else "" if ext and ext not in _PORTAL_EDITABLE_EXTENSIONS: return jsonify({"ok": False, "error": "binary or unsupported file type for editing"}), 400 try: raw = path.read_bytes() text = raw.decode("utf-8") except UnicodeDecodeError: return jsonify({"ok": False, "error": "binary or unsupported encoding"}), 400 except OSError as e: return jsonify({"ok": False, "error": str(e)}), 500 return jsonify({ "ok": True, "path": path_arg, "content": text, "language": _portal_language_for_path(path.name), }) @app.route("/api/portal-files/content", methods=["PUT"]) @require_admin def api_portal_file_content_put(): """Write file content from editor. JSON body: { \"path\": \"...\", \"content\": \"...\" }.""" data = request.get_json(silent=True) or {} path_arg = (data.get("path") or "").strip() content = data.get("content") if not path_arg or ".." in path_arg or "\\" in path_arg: return jsonify({"ok": False, "error": "invalid path"}), 400 if content is None: return jsonify({"ok": False, "error": "content required"}), 400 path = (PORTAL_FILES_DIR / path_arg).resolve() try: path.relative_to(PORTAL_FILES_DIR.resolve()) except ValueError: return jsonify({"ok": False, "error": "invalid path"}), 400 if path.is_dir(): return jsonify({"ok": False, "error": "cannot edit a folder"}), 400 ext = path.suffix.lower() if path.suffix else "" if path.exists() and ext and ext not in _PORTAL_EDITABLE_EXTENSIONS: return jsonify({"ok": False, "error": "unsupported file type for editing"}), 400 if not isinstance(content, str): content = str(content) try: path.parent.mkdir(parents=True, exist_ok=True) path.write_text(content, encoding="utf-8") admin_log("portal_edit", path_arg) return jsonify({"ok": True, "path": path_arg}) except OSError as e: return jsonify({"ok": False, "error": str(e)}), 500 @app.route("/api/portal-files/", methods=["DELETE"]) @require_admin def api_portal_file_delete(name): if ".." in name or "\\" in name: return jsonify({"ok": False, "error": "invalid name"}), 400 path = (PORTAL_FILES_DIR / name).resolve() try: path.relative_to(PORTAL_FILES_DIR.resolve()) except ValueError: return jsonify({"ok": False, "error": "invalid path"}), 400 if path.is_file(): try: path.unlink() admin_log("portal_delete", name) return jsonify({"ok": True}) except OSError as e: return jsonify({"ok": False, "error": str(e)}), 500 if path.is_dir(): if any(path.iterdir()): return jsonify({"ok": False, "error": "folder not empty"}), 400 try: path.rmdir() admin_log("portal_folder_delete", name) return jsonify({"ok": True}) except OSError as e: return jsonify({"ok": False, "error": str(e)}), 500 return jsonify({"ok": False, "error": "not found"}), 404 @app.route("/api/backups/upload", methods=["POST"]) @require_admin def api_backups_upload(): """Upload an image file from the dashboard (multipart form).""" if "file" not in request.files and "image" not in request.files: return jsonify({"ok": False, "error": "no file in request (use field 'file' or 'image')"}), 400 f = request.files.get("file") or request.files.get("image") if not f or not f.filename: return jsonify({"ok": False, "error": "no file selected"}), 400 base = (f.filename.rsplit(".", 1)[0] if "." in f.filename else f.filename).strip() or "upload" safe_base = re.sub(r"[^\w\-.]", "_", base)[:80] if not safe_base: safe_base = "upload" ext = "" if f.filename.lower().endswith(".img.xz"): ext = ".img.xz" elif f.filename.lower().endswith(".img.gz"): ext = ".img.gz" elif f.filename.lower().endswith(".img"): ext = ".img" else: ext = ".img" name = f"{safe_base}-{int(time.time())}{ext}" if not _safe_backup_name(name): name = f"upload-{int(time.time())}.img" path = BACKUPS_DIR / name try: BACKUPS_DIR.mkdir(parents=True, exist_ok=True) f.save(str(path)) return jsonify({"ok": True, "name": name, "message": f"Uploaded {name}"}) except (OSError, IOError) as e: if path.exists(): path.unlink(missing_ok=True) return jsonify({"ok": False, "error": str(e)}), 500 def _set_golden_from_path(path): """Set golden image to point to given path (file in backups or cloudinit dir).""" GOLDEN_IMAGE.parent.mkdir(parents=True, exist_ok=True) if GOLDEN_IMAGE.exists(): GOLDEN_IMAGE.unlink() path_resolved = path.resolve() try: path_resolved.relative_to(BACKUPS_DIR.resolve()) except ValueError: try: path_resolved.relative_to(CLOUDINIT_IMAGES_DIR.resolve()) except ValueError: shutil.copy2(path, GOLDEN_IMAGE) return os.symlink(path_resolved, GOLDEN_IMAGE) @app.route("/api/backups//set-as-golden", methods=["POST"]) @require_admin def api_backup_set_as_golden(name): """Use this backup as the golden image (symlink).""" if not _safe_backup_name(name): return jsonify({"ok": False, "error": "invalid backup name"}), 400 path = BACKUPS_DIR / name if not path.is_file(): return jsonify({"ok": False, "error": "backup not found"}), 404 try: BACKUPS_DIR.mkdir(parents=True, exist_ok=True) _set_golden_from_path(path) admin_log("set_golden", f"backups/{name}") return jsonify({"ok": True, "message": f"Golden image set from backup {name}"}) except (OSError, IOError) as e: return jsonify({"ok": False, "error": str(e)}), 500 @app.route("/api/cloudinit-images//set-as-golden", methods=["POST"]) @require_admin def api_cloudinit_set_as_golden(name): """Use this cloud-init image as the golden image (symlink).""" if not _safe_backup_name(name): return jsonify({"ok": False, "error": "invalid name"}), 400 path = CLOUDINIT_IMAGES_DIR / name if not path.is_file(): return jsonify({"ok": False, "error": "image not found"}), 404 try: CLOUDINIT_IMAGES_DIR.mkdir(parents=True, exist_ok=True) _set_golden_from_path(path) admin_log("set_golden", f"cloudinit/{name}") return jsonify({"ok": True, "message": f"Golden image set from cloud-init image {name}"}) except (OSError, IOError) as e: return jsonify({"ok": False, "error": str(e)}), 500 def _request_host_shrink(name, action="shrink", format="xz"): """Write shrink request for host and poll shrink_status.json. Returns (ok, message_or_error).""" req = {"name": name, "action": action} if action == "compress": req["format"] = "gz" if format == "gz" else "xz" try: SHRINK_REQUEST_FILE.write_text(json.dumps(req)) except OSError as e: return False, str(e) deadline = time.monotonic() + 2100 # 35 min while time.monotonic() < deadline: time.sleep(5) if not SHRINK_STATUS_FILE.exists(): continue try: data = json.loads(SHRINK_STATUS_FILE.read_text()) except (OSError, ValueError): continue if data.get("name") != name: continue phase = data.get("phase") if phase == "done": return True, data.get("message") or f"Shrunk {name}" if phase == "error": return False, data.get("error") or "PiShrink failed" return False, "Shrink timed out (run on host may still be in progress)" @app.route("/api/cloudinit-images/", methods=["GET"]) @require_admin def api_cloudinit_download(name): if not _safe_backup_name(name): return jsonify({"error": "invalid name"}), 400 path = CLOUDINIT_IMAGES_DIR / name if not path.is_file(): return jsonify({"error": "not found"}), 404 return send_file(path, as_attachment=True, download_name=name) @app.route("/api/cloudinit-images/", methods=["PATCH"]) @require_admin def api_cloudinit_update(name): if not _safe_backup_name(name): return jsonify({"ok": False, "error": "invalid name"}), 400 path = CLOUDINIT_IMAGES_DIR / name if not path.is_file(): return jsonify({"ok": False, "error": "not found"}), 404 body = request.get_json(force=True, silent=True) or {} meta = _load_cloudinit_meta() entry = meta.get(name, {}) new_filename = (body.get("filename") or "").strip() if new_filename: if not _safe_backup_name(new_filename): return jsonify({"ok": False, "error": "invalid new filename"}), 400 new_path = CLOUDINIT_IMAGES_DIR / new_filename if new_path.exists() and new_path != path: return jsonify({"ok": False, "error": "target filename already exists"}), 409 try: path.rename(new_path) except OSError as e: return jsonify({"ok": False, "error": str(e)}), 500 meta[new_filename] = {"name": entry.get("name") or name, "description": entry.get("description") or ""} meta.pop(name, None) name = new_filename path = CLOUDINIT_IMAGES_DIR / name else: if "name" in body: entry["name"] = (body.get("name") or "").strip() or path.name if "description" in body: entry["description"] = (body.get("description") or "").strip() meta[name] = entry if not _save_cloudinit_meta(meta): return jsonify({"ok": False, "error": "could not save metadata"}), 500 return jsonify({"ok": True, "name": name}) @app.route("/api/cloudinit-images/", methods=["DELETE"]) @require_admin def api_cloudinit_delete(name): if not _safe_backup_name(name): return jsonify({"ok": False, "error": "invalid name"}), 400 path = CLOUDINIT_IMAGES_DIR / name if not path.is_file(): return jsonify({"ok": False, "error": "not found"}), 404 try: if GOLDEN_IMAGE.exists() and GOLDEN_IMAGE.is_symlink(): try: if GOLDEN_IMAGE.resolve() == path.resolve(): GOLDEN_IMAGE.unlink() except OSError: pass path.unlink() meta = _load_cloudinit_meta() meta.pop(name, None) _save_cloudinit_meta(meta) admin_log("cloudinit_delete", name) return jsonify({"ok": True, "message": f"Deleted {name}"}) except (OSError, IOError) as e: return jsonify({"ok": False, "error": str(e)}), 500 @app.route("/api/backups//shrink", methods=["POST"]) @require_admin def api_backup_shrink(name): """Request PiShrink on host for a raw .img backup (shrinks in place). Dashboard polls host status.""" if not _safe_backup_name(name): return jsonify({"ok": False, "error": "invalid backup name"}), 400 if not name.endswith(".img") or name.endswith(".img.gz") or name.endswith(".img.xz"): return jsonify({"ok": False, "error": "only raw .img files can be shrunk (not .img.gz / .img.xz)"}), 400 path = BACKUPS_DIR / name if not path.is_file(): return jsonify({"ok": False, "error": "backup not found"}), 404 ok, msg = _request_host_shrink(name, action="shrink") if ok: return jsonify({"ok": True, "message": msg}) return jsonify({"ok": False, "error": msg}), (503 if "not installed" in msg else 504 if "timed out" in msg else 500) @app.route("/api/backups//compress", methods=["POST"]) @require_admin def api_backup_compress(name): """Request PiShrink with compression on host. Produces .img.gz or .img.xz. Dashboard polls host status.""" if not _safe_backup_name(name): return jsonify({"ok": False, "error": "invalid backup name"}), 400 if not name.endswith(".img") or name.endswith(".img.gz") or name.endswith(".img.xz"): return jsonify({"ok": False, "error": "only raw .img files can be compressed (not .img.gz / .img.xz)"}), 400 path = BACKUPS_DIR / name if not path.is_file(): return jsonify({"ok": False, "error": "backup not found"}), 404 body = request.get_json(force=True, silent=True) or {} fmt = (body.get("format") or request.args.get("format") or "xz").strip().lower() if fmt not in ("gz", "gzip", "xz"): fmt = "xz" if fmt == "gzip": fmt = "gz" ok, msg = _request_host_shrink(name, action="compress", format=fmt) if ok: ext = ".xz" if fmt == "xz" else ".gz" return jsonify({"ok": True, "message": msg or f"Compressed to {name}{ext}"}) return jsonify({"ok": False, "error": msg}), (503 if "not installed" in msg else 504 if "timed out" in msg else 500) @app.route("/api/backups/", methods=["PATCH"]) @require_admin def api_backup_update(name): """Update backup metadata (display name, description) or rename the file.""" if not _safe_backup_name(name): return jsonify({"ok": False, "error": "invalid backup name"}), 400 path = BACKUPS_DIR / name if not path.is_file(): return jsonify({"ok": False, "error": "backup not found"}), 404 body = request.get_json(force=True, silent=True) or {} meta = _load_backups_meta() entry = meta.get(name, {}) new_filename = (body.get("filename") or "").strip() if new_filename: if not _safe_backup_name(new_filename): return jsonify({"ok": False, "error": "invalid new filename"}), 400 new_path = BACKUPS_DIR / new_filename if new_path.exists() and new_path != path: return jsonify({"ok": False, "error": "target filename already exists"}), 409 try: path.rename(new_path) except OSError as e: return jsonify({"ok": False, "error": str(e)}), 500 meta[new_filename] = {"name": entry.get("name") or name, "description": entry.get("description") or ""} if name in meta: del meta[name] name = new_filename path = BACKUPS_DIR / name else: if "name" in body: entry["name"] = (body.get("name") or "").strip() or path.name if "description" in body: entry["description"] = (body.get("description") or "").strip() meta[name] = entry if not _save_backups_meta(meta): return jsonify({"ok": False, "error": "could not save metadata"}), 500 return jsonify({"ok": True, "name": name}) @app.route("/api/backups/", methods=["DELETE"]) @require_admin def api_backup_delete(name): """Delete a backup file. If it is the current golden image (symlink), the golden link is removed.""" if not _safe_backup_name(name): return jsonify({"ok": False, "error": "invalid backup name"}), 400 path = BACKUPS_DIR / name if not path.is_file(): return jsonify({"ok": False, "error": "backup not found"}), 404 try: if GOLDEN_IMAGE.exists() and GOLDEN_IMAGE.is_symlink(): try: if (GOLDEN_IMAGE.resolve() == path.resolve()): GOLDEN_IMAGE.unlink() except OSError: pass path.unlink() meta = _load_backups_meta() meta.pop(name, None) _save_backups_meta(meta) return jsonify({"ok": True, "message": f"Deleted {name}"}) except (OSError, IOError) as e: return jsonify({"ok": False, "error": str(e)}), 500 @app.route("/api/backups/", methods=["GET"]) @require_admin def api_backup_download(name): if not _safe_backup_name(name): return jsonify({"error": "invalid name"}), 400 path = BACKUPS_DIR / name if not path.is_file(): return jsonify({"error": "not found"}), 404 return send_file(path, as_attachment=True, download_name=name) def _build_status_read(): try: if BUILD_STATUS_FILE.is_file(): with open(BUILD_STATUS_FILE, "r") as f: return json.load(f) except (json.JSONDecodeError, OSError): pass return {"phase": "idle", "message": "", "output_name": None, "error": None} def _build_status_write(phase, message, output_name=None, error=None): try: BUILD_STATUS_FILE.parent.mkdir(parents=True, exist_ok=True) with open(BUILD_STATUS_FILE, "w") as f: json.dump({ "phase": phase, "message": message, "output_name": output_name, "error": error, "updated": time.time(), }, f, indent=2) except OSError: pass def _raspios_latest_url(variant="desktop"): """Resolve latest Raspberry Pi OS (arm64) .img.xz URL. variant=lite|desktop|full.""" if variant == "lite": slug = "raspios_lite_arm64" elif variant == "full": slug = "raspios_full_arm64" else: slug = "raspios_arm64" # desktop (with desktop, not full) base = f"https://downloads.raspberrypi.com/{slug}/images" headers = {"User-Agent": "Mozilla/5.0 (compatible; CM4-Provisioning/1.0)"} try: req = urllib.request.Request(base + "/", headers=headers) with urllib.request.urlopen(req, timeout=20) as r: html = r.read().decode("utf-8", errors="ignore") folders = re.findall(rf"{re.escape(slug)}-(\d{{4}}-\d{{2}}-\d{{2}})/", html) if not folders: return None latest = sorted(folders)[-1] folder_url = f"{base}/{slug}-{latest}/" req2 = urllib.request.Request(folder_url, headers=headers) with urllib.request.urlopen(req2, timeout=20) as r: folder_html = r.read().decode("utf-8", errors="ignore") m = re.search(r'href="([^"]+\.img\.xz)"', folder_html) if not m: return None href = m.group(1) if href.startswith("http://") or href.startswith("https://"): return href return folder_url.rstrip("/") + "/" + href.lstrip("/") except Exception: return None def _load_cloudinit_templates(): try: if CLOUDINIT_TEMPLATES_FILE.is_file(): with open(CLOUDINIT_TEMPLATES_FILE, "r") as f: return json.load(f) except (json.JSONDecodeError, OSError): pass return {"templates": []} def _save_cloudinit_templates(data): try: CLOUDINIT_TEMPLATES_FILE.parent.mkdir(parents=True, exist_ok=True) with open(CLOUDINIT_TEMPLATES_FILE, "w") as f: json.dump(data, f, indent=2) return True except OSError: return False @app.route("/api/build-cloudinit-status") @require_admin def api_build_cloudinit_status(): """Return current build status (phase, message, output_name, error).""" return jsonify(_build_status_read()) @app.route("/api/build-cloudinit-cancel", methods=["POST"]) @require_admin def api_build_cloudinit_cancel(): """Request cancellation of the current build (host script checks for cancel file).""" try: BUILD_CANCEL_FILE.parent.mkdir(parents=True, exist_ok=True) BUILD_CANCEL_FILE.write_text("") return jsonify({"ok": True, "message": "Cancel requested. Build will stop at next check."}) except OSError as e: return jsonify({"ok": False, "error": str(e)}), 500 @app.route("/api/build-cloudinit-status-clear", methods=["POST"]) @require_admin def api_build_cloudinit_status_clear(): """Clear build status to idle (so message is cleared after cancel/done/error). Use ?force=1 to clear even when status is stuck (e.g. build died during finalizing).""" st = _build_status_read() force = request.args.get("force") in ("1", "true", "yes") busy = st.get("phase") in ("downloading", "decompressing", "injecting", "finalizing", "resolving") if busy and not force: return jsonify({"ok": False, "error": "Cannot clear while build is in progress"}), 409 _build_status_write("idle", "", None, None) return jsonify({"ok": True}) @app.route("/api/build-cloudinit", methods=["POST"]) @require_admin def api_build_cloudinit(): """Start building a cloud-init image: write request file; host runs build (has loop devices).""" st = _build_status_read() if st.get("phase") in ("downloading", "decompressing", "injecting", "finalizing", "resolving"): return jsonify({"ok": False, "error": "A build is already in progress"}), 409 body = request.get_json(silent=True) or {} variant = (body.get("variant") or "desktop").strip().lower() if variant not in ("lite", "desktop", "full"): variant = "desktop" _build_status_write("resolving", "Resolving latest Raspberry Pi OS image URL…") url = _raspios_latest_url(variant) if not url: _build_status_write("idle", "", error="Could not resolve latest Raspios URL") return jsonify({"ok": False, "error": "Could not resolve latest image URL"}), 503 user_data = body.get("user_data") or DEFAULT_USER_DATA meta_data = body.get("meta_data") or DEFAULT_META_DATA network_config = body.get("network_config") or DEFAULT_NETWORK_CONFIG set_as_golden_after = bool(body.get("set_as_golden_after")) image_name = (body.get("image_name") or "").strip()[:64] image_name = re.sub(r"[^\w\-]", "", image_name) # only alphanumeric, underscore, dash try: BUILD_REQUEST_FILE.parent.mkdir(parents=True, exist_ok=True) with open(BUILD_REQUEST_FILE, "w") as f: json.dump({ "url": url, "variant": variant, "image_name": image_name or None, "user_data": user_data, "meta_data": meta_data, "network_config": network_config, "set_as_golden_after": set_as_golden_after, }, f, indent=2) except OSError as e: return jsonify({"ok": False, "error": str(e)}), 500 _build_status_write("resolving", "Build requested; host will run download and inject (check status).") return jsonify({"ok": True, "message": "Build started on host. Download and inject may take 15–45 min. Poll status or refresh the page."}), 202 @app.route("/api/raspios-latest-url") @require_admin def api_raspios_latest_url(): """Return the URL of the latest Raspberry Pi OS (arm64) image. Query: variant=lite|desktop|full.""" variant = (request.args.get("variant") or "desktop").strip().lower() if variant not in ("lite", "desktop", "full"): variant = "desktop" url = _raspios_latest_url(variant) if not url: return jsonify({"ok": False, "url": None, "error": "Could not resolve latest image URL"}), 503 return jsonify({"ok": True, "url": url, "filename": url.split("/")[-1], "variant": variant}) @app.route("/api/cloudinit-templates", methods=["GET"]) @require_admin def api_cloudinit_templates_list(): """List saved cloud-init templates.""" data = _load_cloudinit_templates() return jsonify({"templates": data.get("templates", [])}) @app.route("/api/cloudinit-templates", methods=["POST"]) @require_admin def api_cloudinit_templates_create(): """Save a new cloud-init template.""" body = request.get_json(force=True, silent=True) or {} name = (body.get("name") or "").strip() if not name: return jsonify({"ok": False, "error": "name required"}), 400 data = _load_cloudinit_templates() templates = data.get("templates", []) tid = str(int(time.time() * 1000)) templates.append({ "id": tid, "name": name, "user_data": body.get("user_data", ""), "meta_data": body.get("meta_data", ""), "network_config": body.get("network_config", ""), }) data["templates"] = templates if not _save_cloudinit_templates(data): return jsonify({"ok": False, "error": "Failed to save"}), 500 return jsonify({"ok": True, "id": tid, "name": name}) @app.route("/api/cloudinit-templates/") @require_admin def api_cloudinit_templates_get(tid): """Get one template by id.""" data = _load_cloudinit_templates() for t in data.get("templates", []): if t.get("id") == tid: return jsonify(t) return jsonify({"error": "not found"}), 404 @app.route("/api/cloudinit-templates/", methods=["PUT"]) @require_admin def api_cloudinit_templates_update(tid): """Update an existing template (name, user_data, meta_data, network_config). Id unchanged.""" body = request.get_json(force=True, silent=True) or {} data = _load_cloudinit_templates() templates = data.get("templates", []) for i, t in enumerate(templates): if t.get("id") == tid: name = (body.get("name") or t.get("name") or "").strip() if not name: return jsonify({"ok": False, "error": "name required"}), 400 templates[i] = { "id": tid, "name": name, "user_data": body.get("user_data", t.get("user_data", "")), "meta_data": body.get("meta_data", t.get("meta_data", "")), "network_config": body.get("network_config", t.get("network_config", "")), } data["templates"] = templates if not _save_cloudinit_templates(data): return jsonify({"ok": False, "error": "Failed to save"}), 500 return jsonify({"ok": True, "id": tid, "name": name}) return jsonify({"ok": False, "error": "not found"}), 404 @app.route("/api/cloudinit-templates/", methods=["DELETE"]) @require_admin def api_cloudinit_templates_delete(tid): """Delete a template.""" data = _load_cloudinit_templates() templates = [t for t in data.get("templates", []) if t.get("id") != tid] if len(templates) == len(data.get("templates", [])): return jsonify({"ok": False, "error": "not found"}), 404 data["templates"] = templates if not _save_cloudinit_templates(data): return jsonify({"ok": False, "error": "Failed to save"}), 500 return jsonify({"ok": True}) @app.route("/api/golden-info") def api_golden_info(): """Return whether golden image exists, size/mtime, and which file it points to (for UI).""" if not GOLDEN_IMAGE.is_file(): return jsonify({"present": False}) try: st = GOLDEN_IMAGE.stat() src, src_name = _golden_current_source() out = {"present": True, "size": st.st_size, "mtime": st.st_mtime} if src and src_name: out["source"] = src out["name"] = src_name return jsonify(out) except OSError: return jsonify({"present": False}) @app.route("/api/admin/users", methods=["GET"]) @require_admin def api_admin_users(): return jsonify({"users": list_users()}) @app.route("/api/admin/users", methods=["POST"]) @require_admin def api_admin_add_user(): body = request.get_json(force=True, silent=True) or {} username = (body.get("username") or "").strip() password = (body.get("password") or "") if not username: return jsonify({"ok": False, "error": "username required"}), 400 if len(password) < 6: return jsonify({"ok": False, "error": "password must be at least 6 characters"}), 400 if create_user(username, password): admin_log("user_created", username) return jsonify({"ok": True, "message": f"User {username} created"}) return jsonify({"ok": False, "error": "username already exists"}), 409 @app.route("/api/admin/users//password", methods=["POST"]) @require_admin def api_admin_change_password(user_id): body = request.get_json(force=True, silent=True) or {} new_password = body.get("password") or "" if len(new_password) < 6: return jsonify({"ok": False, "error": "password must be at least 6 characters"}), 400 conn = get_db() row = conn.execute("SELECT id FROM users WHERE id = ?", (user_id,)).fetchone() conn.close() if not row: return jsonify({"ok": False, "error": "user not found"}), 404 change_password(user_id, new_password) admin_log("password_changed", str(user_id)) return jsonify({"ok": True, "message": "Password updated"}) @app.route("/api/admin/logs") @require_admin def api_admin_logs(): limit = min(int(request.args.get("limit", 100)), 500) return jsonify({"logs": get_recent_logs(limit)}) # Ensure DB exists when app is loaded (e.g. by gunicorn or systemd) try: init_db() except Exception: pass if __name__ == "__main__": app.run(host="0.0.0.0", port=5000, debug=False)