Implement user authentication and admin features in eMMC provisioning dashboard: add SQLite database for user management, create admin log functionality, and enhance session handling. Update README to reflect new public and admin access levels, and improve deployment scripts to support cloud-init images and portal files management.

This commit is contained in:
nearxos
2026-02-19 15:17:47 +02:00
parent 987e71c36e
commit 39aa042dc9
9 changed files with 1167 additions and 61 deletions

View File

@@ -1,39 +1,36 @@
#!/usr/bin/env python3
"""
Flask dashboard for CM4 eMMC provisioning.
Monitors deployment status, shows device connection steps, backup/restore.
Supports USB boot mode (ask Backup/Deploy) and network-booted devices (register, then Backup/Deploy).
Public home: deploy only (status, logs, how to connect). No login.
Admin: login required — backups, cloud-init, portal files, set golden, users.
"""
import json
import os
import re
import shutil
import subprocess
import sqlite3
import time
import urllib.request
from functools import wraps
from pathlib import Path
from flask import Flask, render_template, jsonify, request, send_file, Response
from flask import Flask, render_template, jsonify, request, send_file, redirect, url_for, session
from werkzeug.security import generate_password_hash, check_password_hash
app = Flask(__name__)
app.secret_key = os.environ.get("CM4_DASHBOARD_SECRET_KEY", os.urandom(24).hex())
@app.after_request
def no_cache(response):
"""Prevent browser from caching the dashboard so deploys are visible immediately."""
if request.path == "/" or request.path.startswith("/api/"):
response.headers["Cache-Control"] = "no-store, no-cache, must-revalidate, max-age=0"
response.headers["Pragma"] = "no-cache"
return response
# --- Paths ---
BASE_DIR = Path(os.environ.get("CM4_PROVISIONING_DIR", "/var/lib/cm4-provisioning"))
STATUS_FILE = os.environ.get("CM4_STATUS_FILE", str(BASE_DIR / "status.json"))
LOG_FILE = os.environ.get("CM4_LOG_FILE", str(BASE_DIR / "flash.log"))
ACTION_REQUEST_FILE = os.environ.get("CM4_ACTION_REQUEST_FILE", str(BASE_DIR / "action_request"))
DEVICE_SOURCE_FILE = os.environ.get("CM4_DEVICE_SOURCE_FILE", str(BASE_DIR / "device_source"))
BACKUPS_DIR = Path(os.environ.get("CM4_BACKUPS_DIR", str(BASE_DIR / "backups")))
CLOUDINIT_IMAGES_DIR = Path(os.environ.get("CM4_CLOUDINIT_IMAGES_DIR", str(BASE_DIR / "cloudinit-images")))
PORTAL_FILES_DIR = Path(os.environ.get("CM4_PORTAL_FILES_DIR", str(BASE_DIR / "portal-files")))
GOLDEN_IMAGE = Path(os.environ.get("CM4_GOLDEN_IMAGE", str(BASE_DIR / "golden.img")))
NETWORK_DEVICES_FILE = Path(os.environ.get("CM4_NETWORK_DEVICES_FILE", str(BASE_DIR / "network_devices.json")))
BUILD_STATUS_FILE = Path(os.environ.get("CM4_BUILD_STATUS_FILE", str(BASE_DIR / "build_cloudinit_status.json")))
@@ -41,6 +38,117 @@ BUILD_REQUEST_FILE = Path(os.environ.get("CM4_BUILD_REQUEST_FILE", str(BASE_DIR
SHRINK_REQUEST_FILE = Path(os.environ.get("CM4_SHRINK_REQUEST_FILE", str(BASE_DIR / "shrink_request.json")))
SHRINK_STATUS_FILE = Path(os.environ.get("CM4_SHRINK_STATUS_FILE", str(BASE_DIR / "shrink_status.json")))
CLOUDINIT_TEMPLATES_FILE = Path(os.environ.get("CM4_CLOUDINIT_TEMPLATES_FILE", str(BASE_DIR / "cloudinit_templates.json")))
DB_PATH = Path(os.environ.get("CM4_DASHBOARD_DB", str(BASE_DIR / "dashboard.db")))
# --- Database (admin users + activity logs) ---
def get_db():
conn = sqlite3.connect(str(DB_PATH))
conn.row_factory = sqlite3.Row
return conn
def init_db():
os.makedirs(DB_PATH.parent, exist_ok=True)
conn = get_db()
conn.executescript("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
password_hash TEXT NOT NULL,
created_at REAL NOT NULL
);
CREATE TABLE IF NOT EXISTS admin_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER,
action TEXT NOT NULL,
details TEXT,
created_at REAL NOT NULL,
FOREIGN KEY (user_id) REFERENCES users(id)
);
CREATE INDEX IF NOT EXISTS idx_logs_created ON admin_logs(created_at DESC);
""")
conn.commit()
conn.close()
def admin_log(action, details=None):
user_id = session.get("user_id")
conn = get_db()
conn.execute(
"INSERT INTO admin_logs (user_id, action, details, created_at) VALUES (?, ?, ?, ?)",
(user_id, action, details, time.time()),
)
conn.commit()
conn.close()
def require_admin(f):
@wraps(f)
def wrapped(*args, **kwargs):
if not session.get("admin_logged_in"):
if request.is_json or request.path.startswith("/api/"):
return jsonify({"ok": False, "error": "Login required"}), 401
return redirect(url_for("login", next=request.url))
return f(*args, **kwargs)
return wrapped
def get_user_by_username(username):
conn = get_db()
row = conn.execute("SELECT id, username, password_hash FROM users WHERE username = ?", (username,)).fetchone()
conn.close()
return dict(row) if row else None
def create_user(username, password):
conn = get_db()
try:
conn.execute(
"INSERT INTO users (username, password_hash, created_at) VALUES (?, ?, ?)",
(username, generate_password_hash(password), time.time()),
)
conn.commit()
return True
except sqlite3.IntegrityError:
return False
finally:
conn.close()
def list_users():
conn = get_db()
rows = conn.execute("SELECT id, username, created_at FROM users ORDER BY username").fetchall()
conn.close()
return [{"id": r["id"], "username": r["username"], "created_at": r["created_at"]} for r in rows]
def change_password(user_id, new_password):
conn = get_db()
conn.execute(
"UPDATE users SET password_hash = ? WHERE id = ?",
(generate_password_hash(new_password), user_id),
)
conn.commit()
conn.close()
def get_recent_logs(limit=100):
conn = get_db()
rows = conn.execute(
"SELECT l.id, l.action, l.details, l.created_at, u.username FROM admin_logs l LEFT JOIN users u ON l.user_id = u.id ORDER BY l.created_at DESC LIMIT ?",
(limit,),
).fetchall()
conn.close()
return [{"id": r["id"], "action": r["action"], "details": r["details"], "created_at": r["created_at"], "username": r["username"]} for r in rows]
@app.after_request
def no_cache(response):
if request.path == "/" or request.path.startswith("/api/") or request.path.startswith("/admin") or request.path.startswith("/login"):
response.headers["Cache-Control"] = "no-store, no-cache, must-revalidate, max-age=0"
response.headers["Pragma"] = "no-cache"
return response
# Default cloud-init user-data for Raspberry Pi OS (NoCloud on boot partition)
DEFAULT_USER_DATA = """#cloud-config
@@ -119,6 +227,7 @@ def _save_network_devices(data):
BACKUPS_META_FILE = BACKUPS_DIR / "backups_meta.json"
CLOUDINIT_META_FILE = CLOUDINIT_IMAGES_DIR / "cloudinit_meta.json"
def _load_backups_meta():
@@ -150,6 +259,26 @@ def _safe_backup_name(name):
return True
def _load_cloudinit_meta():
try:
if CLOUDINIT_META_FILE.is_file():
with open(CLOUDINIT_META_FILE, "r") as f:
return json.load(f)
except (json.JSONDecodeError, OSError):
pass
return {}
def _save_cloudinit_meta(data):
try:
CLOUDINIT_IMAGES_DIR.mkdir(parents=True, exist_ok=True)
with open(CLOUDINIT_META_FILE, "w") as f:
json.dump(data, f, indent=2)
return True
except (PermissionError, OSError):
return False
def list_backups():
if not BACKUPS_DIR.is_dir():
return []
@@ -172,9 +301,109 @@ def list_backups():
return out
def list_cloudinit_images():
if not CLOUDINIT_IMAGES_DIR.is_dir():
return []
meta = _load_cloudinit_meta()
out = []
for p in sorted(CLOUDINIT_IMAGES_DIR.iterdir(), key=lambda x: x.stat().st_mtime, reverse=True):
if p.is_file() and p.name != "cloudinit_meta.json" and p.name.endswith((".img", ".img.gz", ".img.xz")):
try:
st = p.stat()
m = meta.get(p.name, {})
out.append({
"name": p.name,
"display_name": m.get("name") or p.name,
"description": m.get("description") or "",
"size": st.st_size,
"mtime": st.st_mtime,
})
except OSError:
pass
return out
def _golden_current_source():
"""Return (source, name) if golden is a symlink to backups or cloudinit, else (None, None)."""
if not GOLDEN_IMAGE.exists():
return None, None
try:
target = GOLDEN_IMAGE.resolve()
try:
target.relative_to(BACKUPS_DIR.resolve())
return "backups", target.name
except ValueError:
pass
try:
target.relative_to(CLOUDINIT_IMAGES_DIR.resolve())
return "cloudinit", target.name
except ValueError:
pass
except OSError:
pass
return None, None
@app.route("/")
def index():
return render_template("index.html")
return render_template("home.html")
@app.route("/login", methods=["GET", "POST"])
def login():
if request.method == "GET":
if session.get("admin_logged_in"):
return redirect(url_for("admin"))
return render_template("login.html")
username = (request.form.get("username") or "").strip()
password = (request.form.get("password") or "")
if not username:
return render_template("login.html", error="Username required")
user = get_user_by_username(username)
if not user:
# First user: allow self-registration
if not list_users():
if not password or len(password) < 6:
return render_template("login.html", error="First user: choose a password (min 6 characters)")
create_user(username, password)
session["admin_logged_in"] = True
session["user_id"] = get_user_by_username(username)["id"]
session["username"] = username
admin_log("first_admin_created", username)
return redirect(request.args.get("next") or url_for("admin"))
return render_template("login.html", error="Invalid username or password")
if not check_password_hash(user["password_hash"], password):
return render_template("login.html", error="Invalid username or password")
session["admin_logged_in"] = True
session["user_id"] = user["id"]
session["username"] = user["username"]
admin_log("login", username)
return redirect(request.args.get("next") or url_for("admin"))
@app.route("/logout")
def logout():
if session.get("admin_logged_in"):
admin_log("logout", session.get("username"))
session.clear()
return redirect(url_for("index"))
@app.route("/admin")
@require_admin
def admin():
return render_template("admin.html", username=session.get("username", "Admin"))
# Serve portal files for wget (e.g. cloud-init first boot). No auth.
@app.route("/files/<path:filename>")
def serve_portal_file(filename):
if ".." in filename or "/" in filename or "\\" in filename:
return jsonify({"error": "invalid path"}), 400
path = PORTAL_FILES_DIR / filename
if not path.is_file():
return jsonify({"error": "not found"}), 404
return send_file(path, as_attachment=False, download_name=filename)
@app.route("/api/status")
@@ -331,11 +560,74 @@ def api_backup_upload():
@app.route("/api/backups")
@require_admin
def api_backups():
return jsonify({"backups": list_backups(), "backups_dir": str(BACKUPS_DIR)})
@app.route("/api/cloudinit-images")
@require_admin
def api_cloudinit_images():
return jsonify({"images": list_cloudinit_images(), "cloudinit_dir": str(CLOUDINIT_IMAGES_DIR)})
@app.route("/api/portal-files")
@require_admin
def api_portal_files_list():
if not PORTAL_FILES_DIR.is_dir():
return jsonify({"files": [], "base_url": request.host_url.rstrip("/") + "/files/"})
files = []
for p in sorted(PORTAL_FILES_DIR.iterdir(), key=lambda x: x.stat().st_mtime, reverse=True):
if p.is_file() and ".." not in p.name:
try:
files.append({"name": p.name, "size": p.stat().st_size, "mtime": p.stat().st_mtime})
except OSError:
pass
base = request.host_url.rstrip("/") + "/files/"
return jsonify({"files": files, "base_url": base})
@app.route("/api/portal-files/upload", methods=["POST"])
@require_admin
def api_portal_files_upload():
if "file" not in request.files and "upload" not in request.files:
return jsonify({"ok": False, "error": "no file (use field 'file' or 'upload')"}), 400
f = request.files.get("file") or request.files.get("upload")
if not f or not f.filename:
return jsonify({"ok": False, "error": "no file selected"}), 400
name = re.sub(r"[^\w\-.]", "_", f.filename)[:120] or "upload"
if ".." in name or name.startswith("/"):
return jsonify({"ok": False, "error": "invalid filename"}), 400
path = PORTAL_FILES_DIR / name
try:
PORTAL_FILES_DIR.mkdir(parents=True, exist_ok=True)
f.save(str(path))
admin_log("portal_upload", name)
return jsonify({"ok": True, "name": name, "url": request.host_url.rstrip("/") + "/files/" + name})
except (OSError, IOError) as e:
if path.exists():
path.unlink(missing_ok=True)
return jsonify({"ok": False, "error": str(e)}), 500
@app.route("/api/portal-files/<path:name>", methods=["DELETE"])
@require_admin
def api_portal_file_delete(name):
if ".." in name or "/" in name or "\\" in name:
return jsonify({"ok": False, "error": "invalid name"}), 400
path = PORTAL_FILES_DIR / name
if not path.is_file():
return jsonify({"ok": False, "error": "not found"}), 404
try:
path.unlink()
admin_log("portal_delete", name)
return jsonify({"ok": True})
except OSError as e:
return jsonify({"ok": False, "error": str(e)}), 500
@app.route("/api/backups/upload", methods=["POST"])
@require_admin
def api_backups_upload():
"""Upload an image file from the dashboard (multipart form)."""
if "file" not in request.files and "image" not in request.files:
@@ -370,9 +662,27 @@ def api_backups_upload():
return jsonify({"ok": False, "error": str(e)}), 500
def _set_golden_from_path(path):
"""Set golden image to point to given path (file in backups or cloudinit dir)."""
GOLDEN_IMAGE.parent.mkdir(parents=True, exist_ok=True)
if GOLDEN_IMAGE.exists():
GOLDEN_IMAGE.unlink()
path_resolved = path.resolve()
try:
path_resolved.relative_to(BACKUPS_DIR.resolve())
except ValueError:
try:
path_resolved.relative_to(CLOUDINIT_IMAGES_DIR.resolve())
except ValueError:
shutil.copy2(path, GOLDEN_IMAGE)
return
os.symlink(path_resolved, GOLDEN_IMAGE)
@app.route("/api/backups/<path:name>/set-as-golden", methods=["POST"])
@require_admin
def api_backup_set_as_golden(name):
"""Use this backup as the golden image (symlink when in backups dir to avoid copying)."""
"""Use this backup as the golden image (symlink)."""
if not _safe_backup_name(name):
return jsonify({"ok": False, "error": "invalid backup name"}), 400
path = BACKUPS_DIR / name
@@ -380,18 +690,27 @@ def api_backup_set_as_golden(name):
return jsonify({"ok": False, "error": "backup not found"}), 404
try:
BACKUPS_DIR.mkdir(parents=True, exist_ok=True)
GOLDEN_IMAGE.parent.mkdir(parents=True, exist_ok=True)
if GOLDEN_IMAGE.exists():
GOLDEN_IMAGE.unlink()
# Symlink to the backup so we use the same file instead of duplicating
path_resolved = path.resolve()
try:
path_resolved.relative_to(BACKUPS_DIR.resolve())
os.symlink(path_resolved, GOLDEN_IMAGE)
except ValueError:
# Backup not under BACKUPS_DIR (shouldn't happen for list); fall back to copy
shutil.copy2(path, GOLDEN_IMAGE)
return jsonify({"ok": True, "message": f"Golden image set from {name}"})
_set_golden_from_path(path)
admin_log("set_golden", f"backups/{name}")
return jsonify({"ok": True, "message": f"Golden image set from backup {name}"})
except (OSError, IOError) as e:
return jsonify({"ok": False, "error": str(e)}), 500
@app.route("/api/cloudinit-images/<path:name>/set-as-golden", methods=["POST"])
@require_admin
def api_cloudinit_set_as_golden(name):
"""Use this cloud-init image as the golden image (symlink)."""
if not _safe_backup_name(name):
return jsonify({"ok": False, "error": "invalid name"}), 400
path = CLOUDINIT_IMAGES_DIR / name
if not path.is_file():
return jsonify({"ok": False, "error": "image not found"}), 404
try:
CLOUDINIT_IMAGES_DIR.mkdir(parents=True, exist_ok=True)
_set_golden_from_path(path)
admin_log("set_golden", f"cloudinit/{name}")
return jsonify({"ok": True, "message": f"Golden image set from cloud-init image {name}"})
except (OSError, IOError) as e:
return jsonify({"ok": False, "error": str(e)}), 500
@@ -424,7 +743,81 @@ def _request_host_shrink(name, action="shrink", format="xz"):
return False, "Shrink timed out (run on host may still be in progress)"
@app.route("/api/cloudinit-images/<path:name>", methods=["GET"])
@require_admin
def api_cloudinit_download(name):
if not _safe_backup_name(name):
return jsonify({"error": "invalid name"}), 400
path = CLOUDINIT_IMAGES_DIR / name
if not path.is_file():
return jsonify({"error": "not found"}), 404
return send_file(path, as_attachment=True, download_name=name)
@app.route("/api/cloudinit-images/<path:name>", methods=["PATCH"])
@require_admin
def api_cloudinit_update(name):
if not _safe_backup_name(name):
return jsonify({"ok": False, "error": "invalid name"}), 400
path = CLOUDINIT_IMAGES_DIR / name
if not path.is_file():
return jsonify({"ok": False, "error": "not found"}), 404
body = request.get_json(force=True, silent=True) or {}
meta = _load_cloudinit_meta()
entry = meta.get(name, {})
new_filename = (body.get("filename") or "").strip()
if new_filename:
if not _safe_backup_name(new_filename):
return jsonify({"ok": False, "error": "invalid new filename"}), 400
new_path = CLOUDINIT_IMAGES_DIR / new_filename
if new_path.exists() and new_path != path:
return jsonify({"ok": False, "error": "target filename already exists"}), 409
try:
path.rename(new_path)
except OSError as e:
return jsonify({"ok": False, "error": str(e)}), 500
meta[new_filename] = {"name": entry.get("name") or name, "description": entry.get("description") or ""}
meta.pop(name, None)
name = new_filename
path = CLOUDINIT_IMAGES_DIR / name
else:
if "name" in body:
entry["name"] = (body.get("name") or "").strip() or path.name
if "description" in body:
entry["description"] = (body.get("description") or "").strip()
meta[name] = entry
if not _save_cloudinit_meta(meta):
return jsonify({"ok": False, "error": "could not save metadata"}), 500
return jsonify({"ok": True, "name": name})
@app.route("/api/cloudinit-images/<path:name>", methods=["DELETE"])
@require_admin
def api_cloudinit_delete(name):
if not _safe_backup_name(name):
return jsonify({"ok": False, "error": "invalid name"}), 400
path = CLOUDINIT_IMAGES_DIR / name
if not path.is_file():
return jsonify({"ok": False, "error": "not found"}), 404
try:
if GOLDEN_IMAGE.exists() and GOLDEN_IMAGE.is_symlink():
try:
if GOLDEN_IMAGE.resolve() == path.resolve():
GOLDEN_IMAGE.unlink()
except OSError:
pass
path.unlink()
meta = _load_cloudinit_meta()
meta.pop(name, None)
_save_cloudinit_meta(meta)
admin_log("cloudinit_delete", name)
return jsonify({"ok": True, "message": f"Deleted {name}"})
except (OSError, IOError) as e:
return jsonify({"ok": False, "error": str(e)}), 500
@app.route("/api/backups/<path:name>/shrink", methods=["POST"])
@require_admin
def api_backup_shrink(name):
"""Request PiShrink on host for a raw .img backup (shrinks in place). Dashboard polls host status."""
if not _safe_backup_name(name):
@@ -441,6 +834,7 @@ def api_backup_shrink(name):
@app.route("/api/backups/<path:name>/compress", methods=["POST"])
@require_admin
def api_backup_compress(name):
"""Request PiShrink with compression on host. Produces .img.gz or .img.xz. Dashboard polls host status."""
if not _safe_backup_name(name):
@@ -464,6 +858,7 @@ def api_backup_compress(name):
@app.route("/api/backups/<path:name>", methods=["PATCH"])
@require_admin
def api_backup_update(name):
"""Update backup metadata (display name, description) or rename the file."""
if not _safe_backup_name(name):
@@ -504,6 +899,7 @@ def api_backup_update(name):
@app.route("/api/backups/<path:name>", methods=["DELETE"])
@require_admin
def api_backup_delete(name):
"""Delete a backup file. If it is the current golden image (symlink), the golden link is removed."""
if not _safe_backup_name(name):
@@ -528,6 +924,7 @@ def api_backup_delete(name):
@app.route("/api/backups/<path:name>", methods=["GET"])
@require_admin
def api_backup_download(name):
if not _safe_backup_name(name):
return jsonify({"error": "invalid name"}), 400
@@ -611,12 +1008,14 @@ def _save_cloudinit_templates(data):
@app.route("/api/build-cloudinit-status")
@require_admin
def api_build_cloudinit_status():
"""Return current build status (phase, message, output_name, error)."""
return jsonify(_build_status_read())
@app.route("/api/build-cloudinit", methods=["POST"])
@require_admin
def api_build_cloudinit():
"""Start building a cloud-init image: write request file; host runs build (has loop devices)."""
st = _build_status_read()
@@ -653,6 +1052,7 @@ def api_build_cloudinit():
@app.route("/api/raspios-latest-url")
@require_admin
def api_raspios_latest_url():
"""Return the URL of the latest Raspberry Pi OS (arm64) image. Query: variant=lite|full."""
variant = (request.args.get("variant") or "lite").strip().lower()
@@ -665,6 +1065,7 @@ def api_raspios_latest_url():
@app.route("/api/cloudinit-templates", methods=["GET"])
@require_admin
def api_cloudinit_templates_list():
"""List saved cloud-init templates."""
data = _load_cloudinit_templates()
@@ -672,6 +1073,7 @@ def api_cloudinit_templates_list():
@app.route("/api/cloudinit-templates", methods=["POST"])
@require_admin
def api_cloudinit_templates_create():
"""Save a new cloud-init template."""
body = request.get_json(force=True, silent=True) or {}
@@ -695,6 +1097,7 @@ def api_cloudinit_templates_create():
@app.route("/api/cloudinit-templates/<tid>")
@require_admin
def api_cloudinit_templates_get(tid):
"""Get one template by id."""
data = _load_cloudinit_templates()
@@ -705,6 +1108,7 @@ def api_cloudinit_templates_get(tid):
@app.route("/api/cloudinit-templates/<tid>", methods=["DELETE"])
@require_admin
def api_cloudinit_templates_delete(tid):
"""Delete a template."""
data = _load_cloudinit_templates()
@@ -719,15 +1123,72 @@ def api_cloudinit_templates_delete(tid):
@app.route("/api/golden-info")
def api_golden_info():
"""Return whether golden image exists and its size/mtime for UI."""
"""Return whether golden image exists, size/mtime, and which file it points to (for UI)."""
if not GOLDEN_IMAGE.is_file():
return jsonify({"present": False})
try:
st = GOLDEN_IMAGE.stat()
return jsonify({"present": True, "size": st.st_size, "mtime": st.st_mtime})
src, src_name = _golden_current_source()
out = {"present": True, "size": st.st_size, "mtime": st.st_mtime}
if src and src_name:
out["source"] = src
out["name"] = src_name
return jsonify(out)
except OSError:
return jsonify({"present": False})
@app.route("/api/admin/users", methods=["GET"])
@require_admin
def api_admin_users():
return jsonify({"users": list_users()})
@app.route("/api/admin/users", methods=["POST"])
@require_admin
def api_admin_add_user():
body = request.get_json(force=True, silent=True) or {}
username = (body.get("username") or "").strip()
password = (body.get("password") or "")
if not username:
return jsonify({"ok": False, "error": "username required"}), 400
if len(password) < 6:
return jsonify({"ok": False, "error": "password must be at least 6 characters"}), 400
if create_user(username, password):
admin_log("user_created", username)
return jsonify({"ok": True, "message": f"User {username} created"})
return jsonify({"ok": False, "error": "username already exists"}), 409
@app.route("/api/admin/users/<int:user_id>/password", methods=["POST"])
@require_admin
def api_admin_change_password(user_id):
body = request.get_json(force=True, silent=True) or {}
new_password = body.get("password") or ""
if len(new_password) < 6:
return jsonify({"ok": False, "error": "password must be at least 6 characters"}), 400
conn = get_db()
row = conn.execute("SELECT id FROM users WHERE id = ?", (user_id,)).fetchone()
conn.close()
if not row:
return jsonify({"ok": False, "error": "user not found"}), 404
change_password(user_id, new_password)
admin_log("password_changed", str(user_id))
return jsonify({"ok": True, "message": "Password updated"})
@app.route("/api/admin/logs")
@require_admin
def api_admin_logs():
limit = min(int(request.args.get("limit", 100)), 500)
return jsonify({"logs": get_recent_logs(limit)})
# Ensure DB exists when app is loaded (e.g. by gunicorn or systemd)
try:
init_db()
except Exception:
pass
if __name__ == "__main__":
app.run(host="0.0.0.0", port=5000, debug=False)