<message>Eliminate network boot options from the dashboard, including API endpoints and UI elements, to streamline the provisioning process for USB boot only. Update messages and documentation to reflect the removal of network boot functionality, ensuring clarity for users. Adjust the cloud-init build process and related templates to focus solely on USB boot mode, enhancing the overall user experience and simplifying the workflow.
1767 lines
67 KiB
Python
1767 lines
67 KiB
Python
#!/usr/bin/env python3
|
||
# Revision: 2
|
||
"""
|
||
Flask dashboard for CM4 eMMC provisioning.
|
||
Public home: deploy only (status, logs, how to connect). No login.
|
||
Admin: login required — backups, cloud-init, portal files, set golden, users.
|
||
"""
|
||
|
||
import hashlib
|
||
import json
|
||
import os
|
||
import re
|
||
import shutil
|
||
import sqlite3
|
||
import subprocess
|
||
import tempfile
|
||
import time
|
||
import urllib.request
|
||
from functools import wraps
|
||
from pathlib import Path
|
||
|
||
from flask import Flask, render_template, jsonify, request, send_file, redirect, url_for, session, Response, stream_with_context
|
||
|
||
from werkzeug.security import generate_password_hash, check_password_hash
|
||
|
||
try:
|
||
from itsdangerous import BadSignature
|
||
except ImportError:
|
||
BadSignature = Exception # noqa: disable invalid-name
|
||
|
||
app = Flask(__name__)
# Set CM4_DASHBOARD_SECRET_KEY in env (e.g. via /opt/cm4-provisioning/dashboard.env)
# so sessions persist across restarts. `or` is used instead of a .get() default so
# that a variable which is set but empty also falls back to a random key; with the
# random fallback every restart invalidates existing session cookies.
app.secret_key = os.environ.get("CM4_DASHBOARD_SECRET_KEY") or os.urandom(24).hex()
|
||
|
||
# --- Paths ---
# Every location can be overridden through the CM4_* environment variable named
# next to it; unless stated otherwise the default lives under BASE_DIR.
BASE_DIR = Path(os.getenv("CM4_PROVISIONING_DIR", "/var/lib/cm4-provisioning"))

# These five are kept as plain strings rather than Path objects.
STATUS_FILE = os.getenv("CM4_STATUS_FILE", str(BASE_DIR / "status.json"))
LOG_FILE = os.getenv("CM4_LOG_FILE", str(BASE_DIR / "flash.log"))
ACTION_REQUEST_FILE = os.getenv("CM4_ACTION_REQUEST_FILE", str(BASE_DIR / "action_request"))
DEVICE_SOURCE_FILE = os.getenv("CM4_DEVICE_SOURCE_FILE", str(BASE_DIR / "device_source"))
DHCP_LEASES_FILE = os.getenv("CM4_DHCP_LEASES_FILE", "/var/lib/misc/dnsmasq.leases")

# Image stores and the golden image.
BACKUPS_DIR = Path(os.getenv("CM4_BACKUPS_DIR", str(BASE_DIR / "backups")))
CLOUDINIT_IMAGES_DIR = Path(os.getenv("CM4_CLOUDINIT_IMAGES_DIR", str(BASE_DIR / "cloudinit-images")))
PORTAL_FILES_DIR = Path(os.getenv("CM4_PORTAL_FILES_DIR", str(BASE_DIR / "portal-files")))
GOLDEN_IMAGE = Path(os.getenv("CM4_GOLDEN_IMAGE", str(BASE_DIR / "golden.img")))

# JSON state / request files.
NETWORK_DEVICES_FILE = Path(os.getenv("CM4_NETWORK_DEVICES_FILE", str(BASE_DIR / "network_devices.json")))
BUILD_STATUS_FILE = Path(os.getenv("CM4_BUILD_STATUS_FILE", str(BASE_DIR / "build_cloudinit_status.json")))
BUILD_REQUEST_FILE = Path(os.getenv("CM4_BUILD_REQUEST_FILE", str(BASE_DIR / "build_cloudinit_request.json")))
# The cancel marker always sits next to the build request file.
BUILD_CANCEL_FILE = BUILD_REQUEST_FILE.parent / "build_cloudinit_cancel"
SHRINK_REQUEST_FILE = Path(os.getenv("CM4_SHRINK_REQUEST_FILE", str(BASE_DIR / "shrink_request.json")))
SHRINK_STATUS_FILE = Path(os.getenv("CM4_SHRINK_STATUS_FILE", str(BASE_DIR / "shrink_status.json")))
FIRST_BOOT_STATUS_FILE = Path(os.getenv("CM4_FIRST_BOOT_STATUS_FILE", str(BASE_DIR / "first_boot_status.json")))
CLOUDINIT_TEMPLATES_FILE = Path(os.getenv("CM4_CLOUDINIT_TEMPLATES_FILE", str(BASE_DIR / "cloudinit_templates.json")))
PORTAL_DESCRIPTIONS_FILE = Path(os.getenv("CM4_PORTAL_DESCRIPTIONS_FILE", str(BASE_DIR / "portal_descriptions.json")))
DB_PATH = Path(os.getenv("CM4_DASHBOARD_DB", str(BASE_DIR / "dashboard.db")))

# EEPROM update (USB boot): tools and output files (shared with host script)
EEPROM_DIR = Path(os.getenv("CM4_EEPROM_DIR", "/opt/cm4-provisioning/eeprom"))
EEPROM_CONFIG_TOOL = EEPROM_DIR / "rpi-eeprom-config"
EEPROM_FW_FILE = EEPROM_DIR / "pieeprom.bin"
EEPROM_UPD_FILE = BASE_DIR / "pieeprom.upd"
EEPROM_SIG_FILE = BASE_DIR / "pieeprom.sig"
|
||
|
||
|
||
# --- Database (admin users + activity logs) ---
|
||
def get_db():
    """Open a new SQLite connection to DB_PATH with name-addressable rows (sqlite3.Row)."""
    connection = sqlite3.connect(str(DB_PATH))
    connection.row_factory = sqlite3.Row
    return connection
|
||
|
||
|
||
def init_db():
    """Create the database directory, the users/admin_logs tables and the log index.

    All statements use IF NOT EXISTS, so calling this on an existing database
    is harmless. The connection is closed even when the schema script fails.
    """
    os.makedirs(DB_PATH.parent, exist_ok=True)
    conn = get_db()
    try:
        conn.executescript("""
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                username TEXT UNIQUE NOT NULL,
                password_hash TEXT NOT NULL,
                created_at REAL NOT NULL
            );
            CREATE TABLE IF NOT EXISTS admin_logs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id INTEGER,
                action TEXT NOT NULL,
                details TEXT,
                created_at REAL NOT NULL,
                FOREIGN KEY (user_id) REFERENCES users(id)
            );
            CREATE INDEX IF NOT EXISTS idx_logs_created ON admin_logs(created_at DESC);
        """)
        conn.commit()
    finally:
        conn.close()
|
||
|
||
|
||
def admin_log(action, details=None):
    """Insert one row into admin_logs for the current session.

    Args:
        action: short action identifier (stored in the NOT NULL `action` column).
        details: optional free-text details.

    The row's user_id is taken from the session and is None when the session
    has no "user_id". The connection is closed even if the insert fails.
    """
    user_id = session.get("user_id")
    conn = get_db()
    try:
        conn.execute(
            "INSERT INTO admin_logs (user_id, action, details, created_at) VALUES (?, ?, ?, ?)",
            (user_id, action, details, time.time()),
        )
        conn.commit()
    finally:
        conn.close()
|
||
|
||
|
||
def require_admin(f):
    """Decorator that only runs the wrapped view for a logged-in admin session.

    Without a login, JSON requests and /api/ paths get a 401 JSON body; all
    other requests are redirected to the login page with `next` set to the
    requested URL.
    """

    def _session_is_admin():
        # Reading the session can fail when the secret key was rotated or the
        # cookie is corrupted; such a session counts as not logged in.
        try:
            return session.get("admin_logged_in")
        except (BadSignature, ValueError, OSError):
            try:
                session.clear()
            except Exception:
                pass
            return False

    @wraps(f)
    def wrapped(*args, **kwargs):
        if _session_is_admin():
            return f(*args, **kwargs)
        wants_json = request.is_json or request.path.startswith("/api/")
        if wants_json:
            return jsonify({"ok": False, "error": "Login required"}), 401
        return redirect(url_for("login", next=request.url))

    return wrapped
|
||
|
||
|
||
def get_user_by_username(username):
    """Return the user row for *username* as a dict, or None if there is none.

    The dict has the keys id, username and password_hash. The connection is
    closed even if the query raises.
    """
    conn = get_db()
    try:
        row = conn.execute(
            "SELECT id, username, password_hash FROM users WHERE username = ?",
            (username,),
        ).fetchone()
    finally:
        conn.close()
    return dict(row) if row else None
|
||
|
||
|
||
def create_user(username, password):
    """Insert a new user with a hashed password.

    Returns True when the row was committed and False when SQLite reports an
    integrity violation (the username column is UNIQUE).
    """
    insert_sql = "INSERT INTO users (username, password_hash, created_at) VALUES (?, ?, ?)"
    db = get_db()
    try:
        db.execute(insert_sql, (username, generate_password_hash(password), time.time()))
        db.commit()
    except sqlite3.IntegrityError:
        return False
    else:
        return True
    finally:
        db.close()
|
||
|
||
|
||
def list_users():
    """Return all users ordered by username as dicts with id, username and created_at.

    The connection is closed even if the query raises.
    """
    conn = get_db()
    try:
        rows = conn.execute("SELECT id, username, created_at FROM users ORDER BY username").fetchall()
    finally:
        conn.close()
    return [{"id": r["id"], "username": r["username"], "created_at": r["created_at"]} for r in rows]
|
||
|
||
|
||
def change_password(user_id, new_password):
    """Store a freshly generated hash of *new_password* for the user with *user_id*.

    Nothing is reported when no row matches the id. The connection is closed
    even if the update raises.
    """
    conn = get_db()
    try:
        conn.execute(
            "UPDATE users SET password_hash = ? WHERE id = ?",
            (generate_password_hash(new_password), user_id),
        )
        conn.commit()
    finally:
        conn.close()
|
||
|
||
|
||
def get_recent_logs(limit=100):
    """Return up to *limit* admin log entries, newest first.

    Each entry is a dict with id, action, details, created_at and username;
    username is None when the log row has no matching user (LEFT JOIN).
    The connection is closed even if the query raises.
    """
    conn = get_db()
    try:
        rows = conn.execute(
            "SELECT l.id, l.action, l.details, l.created_at, u.username FROM admin_logs l LEFT JOIN users u ON l.user_id = u.id ORDER BY l.created_at DESC LIMIT ?",
            (limit,),
        ).fetchall()
    finally:
        conn.close()
    return [
        {
            "id": r["id"],
            "action": r["action"],
            "details": r["details"],
            "created_at": r["created_at"],
            "username": r["username"],
        }
        for r in rows
    ]
|
||
|
||
|
||
@app.after_request
def no_cache(response):
    """Add no-cache headers to responses for the home page, /api/, /admin and /login paths."""
    path = request.path
    is_dynamic = path == "/" or path.startswith(("/api/", "/admin", "/login"))
    if is_dynamic:
        response.headers["Cache-Control"] = "no-store, no-cache, must-revalidate, max-age=0"
        response.headers["Pragma"] = "no-cache"
    return response
|
||
|
||
|
||
@app.errorhandler(500)
def handle_500(err):
    """Print the traceback and answer with a minimal error body instead of a blank page.

    /api/ paths receive a JSON error; every other path receives a small HTML
    page linking to the login and home pages. Both use status 500.
    """
    import traceback

    try:
        traceback.print_exc()
    except Exception:
        # Printing the traceback is best-effort; it must not mask the response.
        pass

    if request.path.startswith("/api/"):
        return jsonify({"ok": False, "error": "Internal server error"}), 500

    page = "".join([
        "<!DOCTYPE html><html><head><meta charset='utf-8'><title>Error</title></head><body>",
        "<h1>Something went wrong</h1><p>Try <a href='/login'>logging in again</a> or <a href='/'>going home</a>.</p>",
        "</body></html>",
    ])
    headers = {"Content-Type": "text/html; charset=utf-8"}
    return page, 500, headers
|
||
|
||
|
||
# Default cloud-init user-data for Raspberry Pi OS (NoCloud on boot partition)
DEFAULT_USER_DATA = """#cloud-config
package_update: true
package_upgrade: false
packages:
- curl

runcmd:
- curl -fsSL "http://YOUR_FILE_SERVER/provisioning/bootstrap.sh" -o /tmp/bootstrap.sh
- chmod +x /tmp/bootstrap.sh
- /tmp/bootstrap.sh
"""

# Default cloud-init meta-data.
DEFAULT_META_DATA = """instance-id: raspios-cloudinit-001
local-hostname: gnss.guard
"""

# Default cloud-init network-config (version 2). The nesting is significant:
# eth0 must sit under `ethernets` and dhcp4 under eth0; with every key at the
# top level the document no longer describes an interface.
DEFAULT_NETWORK_CONFIG = """version: 2
ethernets:
  eth0:
    dhcp4: true
"""

# Status reported when no status file can be read.
DEFAULT_STATUS = {
    "phase": "idle",
    "message": "Waiting for reTerminal in USB boot mode.",
    "progress": None,
    "updated": None,
}
|
||
|
||
|
||
def _write_status(phase, message, progress=None):
    """Best-effort write of the status JSON (phase, message, progress, updated=now) to STATUS_FILE.

    Filesystem errors are swallowed; the parent directory is created if needed.
    """
    status_dir = os.path.dirname(STATUS_FILE) or "."
    try:
        os.makedirs(status_dir, exist_ok=True)
        with open(STATUS_FILE, "w") as status_fh:
            record = {
                "phase": phase,
                "message": message,
                "progress": progress,
                "updated": time.time(),
            }
            json.dump(record, status_fh)
    except OSError:  # PermissionError is a subclass of OSError
        pass
|
||
|
||
|
||
def _generate_eeprom_update(boot_order, extra_settings=None):
    """Generate pieeprom.upd and pieeprom.sig in BASE_DIR.

    Reads the configuration embedded in EEPROM_FW_FILE with rpi-eeprom-config,
    replaces the boot-related keys, writes the updated image to
    EEPROM_UPD_FILE and stores its SHA-256 hex digest in EEPROM_SIG_FILE.

    Args:
        boot_order: value written as BOOT_ORDER.
        extra_settings: optional mapping of additional KEY -> value lines.
            A key given here replaces both the line found in the firmware
            config and the default set below, so no key is emitted twice.

    Returns:
        (True, None) on success, otherwise (False, error_message).
    """
    if not EEPROM_CONFIG_TOOL.is_file() or not EEPROM_FW_FILE.is_file():
        return False, "EEPROM tools not installed. Run install-eeprom-tools-on-lxc.sh on the LXC."

    # Settings we always write; extra_settings may override or extend them.
    settings = {
        "BOOT_ORDER": boot_order,
        "NET_BOOT_MAX_RETRIES": 3,
        "DHCP_TIMEOUT": 1500,
        "DHCP_REQ_TIMEOUT": 500,
        "NET_INSTALL_AT_POWER_ON": 0,
    }
    settings.update(extra_settings or {})
    # Existing lines for these keys are dropped. TFTP_IP is dropped as well and
    # only comes back if the caller passes it in extra_settings.
    skip_keys = set(settings) | {"TFTP_IP"}

    conf_path = None
    try:
        os.makedirs(BASE_DIR, exist_ok=True)
        # Read current config from firmware image
        out = subprocess.run(
            [str(EEPROM_CONFIG_TOOL), str(EEPROM_FW_FILE)],
            capture_output=True,
            text=True,
            timeout=30,
            cwd=str(EEPROM_DIR),
        )
        if out.returncode != 0:
            return False, (out.stderr or out.stdout or "rpi-eeprom-config failed").strip()[:500]

        new_lines = []
        for line in (out.stdout or "").strip().splitlines():
            key = line.split("=", 1)[0].strip() if "=" in line else ""
            if key in skip_keys:
                continue
            new_lines.append(line)
        new_lines.extend(f"{k}={v}" for k, v in settings.items())

        # delete=False because the tool opens the file by name after we close it;
        # the path is recorded first so the finally clause can always remove it.
        with tempfile.NamedTemporaryFile(mode="w", suffix=".conf", delete=False) as f:
            conf_path = f.name
            f.write("\n".join(new_lines) + "\n")

        out2 = subprocess.run(
            [str(EEPROM_CONFIG_TOOL), "--config", conf_path, "--out", str(EEPROM_UPD_FILE), str(EEPROM_FW_FILE)],
            capture_output=True,
            text=True,
            timeout=60,
            cwd=str(EEPROM_DIR),
        )
        if out2.returncode != 0:
            return False, (out2.stderr or out2.stdout or "rpi-eeprom-config --config failed").strip()[:500]
        if not EEPROM_UPD_FILE.is_file() or EEPROM_UPD_FILE.stat().st_size == 0:
            return False, "Generated pieeprom.upd is missing or empty"

        # Write signature (sha256 hex), hashing the image in chunks.
        digest = hashlib.sha256()
        with open(EEPROM_UPD_FILE, "rb") as f:
            for chunk in iter(lambda: f.read(1024 * 1024), b""):
                digest.update(chunk)
        EEPROM_SIG_FILE.write_text(digest.hexdigest())
        return True, None
    except subprocess.TimeoutExpired:
        return False, "Timeout running rpi-eeprom-config"
    except OSError as e:  # includes PermissionError
        return False, str(e)
    finally:
        if conf_path is not None:
            try:
                os.unlink(conf_path)
            except OSError:
                pass
|
||
|
||
|
||
def read_status():
    """Return the current provisioning status as a new dict.

    The content of STATUS_FILE is layered over DEFAULT_STATUS. When the phase
    is "waiting_choice", "device_source" is added from DEVICE_SOURCE_FILE
    (falling back to "usb" when that file is empty or unreadable).

    A missing, unreadable, undecodable or non-object status file yields a
    copy of DEFAULT_STATUS, so callers can never mutate the shared default.
    """
    try:
        with open(STATUS_FILE, "r") as f:
            data = json.load(f)
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        return dict(DEFAULT_STATUS)
    if not isinstance(data, dict):
        return dict(DEFAULT_STATUS)

    out = {**DEFAULT_STATUS, **data}
    if out.get("phase") == "waiting_choice":
        try:
            with open(DEVICE_SOURCE_FILE, "r") as sf:
                out["device_source"] = (sf.read() or "").strip() or "usb"
        except (OSError, ValueError):
            out["device_source"] = "usb"
    return out
|
||
|
||
|
||
def read_log_tail(lines=50):
    """Return the last *lines* lines of LOG_FILE as one stripped string.

    Returns "" when *lines* is not positive or the log cannot be read.
    Only the requested tail is kept in memory, and bytes that cannot be
    decoded are replaced instead of raising.
    """
    from collections import deque

    # A slice such as all_lines[-0:] would return the whole file, so a
    # non-positive count is handled explicitly.
    if lines <= 0:
        return ""
    try:
        with open(LOG_FILE, "r", errors="replace") as f:
            tail = deque(f, maxlen=lines)
    except OSError:
        return ""
    return "".join(tail).strip()
|
||
|
||
|
||
def _load_network_devices():
    """Load the network device registry from NETWORK_DEVICES_FILE.

    Returns the parsed JSON object, or {"devices": []} when the file is
    missing, unreadable, not valid JSON, not a JSON object, or has a
    "devices" entry that is not a list.
    """
    try:
        if NETWORK_DEVICES_FILE.is_file():
            with open(NETWORK_DEVICES_FILE, "r") as f:
                data = json.load(f)
            if isinstance(data, dict) and isinstance(data.get("devices", []), list):
                return data
    except (ValueError, OSError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        pass
    return {"devices": []}
|
||
|
||
|
||
def _save_network_devices(data):
    """Write *data* as indented JSON to NETWORK_DEVICES_FILE; return True on success, False on a filesystem error."""
    try:
        os.makedirs(NETWORK_DEVICES_FILE.parent, exist_ok=True)
        with open(NETWORK_DEVICES_FILE, "w") as out_fh:
            json.dump(data, out_fh, indent=2)
    except OSError:  # PermissionError is a subclass of OSError
        return False
    else:
        return True
|
||
|
||
|
||
# Per-directory metadata files, stored next to the images they describe.
BACKUPS_META_FILE = BACKUPS_DIR.joinpath("backups_meta.json")
CLOUDINIT_META_FILE = CLOUDINIT_IMAGES_DIR.joinpath("cloudinit_meta.json")
|
||
|
||
|
||
def _load_backups_meta():
    """Load backup metadata from BACKUPS_META_FILE.

    Returns the parsed JSON object, or {} when the file is missing,
    unreadable, not valid JSON or not a JSON object.
    """
    try:
        if BACKUPS_META_FILE.is_file():
            with open(BACKUPS_META_FILE, "r") as f:
                data = json.load(f)
            if isinstance(data, dict):
                return data
    except (ValueError, OSError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        pass
    return {}
|
||
|
||
|
||
def _save_backups_meta(data):
    """Write *data* as indented JSON to BACKUPS_META_FILE; return True on success, False on a filesystem error."""
    try:
        BACKUPS_DIR.mkdir(parents=True, exist_ok=True)
        with open(BACKUPS_META_FILE, "w") as meta_fh:
            json.dump(data, meta_fh, indent=2)
    except OSError:  # PermissionError is a subclass of OSError
        return False
    else:
        return True
|
||
|
||
|
||
def _safe_backup_name(name):
|
||
"""Reject path traversal and ensure it's a backup filename we manage."""
|
||
if not name or ".." in name or "/" in name or "\\" in name:
|
||
return False
|
||
if not name.endswith((".img", ".img.gz", ".img.xz")):
|
||
return False
|
||
return True
|
||
|
||
|
||
def _load_cloudinit_meta():
    """Load cloud-init image metadata from CLOUDINIT_META_FILE.

    Returns the parsed JSON object, or {} when the file is missing,
    unreadable, not valid JSON or not a JSON object.
    """
    try:
        if CLOUDINIT_META_FILE.is_file():
            with open(CLOUDINIT_META_FILE, "r") as f:
                data = json.load(f)
            if isinstance(data, dict):
                return data
    except (ValueError, OSError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        pass
    return {}
|
||
|
||
|
||
def _save_cloudinit_meta(data):
    """Write *data* as indented JSON to CLOUDINIT_META_FILE; return True on success, False on a filesystem error."""
    try:
        CLOUDINIT_IMAGES_DIR.mkdir(parents=True, exist_ok=True)
        with open(CLOUDINIT_META_FILE, "w") as meta_fh:
            json.dump(data, meta_fh, indent=2)
    except OSError:  # PermissionError is a subclass of OSError
        return False
    else:
        return True
|
||
|
||
|
||
def list_backups():
    """List backup images in BACKUPS_DIR, newest first.

    Returns a list of dicts with name, display_name, description, size and
    mtime. display_name and description come from the backups metadata and
    fall back to the file name and "" respectively. Entries that cannot be
    stat'ed (for example a dangling symlink or a file removed while listing)
    are skipped instead of failing the whole listing.
    """
    if not BACKUPS_DIR.is_dir():
        return []
    meta = _load_backups_meta()
    if not isinstance(meta, dict):
        meta = {}
    try:
        entries = list(BACKUPS_DIR.iterdir())
    except OSError:
        return []

    out = []
    for p in entries:
        # backups_meta.json does not carry an image suffix, so it is excluded here too.
        if not p.name.endswith((".img", ".img.gz", ".img.xz")):
            continue
        try:
            if not p.is_file():
                continue
            st = p.stat()
        except OSError:
            continue
        m = meta.get(p.name)
        if not isinstance(m, dict):
            m = {}
        out.append({
            "name": p.name,
            "display_name": m.get("name") or p.name,
            "description": m.get("description") or "",
            "size": st.st_size,
            "mtime": st.st_mtime,
        })
    # Sort on the mtime already collected, so sorting itself cannot raise.
    out.sort(key=lambda item: item["mtime"], reverse=True)
    return out
|
||
|
||
|
||
def list_cloudinit_images():
    """List cloud-init images in CLOUDINIT_IMAGES_DIR, newest first.

    Returns a list of dicts with name, display_name, description, size and
    mtime. display_name and description come from the cloud-init metadata
    and fall back to the file name and "" respectively. Entries that cannot
    be stat'ed (for example a dangling symlink or a file removed while
    listing) are skipped instead of failing the whole listing.
    """
    if not CLOUDINIT_IMAGES_DIR.is_dir():
        return []
    meta = _load_cloudinit_meta()
    if not isinstance(meta, dict):
        meta = {}
    try:
        entries = list(CLOUDINIT_IMAGES_DIR.iterdir())
    except OSError:
        return []

    out = []
    for p in entries:
        # cloudinit_meta.json does not carry an image suffix, so it is excluded here too.
        if not p.name.endswith((".img", ".img.gz", ".img.xz")):
            continue
        try:
            if not p.is_file():
                continue
            st = p.stat()
        except OSError:
            continue
        m = meta.get(p.name)
        if not isinstance(m, dict):
            m = {}
        out.append({
            "name": p.name,
            "display_name": m.get("name") or p.name,
            "description": m.get("description") or "",
            "size": st.st_size,
            "mtime": st.st_mtime,
        })
    # Sort on the mtime already collected, so sorting itself cannot raise.
    out.sort(key=lambda item: item["mtime"], reverse=True)
    return out
|
||
|
||
|
||
def _golden_current_source():
    """Tell which image store the golden image resolves into.

    Returns ("backups", filename) or ("cloudinit", filename) when the resolved
    GOLDEN_IMAGE path lies inside BACKUPS_DIR or CLOUDINIT_IMAGES_DIR, and
    (None, None) when it does not exist, lies elsewhere, or resolving fails
    with an OSError.
    """
    if not GOLDEN_IMAGE.exists():
        return None, None
    stores = (("backups", BACKUPS_DIR), ("cloudinit", CLOUDINIT_IMAGES_DIR))
    try:
        target = GOLDEN_IMAGE.resolve()
        for label, directory in stores:
            try:
                target.relative_to(directory.resolve())
            except ValueError:
                continue  # not inside this store; try the next one
            return label, target.name
    except OSError:
        pass
    return None, None
|
||
|
||
|
||
@app.route("/")
def index():
    """Render the public home page (home.html)."""
    page = render_template("home.html")
    return page
|
||
|
||
|
||
def _is_safe_redirect_target(target):
    """Return True when *target* resolves to this dashboard's own host over http(s)."""
    from urllib.parse import urljoin, urlparse

    if not target:
        return False
    # Backslashes and control characters are rejected outright: some browsers
    # treat "\" like "/", which would turn "/\host" into another origin.
    if "\\" in target or any(ord(ch) < 32 for ch in target):
        return False
    own = urlparse(request.host_url)
    resolved = urlparse(urljoin(request.host_url, target))
    return resolved.scheme in ("http", "https") and resolved.netloc == own.netloc


@app.route("/login", methods=["GET", "POST"])
def login():
    """Show the login form (GET) or authenticate an admin (POST).

    When no user exists yet, the first submitted username/password pair
    (password of at least 6 characters) is registered as the first admin.
    After a successful login the browser is redirected to the `next` query
    parameter if it points back to this dashboard, otherwise to the admin page.
    """
    if request.method == "GET":
        if session.get("admin_logged_in"):
            return redirect(url_for("admin"))
        return render_template("login.html")

    username = (request.form.get("username") or "").strip()
    password = request.form.get("password") or ""
    if not username:
        return render_template("login.html", error="Username required")

    # `next` is attacker-controllable; only follow it when it stays on this host.
    next_url = request.args.get("next")
    destination = next_url if _is_safe_redirect_target(next_url) else url_for("admin")

    user = get_user_by_username(username)
    if not user:
        # First user: allow self-registration
        if list_users():
            return render_template("login.html", error="Invalid username or password")
        if len(password) < 6:
            return render_template("login.html", error="First user: choose a password (min 6 characters)")
        # create_user returns False when the insert hits the UNIQUE constraint
        # (e.g. a concurrent registration); do not log in on an account we did not create.
        created = create_user(username, password)
        user = get_user_by_username(username) if created else None
        if not user:
            return render_template("login.html", error="Invalid username or password")
        event = "first_admin_created"
    elif not check_password_hash(user["password_hash"], password):
        return render_template("login.html", error="Invalid username or password")
    else:
        event = "login"

    session["admin_logged_in"] = True
    session["user_id"] = user["id"]
    session["username"] = user["username"]
    admin_log(event, username)
    return redirect(destination)
|
||
|
||
|
||
@app.route("/logout")
def logout():
    """Log the logout for an admin session, clear the session and go to the home page."""
    was_admin = session.get("admin_logged_in")
    if was_admin:
        # Log before clearing: admin_log reads the user id from the session.
        admin_log("logout", session.get("username"))
    session.clear()
    home_url = url_for("index")
    return redirect(home_url)
|
||
|
||
|
||
@app.route("/admin")
@require_admin
def admin():
    """Render the admin page for the logged-in user (name defaults to "Admin")."""
    current_user = session.get("username", "Admin")
    return render_template("admin.html", username=current_user)
|
||
|
||
|
||
@app.route("/admin/portal-files")
@require_admin
def admin_portal_files():
    """Render the portal files page for the logged-in user (name defaults to "Admin")."""
    current_user = session.get("username", "Admin")
    return render_template("portal_files.html", username=current_user)
|
||
|
||
|
||
@app.route("/admin/cloudinit-build")
@require_admin
def admin_cloudinit_build():
    """Render the cloud-init build page for the logged-in user (name defaults to "Admin")."""
    current_user = session.get("username", "Admin")
    return render_template("cloudinit_build.html", username=current_user)
|
||
|
||
|
||
# Serve portal files for wget (e.g. cloud-init first boot). No auth.
# Subpaths allowed (e.g. first-boot/splash.png); ".." and "\\" forbidden to prevent traversal.
@app.route("/files/<path:filename>")
def serve_portal_file(filename):
    """Send a file from PORTAL_FILES_DIR inline.

    Responds 400 for paths containing ".." or a backslash or resolving outside
    PORTAL_FILES_DIR, and 404 when the resolved path is not a regular file.
    """

    def _invalid_path():
        return jsonify({"error": "invalid path"}), 400

    if any(marker in filename for marker in ("..", "\\")):
        return _invalid_path()

    path = (PORTAL_FILES_DIR / filename).resolve()
    try:
        # Second line of defence: the resolved path must stay inside the portal dir.
        path.relative_to(PORTAL_FILES_DIR.resolve())
    except ValueError:
        return _invalid_path()

    if not path.is_file():
        return jsonify({"error": "not found"}), 404

    basename = filename.rsplit("/", 1)[-1]
    return send_file(path, as_attachment=False, download_name=basename)
|
||
|
||
|
||
@app.route("/api/portal-files-debug")
def api_portal_files_debug():
    """No-auth debug: what PORTAL_FILES_DIR the process sees (for troubleshooting).

    Lists the top-level entries (directories first, then by lowercase name),
    skipping names that start with "." or contain "..". Any exception is
    reported as a 500 JSON error.
    """
    try:
        names = []
        if PORTAL_FILES_DIR.is_dir():
            ordered = sorted(
                PORTAL_FILES_DIR.iterdir(),
                key=lambda entry: (not entry.is_dir(), entry.name.lower()),
            )
            names = [
                {"name": entry.name, "type": "dir" if entry.is_dir() else "file"}
                for entry in ordered
                if ".." not in entry.name and not entry.name.startswith(".")
            ]
        report = {
            "portal_files_dir": str(PORTAL_FILES_DIR),
            "exists": PORTAL_FILES_DIR.exists(),
            "is_dir": PORTAL_FILES_DIR.is_dir(),
            "items": names,
            "CM4_PROVISIONING_DIR": os.environ.get("CM4_PROVISIONING_DIR"),
        }
        return jsonify(report)
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
|
||
|
||
|
||
@app.route("/api/status")
def api_status():
    """Return the current provisioning status as JSON."""
    current = read_status()
    return jsonify(current)
|
||
|
||
|
||
@app.route("/api/status-clear", methods=["POST"])
def api_status_clear():
    """Reset status to idle (e.g. to dismiss a 'Golden image not found' error so you can try again)."""
    try:
        with open(STATUS_FILE, "w") as status_fh:
            idle_status = {
                "phase": "idle",
                "message": DEFAULT_STATUS["message"],
                "progress": None,
                "updated": None,
            }
            json.dump(idle_status, status_fh)
    except OSError:  # PermissionError is a subclass of OSError
        return jsonify({"ok": False, "error": "Could not write status"}), 500
    return jsonify({"ok": True})
|
||
|
||
|
||
def _first_boot_status_read():
    """Read first-boot status (device reports progress during first-boot.sh).

    Returns the JSON object stored in FIRST_BOOT_STATUS_FILE, or an idle
    placeholder when the file is missing, unreadable, not valid JSON or not
    a JSON object.
    """
    idle = {"phase": "idle", "message": "", "step": "", "step_name": "", "hostname": "", "ip": "", "updated": None}
    try:
        with open(FIRST_BOOT_STATUS_FILE, "r") as f:
            data = json.load(f)
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        return idle
    return data if isinstance(data, dict) else idle
|
||
|
||
|
||
def _first_boot_status_write(phase, message="", step="", step_name="", hostname="", ip="", error=None):
    """Write first-boot status (called by POST from device).

    Best effort: filesystem errors are swallowed. "error" is only stored when
    it is truthy, and "updated" is set to the current time.
    """
    target_dir = os.path.dirname(FIRST_BOOT_STATUS_FILE) or "."
    try:
        os.makedirs(target_dir, exist_ok=True)
        record = dict(
            phase=phase,
            message=message,
            step=step,
            step_name=step_name,
            hostname=hostname,
            ip=ip or "",
            updated=time.time(),
        )
        if error:
            record["error"] = error
        with open(FIRST_BOOT_STATUS_FILE, "w") as status_fh:
            json.dump(record, status_fh, indent=2)
    except OSError:  # PermissionError is a subclass of OSError
        pass
|
||
|
||
|
||
@app.route("/api/first-boot-status", methods=["GET"])
def api_first_boot_status_get():
    """Return current first-boot progress (for dashboard to poll)."""
    progress = _first_boot_status_read()
    return jsonify(progress)
|
||
|
||
|
||
@app.route("/api/first-boot-status", methods=["POST"])
def api_first_boot_status_post():
    """Called by device during first-boot.sh to report progress. No auth (device on local net).

    Accepts a JSON object with phase, message, step, step_name, hostname, ip
    and error. Because the endpoint is unauthenticated, every field is
    coerced to a bounded string: numbers are stringified, other non-string
    values are treated as empty, and a non-object body counts as {}. Unknown
    phases fall back to "running"; "error" is only kept when phase is "error".
    """
    body = request.get_json(silent=True)
    if not isinstance(body, dict):
        body = {}

    def _field(key, limit):
        value = body.get(key)
        if isinstance(value, bool) or not isinstance(value, (str, int, float)):
            return ""
        return str(value).strip()[:limit]

    phase = _field("phase", 32).lower()
    if phase not in ("started", "running", "done", "error"):
        phase = "running"

    _first_boot_status_write(
        phase=phase,
        message=_field("message", 500),
        step=_field("step", 10),
        step_name=_field("step_name", 80),
        hostname=_field("hostname", 64),
        ip=_field("ip", 45),
        error=_field("error", 500) if phase == "error" else None,
    )
    return jsonify({"ok": True})
|
||
|
||
|
||
@app.route("/api/log")
def api_log():
    """Return the tail of the flash log as JSON under the "log" key."""
    tail = read_log_tail()
    return jsonify({"log": tail})
|
||
|
||
|
||
@app.route("/api/eeprom-presets")
def api_eeprom_presets():
    """Return available boot order presets for the Update EEPROM dropdown (eMMC only; network boot removed from portal)."""
    emmc_only = {"id": "0x1", "label": "eMMC only"}
    return jsonify({"presets": [emmc_only]})
|
||
|
||
|
||
@app.route("/api/pending-devices")
def api_pending_devices():
    """Returns USB device if waiting_choice. Network boot removed from portal; DHCP still provides internet.

    The "network" list in the response is always empty.
    """
    status = read_status()
    if status.get("phase") == "waiting_choice":
        usb = {
            "source": "usb",
            "message": status.get("message", "Device connected (USB). Choose action."),
        }
    else:
        usb = None
    return jsonify({"usb": usb, "network": []})
|
||
|
||
|
||
@app.route("/api/device-action", methods=["POST"])
def api_device_action():
    """User chose Backup, Deploy, or Update EEPROM for a USB device. Network boot removed from portal.

    Expects a JSON object with "source" ("usb"), "action" ("backup", "deploy"
    or "eeprom_update") and optionally "shrink" (backup) or "boot_order"
    (eeprom_update, only "0x1"). The chosen action is written to
    ACTION_REQUEST_FILE. Malformed bodies (non-object JSON, non-string
    fields) are answered with 400 instead of raising.
    """
    body = request.get_json(force=True, silent=True)
    if not isinstance(body, dict):
        body = {}

    def _text(key):
        value = body.get(key)
        return value.strip().lower() if isinstance(value, str) else ""

    source = _text("source")
    action = _text("action")
    if source != "usb":
        return jsonify({"ok": False, "error": "Only USB boot is supported"}), 400
    if action not in ("backup", "deploy", "eeprom_update"):
        return jsonify({"ok": False, "error": "action must be 'backup', 'deploy', or 'eeprom_update'"}), 400

    if action == "eeprom_update":
        raw_order = body.get("boot_order")
        # Missing/empty means the default; anything else must be the string "0x1".
        boot_order = "0x1" if not raw_order else _text("boot_order")
        if boot_order != "0x1":
            return jsonify({"ok": False, "error": "boot_order must be 0x1 (eMMC only)"}), 400
        ok, err = _generate_eeprom_update(boot_order)
        if not ok:
            return jsonify({"ok": False, "error": err or "Failed to generate EEPROM update"}), 500

    try:
        os.makedirs(os.path.dirname(ACTION_REQUEST_FILE) or ".", exist_ok=True)
        if action == "backup" and body.get("shrink"):
            # Marker file is best-effort; the backup request proceeds without it.
            try:
                (BASE_DIR / "shrink_next_backup").write_text("1")
            except OSError:
                pass
        with open(ACTION_REQUEST_FILE, "w") as f:
            f.write(action)
    except OSError:  # includes PermissionError
        return jsonify({"ok": False, "error": "Could not write action file"}), 500

    if action == "eeprom_update":
        _write_status("eeprom_update", "EEPROM update ready; host will write to device boot partition.")
    elif action == "deploy":
        # A new deploy starts with a clean first-boot status.
        _first_boot_status_write("idle", "", hostname="", ip="")
    return jsonify({"ok": True})
|
||
|
||
|
||
@app.route("/api/register-device", methods=["POST"])
def api_register_device():
    """Legacy: network boot removed from portal; accept registration but do not list devices.

    Reads "mac" from a JSON object body or, failing that, from form data.
    Responds 400 when it is missing, blank or not a string; nothing is stored.
    """
    body = request.get_json(force=True, silent=True)
    if not isinstance(body, dict) or not body:
        body = request.form
    mac = body.get("mac")
    if not isinstance(mac, str) or not mac.strip():
        return jsonify({"ok": False, "error": "mac required"}), 400
    return jsonify({"ok": True, "message": "registered"})
|
||
|
||
|
||
@app.route("/api/action-done", methods=["POST"])
def api_action_done():
    """Legacy: network boot removed from portal; acknowledge completion without toggling DHCP.

    Takes "mac" from the query string or a JSON object body. When a MAC is
    given, matching entries are removed from the network device registry.
    The status is always set to "done". The MAC is stripped and limited to
    64 characters because it is echoed into the status message.
    """
    mac = request.args.get("mac")
    if not mac:
        body = request.get_json(silent=True)
        mac = body.get("mac") if isinstance(body, dict) else ""
    mac = mac.strip()[:64] if isinstance(mac, str) else ""

    if mac:
        data = _load_network_devices()
        if not isinstance(data, dict):
            data = {"devices": []}
        devices = data.get("devices")
        if not isinstance(devices, list):
            devices = []
        wanted = mac.lower()
        # Keep every entry that is not a dict carrying this MAC.
        data["devices"] = [
            d for d in devices
            if not (isinstance(d, dict) and str(d.get("mac") or "").lower() == wanted)
        ]
        _save_network_devices(data)

    _write_status("done", f"Done ({mac or 'device'}).")
    return jsonify({"ok": True, "message": "Done"})
|
||
|
||
|
||
def _read_dhcp_leases():
    """Read dnsmasq lease file. Returns (leases_list, error_string). leases_list items: {expiry, mac, ip, hostname}.

    A missing lease file yields ([], None). Blank lines, lines starting with
    "#" and lines with fewer than four fields are ignored. A non-numeric
    expiry is reported as 0. Read errors yield ([], message).
    """
    if not (DHCP_LEASES_FILE and os.path.isfile(DHCP_LEASES_FILE)):
        return [], None
    try:
        leases = []
        with open(DHCP_LEASES_FILE, "r") as lease_fh:
            for raw in lease_fh:
                fields = raw.strip().split()
                # Fewer than four fields also covers blank lines.
                if len(fields) < 4 or fields[0].startswith("#"):
                    continue
                expiry, mac, ip, hostname = fields[:4]
                leases.append({
                    "expiry": int(expiry) if expiry.isdigit() else 0,
                    "mac": mac,
                    "ip": ip,
                    "hostname": hostname,
                })
        return leases, None
    except OSError as exc:  # PermissionError is a subclass of OSError
        return [], str(exc)
|
||
|
||
|
||
@app.route("/api/dhcp-leases")
def api_dhcp_leases():
    """Return current DHCP leases from dnsmasq (when dashboard runs on LXC with dnsmasq).

    On a read error the lease list is empty and "error" carries the message;
    otherwise "error" is null.
    """
    leases, err = _read_dhcp_leases()
    payload = {"leases": [], "error": err} if err else {"leases": leases, "error": None}
    return jsonify(payload)
|
||
|
||
|
||
@app.route("/api/device-action-poll")
def api_device_action_poll():
    """Network device polls this to get its assigned action (deploy/backup) and URL.

    The first registry entry whose MAC matches (case-insensitively) decides
    the answer: "deploy" with the golden image URL, "backup" with an upload
    URL, "reboot", or "wait" for anything else, a missing MAC or no match.
    """
    mac = (request.args.get("mac") or "").strip()
    if not mac:
        return jsonify({"action": "wait"}), 200

    registry = _load_network_devices()
    base = request.host_url.rstrip("/")
    wanted = mac.lower()
    device = next(
        (d for d in registry.get("devices", []) if (d.get("mac") or "").lower() == wanted),
        None,
    )
    action = (device.get("action") or "wait") if device is not None else "wait"

    if action == "deploy":
        return jsonify({"action": "deploy", "url": f"{base}/api/golden-image"})
    if action == "backup":
        return jsonify({"action": "backup", "upload_url": f"{base}/api/backup-upload?mac={mac}"})
    if action == "reboot":
        return jsonify({"action": "reboot"})
    return jsonify({"action": "wait"})
|
||
|
||
|
||
def _stream_decompressed(command):
    """Yield the stdout of *command* in 64 KiB chunks and always reap the process."""
    proc = subprocess.Popen(command, stdout=subprocess.PIPE, bufsize=65536)
    finished = False
    try:
        while True:
            chunk = proc.stdout.read(65536)
            if not chunk:
                break
            yield chunk
        finished = True
    finally:
        # When the client disconnects the generator is closed mid-stream while
        # the decompressor may be blocked writing into the pipe. Close our end
        # and terminate it first; a bare wait() could otherwise block forever.
        proc.stdout.close()
        if not finished and proc.poll() is None:
            proc.terminate()
        proc.wait()


@app.route("/api/golden-image")
def api_golden_image():
    """Stream the golden image for network deploy (device pulls and writes to eMMC). Decompresses .img.xz/.img.gz on the fly.

    Responds 404 when GOLDEN_IMAGE is not a file. The compression type is
    taken from the suffix of the resolved path, so a symlinked golden image
    is handled according to its target. Uncompressed images are sent as-is.
    """
    if not GOLDEN_IMAGE.is_file():
        return jsonify({"error": "Golden image not found"}), 404

    resolved = str(GOLDEN_IMAGE.resolve())
    if resolved.endswith(".img.xz"):
        command = ["xz", "-d", "-c", resolved]
    elif resolved.endswith(".img.gz"):
        command = ["gzip", "-c", "-d", resolved]
    else:
        command = None

    if command is not None:
        return Response(
            stream_with_context(_stream_decompressed(command)),
            mimetype="application/octet-stream",
            headers={"Content-Disposition": "attachment; filename=golden.img"},
        )
    return send_file(
        GOLDEN_IMAGE,
        mimetype="application/octet-stream",
        as_attachment=True,
        download_name="golden.img",
    )
|
||
|
||
|
||
@app.route("/api/backup-upload", methods=["POST"])
def api_backup_upload():
    """Network device uploads its eMMC backup (raw body).

    The "mac" query parameter becomes part of the file name
    backup-net-<mac>-<timestamp>.img in BACKUPS_DIR. Because it is untrusted,
    ":" is replaced by "-" and every character other than ASCII letters,
    digits and "-" is dropped before it is used (at most 20 characters).
    Responds 400 when nothing usable remains and 500 (after removing the
    partial file) when writing fails.
    """
    raw_mac = (request.args.get("mac") or "").strip().replace(":", "-")
    mac = re.sub(r"[^0-9A-Za-z-]", "", raw_mac)[:20]
    if not mac:
        return jsonify({"error": "mac query param required"}), 400

    BACKUPS_DIR.mkdir(parents=True, exist_ok=True)
    name = f"backup-net-{mac}-{int(time.time())}.img"
    path = BACKUPS_DIR / name
    try:
        with open(path, "wb") as f:
            while True:
                chunk = request.stream.read(1024 * 1024)
                if not chunk:
                    break
                f.write(chunk)
        return jsonify({"ok": True, "file": name})
    except OSError as e:  # IOError is an alias of OSError
        # Do not leave a truncated image behind.
        try:
            path.unlink(missing_ok=True)
        except OSError:
            pass
        return jsonify({"error": str(e)}), 500
|
||
|
||
|
||
@app.route("/api/backups")
@require_admin
def api_backups():
    """Return the backup listing and the backups directory path as JSON (admin only)."""
    payload = {"backups": list_backups(), "backups_dir": str(BACKUPS_DIR)}
    return jsonify(payload)
|
||
|
||
|
||
@app.route("/api/cloudinit-images")
@require_admin
def api_cloudinit_images():
    """Return the cloud-init image listing and its directory path as JSON (admin only)."""
    payload = {"images": list_cloudinit_images(), "cloudinit_dir": str(CLOUDINIT_IMAGES_DIR)}
    return jsonify(payload)
|
||
|
||
|
||
def _load_portal_descriptions():
    """Load the stored path -> description mapping (a path may be a file or a folder).

    Returns an empty dict when the descriptions file is missing, unreadable,
    or does not contain valid JSON.
    """
    if PORTAL_DESCRIPTIONS_FILE.is_file():
        try:
            with open(PORTAL_DESCRIPTIONS_FILE, "r", encoding="utf-8") as handle:
                stored = json.load(handle)
        except (json.JSONDecodeError, OSError):
            return {}
        return stored
    return {}
|
||
|
||
|
||
def _save_portal_descriptions(descriptions):
    """Write *descriptions* to the descriptions file as indented JSON.

    Creates the parent directory when needed. Returns True on success and
    False when the directory or file could not be written.
    """
    target = PORTAL_DESCRIPTIONS_FILE
    try:
        target.parent.mkdir(parents=True, exist_ok=True)
        with open(target, "w", encoding="utf-8") as out:
            json.dump(descriptions, out, indent=2)
    except OSError:
        return False
    return True
|
||
|
||
|
||
def _portal_files_list_impl(subpath):
    """List one level of the portal files directory.

    Args:
        subpath: folder relative to the portal root ("" for the root itself).

    Returns:
        A tuple ``(items, descriptions, base_url, current_path)``. ``items`` is
        a list of ``{"type": "folder"|"file", "path", "name", ...}`` dicts with
        folders first, then files, each group sorted case-insensitively; files
        also carry ``size`` and ``mtime``. On any invalid or missing path the
        items list and the descriptions dict are empty.
    """
    base_url = request.host_url.rstrip("/") + "/files/"
    if ".." in subpath or "\\" in subpath:
        return [], {}, base_url, subpath
    if not PORTAL_FILES_DIR.is_dir():
        try:
            PORTAL_FILES_DIR.mkdir(parents=True, exist_ok=True)
        except OSError:
            # Best effort: the re-check below reports failure as an empty listing.
            pass
        if not PORTAL_FILES_DIR.is_dir():
            return [], {}, base_url, subpath
    # Resolve the root once and use it for BOTH sides of the containment check.
    # Comparing the unresolved root against its resolved form made the root
    # listing come back empty whenever the configured directory was relative
    # or reached through a symlink.
    root = PORTAL_FILES_DIR.resolve()
    list_dir = (root / subpath).resolve() if subpath else root
    try:
        list_dir.relative_to(root)
    except ValueError:
        return [], {}, base_url, subpath
    if not list_dir.is_dir():
        return [], {}, base_url, subpath
    items = []
    for p in sorted(list_dir.iterdir(), key=lambda x: (not x.is_dir(), x.name.lower())):
        # Hidden entries are never exposed through the portal.
        if ".." in p.name or p.name.startswith("."):
            continue
        rel = f"{subpath}/{p.name}" if subpath else p.name
        try:
            if p.is_dir():
                items.append({"type": "folder", "path": rel, "name": p.name})
            else:
                st = p.stat()  # one stat call supplies both size and mtime
                items.append({
                    "type": "file",
                    "path": rel,
                    "name": p.name,
                    "size": st.st_size,
                    "mtime": st.st_mtime,
                })
        except OSError:
            # The entry vanished or is unreadable; leave it out of the listing.
            continue
    return items, _load_portal_descriptions(), base_url, subpath
|
||
|
||
|
||
@app.route("/api/portal-files")
def api_portal_files_list():
    """List one level of portal files: the root, or the folder given by ``path=``.

    Requires an admin session unless ``?debug=1`` is passed, which allows an
    unauthenticated read-only listing. The response is marked non-cacheable.
    """
    requested = request.args.get("path", "").strip().strip("/")
    is_debug = request.args.get("debug") == "1"
    if not (is_debug or session.get("admin_logged_in")):
        wants_json = request.is_json or request.path.startswith("/api/")
        if wants_json:
            return jsonify({"ok": False, "error": "Login required"}), 401
        return redirect(url_for("login", next=request.url))
    items, descriptions, base_url, current = _portal_files_list_impl(requested)
    payload = {
        "items": items,
        "base_url": base_url,
        "descriptions": descriptions,
        "current_path": current,
        "portal_files_dir": str(PORTAL_FILES_DIR),
    }
    response = jsonify(payload)
    response.headers["Cache-Control"] = "no-store, no-cache, must-revalidate"
    return response
|
||
|
||
|
||
@app.route("/api/portal-files/descriptions", methods=["GET", "PATCH"])
@require_admin
def api_portal_descriptions():
    """Read (GET) or replace (PATCH) the portal path descriptions.

    PATCH expects a JSON body ``{"descriptions": {...}}`` and stores that dict
    as the complete mapping.
    """
    if request.method != "PATCH":
        return jsonify({"descriptions": _load_portal_descriptions()})
    payload = request.get_json(force=True, silent=True) or {}
    new_descriptions = payload.get("descriptions")
    if not isinstance(new_descriptions, dict):
        return jsonify({"ok": False, "error": "descriptions must be a dict"}), 400
    saved = _save_portal_descriptions(new_descriptions)
    if not saved:
        return jsonify({"ok": False, "error": "save failed"}), 500
    admin_log("portal_descriptions", "updated")
    return jsonify({"ok": True})
|
||
|
||
|
||
@app.route("/api/portal-files/folder", methods=["POST"])
@require_admin
def api_portal_folder_create():
    """Create a folder (with any missing parents) inside the portal files directory.

    JSON body: ``{"path": "<folder relative to the portal root>"}``. Paths
    containing ".." or a backslash, or resolving outside the portal root, are
    rejected with 400.
    """
    payload = request.get_json(force=True, silent=True) or {}
    rel_path = (payload.get("path") or "").strip().strip("/")
    if not rel_path:
        return jsonify({"ok": False, "error": "path required"}), 400
    if "\\" in rel_path or ".." in rel_path:
        return jsonify({"ok": False, "error": "invalid path"}), 400
    target = (PORTAL_FILES_DIR / rel_path).resolve()
    try:
        target.relative_to(PORTAL_FILES_DIR.resolve())
    except ValueError:
        # Resolved location escapes the portal root.
        return jsonify({"ok": False, "error": "invalid path"}), 400
    try:
        target.mkdir(parents=True, exist_ok=True)
        admin_log("portal_folder_create", rel_path)
        return jsonify({"ok": True, "path": rel_path})
    except OSError as exc:
        return jsonify({"ok": False, "error": str(exc)}), 500
|
||
|
||
|
||
@app.route("/api/portal-files/upload", methods=["POST"])
@require_admin
def api_portal_files_upload():
    """Save an uploaded file (multipart field ``file`` or ``upload``) into the portal.

    The optional form field ``path`` selects a sub-folder. The file name is
    sanitised (anything other than word characters, ``-``, ``.`` and ``/``
    becomes ``_``) and truncated to 120 characters. Returns the stored name
    and its public URL.
    """
    if not any(field in request.files for field in ("file", "upload")):
        return jsonify({"ok": False, "error": "no file (use field 'file' or 'upload')"}), 400
    upload = request.files.get("file") or request.files.get("upload")
    if not (upload and upload.filename):
        return jsonify({"ok": False, "error": "no file selected"}), 400
    safe_name = re.sub(r"[^\w\-./]", "_", upload.filename)[:120] or "upload"
    if safe_name.startswith("/") or ".." in safe_name:
        return jsonify({"ok": False, "error": "invalid filename"}), 400
    folder = (request.form.get("path") or "").strip().strip("/")
    if folder and ("\\" in folder or ".." in folder):
        return jsonify({"ok": False, "error": "invalid path"}), 400
    rel_name = (folder + "/" + safe_name) if folder else safe_name
    dest = (PORTAL_FILES_DIR / rel_name).resolve()
    try:
        dest.relative_to(PORTAL_FILES_DIR.resolve())
    except ValueError:
        return jsonify({"ok": False, "error": "invalid path"}), 400
    try:
        dest.parent.mkdir(parents=True, exist_ok=True)
        upload.save(str(dest))
        admin_log("portal_upload", rel_name)
        file_url = request.host_url.rstrip("/") + "/files/" + rel_name
        return jsonify({"ok": True, "name": rel_name, "url": file_url})
    except (OSError, IOError) as exc:
        # Remove whatever was written before the failure.
        if dest.exists():
            dest.unlink(missing_ok=True)
        return jsonify({"ok": False, "error": str(exc)}), 500
|
||
|
||
|
||
# Text extensions allowed for editor (others are read-only or binary)
|
||
_PORTAL_EDITABLE_EXTENSIONS = frozenset(
|
||
".sh .bash .conf .cfg .ini .desktop .json .py .yml .yaml .md .txt .plymouth .script".split()
|
||
)
|
||
|
||
|
||
def _portal_language_for_path(name):
|
||
"""Return Ace/editor language mode from filename."""
|
||
n = (name or "").lower()
|
||
if n.endswith(".sh") or n.endswith(".bash"):
|
||
return "sh"
|
||
if n.endswith(".json"):
|
||
return "json"
|
||
if n.endswith(".py"):
|
||
return "python"
|
||
if n.endswith(".yml") or n.endswith(".yaml"):
|
||
return "yaml"
|
||
if n.endswith(".md"):
|
||
return "markdown"
|
||
if n.endswith(".conf") or n.endswith(".cfg") or n.endswith(".ini") or n.endswith(".desktop") or n.endswith(".plymouth"):
|
||
return "ini"
|
||
if n.endswith(".script"):
|
||
return "sh"
|
||
return "plain_text"
|
||
|
||
|
||
@app.route("/api/portal-files/content", methods=["GET"])
@require_admin
def api_portal_file_content_get():
    """Return a portal file's text for the editor (``path=`` query parameter).

    Files with a suffix outside the editable set, or whose bytes are not valid
    UTF-8, are refused with 400. The response includes the editor language
    mode derived from the file name.
    """
    rel = (request.args.get("path") or "").strip()
    if rel == "" or ".." in rel or "\\" in rel:
        return jsonify({"ok": False, "error": "invalid path"}), 400
    target = (PORTAL_FILES_DIR / rel).resolve()
    try:
        target.relative_to(PORTAL_FILES_DIR.resolve())
    except ValueError:
        return jsonify({"ok": False, "error": "invalid path"}), 400
    if not target.is_file():
        return jsonify({"ok": False, "error": "not found"}), 404
    suffix = target.suffix.lower()
    # Files without any suffix are allowed through to the UTF-8 check.
    if suffix and suffix not in _PORTAL_EDITABLE_EXTENSIONS:
        return jsonify({"ok": False, "error": "binary or unsupported file type for editing"}), 400
    try:
        text = target.read_bytes().decode("utf-8")
    except OSError as exc:
        return jsonify({"ok": False, "error": str(exc)}), 500
    except UnicodeDecodeError:
        return jsonify({"ok": False, "error": "binary or unsupported encoding"}), 400
    result = {
        "ok": True,
        "path": rel,
        "content": text,
        "language": _portal_language_for_path(target.name),
    }
    return jsonify(result)
|
||
|
||
|
||
@app.route("/api/portal-files/content", methods=["PUT"])
@require_admin
def api_portal_file_content_put():
    """Write editor content to a portal file.

    JSON body: ``{"path": "...", "content": "..."}``. Existing files whose
    suffix is outside the editable set are refused; missing parent folders are
    created. Non-string content is converted with ``str()``.
    """
    payload = request.get_json(silent=True) or {}
    rel = (payload.get("path") or "").strip()
    text = payload.get("content")
    if rel == "" or ".." in rel or "\\" in rel:
        return jsonify({"ok": False, "error": "invalid path"}), 400
    if text is None:
        return jsonify({"ok": False, "error": "content required"}), 400
    target = (PORTAL_FILES_DIR / rel).resolve()
    try:
        target.relative_to(PORTAL_FILES_DIR.resolve())
    except ValueError:
        return jsonify({"ok": False, "error": "invalid path"}), 400
    if target.is_dir():
        return jsonify({"ok": False, "error": "cannot edit a folder"}), 400
    suffix = target.suffix.lower()
    # The suffix restriction only applies to files that already exist.
    if target.exists() and suffix and suffix not in _PORTAL_EDITABLE_EXTENSIONS:
        return jsonify({"ok": False, "error": "unsupported file type for editing"}), 400
    text = text if isinstance(text, str) else str(text)
    try:
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(text, encoding="utf-8")
        admin_log("portal_edit", rel)
        return jsonify({"ok": True, "path": rel})
    except OSError as exc:
        return jsonify({"ok": False, "error": str(exc)}), 500
|
||
|
||
|
||
@app.route("/api/portal-files/<path:name>", methods=["DELETE"])
@require_admin
def api_portal_file_delete(name):
    """Delete a portal file, or a portal folder if it is empty.

    Returns 400 for an invalid path or a non-empty folder, 404 when nothing
    exists at the path, and 500 when the removal itself fails.
    """
    if "\\" in name or ".." in name:
        return jsonify({"ok": False, "error": "invalid name"}), 400
    target = (PORTAL_FILES_DIR / name).resolve()
    try:
        target.relative_to(PORTAL_FILES_DIR.resolve())
    except ValueError:
        return jsonify({"ok": False, "error": "invalid path"}), 400

    if target.is_file():
        remove, action = target.unlink, "portal_delete"
    elif target.is_dir():
        # Folders are only removed when empty; contents are never deleted recursively.
        if any(target.iterdir()):
            return jsonify({"ok": False, "error": "folder not empty"}), 400
        remove, action = target.rmdir, "portal_folder_delete"
    else:
        return jsonify({"ok": False, "error": "not found"}), 404

    try:
        remove()
        admin_log(action, name)
        return jsonify({"ok": True})
    except OSError as exc:
        return jsonify({"ok": False, "error": str(exc)}), 500
|
||
|
||
|
||
@app.route("/api/backups/upload", methods=["POST"])
@require_admin
def api_backups_upload():
    """Upload an image file from the dashboard (multipart field ``file`` or ``image``).

    The stored name is ``<sanitised stem>-<unix time><ext>``, where ``ext`` is
    ``.img.xz``, ``.img.gz`` or ``.img`` (``.img`` is used for any other
    upload name). Returns ``{"ok": True, "name", "message"}`` on success, 400
    when no file was supplied, and 500 (after removing a partial file) on I/O
    errors.
    """
    if "file" not in request.files and "image" not in request.files:
        return jsonify({"ok": False, "error": "no file in request (use field 'file' or 'image')"}), 400
    f = request.files.get("file") or request.files.get("image")
    if not f or not f.filename:
        return jsonify({"ok": False, "error": "no file selected"}), 400

    filename = f.filename
    lowered = filename.lower()
    # Longest suffixes first so ".img.xz" is not mistaken for a bare ".xz".
    ext = next((e for e in (".img.xz", ".img.gz", ".img") if lowered.endswith(e)), "")
    if ext:
        # Strip the whole image extension. Stripping only the last suffix
        # turned "foo.img.gz" into "foo.img-<ts>.img.gz".
        base = filename[: len(filename) - len(ext)]
    else:
        base = filename.rsplit(".", 1)[0] if "." in filename else filename
        ext = ".img"
    base = base.strip() or "upload"
    safe_base = re.sub(r"[^\w\-.]", "_", base)[:80] or "upload"

    name = f"{safe_base}-{int(time.time())}{ext}"
    if not _safe_backup_name(name):
        # Fall back to a fixed name when the sanitised one is still rejected.
        name = f"upload-{int(time.time())}.img"
    path = BACKUPS_DIR / name
    try:
        BACKUPS_DIR.mkdir(parents=True, exist_ok=True)
        f.save(str(path))
        return jsonify({"ok": True, "name": name, "message": f"Uploaded {name}"})
    except (OSError, IOError) as e:
        # Do not leave a truncated image behind.
        if path.exists():
            path.unlink(missing_ok=True)
        return jsonify({"ok": False, "error": str(e)}), 500
|
||
|
||
|
||
def _set_golden_from_path(path):
    """Make the golden image refer to *path*.

    Any existing golden image entry is removed first. If *path* resolves to a
    location inside the backups or cloud-init images directory, the golden
    image becomes a symlink to it; otherwise the file is copied.

    Raises:
        OSError: if the old entry cannot be removed or the link/copy fails.
    """
    GOLDEN_IMAGE.parent.mkdir(parents=True, exist_ok=True)
    # exists() follows symlinks and is False for a dangling link (for example
    # after the target was renamed or deleted). Without the is_symlink() check
    # the stale link survived and os.symlink() failed with FileExistsError.
    if GOLDEN_IMAGE.is_symlink() or GOLDEN_IMAGE.exists():
        GOLDEN_IMAGE.unlink()
    source = path.resolve()
    for managed_dir in (BACKUPS_DIR, CLOUDINIT_IMAGES_DIR):
        try:
            source.relative_to(managed_dir.resolve())
        except ValueError:
            continue
        os.symlink(source, GOLDEN_IMAGE)
        return
    # Outside both managed directories: take a private copy instead of linking.
    shutil.copy2(path, GOLDEN_IMAGE)
|
||
|
||
|
||
@app.route("/api/backups/<path:name>/set-as-golden", methods=["POST"])
@require_admin
def api_backup_set_as_golden(name):
    """Make the named backup the golden image and record the change in the admin log."""
    if not _safe_backup_name(name):
        return jsonify({"ok": False, "error": "invalid backup name"}), 400
    backup = BACKUPS_DIR / name
    if not backup.is_file():
        return jsonify({"ok": False, "error": "backup not found"}), 404
    try:
        BACKUPS_DIR.mkdir(parents=True, exist_ok=True)
        _set_golden_from_path(backup)
        admin_log("set_golden", f"backups/{name}")
        message = f"Golden image set from backup {name}"
        return jsonify({"ok": True, "message": message})
    except (OSError, IOError) as exc:
        return jsonify({"ok": False, "error": str(exc)}), 500
|
||
|
||
|
||
@app.route("/api/cloudinit-images/<path:name>/set-as-golden", methods=["POST"])
@require_admin
def api_cloudinit_set_as_golden(name):
    """Make the named cloud-init image the golden image and record it in the admin log."""
    if not _safe_backup_name(name):
        return jsonify({"ok": False, "error": "invalid name"}), 400
    image = CLOUDINIT_IMAGES_DIR / name
    if not image.is_file():
        return jsonify({"ok": False, "error": "image not found"}), 404
    try:
        CLOUDINIT_IMAGES_DIR.mkdir(parents=True, exist_ok=True)
        _set_golden_from_path(image)
        admin_log("set_golden", f"cloudinit/{name}")
        message = f"Golden image set from cloud-init image {name}"
        return jsonify({"ok": True, "message": message})
    except (OSError, IOError) as exc:
        return jsonify({"ok": False, "error": str(exc)}), 500
|
||
|
||
|
||
def _request_host_shrink(name, action="shrink", format="xz"):
    """Ask the host to shrink or compress an image and wait for the result.

    Writes a JSON request (``name``, ``action`` and, for ``compress``, a
    ``format`` of ``gz`` or ``xz``) to the shrink request file, then checks the
    shrink status file every 5 seconds for up to 35 minutes.

    Returns:
        ``(True, message)`` when the status for *name* reports phase "done",
        ``(False, error)`` on phase "error", a failed request write, or timeout.
    """
    request_body = {"name": name, "action": action}
    if action == "compress":
        request_body["format"] = "gz" if format == "gz" else "xz"
    try:
        SHRINK_REQUEST_FILE.write_text(json.dumps(request_body))
    except OSError as exc:
        return False, str(exc)

    give_up_at = time.monotonic() + 2100  # 35 min
    while time.monotonic() < give_up_at:
        time.sleep(5)
        if not SHRINK_STATUS_FILE.exists():
            continue
        try:
            status = json.loads(SHRINK_STATUS_FILE.read_text())
        except (OSError, ValueError):
            # Unreadable or partially written status; try again on the next tick.
            continue
        # NOTE(review): a status left over from an earlier request for the same
        # name would be accepted here — confirm the host resets the status file.
        if status.get("name") == name:
            phase = status.get("phase")
            if phase == "done":
                return True, status.get("message") or f"Shrunk {name}"
            if phase == "error":
                return False, status.get("error") or "PiShrink failed"
    return False, "Shrink timed out (run on host may still be in progress)"
|
||
|
||
|
||
@app.route("/api/cloudinit-images/<path:name>", methods=["GET"])
@require_admin
def api_cloudinit_download(name):
    """Send the named cloud-init image as a file download."""
    if not _safe_backup_name(name):
        return jsonify({"error": "invalid name"}), 400
    image = CLOUDINIT_IMAGES_DIR / name
    if image.is_file():
        return send_file(image, as_attachment=True, download_name=name)
    return jsonify({"error": "not found"}), 404
|
||
|
||
|
||
@app.route("/api/cloudinit-images/<path:name>", methods=["PATCH"])
@require_admin
def api_cloudinit_update(name):
    """Rename a cloud-init image or update its display metadata.

    JSON body: ``{"filename": "<new file name>"}`` renames the file and moves
    its metadata entry; otherwise ``name`` and/or ``description`` update the
    metadata. If the golden image symlink points at the renamed file it is
    re-pointed at the new name.

    Returns ``{"ok": True, "name": <current file name>}``, plus a ``warning``
    key when the rename succeeded but the golden link could not be updated.
    """
    if not _safe_backup_name(name):
        return jsonify({"ok": False, "error": "invalid name"}), 400
    path = CLOUDINIT_IMAGES_DIR / name
    if not path.is_file():
        return jsonify({"ok": False, "error": "not found"}), 404
    body = request.get_json(force=True, silent=True) or {}
    meta = _load_cloudinit_meta()
    entry = meta.get(name, {})
    warning = None

    new_filename = (body.get("filename") or "").strip()
    if new_filename:
        if not _safe_backup_name(new_filename):
            return jsonify({"ok": False, "error": "invalid new filename"}), 400
        new_path = CLOUDINIT_IMAGES_DIR / new_filename
        if new_path.exists() and new_path != path:
            return jsonify({"ok": False, "error": "target filename already exists"}), 409
        # Check before renaming, while the link target still exists.
        try:
            was_golden = GOLDEN_IMAGE.is_symlink() and GOLDEN_IMAGE.resolve() == path.resolve()
        except OSError:
            was_golden = False
        try:
            path.rename(new_path)
        except OSError as e:
            return jsonify({"ok": False, "error": str(e)}), 500
        if was_golden:
            # Without this the golden symlink would dangle after the rename.
            try:
                GOLDEN_IMAGE.unlink()
                os.symlink(new_path.resolve(), GOLDEN_IMAGE)
            except OSError as e:
                warning = f"renamed, but golden image link could not be updated: {e}"
        # Drop the old key first so renaming to the same name keeps the entry.
        meta.pop(name, None)
        meta[new_filename] = {"name": entry.get("name") or name, "description": entry.get("description") or ""}
        name = new_filename
    else:
        if "name" in body:
            entry["name"] = (body.get("name") or "").strip() or path.name
        if "description" in body:
            entry["description"] = (body.get("description") or "").strip()
        meta[name] = entry

    if not _save_cloudinit_meta(meta):
        return jsonify({"ok": False, "error": "could not save metadata"}), 500
    result = {"ok": True, "name": name}
    if warning:
        result["warning"] = warning
    return jsonify(result)
|
||
|
||
|
||
@app.route("/api/cloudinit-images/<path:name>", methods=["DELETE"])
@require_admin
def api_cloudinit_delete(name):
    """Delete a cloud-init image and its metadata entry.

    If the golden image is a symlink to this file, the link is removed first.
    The deletion is recorded in the admin log.
    """
    if not _safe_backup_name(name):
        return jsonify({"ok": False, "error": "invalid name"}), 400
    image = CLOUDINIT_IMAGES_DIR / name
    if not image.is_file():
        return jsonify({"ok": False, "error": "not found"}), 404
    try:
        golden_is_link = GOLDEN_IMAGE.exists() and GOLDEN_IMAGE.is_symlink()
        if golden_is_link:
            try:
                points_here = GOLDEN_IMAGE.resolve() == image.resolve()
                if points_here:
                    GOLDEN_IMAGE.unlink()
            except OSError:
                # A failure to drop the link does not block deleting the image.
                pass
        image.unlink()
        stored_meta = _load_cloudinit_meta()
        stored_meta.pop(name, None)
        _save_cloudinit_meta(stored_meta)
        admin_log("cloudinit_delete", name)
        return jsonify({"ok": True, "message": f"Deleted {name}"})
    except (OSError, IOError) as exc:
        return jsonify({"ok": False, "error": str(exc)}), 500
|
||
|
||
|
||
@app.route("/api/backups/<path:name>/shrink", methods=["POST"])
@require_admin
def api_backup_shrink(name):
    """Ask the host to run PiShrink in place on a raw ``.img`` backup.

    Blocks until the host reports a result or the request times out. Failures
    map to 503 ("not installed"), 504 ("timed out") or 500.
    """
    if not _safe_backup_name(name):
        return jsonify({"ok": False, "error": "invalid backup name"}), 400
    is_compressed = name.endswith(".img.gz") or name.endswith(".img.xz")
    if is_compressed or not name.endswith(".img"):
        return jsonify({"ok": False, "error": "only raw .img files can be shrunk (not .img.gz / .img.xz)"}), 400
    if not (BACKUPS_DIR / name).is_file():
        return jsonify({"ok": False, "error": "backup not found"}), 404
    ok, msg = _request_host_shrink(name, action="shrink")
    if ok:
        return jsonify({"ok": True, "message": msg})
    if "not installed" in msg:
        status = 503
    elif "timed out" in msg:
        status = 504
    else:
        status = 500
    return jsonify({"ok": False, "error": msg}), status
|
||
|
||
|
||
@app.route("/api/backups/<path:name>/compress", methods=["POST"])
@require_admin
def api_backup_compress(name):
    """Ask the host to shrink and compress a raw ``.img`` backup to ``.img.gz`` or ``.img.xz``.

    The format comes from the JSON body or the ``format`` query parameter
    (``gz``, ``gzip`` or ``xz``; anything else means ``xz``). Blocks until the
    host reports a result or the request times out.
    """
    if not _safe_backup_name(name):
        return jsonify({"ok": False, "error": "invalid backup name"}), 400
    is_compressed = name.endswith(".img.gz") or name.endswith(".img.xz")
    if is_compressed or not name.endswith(".img"):
        return jsonify({"ok": False, "error": "only raw .img files can be compressed (not .img.gz / .img.xz)"}), 400
    if not (BACKUPS_DIR / name).is_file():
        return jsonify({"ok": False, "error": "backup not found"}), 404
    body = request.get_json(force=True, silent=True) or {}
    requested = (body.get("format") or request.args.get("format") or "xz").strip().lower()
    # Normalise aliases; unknown values fall back to xz.
    fmt = {"gz": "gz", "gzip": "gz", "xz": "xz"}.get(requested, "xz")
    ok, msg = _request_host_shrink(name, action="compress", format=fmt)
    if not ok:
        if "not installed" in msg:
            status = 503
        elif "timed out" in msg:
            status = 504
        else:
            status = 500
        return jsonify({"ok": False, "error": msg}), status
    ext = ".xz" if fmt == "xz" else ".gz"
    return jsonify({"ok": True, "message": msg or f"Compressed to {name}{ext}"})
|
||
|
||
|
||
@app.route("/api/backups/<path:name>", methods=["PATCH"])
@require_admin
def api_backup_update(name):
    """Rename a backup file or update its display metadata.

    JSON body: ``{"filename": "<new file name>"}`` renames the file and moves
    its metadata entry; otherwise ``name`` and/or ``description`` update the
    metadata. If the golden image symlink points at the renamed file it is
    re-pointed at the new name.

    Returns ``{"ok": True, "name": <current file name>}``, plus a ``warning``
    key when the rename succeeded but the golden link could not be updated.
    """
    if not _safe_backup_name(name):
        return jsonify({"ok": False, "error": "invalid backup name"}), 400
    path = BACKUPS_DIR / name
    if not path.is_file():
        return jsonify({"ok": False, "error": "backup not found"}), 404
    body = request.get_json(force=True, silent=True) or {}
    meta = _load_backups_meta()
    entry = meta.get(name, {})
    warning = None

    new_filename = (body.get("filename") or "").strip()
    if new_filename:
        if not _safe_backup_name(new_filename):
            return jsonify({"ok": False, "error": "invalid new filename"}), 400
        new_path = BACKUPS_DIR / new_filename
        if new_path.exists() and new_path != path:
            return jsonify({"ok": False, "error": "target filename already exists"}), 409
        # Check before renaming, while the link target still exists.
        try:
            was_golden = GOLDEN_IMAGE.is_symlink() and GOLDEN_IMAGE.resolve() == path.resolve()
        except OSError:
            was_golden = False
        try:
            path.rename(new_path)
        except OSError as e:
            return jsonify({"ok": False, "error": str(e)}), 500
        if was_golden:
            # Without this the golden symlink would dangle after the rename.
            try:
                GOLDEN_IMAGE.unlink()
                os.symlink(new_path.resolve(), GOLDEN_IMAGE)
            except OSError as e:
                warning = f"renamed, but golden image link could not be updated: {e}"
        # Drop the old key first so renaming to the same name keeps the entry.
        meta.pop(name, None)
        meta[new_filename] = {"name": entry.get("name") or name, "description": entry.get("description") or ""}
        name = new_filename
    else:
        if "name" in body:
            entry["name"] = (body.get("name") or "").strip() or path.name
        if "description" in body:
            entry["description"] = (body.get("description") or "").strip()
        meta[name] = entry

    if not _save_backups_meta(meta):
        return jsonify({"ok": False, "error": "could not save metadata"}), 500
    result = {"ok": True, "name": name}
    if warning:
        result["warning"] = warning
    return jsonify(result)
|
||
|
||
|
||
@app.route("/api/backups/<path:name>", methods=["DELETE"])
@require_admin
def api_backup_delete(name):
    """Delete a backup file and its metadata entry.

    If the golden image is a symlink to this backup, the link is removed
    first. The deletion is recorded in the admin log, as it is for cloud-init
    image deletion.
    """
    if not _safe_backup_name(name):
        return jsonify({"ok": False, "error": "invalid backup name"}), 400
    path = BACKUPS_DIR / name
    if not path.is_file():
        return jsonify({"ok": False, "error": "backup not found"}), 404
    try:
        if GOLDEN_IMAGE.exists() and GOLDEN_IMAGE.is_symlink():
            try:
                if GOLDEN_IMAGE.resolve() == path.resolve():
                    GOLDEN_IMAGE.unlink()
            except OSError:
                # A failure to drop the link does not block deleting the backup.
                pass
        path.unlink()
        meta = _load_backups_meta()
        meta.pop(name, None)
        _save_backups_meta(meta)
        # Destructive admin action: keep an audit trail.
        admin_log("backup_delete", name)
        return jsonify({"ok": True, "message": f"Deleted {name}"})
    except (OSError, IOError) as e:
        return jsonify({"ok": False, "error": str(e)}), 500
|
||
|
||
|
||
@app.route("/api/backups/<path:name>", methods=["GET"])
@require_admin
def api_backup_download(name):
    """Send the named backup as a file download."""
    if not _safe_backup_name(name):
        return jsonify({"error": "invalid name"}), 400
    backup = BACKUPS_DIR / name
    if backup.is_file():
        return send_file(backup, as_attachment=True, download_name=name)
    return jsonify({"error": "not found"}), 404
|
||
|
||
|
||
def _build_status_read():
    """Return the parsed build status file, or an idle status when it is missing or unreadable."""
    idle = {"phase": "idle", "message": "", "output_name": None, "error": None}
    try:
        if not BUILD_STATUS_FILE.is_file():
            return idle
        with open(BUILD_STATUS_FILE, "r") as handle:
            return json.load(handle)
    except (json.JSONDecodeError, OSError):
        return idle
|
||
|
||
|
||
def _build_status_write(phase, message, output_name=None, error=None):
    """Write the build status file with the given fields and the current time.

    Best effort: filesystem errors are ignored so a status update never breaks
    the caller.
    """
    try:
        BUILD_STATUS_FILE.parent.mkdir(parents=True, exist_ok=True)
        with open(BUILD_STATUS_FILE, "w") as out:
            record = {
                "phase": phase,
                "message": message,
                "output_name": output_name,
                "error": error,
                "updated": time.time(),
            }
            json.dump(record, out, indent=2)
    except OSError:
        pass
|
||
|
||
|
||
def _raspios_latest_url(variant="desktop"):
    """Find the URL of the newest Raspberry Pi OS (arm64) ``.img.xz`` image.

    *variant* is ``lite``, ``full`` or anything else for the desktop image.
    Scrapes the download index for dated release folders, picks the latest
    date, and returns the first ``.img.xz`` link in that folder. Returns None
    when nothing is found or any step fails.
    """
    if variant == "lite":
        slug = "raspios_lite_arm64"
    else:
        # "full" has its own slug; everything else is the desktop image.
        slug = "raspios_full_arm64" if variant == "full" else "raspios_arm64"
    base = f"https://downloads.raspberrypi.com/{slug}/images"
    headers = {"User-Agent": "Mozilla/5.0 (compatible; CM4-Provisioning/1.0)"}

    def fetch(url):
        # Download one index page as text, dropping undecodable bytes.
        page_request = urllib.request.Request(url, headers=headers)
        with urllib.request.urlopen(page_request, timeout=20) as resp:
            return resp.read().decode("utf-8", errors="ignore")

    try:
        release_dates = re.findall(rf"{re.escape(slug)}-(\d{{4}}-\d{{2}}-\d{{2}})/", fetch(base + "/"))
        if not release_dates:
            return None
        # ISO dates sort lexicographically, so the maximum is the newest release.
        newest = max(release_dates)
        folder_url = f"{base}/{slug}-{newest}/"
        match = re.search(r'href="([^"]+\.img\.xz)"', fetch(folder_url))
        if match is None:
            return None
        href = match.group(1)
        if href.startswith(("http://", "https://")):
            return href
        return folder_url.rstrip("/") + "/" + href.lstrip("/")
    except Exception:
        return None
|
||
|
||
|
||
def _load_cloudinit_templates():
    """Return the parsed templates file, or ``{"templates": []}`` when it is missing or unreadable."""
    try:
        if not CLOUDINIT_TEMPLATES_FILE.is_file():
            return {"templates": []}
        with open(CLOUDINIT_TEMPLATES_FILE, "r") as handle:
            return json.load(handle)
    except (json.JSONDecodeError, OSError):
        return {"templates": []}
|
||
|
||
|
||
def _save_cloudinit_templates(data):
    """Write *data* to the templates file as indented JSON.

    Returns True on success and False on a filesystem error.
    """
    target = CLOUDINIT_TEMPLATES_FILE
    try:
        target.parent.mkdir(parents=True, exist_ok=True)
        with open(target, "w") as out:
            json.dump(data, out, indent=2)
    except OSError:
        return False
    return True
|
||
|
||
|
||
@app.route("/api/build-cloudinit-status")
@require_admin
def api_build_cloudinit_status():
    """Report the current cloud-init build status as JSON."""
    status = _build_status_read()
    return jsonify(status)
|
||
|
||
|
||
@app.route("/api/build-cloudinit-cancel", methods=["POST"])
@require_admin
def api_build_cloudinit_cancel():
    """Request cancellation of the running build by creating an empty cancel file."""
    cancel_file = BUILD_CANCEL_FILE
    try:
        cancel_file.parent.mkdir(parents=True, exist_ok=True)
        cancel_file.write_text("")
        return jsonify({"ok": True, "message": "Cancel requested. Build will stop at next check."})
    except OSError as exc:
        return jsonify({"ok": False, "error": str(exc)}), 500
|
||
|
||
|
||
@app.route("/api/build-cloudinit-status-clear", methods=["POST"])
@require_admin
def api_build_cloudinit_status_clear():
    """Reset the build status to idle.

    Refused with 409 while a build phase is active, unless ``?force=1`` (or
    ``true`` / ``yes``) is given to clear a stuck status.
    """
    active_phases = ("downloading", "decompressing", "injecting", "finalizing", "resolving")
    current = _build_status_read()
    forced = request.args.get("force") in ("1", "true", "yes")
    if current.get("phase") in active_phases and not forced:
        return jsonify({"ok": False, "error": "Cannot clear while build is in progress"}), 409
    _build_status_write("idle", "", None, None)
    return jsonify({"ok": True})
|
||
|
||
|
||
@app.route("/api/build-cloudinit", methods=["POST"])
@require_admin
def api_build_cloudinit():
    """Queue a cloud-init image build by writing the build request file.

    JSON body (all optional): ``variant`` (lite|desktop|full), ``user_data``,
    ``meta_data``, ``network_config``, ``set_as_golden_after``, ``image_name``.
    Returns 409 when a build is already active, 503 when the latest image URL
    cannot be resolved, 500 when the request file cannot be written, and 202
    once the request is queued.
    """
    active_phases = ("downloading", "decompressing", "injecting", "finalizing", "resolving")
    if _build_status_read().get("phase") in active_phases:
        return jsonify({"ok": False, "error": "A build is already in progress"}), 409

    body = request.get_json(silent=True) or {}
    variant = (body.get("variant") or "desktop").strip().lower()
    if variant not in ("lite", "desktop", "full"):
        variant = "desktop"

    _build_status_write("resolving", "Resolving latest Raspberry Pi OS image URL…")
    url = _raspios_latest_url(variant)
    if not url:
        _build_status_write("idle", "", error="Could not resolve latest Raspios URL")
        return jsonify({"ok": False, "error": "Could not resolve latest image URL"}), 503

    # Keep only alphanumerics, underscore and dash in the requested image name.
    image_name = re.sub(r"[^\w\-]", "", (body.get("image_name") or "").strip()[:64])
    build_request = {
        "url": url,
        "variant": variant,
        "image_name": image_name or None,
        "user_data": body.get("user_data") or DEFAULT_USER_DATA,
        "meta_data": body.get("meta_data") or DEFAULT_META_DATA,
        "network_config": body.get("network_config") or DEFAULT_NETWORK_CONFIG,
        "set_as_golden_after": bool(body.get("set_as_golden_after")),
    }
    try:
        BUILD_REQUEST_FILE.parent.mkdir(parents=True, exist_ok=True)
        # A cancel file left from an earlier run would stop the new build at once.
        BUILD_CANCEL_FILE.unlink(missing_ok=True)
        with open(BUILD_REQUEST_FILE, "w") as out:
            json.dump(build_request, out, indent=2)
    except OSError as exc:
        return jsonify({"ok": False, "error": str(exc)}), 500

    _build_status_write("resolving", "Build requested; host will run download and inject (check status).")
    return jsonify({"ok": True, "message": "Build started on host. Download and inject may take 15–45 min. Poll status or refresh the page."}), 202
|
||
|
||
|
||
@app.route("/api/raspios-latest-url")
@require_admin
def api_raspios_latest_url():
    """Return the latest Raspberry Pi OS (arm64) image URL for ``variant=lite|desktop|full``.

    Unknown variants are treated as ``desktop``. Returns 503 when the URL
    cannot be resolved.
    """
    variant = (request.args.get("variant") or "desktop").strip().lower()
    if variant not in ("lite", "desktop", "full"):
        variant = "desktop"
    url = _raspios_latest_url(variant)
    if url:
        filename = url.split("/")[-1]
        return jsonify({"ok": True, "url": url, "filename": filename, "variant": variant})
    return jsonify({"ok": False, "url": None, "error": "Could not resolve latest image URL"}), 503
|
||
|
||
|
||
@app.route("/api/cloudinit-templates", methods=["GET"])
@require_admin
def api_cloudinit_templates_list():
    """Return all saved cloud-init templates."""
    templates = _load_cloudinit_templates().get("templates", [])
    return jsonify({"templates": templates})
|
||
|
||
|
||
@app.route("/api/cloudinit-templates", methods=["POST"])
@require_admin
def api_cloudinit_templates_create():
    """Store a new cloud-init template.

    JSON body: ``name`` (required), ``user_data``, ``meta_data``,
    ``network_config``. The new template's id is the current time in
    milliseconds, as a string.
    """
    body = request.get_json(force=True, silent=True) or {}
    name = (body.get("name") or "").strip()
    if not name:
        return jsonify({"ok": False, "error": "name required"}), 400
    data = _load_cloudinit_templates()
    templates = data.get("templates", [])
    tid = str(int(time.time() * 1000))
    template = {"id": tid, "name": name}
    for field in ("user_data", "meta_data", "network_config"):
        template[field] = body.get(field, "")
    templates.append(template)
    data["templates"] = templates
    if _save_cloudinit_templates(data):
        return jsonify({"ok": True, "id": tid, "name": name})
    return jsonify({"ok": False, "error": "Failed to save"}), 500
|
||
|
||
|
||
@app.route("/api/cloudinit-templates/<tid>")
@require_admin
def api_cloudinit_templates_get(tid):
    """Return the template whose id equals *tid*, or 404 when there is none."""
    templates = _load_cloudinit_templates().get("templates", [])
    found = next((tpl for tpl in templates if tpl.get("id") == tid), None)
    if found is None:
        return jsonify({"error": "not found"}), 404
    return jsonify(found)
|
||
|
||
|
||
@app.route("/api/cloudinit-templates/<tid>", methods=["PUT"])
@require_admin
def api_cloudinit_templates_update(tid):
    """Update an existing template in place; its id never changes.

    Fields missing from the JSON body keep their stored values. An empty
    resulting name is rejected with 400.
    """
    body = request.get_json(force=True, silent=True) or {}
    data = _load_cloudinit_templates()
    templates = data.get("templates", [])
    index = next((i for i, tpl in enumerate(templates) if tpl.get("id") == tid), None)
    if index is None:
        return jsonify({"ok": False, "error": "not found"}), 404
    current = templates[index]
    name = (body.get("name") or current.get("name") or "").strip()
    if not name:
        return jsonify({"ok": False, "error": "name required"}), 400
    updated = {"id": tid, "name": name}
    for field in ("user_data", "meta_data", "network_config"):
        updated[field] = body.get(field, current.get(field, ""))
    templates[index] = updated
    data["templates"] = templates
    if not _save_cloudinit_templates(data):
        return jsonify({"ok": False, "error": "Failed to save"}), 500
    return jsonify({"ok": True, "id": tid, "name": name})
|
||
|
||
|
||
@app.route("/api/cloudinit-templates/<tid>", methods=["DELETE"])
@require_admin
def api_cloudinit_templates_delete(tid):
    """Remove the saved template whose ``id`` equals ``tid``.

    Returns ``{"ok": True}`` on success, 404 when nothing matched the id,
    and 500 when ``_save_cloudinit_templates`` reports failure.
    """
    store = _load_cloudinit_templates()
    existing = store.get("templates", [])
    remaining = [entry for entry in existing if entry.get("id") != tid]

    # Same length after filtering means no template carried this id.
    if len(remaining) == len(existing):
        return jsonify({"ok": False, "error": "not found"}), 404

    store["templates"] = remaining
    if _save_cloudinit_templates(store):
        return jsonify({"ok": True})
    return jsonify({"ok": False, "error": "Failed to save"}), 500
|
||
|
||
|
||
@app.route("/api/golden-info")
def api_golden_info():
    """Report whether the golden image exists, plus its size/mtime and source (for UI).

    Responds with ``{"present": False}`` when ``GOLDEN_IMAGE`` is not a
    regular file or an ``OSError`` occurs while inspecting it. Otherwise
    returns ``present``, ``size`` and ``mtime``, and additionally ``source``
    and ``name`` when ``_golden_current_source()`` yields both values.
    """
    if not GOLDEN_IMAGE.is_file():
        return jsonify({"present": False})
    try:
        info = GOLDEN_IMAGE.stat()
        source, source_name = _golden_current_source()
        payload = {"present": True, "size": info.st_size, "mtime": info.st_mtime}
        # Only advertise the origin when both pieces are known.
        if source and source_name:
            payload.update(source=source, name=source_name)
        return jsonify(payload)
    except OSError:
        # The file may have vanished or become unreadable since the check above.
        return jsonify({"present": False})
|
||
|
||
|
||
@app.route("/api/admin/users", methods=["GET"])
@require_admin
def api_admin_users():
    """Return the result of ``list_users()`` as ``{"users": ...}``."""
    users = list_users()
    return jsonify({"users": users})
|
||
|
||
|
||
@app.route("/api/admin/users", methods=["POST"])
@require_admin
def api_admin_add_user():
    """Create a dashboard user from a JSON ``{"username": ..., "password": ...}`` body.

    The username is stripped of surrounding whitespace; the password must be
    at least 6 characters. On success a ``user_created`` entry is written
    via ``admin_log``.

    Returns 400 for a malformed body, a missing username or a too-short
    password, and 409 when ``create_user`` reports failure (surfaced to the
    client as a duplicate username).
    """
    body = request.get_json(force=True, silent=True) or {}
    # A JSON array/scalar would otherwise crash on .get() and surface as a 500.
    if not isinstance(body, dict):
        return jsonify({"ok": False, "error": "JSON object required"}), 400

    username = body.get("username") or ""
    password = body.get("password") or ""
    # Numbers/lists would break .strip()/len() or reach the password hasher.
    if not isinstance(username, str) or not isinstance(password, str):
        return jsonify({"ok": False, "error": "username and password must be strings"}), 400

    username = username.strip()
    if not username:
        return jsonify({"ok": False, "error": "username required"}), 400
    if len(password) < 6:
        return jsonify({"ok": False, "error": "password must be at least 6 characters"}), 400
    if create_user(username, password):
        admin_log("user_created", username)
        return jsonify({"ok": True, "message": f"User {username} created"})
    return jsonify({"ok": False, "error": "username already exists"}), 409
|
||
|
||
|
||
@app.route("/api/admin/users/<int:user_id>/password", methods=["POST"])
@require_admin
def api_admin_change_password(user_id):
    """Set a new password for the user with the given numeric id.

    Expects a JSON object with a ``password`` string of at least 6
    characters. On success a ``password_changed`` entry is written via
    ``admin_log``.

    Returns 400 for a malformed body or an invalid password and 404 when
    no row in ``users`` has this id.
    """
    body = request.get_json(force=True, silent=True) or {}
    # A JSON array/scalar would otherwise crash on .get() and surface as a 500.
    if not isinstance(body, dict):
        return jsonify({"ok": False, "error": "JSON object required"}), 400

    new_password = body.get("password") or ""
    # Reject non-strings too: a number breaks len(), and a list of six or
    # more items would pass the length check and reach the hasher.
    if not isinstance(new_password, str) or len(new_password) < 6:
        return jsonify({"ok": False, "error": "password must be at least 6 characters"}), 400

    conn = get_db()
    try:
        row = conn.execute("SELECT id FROM users WHERE id = ?", (user_id,)).fetchone()
    finally:
        # Close even if the query raises so the connection is not leaked.
        conn.close()
    if not row:
        return jsonify({"ok": False, "error": "user not found"}), 404

    change_password(user_id, new_password)
    admin_log("password_changed", str(user_id))
    return jsonify({"ok": True, "message": "Password updated"})
|
||
|
||
|
||
@app.route("/api/admin/logs")
@require_admin
def api_admin_logs():
    """Return recent admin log entries as ``{"logs": ...}``.

    The optional ``limit`` query parameter defaults to 100 and is clamped to
    the range 0..500. A value that is not an integer falls back to the
    default instead of failing the request.
    """
    # type=int makes the lookup return the default when conversion fails,
    # rather than raising ValueError and producing a 500.
    limit = request.args.get("limit", default=100, type=int)
    # Clamp both ends: a negative value previously slipped past min() and
    # bypassed the 500-entry cap.
    limit = max(0, min(limit, 500))
    return jsonify({"logs": get_recent_logs(limit)})
|
||
|
||
|
||
# Ensure DB exists when app is loaded (e.g. by gunicorn or systemd).
# Initialization stays best-effort so the dashboard still starts when it
# fails, but the failure is logged instead of being silently discarded.
try:
    init_db()
except Exception:
    app.logger.exception("init_db() failed while loading the dashboard")
|
||
|
||
if __name__ == "__main__":
    # Direct script execution: serve on all interfaces, port 5000, with the
    # Flask debugger disabled.
    app.run(debug=False, port=5000, host="0.0.0.0")
|