Implement automatic page scaling feature with viewport adjustments
This commit is contained in:
245
chromium-setup/emmc-provisioning/dashboard/app.py
Normal file
245
chromium-setup/emmc-provisioning/dashboard/app.py
Normal file
@@ -0,0 +1,245 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Flask dashboard for CM4 eMMC provisioning.
|
||||
Monitors deployment status, shows device connection steps, backup/restore.
|
||||
Supports USB boot mode (ask Backup/Deploy) and network-booted devices (register, then Backup/Deploy).
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
from flask import Flask, render_template, jsonify, request, send_file, Response
|
||||
|
||||
app = Flask(__name__)
|
||||
|
||||
BASE_DIR = Path(os.environ.get("CM4_PROVISIONING_DIR", "/var/lib/cm4-provisioning"))
|
||||
STATUS_FILE = os.environ.get("CM4_STATUS_FILE", str(BASE_DIR / "status.json"))
|
||||
LOG_FILE = os.environ.get("CM4_LOG_FILE", str(BASE_DIR / "flash.log"))
|
||||
ACTION_REQUEST_FILE = os.environ.get("CM4_ACTION_REQUEST_FILE", str(BASE_DIR / "action_request"))
|
||||
DEVICE_SOURCE_FILE = os.environ.get("CM4_DEVICE_SOURCE_FILE", str(BASE_DIR / "device_source"))
|
||||
BACKUPS_DIR = Path(os.environ.get("CM4_BACKUPS_DIR", str(BASE_DIR / "backups")))
|
||||
GOLDEN_IMAGE = Path(os.environ.get("CM4_GOLDEN_IMAGE", str(BASE_DIR / "golden.img")))
|
||||
NETWORK_DEVICES_FILE = Path(os.environ.get("CM4_NETWORK_DEVICES_FILE", str(BASE_DIR / "network_devices.json")))
|
||||
|
||||
DEFAULT_STATUS = {
|
||||
"phase": "idle",
|
||||
"message": "Waiting for reTerminal in boot mode or network.",
|
||||
"progress": None,
|
||||
"updated": None,
|
||||
}
|
||||
|
||||
|
||||
def read_status():
|
||||
try:
|
||||
with open(STATUS_FILE, "r") as f:
|
||||
data = json.load(f)
|
||||
out = {**DEFAULT_STATUS, **data}
|
||||
if out.get("phase") == "waiting_choice":
|
||||
try:
|
||||
with open(DEVICE_SOURCE_FILE, "r") as sf:
|
||||
out["device_source"] = (sf.read() or "").strip() or "usb"
|
||||
except (FileNotFoundError, OSError):
|
||||
out["device_source"] = "usb"
|
||||
return out
|
||||
except (FileNotFoundError, json.JSONDecodeError):
|
||||
return DEFAULT_STATUS
|
||||
|
||||
|
||||
def read_log_tail(lines=50):
|
||||
try:
|
||||
with open(LOG_FILE, "r") as f:
|
||||
all_lines = f.readlines()
|
||||
return "".join(all_lines[-lines:]).strip() if all_lines else ""
|
||||
except (FileNotFoundError, PermissionError):
|
||||
return ""
|
||||
|
||||
|
||||
def _load_network_devices():
|
||||
try:
|
||||
if NETWORK_DEVICES_FILE.is_file():
|
||||
with open(NETWORK_DEVICES_FILE, "r") as f:
|
||||
return json.load(f)
|
||||
except (json.JSONDecodeError, OSError):
|
||||
pass
|
||||
return {"devices": []}
|
||||
|
||||
|
||||
def _save_network_devices(data):
|
||||
try:
|
||||
os.makedirs(NETWORK_DEVICES_FILE.parent, exist_ok=True)
|
||||
with open(NETWORK_DEVICES_FILE, "w") as f:
|
||||
json.dump(data, f, indent=2)
|
||||
return True
|
||||
except (PermissionError, OSError):
|
||||
return False
|
||||
|
||||
|
||||
def list_backups():
|
||||
if not BACKUPS_DIR.is_dir():
|
||||
return []
|
||||
out = []
|
||||
for p in sorted(BACKUPS_DIR.iterdir(), key=lambda x: x.stat().st_mtime, reverse=True):
|
||||
if p.is_file() and p.suffix in (".img", ".img.gz"):
|
||||
try:
|
||||
st = p.stat()
|
||||
out.append({"name": p.name, "size": st.st_size, "mtime": st.st_mtime})
|
||||
except OSError:
|
||||
pass
|
||||
return out
|
||||
|
||||
|
||||
@app.route("/")
|
||||
def index():
|
||||
return render_template("index.html")
|
||||
|
||||
|
||||
@app.route("/api/status")
|
||||
def api_status():
|
||||
return jsonify(read_status())
|
||||
|
||||
|
||||
@app.route("/api/log")
|
||||
def api_log():
|
||||
return jsonify({"log": read_log_tail()})
|
||||
|
||||
|
||||
@app.route("/api/pending-devices")
|
||||
def api_pending_devices():
|
||||
"""Returns USB (if waiting_choice) and registered network devices so the UI can show Backup/Deploy."""
|
||||
st = read_status()
|
||||
usb = None
|
||||
if st.get("phase") == "waiting_choice":
|
||||
usb = {"source": "usb", "message": st.get("message", "Device connected (USB). Choose action.")}
|
||||
data = _load_network_devices()
|
||||
network = [d for d in data.get("devices", []) if d.get("action") in (None, "wait")]
|
||||
return jsonify({"usb": usb, "network": network})
|
||||
|
||||
|
||||
@app.route("/api/device-action", methods=["POST"])
|
||||
def api_device_action():
|
||||
"""User chose Backup or Deploy for a device. source=usb | network; for network pass mac=."""
|
||||
body = request.get_json(force=True, silent=True) or {}
|
||||
source = (body.get("source") or "").strip().lower()
|
||||
action = (body.get("action") or "").strip().lower()
|
||||
if action not in ("backup", "deploy"):
|
||||
return jsonify({"ok": False, "error": "action must be 'backup' or 'deploy'"}), 400
|
||||
if source == "usb":
|
||||
try:
|
||||
os.makedirs(os.path.dirname(ACTION_REQUEST_FILE) or ".", exist_ok=True)
|
||||
with open(ACTION_REQUEST_FILE, "w") as f:
|
||||
f.write(action)
|
||||
return jsonify({"ok": True})
|
||||
except (PermissionError, OSError):
|
||||
return jsonify({"ok": False, "error": "Could not write action file"}), 500
|
||||
if source == "network":
|
||||
mac = (body.get("mac") or "").strip()
|
||||
if not mac:
|
||||
return jsonify({"ok": False, "error": "mac required for network device"}), 400
|
||||
data = _load_network_devices()
|
||||
for d in data.get("devices", []):
|
||||
if (d.get("mac") or "").lower() == mac.lower():
|
||||
d["action"] = action
|
||||
d["action_at"] = time.time()
|
||||
_save_network_devices(data)
|
||||
return jsonify({"ok": True})
|
||||
return jsonify({"ok": False, "error": "Device not found"}), 404
|
||||
return jsonify({"ok": False, "error": "source must be 'usb' or 'network'"}), 400
|
||||
|
||||
|
||||
@app.route("/api/register-device", methods=["POST"])
|
||||
def api_register_device():
|
||||
"""Called by a network-booted device to register (mac, ip)."""
|
||||
body = request.get_json(force=True, silent=True) or request.form
|
||||
mac = (body.get("mac") or "").strip()
|
||||
ip = (body.get("ip") or request.remote_addr or "").strip()
|
||||
if not mac:
|
||||
return jsonify({"ok": False, "error": "mac required"}), 400
|
||||
data = _load_network_devices()
|
||||
devices = data.get("devices", [])
|
||||
for d in devices:
|
||||
if (d.get("mac") or "").lower() == mac.lower():
|
||||
d["ip"] = ip
|
||||
d["registered_at"] = time.time()
|
||||
d["action"] = d.get("action") or "wait"
|
||||
_save_network_devices(data)
|
||||
return jsonify({"ok": True, "message": "registered"})
|
||||
devices.append({"mac": mac, "ip": ip, "registered_at": time.time(), "action": "wait"})
|
||||
data["devices"] = devices
|
||||
_save_network_devices(data)
|
||||
return jsonify({"ok": True, "message": "registered"})
|
||||
|
||||
|
||||
@app.route("/api/device-action-poll")
|
||||
def api_device_action_poll():
|
||||
"""Network device polls this to get its assigned action (deploy/backup) and URL."""
|
||||
mac = (request.args.get("mac") or "").strip()
|
||||
if not mac:
|
||||
return jsonify({"action": "wait"}), 200
|
||||
data = _load_network_devices()
|
||||
base = request.host_url.rstrip("/")
|
||||
for d in data.get("devices", []):
|
||||
if (d.get("mac") or "").lower() == mac.lower():
|
||||
action = d.get("action") or "wait"
|
||||
if action == "deploy":
|
||||
return jsonify({"action": "deploy", "url": f"{base}/api/golden-image"})
|
||||
if action == "backup":
|
||||
return jsonify({"action": "backup", "upload_url": f"{base}/api/backup-upload?mac={mac}"})
|
||||
return jsonify({"action": "wait"})
|
||||
return jsonify({"action": "wait"})
|
||||
|
||||
|
||||
@app.route("/api/golden-image")
|
||||
def api_golden_image():
|
||||
"""Stream the golden image for network deploy (device pulls and writes to eMMC)."""
|
||||
if not GOLDEN_IMAGE.is_file():
|
||||
return jsonify({"error": "Golden image not found"}), 404
|
||||
return send_file(
|
||||
GOLDEN_IMAGE,
|
||||
mimetype="application/octet-stream",
|
||||
as_attachment=True,
|
||||
download_name="golden.img",
|
||||
)
|
||||
|
||||
|
||||
@app.route("/api/backup-upload", methods=["POST"])
|
||||
def api_backup_upload():
|
||||
"""Network device uploads its eMMC backup (raw body)."""
|
||||
mac = (request.args.get("mac") or "").strip().replace(":", "-")[:20]
|
||||
if not mac:
|
||||
return jsonify({"error": "mac query param required"}), 400
|
||||
BACKUPS_DIR.mkdir(parents=True, exist_ok=True)
|
||||
name = f"backup-net-{mac}-{int(time.time())}.img"
|
||||
path = BACKUPS_DIR / name
|
||||
try:
|
||||
with open(path, "wb") as f:
|
||||
while True:
|
||||
chunk = request.stream.read(1024 * 1024)
|
||||
if not chunk:
|
||||
break
|
||||
f.write(chunk)
|
||||
return jsonify({"ok": True, "file": name})
|
||||
except (OSError, IOError) as e:
|
||||
if path.exists():
|
||||
path.unlink(missing_ok=True)
|
||||
return jsonify({"error": str(e)}), 500
|
||||
|
||||
|
||||
@app.route("/api/backups")
|
||||
def api_backups():
|
||||
return jsonify({"backups": list_backups()})
|
||||
|
||||
|
||||
@app.route("/api/backups/<path:name>")
|
||||
def api_backup_download(name):
|
||||
if ".." in name or "/" in name or "\\" in name:
|
||||
return jsonify({"error": "invalid name"}), 400
|
||||
path = BACKUPS_DIR / name
|
||||
if not path.is_file():
|
||||
return jsonify({"error": "not found"}), 404
|
||||
return send_file(path, as_attachment=True, download_name=name)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
app.run(host="0.0.0.0", port=5000, debug=False)
|
||||
Reference in New Issue
Block a user