Implement a new API endpoint to retrieve current DHCP leases from dnsmasq, enhancing the dashboard's functionality for monitoring network devices. Update the home.html template to display DHCP lease information in a structured table format, including IP, MAC, hostname, and expiry details. Introduce buttons for enabling and disabling DHCP network boot, improving user interaction. Enhance JavaScript to fetch and display lease data dynamically, ensuring users have real-time visibility of network activity.
1447 lines · 54 KiB · Python
#!/usr/bin/env python3
|
||
"""
|
||
Flask dashboard for CM4 eMMC provisioning.
|
||
Public home: deploy only (status, logs, how to connect). No login.
|
||
Admin: login required — backups, cloud-init, portal files, set golden, users.
|
||
"""
|
||
|
||
import json
|
||
import os
|
||
import re
|
||
import shutil
|
||
import sqlite3
|
||
import subprocess
|
||
import time
|
||
import urllib.request
|
||
from functools import wraps
|
||
from pathlib import Path
|
||
|
||
from flask import Flask, render_template, jsonify, request, send_file, redirect, url_for, session
|
||
|
||
from werkzeug.security import generate_password_hash, check_password_hash
|
||
|
||
app = Flask(__name__)
# Session-signing key. If CM4_DASHBOARD_SECRET_KEY is unset we fall back to a
# random per-process key, which invalidates all sessions on every restart —
# set the env var in production for stable admin logins.
app.secret_key = os.environ.get("CM4_DASHBOARD_SECRET_KEY", os.urandom(24).hex())
|
||
|
||
# --- Paths ---
# Every location is overridable via an environment variable so the dashboard
# can run in a container or against a test tree.
BASE_DIR = Path(os.environ.get("CM4_PROVISIONING_DIR", "/var/lib/cm4-provisioning"))
# Flasher state / IPC files (plain files exchanged with the host-side scripts).
STATUS_FILE = os.environ.get("CM4_STATUS_FILE", str(BASE_DIR / "status.json"))
LOG_FILE = os.environ.get("CM4_LOG_FILE", str(BASE_DIR / "flash.log"))
ACTION_REQUEST_FILE = os.environ.get("CM4_ACTION_REQUEST_FILE", str(BASE_DIR / "action_request"))
DEVICE_SOURCE_FILE = os.environ.get("CM4_DEVICE_SOURCE_FILE", str(BASE_DIR / "device_source"))
# Image storage directories and the "golden" deploy image.
BACKUPS_DIR = Path(os.environ.get("CM4_BACKUPS_DIR", str(BASE_DIR / "backups")))
CLOUDINIT_IMAGES_DIR = Path(os.environ.get("CM4_CLOUDINIT_IMAGES_DIR", str(BASE_DIR / "cloudinit-images")))
PORTAL_FILES_DIR = Path(os.environ.get("CM4_PORTAL_FILES_DIR", str(BASE_DIR / "portal-files")))
GOLDEN_IMAGE = Path(os.environ.get("CM4_GOLDEN_IMAGE", str(BASE_DIR / "golden.img")))
# Network-device registry and build/shrink request/status exchange files.
NETWORK_DEVICES_FILE = Path(os.environ.get("CM4_NETWORK_DEVICES_FILE", str(BASE_DIR / "network_devices.json")))
BUILD_STATUS_FILE = Path(os.environ.get("CM4_BUILD_STATUS_FILE", str(BASE_DIR / "build_cloudinit_status.json")))
BUILD_REQUEST_FILE = Path(os.environ.get("CM4_BUILD_REQUEST_FILE", str(BASE_DIR / "build_cloudinit_request.json")))
SHRINK_REQUEST_FILE = Path(os.environ.get("CM4_SHRINK_REQUEST_FILE", str(BASE_DIR / "shrink_request.json")))
SHRINK_STATUS_FILE = Path(os.environ.get("CM4_SHRINK_STATUS_FILE", str(BASE_DIR / "shrink_status.json")))
CLOUDINIT_TEMPLATES_FILE = Path(os.environ.get("CM4_CLOUDINIT_TEMPLATES_FILE", str(BASE_DIR / "cloudinit_templates.json")))
PORTAL_DESCRIPTIONS_FILE = Path(os.environ.get("CM4_PORTAL_DESCRIPTIONS_FILE", str(BASE_DIR / "portal_descriptions.json")))
# Admin users + activity-log SQLite database.
DB_PATH = Path(os.environ.get("CM4_DASHBOARD_DB", str(BASE_DIR / "dashboard.db")))
# External helper script toggling DHCP network-boot options, and the dnsmasq
# lease file read by /api/dhcp-leases.
TOGGLE_NETWORK_BOOT_SCRIPT = os.environ.get("CM4_TOGGLE_NETWORK_BOOT_SCRIPT", "/opt/cm4-provisioning/toggle-network-boot-dhcp.sh")
DHCP_LEASES_FILE = os.environ.get("CM4_DHCP_LEASES_FILE", "/var/lib/misc/dnsmasq.leases")
|
||
|
||
|
||
# --- Database (admin users + activity logs) ---
|
||
def get_db():
    """Open a connection to the dashboard DB with dict-like row access."""
    connection = sqlite3.connect(str(DB_PATH))
    connection.row_factory = sqlite3.Row
    return connection
|
||
|
||
|
||
def init_db():
    """Create the SQLite schema (admin users + activity log) if missing.

    Safe to call on every startup: all DDL uses IF NOT EXISTS.
    """
    os.makedirs(DB_PATH.parent, exist_ok=True)
    conn = get_db()
    conn.executescript("""
    CREATE TABLE IF NOT EXISTS users (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        username TEXT UNIQUE NOT NULL,
        password_hash TEXT NOT NULL,
        created_at REAL NOT NULL
    );
    CREATE TABLE IF NOT EXISTS admin_logs (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        user_id INTEGER,
        action TEXT NOT NULL,
        details TEXT,
        created_at REAL NOT NULL,
        FOREIGN KEY (user_id) REFERENCES users(id)
    );
    CREATE INDEX IF NOT EXISTS idx_logs_created ON admin_logs(created_at DESC);
    """)
    conn.commit()
    conn.close()
|
||
|
||
|
||
def admin_log(action, details=None):
    """Record an admin action in the activity log.

    The acting user id is taken from the session (None when there is no
    logged-in user). The connection is closed in a finally block so a failed
    INSERT cannot leak it.
    """
    user_id = session.get("user_id")
    conn = get_db()
    try:
        conn.execute(
            "INSERT INTO admin_logs (user_id, action, details, created_at) VALUES (?, ?, ?, ?)",
            (user_id, action, details, time.time()),
        )
        conn.commit()
    finally:
        conn.close()
|
||
|
||
|
||
def require_admin(f):
    """Decorator: API/JSON callers get a 401, browsers get a login redirect."""
    @wraps(f)
    def wrapped(*args, **kwargs):
        if session.get("admin_logged_in"):
            return f(*args, **kwargs)
        wants_json = request.is_json or request.path.startswith("/api/")
        if wants_json:
            return jsonify({"ok": False, "error": "Login required"}), 401
        return redirect(url_for("login", next=request.url))
    return wrapped
|
||
|
||
|
||
def get_user_by_username(username):
    """Return {id, username, password_hash} for *username*, or None if absent."""
    conn = get_db()
    try:
        row = conn.execute(
            "SELECT id, username, password_hash FROM users WHERE username = ?",
            (username,),
        ).fetchone()
    finally:
        # Close even if the query raises (the original leaked the connection).
        conn.close()
    return dict(row) if row else None
|
||
|
||
|
||
def create_user(username, password):
    """Insert a new admin user; False when the username is already taken."""
    conn = get_db()
    try:
        conn.execute(
            "INSERT INTO users (username, password_hash, created_at) VALUES (?, ?, ?)",
            (username, generate_password_hash(password), time.time()),
        )
        conn.commit()
    except sqlite3.IntegrityError:
        # UNIQUE constraint on username fired.
        return False
    else:
        return True
    finally:
        conn.close()
|
||
|
||
|
||
def list_users():
    """Return all admin users as [{id, username, created_at}], ordered by name."""
    conn = get_db()
    try:
        rows = conn.execute(
            "SELECT id, username, created_at FROM users ORDER BY username"
        ).fetchall()
    finally:
        # Close even if the query raises (the original leaked the connection).
        conn.close()
    return [{"id": r["id"], "username": r["username"], "created_at": r["created_at"]} for r in rows]
|
||
|
||
|
||
def change_password(user_id, new_password):
    """Store a new (hashed) password for the given user id."""
    conn = get_db()
    try:
        conn.execute(
            "UPDATE users SET password_hash = ? WHERE id = ?",
            (generate_password_hash(new_password), user_id),
        )
        conn.commit()
    finally:
        # Close even if the UPDATE raises (the original leaked the connection).
        conn.close()
|
||
|
||
|
||
def get_recent_logs(limit=100):
    """Return the newest *limit* admin-log entries joined with usernames.

    username is None for rows whose user was deleted (LEFT JOIN).
    """
    conn = get_db()
    try:
        rows = conn.execute(
            "SELECT l.id, l.action, l.details, l.created_at, u.username FROM admin_logs l LEFT JOIN users u ON l.user_id = u.id ORDER BY l.created_at DESC LIMIT ?",
            (limit,),
        ).fetchall()
    finally:
        # Close even if the query raises (the original leaked the connection).
        conn.close()
    return [
        {"id": r["id"], "action": r["action"], "details": r["details"],
         "created_at": r["created_at"], "username": r["username"]}
        for r in rows
    ]
|
||
|
||
|
||
@app.after_request
def no_cache(response):
    """Disable HTTP caching on dynamic pages/APIs so polling UIs stay fresh."""
    p = request.path
    is_dynamic = p == "/" or any(p.startswith(prefix) for prefix in ("/api/", "/admin", "/login"))
    if is_dynamic:
        response.headers["Cache-Control"] = "no-store, no-cache, must-revalidate, max-age=0"
        response.headers["Pragma"] = "no-cache"
    return response
|
||
|
||
# Default cloud-init user-data for Raspberry Pi OS (NoCloud on boot partition)
DEFAULT_USER_DATA = """#cloud-config
package_update: true
package_upgrade: false
packages:
  - curl

runcmd:
  - curl -fsSL "http://YOUR_FILE_SERVER/provisioning/bootstrap.sh" -o /tmp/bootstrap.sh
  - chmod +x /tmp/bootstrap.sh
  - /tmp/bootstrap.sh
"""

# Default NoCloud meta-data: instance id + hostname.
DEFAULT_META_DATA = """instance-id: raspios-cloudinit-001
local-hostname: gnss.guard
"""

# Default network-config (v2): DHCP on eth0.
DEFAULT_NETWORK_CONFIG = """version: 2
ethernets:
  eth0:
    dhcp4: true
"""

# Status reported when no status file exists yet (flasher idle).
DEFAULT_STATUS = {
    "phase": "idle",
    "message": "Waiting for reTerminal in boot mode or network.",
    "progress": None,
    "updated": None,
}
|
||
|
||
|
||
def read_status():
    """Read the provisioning status JSON, falling back to an idle default.

    Always returns a fresh dict — never the shared DEFAULT_STATUS object —
    so callers may mutate the result without corrupting the module default
    (the original returned DEFAULT_STATUS itself on error).

    When the flasher is waiting for a user choice, annotates the result with
    "device_source" read from DEVICE_SOURCE_FILE ("usb" when unreadable).
    """
    try:
        with open(STATUS_FILE, "r") as f:
            data = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        return dict(DEFAULT_STATUS)  # copy: keep the shared default immutable
    out = {**DEFAULT_STATUS, **data}
    if out.get("phase") == "waiting_choice":
        try:
            with open(DEVICE_SOURCE_FILE, "r") as sf:
                out["device_source"] = (sf.read() or "").strip() or "usb"
        except (FileNotFoundError, OSError):
            out["device_source"] = "usb"
    return out
|
||
|
||
|
||
def read_log_tail(lines=50):
    """Return the last *lines* lines of the flash log as one stripped string.

    Returns "" when the log file is missing, unreadable, or empty.
    """
    try:
        with open(LOG_FILE, "r") as f:
            content = f.readlines()
    except (FileNotFoundError, PermissionError):
        return ""
    if not content:
        return ""
    return "".join(content[-lines:]).strip()
|
||
|
||
|
||
def _load_network_devices():
    """Load the registered-network-devices JSON; empty structure on any problem."""
    if NETWORK_DEVICES_FILE.is_file():
        try:
            with open(NETWORK_DEVICES_FILE, "r") as f:
                return json.load(f)
        except (json.JSONDecodeError, OSError):
            pass
    return {"devices": []}
|
||
|
||
|
||
def _save_network_devices(data):
    """Persist the network-devices JSON. True on success, False if unwritable."""
    try:
        os.makedirs(NETWORK_DEVICES_FILE.parent, exist_ok=True)
        with open(NETWORK_DEVICES_FILE, "w") as f:
            json.dump(data, f, indent=2)
    except (PermissionError, OSError):
        return False
    return True
|
||
|
||
|
||
# Sidecar JSON files carrying display names/descriptions for image files.
BACKUPS_META_FILE = BACKUPS_DIR / "backups_meta.json"
CLOUDINIT_META_FILE = CLOUDINIT_IMAGES_DIR / "cloudinit_meta.json"
|
||
|
||
|
||
def _load_backups_meta():
    """Load the backups metadata sidecar; {} when missing or unreadable."""
    if BACKUPS_META_FILE.is_file():
        try:
            with open(BACKUPS_META_FILE, "r") as f:
                return json.load(f)
        except (json.JSONDecodeError, OSError):
            pass
    return {}
|
||
|
||
|
||
def _save_backups_meta(data):
    """Write the backups metadata sidecar. True on success, False otherwise."""
    try:
        BACKUPS_DIR.mkdir(parents=True, exist_ok=True)
        with open(BACKUPS_META_FILE, "w") as f:
            json.dump(data, f, indent=2)
    except (PermissionError, OSError):
        return False
    return True
|
||
|
||
|
||
def _safe_backup_name(name):
|
||
"""Reject path traversal and ensure it's a backup filename we manage."""
|
||
if not name or ".." in name or "/" in name or "\\" in name:
|
||
return False
|
||
if not name.endswith((".img", ".img.gz", ".img.xz")):
|
||
return False
|
||
return True
|
||
|
||
|
||
def _load_cloudinit_meta():
    """Load the cloud-init images metadata sidecar; {} when missing/unreadable."""
    if CLOUDINIT_META_FILE.is_file():
        try:
            with open(CLOUDINIT_META_FILE, "r") as f:
                return json.load(f)
        except (json.JSONDecodeError, OSError):
            pass
    return {}
|
||
|
||
|
||
def _save_cloudinit_meta(data):
    """Write the cloud-init metadata sidecar. True on success, False otherwise."""
    try:
        CLOUDINIT_IMAGES_DIR.mkdir(parents=True, exist_ok=True)
        with open(CLOUDINIT_META_FILE, "w") as f:
            json.dump(data, f, indent=2)
    except (PermissionError, OSError):
        return False
    return True
|
||
|
||
|
||
def list_backups():
    """List backup image files (newest first) with display metadata overlaid.

    Entries: {name, display_name, description, size, mtime}. The metadata
    sidecar itself is excluded; only .img/.img.gz/.img.xz files are listed.
    """
    if not BACKUPS_DIR.is_dir():
        return []
    meta = _load_backups_meta()
    newest_first = sorted(BACKUPS_DIR.iterdir(), key=lambda x: x.stat().st_mtime, reverse=True)
    result = []
    for p in newest_first:
        if not p.is_file() or p.name == "backups_meta.json":
            continue
        if not p.name.endswith((".img", ".img.gz", ".img.xz")):
            continue
        try:
            st = p.stat()
        except OSError:
            continue  # file vanished between listing and stat
        info = meta.get(p.name, {})
        result.append({
            "name": p.name,
            "display_name": info.get("name") or p.name,
            "description": info.get("description") or "",
            "size": st.st_size,
            "mtime": st.st_mtime,
        })
    return result
|
||
|
||
|
||
def list_cloudinit_images():
    """List cloud-init image files (newest first) with display metadata overlaid.

    Same shape as list_backups(): {name, display_name, description, size, mtime}.
    """
    if not CLOUDINIT_IMAGES_DIR.is_dir():
        return []
    meta = _load_cloudinit_meta()
    newest_first = sorted(CLOUDINIT_IMAGES_DIR.iterdir(), key=lambda x: x.stat().st_mtime, reverse=True)
    result = []
    for p in newest_first:
        if not p.is_file() or p.name == "cloudinit_meta.json":
            continue
        if not p.name.endswith((".img", ".img.gz", ".img.xz")):
            continue
        try:
            st = p.stat()
        except OSError:
            continue  # file vanished between listing and stat
        info = meta.get(p.name, {})
        result.append({
            "name": p.name,
            "display_name": info.get("name") or p.name,
            "description": info.get("description") or "",
            "size": st.st_size,
            "mtime": st.st_mtime,
        })
    return result
|
||
|
||
|
||
def _golden_current_source():
    """Return (source, name) if golden is a symlink to backups or cloudinit, else (None, None).

    Resolves GOLDEN_IMAGE to its real target and tries each managed directory
    in order; Path.relative_to raises ValueError when the target lies outside
    that directory, which drives the fall-through to the next candidate.
    """
    if not GOLDEN_IMAGE.exists():
        return None, None
    try:
        target = GOLDEN_IMAGE.resolve()
        try:
            target.relative_to(BACKUPS_DIR.resolve())
            return "backups", target.name
        except ValueError:
            pass  # not under BACKUPS_DIR; try the cloud-init directory
        try:
            target.relative_to(CLOUDINIT_IMAGES_DIR.resolve())
            return "cloudinit", target.name
        except ValueError:
            pass  # not under CLOUDINIT_IMAGES_DIR either
    except OSError:
        pass  # resolve() failed (e.g. dangling symlink)
    return None, None
|
||
|
||
|
||
@app.route("/")
def index():
    """Public home page (deploy status/logs) — no login required."""
    return render_template("home.html")
|
||
|
||
|
||
@app.route("/login", methods=["GET", "POST"])
def login():
    """Admin login page and handler.

    Special case: when the users table is empty, the very first POST
    self-registers that username as the first admin (min 6-char password).
    Unknown users otherwise get the same generic error as a wrong password,
    avoiding username enumeration.
    """
    if request.method == "GET":
        if session.get("admin_logged_in"):
            return redirect(url_for("admin"))
        return render_template("login.html")
    username = (request.form.get("username") or "").strip()
    password = (request.form.get("password") or "")
    if not username:
        return render_template("login.html", error="Username required")
    user = get_user_by_username(username)
    if not user:
        # First user: allow self-registration
        if not list_users():
            if not password or len(password) < 6:
                return render_template("login.html", error="First user: choose a password (min 6 characters)")
            create_user(username, password)
            session["admin_logged_in"] = True
            # Re-fetch to get the DB-assigned id for the session.
            session["user_id"] = get_user_by_username(username)["id"]
            session["username"] = username
            admin_log("first_admin_created", username)
            # NOTE(review): 'next' is redirected to unvalidated — consider
            # restricting it to same-origin paths (open-redirect risk).
            return redirect(request.args.get("next") or url_for("admin"))
        return render_template("login.html", error="Invalid username or password")
    if not check_password_hash(user["password_hash"], password):
        return render_template("login.html", error="Invalid username or password")
    session["admin_logged_in"] = True
    session["user_id"] = user["id"]
    session["username"] = user["username"]
    admin_log("login", username)
    return redirect(request.args.get("next") or url_for("admin"))
|
||
|
||
|
||
@app.route("/logout")
def logout():
    """Log out: record the event (if logged in), clear the session, go home."""
    if session.get("admin_logged_in"):
        admin_log("logout", session.get("username"))
    session.clear()
    return redirect(url_for("index"))
|
||
|
||
|
||
@app.route("/admin")
@require_admin
def admin():
    """Admin dashboard page (backups, cloud-init, users…). Login required."""
    return render_template("admin.html", username=session.get("username", "Admin"))
|
||
|
||
|
||
@app.route("/admin/portal-files")
@require_admin
def admin_portal_files():
    """Portal-files management page. Login required."""
    return render_template("portal_files.html", username=session.get("username", "Admin"))
|
||
|
||
|
||
@app.route("/admin/cloudinit-build")
@require_admin
def admin_cloudinit_build():
    """Cloud-init image build page. Login required."""
    return render_template("cloudinit_build.html", username=session.get("username", "Admin"))
|
||
|
||
|
||
# Serve portal files for wget (e.g. cloud-init first boot). No auth.
# Subpaths allowed (e.g. first-boot/splash.png); ".." and "\\" forbidden to prevent traversal.
@app.route("/files/<path:filename>")
def serve_portal_file(filename):
    """Serve a file from PORTAL_FILES_DIR (public, read-only)."""
    if ".." in filename or "\\" in filename:
        return jsonify({"error": "invalid path"}), 400
    path = (PORTAL_FILES_DIR / filename).resolve()
    try:
        # Second line of defence: the resolved path must stay inside the dir.
        path.relative_to(PORTAL_FILES_DIR.resolve())
    except ValueError:
        return jsonify({"error": "invalid path"}), 400
    if not path.is_file():
        return jsonify({"error": "not found"}), 404
    return send_file(path, as_attachment=False, download_name=filename.split("/")[-1])
|
||
|
||
|
||
@app.route("/api/portal-files-debug")
def api_portal_files_debug():
    """No-auth debug: what PORTAL_FILES_DIR the process sees (for troubleshooting)."""
    try:
        names = []
        if PORTAL_FILES_DIR.is_dir():
            # Folders first, then files, each alphabetically.
            for p in sorted(PORTAL_FILES_DIR.iterdir(), key=lambda x: (not x.is_dir(), x.name.lower())):
                # Skip hidden entries and anything containing "..".
                if ".." in p.name or p.name.startswith("."):
                    continue
                names.append({"name": p.name, "type": "dir" if p.is_dir() else "file"})
        return jsonify({
            "portal_files_dir": str(PORTAL_FILES_DIR),
            "exists": PORTAL_FILES_DIR.exists(),
            "is_dir": PORTAL_FILES_DIR.is_dir(),
            "items": names,
            "CM4_PROVISIONING_DIR": os.environ.get("CM4_PROVISIONING_DIR"),
        })
    except Exception as e:
        # Debug endpoint: report any failure rather than 500 with no body.
        return jsonify({"error": str(e)}), 500
|
||
|
||
|
||
@app.route("/api/status")
def api_status():
    """Current provisioning status as JSON (see read_status)."""
    return jsonify(read_status())
|
||
|
||
|
||
@app.route("/api/status-clear", methods=["POST"])
def api_status_clear():
    """Reset status to idle (e.g. to dismiss a 'Golden image not found' error so you can try again)."""
    idle_status = {
        "phase": "idle",
        "message": DEFAULT_STATUS["message"],
        "progress": None,
        "updated": None,
    }
    try:
        with open(STATUS_FILE, "w") as f:
            json.dump(idle_status, f)
    except (PermissionError, OSError):
        return jsonify({"ok": False, "error": "Could not write status"}), 500
    return jsonify({"ok": True})
|
||
|
||
|
||
@app.route("/api/log")
def api_log():
    """Tail of the flash log (last 50 lines) for the dashboard log panel."""
    return jsonify({"log": read_log_tail()})
|
||
|
||
|
||
@app.route("/api/pending-devices")
def api_pending_devices():
    """Returns USB (if waiting_choice) and registered network devices so the UI can show Backup/Deploy."""
    status = read_status()
    usb = None
    if status.get("phase") == "waiting_choice":
        usb = {
            "source": "usb",
            "message": status.get("message", "Device connected (USB). Choose action."),
        }
    registered = _load_network_devices().get("devices", [])
    pending = [d for d in registered if d.get("action") in (None, "wait")]
    return jsonify({"usb": usb, "network": pending})
|
||
|
||
|
||
@app.route("/api/device-action", methods=["POST"])
def api_device_action():
    """User chose Backup or Deploy for a device. source=usb | network; for network pass mac=.

    USB: writes the action word into ACTION_REQUEST_FILE for the host flasher,
    optionally dropping a shrink_next_backup flag file first.
    Network: records the action on the matching device entry (by MAC) so the
    device picks it up on its next /api/device-action-poll.
    """
    body = request.get_json(force=True, silent=True) or {}
    source = (body.get("source") or "").strip().lower()
    action = (body.get("action") or "").strip().lower()
    if action not in ("backup", "deploy", "reboot"):
        return jsonify({"ok": False, "error": "action must be 'backup', 'deploy', or 'reboot'"}), 400
    if action == "reboot" and source != "network":
        return jsonify({"ok": False, "error": "'reboot' is only for network devices"}), 400
    if source == "usb":
        try:
            os.makedirs(os.path.dirname(ACTION_REQUEST_FILE) or ".", exist_ok=True)
            # If user requested "shrink after backup", create flag so host runs PiShrink after dd
            if action == "backup" and body.get("shrink"):
                try:
                    (BASE_DIR / "shrink_next_backup").write_text("1")
                except (PermissionError, OSError):
                    pass  # host may still have SHRINK_BACKUP=1
            with open(ACTION_REQUEST_FILE, "w") as f:
                f.write(action)
            return jsonify({"ok": True})
        except (PermissionError, OSError):
            return jsonify({"ok": False, "error": "Could not write action file"}), 500
    if source == "network":
        mac = (body.get("mac") or "").strip()
        if not mac:
            return jsonify({"ok": False, "error": "mac required for network device"}), 400
        data = _load_network_devices()
        for d in data.get("devices", []):
            # MAC comparison is case-insensitive.
            if (d.get("mac") or "").lower() == mac.lower():
                d["action"] = action
                d["action_at"] = time.time()
                _save_network_devices(data)
                return jsonify({"ok": True})
        return jsonify({"ok": False, "error": "Device not found"}), 404
    return jsonify({"ok": False, "error": "source must be 'usb' or 'network'"}), 400
|
||
|
||
|
||
@app.route("/api/register-device", methods=["POST"])
def api_register_device():
    """Called by a network-booted device to register (mac, ip).

    Accepts JSON or form data. Re-registration of a known MAC refreshes its
    IP/timestamp and keeps any pending action; a new MAC is appended with
    action "wait".
    """
    body = request.get_json(force=True, silent=True) or request.form
    mac = (body.get("mac") or "").strip()
    # Fall back to the peer address when the device did not send its own IP.
    ip = (body.get("ip") or request.remote_addr or "").strip()
    if not mac:
        return jsonify({"ok": False, "error": "mac required"}), 400
    data = _load_network_devices()
    devices = data.get("devices", [])
    for d in devices:
        if (d.get("mac") or "").lower() == mac.lower():
            d["ip"] = ip
            d["registered_at"] = time.time()
            # Preserve a previously assigned action; default to "wait".
            d["action"] = d.get("action") or "wait"
            _save_network_devices(data)
            return jsonify({"ok": True, "message": "registered"})
    devices.append({"mac": mac, "ip": ip, "registered_at": time.time(), "action": "wait"})
    data["devices"] = devices
    _save_network_devices(data)
    return jsonify({"ok": True, "message": "registered"})
|
||
|
||
|
||
def _dhcp_network_boot_run(cmd):
    """Run toggle script with enable|disable|status. Returns (ok, output_or_error)."""
    script = TOGGLE_NETWORK_BOOT_SCRIPT
    if not (os.path.isfile(script) and os.access(script, os.X_OK)):
        return False, "Toggle script not installed"
    try:
        proc = subprocess.run(
            [script, cmd],
            capture_output=True,
            text=True,
            timeout=10,
        )
    except subprocess.TimeoutExpired:
        return False, "Timeout"
    except Exception as e:
        return False, str(e)
    if proc.returncode != 0:
        return False, (proc.stderr or proc.stdout or "script failed").strip()
    return True, (proc.stdout or "").strip()
|
||
|
||
|
||
@app.route("/api/dhcp-network-boot", methods=["GET"])
def api_dhcp_network_boot_get():
    """Return whether DHCP network-boot options (66/67) are enabled."""
    ok, out = _dhcp_network_boot_run("status")
    if ok:
        return jsonify({"enabled": out.strip().lower() == "enabled"})
    # Deliberately 200 with enabled=None so the UI can render an "unknown" state.
    return jsonify({"enabled": None, "error": out}), 200
|
||
|
||
|
||
@app.route("/api/dhcp-network-boot", methods=["POST"])
def api_dhcp_network_boot_post():
    """Enable or disable DHCP network-boot options (DHCP server keeps running). Body: { \"enabled\": true|false }."""
    body = request.get_json(force=True, silent=True) or {}
    enabled = body.get("enabled")
    if enabled is None:
        return jsonify({"ok": False, "error": "enabled required (true|false)"}), 400
    ok, out = _dhcp_network_boot_run("enable" if enabled else "disable")
    if ok:
        return jsonify({"ok": True, "enabled": enabled})
    return jsonify({"ok": False, "error": out}), 500
|
||
|
||
|
||
@app.route("/api/action-done", methods=["POST"])
def api_action_done():
    """Called by a device when deploy or backup has completed. Disables DHCP network-boot so the device boots from eMMC next time.

    The device's MAC (query ?mac= or JSON body) is accepted but currently
    unused — network boot is a single global toggle, not per-device — so the
    original dead local was removed.
    """
    ok, _ = _dhcp_network_boot_run("disable")
    if not ok:
        return jsonify({"ok": False, "error": "Could not disable DHCP network boot"}), 500
    return jsonify({"ok": True, "message": "Network boot disabled; device will boot from eMMC on next boot"})
|
||
|
||
|
||
def _read_dhcp_leases():
    """Read dnsmasq lease file. Returns (leases_list, error_string).

    leases_list items: {expiry, mac, ip, hostname}. A missing or unconfigured
    lease file is not an error (empty list, error None); an I/O failure
    returns ([], str(error)).
    """
    if not DHCP_LEASES_FILE or not os.path.isfile(DHCP_LEASES_FILE):
        return [], None
    leases = []
    try:
        with open(DHCP_LEASES_FILE, "r") as f:
            for raw in f:
                line = raw.strip()
                if not line or line.startswith("#"):
                    continue
                # dnsmasq lease line: <expiry> <mac> <ip> <hostname> [client-id]
                parts = line.split()
                if len(parts) < 4:
                    continue
                leases.append({
                    # Non-numeric expiry (e.g. "duid" bookkeeping lines) -> 0.
                    "expiry": int(parts[0]) if parts[0].isdigit() else 0,
                    "mac": parts[1],
                    "ip": parts[2],
                    "hostname": parts[3],  # guaranteed present by the >= 4 check
                })
    except OSError as e:  # PermissionError is an OSError subclass — one catch suffices
        return [], str(e)
    return leases, None
|
||
|
||
|
||
@app.route("/api/dhcp-leases")
def api_dhcp_leases():
    """Return current DHCP leases from dnsmasq (when dashboard runs on LXC with dnsmasq)."""
    leases, err = _read_dhcp_leases()
    payload = {"leases": [] if err else leases, "error": err or None}
    return jsonify(payload)
|
||
|
||
|
||
@app.route("/api/device-action-poll")
def api_device_action_poll():
    """Network device polls this to get its assigned action (deploy/backup) and URL.

    Responses: {"action": "wait"} when nothing is assigned or the MAC is
    unknown; "deploy" includes the golden-image download URL; "backup"
    includes the upload URL (with the MAC echoed back); "reboot" is bare.
    """
    mac = (request.args.get("mac") or "").strip()
    if not mac:
        return jsonify({"action": "wait"}), 200
    data = _load_network_devices()
    # Build absolute URLs from the host the device used to reach us.
    base = request.host_url.rstrip("/")
    for d in data.get("devices", []):
        if (d.get("mac") or "").lower() == mac.lower():
            action = d.get("action") or "wait"
            if action == "deploy":
                return jsonify({"action": "deploy", "url": f"{base}/api/golden-image"})
            if action == "backup":
                return jsonify({"action": "backup", "upload_url": f"{base}/api/backup-upload?mac={mac}"})
            if action == "reboot":
                return jsonify({"action": "reboot"})
            return jsonify({"action": "wait"})
    return jsonify({"action": "wait"})
|
||
|
||
|
||
@app.route("/api/golden-image")
def api_golden_image():
    """Stream the golden image for network deploy (device pulls and writes to eMMC)."""
    if GOLDEN_IMAGE.is_file():
        return send_file(
            GOLDEN_IMAGE,
            mimetype="application/octet-stream",
            as_attachment=True,
            download_name="golden.img",
        )
    return jsonify({"error": "Golden image not found"}), 404
|
||
|
||
|
||
@app.route("/api/backup-upload", methods=["POST"])
def api_backup_upload():
    """Network device uploads its eMMC backup (raw body).

    Streams the request body to BACKUPS_DIR in 1 MiB chunks so arbitrarily
    large images don't get buffered in memory. The MAC (sanitized, colons to
    dashes) plus a timestamp form the filename. A partial file is removed
    when the write fails.
    """
    mac = (request.args.get("mac") or "").strip().replace(":", "-")[:20]
    if not mac:
        return jsonify({"error": "mac query param required"}), 400
    BACKUPS_DIR.mkdir(parents=True, exist_ok=True)
    name = f"backup-net-{mac}-{int(time.time())}.img"
    path = BACKUPS_DIR / name
    try:
        with open(path, "wb") as f:
            while True:
                chunk = request.stream.read(1024 * 1024)
                if not chunk:
                    break
                f.write(chunk)
        return jsonify({"ok": True, "file": name})
    except (OSError, IOError) as e:
        # Don't leave a truncated image behind.
        if path.exists():
            path.unlink(missing_ok=True)
        return jsonify({"error": str(e)}), 500
|
||
|
||
|
||
@app.route("/api/backups")
@require_admin
def api_backups():
    """Admin: list backup images plus the directory they live in."""
    return jsonify({"backups": list_backups(), "backups_dir": str(BACKUPS_DIR)})
|
||
|
||
|
||
@app.route("/api/cloudinit-images")
@require_admin
def api_cloudinit_images():
    """Admin: list cloud-init images plus the directory they live in."""
    return jsonify({"images": list_cloudinit_images(), "cloudinit_dir": str(CLOUDINIT_IMAGES_DIR)})
|
||
|
||
|
||
def _load_portal_descriptions():
    """Return dict mapping path -> description (path can be file or folder)."""
    if not PORTAL_DESCRIPTIONS_FILE.is_file():
        return {}
    try:
        raw = PORTAL_DESCRIPTIONS_FILE.read_text(encoding="utf-8")
        return json.loads(raw)
    except (json.JSONDecodeError, OSError):
        return {}
|
||
|
||
|
||
def _save_portal_descriptions(descriptions):
    """Write the descriptions map. True on success, False on I/O failure."""
    try:
        PORTAL_DESCRIPTIONS_FILE.parent.mkdir(parents=True, exist_ok=True)
        PORTAL_DESCRIPTIONS_FILE.write_text(json.dumps(descriptions, indent=2), encoding="utf-8")
    except OSError:
        return False
    return True
|
||
|
||
|
||
def _portal_files_list_impl(subpath):
    """Shared impl for listing portal files. Returns (items, descriptions, base_url, current_path).

    Rejects traversal attempts and any path outside PORTAL_FILES_DIR by
    returning an empty listing rather than raising. Creates the portal dir
    on first use (best-effort).
    """
    base_url = request.host_url.rstrip("/") + "/files/"
    empty_items = []
    if ".." in subpath or "\\" in subpath:
        return empty_items, {}, base_url, subpath
    if not PORTAL_FILES_DIR.is_dir():
        try:
            PORTAL_FILES_DIR.mkdir(parents=True, exist_ok=True)
        except OSError:
            pass  # best-effort; fall through to the re-check below
        if not PORTAL_FILES_DIR.is_dir():
            return empty_items, {}, base_url, subpath
    list_dir = (PORTAL_FILES_DIR / subpath).resolve() if subpath else PORTAL_FILES_DIR
    try:
        # Resolved listing dir must stay inside the portal root.
        list_dir.relative_to(PORTAL_FILES_DIR.resolve())
    except ValueError:
        return empty_items, {}, base_url, subpath
    if not list_dir.is_dir():
        return empty_items, {}, base_url, subpath
    items = []
    # Folders first, then files, each alphabetically (case-insensitive).
    for p in sorted(list_dir.iterdir(), key=lambda x: (not x.is_dir(), x.name.lower())):
        if ".." in p.name or p.name.startswith("."):
            continue
        rel = (subpath + "/" + p.name) if subpath else p.name
        try:
            if p.is_dir():
                items.append({"type": "folder", "path": rel, "name": p.name})
            else:
                items.append({"type": "file", "path": rel, "name": p.name, "size": p.stat().st_size, "mtime": p.stat().st_mtime})
        except OSError:
            pass  # entry vanished mid-listing
    descriptions = _load_portal_descriptions()
    return items, descriptions, base_url, subpath
|
||
|
||
|
||
@app.route("/api/portal-files")
def api_portal_files_list():
    """List one level: root or contents of path=... (folders and files). ?debug=1 allows unauthenticated read-only list."""
    subpath = request.args.get("path", "").strip().strip("/")
    debug = request.args.get("debug") == "1"
    # Inline auth (instead of @require_admin) so the ?debug=1 bypass works.
    if not debug and not session.get("admin_logged_in"):
        if request.is_json or request.path.startswith("/api/"):
            return jsonify({"ok": False, "error": "Login required"}), 401
        return redirect(url_for("login", next=request.url))
    items, descriptions, base_url, subpath = _portal_files_list_impl(subpath)
    resp = jsonify({
        "items": items,
        "base_url": base_url,
        "descriptions": descriptions,
        "current_path": subpath,
        "portal_files_dir": str(PORTAL_FILES_DIR),
    })
    resp.headers["Cache-Control"] = "no-store, no-cache, must-revalidate"
    return resp
|
||
|
||
|
||
@app.route("/api/portal-files/descriptions", methods=["GET", "PATCH"])
@require_admin
def api_portal_descriptions():
    """GET returns the path->description map; PATCH replaces it wholesale."""
    if request.method != "PATCH":
        return jsonify({"descriptions": _load_portal_descriptions()})
    payload = request.get_json(force=True, silent=True) or {}
    desc = payload.get("descriptions")
    if not isinstance(desc, dict):
        return jsonify({"ok": False, "error": "descriptions must be a dict"}), 400
    if not _save_portal_descriptions(desc):
        return jsonify({"ok": False, "error": "save failed"}), 500
    admin_log("portal_descriptions", "updated")
    return jsonify({"ok": True})
|
||
|
||
|
||
@app.route("/api/portal-files/folder", methods=["POST"])
@require_admin
def api_portal_folder_create():
    """Create a (possibly nested) folder under PORTAL_FILES_DIR.

    Body: {"path": "a/b"}. Traversal tokens and resolved paths outside the
    portal root are rejected with 400.
    """
    data = request.get_json(force=True, silent=True) or {}
    path = (data.get("path") or "").strip().strip("/")
    if not path:
        return jsonify({"ok": False, "error": "path required"}), 400
    if ".." in path or "\\" in path:
        return jsonify({"ok": False, "error": "invalid path"}), 400
    full = (PORTAL_FILES_DIR / path).resolve()
    try:
        # Resolved target must stay inside the portal root.
        full.relative_to(PORTAL_FILES_DIR.resolve())
    except ValueError:
        return jsonify({"ok": False, "error": "invalid path"}), 400
    try:
        full.mkdir(parents=True, exist_ok=True)
        admin_log("portal_folder_create", path)
        return jsonify({"ok": True, "path": path})
    except OSError as e:
        return jsonify({"ok": False, "error": str(e)}), 500
|
||
|
||
|
||
@app.route("/api/portal-files/upload", methods=["POST"])
@require_admin
def api_portal_files_upload():
    """Upload a file into PORTAL_FILES_DIR (multipart field 'file' or 'upload').

    The filename is sanitized (non [word . - /] chars replaced, capped at 120
    chars) and an optional form 'path' selects the destination subfolder.
    Traversal and escapes from the portal root are rejected with 400; a
    partially written file is removed on failure.
    """
    if "file" not in request.files and "upload" not in request.files:
        return jsonify({"ok": False, "error": "no file (use field 'file' or 'upload')"}), 400
    f = request.files.get("file") or request.files.get("upload")
    if not f or not f.filename:
        return jsonify({"ok": False, "error": "no file selected"}), 400
    base_name = re.sub(r"[^\w\-./]", "_", f.filename)[:120] or "upload"
    if ".." in base_name or base_name.startswith("/"):
        return jsonify({"ok": False, "error": "invalid filename"}), 400
    subpath = (request.form.get("path") or "").strip().strip("/")
    if subpath and (".." in subpath or "\\" in subpath):
        return jsonify({"ok": False, "error": "invalid path"}), 400
    name = (subpath + "/" + base_name) if subpath else base_name
    path = (PORTAL_FILES_DIR / name).resolve()
    try:
        # Resolved destination must stay inside the portal root.
        path.relative_to(PORTAL_FILES_DIR.resolve())
    except ValueError:
        return jsonify({"ok": False, "error": "invalid path"}), 400
    try:
        path.parent.mkdir(parents=True, exist_ok=True)
        f.save(str(path))
        admin_log("portal_upload", name)
        return jsonify({"ok": True, "name": name, "url": request.host_url.rstrip("/") + "/files/" + name})
    except (OSError, IOError) as e:
        # Don't leave a partial upload behind.
        if path.exists():
            path.unlink(missing_ok=True)
        return jsonify({"ok": False, "error": str(e)}), 500
|
||
|
||
|
||
@app.route("/api/portal-files/<path:name>", methods=["DELETE"])
@require_admin
def api_portal_file_delete(name):
    """Delete a portal file, or an empty portal folder, by relative path."""
    if ".." in name or "\\" in name:
        return jsonify({"ok": False, "error": "invalid name"}), 400
    path = (PORTAL_FILES_DIR / name).resolve()
    try:
        # Must stay inside the portal files root.
        path.relative_to(PORTAL_FILES_DIR.resolve())
    except ValueError:
        return jsonify({"ok": False, "error": "invalid path"}), 400
    if path.is_file():
        try:
            path.unlink()
            admin_log("portal_delete", name)
            return jsonify({"ok": True})
        except OSError as e:
            return jsonify({"ok": False, "error": str(e)}), 500
    if path.is_dir():
        # Only empty folders may be removed.
        if any(path.iterdir()):
            return jsonify({"ok": False, "error": "folder not empty"}), 400
        try:
            path.rmdir()
            admin_log("portal_folder_delete", name)
            return jsonify({"ok": True})
        except OSError as e:
            return jsonify({"ok": False, "error": str(e)}), 500
    return jsonify({"ok": False, "error": "not found"}), 404
|
||
|
||
|
||
@app.route("/api/backups/upload", methods=["POST"])
@require_admin
def api_backups_upload():
    """Upload an image file from the dashboard (multipart form)."""
    if "file" not in request.files and "image" not in request.files:
        return jsonify({"ok": False, "error": "no file in request (use field 'file' or 'image')"}), 400
    f = request.files.get("file") or request.files.get("image")
    if not f or not f.filename:
        return jsonify({"ok": False, "error": "no file selected"}), 400
    # Stem = filename without its last extension, sanitized and capped.
    stem = (f.filename.rsplit(".", 1)[0] if "." in f.filename else f.filename).strip() or "upload"
    safe_base = re.sub(r"[^\w\-.]", "_", stem)[:80] or "upload"
    # Preserve recognized compressed-image extensions; default to .img.
    lower = f.filename.lower()
    if lower.endswith(".img.xz"):
        ext = ".img.xz"
    elif lower.endswith(".img.gz"):
        ext = ".img.gz"
    else:
        ext = ".img"
    name = f"{safe_base}-{int(time.time())}{ext}"
    if not _safe_backup_name(name):
        name = f"upload-{int(time.time())}.img"
    path = BACKUPS_DIR / name
    try:
        BACKUPS_DIR.mkdir(parents=True, exist_ok=True)
        f.save(str(path))
        return jsonify({"ok": True, "name": name, "message": f"Uploaded {name}"})
    except (OSError, IOError) as e:
        # Clean up a partial upload before returning the error.
        if path.exists():
            path.unlink(missing_ok=True)
        return jsonify({"ok": False, "error": str(e)}), 500
|
||
|
||
|
||
def _set_golden_from_path(path):
    """Point the golden image at *path*.

    Files inside BACKUPS_DIR or CLOUDINIT_IMAGES_DIR are symlinked;
    anything else is copied so the golden image survives source removal.

    Fix: check is_symlink() before exists() when clearing the old golden
    image — Path.exists() follows symlinks and returns False for a
    dangling link, which previously left the stale link in place and made
    the os.symlink() call below fail with FileExistsError.
    """
    GOLDEN_IMAGE.parent.mkdir(parents=True, exist_ok=True)
    if GOLDEN_IMAGE.is_symlink() or GOLDEN_IMAGE.exists():
        GOLDEN_IMAGE.unlink()
    path_resolved = path.resolve()
    for managed_dir in (BACKUPS_DIR, CLOUDINIT_IMAGES_DIR):
        try:
            path_resolved.relative_to(managed_dir.resolve())
            break  # inside a managed dir -> symlink below
        except ValueError:
            continue
    else:
        # Outside both managed dirs: copy (with metadata) instead of linking.
        shutil.copy2(path, GOLDEN_IMAGE)
        return
    os.symlink(path_resolved, GOLDEN_IMAGE)
|
||
|
||
|
||
@app.route("/api/backups/<path:name>/set-as-golden", methods=["POST"])
@require_admin
def api_backup_set_as_golden(name):
    """Use this backup as the golden image (symlink)."""
    if not _safe_backup_name(name):
        return jsonify({"ok": False, "error": "invalid backup name"}), 400
    path = BACKUPS_DIR / name
    if not path.is_file():
        return jsonify({"ok": False, "error": "backup not found"}), 404
    try:
        BACKUPS_DIR.mkdir(parents=True, exist_ok=True)
        _set_golden_from_path(path)
        admin_log("set_golden", f"backups/{name}")
    except (OSError, IOError) as e:
        return jsonify({"ok": False, "error": str(e)}), 500
    return jsonify({"ok": True, "message": f"Golden image set from backup {name}"})
|
||
|
||
|
||
@app.route("/api/cloudinit-images/<path:name>/set-as-golden", methods=["POST"])
@require_admin
def api_cloudinit_set_as_golden(name):
    """Use this cloud-init image as the golden image (symlink)."""
    if not _safe_backup_name(name):
        return jsonify({"ok": False, "error": "invalid name"}), 400
    path = CLOUDINIT_IMAGES_DIR / name
    if not path.is_file():
        return jsonify({"ok": False, "error": "image not found"}), 404
    try:
        CLOUDINIT_IMAGES_DIR.mkdir(parents=True, exist_ok=True)
        _set_golden_from_path(path)
        admin_log("set_golden", f"cloudinit/{name}")
    except (OSError, IOError) as e:
        return jsonify({"ok": False, "error": str(e)}), 500
    return jsonify({"ok": True, "message": f"Golden image set from cloud-init image {name}"})
|
||
|
||
|
||
def _request_host_shrink(name, action="shrink", format="xz"):
    """Write shrink request for host and poll shrink_status.json. Returns (ok, message_or_error).

    The dashboard has no loop devices, so PiShrink runs on the host: we
    drop a JSON request file and then poll the host-written status file
    until it reports done/error for *name*, or the deadline passes.
    """
    req = {"name": name, "action": action}
    if action == "compress":
        # Only gz/xz are supported; anything else falls back to xz.
        req["format"] = "gz" if format == "gz" else "xz"
    try:
        SHRINK_REQUEST_FILE.write_text(json.dumps(req))
    except OSError as e:
        return False, str(e)
    deadline = time.monotonic() + 2100  # 35 min
    while time.monotonic() < deadline:
        time.sleep(5)  # host updates the status file; no point polling faster
        if not SHRINK_STATUS_FILE.exists():
            continue
        try:
            data = json.loads(SHRINK_STATUS_FILE.read_text())
        except (OSError, ValueError):
            continue  # partially written or unreadable status; retry
        if data.get("name") != name:
            continue  # status belongs to a different image
        phase = data.get("phase")
        if phase == "done":
            return True, data.get("message") or f"Shrunk {name}"
        if phase == "error":
            return False, data.get("error") or "PiShrink failed"
    return False, "Shrink timed out (run on host may still be in progress)"
|
||
|
||
|
||
@app.route("/api/cloudinit-images/<path:name>", methods=["GET"])
@require_admin
def api_cloudinit_download(name):
    """Download a cloud-init image as an attachment."""
    if not _safe_backup_name(name):
        return jsonify({"error": "invalid name"}), 400
    path = CLOUDINIT_IMAGES_DIR / name
    if not path.is_file():
        return jsonify({"error": "not found"}), 404
    return send_file(path, as_attachment=True, download_name=name)
|
||
|
||
|
||
@app.route("/api/cloudinit-images/<path:name>", methods=["PATCH"])
@require_admin
def api_cloudinit_update(name):
    """Rename a cloud-init image on disk, or update its display metadata."""
    if not _safe_backup_name(name):
        return jsonify({"ok": False, "error": "invalid name"}), 400
    path = CLOUDINIT_IMAGES_DIR / name
    if not path.is_file():
        return jsonify({"ok": False, "error": "not found"}), 404
    body = request.get_json(force=True, silent=True) or {}
    meta = _load_cloudinit_meta()
    entry = meta.get(name, {})
    new_filename = (body.get("filename") or "").strip()
    if new_filename:
        # Rename branch: move the file and carry the metadata to the new key.
        if not _safe_backup_name(new_filename):
            return jsonify({"ok": False, "error": "invalid new filename"}), 400
        new_path = CLOUDINIT_IMAGES_DIR / new_filename
        if new_path.exists() and new_path != path:
            return jsonify({"ok": False, "error": "target filename already exists"}), 409
        try:
            path.rename(new_path)
        except OSError as e:
            return jsonify({"ok": False, "error": str(e)}), 500
        meta[new_filename] = {
            "name": entry.get("name") or name,
            "description": entry.get("description") or "",
        }
        meta.pop(name, None)
        name = new_filename
        path = CLOUDINIT_IMAGES_DIR / name
    else:
        # Metadata-only branch: update display name and/or description.
        if "name" in body:
            entry["name"] = (body.get("name") or "").strip() or path.name
        if "description" in body:
            entry["description"] = (body.get("description") or "").strip()
        meta[name] = entry
    if not _save_cloudinit_meta(meta):
        return jsonify({"ok": False, "error": "could not save metadata"}), 500
    return jsonify({"ok": True, "name": name})
|
||
|
||
|
||
@app.route("/api/cloudinit-images/<path:name>", methods=["DELETE"])
@require_admin
def api_cloudinit_delete(name):
    """Delete a cloud-init image; drop the golden symlink if it points here."""
    if not _safe_backup_name(name):
        return jsonify({"ok": False, "error": "invalid name"}), 400
    path = CLOUDINIT_IMAGES_DIR / name
    if not path.is_file():
        return jsonify({"ok": False, "error": "not found"}), 404
    try:
        if GOLDEN_IMAGE.exists() and GOLDEN_IMAGE.is_symlink():
            try:
                if GOLDEN_IMAGE.resolve() == path.resolve():
                    GOLDEN_IMAGE.unlink()
            except OSError:
                pass  # best effort: a stale golden link is not fatal here
        path.unlink()
        meta = _load_cloudinit_meta()
        meta.pop(name, None)
        _save_cloudinit_meta(meta)
        admin_log("cloudinit_delete", name)
        return jsonify({"ok": True, "message": f"Deleted {name}"})
    except (OSError, IOError) as e:
        return jsonify({"ok": False, "error": str(e)}), 500
|
||
|
||
|
||
@app.route("/api/backups/<path:name>/shrink", methods=["POST"])
@require_admin
def api_backup_shrink(name):
    """Request PiShrink on host for a raw .img backup (shrinks in place). Dashboard polls host status."""
    if not _safe_backup_name(name):
        return jsonify({"ok": False, "error": "invalid backup name"}), 400
    # Only raw images qualify; compressed names never end in ".img".
    if not name.endswith(".img"):
        return jsonify({"ok": False, "error": "only raw .img files can be shrunk (not .img.gz / .img.xz)"}), 400
    if not (BACKUPS_DIR / name).is_file():
        return jsonify({"ok": False, "error": "backup not found"}), 404
    ok, msg = _request_host_shrink(name, action="shrink")
    if ok:
        return jsonify({"ok": True, "message": msg})
    # Map host-side failure text to a meaningful HTTP status.
    status = 503 if "not installed" in msg else 504 if "timed out" in msg else 500
    return jsonify({"ok": False, "error": msg}), status
|
||
|
||
|
||
@app.route("/api/backups/<path:name>/compress", methods=["POST"])
@require_admin
def api_backup_compress(name):
    """Request PiShrink with compression on host. Produces .img.gz or .img.xz. Dashboard polls host status."""
    if not _safe_backup_name(name):
        return jsonify({"ok": False, "error": "invalid backup name"}), 400
    # Only raw images qualify; compressed names never end in ".img".
    if not name.endswith(".img"):
        return jsonify({"ok": False, "error": "only raw .img files can be compressed (not .img.gz / .img.xz)"}), 400
    if not (BACKUPS_DIR / name).is_file():
        return jsonify({"ok": False, "error": "backup not found"}), 404
    body = request.get_json(force=True, silent=True) or {}
    fmt = (body.get("format") or request.args.get("format") or "xz").strip().lower()
    if fmt not in ("gz", "gzip", "xz"):
        fmt = "xz"  # default to the stronger compression
    if fmt == "gzip":
        fmt = "gz"
    ok, msg = _request_host_shrink(name, action="compress", format=fmt)
    if ok:
        ext = ".xz" if fmt == "xz" else ".gz"
        return jsonify({"ok": True, "message": msg or f"Compressed to {name}{ext}"})
    # Map host-side failure text to a meaningful HTTP status.
    status = 503 if "not installed" in msg else 504 if "timed out" in msg else 500
    return jsonify({"ok": False, "error": msg}), status
|
||
|
||
|
||
@app.route("/api/backups/<path:name>", methods=["PATCH"])
@require_admin
def api_backup_update(name):
    """Update backup metadata (display name, description) or rename the file.

    Consistency fix: use ``meta.pop(name, None)`` to discard the old
    metadata key after a rename, matching api_cloudinit_update (the old
    code used an ``if name in meta: del meta[name]`` two-step).
    """
    if not _safe_backup_name(name):
        return jsonify({"ok": False, "error": "invalid backup name"}), 400
    path = BACKUPS_DIR / name
    if not path.is_file():
        return jsonify({"ok": False, "error": "backup not found"}), 404
    body = request.get_json(force=True, silent=True) or {}
    meta = _load_backups_meta()
    entry = meta.get(name, {})

    new_filename = (body.get("filename") or "").strip()
    if new_filename:
        # Rename branch: move the file and carry the metadata to the new key.
        if not _safe_backup_name(new_filename):
            return jsonify({"ok": False, "error": "invalid new filename"}), 400
        new_path = BACKUPS_DIR / new_filename
        if new_path.exists() and new_path != path:
            return jsonify({"ok": False, "error": "target filename already exists"}), 409
        try:
            path.rename(new_path)
        except OSError as e:
            return jsonify({"ok": False, "error": str(e)}), 500
        meta[new_filename] = {
            "name": entry.get("name") or name,
            "description": entry.get("description") or "",
        }
        meta.pop(name, None)
        name = new_filename
        path = BACKUPS_DIR / name
    else:
        # Metadata-only branch: update display name and/or description.
        if "name" in body:
            entry["name"] = (body.get("name") or "").strip() or path.name
        if "description" in body:
            entry["description"] = (body.get("description") or "").strip()
        meta[name] = entry

    if not _save_backups_meta(meta):
        return jsonify({"ok": False, "error": "could not save metadata"}), 500
    return jsonify({"ok": True, "name": name})
|
||
|
||
|
||
@app.route("/api/backups/<path:name>", methods=["DELETE"])
@require_admin
def api_backup_delete(name):
    """Delete a backup file. If it is the current golden image (symlink), the golden link is removed.

    Fix: record the deletion via admin_log, matching api_cloudinit_delete
    — backup deletions were previously the only destructive admin action
    missing from the audit log.
    """
    if not _safe_backup_name(name):
        return jsonify({"ok": False, "error": "invalid backup name"}), 400
    path = BACKUPS_DIR / name
    if not path.is_file():
        return jsonify({"ok": False, "error": "backup not found"}), 404
    try:
        if GOLDEN_IMAGE.exists() and GOLDEN_IMAGE.is_symlink():
            try:
                if GOLDEN_IMAGE.resolve() == path.resolve():
                    GOLDEN_IMAGE.unlink()
            except OSError:
                pass  # best effort: a stale golden link is not fatal here
        path.unlink()
        meta = _load_backups_meta()
        meta.pop(name, None)
        _save_backups_meta(meta)
        admin_log("backup_delete", name)
        return jsonify({"ok": True, "message": f"Deleted {name}"})
    except (OSError, IOError) as e:
        return jsonify({"ok": False, "error": str(e)}), 500
|
||
|
||
|
||
@app.route("/api/backups/<path:name>", methods=["GET"])
@require_admin
def api_backup_download(name):
    """Download a backup image as an attachment."""
    if not _safe_backup_name(name):
        return jsonify({"error": "invalid name"}), 400
    path = BACKUPS_DIR / name
    if not path.is_file():
        return jsonify({"error": "not found"}), 404
    return send_file(path, as_attachment=True, download_name=name)
|
||
|
||
|
||
def _build_status_read():
    """Return the host build status dict; an idle default if missing/corrupt."""
    try:
        if BUILD_STATUS_FILE.is_file():
            return json.loads(BUILD_STATUS_FILE.read_text())
    except (json.JSONDecodeError, OSError):
        pass  # unreadable status is treated the same as no status
    return {"phase": "idle", "message": "", "output_name": None, "error": None}
|
||
|
||
|
||
def _build_status_write(phase, message, output_name=None, error=None):
    """Persist build status for the UI to poll; write failures are non-fatal."""
    payload = {
        "phase": phase,
        "message": message,
        "output_name": output_name,
        "error": error,
        "updated": time.time(),
    }
    try:
        BUILD_STATUS_FILE.parent.mkdir(parents=True, exist_ok=True)
        BUILD_STATUS_FILE.write_text(json.dumps(payload, indent=2))
    except OSError:
        pass  # status is advisory only; never break the caller over it
|
||
|
||
|
||
def _raspios_latest_url(variant="lite"):
    """Resolve latest Raspberry Pi OS (arm64) .img.xz URL. variant=lite|full."""
    slug = "raspios_lite_arm64" if variant == "lite" else "raspios_full_arm64"
    base = f"https://downloads.raspberrypi.com/{slug}/images"
    headers = {"User-Agent": "Mozilla/5.0 (compatible; CM4-Provisioning/1.0)"}

    def fetch(url):
        # Download an index page and return it as best-effort text.
        req = urllib.request.Request(url, headers=headers)
        with urllib.request.urlopen(req, timeout=20) as resp:
            return resp.read().decode("utf-8", errors="ignore")

    try:
        # Dated release folders look like "<slug>-YYYY-MM-DD/"; the
        # lexicographically largest date is the newest release.
        listing = fetch(base + "/")
        folders = re.findall(rf"{re.escape(slug)}-(\d{{4}}-\d{{2}}-\d{{2}})/", listing)
        if not folders:
            return None
        latest = sorted(folders)[-1]
        folder_url = f"{base}/{slug}-{latest}/"
        m = re.search(r'href="([^"]+\.img\.xz)"', fetch(folder_url))
        if not m:
            return None
        href = m.group(1)
        if href.startswith("http://") or href.startswith("https://"):
            return href  # already absolute
        return folder_url.rstrip("/") + "/" + href.lstrip("/")
    except Exception:
        # Best-effort resolver: any network/parse failure means "unknown".
        return None
|
||
|
||
|
||
def _load_cloudinit_templates():
    """Load saved cloud-init templates; empty list on missing/corrupt file."""
    try:
        if CLOUDINIT_TEMPLATES_FILE.is_file():
            return json.loads(CLOUDINIT_TEMPLATES_FILE.read_text())
    except (json.JSONDecodeError, OSError):
        pass  # unreadable store is treated as empty
    return {"templates": []}
|
||
|
||
|
||
def _save_cloudinit_templates(data):
    """Write the templates store to disk; return True on success."""
    try:
        CLOUDINIT_TEMPLATES_FILE.parent.mkdir(parents=True, exist_ok=True)
        CLOUDINIT_TEMPLATES_FILE.write_text(json.dumps(data, indent=2))
    except OSError:
        return False
    return True
|
||
|
||
|
||
@app.route("/api/build-cloudinit-status")
@require_admin
def api_build_cloudinit_status():
    """Return current build status (phase, message, output_name, error)."""
    return jsonify(_build_status_read())
|
||
|
||
|
||
@app.route("/api/build-cloudinit", methods=["POST"])
@require_admin
def api_build_cloudinit():
    """Start building a cloud-init image: write request file; host runs build (has loop devices)."""
    st = _build_status_read()
    # Refuse to start while any build phase is still active.
    if st.get("phase") in ("downloading", "decompressing", "injecting", "finalizing", "resolving"):
        return jsonify({"ok": False, "error": "A build is already in progress"}), 409
    body = request.get_json(silent=True) or {}
    variant = (body.get("variant") or "lite").strip().lower()
    if variant not in ("lite", "full"):
        variant = "lite"
    _build_status_write("resolving", "Resolving latest Raspberry Pi OS image URL…")
    url = _raspios_latest_url(variant)
    if not url:
        _build_status_write("idle", "", error="Could not resolve latest Raspios URL")
        return jsonify({"ok": False, "error": "Could not resolve latest image URL"}), 503
    # Missing fields fall back to the built-in defaults.
    request_payload = {
        "url": url,
        "variant": variant,
        "user_data": body.get("user_data") or DEFAULT_USER_DATA,
        "meta_data": body.get("meta_data") or DEFAULT_META_DATA,
        "network_config": body.get("network_config") or DEFAULT_NETWORK_CONFIG,
        "set_as_golden_after": bool(body.get("set_as_golden_after")),
    }
    try:
        BUILD_REQUEST_FILE.parent.mkdir(parents=True, exist_ok=True)
        with open(BUILD_REQUEST_FILE, "w") as f:
            json.dump(request_payload, f, indent=2)
    except OSError as e:
        return jsonify({"ok": False, "error": str(e)}), 500
    _build_status_write("resolving", "Build requested; host will run download and inject (check status).")
    return jsonify({"ok": True, "message": "Build started on host. Download and inject may take 15–45 min. Poll status or refresh the page."}), 202
|
||
|
||
|
||
@app.route("/api/raspios-latest-url")
@require_admin
def api_raspios_latest_url():
    """Return the URL of the latest Raspberry Pi OS (arm64) image. Query: variant=lite|full."""
    variant = (request.args.get("variant") or "lite").strip().lower()
    if variant not in ("lite", "full"):
        variant = "lite"
    url = _raspios_latest_url(variant)
    if not url:
        return jsonify({"ok": False, "url": None, "error": "Could not resolve latest image URL"}), 503
    return jsonify({"ok": True, "url": url, "filename": url.split("/")[-1], "variant": variant})
|
||
|
||
|
||
@app.route("/api/cloudinit-templates", methods=["GET"])
@require_admin
def api_cloudinit_templates_list():
    """List saved cloud-init templates."""
    return jsonify({"templates": _load_cloudinit_templates().get("templates", [])})
|
||
|
||
|
||
@app.route("/api/cloudinit-templates", methods=["POST"])
@require_admin
def api_cloudinit_templates_create():
    """Save a new cloud-init template."""
    body = request.get_json(force=True, silent=True) or {}
    name = (body.get("name") or "").strip()
    if not name:
        return jsonify({"ok": False, "error": "name required"}), 400
    data = _load_cloudinit_templates()
    templates = data.get("templates", [])
    tid = str(int(time.time() * 1000))  # millisecond timestamp used as id
    templates.append({
        "id": tid,
        "name": name,
        "user_data": body.get("user_data", ""),
        "meta_data": body.get("meta_data", ""),
        "network_config": body.get("network_config", ""),
    })
    data["templates"] = templates
    if not _save_cloudinit_templates(data):
        return jsonify({"ok": False, "error": "Failed to save"}), 500
    return jsonify({"ok": True, "id": tid, "name": name})
|
||
|
||
|
||
@app.route("/api/cloudinit-templates/<tid>")
@require_admin
def api_cloudinit_templates_get(tid):
    """Get one template by id."""
    data = _load_cloudinit_templates()
    match = next((t for t in data.get("templates", []) if t.get("id") == tid), None)
    if match is None:
        return jsonify({"error": "not found"}), 404
    return jsonify(match)
|
||
|
||
|
||
@app.route("/api/cloudinit-templates/<tid>", methods=["DELETE"])
@require_admin
def api_cloudinit_templates_delete(tid):
    """Delete a template."""
    data = _load_cloudinit_templates()
    before = data.get("templates", [])
    remaining = [t for t in before if t.get("id") != tid]
    # Nothing filtered out means the id did not exist.
    if len(remaining) == len(before):
        return jsonify({"ok": False, "error": "not found"}), 404
    data["templates"] = remaining
    if not _save_cloudinit_templates(data):
        return jsonify({"ok": False, "error": "Failed to save"}), 500
    return jsonify({"ok": True})
|
||
|
||
|
||
@app.route("/api/golden-info")
def api_golden_info():
    """Return whether golden image exists, size/mtime, and which file it points to (for UI).

    Public endpoint (no admin required) — the home page shows this.
    """
    if not GOLDEN_IMAGE.is_file():
        return jsonify({"present": False})
    try:
        st = GOLDEN_IMAGE.stat()
        src, src_name = _golden_current_source()
        info = {"present": True, "size": st.st_size, "mtime": st.st_mtime}
        if src and src_name:
            info["source"] = src
            info["name"] = src_name
        return jsonify(info)
    except OSError:
        # Raced with deletion or unreadable link -> report absent.
        return jsonify({"present": False})
|
||
|
||
|
||
@app.route("/api/admin/users", methods=["GET"])
@require_admin
def api_admin_users():
    """List all dashboard user accounts."""
    return jsonify({"users": list_users()})
|
||
|
||
|
||
@app.route("/api/admin/users", methods=["POST"])
@require_admin
def api_admin_add_user():
    """Create a new dashboard user account."""
    body = request.get_json(force=True, silent=True) or {}
    username = (body.get("username") or "").strip()
    password = body.get("password") or ""
    if not username:
        return jsonify({"ok": False, "error": "username required"}), 400
    if len(password) < 6:
        return jsonify({"ok": False, "error": "password must be at least 6 characters"}), 400
    if not create_user(username, password):
        return jsonify({"ok": False, "error": "username already exists"}), 409
    admin_log("user_created", username)
    return jsonify({"ok": True, "message": f"User {username} created"})
|
||
|
||
|
||
@app.route("/api/admin/users/<int:user_id>/password", methods=["POST"])
@require_admin
def api_admin_change_password(user_id):
    """Set a new password for an existing user (admin action).

    Fix: close the DB connection in a ``finally`` — the old code leaked
    the connection if the SELECT raised.
    """
    body = request.get_json(force=True, silent=True) or {}
    new_password = body.get("password") or ""
    if len(new_password) < 6:
        return jsonify({"ok": False, "error": "password must be at least 6 characters"}), 400
    conn = get_db()
    try:
        row = conn.execute("SELECT id FROM users WHERE id = ?", (user_id,)).fetchone()
    finally:
        conn.close()
    if not row:
        return jsonify({"ok": False, "error": "user not found"}), 404
    change_password(user_id, new_password)
    admin_log("password_changed", str(user_id))
    return jsonify({"ok": True, "message": "Password updated"})
|
||
|
||
|
||
@app.route("/api/admin/logs")
@require_admin
def api_admin_logs():
    """Return recent admin audit log entries (limit clamped to 1..500).

    Fix: a non-numeric ``limit`` query parameter previously raised
    ValueError and produced an HTTP 500; it now falls back to the default.
    A lower clamp also prevents zero/negative limits reaching the DB.
    """
    try:
        limit = int(request.args.get("limit", 100))
    except (TypeError, ValueError):
        limit = 100
    limit = max(1, min(limit, 500))
    return jsonify({"logs": get_recent_logs(limit)})
|
||
|
||
|
||
# Ensure DB exists when app is loaded (e.g. by gunicorn or systemd)
try:
    init_db()
except Exception:
    # Deliberate best-effort: if the DB cannot be initialized at import
    # time the app still starts; the failure will surface on first login.
    pass
|
||
|
||
if __name__ == "__main__":
    # Development entry point; production runs under gunicorn/systemd.
    app.run(host="0.0.0.0", port=5000, debug=False)
|