Files
Alpine_5G/web/app.py
nearxos 160ad641ce Add web GUI, docs, scripts, and 5G router config
- Web app (Flask): status, config, firewall, logs, users, restart
- Docs: AT commands, deploy, DNS, quickstart, web GUI
- Scripts: connect, deploy, diag, healthcheck, modem-status, speedtest, status, troubleshoot
- Init and iptables: 5g-router, 5g-webgui, rules.v4
- CHANGELOG, TODO, REVISION; config and README updates
2026-02-02 09:38:23 +02:00

747 lines
23 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Alpine 5G Router Web GUI
Login: admin/support with different permissions. Run: python app.py or gunicorn.
Rev: 1 (see REVISION in repo root)
"""
import json
import os
import subprocess
from pathlib import Path
from flask import (
Flask,
jsonify,
redirect,
render_template,
request,
send_from_directory,
session,
url_for,
)
from auth_manager import (
ROLES,
add_user,
can_access_config,
can_access_firewall,
can_manage_users,
can_restart_5g,
can_view_logs,
can_view_status,
delete_user,
get_user,
init_default_users,
list_users,
set_password,
verify_user,
)
from db import (
init_db,
iptables_add,
iptables_delete,
iptables_list,
iptables_update,
routes_add,
routes_delete,
routes_list,
routes_update,
)
app = Flask(__name__, static_folder="static", static_url_path="", template_folder="templates")
app.secret_key = os.environ.get("SECRET_KEY", "change-me-in-production-alpine-5g")
app.config["MAX_CONTENT_LENGTH"] = 512 * 1024 # 512KB max for config/log uploads
CONFIG_PATH = "/etc/5g-router.conf"
IPTABLES_RULES = "/etc/iptables/rules.v4"
LOG_5G = "/var/log/5g-router.log"
LOG_SPEEDTEST = "/var/log/speedtest-5g.log"
CONNECT_SCRIPT = "/usr/local/bin/connect-5g.sh"
STATUS_SCRIPT = "/usr/local/bin/status-5g.sh"
MODEM_STATUS_SCRIPT = "/usr/local/bin/modem-status-at.sh"
SPEEDTEST_SCRIPT = "speedtest-cli"
def _current_user():
if not session.get("username"):
return None
return get_user(session["username"])
def _require_login(f):
from functools import wraps
@wraps(f)
def inner(*args, **kwargs):
if not _current_user():
if request.path.startswith("/api/"):
return jsonify({"error": "Unauthorized"}), 401
return redirect(url_for("login_page"))
return f(*args, **kwargs)
return inner
def _require_role(*allowed_roles):
def decorator(f):
from functools import wraps
@wraps(f)
def inner(*args, **kwargs):
u = _current_user()
if not u:
return jsonify({"error": "Unauthorized"}), 401
if u.get("role") not in allowed_roles:
return jsonify({"error": "Forbidden"}), 403
return f(*args, **kwargs)
return inner
return decorator
# ---- Pages (one HTML per function) ----
@app.route("/")
def index():
if _current_user():
return redirect(url_for("status_page"))
return redirect(url_for("login_page"))
@app.route("/login")
def login_page():
if _current_user():
return redirect(url_for("status_page"))
return render_template("login.html")
@app.route("/logout")
def logout_page():
session.clear()
return redirect(url_for("login_page"))
def _render_page(template, page_id, admin_only=False):
user = _current_user()
if not user:
return redirect(url_for("login_page"))
if admin_only and user.get("role") != "admin":
return redirect(url_for("status_page")) # or 403
return render_template(template, user=user, page_id=page_id)
@app.route("/status")
@_require_login
def status_page():
return _render_page("status.html", "status")
@app.route("/logs")
@_require_login
def logs_page():
return _render_page("logs.html", "logs")
@app.route("/restart")
@_require_login
def restart_page():
return _render_page("restart.html", "restart")
@app.route("/config")
@_require_login
def config_page():
return _render_page("config.html", "config", admin_only=True)
@app.route("/firewall")
@_require_login
def firewall_page():
return _render_page("firewall.html", "firewall", admin_only=True)
@app.route("/routes")
@_require_login
def routes_page():
return _render_page("routes.html", "routes", admin_only=True)
@app.route("/users")
@_require_login
def users_page():
return _render_page("users.html", "users", admin_only=True)
@app.route("/static/<path:path>")
def static_files(path):
return send_from_directory("static", path)
# ---- API: Auth ----
@app.route("/api/login", methods=["POST"])
def api_login():
data = request.get_json() or {}
username = (data.get("username") or "").strip()
password = data.get("password") or ""
if not username or not password:
return jsonify({"error": "Username and password required"}), 400
user = verify_user(username, password)
if not user:
return jsonify({"error": "Invalid username or password"}), 401
session["username"] = user["username"]
session["role"] = user["role"]
return jsonify({"user": user})
@app.route("/api/logout", methods=["POST"])
def api_logout():
session.clear()
return jsonify({"ok": True})
@app.route("/api/me")
@_require_login
def api_me():
u = _current_user()
return jsonify({"user": u})
# ---- API: Status (support + admin) ----
@app.route("/api/status")
@_require_login
def api_status():
if not can_view_status(_current_user().get("role")):
return jsonify({"error": "Forbidden"}), 403
try:
out = subprocess.run(
[STATUS_SCRIPT, "--json"],
capture_output=True,
text=True,
timeout=10,
)
if out.returncode != 0:
return jsonify({"error": "status script failed", "stderr": out.stderr}), 500
data = json.loads(out.stdout)
# Enrich with modem AT status (AT_COMMANDS_REFERENCE.md)
if Path(MODEM_STATUS_SCRIPT).exists():
try:
mod_out = subprocess.run(
[MODEM_STATUS_SCRIPT],
capture_output=True,
text=True,
timeout=15,
cwd="/",
)
if mod_out.returncode == 0 and mod_out.stdout.strip():
mod_data = json.loads(mod_out.stdout)
data["modem"] = mod_data
except (FileNotFoundError, json.JSONDecodeError, subprocess.TimeoutExpired):
data["modem"] = {}
else:
data["modem"] = {}
return jsonify(data)
except subprocess.TimeoutExpired:
return jsonify({"error": "timeout"}), 504
except Exception as e:
return jsonify({"error": str(e)}), 500
# ---- API: Config (admin only) ----
def _read_config():
if not Path(CONFIG_PATH).exists():
return {}
text = Path(CONFIG_PATH).read_text()
# Simple key="value" or KEY="value" parsing
cfg = {}
for line in text.splitlines():
line = line.strip()
if not line or line.startswith("#"):
continue
if "=" in line:
k, _, v = line.partition("=")
k = k.strip()
v = v.strip().strip('"').strip("'")
cfg[k] = v
return cfg
def _write_config(cfg):
lines = []
for k, v in cfg.items():
if " " in str(v) or '"' in str(v):
v = json.dumps(str(v))
else:
v = f'"{v}"'
lines.append(f'{k}={v}')
Path(CONFIG_PATH).write_text("\n".join(lines) + "\n")
@app.route("/api/config", methods=["GET"])
@_require_login
def api_config_get():
if not can_access_config(_current_user().get("role")):
return jsonify({"error": "Forbidden"}), 403
return jsonify(_read_config())
@app.route("/api/config", methods=["PUT"])
@_require_login
def api_config_put():
if not can_access_config(_current_user().get("role")):
return jsonify({"error": "Forbidden"}), 403
data = request.get_json()
if not data:
return jsonify({"error": "JSON body required"}), 400
try:
_write_config(data)
return jsonify({"ok": True})
except Exception as e:
return jsonify({"error": str(e)}), 500
# ---- API: Logs (support + admin) ----
@app.route("/api/logs")
@_require_login
def api_logs():
if not can_view_logs(_current_user().get("role")):
return jsonify({"error": "Forbidden"}), 403
which = request.args.get("which", "5g") # 5g | speedtest
path = LOG_5G if which == "5g" else LOG_SPEEDTEST
tail = int(request.args.get("tail", 200))
if tail > 2000:
tail = 2000
if not Path(path).exists():
return jsonify({"lines": [], "file": path})
try:
lines = subprocess.run(
["tail", "-n", str(tail), path],
capture_output=True,
text=True,
timeout=5,
)
return jsonify({"lines": (lines.stdout or "").strip().split("\n"), "file": path})
except Exception as e:
return jsonify({"error": str(e)}), 500
# ---- API: Firewall (admin only, SQLite) ----
def _iptables_generate_from_db():
"""Generate iptables-restore file content from DB rules."""
rules = iptables_list()
by_table = {}
for r in rules:
if not r.get("enabled"):
continue
t = r.get("table_name") or "filter"
if t not in by_table:
by_table[t] = []
by_table[t].append(r.get("rule_line") or "")
lines = []
for table in ("filter", "nat"):
lines.append("*" + table)
if table == "filter":
lines.append(":INPUT ACCEPT [0:0]")
lines.append(":FORWARD ACCEPT [0:0]")
lines.append(":OUTPUT ACCEPT [0:0]")
lines.append("# Allow web GUI (port 5000) from eth0")
lines.append("-A INPUT -i eth0 -p tcp --dport 5000 -j ACCEPT")
else:
lines.append(":PREROUTING ACCEPT [0:0]")
lines.append(":INPUT ACCEPT [0:0]")
lines.append(":OUTPUT ACCEPT [0:0]")
lines.append(":POSTROUTING ACCEPT [0:0]")
for rule_line in by_table.get(table, []):
if rule_line.strip():
lines.append(rule_line.strip())
lines.append("COMMIT")
return "\n".join(lines) + "\n"
def _seed_firewall_from_file_if_empty():
"""One-time: if DB has no rules and rules.v4 exists, import into DB."""
if iptables_list():
return
if not Path(IPTABLES_RULES).exists():
return
try:
content = Path(IPTABLES_RULES).read_text()
table = None
for line in content.splitlines():
line = line.strip()
if not line or line.startswith("#"):
continue
if line.startswith("*"):
table = line[1:].strip()
continue
if line == "COMMIT":
table = None
continue
if table and line.startswith("-A "):
iptables_add(table, line, enabled=1, order_idx=0)
except Exception:
pass
@app.route("/api/firewall", methods=["GET"])
@_require_login
def api_firewall_get():
if not can_access_firewall(_current_user().get("role")):
return jsonify({"error": "Forbidden"}), 403
_seed_firewall_from_file_if_empty()
rules = iptables_list()
# Normalize for JSON (sqlite3.Row -> dict with int for id)
out = []
for r in rules:
out.append({
"id": r["id"],
"table_name": r["table_name"],
"rule_line": r["rule_line"],
"enabled": bool(r["enabled"]),
"order_idx": r["order_idx"],
"created_at": r["created_at"] or "",
})
return jsonify({"rules": out, "path": IPTABLES_RULES})
@app.route("/api/firewall", methods=["POST"])
@_require_login
def api_firewall_post():
if not can_access_firewall(_current_user().get("role")):
return jsonify({"error": "Forbidden"}), 403
data = request.get_json() or {}
table_name = (data.get("table_name") or "filter").strip()
rule_line = (data.get("rule_line") or "").strip()
if not rule_line:
return jsonify({"error": "rule_line required"}), 400
order_idx = int(data.get("order_idx") or 0)
try:
rid = iptables_add(table_name, rule_line, enabled=1, order_idx=order_idx)
return jsonify({"ok": True, "id": rid})
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route("/api/firewall/<int:rule_id>", methods=["PUT"])
@_require_login
def api_firewall_put_id(rule_id):
if not can_access_firewall(_current_user().get("role")):
return jsonify({"error": "Forbidden"}), 403
data = request.get_json() or {}
iptables_update(
rule_id,
table_name=data.get("table_name"),
rule_line=data.get("rule_line"),
enabled=data.get("enabled") if "enabled" in data else None,
order_idx=data.get("order_idx") if "order_idx" in data else None,
)
return jsonify({"ok": True})
@app.route("/api/firewall/<int:rule_id>", methods=["DELETE"])
@_require_login
def api_firewall_delete_id(rule_id):
if not can_access_firewall(_current_user().get("role")):
return jsonify({"error": "Forbidden"}), 403
iptables_delete(rule_id)
return jsonify({"ok": True})
@app.route("/api/firewall/apply", methods=["POST"])
@_require_login
def api_firewall_apply():
if not can_access_firewall(_current_user().get("role")):
return jsonify({"error": "Forbidden"}), 403
try:
content = _iptables_generate_from_db()
Path(IPTABLES_RULES).parent.mkdir(parents=True, exist_ok=True)
Path(IPTABLES_RULES).write_text(content)
subprocess.run(["iptables-restore", IPTABLES_RULES], check=True, timeout=10, capture_output=True, text=True)
return jsonify({"ok": True, "message": "Rules applied"})
except subprocess.CalledProcessError as e:
return jsonify({"error": "iptables-restore failed: " + (e.stderr or str(e))}), 500
except Exception as e:
return jsonify({"error": str(e)}), 500
# ---- API: 5G restart (support + admin) ----
@app.route("/api/5g/restart", methods=["POST"])
@_require_login
def api_5g_restart():
if not can_restart_5g(_current_user().get("role")):
return jsonify({"error": "Forbidden"}), 403
try:
subprocess.Popen(
[CONNECT_SCRIPT],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
return jsonify({"ok": True, "message": "Restart initiated"})
except Exception as e:
return jsonify({"error": str(e)}), 500
# ---- API: Speedtest (support + admin) ----
def _read_5g_router_config():
"""Read WAN_IF and FAILOVER_IF from /etc/5g-router.conf. Defaults: eth1, eth0."""
cfg = {}
if Path(CONFIG_PATH).exists():
for line in Path(CONFIG_PATH).read_text().splitlines():
line = line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
k, _, v = line.partition("=")
cfg[k.strip()] = v.strip().strip('"').strip("'")
return {
"wan_if": (cfg.get("WAN_IF") or "eth1").strip(),
"failover_if": (cfg.get("FAILOVER_IF") or "eth0").strip(),
}
def _get_interface_ip(iface):
"""Get first IPv4 address of interface."""
try:
out = subprocess.run(
["ip", "-4", "addr", "show", iface],
capture_output=True,
text=True,
timeout=5,
)
if out.returncode != 0:
return None
import re
m = re.search(r"inet\s+(\d+\.\d+\.\d+\.\d+)", out.stdout)
return m.group(1) if m else None
except Exception:
return None
def _get_public_ip(iface):
"""Get public IPv4 as seen when using this interface (curl --interface)."""
try:
out = subprocess.run(
["curl", "-s", "--max-time", "10", "--interface", iface, "https://api.ipify.org"],
capture_output=True,
text=True,
timeout=12,
)
if out.returncode == 0 and out.stdout and out.stdout.strip():
return out.stdout.strip()
except (FileNotFoundError, subprocess.TimeoutExpired):
pass
return None
@app.route("/api/speedtest", methods=["POST"])
@_require_login
def api_speedtest():
if not can_view_status(_current_user().get("role")):
return jsonify({"error": "Forbidden"}), 403
data = request.get_json() or {}
interface = (data.get("interface") or "5g").strip().lower() # 5g | wan
net_cfg = _read_5g_router_config()
# 5g = modem WAN (WAN_IF), wan = failover/management (FAILOVER_IF)
if interface == "5g":
iface = net_cfg["wan_if"]
else:
iface = net_cfg["failover_if"]
source_ip = _get_interface_ip(iface)
# For 5G we must bind to the modem interface; otherwise traffic uses default route (WAN)
if interface == "5g" and not source_ip:
return jsonify({
"error": f"5G interface ({iface}) has no IP. Connect 5G first, then run speedtest.",
}), 503
args = [SPEEDTEST_SCRIPT, "--simple"]
if source_ip:
args.extend(["--source", source_ip])
try:
out = subprocess.run(
args,
capture_output=True,
text=True,
timeout=120,
)
raw = (out.stdout or "").strip()
err = (out.stderr or "").strip()
if out.returncode != 0:
return jsonify({"error": err or "speedtest failed", "raw": raw}), 500
# Public IP as seen from this interface
public_ip = _get_public_ip(iface)
result = {
"raw": raw,
"interface": interface,
"iface": iface,
"source_ip": source_ip,
"public_ip": public_ip,
}
for line in raw.split("\n"):
line = line.strip()
if line.startswith("Ping:"):
result["ping"] = line.replace("Ping:", "").strip()
elif line.startswith("Download:"):
result["download"] = line.replace("Download:", "").strip()
elif line.startswith("Upload:"):
result["upload"] = line.replace("Upload:", "").strip()
return jsonify(result)
except subprocess.TimeoutExpired:
return jsonify({"error": "speedtest timeout (120s)"}), 504
except FileNotFoundError:
return jsonify({"error": "speedtest-cli not installed (apk add speedtest-cli)"}), 503
except Exception as e:
return jsonify({"error": str(e)}), 500
# ---- API: Routes (admin only, SQLite) ----
@app.route("/api/routes", methods=["GET"])
@_require_login
def api_routes_get():
if not can_access_firewall(_current_user().get("role")):
return jsonify({"error": "Forbidden"}), 403
rows = routes_list()
out = []
for r in rows:
out.append({
"id": r["id"],
"destination": r["destination"] or "",
"gateway": r["gateway"] or "",
"dev": r["dev"] or "",
"metric": r["metric"],
"enabled": bool(r["enabled"]),
"created_at": r["created_at"] or "",
})
return jsonify({"routes": out})
@app.route("/api/routes/live", methods=["GET"])
@_require_login
def api_routes_live():
if not can_access_firewall(_current_user().get("role")):
return jsonify({"error": "Forbidden"}), 403
try:
out = subprocess.run(["ip", "route", "show"], capture_output=True, text=True, timeout=5)
lines = (out.stdout or "").strip().split("\n")
return jsonify({"routes": lines})
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route("/api/routes", methods=["POST"])
@_require_login
def api_routes_post():
if not can_access_firewall(_current_user().get("role")):
return jsonify({"error": "Forbidden"}), 403
data = request.get_json() or {}
destination = (data.get("destination") or "").strip()
if not destination:
return jsonify({"error": "destination required"}), 400
gateway = (data.get("gateway") or "").strip() or None
dev = (data.get("dev") or "").strip() or None
metric = data.get("metric")
if metric is not None:
metric = int(metric)
try:
rid = routes_add(destination, gateway=gateway, dev=dev, metric=metric)
return jsonify({"ok": True, "id": rid})
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route("/api/routes/<int:route_id>", methods=["PUT"])
@_require_login
def api_routes_put_id(route_id):
if not can_access_firewall(_current_user().get("role")):
return jsonify({"error": "Forbidden"}), 403
data = request.get_json() or {}
kwargs = {}
if "destination" in data:
kwargs["destination"] = data["destination"]
if "gateway" in data:
kwargs["gateway"] = data["gateway"]
if "dev" in data:
kwargs["dev"] = data["dev"]
if "metric" in data:
kwargs["metric"] = int(data["metric"]) if data["metric"] is not None else None
if "enabled" in data:
kwargs["enabled"] = data["enabled"]
routes_update(route_id, **kwargs)
return jsonify({"ok": True})
@app.route("/api/routes/<int:route_id>", methods=["DELETE"])
@_require_login
def api_routes_delete_id(route_id):
if not can_access_firewall(_current_user().get("role")):
return jsonify({"error": "Forbidden"}), 403
routes_delete(route_id)
return jsonify({"ok": True})
@app.route("/api/routes/apply", methods=["POST"])
@_require_login
def api_routes_apply():
if not can_access_firewall(_current_user().get("role")):
return jsonify({"error": "Forbidden"}), 403
try:
rows = [r for r in routes_list() if r.get("enabled")]
for r in rows:
dest = (r.get("destination") or "").strip()
if not dest:
continue
args = ["ip", "route", "add", dest]
if (r.get("gateway") or "").strip():
args.extend(["via", (r.get("gateway") or "").strip()])
if (r.get("dev") or "").strip():
args.extend(["dev", (r.get("dev") or "").strip()])
if r.get("metric") is not None:
args.extend(["metric", str(int(r["metric"]))])
subprocess.run(args, timeout=5, capture_output=True, text=True)
return jsonify({"ok": True, "message": "Routes applied"})
except Exception as e:
return jsonify({"error": str(e)}), 500
# ---- API: Users (admin only) ----
@app.route("/api/users", methods=["GET"])
@_require_login
@_require_role("admin")
def api_users_list():
return jsonify({"users": list_users(), "roles": list(ROLES)})
@app.route("/api/users", methods=["PUT"])
@_require_login
@_require_role("admin")
def api_users_put():
data = request.get_json()
if not data:
return jsonify({"error": "JSON required"}), 400
action = data.get("action") # add | password | delete
if action == "add":
ok, err = add_user(
data.get("username", "").strip(),
data.get("password", ""),
data.get("role", "support"),
)
if not ok:
return jsonify({"error": err or "Failed"}), 400
return jsonify({"ok": True})
if action == "password":
username = data.get("username", "").strip()
password = data.get("password", "")
if not username or not password:
return jsonify({"error": "username and password required"}), 400
if get_user(username) is None:
return jsonify({"error": "User not found"}), 404
set_password(username, password)
return jsonify({"ok": True})
if action == "delete":
username = data.get("username", "").strip()
if not username:
return jsonify({"error": "username required"}), 400
ok, err = delete_user(username)
if not ok:
return jsonify({"error": err or "Failed"}), 400
return jsonify({"ok": True})
return jsonify({"error": "Invalid action"}), 400
# ---- Entry (default users created on first request or at import) ----
init_default_users()
if __name__ == "__main__":
app.run(host="0.0.0.0", port=5000, debug=False)