Files
Alpine_5G/web/app.py
nearxos 9dc35a57a2 Enhance 5G modem management with integrated web GUI and connection control
- Introduced a web GUI for managing 5G connections, replacing the standalone 5g-router service.
- Updated scripts to ensure exclusive access to the AT port, preventing conflicts.
- Improved troubleshooting documentation in 5G_MODEM_TROUBLESHOOTING.md, adding checks for processes using the AT port.
- Enhanced connection management in the web app, including auto-connect and detailed status APIs.
- Updated installation scripts to reflect changes in service management and dependencies.
2026-02-02 10:34:25 +02:00

829 lines
26 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Alpine 5G Router Web GUI
Login: admin/support with different permissions. Run: python app.py or gunicorn.
Rev: 2 (see REVISION in repo root)
Now includes integrated 5G connection management (replaces standalone 5g-router service).
"""
import atexit
import json
import logging
import os
import subprocess
import threading
from pathlib import Path
from flask import (
Flask,
jsonify,
redirect,
render_template,
request,
send_from_directory,
session,
url_for,
)
from auth_manager import (
ROLES,
add_user,
can_access_config,
can_access_firewall,
can_manage_users,
can_restart_5g,
can_view_logs,
can_view_status,
delete_user,
get_user,
init_default_users,
list_users,
set_password,
verify_user,
)
from db import (
init_db,
iptables_add,
iptables_delete,
iptables_list,
iptables_update,
routes_add,
routes_delete,
routes_list,
routes_update,
)
# Connection management (replaces 5g-router service)
from at_client import ATClient
from connection_manager import ConnectionConfig, ConnectionManager
# Setup logging
# Root logger at INFO; each record carries timestamp, level and logger name.
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s"
)
# Flask application object; static folder, static URL prefix and template folder are set explicitly.
app = Flask(__name__, static_folder="static", static_url_path="", template_folder="templates")
# Secret key comes from the SECRET_KEY environment variable when it is set.
# NOTE(review): the fallback below is a fixed string visible in the source; anything
# signed with it (Flask session cookies) can be forged. Set SECRET_KEY in production.
app.secret_key = os.environ.get("SECRET_KEY", "change-me-in-production-alpine-5g")
app.config["MAX_CONTENT_LENGTH"] = 512 * 1024 # 512KB max for config/log uploads
# Filesystem locations of the router config, firewall rules file, logs and helper scripts.
CONFIG_PATH = "/etc/5g-router.conf"
IPTABLES_RULES = "/etc/iptables/rules.v4"
LOG_5G = "/var/log/5g-router.log"
LOG_SPEEDTEST = "/var/log/speedtest-5g.log"
CONNECT_SCRIPT = "/usr/local/bin/connect-5g.sh" # Kept for manual use
STATUS_SCRIPT = "/usr/local/bin/status-5g.sh"
MODEM_STATUS_SCRIPT = "/usr/local/bin/modem-status-at.sh" # Deprecated: using ATClient
SPEEDTEST_SCRIPT = "speedtest-cli"
# ---- Connection Manager (replaces 5g-router service) ----
# Built once at import time: the config is loaded from CONFIG_PATH, the AT client is
# created for the configured AT port, and the manager is given both.
_conn_config = ConnectionConfig.from_file(CONFIG_PATH)
_at_client = ATClient(port=_conn_config.at_port)
_conn_manager = ConnectionManager(_at_client, _conn_config)
# Flipped to True by _startup_connect() so the deferred auto-connect is started only once.
_startup_connect_done = False
def _startup_connect():
    """Start the one-time background 5G auto-connect.

    The first call flips the module-level ``_startup_connect_done`` flag and
    spawns a daemon thread that connects and, when the configured watchdog
    interval is positive, starts the watchdog. Later calls do nothing.
    """
    global _startup_connect_done

    if not _startup_connect_done:
        _startup_connect_done = True

        def _connect_then_watch():
            logging.info("Auto-connecting 5G on startup...")
            _conn_manager.connect()
            watchdog_enabled = _conn_config.watchdog_interval > 0
            if watchdog_enabled:
                _conn_manager.start_watchdog()

        # Background thread keeps the triggering request from blocking on the modem.
        worker = threading.Thread(
            target=_connect_then_watch,
            name="5g-startup",
            daemon=True,
        )
        worker.start()
@atexit.register
def _shutdown():
    """Best-effort cleanup at interpreter exit.

    Stops the watchdog and closes the AT client. Each step has its own
    ``try`` so a failure while stopping the watchdog no longer prevents the
    AT client from being closed. Failures are logged, never raised.
    """
    try:
        _conn_manager.stop_watchdog()
    except Exception as exc:
        logging.warning("Stopping 5G watchdog during shutdown failed: %s", exc)
    try:
        _at_client.close()
    except Exception as exc:
        logging.warning("Closing AT client during shutdown failed: %s", exc)
def _current_user():
    """Return ``get_user`` for the session's username, or None when the session has none."""
    username = session.get("username")
    if username:
        return get_user(username)
    return None
def _require_login(f):
    """Decorator: run *f* only when a user is logged in.

    Anonymous requests get a JSON 401 for paths under ``/api/`` and a
    redirect to the login page for everything else.
    """
    from functools import wraps

    @wraps(f)
    def guarded(*args, **kwargs):
        if _current_user():
            return f(*args, **kwargs)
        wants_json = request.path.startswith("/api/")
        if wants_json:
            return jsonify({"error": "Unauthorized"}), 401
        return redirect(url_for("login_page"))

    return guarded
def _require_role(*allowed_roles):
    """Decorator factory: allow the view only for users whose role is in *allowed_roles*.

    Responds with JSON 401 when nobody is logged in and JSON 403 when the
    logged-in user's role is not allowed.
    """
    from functools import wraps

    def decorator(f):
        @wraps(f)
        def guarded(*args, **kwargs):
            user = _current_user()
            if not user:
                return jsonify({"error": "Unauthorized"}), 401
            if user.get("role") in allowed_roles:
                return f(*args, **kwargs)
            return jsonify({"error": "Forbidden"}), 403

        return guarded

    return decorator
# ---- Pages (one HTML per function) ----
@app.route("/")
def index():
    """Redirect logged-in users to the status page, everyone else to the login page."""
    endpoint = "status_page" if _current_user() else "login_page"
    return redirect(url_for(endpoint))
@app.route("/login")
def login_page():
    """Show the login form, or skip straight to the status page when already logged in."""
    if not _current_user():
        return render_template("login.html")
    return redirect(url_for("status_page"))
@app.route("/logout")
def logout_page():
    """Clear the session and redirect to the login page."""
    session.clear()
    login_url = url_for("login_page")
    return redirect(login_url)
def _render_page(template, page_id, admin_only=False):
    """Render *template* for the current user.

    Redirects to the login page when nobody is logged in, and to the status
    page when *admin_only* is set and the user's role is not "admin".
    The template receives ``user`` and ``page_id``.
    """
    user = _current_user()
    if not user:
        return redirect(url_for("login_page"))
    if admin_only:
        if user.get("role") != "admin":
            return redirect(url_for("status_page"))  # or 403
    return render_template(template, user=user, page_id=page_id)
@app.route("/status")
@_require_login
def status_page():
    """Render the status page for a logged-in user."""
    return _render_page(template="status.html", page_id="status")
@app.route("/logs")
@_require_login
def logs_page():
    """Render the logs page for a logged-in user."""
    return _render_page(template="logs.html", page_id="logs")
@app.route("/restart")
@_require_login
def restart_page():
    """Render the restart page for a logged-in user."""
    return _render_page(template="restart.html", page_id="restart")
@app.route("/config")
@_require_login
def config_page():
    """Render the config page (admin role only; others are redirected)."""
    return _render_page(template="config.html", page_id="config", admin_only=True)
@app.route("/firewall")
@_require_login
def firewall_page():
    """Render the firewall page (admin role only; others are redirected)."""
    return _render_page(template="firewall.html", page_id="firewall", admin_only=True)
@app.route("/routes")
@_require_login
def routes_page():
    """Render the routes page (admin role only; others are redirected)."""
    return _render_page(template="routes.html", page_id="routes", admin_only=True)
@app.route("/users")
@_require_login
def users_page():
    """Render the users page (admin role only; others are redirected)."""
    return _render_page(template="users.html", page_id="users", admin_only=True)
@app.route("/static/<path:path>")
def static_files(path):
    """Serve *path* from the "static" directory."""
    static_dir = "static"
    return send_from_directory(static_dir, path)
# ---- API: Auth ----
@app.route("/api/login", methods=["POST"])
def api_login():
    """Authenticate from a JSON body and start a session.

    Expects ``{"username": ..., "password": ...}``. Returns 400 when either
    field is missing, empty, or not a string (previously a non-string value
    or a non-object body crashed with a 500), 401 when ``verify_user``
    rejects the credentials, and ``{"user": ...}`` on success.
    """
    data = request.get_json() or {}
    if not isinstance(data, dict):
        return jsonify({"error": "Username and password required"}), 400

    username = data.get("username") or ""
    password = data.get("password") or ""
    # Guard before .strip(): JSON numbers/lists/objects have no such method.
    if not isinstance(username, str) or not isinstance(password, str):
        return jsonify({"error": "Username and password required"}), 400

    username = username.strip()
    if not username or not password:
        return jsonify({"error": "Username and password required"}), 400

    user = verify_user(username, password)
    if not user:
        return jsonify({"error": "Invalid username or password"}), 401

    session["username"] = user["username"]
    session["role"] = user["role"]
    return jsonify({"user": user})
@app.route("/api/logout", methods=["POST"])
def api_logout():
    """Clear the session and acknowledge with ``{"ok": true}``."""
    session.clear()
    payload = {"ok": True}
    return jsonify(payload)
@app.route("/api/me")
@_require_login
def api_me():
    """Return the current user's record as ``{"user": ...}``."""
    return jsonify({"user": _current_user()})
# ---- API: Status (support + admin) ----
@app.route("/api/status")
@_require_login
def api_status():
    """Return combined system, connection and modem status as JSON.

    Requires a role accepted by ``can_view_status``. Also triggers the
    deferred auto-connect. The status script's JSON output (when the script
    exits 0) forms the base payload; the connection manager's "connection"
    and "modem" sections are then stored on top of it.
    """
    role = _current_user().get("role")
    if not can_view_status(role):
        return jsonify({"error": "Forbidden"}), 403

    # Trigger auto-connect on first status request (deferred startup)
    _startup_connect()

    try:
        # Connection and modem status come from the ConnectionManager (native Python)
        manager_status = _conn_manager.get_full_status()

        # Interface info from the status script is optional: any failure leaves it empty
        payload = {}
        try:
            proc = subprocess.run(
                [STATUS_SCRIPT, "--json"],
                text=True,
                capture_output=True,
                timeout=10,
            )
            if proc.returncode == 0:
                payload = json.loads(proc.stdout)
        except Exception:
            pass

        # Merge connection manager status
        payload["connection"] = manager_status.get("connection", {})
        payload["modem"] = manager_status.get("modem", {})
        return jsonify(payload)
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
# ---- API: Config (admin only) ----
def _read_config():
    """Parse CONFIG_PATH into a dict of ``key -> value`` strings.

    Returns an empty dict when the file does not exist. Blank lines, lines
    starting with ``#`` and lines without ``=`` are skipped. Each line is
    split at the first ``=``; the key is whitespace-stripped and the value
    has surrounding whitespace and double/single quotes stripped.
    """
    config_file = Path(CONFIG_PATH)
    if not config_file.exists():
        return {}

    parsed = {}
    for raw_line in config_file.read_text().splitlines():
        entry = raw_line.strip()
        if not entry or entry.startswith("#") or "=" not in entry:
            continue
        key, _, value = entry.partition("=")
        parsed[key.strip()] = value.strip().strip('"').strip("'")
    return parsed
def _write_config(cfg):
    """Write *cfg* to CONFIG_PATH as one ``KEY="value"`` line per entry.

    Values containing a space or a double quote are quoted with
    ``json.dumps``; all others are wrapped in plain double quotes.

    The file is line-oriented, so input that would break that format is
    rejected before anything is written: keys that are empty, start with
    ``#`` or contain ``=`` or a line break, and values containing a line
    break. Previously such input was written verbatim and could inject or
    corrupt entries.

    Raises:
        ValueError: if a key or value cannot be represented on one line.
    """
    lines = []
    for k, v in cfg.items():
        key = str(k).strip()
        text = str(v)
        if not key or key.startswith("#") or any(ch in key for ch in "=\r\n"):
            raise ValueError(f"Invalid config key: {k!r}")
        if "\n" in text or "\r" in text:
            raise ValueError(f"Config value for {key!r} must not contain line breaks")
        if " " in text or '"' in text:
            quoted = json.dumps(text)
        else:
            quoted = f'"{text}"'
        lines.append(f"{key}={quoted}")
    # Only reached when every entry validated, so a bad request leaves the file untouched.
    Path(CONFIG_PATH).write_text("\n".join(lines) + "\n")
@app.route("/api/config", methods=["GET"])
@_require_login
def api_config_get():
    """Return the parsed router config as JSON; 403 unless ``can_access_config`` allows the role."""
    role = _current_user().get("role")
    if can_access_config(role):
        return jsonify(_read_config())
    return jsonify({"error": "Forbidden"}), 403
@app.route("/api/config", methods=["PUT"])
@_require_login
def api_config_put():
    """Replace the router config with the JSON body.

    403 unless ``can_access_config`` allows the role, 400 when the body is
    missing or empty, 500 with the error text when writing fails.
    """
    role = _current_user().get("role")
    if not can_access_config(role):
        return jsonify({"error": "Forbidden"}), 403

    payload = request.get_json()
    if not payload:
        return jsonify({"error": "JSON body required"}), 400

    try:
        _write_config(payload)
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
    return jsonify({"ok": True})
# ---- API: Logs (support + admin) ----
@app.route("/api/logs")
@_require_login
def api_logs():
    """Return the last lines of a log file as JSON.

    Query parameters:
        which: "5g" (default) selects LOG_5G; any other value selects LOG_SPEEDTEST.
        tail:  number of lines, default 200, capped at 2000.

    Returns 403 unless ``can_view_logs`` allows the role and 400 when
    ``tail`` is not a non-negative integer (previously an unhandled 500).
    A missing or empty log yields ``"lines": []``.
    """
    if not can_view_logs(_current_user().get("role")):
        return jsonify({"error": "Forbidden"}), 403

    which = request.args.get("which", "5g")  # 5g | speedtest
    path = LOG_5G if which == "5g" else LOG_SPEEDTEST

    try:
        tail = int(request.args.get("tail", 200))
    except (TypeError, ValueError):
        return jsonify({"error": "tail must be a non-negative integer"}), 400
    if tail < 0:
        return jsonify({"error": "tail must be a non-negative integer"}), 400
    tail = min(tail, 2000)

    if not Path(path).exists():
        return jsonify({"lines": [], "file": path})

    try:
        proc = subprocess.run(
            ["tail", "-n", str(tail), path],
            capture_output=True,
            text=True,
            timeout=5,
        )
    except Exception as e:
        return jsonify({"error": str(e)}), 500

    text = (proc.stdout or "").strip()
    # "".split("\n") would give [""]; report an empty log as no lines at all.
    return jsonify({"lines": text.split("\n") if text else [], "file": path})
# ---- API: Firewall (admin only, SQLite) ----
def _iptables_generate_from_db():
    """Build iptables-restore file content from the enabled rules in the DB.

    Only the "filter" and "nat" tables are emitted, in that order. Rules
    without a table name count as "filter". The filter table always starts
    with an ACCEPT rule for the web GUI on eth0 port 5000.
    """
    grouped = {}
    for row in iptables_list():
        if row.get("enabled"):
            table_key = row.get("table_name") or "filter"
            grouped.setdefault(table_key, []).append(row.get("rule_line") or "")

    # Fixed preamble (chain policies and built-in rules) per emitted table.
    preambles = {
        "filter": [
            ":INPUT ACCEPT [0:0]",
            ":FORWARD ACCEPT [0:0]",
            ":OUTPUT ACCEPT [0:0]",
            "# Allow web GUI (port 5000) from eth0",
            "-A INPUT -i eth0 -p tcp --dport 5000 -j ACCEPT",
        ],
        "nat": [
            ":PREROUTING ACCEPT [0:0]",
            ":INPUT ACCEPT [0:0]",
            ":OUTPUT ACCEPT [0:0]",
            ":POSTROUTING ACCEPT [0:0]",
        ],
    }

    output = []
    for table, preamble in preambles.items():
        output.append("*" + table)
        output.extend(preamble)
        stripped_rules = (rule.strip() for rule in grouped.get(table, []))
        output.extend(rule for rule in stripped_rules if rule)
        output.append("COMMIT")
    return "\n".join(output) + "\n"
def _seed_firewall_from_file_if_empty():
    """Import ``-A`` rules from IPTABLES_RULES into the DB when the DB has none.

    Does nothing if the DB already lists rules or the file is missing.
    Every imported rule is stored enabled with ``order_idx=0``. Errors while
    reading or importing are swallowed (best effort).
    """
    if iptables_list():
        return
    rules_file = Path(IPTABLES_RULES)
    if not rules_file.exists():
        return

    try:
        current_table = None
        for raw_line in rules_file.read_text().splitlines():
            entry = raw_line.strip()
            if not entry or entry.startswith("#"):
                continue
            if entry.startswith("*"):
                # "*filter" / "*nat" opens a table section
                current_table = entry[1:].strip()
            elif entry == "COMMIT":
                current_table = None
            elif current_table and entry.startswith("-A "):
                iptables_add(current_table, entry, enabled=1, order_idx=0)
    except Exception:
        pass
@app.route("/api/firewall", methods=["GET"])
@_require_login
def api_firewall_get():
    """List firewall rules from the DB as JSON.

    403 unless ``can_access_firewall`` allows the role. Seeds the DB from
    the rules file first when the DB is empty.
    """
    role = _current_user().get("role")
    if not can_access_firewall(role):
        return jsonify({"error": "Forbidden"}), 403

    _seed_firewall_from_file_if_empty()

    # Normalize for JSON (sqlite3.Row -> dict with int for id)
    serialized = [
        {
            "id": row["id"],
            "table_name": row["table_name"],
            "rule_line": row["rule_line"],
            "enabled": bool(row["enabled"]),
            "order_idx": row["order_idx"],
            "created_at": row["created_at"] or "",
        }
        for row in iptables_list()
    ]
    return jsonify({"rules": serialized, "path": IPTABLES_RULES})
@app.route("/api/firewall", methods=["POST"])
@_require_login
def api_firewall_post():
    """Add a firewall rule to the DB.

    JSON body: ``rule_line`` (required), ``table_name`` (default "filter"),
    ``order_idx`` (default 0). Returns 403 unless ``can_access_firewall``
    allows the role; 400 when ``rule_line`` is empty or spans several lines,
    or when ``order_idx`` is not an integer; 500 with the error text when
    the insert fails; otherwise ``{"ok": true, "id": ...}``.
    """
    if not can_access_firewall(_current_user().get("role")):
        return jsonify({"error": "Forbidden"}), 403

    data = request.get_json() or {}
    table_name = (data.get("table_name") or "filter").strip()
    rule_line = (data.get("rule_line") or "").strip()
    if not rule_line:
        return jsonify({"error": "rule_line required"}), 400
    # Rule lines are later written verbatim into the iptables-restore file,
    # so an embedded line break would smuggle in extra directives.
    if "\n" in rule_line or "\r" in rule_line:
        return jsonify({"error": "rule_line must be a single line"}), 400

    try:
        order_idx = int(data.get("order_idx") or 0)
    except (TypeError, ValueError):
        return jsonify({"error": "order_idx must be an integer"}), 400

    try:
        rid = iptables_add(table_name, rule_line, enabled=1, order_idx=order_idx)
        return jsonify({"ok": True, "id": rid})
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route("/api/firewall/<int:rule_id>", methods=["PUT"])
@_require_login
def api_firewall_put_id(rule_id):
    """Update fields of firewall rule *rule_id* from the JSON body.

    Recognised keys: ``table_name``, ``rule_line``, ``enabled``,
    ``order_idx``; absent keys are passed to ``iptables_update`` as None.
    Returns 403 unless ``can_access_firewall`` allows the role, 400 when
    ``rule_line`` spans several lines, and 500 with the error text when the
    update fails (matching the POST handler instead of an unhandled error).
    """
    if not can_access_firewall(_current_user().get("role")):
        return jsonify({"error": "Forbidden"}), 403

    data = request.get_json() or {}
    rule_line = data.get("rule_line")
    # Rule lines are written verbatim into the iptables-restore file.
    if isinstance(rule_line, str) and ("\n" in rule_line or "\r" in rule_line):
        return jsonify({"error": "rule_line must be a single line"}), 400

    try:
        iptables_update(
            rule_id,
            table_name=data.get("table_name"),
            rule_line=rule_line,
            enabled=data.get("enabled"),
            order_idx=data.get("order_idx"),
        )
    except Exception as e:
        return jsonify({"error": str(e)}), 500
    return jsonify({"ok": True})
@app.route("/api/firewall/<int:rule_id>", methods=["DELETE"])
@_require_login
def api_firewall_delete_id(rule_id):
    """Delete firewall rule *rule_id*; 403 unless ``can_access_firewall`` allows the role."""
    role = _current_user().get("role")
    if can_access_firewall(role):
        iptables_delete(rule_id)
        return jsonify({"ok": True})
    return jsonify({"error": "Forbidden"}), 403
@app.route("/api/firewall/apply", methods=["POST"])
@_require_login
def api_firewall_apply():
    """Write the DB rules to IPTABLES_RULES and load them with iptables-restore.

    403 unless ``can_access_firewall`` allows the role; 500 with the
    command's stderr when iptables-restore fails, or with the error text for
    any other failure.
    """
    role = _current_user().get("role")
    if not can_access_firewall(role):
        return jsonify({"error": "Forbidden"}), 403

    try:
        rules_text = _iptables_generate_from_db()
        rules_file = Path(IPTABLES_RULES)
        rules_file.parent.mkdir(parents=True, exist_ok=True)
        # NOTE(review): the file is overwritten before iptables-restore has
        # accepted it, so a rejected rule set stays on disk.
        rules_file.write_text(rules_text)
        subprocess.run(
            ["iptables-restore", IPTABLES_RULES],
            check=True,
            timeout=10,
            capture_output=True,
            text=True,
        )
    except subprocess.CalledProcessError as exc:
        return jsonify({"error": "iptables-restore failed: " + (exc.stderr or str(exc))}), 500
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
    return jsonify({"ok": True, "message": "Rules applied"})
# ---- API: 5G Connection Control (support + admin) ----
@app.route("/api/5g/restart", methods=["POST"])
@_require_login
def api_5g_restart():
    """Restart the 5G connection: disconnect, then connect, in a background thread.

    403 unless ``can_restart_5g`` allows the role. The response is sent
    immediately; it does not report the outcome of the restart.
    """
    role = _current_user().get("role")
    if not can_restart_5g(role):
        return jsonify({"error": "Forbidden"}), 403

    def _bounce():
        _conn_manager.disconnect()
        _conn_manager.connect()

    # Background thread so the API returns immediately
    worker = threading.Thread(target=_bounce, name="5g-restart", daemon=True)
    worker.start()
    return jsonify({"ok": True, "message": "Restart initiated"})
@app.route("/api/5g/connect", methods=["POST"])
@_require_login
def api_5g_connect():
    """Start a 5G connect in a background thread; 403 unless ``can_restart_5g`` allows the role."""
    role = _current_user().get("role")
    if not can_restart_5g(role):
        return jsonify({"error": "Forbidden"}), 403

    def _run_connect():
        _conn_manager.connect()

    worker = threading.Thread(target=_run_connect, name="5g-connect", daemon=True)
    worker.start()
    return jsonify({"ok": True, "message": "Connect initiated"})
@app.route("/api/5g/disconnect", methods=["POST"])
@_require_login
def api_5g_disconnect():
    """Start a 5G disconnect in a background thread; 403 unless ``can_restart_5g`` allows the role."""
    role = _current_user().get("role")
    if not can_restart_5g(role):
        return jsonify({"error": "Forbidden"}), 403

    def _run_disconnect():
        _conn_manager.disconnect()

    worker = threading.Thread(target=_run_disconnect, name="5g-disconnect", daemon=True)
    worker.start()
    return jsonify({"ok": True, "message": "Disconnect initiated"})
@app.route("/api/5g/status")
@_require_login
def api_5g_connection_status():
    """Return the connection manager's full status as JSON; 403 unless ``can_view_status`` allows the role."""
    role = _current_user().get("role")
    if can_view_status(role):
        return jsonify(_conn_manager.get_full_status())
    return jsonify({"error": "Forbidden"}), 403
# ---- API: Speedtest (support + admin) ----
def _read_5g_router_config():
    """Return the WAN and failover interface names from the router config.

    Reads WAN_IF and FAILOVER_IF from CONFIG_PATH via ``_read_config`` (the
    same key=value parser, instead of a second copy of the parsing loop).
    Missing or empty entries default to "eth1" and "eth0".

    Returns:
        dict with keys "wan_if" and "failover_if".
    """
    cfg = _read_config()
    return {
        "wan_if": (cfg.get("WAN_IF") or "eth1").strip(),
        "failover_if": (cfg.get("FAILOVER_IF") or "eth0").strip(),
    }
def _get_interface_ip(iface):
    """Return the first IPv4 address that ``ip -4 addr show`` reports for *iface*.

    Returns None when the command fails, prints no ``inet`` address, or
    raises for any reason.
    """
    try:
        proc = subprocess.run(
            ["ip", "-4", "addr", "show", iface],
            text=True,
            capture_output=True,
            timeout=5,
        )
        if proc.returncode == 0:
            import re

            found = re.search(r"inet\s+(\d+\.\d+\.\d+\.\d+)", proc.stdout)
            if found:
                return found.group(1)
        return None
    except Exception:
        return None
def _get_public_ip(iface):
    """Return the public IPv4 address seen when traffic leaves through *iface*.

    Runs ``curl --interface <iface>`` against https://api.ipify.org.
    Returns None when curl is missing, cannot be executed, times out, exits
    non-zero or prints nothing. Any OSError is now treated as "unknown"
    (previously only FileNotFoundError was), so a launch failure cannot
    discard the caller's otherwise complete result.
    """
    try:
        out = subprocess.run(
            ["curl", "-s", "--max-time", "10", "--interface", iface, "https://api.ipify.org"],
            capture_output=True,
            text=True,
            timeout=12,
        )
    except (OSError, subprocess.SubprocessError):
        return None
    address = (out.stdout or "").strip()
    if out.returncode == 0 and address:
        return address
    return None
@app.route("/api/speedtest", methods=["POST"])
@_require_login
def api_speedtest():
    """Run speedtest-cli bound to the chosen interface and return the parsed result.

    JSON body: ``interface`` = "5g" (default, uses WAN_IF) or anything else
    (uses FAILOVER_IF). Returns 403 unless ``can_view_status`` allows the
    role, 503 when the 5G interface has no IPv4 address or speedtest-cli is
    not installed, 504 on timeout, and 500 on any other failure. On success
    the payload holds the raw output, interface details, the public IP and
    the parsed ping/download/upload values.
    """
    role = _current_user().get("role")
    if not can_view_status(role):
        return jsonify({"error": "Forbidden"}), 403

    body = request.get_json() or {}
    interface = (body.get("interface") or "5g").strip().lower()  # 5g | wan
    net_cfg = _read_5g_router_config()
    # 5g = modem WAN (WAN_IF), wan = failover/management (FAILOVER_IF)
    iface = net_cfg["wan_if" if interface == "5g" else "failover_if"]
    source_ip = _get_interface_ip(iface)

    # For 5G we must bind to the modem interface; otherwise traffic uses default route (WAN)
    if interface == "5g" and not source_ip:
        return jsonify({
            "error": f"5G interface ({iface}) has no IP. Connect 5G first, then run speedtest.",
        }), 503

    command = [SPEEDTEST_SCRIPT, "--simple"]
    if source_ip:
        command += ["--source", source_ip]

    # Output line prefix -> key in the JSON result
    labels = (("Ping:", "ping"), ("Download:", "download"), ("Upload:", "upload"))

    try:
        proc = subprocess.run(command, capture_output=True, text=True, timeout=120)
        raw = (proc.stdout or "").strip()
        err = (proc.stderr or "").strip()
        if proc.returncode != 0:
            return jsonify({"error": err or "speedtest failed", "raw": raw}), 500

        result = {
            "raw": raw,
            "interface": interface,
            "iface": iface,
            "source_ip": source_ip,
            # Public IP as seen from this interface
            "public_ip": _get_public_ip(iface),
        }
        for raw_line in raw.split("\n"):
            text = raw_line.strip()
            for prefix, key in labels:
                if text.startswith(prefix):
                    result[key] = text.replace(prefix, "").strip()
                    break
        return jsonify(result)
    except subprocess.TimeoutExpired:
        return jsonify({"error": "speedtest timeout (120s)"}), 504
    except FileNotFoundError:
        return jsonify({"error": "speedtest-cli not installed (apk add speedtest-cli)"}), 503
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
# ---- API: Routes (admin only, SQLite) ----
@app.route("/api/routes", methods=["GET"])
@_require_login
def api_routes_get():
    """List the stored routes as JSON; 403 unless ``can_access_firewall`` allows the role."""
    role = _current_user().get("role")
    if not can_access_firewall(role):
        return jsonify({"error": "Forbidden"}), 403

    serialized = [
        {
            "id": row["id"],
            "destination": row["destination"] or "",
            "gateway": row["gateway"] or "",
            "dev": row["dev"] or "",
            "metric": row["metric"],
            "enabled": bool(row["enabled"]),
            "created_at": row["created_at"] or "",
        }
        for row in routes_list()
    ]
    return jsonify({"routes": serialized})
@app.route("/api/routes/live", methods=["GET"])
@_require_login
def api_routes_live():
    """Return the output lines of ``ip route show``; 403 unless ``can_access_firewall`` allows the role."""
    role = _current_user().get("role")
    if not can_access_firewall(role):
        return jsonify({"error": "Forbidden"}), 403

    try:
        proc = subprocess.run(
            ["ip", "route", "show"],
            capture_output=True,
            text=True,
            timeout=5,
        )
        table_lines = (proc.stdout or "").strip().split("\n")
        return jsonify({"routes": table_lines})
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
@app.route("/api/routes", methods=["POST"])
@_require_login
def api_routes_post():
    """Store a new route.

    JSON body: ``destination`` (required), optional ``gateway``, ``dev`` and
    ``metric``. Returns 403 unless ``can_access_firewall`` allows the role;
    400 when ``destination`` is empty or ``metric`` is not an integer
    (previously an unhandled 500); 500 with the error text when the insert
    fails; otherwise ``{"ok": true, "id": ...}``.
    """
    if not can_access_firewall(_current_user().get("role")):
        return jsonify({"error": "Forbidden"}), 403

    data = request.get_json() or {}
    destination = (data.get("destination") or "").strip()
    if not destination:
        return jsonify({"error": "destination required"}), 400
    gateway = (data.get("gateway") or "").strip() or None
    dev = (data.get("dev") or "").strip() or None

    metric = data.get("metric")
    if metric in (None, ""):
        # A blank metric means "no metric", same as omitting the key.
        metric = None
    else:
        try:
            metric = int(metric)
        except (TypeError, ValueError):
            return jsonify({"error": "metric must be an integer"}), 400

    try:
        rid = routes_add(destination, gateway=gateway, dev=dev, metric=metric)
        return jsonify({"ok": True, "id": rid})
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route("/api/routes/<int:route_id>", methods=["PUT"])
@_require_login
def api_routes_put_id(route_id):
    """Update stored route *route_id* with the fields present in the JSON body.

    Recognised keys: ``destination``, ``gateway``, ``dev``, ``metric``,
    ``enabled``; only keys present in the body are passed to
    ``routes_update``. Returns 403 unless ``can_access_firewall`` allows the
    role, 400 when ``metric`` is not an integer, and 500 with the error text
    when the update fails (both previously surfaced as unhandled errors).
    """
    if not can_access_firewall(_current_user().get("role")):
        return jsonify({"error": "Forbidden"}), 403

    data = request.get_json() or {}
    kwargs = {
        key: data[key]
        for key in ("destination", "gateway", "dev", "enabled")
        if key in data
    }
    if "metric" in data:
        metric = data["metric"]
        if metric in (None, ""):
            kwargs["metric"] = None
        else:
            try:
                kwargs["metric"] = int(metric)
            except (TypeError, ValueError):
                return jsonify({"error": "metric must be an integer"}), 400

    try:
        routes_update(route_id, **kwargs)
    except Exception as e:
        return jsonify({"error": str(e)}), 500
    return jsonify({"ok": True})
@app.route("/api/routes/<int:route_id>", methods=["DELETE"])
@_require_login
def api_routes_delete_id(route_id):
    """Delete stored route *route_id*; 403 unless ``can_access_firewall`` allows the role."""
    role = _current_user().get("role")
    if can_access_firewall(role):
        routes_delete(route_id)
        return jsonify({"ok": True})
    return jsonify({"error": "Forbidden"}), 403
@app.route("/api/routes/apply", methods=["POST"])
@_require_login
def api_routes_apply():
    """Install every enabled stored route with ``ip route add``.

    Returns 403 unless ``can_access_firewall`` allows the role. Each
    command's exit status is now checked: a route that already exists
    ("File exists") counts as applied, any other failure is collected and
    reported as a 500 with a ``failed`` list. Previously failures were
    ignored and the handler always reported success.
    """
    if not can_access_firewall(_current_user().get("role")):
        return jsonify({"error": "Forbidden"}), 403

    try:
        failures = []
        rows = [r for r in routes_list() if r.get("enabled")]
        for r in rows:
            dest = (r.get("destination") or "").strip()
            if not dest:
                continue
            gateway = (r.get("gateway") or "").strip()
            dev = (r.get("dev") or "").strip()

            args = ["ip", "route", "add", dest]
            if gateway:
                args.extend(["via", gateway])
            if dev:
                args.extend(["dev", dev])
            if r.get("metric") is not None:
                args.extend(["metric", str(int(r["metric"]))])

            proc = subprocess.run(args, timeout=5, capture_output=True, text=True)
            if proc.returncode == 0:
                continue
            stderr = (proc.stderr or "").strip()
            # Re-applying an installed route is not an error.
            if "File exists" in stderr:
                continue
            failures.append({
                "route": " ".join(args[3:]),
                "error": stderr or f"exit code {proc.returncode}",
            })

        if failures:
            return jsonify({
                "error": f"{len(failures)} route(s) could not be applied",
                "failed": failures,
            }), 500
        return jsonify({"ok": True, "message": "Routes applied"})
    except Exception as e:
        return jsonify({"error": str(e)}), 500
# ---- API: Users (admin only) ----
@app.route("/api/users", methods=["GET"])
@_require_login
@_require_role("admin")
def api_users_list():
    """Return all users and the available role names (admin only)."""
    payload = {"users": list_users(), "roles": list(ROLES)}
    return jsonify(payload)
@app.route("/api/users", methods=["PUT"])
@_require_login
@_require_role("admin")
def api_users_put():
    """Add a user, change a password, or delete a user (admin only).

    JSON body: ``action`` = "add" | "password" | "delete", plus
    ``username`` and, depending on the action, ``password`` and ``role``
    (default "support").

    Returns 400 for a missing or non-object body, non-string
    username/password, missing required fields, an unknown action, or when
    ``add_user`` / ``delete_user`` report failure; 404 when changing the
    password of an unknown user. ``null`` or non-string fields previously
    crashed with a 500.
    """
    data = request.get_json()
    if not data or not isinstance(data, dict):
        return jsonify({"error": "JSON required"}), 400

    # "or" rather than a .get() default: a present-but-null key yields None.
    username = data.get("username") or ""
    password = data.get("password") or ""
    if not isinstance(username, str) or not isinstance(password, str):
        return jsonify({"error": "username and password must be strings"}), 400
    username = username.strip()

    action = data.get("action")  # add | password | delete

    if action == "add":
        ok, err = add_user(username, password, data.get("role") or "support")
        if not ok:
            return jsonify({"error": err or "Failed"}), 400
        return jsonify({"ok": True})

    if action == "password":
        if not username or not password:
            return jsonify({"error": "username and password required"}), 400
        if get_user(username) is None:
            return jsonify({"error": "User not found"}), 404
        set_password(username, password)
        return jsonify({"ok": True})

    if action == "delete":
        if not username:
            return jsonify({"error": "username required"}), 400
        ok, err = delete_user(username)
        if not ok:
            return jsonify({"error": err or "Failed"}), 400
        return jsonify({"ok": True})

    return jsonify({"error": "Invalid action"}), 400
# ---- Entry (default users are created at import time, before the first request) ----
# Runs on import, so it executes under both "python app.py" and a WSGI server.
init_default_users()

if __name__ == "__main__":
    # Direct invocation: listen on all interfaces, port 5000, debug off.
    app.run(debug=False, host="0.0.0.0", port=5000)