Files
reterminal-dm4/chromium-setup/emmc-provisioning/dashboard/app.py

665 lines
25 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Flask dashboard for CM4 eMMC provisioning.
Monitors deployment status, shows device connection steps, backup/restore.
Supports USB boot mode (ask Backup/Deploy) and network-booted devices (register, then Backup/Deploy).
"""
import json
import os
import re
import shutil
import subprocess
import time
import urllib.request
from pathlib import Path
from flask import Flask, render_template, jsonify, request, send_file, Response
app = Flask(__name__)
@app.after_request
def no_cache(response):
"""Prevent browser from caching the dashboard so deploys are visible immediately."""
if request.path == "/" or request.path.startswith("/api/"):
response.headers["Cache-Control"] = "no-store, no-cache, must-revalidate, max-age=0"
response.headers["Pragma"] = "no-cache"
return response
BASE_DIR = Path(os.environ.get("CM4_PROVISIONING_DIR", "/var/lib/cm4-provisioning"))
STATUS_FILE = os.environ.get("CM4_STATUS_FILE", str(BASE_DIR / "status.json"))
LOG_FILE = os.environ.get("CM4_LOG_FILE", str(BASE_DIR / "flash.log"))
ACTION_REQUEST_FILE = os.environ.get("CM4_ACTION_REQUEST_FILE", str(BASE_DIR / "action_request"))
DEVICE_SOURCE_FILE = os.environ.get("CM4_DEVICE_SOURCE_FILE", str(BASE_DIR / "device_source"))
BACKUPS_DIR = Path(os.environ.get("CM4_BACKUPS_DIR", str(BASE_DIR / "backups")))
GOLDEN_IMAGE = Path(os.environ.get("CM4_GOLDEN_IMAGE", str(BASE_DIR / "golden.img")))
NETWORK_DEVICES_FILE = Path(os.environ.get("CM4_NETWORK_DEVICES_FILE", str(BASE_DIR / "network_devices.json")))
BUILD_STATUS_FILE = Path(os.environ.get("CM4_BUILD_STATUS_FILE", str(BASE_DIR / "build_cloudinit_status.json")))
BUILD_REQUEST_FILE = Path(os.environ.get("CM4_BUILD_REQUEST_FILE", str(BASE_DIR / "build_cloudinit_request.json")))
SHRINK_REQUEST_FILE = Path(os.environ.get("CM4_SHRINK_REQUEST_FILE", str(BASE_DIR / "shrink_request.json")))
SHRINK_STATUS_FILE = Path(os.environ.get("CM4_SHRINK_STATUS_FILE", str(BASE_DIR / "shrink_status.json")))
CLOUDINIT_TEMPLATES_FILE = Path(os.environ.get("CM4_CLOUDINIT_TEMPLATES_FILE", str(BASE_DIR / "cloudinit_templates.json")))
# Default cloud-init user-data for Raspberry Pi OS (NoCloud on boot partition)
DEFAULT_USER_DATA = """#cloud-config
package_update: true
package_upgrade: false
packages:
- curl
runcmd:
- curl -fsSL "http://YOUR_FILE_SERVER/provisioning/bootstrap.sh" -o /tmp/bootstrap.sh
- chmod +x /tmp/bootstrap.sh
- /tmp/bootstrap.sh
"""
DEFAULT_META_DATA = """instance-id: raspios-cloudinit-001
local-hostname: reterminal
"""
DEFAULT_NETWORK_CONFIG = """version: 2
ethernets:
eth0:
dhcp4: true
"""
DEFAULT_STATUS = {
"phase": "idle",
"message": "Waiting for reTerminal in boot mode or network.",
"progress": None,
"updated": None,
}
def read_status():
try:
with open(STATUS_FILE, "r") as f:
data = json.load(f)
out = {**DEFAULT_STATUS, **data}
if out.get("phase") == "waiting_choice":
try:
with open(DEVICE_SOURCE_FILE, "r") as sf:
out["device_source"] = (sf.read() or "").strip() or "usb"
except (FileNotFoundError, OSError):
out["device_source"] = "usb"
return out
except (FileNotFoundError, json.JSONDecodeError):
return DEFAULT_STATUS
def read_log_tail(lines=50):
try:
with open(LOG_FILE, "r") as f:
all_lines = f.readlines()
return "".join(all_lines[-lines:]).strip() if all_lines else ""
except (FileNotFoundError, PermissionError):
return ""
def _load_network_devices():
try:
if NETWORK_DEVICES_FILE.is_file():
with open(NETWORK_DEVICES_FILE, "r") as f:
return json.load(f)
except (json.JSONDecodeError, OSError):
pass
return {"devices": []}
def _save_network_devices(data):
try:
os.makedirs(NETWORK_DEVICES_FILE.parent, exist_ok=True)
with open(NETWORK_DEVICES_FILE, "w") as f:
json.dump(data, f, indent=2)
return True
except (PermissionError, OSError):
return False
BACKUPS_META_FILE = BACKUPS_DIR / "backups_meta.json"
def _load_backups_meta():
try:
if BACKUPS_META_FILE.is_file():
with open(BACKUPS_META_FILE, "r") as f:
return json.load(f)
except (json.JSONDecodeError, OSError):
pass
return {}
def _save_backups_meta(data):
try:
BACKUPS_DIR.mkdir(parents=True, exist_ok=True)
with open(BACKUPS_META_FILE, "w") as f:
json.dump(data, f, indent=2)
return True
except (PermissionError, OSError):
return False
def _safe_backup_name(name):
"""Reject path traversal and ensure it's a backup filename we manage."""
if not name or ".." in name or "/" in name or "\\" in name:
return False
if not name.endswith((".img", ".img.gz", ".img.xz")):
return False
return True
def list_backups():
if not BACKUPS_DIR.is_dir():
return []
meta = _load_backups_meta()
out = []
for p in sorted(BACKUPS_DIR.iterdir(), key=lambda x: x.stat().st_mtime, reverse=True):
if p.is_file() and p.name != "backups_meta.json" and p.name.endswith((".img", ".img.gz", ".img.xz")):
try:
st = p.stat()
m = meta.get(p.name, {})
out.append({
"name": p.name,
"display_name": m.get("name") or p.name,
"description": m.get("description") or "",
"size": st.st_size,
"mtime": st.st_mtime,
})
except OSError:
pass
return out
@app.route("/")
def index():
return render_template("index.html")
@app.route("/api/status")
def api_status():
return jsonify(read_status())
@app.route("/api/status-clear", methods=["POST"])
def api_status_clear():
"""Reset status to idle (e.g. to dismiss a 'Golden image not found' error so you can try again)."""
try:
with open(STATUS_FILE, "w") as f:
json.dump({
"phase": "idle",
"message": DEFAULT_STATUS["message"],
"progress": None,
"updated": None,
}, f)
return jsonify({"ok": True})
except (PermissionError, OSError):
return jsonify({"ok": False, "error": "Could not write status"}), 500
@app.route("/api/log")
def api_log():
return jsonify({"log": read_log_tail()})
@app.route("/api/pending-devices")
def api_pending_devices():
"""Returns USB (if waiting_choice) and registered network devices so the UI can show Backup/Deploy."""
st = read_status()
usb = None
if st.get("phase") == "waiting_choice":
usb = {"source": "usb", "message": st.get("message", "Device connected (USB). Choose action.")}
data = _load_network_devices()
network = [d for d in data.get("devices", []) if d.get("action") in (None, "wait")]
return jsonify({"usb": usb, "network": network})
@app.route("/api/device-action", methods=["POST"])
def api_device_action():
"""User chose Backup or Deploy for a device. source=usb | network; for network pass mac=."""
body = request.get_json(force=True, silent=True) or {}
source = (body.get("source") or "").strip().lower()
action = (body.get("action") or "").strip().lower()
if action not in ("backup", "deploy"):
return jsonify({"ok": False, "error": "action must be 'backup' or 'deploy'"}), 400
if source == "usb":
try:
os.makedirs(os.path.dirname(ACTION_REQUEST_FILE) or ".", exist_ok=True)
# If user requested "shrink after backup", create flag so host runs PiShrink after dd
if action == "backup" and body.get("shrink"):
try:
(BASE_DIR / "shrink_next_backup").write_text("1")
except (PermissionError, OSError):
pass # host may still have SHRINK_BACKUP=1
with open(ACTION_REQUEST_FILE, "w") as f:
f.write(action)
return jsonify({"ok": True})
except (PermissionError, OSError):
return jsonify({"ok": False, "error": "Could not write action file"}), 500
if source == "network":
mac = (body.get("mac") or "").strip()
if not mac:
return jsonify({"ok": False, "error": "mac required for network device"}), 400
data = _load_network_devices()
for d in data.get("devices", []):
if (d.get("mac") or "").lower() == mac.lower():
d["action"] = action
d["action_at"] = time.time()
_save_network_devices(data)
return jsonify({"ok": True})
return jsonify({"ok": False, "error": "Device not found"}), 404
return jsonify({"ok": False, "error": "source must be 'usb' or 'network'"}), 400
@app.route("/api/register-device", methods=["POST"])
def api_register_device():
"""Called by a network-booted device to register (mac, ip)."""
body = request.get_json(force=True, silent=True) or request.form
mac = (body.get("mac") or "").strip()
ip = (body.get("ip") or request.remote_addr or "").strip()
if not mac:
return jsonify({"ok": False, "error": "mac required"}), 400
data = _load_network_devices()
devices = data.get("devices", [])
for d in devices:
if (d.get("mac") or "").lower() == mac.lower():
d["ip"] = ip
d["registered_at"] = time.time()
d["action"] = d.get("action") or "wait"
_save_network_devices(data)
return jsonify({"ok": True, "message": "registered"})
devices.append({"mac": mac, "ip": ip, "registered_at": time.time(), "action": "wait"})
data["devices"] = devices
_save_network_devices(data)
return jsonify({"ok": True, "message": "registered"})
@app.route("/api/device-action-poll")
def api_device_action_poll():
"""Network device polls this to get its assigned action (deploy/backup) and URL."""
mac = (request.args.get("mac") or "").strip()
if not mac:
return jsonify({"action": "wait"}), 200
data = _load_network_devices()
base = request.host_url.rstrip("/")
for d in data.get("devices", []):
if (d.get("mac") or "").lower() == mac.lower():
action = d.get("action") or "wait"
if action == "deploy":
return jsonify({"action": "deploy", "url": f"{base}/api/golden-image"})
if action == "backup":
return jsonify({"action": "backup", "upload_url": f"{base}/api/backup-upload?mac={mac}"})
return jsonify({"action": "wait"})
return jsonify({"action": "wait"})
@app.route("/api/golden-image")
def api_golden_image():
"""Stream the golden image for network deploy (device pulls and writes to eMMC)."""
if not GOLDEN_IMAGE.is_file():
return jsonify({"error": "Golden image not found"}), 404
return send_file(
GOLDEN_IMAGE,
mimetype="application/octet-stream",
as_attachment=True,
download_name="golden.img",
)
@app.route("/api/backup-upload", methods=["POST"])
def api_backup_upload():
"""Network device uploads its eMMC backup (raw body)."""
mac = (request.args.get("mac") or "").strip().replace(":", "-")[:20]
if not mac:
return jsonify({"error": "mac query param required"}), 400
BACKUPS_DIR.mkdir(parents=True, exist_ok=True)
name = f"backup-net-{mac}-{int(time.time())}.img"
path = BACKUPS_DIR / name
try:
with open(path, "wb") as f:
while True:
chunk = request.stream.read(1024 * 1024)
if not chunk:
break
f.write(chunk)
return jsonify({"ok": True, "file": name})
except (OSError, IOError) as e:
if path.exists():
path.unlink(missing_ok=True)
return jsonify({"error": str(e)}), 500
@app.route("/api/backups")
def api_backups():
return jsonify({"backups": list_backups()})
@app.route("/api/backups/<path:name>/set-as-golden", methods=["POST"])
def api_backup_set_as_golden(name):
"""Copy this backup to golden.img so it becomes the image used for Deploy."""
if not _safe_backup_name(name):
return jsonify({"ok": False, "error": "invalid backup name"}), 400
path = BACKUPS_DIR / name
if not path.is_file():
return jsonify({"ok": False, "error": "backup not found"}), 404
try:
BACKUPS_DIR.mkdir(parents=True, exist_ok=True)
shutil.copy2(path, GOLDEN_IMAGE)
return jsonify({"ok": True, "message": f"Golden image set from {name}"})
except (OSError, IOError) as e:
return jsonify({"ok": False, "error": str(e)}), 500
def _request_host_shrink(name, action="shrink", format="xz"):
"""Write shrink request for host and poll shrink_status.json. Returns (ok, message_or_error)."""
req = {"name": name, "action": action}
if action == "compress":
req["format"] = "gz" if format == "gz" else "xz"
try:
SHRINK_REQUEST_FILE.write_text(json.dumps(req))
except OSError as e:
return False, str(e)
deadline = time.monotonic() + 2100 # 35 min
while time.monotonic() < deadline:
time.sleep(5)
if not SHRINK_STATUS_FILE.exists():
continue
try:
data = json.loads(SHRINK_STATUS_FILE.read_text())
except (OSError, ValueError):
continue
if data.get("name") != name:
continue
phase = data.get("phase")
if phase == "done":
return True, data.get("message") or f"Shrunk {name}"
if phase == "error":
return False, data.get("error") or "PiShrink failed"
return False, "Shrink timed out (run on host may still be in progress)"
@app.route("/api/backups/<path:name>/shrink", methods=["POST"])
def api_backup_shrink(name):
"""Request PiShrink on host for a raw .img backup (shrinks in place). Dashboard polls host status."""
if not _safe_backup_name(name):
return jsonify({"ok": False, "error": "invalid backup name"}), 400
if not name.endswith(".img") or name.endswith(".img.gz") or name.endswith(".img.xz"):
return jsonify({"ok": False, "error": "only raw .img files can be shrunk (not .img.gz / .img.xz)"}), 400
path = BACKUPS_DIR / name
if not path.is_file():
return jsonify({"ok": False, "error": "backup not found"}), 404
ok, msg = _request_host_shrink(name, action="shrink")
if ok:
return jsonify({"ok": True, "message": msg})
return jsonify({"ok": False, "error": msg}), (503 if "not installed" in msg else 504 if "timed out" in msg else 500)
@app.route("/api/backups/<path:name>/compress", methods=["POST"])
def api_backup_compress(name):
"""Request PiShrink with compression on host. Produces .img.gz or .img.xz. Dashboard polls host status."""
if not _safe_backup_name(name):
return jsonify({"ok": False, "error": "invalid backup name"}), 400
if not name.endswith(".img") or name.endswith(".img.gz") or name.endswith(".img.xz"):
return jsonify({"ok": False, "error": "only raw .img files can be compressed (not .img.gz / .img.xz)"}), 400
path = BACKUPS_DIR / name
if not path.is_file():
return jsonify({"ok": False, "error": "backup not found"}), 404
body = request.get_json(force=True, silent=True) or {}
fmt = (body.get("format") or request.args.get("format") or "xz").strip().lower()
if fmt not in ("gz", "gzip", "xz"):
fmt = "xz"
if fmt == "gzip":
fmt = "gz"
ok, msg = _request_host_shrink(name, action="compress", format=fmt)
if ok:
ext = ".xz" if fmt == "xz" else ".gz"
return jsonify({"ok": True, "message": msg or f"Compressed to {name}{ext}"})
return jsonify({"ok": False, "error": msg}), (503 if "not installed" in msg else 504 if "timed out" in msg else 500)
@app.route("/api/backups/<path:name>", methods=["PATCH"])
def api_backup_update(name):
"""Update backup metadata (display name, description) or rename the file."""
if not _safe_backup_name(name):
return jsonify({"ok": False, "error": "invalid backup name"}), 400
path = BACKUPS_DIR / name
if not path.is_file():
return jsonify({"ok": False, "error": "backup not found"}), 404
body = request.get_json(force=True, silent=True) or {}
meta = _load_backups_meta()
entry = meta.get(name, {})
new_filename = (body.get("filename") or "").strip()
if new_filename:
if not _safe_backup_name(new_filename):
return jsonify({"ok": False, "error": "invalid new filename"}), 400
new_path = BACKUPS_DIR / new_filename
if new_path.exists() and new_path != path:
return jsonify({"ok": False, "error": "target filename already exists"}), 409
try:
path.rename(new_path)
except OSError as e:
return jsonify({"ok": False, "error": str(e)}), 500
meta[new_filename] = {"name": entry.get("name") or name, "description": entry.get("description") or ""}
if name in meta:
del meta[name]
name = new_filename
path = BACKUPS_DIR / name
else:
if "name" in body:
entry["name"] = (body.get("name") or "").strip() or path.name
if "description" in body:
entry["description"] = (body.get("description") or "").strip()
meta[name] = entry
if not _save_backups_meta(meta):
return jsonify({"ok": False, "error": "could not save metadata"}), 500
return jsonify({"ok": True, "name": name})
@app.route("/api/backups/<path:name>", methods=["GET"])
def api_backup_download(name):
if not _safe_backup_name(name):
return jsonify({"error": "invalid name"}), 400
path = BACKUPS_DIR / name
if not path.is_file():
return jsonify({"error": "not found"}), 404
return send_file(path, as_attachment=True, download_name=name)
def _build_status_read():
try:
if BUILD_STATUS_FILE.is_file():
with open(BUILD_STATUS_FILE, "r") as f:
return json.load(f)
except (json.JSONDecodeError, OSError):
pass
return {"phase": "idle", "message": "", "output_name": None, "error": None}
def _build_status_write(phase, message, output_name=None, error=None):
try:
BUILD_STATUS_FILE.parent.mkdir(parents=True, exist_ok=True)
with open(BUILD_STATUS_FILE, "w") as f:
json.dump({
"phase": phase,
"message": message,
"output_name": output_name,
"error": error,
"updated": time.time(),
}, f, indent=2)
except OSError:
pass
def _raspios_latest_url(variant="lite"):
"""Resolve latest Raspberry Pi OS (arm64) .img.xz URL. variant=lite|full."""
slug = "raspios_lite_arm64" if variant == "lite" else "raspios_full_arm64"
base = f"https://downloads.raspberrypi.com/{slug}/images"
headers = {"User-Agent": "Mozilla/5.0 (compatible; CM4-Provisioning/1.0)"}
try:
req = urllib.request.Request(base + "/", headers=headers)
with urllib.request.urlopen(req, timeout=20) as r:
html = r.read().decode("utf-8", errors="ignore")
folders = re.findall(rf"{re.escape(slug)}-(\d{{4}}-\d{{2}}-\d{{2}})/", html)
if not folders:
return None
latest = sorted(folders)[-1]
folder_url = f"{base}/{slug}-{latest}/"
req2 = urllib.request.Request(folder_url, headers=headers)
with urllib.request.urlopen(req2, timeout=20) as r:
folder_html = r.read().decode("utf-8", errors="ignore")
m = re.search(r'href="([^"]+\.img\.xz)"', folder_html)
if not m:
return None
href = m.group(1)
if href.startswith("http://") or href.startswith("https://"):
return href
return folder_url.rstrip("/") + "/" + href.lstrip("/")
except Exception:
return None
def _load_cloudinit_templates():
try:
if CLOUDINIT_TEMPLATES_FILE.is_file():
with open(CLOUDINIT_TEMPLATES_FILE, "r") as f:
return json.load(f)
except (json.JSONDecodeError, OSError):
pass
return {"templates": []}
def _save_cloudinit_templates(data):
try:
CLOUDINIT_TEMPLATES_FILE.parent.mkdir(parents=True, exist_ok=True)
with open(CLOUDINIT_TEMPLATES_FILE, "w") as f:
json.dump(data, f, indent=2)
return True
except OSError:
return False
@app.route("/api/build-cloudinit-status")
def api_build_cloudinit_status():
"""Return current build status (phase, message, output_name, error)."""
return jsonify(_build_status_read())
@app.route("/api/build-cloudinit", methods=["POST"])
def api_build_cloudinit():
"""Start building a cloud-init image: write request file; host runs build (has loop devices)."""
st = _build_status_read()
if st.get("phase") in ("downloading", "decompressing", "injecting", "finalizing", "resolving"):
return jsonify({"ok": False, "error": "A build is already in progress"}), 409
body = request.get_json(silent=True) or {}
variant = (body.get("variant") or "lite").strip().lower()
if variant not in ("lite", "full"):
variant = "lite"
_build_status_write("resolving", "Resolving latest Raspberry Pi OS image URL…")
url = _raspios_latest_url(variant)
if not url:
_build_status_write("idle", "", error="Could not resolve latest Raspios URL")
return jsonify({"ok": False, "error": "Could not resolve latest image URL"}), 503
user_data = body.get("user_data") or DEFAULT_USER_DATA
meta_data = body.get("meta_data") or DEFAULT_META_DATA
network_config = body.get("network_config") or DEFAULT_NETWORK_CONFIG
set_as_golden_after = bool(body.get("set_as_golden_after"))
try:
BUILD_REQUEST_FILE.parent.mkdir(parents=True, exist_ok=True)
with open(BUILD_REQUEST_FILE, "w") as f:
json.dump({
"url": url,
"variant": variant,
"user_data": user_data,
"meta_data": meta_data,
"network_config": network_config,
"set_as_golden_after": set_as_golden_after,
}, f, indent=2)
except OSError as e:
return jsonify({"ok": False, "error": str(e)}), 500
_build_status_write("resolving", "Build requested; host will run download and inject (check status).")
return jsonify({"ok": True, "message": "Build started on host. Download and inject may take 1545 min. Poll status or refresh the page."}), 202
@app.route("/api/raspios-latest-url")
def api_raspios_latest_url():
"""Return the URL of the latest Raspberry Pi OS (arm64) image. Query: variant=lite|full."""
variant = (request.args.get("variant") or "lite").strip().lower()
if variant not in ("lite", "full"):
variant = "lite"
url = _raspios_latest_url(variant)
if not url:
return jsonify({"ok": False, "url": None, "error": "Could not resolve latest image URL"}), 503
return jsonify({"ok": True, "url": url, "filename": url.split("/")[-1], "variant": variant})
@app.route("/api/cloudinit-templates", methods=["GET"])
def api_cloudinit_templates_list():
"""List saved cloud-init templates."""
data = _load_cloudinit_templates()
return jsonify({"templates": data.get("templates", [])})
@app.route("/api/cloudinit-templates", methods=["POST"])
def api_cloudinit_templates_create():
"""Save a new cloud-init template."""
body = request.get_json(force=True, silent=True) or {}
name = (body.get("name") or "").strip()
if not name:
return jsonify({"ok": False, "error": "name required"}), 400
data = _load_cloudinit_templates()
templates = data.get("templates", [])
tid = str(int(time.time() * 1000))
templates.append({
"id": tid,
"name": name,
"user_data": body.get("user_data", ""),
"meta_data": body.get("meta_data", ""),
"network_config": body.get("network_config", ""),
})
data["templates"] = templates
if not _save_cloudinit_templates(data):
return jsonify({"ok": False, "error": "Failed to save"}), 500
return jsonify({"ok": True, "id": tid, "name": name})
@app.route("/api/cloudinit-templates/<tid>")
def api_cloudinit_templates_get(tid):
"""Get one template by id."""
data = _load_cloudinit_templates()
for t in data.get("templates", []):
if t.get("id") == tid:
return jsonify(t)
return jsonify({"error": "not found"}), 404
@app.route("/api/cloudinit-templates/<tid>", methods=["DELETE"])
def api_cloudinit_templates_delete(tid):
"""Delete a template."""
data = _load_cloudinit_templates()
templates = [t for t in data.get("templates", []) if t.get("id") != tid]
if len(templates) == len(data.get("templates", [])):
return jsonify({"ok": False, "error": "not found"}), 404
data["templates"] = templates
if not _save_cloudinit_templates(data):
return jsonify({"ok": False, "error": "Failed to save"}), 500
return jsonify({"ok": True})
@app.route("/api/golden-info")
def api_golden_info():
"""Return whether golden image exists and its size/mtime for UI."""
if not GOLDEN_IMAGE.is_file():
return jsonify({"present": False})
try:
st = GOLDEN_IMAGE.stat()
return jsonify({"present": True, "size": st.st_size, "mtime": st.st_mtime})
except OSError:
return jsonify({"present": False})
if __name__ == "__main__":
app.run(host="0.0.0.0", port=5000, debug=False)