Files
reterminal-dm4/emmc-provisioning/dashboard/app.py
nearxos ec973cc2b3 Add build cancellation feature to cloud-init process</message>
<message>Implement a new API endpoint for cancelling ongoing cloud-init builds, allowing users to request a build cancellation via the dashboard. Update the dashboard UI to include a cancel button that appears during the build process, enhancing user experience by providing control over long-running operations. Modify the build script to check for cancellation requests, ensuring that builds can be stopped gracefully. This feature improves usability and responsiveness in the cloud-init image building workflow.
2026-02-23 10:21:06 +02:00

1839 lines
71 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Flask dashboard for CM4 eMMC provisioning.
Public home: deploy only (status, logs, how to connect). No login.
Admin: login required — backups, cloud-init, portal files, set golden, users.
"""
import hashlib
import json
import os
import re
import shutil
import sqlite3
import subprocess
import tempfile
import time
import urllib.request
from functools import wraps
from pathlib import Path
from flask import Flask, render_template, jsonify, request, send_file, redirect, url_for, session, Response, stream_with_context
from werkzeug.security import generate_password_hash, check_password_hash
try:
from itsdangerous import BadSignature
except ImportError:
BadSignature = Exception # noqa: disable invalid-name
app = Flask(__name__)
# Set CM4_DASHBOARD_SECRET_KEY in env (e.g. via /opt/cm4-provisioning/dashboard.env) so sessions persist across restarts
app.secret_key = os.environ.get("CM4_DASHBOARD_SECRET_KEY", os.urandom(24).hex())
# --- Paths ---
BASE_DIR = Path(os.environ.get("CM4_PROVISIONING_DIR", "/var/lib/cm4-provisioning"))
STATUS_FILE = os.environ.get("CM4_STATUS_FILE", str(BASE_DIR / "status.json"))
LOG_FILE = os.environ.get("CM4_LOG_FILE", str(BASE_DIR / "flash.log"))
ACTION_REQUEST_FILE = os.environ.get("CM4_ACTION_REQUEST_FILE", str(BASE_DIR / "action_request"))
DEVICE_SOURCE_FILE = os.environ.get("CM4_DEVICE_SOURCE_FILE", str(BASE_DIR / "device_source"))
BACKUPS_DIR = Path(os.environ.get("CM4_BACKUPS_DIR", str(BASE_DIR / "backups")))
CLOUDINIT_IMAGES_DIR = Path(os.environ.get("CM4_CLOUDINIT_IMAGES_DIR", str(BASE_DIR / "cloudinit-images")))
PORTAL_FILES_DIR = Path(os.environ.get("CM4_PORTAL_FILES_DIR", str(BASE_DIR / "portal-files")))
GOLDEN_IMAGE = Path(os.environ.get("CM4_GOLDEN_IMAGE", str(BASE_DIR / "golden.img")))
NETWORK_DEVICES_FILE = Path(os.environ.get("CM4_NETWORK_DEVICES_FILE", str(BASE_DIR / "network_devices.json")))
BUILD_STATUS_FILE = Path(os.environ.get("CM4_BUILD_STATUS_FILE", str(BASE_DIR / "build_cloudinit_status.json")))
BUILD_REQUEST_FILE = Path(os.environ.get("CM4_BUILD_REQUEST_FILE", str(BASE_DIR / "build_cloudinit_request.json")))
BUILD_CANCEL_FILE = BUILD_REQUEST_FILE.parent / "build_cloudinit_cancel"
SHRINK_REQUEST_FILE = Path(os.environ.get("CM4_SHRINK_REQUEST_FILE", str(BASE_DIR / "shrink_request.json")))
SHRINK_STATUS_FILE = Path(os.environ.get("CM4_SHRINK_STATUS_FILE", str(BASE_DIR / "shrink_status.json")))
FIRST_BOOT_STATUS_FILE = Path(os.environ.get("CM4_FIRST_BOOT_STATUS_FILE", str(BASE_DIR / "first_boot_status.json")))
CLOUDINIT_TEMPLATES_FILE = Path(os.environ.get("CM4_CLOUDINIT_TEMPLATES_FILE", str(BASE_DIR / "cloudinit_templates.json")))
PORTAL_DESCRIPTIONS_FILE = Path(os.environ.get("CM4_PORTAL_DESCRIPTIONS_FILE", str(BASE_DIR / "portal_descriptions.json")))
DB_PATH = Path(os.environ.get("CM4_DASHBOARD_DB", str(BASE_DIR / "dashboard.db")))
TOGGLE_NETWORK_BOOT_SCRIPT = os.environ.get("CM4_TOGGLE_NETWORK_BOOT_SCRIPT", "/opt/cm4-provisioning/toggle-network-boot-dhcp.sh")
DHCP_LEASES_FILE = os.environ.get("CM4_DHCP_LEASES_FILE", "/var/lib/misc/dnsmasq.leases")
# EEPROM update (USB boot): tools and output files (shared with host script)
EEPROM_DIR = Path(os.environ.get("CM4_EEPROM_DIR", "/opt/cm4-provisioning/eeprom"))
EEPROM_CONFIG_TOOL = EEPROM_DIR / "rpi-eeprom-config"
EEPROM_FW_FILE = EEPROM_DIR / "pieeprom.bin"
EEPROM_UPD_FILE = BASE_DIR / "pieeprom.upd"
EEPROM_SIG_FILE = BASE_DIR / "pieeprom.sig"
# --- Database (admin users + activity logs) ---
def get_db():
conn = sqlite3.connect(str(DB_PATH))
conn.row_factory = sqlite3.Row
return conn
def init_db():
os.makedirs(DB_PATH.parent, exist_ok=True)
conn = get_db()
conn.executescript("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
password_hash TEXT NOT NULL,
created_at REAL NOT NULL
);
CREATE TABLE IF NOT EXISTS admin_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER,
action TEXT NOT NULL,
details TEXT,
created_at REAL NOT NULL,
FOREIGN KEY (user_id) REFERENCES users(id)
);
CREATE INDEX IF NOT EXISTS idx_logs_created ON admin_logs(created_at DESC);
""")
conn.commit()
conn.close()
def admin_log(action, details=None):
user_id = session.get("user_id")
conn = get_db()
conn.execute(
"INSERT INTO admin_logs (user_id, action, details, created_at) VALUES (?, ?, ?, ?)",
(user_id, action, details, time.time()),
)
conn.commit()
conn.close()
def require_admin(f):
@wraps(f)
def wrapped(*args, **kwargs):
try:
logged_in = session.get("admin_logged_in")
except (BadSignature, ValueError, OSError):
# Invalid/rotated secret key or corrupted session cookie → treat as not logged in
try:
session.clear()
except Exception:
pass
logged_in = False
if not logged_in:
if request.is_json or request.path.startswith("/api/"):
return jsonify({"ok": False, "error": "Login required"}), 401
return redirect(url_for("login", next=request.url))
return f(*args, **kwargs)
return wrapped
def get_user_by_username(username):
conn = get_db()
row = conn.execute("SELECT id, username, password_hash FROM users WHERE username = ?", (username,)).fetchone()
conn.close()
return dict(row) if row else None
def create_user(username, password):
conn = get_db()
try:
conn.execute(
"INSERT INTO users (username, password_hash, created_at) VALUES (?, ?, ?)",
(username, generate_password_hash(password), time.time()),
)
conn.commit()
return True
except sqlite3.IntegrityError:
return False
finally:
conn.close()
def list_users():
conn = get_db()
rows = conn.execute("SELECT id, username, created_at FROM users ORDER BY username").fetchall()
conn.close()
return [{"id": r["id"], "username": r["username"], "created_at": r["created_at"]} for r in rows]
def change_password(user_id, new_password):
conn = get_db()
conn.execute(
"UPDATE users SET password_hash = ? WHERE id = ?",
(generate_password_hash(new_password), user_id),
)
conn.commit()
conn.close()
def get_recent_logs(limit=100):
conn = get_db()
rows = conn.execute(
"SELECT l.id, l.action, l.details, l.created_at, u.username FROM admin_logs l LEFT JOIN users u ON l.user_id = u.id ORDER BY l.created_at DESC LIMIT ?",
(limit,),
).fetchall()
conn.close()
return [{"id": r["id"], "action": r["action"], "details": r["details"], "created_at": r["created_at"], "username": r["username"]} for r in rows]
@app.after_request
def no_cache(response):
if request.path == "/" or request.path.startswith("/api/") or request.path.startswith("/admin") or request.path.startswith("/login"):
response.headers["Cache-Control"] = "no-store, no-cache, must-revalidate, max-age=0"
response.headers["Pragma"] = "no-cache"
return response
@app.errorhandler(500)
def handle_500(err):
"""Log 500 and return a simple response so the user is not left with a blank error."""
import traceback
try:
traceback.print_exc()
except Exception:
pass
if request.path.startswith("/api/"):
return jsonify({"ok": False, "error": "Internal server error"}), 500
return (
"<!DOCTYPE html><html><head><meta charset='utf-8'><title>Error</title></head><body>"
"<h1>Something went wrong</h1><p>Try <a href='/login'>logging in again</a> or <a href='/'>going home</a>.</p>"
"</body></html>",
500,
{"Content-Type": "text/html; charset=utf-8"},
)
# Default cloud-init user-data for Raspberry Pi OS (NoCloud on boot partition)
DEFAULT_USER_DATA = """#cloud-config
package_update: true
package_upgrade: false
packages:
- curl
runcmd:
- curl -fsSL "http://YOUR_FILE_SERVER/provisioning/bootstrap.sh" -o /tmp/bootstrap.sh
- chmod +x /tmp/bootstrap.sh
- /tmp/bootstrap.sh
"""
DEFAULT_META_DATA = """instance-id: raspios-cloudinit-001
local-hostname: gnss.guard
"""
DEFAULT_NETWORK_CONFIG = """version: 2
ethernets:
eth0:
dhcp4: true
"""
DEFAULT_STATUS = {
"phase": "idle",
"message": "Waiting for reTerminal in boot mode or network.",
"progress": None,
"updated": None,
}
def _write_status(phase, message, progress=None):
try:
os.makedirs(os.path.dirname(STATUS_FILE) or ".", exist_ok=True)
with open(STATUS_FILE, "w") as f:
json.dump({"phase": phase, "message": message, "progress": progress, "updated": time.time()}, f)
except (PermissionError, OSError):
pass
def _generate_eeprom_update(boot_order, extra_settings=None):
"""Generate pieeprom.upd and pieeprom.sig in BASE_DIR. Returns (True, None) or (False, error_message)."""
extra_settings = extra_settings or {}
if not EEPROM_CONFIG_TOOL.is_file() or not EEPROM_FW_FILE.is_file():
return False, "EEPROM tools not installed. Run install-eeprom-tools-on-lxc.sh on the LXC."
try:
os.makedirs(BASE_DIR, exist_ok=True)
# Read current config from firmware image
out = subprocess.run(
[str(EEPROM_CONFIG_TOOL), str(EEPROM_FW_FILE)],
capture_output=True,
text=True,
timeout=30,
cwd=str(EEPROM_DIR),
)
if out.returncode != 0:
return False, (out.stderr or out.stdout or "rpi-eeprom-config failed").strip()[:500]
lines = (out.stdout or "").strip().splitlines()
# Strip lines we replace
skip_keys = {"BOOT_ORDER", "NET_BOOT_MAX_RETRIES", "DHCP_TIMEOUT", "DHCP_REQ_TIMEOUT", "TFTP_IP", "NET_INSTALL_AT_POWER_ON"}
new_lines = []
for line in lines:
key = line.split("=", 1)[0].strip() if "=" in line else ""
if key in skip_keys:
continue
new_lines.append(line)
# Add our settings
new_lines.append(f"BOOT_ORDER={boot_order}")
new_lines.append("NET_BOOT_MAX_RETRIES=3")
new_lines.append("DHCP_TIMEOUT=1500")
new_lines.append("DHCP_REQ_TIMEOUT=500")
new_lines.append("NET_INSTALL_AT_POWER_ON=0")
for k, v in extra_settings.items():
new_lines.append(f"{k}={v}")
with tempfile.NamedTemporaryFile(mode="w", suffix=".conf", delete=False) as f:
f.write("\n".join(new_lines) + "\n")
conf_path = f.name
try:
out2 = subprocess.run(
[str(EEPROM_CONFIG_TOOL), "--config", conf_path, "--out", str(EEPROM_UPD_FILE), str(EEPROM_FW_FILE)],
capture_output=True,
text=True,
timeout=60,
cwd=str(EEPROM_DIR),
)
if out2.returncode != 0:
return False, (out2.stderr or out2.stdout or "rpi-eeprom-config --config failed").strip()[:500]
if not EEPROM_UPD_FILE.is_file() or EEPROM_UPD_FILE.stat().st_size == 0:
return False, "Generated pieeprom.upd is missing or empty"
# Write signature (sha256 hex)
h = hashlib.sha256()
with open(EEPROM_UPD_FILE, "rb") as f:
h.update(f.read())
EEPROM_SIG_FILE.write_text(h.hexdigest())
return True, None
finally:
try:
os.unlink(conf_path)
except OSError:
pass
except subprocess.TimeoutExpired:
return False, "Timeout running rpi-eeprom-config"
except (PermissionError, OSError) as e:
return False, str(e)
def read_status():
try:
with open(STATUS_FILE, "r") as f:
data = json.load(f)
out = {**DEFAULT_STATUS, **data}
if out.get("phase") == "waiting_choice":
try:
with open(DEVICE_SOURCE_FILE, "r") as sf:
out["device_source"] = (sf.read() or "").strip() or "usb"
except (FileNotFoundError, OSError):
out["device_source"] = "usb"
return out
except (FileNotFoundError, json.JSONDecodeError):
return DEFAULT_STATUS
def read_log_tail(lines=50):
try:
with open(LOG_FILE, "r") as f:
all_lines = f.readlines()
return "".join(all_lines[-lines:]).strip() if all_lines else ""
except (FileNotFoundError, PermissionError):
return ""
def _load_network_devices():
try:
if NETWORK_DEVICES_FILE.is_file():
with open(NETWORK_DEVICES_FILE, "r") as f:
return json.load(f)
except (json.JSONDecodeError, OSError):
pass
return {"devices": []}
def _save_network_devices(data):
try:
os.makedirs(NETWORK_DEVICES_FILE.parent, exist_ok=True)
with open(NETWORK_DEVICES_FILE, "w") as f:
json.dump(data, f, indent=2)
return True
except (PermissionError, OSError):
return False
BACKUPS_META_FILE = BACKUPS_DIR / "backups_meta.json"
CLOUDINIT_META_FILE = CLOUDINIT_IMAGES_DIR / "cloudinit_meta.json"
def _load_backups_meta():
try:
if BACKUPS_META_FILE.is_file():
with open(BACKUPS_META_FILE, "r") as f:
return json.load(f)
except (json.JSONDecodeError, OSError):
pass
return {}
def _save_backups_meta(data):
try:
BACKUPS_DIR.mkdir(parents=True, exist_ok=True)
with open(BACKUPS_META_FILE, "w") as f:
json.dump(data, f, indent=2)
return True
except (PermissionError, OSError):
return False
def _safe_backup_name(name):
"""Reject path traversal and ensure it's a backup filename we manage."""
if not name or ".." in name or "/" in name or "\\" in name:
return False
if not name.endswith((".img", ".img.gz", ".img.xz")):
return False
return True
def _load_cloudinit_meta():
try:
if CLOUDINIT_META_FILE.is_file():
with open(CLOUDINIT_META_FILE, "r") as f:
return json.load(f)
except (json.JSONDecodeError, OSError):
pass
return {}
def _save_cloudinit_meta(data):
try:
CLOUDINIT_IMAGES_DIR.mkdir(parents=True, exist_ok=True)
with open(CLOUDINIT_META_FILE, "w") as f:
json.dump(data, f, indent=2)
return True
except (PermissionError, OSError):
return False
def list_backups():
if not BACKUPS_DIR.is_dir():
return []
meta = _load_backups_meta()
out = []
for p in sorted(BACKUPS_DIR.iterdir(), key=lambda x: x.stat().st_mtime, reverse=True):
if p.is_file() and p.name != "backups_meta.json" and p.name.endswith((".img", ".img.gz", ".img.xz")):
try:
st = p.stat()
m = meta.get(p.name, {})
out.append({
"name": p.name,
"display_name": m.get("name") or p.name,
"description": m.get("description") or "",
"size": st.st_size,
"mtime": st.st_mtime,
})
except OSError:
pass
return out
def list_cloudinit_images():
if not CLOUDINIT_IMAGES_DIR.is_dir():
return []
meta = _load_cloudinit_meta()
out = []
for p in sorted(CLOUDINIT_IMAGES_DIR.iterdir(), key=lambda x: x.stat().st_mtime, reverse=True):
if p.is_file() and p.name != "cloudinit_meta.json" and p.name.endswith((".img", ".img.gz", ".img.xz")):
try:
st = p.stat()
m = meta.get(p.name, {})
out.append({
"name": p.name,
"display_name": m.get("name") or p.name,
"description": m.get("description") or "",
"size": st.st_size,
"mtime": st.st_mtime,
})
except OSError:
pass
return out
def _golden_current_source():
"""Return (source, name) if golden is a symlink to backups or cloudinit, else (None, None)."""
if not GOLDEN_IMAGE.exists():
return None, None
try:
target = GOLDEN_IMAGE.resolve()
try:
target.relative_to(BACKUPS_DIR.resolve())
return "backups", target.name
except ValueError:
pass
try:
target.relative_to(CLOUDINIT_IMAGES_DIR.resolve())
return "cloudinit", target.name
except ValueError:
pass
except OSError:
pass
return None, None
@app.route("/")
def index():
return render_template("home.html")
@app.route("/login", methods=["GET", "POST"])
def login():
if request.method == "GET":
if session.get("admin_logged_in"):
return redirect(url_for("admin"))
return render_template("login.html")
username = (request.form.get("username") or "").strip()
password = (request.form.get("password") or "")
if not username:
return render_template("login.html", error="Username required")
user = get_user_by_username(username)
if not user:
# First user: allow self-registration
if not list_users():
if not password or len(password) < 6:
return render_template("login.html", error="First user: choose a password (min 6 characters)")
create_user(username, password)
session["admin_logged_in"] = True
session["user_id"] = get_user_by_username(username)["id"]
session["username"] = username
admin_log("first_admin_created", username)
return redirect(request.args.get("next") or url_for("admin"))
return render_template("login.html", error="Invalid username or password")
if not check_password_hash(user["password_hash"], password):
return render_template("login.html", error="Invalid username or password")
session["admin_logged_in"] = True
session["user_id"] = user["id"]
session["username"] = user["username"]
admin_log("login", username)
return redirect(request.args.get("next") or url_for("admin"))
@app.route("/logout")
def logout():
if session.get("admin_logged_in"):
admin_log("logout", session.get("username"))
session.clear()
return redirect(url_for("index"))
@app.route("/admin")
@require_admin
def admin():
return render_template("admin.html", username=session.get("username", "Admin"))
@app.route("/admin/portal-files")
@require_admin
def admin_portal_files():
return render_template("portal_files.html", username=session.get("username", "Admin"))
@app.route("/admin/cloudinit-build")
@require_admin
def admin_cloudinit_build():
return render_template("cloudinit_build.html", username=session.get("username", "Admin"))
# Serve portal files for wget (e.g. cloud-init first boot). No auth.
# Subpaths allowed (e.g. first-boot/splash.png); ".." and "\\" forbidden to prevent traversal.
@app.route("/files/<path:filename>")
def serve_portal_file(filename):
if ".." in filename or "\\" in filename:
return jsonify({"error": "invalid path"}), 400
path = (PORTAL_FILES_DIR / filename).resolve()
try:
path.relative_to(PORTAL_FILES_DIR.resolve())
except ValueError:
return jsonify({"error": "invalid path"}), 400
if not path.is_file():
return jsonify({"error": "not found"}), 404
return send_file(path, as_attachment=False, download_name=filename.split("/")[-1])
@app.route("/api/portal-files-debug")
def api_portal_files_debug():
"""No-auth debug: what PORTAL_FILES_DIR the process sees (for troubleshooting)."""
try:
names = []
if PORTAL_FILES_DIR.is_dir():
for p in sorted(PORTAL_FILES_DIR.iterdir(), key=lambda x: (not x.is_dir(), x.name.lower())):
if ".." in p.name or p.name.startswith("."):
continue
names.append({"name": p.name, "type": "dir" if p.is_dir() else "file"})
return jsonify({
"portal_files_dir": str(PORTAL_FILES_DIR),
"exists": PORTAL_FILES_DIR.exists(),
"is_dir": PORTAL_FILES_DIR.is_dir(),
"items": names,
"CM4_PROVISIONING_DIR": os.environ.get("CM4_PROVISIONING_DIR"),
})
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route("/api/status")
def api_status():
return jsonify(read_status())
@app.route("/api/status-clear", methods=["POST"])
def api_status_clear():
"""Reset status to idle (e.g. to dismiss a 'Golden image not found' error so you can try again)."""
try:
with open(STATUS_FILE, "w") as f:
json.dump({
"phase": "idle",
"message": DEFAULT_STATUS["message"],
"progress": None,
"updated": None,
}, f)
return jsonify({"ok": True})
except (PermissionError, OSError):
return jsonify({"ok": False, "error": "Could not write status"}), 500
def _first_boot_status_read():
"""Read first-boot status (device reports progress during first-boot.sh)."""
try:
with open(FIRST_BOOT_STATUS_FILE, "r") as f:
return json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
return {"phase": "idle", "message": "", "step": "", "step_name": "", "hostname": "", "ip": "", "updated": None}
def _first_boot_status_write(phase, message="", step="", step_name="", hostname="", ip="", error=None):
"""Write first-boot status (called by POST from device)."""
try:
os.makedirs(os.path.dirname(FIRST_BOOT_STATUS_FILE) or ".", exist_ok=True)
data = {
"phase": phase,
"message": message,
"step": step,
"step_name": step_name,
"hostname": hostname,
"ip": ip or "",
"updated": time.time(),
}
if error:
data["error"] = error
with open(FIRST_BOOT_STATUS_FILE, "w") as f:
json.dump(data, f, indent=2)
except (PermissionError, OSError):
pass
@app.route("/api/first-boot-status", methods=["GET"])
def api_first_boot_status_get():
"""Return current first-boot progress (for dashboard to poll)."""
return jsonify(_first_boot_status_read())
@app.route("/api/first-boot-status", methods=["POST"])
def api_first_boot_status_post():
"""Called by device during first-boot.sh to report progress. No auth (device on local net)."""
body = request.get_json(silent=True) or {}
phase = (body.get("phase") or "running").strip().lower()
if phase not in ("started", "running", "done", "error"):
phase = "running"
message = (body.get("message") or "").strip()[:500]
step = (body.get("step") or "").strip()[:10]
step_name = (body.get("step_name") or "").strip()[:80]
hostname = (body.get("hostname") or "").strip()[:64]
ip = (body.get("ip") or "").strip()[:45]
error = (body.get("error") or "").strip()[:500] if phase == "error" else None
_first_boot_status_write(phase=phase, message=message, step=step, step_name=step_name, hostname=hostname, ip=ip, error=error)
return jsonify({"ok": True})
@app.route("/api/log")
def api_log():
return jsonify({"log": read_log_tail()})
@app.route("/api/eeprom-presets")
def api_eeprom_presets():
"""Return available boot order presets for the Update EEPROM dropdown."""
presets = [
{"id": "0x1", "label": "eMMC only"},
{"id": "0xf21", "label": "eMMC first, then network"},
{"id": "0xf12", "label": "Network first, then eMMC"},
]
return jsonify({"presets": presets})
@app.route("/api/pending-devices")
def api_pending_devices():
"""Returns USB (if waiting_choice) and registered network devices so the UI can show Backup/Deploy."""
st = read_status()
usb = None
if st.get("phase") == "waiting_choice":
usb = {"source": "usb", "message": st.get("message", "Device connected (USB). Choose action.")}
data = _load_network_devices()
network = [d for d in data.get("devices", []) if d.get("action") in (None, "wait")]
return jsonify({"usb": usb, "network": network})
@app.route("/api/device-action", methods=["POST"])
def api_device_action():
"""User chose Backup, Deploy, or Update EEPROM for a device. source=usb | network; for network pass mac=."""
body = request.get_json(force=True, silent=True) or {}
source = (body.get("source") or "").strip().lower()
action = (body.get("action") or "").strip().lower()
if action not in ("backup", "deploy", "reboot", "eeprom_update"):
return jsonify({"ok": False, "error": "action must be 'backup', 'deploy', 'reboot', or 'eeprom_update'"}), 400
if action == "reboot" and source != "network":
return jsonify({"ok": False, "error": "'reboot' is only for network devices"}), 400
if action == "eeprom_update" and source != "usb":
return jsonify({"ok": False, "error": "'eeprom_update' is only for USB-connected devices"}), 400
if source == "usb":
if action == "eeprom_update":
boot_order = (body.get("boot_order") or "0xf21").strip().lower()
if boot_order not in ("0x1", "0xf21", "0xf12"):
return jsonify({"ok": False, "error": "boot_order must be 0x1, 0xf21, or 0xf12"}), 400
ok, err = _generate_eeprom_update(boot_order)
if not ok:
return jsonify({"ok": False, "error": err or "Failed to generate EEPROM update"}), 500
try:
os.makedirs(os.path.dirname(ACTION_REQUEST_FILE) or ".", exist_ok=True)
with open(ACTION_REQUEST_FILE, "w") as f:
f.write("eeprom_update")
_write_status("eeprom_update", "EEPROM update ready; host will write to device boot partition.")
return jsonify({"ok": True})
except (PermissionError, OSError):
return jsonify({"ok": False, "error": "Could not write action file"}), 500
try:
os.makedirs(os.path.dirname(ACTION_REQUEST_FILE) or ".", exist_ok=True)
# If user requested "shrink after backup", create flag so host runs PiShrink after dd
if action == "backup" and body.get("shrink"):
try:
(BASE_DIR / "shrink_next_backup").write_text("1")
except (PermissionError, OSError):
pass # host may still have SHRINK_BACKUP=1
with open(ACTION_REQUEST_FILE, "w") as f:
f.write(action)
if action == "deploy":
_first_boot_status_write("idle", "", hostname="", ip="")
return jsonify({"ok": True})
except (PermissionError, OSError):
return jsonify({"ok": False, "error": "Could not write action file"}), 500
if source == "network":
mac = (body.get("mac") or "").strip()
if not mac:
return jsonify({"ok": False, "error": "mac required for network device"}), 400
data = _load_network_devices()
for d in data.get("devices", []):
if (d.get("mac") or "").lower() == mac.lower():
d["action"] = action
d["action_at"] = time.time()
_save_network_devices(data)
ip = d.get("ip") or mac
if action == "deploy":
_write_status("flashing", f"Deploying to {ip} (network)...")
_first_boot_status_write("idle", "", hostname="", ip="")
elif action == "backup":
_write_status("backup", f"Backing up {ip} (network)...")
return jsonify({"ok": True})
return jsonify({"ok": False, "error": "Device not found"}), 404
return jsonify({"ok": False, "error": "source must be 'usb' or 'network'"}), 400
@app.route("/api/register-device", methods=["POST"])
def api_register_device():
"""Called by a network-booted device to register (mac, ip)."""
body = request.get_json(force=True, silent=True) or request.form
mac = (body.get("mac") or "").strip()
ip = (body.get("ip") or request.remote_addr or "").strip()
if not mac:
return jsonify({"ok": False, "error": "mac required"}), 400
data = _load_network_devices()
devices = data.get("devices", [])
for d in devices:
if (d.get("mac") or "").lower() == mac.lower():
d["ip"] = ip
d["registered_at"] = time.time()
d["action"] = d.get("action") or "wait"
_save_network_devices(data)
return jsonify({"ok": True, "message": "registered"})
devices.append({"mac": mac, "ip": ip, "registered_at": time.time(), "action": "wait"})
data["devices"] = devices
_save_network_devices(data)
return jsonify({"ok": True, "message": "registered"})
def _dhcp_network_boot_run(cmd):
"""Run toggle script with enable|disable|status. Returns (ok, output_or_error)."""
if not os.path.isfile(TOGGLE_NETWORK_BOOT_SCRIPT) or not os.access(TOGGLE_NETWORK_BOOT_SCRIPT, os.X_OK):
return False, "Toggle script not installed"
try:
out = subprocess.run(
[TOGGLE_NETWORK_BOOT_SCRIPT, cmd],
capture_output=True,
text=True,
timeout=10,
)
if out.returncode != 0:
return False, (out.stderr or out.stdout or "script failed").strip()
return True, (out.stdout or "").strip()
except subprocess.TimeoutExpired:
return False, "Timeout"
except Exception as e:
return False, str(e)
@app.route("/api/dhcp-network-boot", methods=["GET"])
def api_dhcp_network_boot_get():
"""Return whether DHCP network-boot options (66/67) are enabled."""
ok, out = _dhcp_network_boot_run("status")
if not ok:
return jsonify({"enabled": None, "error": out}), 200
return jsonify({"enabled": out.strip().lower() == "enabled"})
@app.route("/api/dhcp-network-boot", methods=["POST"])
def api_dhcp_network_boot_post():
"""Enable or disable DHCP network-boot options (DHCP server keeps running). Body: { \"enabled\": true|false }."""
body = request.get_json(force=True, silent=True) or {}
enabled = body.get("enabled")
if enabled is None:
return jsonify({"ok": False, "error": "enabled required (true|false)"}), 400
cmd = "enable" if enabled else "disable"
ok, out = _dhcp_network_boot_run(cmd)
if not ok:
return jsonify({"ok": False, "error": out}), 500
return jsonify({"ok": True, "enabled": enabled})
@app.route("/api/action-done", methods=["POST"])
def api_action_done():
"""Called by a device when deploy or backup has completed. Disables DHCP network-boot so the device boots from eMMC next time."""
mac = request.args.get("mac") or ((request.get_json(silent=True) or {}).get("mac") or "")
# Remove device from network-devices list so it doesn't keep showing
if mac:
data = _load_network_devices()
data["devices"] = [d for d in data.get("devices", []) if (d.get("mac") or "").lower() != mac.lower()]
_save_network_devices(data)
ok, _ = _dhcp_network_boot_run("disable")
_write_status("done", f"Done ({mac or 'network device'}). Network boot disabled; device will boot from eMMC on next boot.")
if not ok:
return jsonify({"ok": False, "error": "Could not disable DHCP network boot"}), 500
return jsonify({"ok": True, "message": "Network boot disabled; device will boot from eMMC on next boot"})
def _read_dhcp_leases():
"""Read dnsmasq lease file. Returns (leases_list, error_string). leases_list items: {expiry, mac, ip, hostname}."""
if not DHCP_LEASES_FILE or not os.path.isfile(DHCP_LEASES_FILE):
return [], None
try:
leases = []
with open(DHCP_LEASES_FILE, "r") as f:
for line in f:
line = line.strip()
if not line or line.startswith("#"):
continue
parts = line.split()
if len(parts) >= 4:
leases.append({
"expiry": int(parts[0]) if parts[0].isdigit() else 0,
"mac": parts[1],
"ip": parts[2],
"hostname": parts[3] if len(parts) > 3 else "",
})
return leases, None
except (OSError, PermissionError) as e:
return [], str(e)
@app.route("/api/dhcp-leases")
def api_dhcp_leases():
"""Return current DHCP leases from dnsmasq (when dashboard runs on LXC with dnsmasq)."""
leases, err = _read_dhcp_leases()
if err:
return jsonify({"leases": [], "error": err})
return jsonify({"leases": leases, "error": None})
@app.route("/api/device-action-poll")
def api_device_action_poll():
"""Network device polls this to get its assigned action (deploy/backup) and URL."""
mac = (request.args.get("mac") or "").strip()
if not mac:
return jsonify({"action": "wait"}), 200
data = _load_network_devices()
base = request.host_url.rstrip("/")
for d in data.get("devices", []):
if (d.get("mac") or "").lower() == mac.lower():
action = d.get("action") or "wait"
if action == "deploy":
return jsonify({"action": "deploy", "url": f"{base}/api/golden-image"})
if action == "backup":
return jsonify({"action": "backup", "upload_url": f"{base}/api/backup-upload?mac={mac}"})
if action == "reboot":
return jsonify({"action": "reboot"})
return jsonify({"action": "wait"})
return jsonify({"action": "wait"})
@app.route("/api/golden-image")
def api_golden_image():
"""Stream the golden image for network deploy (device pulls and writes to eMMC). Decompresses .img.xz/.img.gz on the fly."""
if not GOLDEN_IMAGE.is_file():
return jsonify({"error": "Golden image not found"}), 404
resolved = GOLDEN_IMAGE.resolve()
if str(resolved).endswith(".img.xz"):
def stream():
proc = subprocess.Popen(
["xz", "-d", "-c", str(resolved)],
stdout=subprocess.PIPE,
bufsize=65536,
)
try:
while True:
chunk = proc.stdout.read(65536)
if not chunk:
break
yield chunk
finally:
proc.wait()
return Response(
stream_with_context(stream()),
mimetype="application/octet-stream",
headers={"Content-Disposition": "attachment; filename=golden.img"},
)
if str(resolved).endswith(".img.gz"):
def stream():
proc = subprocess.Popen(
["gzip", "-c", "-d", str(resolved)],
stdout=subprocess.PIPE,
bufsize=65536,
)
try:
while True:
chunk = proc.stdout.read(65536)
if not chunk:
break
yield chunk
finally:
proc.wait()
return Response(
stream_with_context(stream()),
mimetype="application/octet-stream",
headers={"Content-Disposition": "attachment; filename=golden.img"},
)
return send_file(
GOLDEN_IMAGE,
mimetype="application/octet-stream",
as_attachment=True,
download_name="golden.img",
)
@app.route("/api/backup-upload", methods=["POST"])
def api_backup_upload():
"""Network device uploads its eMMC backup (raw body)."""
mac = (request.args.get("mac") or "").strip().replace(":", "-")[:20]
if not mac:
return jsonify({"error": "mac query param required"}), 400
BACKUPS_DIR.mkdir(parents=True, exist_ok=True)
name = f"backup-net-{mac}-{int(time.time())}.img"
path = BACKUPS_DIR / name
try:
with open(path, "wb") as f:
while True:
chunk = request.stream.read(1024 * 1024)
if not chunk:
break
f.write(chunk)
return jsonify({"ok": True, "file": name})
except (OSError, IOError) as e:
if path.exists():
path.unlink(missing_ok=True)
return jsonify({"error": str(e)}), 500
@app.route("/api/backups")
@require_admin
def api_backups():
return jsonify({"backups": list_backups(), "backups_dir": str(BACKUPS_DIR)})
@app.route("/api/cloudinit-images")
@require_admin
def api_cloudinit_images():
return jsonify({"images": list_cloudinit_images(), "cloudinit_dir": str(CLOUDINIT_IMAGES_DIR)})
def _load_portal_descriptions():
"""Return dict mapping path -> description (path can be file or folder)."""
if not PORTAL_DESCRIPTIONS_FILE.is_file():
return {}
try:
with open(PORTAL_DESCRIPTIONS_FILE, "r", encoding="utf-8") as f:
return json.load(f)
except (json.JSONDecodeError, OSError):
return {}
def _save_portal_descriptions(descriptions):
try:
PORTAL_DESCRIPTIONS_FILE.parent.mkdir(parents=True, exist_ok=True)
with open(PORTAL_DESCRIPTIONS_FILE, "w", encoding="utf-8") as f:
json.dump(descriptions, f, indent=2)
return True
except OSError:
return False
def _portal_files_list_impl(subpath):
"""Shared impl for listing portal files. Returns (items, descriptions, base_url, current_path)."""
base_url = request.host_url.rstrip("/") + "/files/"
empty_items = []
if ".." in subpath or "\\" in subpath:
return empty_items, {}, base_url, subpath
if not PORTAL_FILES_DIR.is_dir():
try:
PORTAL_FILES_DIR.mkdir(parents=True, exist_ok=True)
except OSError:
pass
if not PORTAL_FILES_DIR.is_dir():
return empty_items, {}, base_url, subpath
list_dir = (PORTAL_FILES_DIR / subpath).resolve() if subpath else PORTAL_FILES_DIR
try:
list_dir.relative_to(PORTAL_FILES_DIR.resolve())
except ValueError:
return empty_items, {}, base_url, subpath
if not list_dir.is_dir():
return empty_items, {}, base_url, subpath
items = []
for p in sorted(list_dir.iterdir(), key=lambda x: (not x.is_dir(), x.name.lower())):
if ".." in p.name or p.name.startswith("."):
continue
rel = (subpath + "/" + p.name) if subpath else p.name
try:
if p.is_dir():
items.append({"type": "folder", "path": rel, "name": p.name})
else:
items.append({"type": "file", "path": rel, "name": p.name, "size": p.stat().st_size, "mtime": p.stat().st_mtime})
except OSError:
pass
descriptions = _load_portal_descriptions()
return items, descriptions, base_url, subpath
@app.route("/api/portal-files")
def api_portal_files_list():
"""List one level: root or contents of path=... (folders and files). ?debug=1 allows unauthenticated read-only list."""
subpath = request.args.get("path", "").strip().strip("/")
debug = request.args.get("debug") == "1"
if not debug and not session.get("admin_logged_in"):
if request.is_json or request.path.startswith("/api/"):
return jsonify({"ok": False, "error": "Login required"}), 401
return redirect(url_for("login", next=request.url))
items, descriptions, base_url, subpath = _portal_files_list_impl(subpath)
resp = jsonify({
"items": items,
"base_url": base_url,
"descriptions": descriptions,
"current_path": subpath,
"portal_files_dir": str(PORTAL_FILES_DIR),
})
resp.headers["Cache-Control"] = "no-store, no-cache, must-revalidate"
return resp
@app.route("/api/portal-files/descriptions", methods=["GET", "PATCH"])
@require_admin
def api_portal_descriptions():
if request.method == "PATCH":
data = request.get_json(force=True, silent=True) or {}
desc = data.get("descriptions")
if not isinstance(desc, dict):
return jsonify({"ok": False, "error": "descriptions must be a dict"}), 400
if not _save_portal_descriptions(desc):
return jsonify({"ok": False, "error": "save failed"}), 500
admin_log("portal_descriptions", "updated")
return jsonify({"ok": True})
return jsonify({"descriptions": _load_portal_descriptions()})
@app.route("/api/portal-files/folder", methods=["POST"])
@require_admin
def api_portal_folder_create():
data = request.get_json(force=True, silent=True) or {}
path = (data.get("path") or "").strip().strip("/")
if not path:
return jsonify({"ok": False, "error": "path required"}), 400
if ".." in path or "\\" in path:
return jsonify({"ok": False, "error": "invalid path"}), 400
full = (PORTAL_FILES_DIR / path).resolve()
try:
full.relative_to(PORTAL_FILES_DIR.resolve())
except ValueError:
return jsonify({"ok": False, "error": "invalid path"}), 400
try:
full.mkdir(parents=True, exist_ok=True)
admin_log("portal_folder_create", path)
return jsonify({"ok": True, "path": path})
except OSError as e:
return jsonify({"ok": False, "error": str(e)}), 500
@app.route("/api/portal-files/upload", methods=["POST"])
@require_admin
def api_portal_files_upload():
if "file" not in request.files and "upload" not in request.files:
return jsonify({"ok": False, "error": "no file (use field 'file' or 'upload')"}), 400
f = request.files.get("file") or request.files.get("upload")
if not f or not f.filename:
return jsonify({"ok": False, "error": "no file selected"}), 400
base_name = re.sub(r"[^\w\-./]", "_", f.filename)[:120] or "upload"
if ".." in base_name or base_name.startswith("/"):
return jsonify({"ok": False, "error": "invalid filename"}), 400
subpath = (request.form.get("path") or "").strip().strip("/")
if subpath and (".." in subpath or "\\" in subpath):
return jsonify({"ok": False, "error": "invalid path"}), 400
name = (subpath + "/" + base_name) if subpath else base_name
path = (PORTAL_FILES_DIR / name).resolve()
try:
path.relative_to(PORTAL_FILES_DIR.resolve())
except ValueError:
return jsonify({"ok": False, "error": "invalid path"}), 400
try:
path.parent.mkdir(parents=True, exist_ok=True)
f.save(str(path))
admin_log("portal_upload", name)
return jsonify({"ok": True, "name": name, "url": request.host_url.rstrip("/") + "/files/" + name})
except (OSError, IOError) as e:
if path.exists():
path.unlink(missing_ok=True)
return jsonify({"ok": False, "error": str(e)}), 500
# Text extensions allowed for editor (others are read-only or binary)
_PORTAL_EDITABLE_EXTENSIONS = frozenset(
".sh .bash .conf .cfg .ini .desktop .json .py .yml .yaml .md .txt .plymouth .script".split()
)
def _portal_language_for_path(name):
"""Return Ace/editor language mode from filename."""
n = (name or "").lower()
if n.endswith(".sh") or n.endswith(".bash"):
return "sh"
if n.endswith(".json"):
return "json"
if n.endswith(".py"):
return "python"
if n.endswith(".yml") or n.endswith(".yaml"):
return "yaml"
if n.endswith(".md"):
return "markdown"
if n.endswith(".conf") or n.endswith(".cfg") or n.endswith(".ini") or n.endswith(".desktop") or n.endswith(".plymouth"):
return "ini"
if n.endswith(".script"):
return "sh"
return "plain_text"
@app.route("/api/portal-files/content", methods=["GET"])
@require_admin
def api_portal_file_content_get():
"""Return file content as text for the editor. path= query param. Rejects binary."""
path_arg = (request.args.get("path") or "").strip()
if not path_arg or ".." in path_arg or "\\" in path_arg:
return jsonify({"ok": False, "error": "invalid path"}), 400
path = (PORTAL_FILES_DIR / path_arg).resolve()
try:
path.relative_to(PORTAL_FILES_DIR.resolve())
except ValueError:
return jsonify({"ok": False, "error": "invalid path"}), 400
if not path.is_file():
return jsonify({"ok": False, "error": "not found"}), 404
ext = path.suffix.lower() if path.suffix else ""
if ext and ext not in _PORTAL_EDITABLE_EXTENSIONS:
return jsonify({"ok": False, "error": "binary or unsupported file type for editing"}), 400
try:
raw = path.read_bytes()
text = raw.decode("utf-8")
except UnicodeDecodeError:
return jsonify({"ok": False, "error": "binary or unsupported encoding"}), 400
except OSError as e:
return jsonify({"ok": False, "error": str(e)}), 500
return jsonify({
"ok": True,
"path": path_arg,
"content": text,
"language": _portal_language_for_path(path.name),
})
@app.route("/api/portal-files/content", methods=["PUT"])
@require_admin
def api_portal_file_content_put():
"""Write file content from editor. JSON body: { \"path\": \"...\", \"content\": \"...\" }."""
data = request.get_json(silent=True) or {}
path_arg = (data.get("path") or "").strip()
content = data.get("content")
if not path_arg or ".." in path_arg or "\\" in path_arg:
return jsonify({"ok": False, "error": "invalid path"}), 400
if content is None:
return jsonify({"ok": False, "error": "content required"}), 400
path = (PORTAL_FILES_DIR / path_arg).resolve()
try:
path.relative_to(PORTAL_FILES_DIR.resolve())
except ValueError:
return jsonify({"ok": False, "error": "invalid path"}), 400
if path.is_dir():
return jsonify({"ok": False, "error": "cannot edit a folder"}), 400
ext = path.suffix.lower() if path.suffix else ""
if path.exists() and ext and ext not in _PORTAL_EDITABLE_EXTENSIONS:
return jsonify({"ok": False, "error": "unsupported file type for editing"}), 400
if not isinstance(content, str):
content = str(content)
try:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(content, encoding="utf-8")
admin_log("portal_edit", path_arg)
return jsonify({"ok": True, "path": path_arg})
except OSError as e:
return jsonify({"ok": False, "error": str(e)}), 500
@app.route("/api/portal-files/<path:name>", methods=["DELETE"])
@require_admin
def api_portal_file_delete(name):
if ".." in name or "\\" in name:
return jsonify({"ok": False, "error": "invalid name"}), 400
path = (PORTAL_FILES_DIR / name).resolve()
try:
path.relative_to(PORTAL_FILES_DIR.resolve())
except ValueError:
return jsonify({"ok": False, "error": "invalid path"}), 400
if path.is_file():
try:
path.unlink()
admin_log("portal_delete", name)
return jsonify({"ok": True})
except OSError as e:
return jsonify({"ok": False, "error": str(e)}), 500
if path.is_dir():
if any(path.iterdir()):
return jsonify({"ok": False, "error": "folder not empty"}), 400
try:
path.rmdir()
admin_log("portal_folder_delete", name)
return jsonify({"ok": True})
except OSError as e:
return jsonify({"ok": False, "error": str(e)}), 500
return jsonify({"ok": False, "error": "not found"}), 404
@app.route("/api/backups/upload", methods=["POST"])
@require_admin
def api_backups_upload():
"""Upload an image file from the dashboard (multipart form)."""
if "file" not in request.files and "image" not in request.files:
return jsonify({"ok": False, "error": "no file in request (use field 'file' or 'image')"}), 400
f = request.files.get("file") or request.files.get("image")
if not f or not f.filename:
return jsonify({"ok": False, "error": "no file selected"}), 400
base = (f.filename.rsplit(".", 1)[0] if "." in f.filename else f.filename).strip() or "upload"
safe_base = re.sub(r"[^\w\-.]", "_", base)[:80]
if not safe_base:
safe_base = "upload"
ext = ""
if f.filename.lower().endswith(".img.xz"):
ext = ".img.xz"
elif f.filename.lower().endswith(".img.gz"):
ext = ".img.gz"
elif f.filename.lower().endswith(".img"):
ext = ".img"
else:
ext = ".img"
name = f"{safe_base}-{int(time.time())}{ext}"
if not _safe_backup_name(name):
name = f"upload-{int(time.time())}.img"
path = BACKUPS_DIR / name
try:
BACKUPS_DIR.mkdir(parents=True, exist_ok=True)
f.save(str(path))
return jsonify({"ok": True, "name": name, "message": f"Uploaded {name}"})
except (OSError, IOError) as e:
if path.exists():
path.unlink(missing_ok=True)
return jsonify({"ok": False, "error": str(e)}), 500
def _set_golden_from_path(path):
"""Set golden image to point to given path (file in backups or cloudinit dir)."""
GOLDEN_IMAGE.parent.mkdir(parents=True, exist_ok=True)
if GOLDEN_IMAGE.exists():
GOLDEN_IMAGE.unlink()
path_resolved = path.resolve()
try:
path_resolved.relative_to(BACKUPS_DIR.resolve())
except ValueError:
try:
path_resolved.relative_to(CLOUDINIT_IMAGES_DIR.resolve())
except ValueError:
shutil.copy2(path, GOLDEN_IMAGE)
return
os.symlink(path_resolved, GOLDEN_IMAGE)
@app.route("/api/backups/<path:name>/set-as-golden", methods=["POST"])
@require_admin
def api_backup_set_as_golden(name):
"""Use this backup as the golden image (symlink)."""
if not _safe_backup_name(name):
return jsonify({"ok": False, "error": "invalid backup name"}), 400
path = BACKUPS_DIR / name
if not path.is_file():
return jsonify({"ok": False, "error": "backup not found"}), 404
try:
BACKUPS_DIR.mkdir(parents=True, exist_ok=True)
_set_golden_from_path(path)
admin_log("set_golden", f"backups/{name}")
return jsonify({"ok": True, "message": f"Golden image set from backup {name}"})
except (OSError, IOError) as e:
return jsonify({"ok": False, "error": str(e)}), 500
@app.route("/api/cloudinit-images/<path:name>/set-as-golden", methods=["POST"])
@require_admin
def api_cloudinit_set_as_golden(name):
"""Use this cloud-init image as the golden image (symlink)."""
if not _safe_backup_name(name):
return jsonify({"ok": False, "error": "invalid name"}), 400
path = CLOUDINIT_IMAGES_DIR / name
if not path.is_file():
return jsonify({"ok": False, "error": "image not found"}), 404
try:
CLOUDINIT_IMAGES_DIR.mkdir(parents=True, exist_ok=True)
_set_golden_from_path(path)
admin_log("set_golden", f"cloudinit/{name}")
return jsonify({"ok": True, "message": f"Golden image set from cloud-init image {name}"})
except (OSError, IOError) as e:
return jsonify({"ok": False, "error": str(e)}), 500
def _request_host_shrink(name, action="shrink", format="xz"):
"""Write shrink request for host and poll shrink_status.json. Returns (ok, message_or_error)."""
req = {"name": name, "action": action}
if action == "compress":
req["format"] = "gz" if format == "gz" else "xz"
try:
SHRINK_REQUEST_FILE.write_text(json.dumps(req))
except OSError as e:
return False, str(e)
deadline = time.monotonic() + 2100 # 35 min
while time.monotonic() < deadline:
time.sleep(5)
if not SHRINK_STATUS_FILE.exists():
continue
try:
data = json.loads(SHRINK_STATUS_FILE.read_text())
except (OSError, ValueError):
continue
if data.get("name") != name:
continue
phase = data.get("phase")
if phase == "done":
return True, data.get("message") or f"Shrunk {name}"
if phase == "error":
return False, data.get("error") or "PiShrink failed"
return False, "Shrink timed out (run on host may still be in progress)"
@app.route("/api/cloudinit-images/<path:name>", methods=["GET"])
@require_admin
def api_cloudinit_download(name):
if not _safe_backup_name(name):
return jsonify({"error": "invalid name"}), 400
path = CLOUDINIT_IMAGES_DIR / name
if not path.is_file():
return jsonify({"error": "not found"}), 404
return send_file(path, as_attachment=True, download_name=name)
@app.route("/api/cloudinit-images/<path:name>", methods=["PATCH"])
@require_admin
def api_cloudinit_update(name):
if not _safe_backup_name(name):
return jsonify({"ok": False, "error": "invalid name"}), 400
path = CLOUDINIT_IMAGES_DIR / name
if not path.is_file():
return jsonify({"ok": False, "error": "not found"}), 404
body = request.get_json(force=True, silent=True) or {}
meta = _load_cloudinit_meta()
entry = meta.get(name, {})
new_filename = (body.get("filename") or "").strip()
if new_filename:
if not _safe_backup_name(new_filename):
return jsonify({"ok": False, "error": "invalid new filename"}), 400
new_path = CLOUDINIT_IMAGES_DIR / new_filename
if new_path.exists() and new_path != path:
return jsonify({"ok": False, "error": "target filename already exists"}), 409
try:
path.rename(new_path)
except OSError as e:
return jsonify({"ok": False, "error": str(e)}), 500
meta[new_filename] = {"name": entry.get("name") or name, "description": entry.get("description") or ""}
meta.pop(name, None)
name = new_filename
path = CLOUDINIT_IMAGES_DIR / name
else:
if "name" in body:
entry["name"] = (body.get("name") or "").strip() or path.name
if "description" in body:
entry["description"] = (body.get("description") or "").strip()
meta[name] = entry
if not _save_cloudinit_meta(meta):
return jsonify({"ok": False, "error": "could not save metadata"}), 500
return jsonify({"ok": True, "name": name})
@app.route("/api/cloudinit-images/<path:name>", methods=["DELETE"])
@require_admin
def api_cloudinit_delete(name):
if not _safe_backup_name(name):
return jsonify({"ok": False, "error": "invalid name"}), 400
path = CLOUDINIT_IMAGES_DIR / name
if not path.is_file():
return jsonify({"ok": False, "error": "not found"}), 404
try:
if GOLDEN_IMAGE.exists() and GOLDEN_IMAGE.is_symlink():
try:
if GOLDEN_IMAGE.resolve() == path.resolve():
GOLDEN_IMAGE.unlink()
except OSError:
pass
path.unlink()
meta = _load_cloudinit_meta()
meta.pop(name, None)
_save_cloudinit_meta(meta)
admin_log("cloudinit_delete", name)
return jsonify({"ok": True, "message": f"Deleted {name}"})
except (OSError, IOError) as e:
return jsonify({"ok": False, "error": str(e)}), 500
@app.route("/api/backups/<path:name>/shrink", methods=["POST"])
@require_admin
def api_backup_shrink(name):
"""Request PiShrink on host for a raw .img backup (shrinks in place). Dashboard polls host status."""
if not _safe_backup_name(name):
return jsonify({"ok": False, "error": "invalid backup name"}), 400
if not name.endswith(".img") or name.endswith(".img.gz") or name.endswith(".img.xz"):
return jsonify({"ok": False, "error": "only raw .img files can be shrunk (not .img.gz / .img.xz)"}), 400
path = BACKUPS_DIR / name
if not path.is_file():
return jsonify({"ok": False, "error": "backup not found"}), 404
ok, msg = _request_host_shrink(name, action="shrink")
if ok:
return jsonify({"ok": True, "message": msg})
return jsonify({"ok": False, "error": msg}), (503 if "not installed" in msg else 504 if "timed out" in msg else 500)
@app.route("/api/backups/<path:name>/compress", methods=["POST"])
@require_admin
def api_backup_compress(name):
"""Request PiShrink with compression on host. Produces .img.gz or .img.xz. Dashboard polls host status."""
if not _safe_backup_name(name):
return jsonify({"ok": False, "error": "invalid backup name"}), 400
if not name.endswith(".img") or name.endswith(".img.gz") or name.endswith(".img.xz"):
return jsonify({"ok": False, "error": "only raw .img files can be compressed (not .img.gz / .img.xz)"}), 400
path = BACKUPS_DIR / name
if not path.is_file():
return jsonify({"ok": False, "error": "backup not found"}), 404
body = request.get_json(force=True, silent=True) or {}
fmt = (body.get("format") or request.args.get("format") or "xz").strip().lower()
if fmt not in ("gz", "gzip", "xz"):
fmt = "xz"
if fmt == "gzip":
fmt = "gz"
ok, msg = _request_host_shrink(name, action="compress", format=fmt)
if ok:
ext = ".xz" if fmt == "xz" else ".gz"
return jsonify({"ok": True, "message": msg or f"Compressed to {name}{ext}"})
return jsonify({"ok": False, "error": msg}), (503 if "not installed" in msg else 504 if "timed out" in msg else 500)
@app.route("/api/backups/<path:name>", methods=["PATCH"])
@require_admin
def api_backup_update(name):
"""Update backup metadata (display name, description) or rename the file."""
if not _safe_backup_name(name):
return jsonify({"ok": False, "error": "invalid backup name"}), 400
path = BACKUPS_DIR / name
if not path.is_file():
return jsonify({"ok": False, "error": "backup not found"}), 404
body = request.get_json(force=True, silent=True) or {}
meta = _load_backups_meta()
entry = meta.get(name, {})
new_filename = (body.get("filename") or "").strip()
if new_filename:
if not _safe_backup_name(new_filename):
return jsonify({"ok": False, "error": "invalid new filename"}), 400
new_path = BACKUPS_DIR / new_filename
if new_path.exists() and new_path != path:
return jsonify({"ok": False, "error": "target filename already exists"}), 409
try:
path.rename(new_path)
except OSError as e:
return jsonify({"ok": False, "error": str(e)}), 500
meta[new_filename] = {"name": entry.get("name") or name, "description": entry.get("description") or ""}
if name in meta:
del meta[name]
name = new_filename
path = BACKUPS_DIR / name
else:
if "name" in body:
entry["name"] = (body.get("name") or "").strip() or path.name
if "description" in body:
entry["description"] = (body.get("description") or "").strip()
meta[name] = entry
if not _save_backups_meta(meta):
return jsonify({"ok": False, "error": "could not save metadata"}), 500
return jsonify({"ok": True, "name": name})
@app.route("/api/backups/<path:name>", methods=["DELETE"])
@require_admin
def api_backup_delete(name):
"""Delete a backup file. If it is the current golden image (symlink), the golden link is removed."""
if not _safe_backup_name(name):
return jsonify({"ok": False, "error": "invalid backup name"}), 400
path = BACKUPS_DIR / name
if not path.is_file():
return jsonify({"ok": False, "error": "backup not found"}), 404
try:
if GOLDEN_IMAGE.exists() and GOLDEN_IMAGE.is_symlink():
try:
if (GOLDEN_IMAGE.resolve() == path.resolve()):
GOLDEN_IMAGE.unlink()
except OSError:
pass
path.unlink()
meta = _load_backups_meta()
meta.pop(name, None)
_save_backups_meta(meta)
return jsonify({"ok": True, "message": f"Deleted {name}"})
except (OSError, IOError) as e:
return jsonify({"ok": False, "error": str(e)}), 500
@app.route("/api/backups/<path:name>", methods=["GET"])
@require_admin
def api_backup_download(name):
if not _safe_backup_name(name):
return jsonify({"error": "invalid name"}), 400
path = BACKUPS_DIR / name
if not path.is_file():
return jsonify({"error": "not found"}), 404
return send_file(path, as_attachment=True, download_name=name)
def _build_status_read():
try:
if BUILD_STATUS_FILE.is_file():
with open(BUILD_STATUS_FILE, "r") as f:
return json.load(f)
except (json.JSONDecodeError, OSError):
pass
return {"phase": "idle", "message": "", "output_name": None, "error": None}
def _build_status_write(phase, message, output_name=None, error=None):
try:
BUILD_STATUS_FILE.parent.mkdir(parents=True, exist_ok=True)
with open(BUILD_STATUS_FILE, "w") as f:
json.dump({
"phase": phase,
"message": message,
"output_name": output_name,
"error": error,
"updated": time.time(),
}, f, indent=2)
except OSError:
pass
def _raspios_latest_url(variant="desktop"):
"""Resolve latest Raspberry Pi OS (arm64) .img.xz URL. variant=lite|desktop|full."""
if variant == "lite":
slug = "raspios_lite_arm64"
elif variant == "full":
slug = "raspios_full_arm64"
else:
slug = "raspios_arm64" # desktop (with desktop, not full)
base = f"https://downloads.raspberrypi.com/{slug}/images"
headers = {"User-Agent": "Mozilla/5.0 (compatible; CM4-Provisioning/1.0)"}
try:
req = urllib.request.Request(base + "/", headers=headers)
with urllib.request.urlopen(req, timeout=20) as r:
html = r.read().decode("utf-8", errors="ignore")
folders = re.findall(rf"{re.escape(slug)}-(\d{{4}}-\d{{2}}-\d{{2}})/", html)
if not folders:
return None
latest = sorted(folders)[-1]
folder_url = f"{base}/{slug}-{latest}/"
req2 = urllib.request.Request(folder_url, headers=headers)
with urllib.request.urlopen(req2, timeout=20) as r:
folder_html = r.read().decode("utf-8", errors="ignore")
m = re.search(r'href="([^"]+\.img\.xz)"', folder_html)
if not m:
return None
href = m.group(1)
if href.startswith("http://") or href.startswith("https://"):
return href
return folder_url.rstrip("/") + "/" + href.lstrip("/")
except Exception:
return None
def _load_cloudinit_templates():
try:
if CLOUDINIT_TEMPLATES_FILE.is_file():
with open(CLOUDINIT_TEMPLATES_FILE, "r") as f:
return json.load(f)
except (json.JSONDecodeError, OSError):
pass
return {"templates": []}
def _save_cloudinit_templates(data):
try:
CLOUDINIT_TEMPLATES_FILE.parent.mkdir(parents=True, exist_ok=True)
with open(CLOUDINIT_TEMPLATES_FILE, "w") as f:
json.dump(data, f, indent=2)
return True
except OSError:
return False
@app.route("/api/build-cloudinit-status")
@require_admin
def api_build_cloudinit_status():
"""Return current build status (phase, message, output_name, error)."""
return jsonify(_build_status_read())
@app.route("/api/build-cloudinit-cancel", methods=["POST"])
@require_admin
def api_build_cloudinit_cancel():
"""Request cancellation of the current build (host script checks for cancel file)."""
try:
BUILD_CANCEL_FILE.parent.mkdir(parents=True, exist_ok=True)
BUILD_CANCEL_FILE.write_text("")
return jsonify({"ok": True, "message": "Cancel requested. Build will stop at next check."})
except OSError as e:
return jsonify({"ok": False, "error": str(e)}), 500
@app.route("/api/build-cloudinit", methods=["POST"])
@require_admin
def api_build_cloudinit():
"""Start building a cloud-init image: write request file; host runs build (has loop devices)."""
st = _build_status_read()
if st.get("phase") in ("downloading", "decompressing", "injecting", "finalizing", "resolving"):
return jsonify({"ok": False, "error": "A build is already in progress"}), 409
body = request.get_json(silent=True) or {}
variant = (body.get("variant") or "desktop").strip().lower()
if variant not in ("lite", "desktop", "full"):
variant = "desktop"
_build_status_write("resolving", "Resolving latest Raspberry Pi OS image URL…")
url = _raspios_latest_url(variant)
if not url:
_build_status_write("idle", "", error="Could not resolve latest Raspios URL")
return jsonify({"ok": False, "error": "Could not resolve latest image URL"}), 503
user_data = body.get("user_data") or DEFAULT_USER_DATA
meta_data = body.get("meta_data") or DEFAULT_META_DATA
network_config = body.get("network_config") or DEFAULT_NETWORK_CONFIG
set_as_golden_after = bool(body.get("set_as_golden_after"))
image_name = (body.get("image_name") or "").strip()[:64]
image_name = re.sub(r"[^\w\-]", "", image_name) # only alphanumeric, underscore, dash
try:
BUILD_REQUEST_FILE.parent.mkdir(parents=True, exist_ok=True)
with open(BUILD_REQUEST_FILE, "w") as f:
json.dump({
"url": url,
"variant": variant,
"image_name": image_name or None,
"user_data": user_data,
"meta_data": meta_data,
"network_config": network_config,
"set_as_golden_after": set_as_golden_after,
}, f, indent=2)
except OSError as e:
return jsonify({"ok": False, "error": str(e)}), 500
_build_status_write("resolving", "Build requested; host will run download and inject (check status).")
return jsonify({"ok": True, "message": "Build started on host. Download and inject may take 1545 min. Poll status or refresh the page."}), 202
@app.route("/api/raspios-latest-url")
@require_admin
def api_raspios_latest_url():
"""Return the URL of the latest Raspberry Pi OS (arm64) image. Query: variant=lite|desktop|full."""
variant = (request.args.get("variant") or "desktop").strip().lower()
if variant not in ("lite", "desktop", "full"):
variant = "desktop"
url = _raspios_latest_url(variant)
if not url:
return jsonify({"ok": False, "url": None, "error": "Could not resolve latest image URL"}), 503
return jsonify({"ok": True, "url": url, "filename": url.split("/")[-1], "variant": variant})
@app.route("/api/cloudinit-templates", methods=["GET"])
@require_admin
def api_cloudinit_templates_list():
"""List saved cloud-init templates."""
data = _load_cloudinit_templates()
return jsonify({"templates": data.get("templates", [])})
@app.route("/api/cloudinit-templates", methods=["POST"])
@require_admin
def api_cloudinit_templates_create():
"""Save a new cloud-init template."""
body = request.get_json(force=True, silent=True) or {}
name = (body.get("name") or "").strip()
if not name:
return jsonify({"ok": False, "error": "name required"}), 400
data = _load_cloudinit_templates()
templates = data.get("templates", [])
tid = str(int(time.time() * 1000))
templates.append({
"id": tid,
"name": name,
"user_data": body.get("user_data", ""),
"meta_data": body.get("meta_data", ""),
"network_config": body.get("network_config", ""),
})
data["templates"] = templates
if not _save_cloudinit_templates(data):
return jsonify({"ok": False, "error": "Failed to save"}), 500
return jsonify({"ok": True, "id": tid, "name": name})
@app.route("/api/cloudinit-templates/<tid>")
@require_admin
def api_cloudinit_templates_get(tid):
"""Get one template by id."""
data = _load_cloudinit_templates()
for t in data.get("templates", []):
if t.get("id") == tid:
return jsonify(t)
return jsonify({"error": "not found"}), 404
@app.route("/api/cloudinit-templates/<tid>", methods=["PUT"])
@require_admin
def api_cloudinit_templates_update(tid):
"""Update an existing template (name, user_data, meta_data, network_config). Id unchanged."""
body = request.get_json(force=True, silent=True) or {}
data = _load_cloudinit_templates()
templates = data.get("templates", [])
for i, t in enumerate(templates):
if t.get("id") == tid:
name = (body.get("name") or t.get("name") or "").strip()
if not name:
return jsonify({"ok": False, "error": "name required"}), 400
templates[i] = {
"id": tid,
"name": name,
"user_data": body.get("user_data", t.get("user_data", "")),
"meta_data": body.get("meta_data", t.get("meta_data", "")),
"network_config": body.get("network_config", t.get("network_config", "")),
}
data["templates"] = templates
if not _save_cloudinit_templates(data):
return jsonify({"ok": False, "error": "Failed to save"}), 500
return jsonify({"ok": True, "id": tid, "name": name})
return jsonify({"ok": False, "error": "not found"}), 404
@app.route("/api/cloudinit-templates/<tid>", methods=["DELETE"])
@require_admin
def api_cloudinit_templates_delete(tid):
"""Delete a template."""
data = _load_cloudinit_templates()
templates = [t for t in data.get("templates", []) if t.get("id") != tid]
if len(templates) == len(data.get("templates", [])):
return jsonify({"ok": False, "error": "not found"}), 404
data["templates"] = templates
if not _save_cloudinit_templates(data):
return jsonify({"ok": False, "error": "Failed to save"}), 500
return jsonify({"ok": True})
@app.route("/api/golden-info")
def api_golden_info():
"""Return whether golden image exists, size/mtime, and which file it points to (for UI)."""
if not GOLDEN_IMAGE.is_file():
return jsonify({"present": False})
try:
st = GOLDEN_IMAGE.stat()
src, src_name = _golden_current_source()
out = {"present": True, "size": st.st_size, "mtime": st.st_mtime}
if src and src_name:
out["source"] = src
out["name"] = src_name
return jsonify(out)
except OSError:
return jsonify({"present": False})
@app.route("/api/admin/users", methods=["GET"])
@require_admin
def api_admin_users():
return jsonify({"users": list_users()})
@app.route("/api/admin/users", methods=["POST"])
@require_admin
def api_admin_add_user():
body = request.get_json(force=True, silent=True) or {}
username = (body.get("username") or "").strip()
password = (body.get("password") or "")
if not username:
return jsonify({"ok": False, "error": "username required"}), 400
if len(password) < 6:
return jsonify({"ok": False, "error": "password must be at least 6 characters"}), 400
if create_user(username, password):
admin_log("user_created", username)
return jsonify({"ok": True, "message": f"User {username} created"})
return jsonify({"ok": False, "error": "username already exists"}), 409
@app.route("/api/admin/users/<int:user_id>/password", methods=["POST"])
@require_admin
def api_admin_change_password(user_id):
body = request.get_json(force=True, silent=True) or {}
new_password = body.get("password") or ""
if len(new_password) < 6:
return jsonify({"ok": False, "error": "password must be at least 6 characters"}), 400
conn = get_db()
row = conn.execute("SELECT id FROM users WHERE id = ?", (user_id,)).fetchone()
conn.close()
if not row:
return jsonify({"ok": False, "error": "user not found"}), 404
change_password(user_id, new_password)
admin_log("password_changed", str(user_id))
return jsonify({"ok": True, "message": "Password updated"})
@app.route("/api/admin/logs")
@require_admin
def api_admin_logs():
limit = min(int(request.args.get("limit", 100)), 500)
return jsonify({"logs": get_recent_logs(limit)})
# Ensure DB exists when app is loaded (e.g. by gunicorn or systemd)
try:
init_db()
except Exception:
pass
if __name__ == "__main__":
app.run(host="0.0.0.0", port=5000, debug=False)