""" Alpine 5G Router – Web GUI Login: admin/support with different permissions. Run: python app.py or gunicorn. Rev: 2 (see REVISION in repo root) Now includes integrated 5G connection management (replaces standalone 5g-router service). """ import atexit import json import logging import os import subprocess import threading from pathlib import Path from flask import ( Flask, jsonify, redirect, render_template, request, send_from_directory, session, url_for, ) from auth_manager import ( ROLES, add_user, can_access_config, can_access_firewall, can_manage_users, can_restart_5g, can_view_logs, can_view_status, delete_user, get_user, init_default_users, list_users, set_password, verify_user, ) from db import ( init_db, iptables_add, iptables_delete, iptables_list, iptables_update, routes_add, routes_delete, routes_list, routes_update, ) # Connection management (replaces 5g-router service) from at_client import ATClient from connection_manager import ConnectionConfig, ConnectionManager # Setup logging logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s" ) app = Flask(__name__, static_folder="static", static_url_path="", template_folder="templates") app.secret_key = os.environ.get("SECRET_KEY", "change-me-in-production-alpine-5g") app.config["MAX_CONTENT_LENGTH"] = 512 * 1024 # 512KB max for config/log uploads CONFIG_PATH = "/etc/5g-router.conf" IPTABLES_RULES = "/etc/iptables/rules.v4" LOG_5G = "/var/log/5g-router.log" LOG_SPEEDTEST = "/var/log/speedtest-5g.log" CONNECT_SCRIPT = "/usr/local/bin/connect-5g.sh" # Kept for manual use STATUS_SCRIPT = "/usr/local/bin/status-5g.sh" MODEM_STATUS_SCRIPT = "/usr/local/bin/modem-status-at.sh" # Deprecated: using ATClient SPEEDTEST_SCRIPT = "speedtest-cli" # ---- Connection Manager (replaces 5g-router service) ---- _conn_config = ConnectionConfig.from_file(CONFIG_PATH) _at_client = ATClient(port=_conn_config.at_port) _conn_manager = ConnectionManager(_at_client, _conn_config) 
# Set by the first call to _startup_connect() so the auto-connect runs once.
_startup_connect_done = False


def _startup_connect():
    """Kick off the 5G auto-connect in the background, once per process.

    The first call sets ``_startup_connect_done`` and starts a daemon thread
    that connects and, when ``watchdog_interval`` is positive, starts the
    watchdog. Every later call returns immediately.
    """
    global _startup_connect_done
    if _startup_connect_done:
        return
    _startup_connect_done = True

    def do_connect():
        logging.info("Auto-connecting 5G on startup...")
        try:
            _conn_manager.connect()
            if _conn_config.watchdog_interval > 0:
                _conn_manager.start_watchdog()
        except Exception:
            # Background thread: route the failure through the app's logging.
            logging.exception("5G auto-connect failed")

    # Run in background thread so startup doesn't block
    threading.Thread(target=do_connect, name="5g-startup", daemon=True).start()


@atexit.register
def _shutdown():
    """Best-effort cleanup at interpreter exit.

    Each step is attempted independently so a failing watchdog stop does not
    prevent the AT client from being closed.
    """
    try:
        _conn_manager.stop_watchdog()
    except Exception:
        logging.debug("stop_watchdog failed during shutdown", exc_info=True)
    try:
        _at_client.close()
    except Exception:
        logging.debug("AT client close failed during shutdown", exc_info=True)


def _current_user():
    """Return the user record for the session's username, or None if absent."""
    username = session.get("username")
    if not username:
        return None
    return get_user(username)


def _require_login(f):
    """Decorator: reject requests that have no logged-in user.

    ``/api/`` paths get a JSON 401; all other paths are redirected to the
    login page.
    """
    from functools import wraps

    @wraps(f)
    def inner(*args, **kwargs):
        if not _current_user():
            if request.path.startswith("/api/"):
                return jsonify({"error": "Unauthorized"}), 401
            return redirect(url_for("login_page"))
        return f(*args, **kwargs)

    return inner


def _require_role(*allowed_roles):
    """Decorator factory: allow only users whose role is in ``allowed_roles``.

    Responds with JSON 401 when nobody is logged in and JSON 403 when the
    user's role is not allowed.
    """

    def decorator(f):
        from functools import wraps

        @wraps(f)
        def inner(*args, **kwargs):
            u = _current_user()
            if not u:
                return jsonify({"error": "Unauthorized"}), 401
            if u.get("role") not in allowed_roles:
                return jsonify({"error": "Forbidden"}), 403
            return f(*args, **kwargs)

        return inner

    return decorator


# ---- Pages (one HTML per function) ----


@app.route("/")
def index():
    """Send logged-in users to the status page, everyone else to login."""
    if _current_user():
        return redirect(url_for("status_page"))
    return redirect(url_for("login_page"))


@app.route("/login")
def login_page():
    """Render the login form, or skip it when already logged in."""
    if _current_user():
        return redirect(url_for("status_page"))
    return render_template("login.html")


@app.route("/logout")
def logout_page():
    """Clear the session and return to the login page."""
    session.clear()
    return redirect(url_for("login_page"))


def _render_page(template, page_id, admin_only=False):
    """Render ``template`` for the current user.

    Redirects to login when nobody is logged in, and to the status page when
    ``admin_only`` is set and the user's role is not ``admin``.
    """
    user = _current_user()
    if not user:
        return redirect(url_for("login_page"))
    if admin_only and user.get("role") != "admin":
        return redirect(url_for("status_page"))  # or 403
    return render_template(template, user=user, page_id=page_id)


@app.route("/status")
@_require_login
def status_page():
    return _render_page("status.html", "status")


@app.route("/logs")
@_require_login
def logs_page():
    return _render_page("logs.html", "logs")


@app.route("/restart")
@_require_login
def restart_page():
    return _render_page("restart.html", "restart")


@app.route("/config")
@_require_login
def config_page():
    return _render_page("config.html", "config", admin_only=True)


@app.route("/firewall")
@_require_login
def firewall_page():
    return _render_page("firewall.html", "firewall", admin_only=True)


@app.route("/routes")
@_require_login
def routes_page():
    return _render_page("routes.html", "routes", admin_only=True)


@app.route("/users")
@_require_login
def users_page():
    return _render_page("users.html", "users", admin_only=True)


# The view takes a ``path`` argument, so the rule must declare the matching
# URL variable; without it Flask has nothing to pass to the function.
@app.route("/static/<path:path>")
def static_files(path):
    """Serve a file from the ``static`` directory."""
    return send_from_directory("static", path)


# ---- API: Auth ----


@app.route("/api/login", methods=["POST"])
def api_login():
    """Verify credentials from the JSON body and start a session.

    Returns 400 when username/password are missing or not strings, 401 when
    verification fails, otherwise ``{"user": ...}``.
    """
    data = request.get_json() or {}
    if not isinstance(data, dict):
        data = {}
    username = data.get("username")
    password = data.get("password")
    # Non-string values (numbers, lists, ...) are treated as missing rather
    # than crashing on .strip().
    username = username.strip() if isinstance(username, str) else ""
    password = password if isinstance(password, str) else ""
    if not username or not password:
        return jsonify({"error": "Username and password required"}), 400
    user = verify_user(username, password)
    if not user:
        return jsonify({"error": "Invalid username or password"}), 401
    # Drop whatever the previous session carried before storing the new login.
    session.clear()
    session["username"] = user["username"]
    session["role"] = user["role"]
    return jsonify({"user": user})


@app.route("/api/logout", methods=["POST"])
def api_logout():
    """Clear the session."""
    session.clear()
    return jsonify({"ok": True})


@app.route("/api/me")
@_require_login
def api_me():
    """Return the current user's record."""
    u = _current_user()
    return jsonify({"user": u})


# ---- API: Status (support + admin) ----


@app.route("/api/status")
@_require_login
def api_status():
    """Return the status-script JSON merged with ConnectionManager status.

    Also triggers the deferred auto-connect on the first call.
    """
    if not can_view_status(_current_user().get("role")):
        return jsonify({"error": "Forbidden"}), 403
    # Trigger auto-connect on first status request (deferred startup)
    _startup_connect()
    try:
        # Get connection and modem status from ConnectionManager (native Python)
        full_status = _conn_manager.get_full_status()
        # Also include basic system status from status script for interface info
        data = {}
        try:
            out = subprocess.run(
                [STATUS_SCRIPT, "--json"],
                capture_output=True,
                text=True,
                timeout=10,
            )
            if out.returncode == 0:
                parsed = json.loads(out.stdout)
                # Only a JSON object can take the "connection"/"modem" keys.
                if isinstance(parsed, dict):
                    data = parsed
        except Exception:
            # The script is optional extra detail; carry on without it.
            logging.debug("Status script failed", exc_info=True)
        # Merge connection manager status
        data["connection"] = full_status.get("connection", {})
        data["modem"] = full_status.get("modem", {})
        return jsonify(data)
    except Exception as e:
        return jsonify({"error": str(e)}), 500


# ---- API: Config (admin only) ----


def _read_config():
    """Parse CONFIG_PATH into a dict.

    Blank lines and lines starting with ``#`` are skipped. Each remaining
    ``key=value`` line is split on the first ``=``; surrounding whitespace
    and quote characters are stripped from the value. Returns ``{}`` when the
    file does not exist.
    """
    if not Path(CONFIG_PATH).exists():
        return {}
    text = Path(CONFIG_PATH).read_text()
    # Simple key="value" or KEY="value" parsing
    cfg = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        if "=" in line:
            k, _, v = line.partition("=")
            cfg[k.strip()] = v.strip().strip('"').strip("'")
    return cfg


def _write_config(cfg):
    """Write ``cfg`` to CONFIG_PATH as ``key="value"`` lines.

    Raises:
        ValueError: if a key is empty, starts with ``#``, or contains ``=``
            or a line break, or if a value contains a line break. Any of
            these would produce lines the reader parses as something else.
    """
    lines = []
    for k, v in cfg.items():
        key = str(k).strip()
        if not key or key.startswith("#") or any(ch in key for ch in "=\r\n"):
            raise ValueError(f"Invalid config key: {k!r}")
        value = str(v)
        if "\n" in value or "\r" in value:
            raise ValueError(f"Config value for {key} must be a single line")
        if " " in value or '"' in value:
            value = json.dumps(value)
        else:
            value = f'"{value}"'
        lines.append(f"{key}={value}")
    Path(CONFIG_PATH).write_text("\n".join(lines) + "\n")


@app.route("/api/config", methods=["GET"])
@_require_login
def api_config_get():
    """Return the parsed config file as JSON."""
    if not can_access_config(_current_user().get("role")):
        return jsonify({"error": "Forbidden"}), 403
    return jsonify(_read_config())


@app.route("/api/config", methods=["PUT"])
@_require_login
def api_config_put():
    """Replace the config file with the key/value pairs in the JSON body."""
    if not can_access_config(_current_user().get("role")):
        return jsonify({"error": "Forbidden"}), 403
    data = request.get_json()
    if not data or not isinstance(data, dict):
        return jsonify({"error": "JSON body required"}), 400
    try:
        _write_config(data)
        return jsonify({"ok": True})
    except ValueError as e:
        # Rejected input, not a server fault.
        return jsonify({"error": str(e)}), 400
    except Exception as e:
        return jsonify({"error": str(e)}), 500


# ---- API: Logs (support + admin) ----


@app.route("/api/logs")
@_require_login
def api_logs():
    """Return the last ``tail`` lines of the selected log file.

    Query args: ``which`` (``5g`` selects LOG_5G, anything else
    LOG_SPEEDTEST) and ``tail`` (integer, clamped to 0..2000, default 200).
    """
    if not can_view_logs(_current_user().get("role")):
        return jsonify({"error": "Forbidden"}), 403
    which = request.args.get("which", "5g")  # 5g | speedtest
    path = LOG_5G if which == "5g" else LOG_SPEEDTEST
    try:
        tail = int(request.args.get("tail", 200))
    except (TypeError, ValueError):
        return jsonify({"error": "tail must be an integer"}), 400
    # Keep the count inside a sane window; a negative number would reach
    # tail(1) as a differently-interpreted argument.
    tail = max(0, min(tail, 2000))
    if not Path(path).exists():
        return jsonify({"lines": [], "file": path})
    try:
        proc = subprocess.run(
            ["tail", "-n", str(tail), path],
            capture_output=True,
            text=True,
            errors="replace",  # tolerate undecodable bytes in the log
            timeout=5,
        )
        # splitlines() yields [] for empty output, matching the missing-file case.
        return jsonify({"lines": (proc.stdout or "").strip().splitlines(), "file": path})
    except Exception as e:
        return jsonify({"error": str(e)}), 500


# ---- API: Firewall (admin only, SQLite) ----


def _iptables_generate_from_db():
    """Generate iptables-restore file content from DB rules.

    Only enabled rules are emitted, grouped by table. Output covers the
    ``filter`` and ``nat`` tables only; rules stored under any other table
    name are not written. The filter table always starts with an ACCEPT rule
    for TCP port 5000 on eth0.
    """
    by_table = {}
    for r in iptables_list():
        # Index rather than .get(): works for plain dicts and for sqlite3.Row,
        # which has no .get() method.
        if not r["enabled"]:
            continue
        table_name = r["table_name"] or "filter"
        by_table.setdefault(table_name, []).append(r["rule_line"] or "")

    lines = []
    for table in ("filter", "nat"):
        lines.append("*" + table)
        if table == "filter":
            lines.append(":INPUT ACCEPT [0:0]")
            lines.append(":FORWARD ACCEPT [0:0]")
            lines.append(":OUTPUT ACCEPT [0:0]")
            lines.append("# Allow web GUI (port 5000) from eth0")
            lines.append("-A INPUT -i eth0 -p tcp --dport 5000 -j ACCEPT")
        else:
            lines.append(":PREROUTING ACCEPT [0:0]")
            lines.append(":INPUT ACCEPT [0:0]")
            lines.append(":OUTPUT ACCEPT [0:0]")
            lines.append(":POSTROUTING ACCEPT [0:0]")
        for rule_line in by_table.get(table, []):
            if rule_line.strip():
                lines.append(rule_line.strip())
        lines.append("COMMIT")
    return "\n".join(lines) + "\n"


def _seed_firewall_from_file_if_empty():
    """One-time: if DB has no rules and rules.v4 exists, import into DB.

    Only ``-A`` lines found between a ``*table`` header and ``COMMIT`` are
    imported. Failures are logged and otherwise ignored (best effort).
    """
    if iptables_list():
        return
    if not Path(IPTABLES_RULES).exists():
        return
    try:
        content = Path(IPTABLES_RULES).read_text()
        table = None
        for line in content.splitlines():
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            if line.startswith("*"):
                table = line[1:].strip()
                continue
            if line == "COMMIT":
                table = None
                continue
            if table and line.startswith("-A "):
                iptables_add(table, line, enabled=1, order_idx=0)
    except Exception:
        logging.warning(
            "Could not import firewall rules from %s", IPTABLES_RULES, exc_info=True
        )
@app.route("/api/firewall", methods=["GET"])
@_require_login
def api_firewall_get():
    """List the stored firewall rules (seeding the DB from rules.v4 if empty)."""
    if not can_access_firewall(_current_user().get("role")):
        return jsonify({"error": "Forbidden"}), 403
    _seed_firewall_from_file_if_empty()
    # Normalize for JSON (sqlite3.Row -> dict with int for id)
    out = [
        {
            "id": r["id"],
            "table_name": r["table_name"],
            "rule_line": r["rule_line"],
            "enabled": bool(r["enabled"]),
            "order_idx": r["order_idx"],
            "created_at": r["created_at"] or "",
        }
        for r in iptables_list()
    ]
    return jsonify({"rules": out, "path": IPTABLES_RULES})


@app.route("/api/firewall", methods=["POST"])
@_require_login
def api_firewall_post():
    """Store a new firewall rule.

    JSON body: ``rule_line`` (required, single line), ``table_name``
    (default ``filter``), ``order_idx`` (integer, default 0).
    """
    if not can_access_firewall(_current_user().get("role")):
        return jsonify({"error": "Forbidden"}), 403
    data = request.get_json() or {}
    if not isinstance(data, dict):
        return jsonify({"error": "JSON object required"}), 400
    table_name = str(data.get("table_name") or "filter").strip()
    rule_line = str(data.get("rule_line") or "").strip()
    if not rule_line:
        return jsonify({"error": "rule_line required"}), 400
    # Each rule is later written as one line of an iptables-restore file; an
    # embedded line break would smuggle in extra lines.
    if "\n" in rule_line or "\r" in rule_line:
        return jsonify({"error": "rule_line must be a single line"}), 400
    try:
        order_idx = int(data.get("order_idx") or 0)
    except (TypeError, ValueError):
        return jsonify({"error": "order_idx must be an integer"}), 400
    try:
        rid = iptables_add(table_name, rule_line, enabled=1, order_idx=order_idx)
        return jsonify({"ok": True, "id": rid})
    except Exception as e:
        return jsonify({"error": str(e)}), 500


# The view takes ``rule_id``, so the rule must declare that URL variable.
@app.route("/api/firewall/<int:rule_id>", methods=["PUT"])
@_require_login
def api_firewall_put_id(rule_id):
    """Update fields of an existing firewall rule.

    ``enabled`` and ``order_idx`` are passed on only when present in the
    body; otherwise ``None`` is passed for them.
    """
    if not can_access_firewall(_current_user().get("role")):
        return jsonify({"error": "Forbidden"}), 403
    data = request.get_json() or {}
    if not isinstance(data, dict):
        return jsonify({"error": "JSON object required"}), 400
    rule_line = data.get("rule_line")
    if isinstance(rule_line, str) and ("\n" in rule_line or "\r" in rule_line):
        return jsonify({"error": "rule_line must be a single line"}), 400
    try:
        iptables_update(
            rule_id,
            table_name=data.get("table_name"),
            rule_line=rule_line,
            enabled=data.get("enabled") if "enabled" in data else None,
            order_idx=data.get("order_idx") if "order_idx" in data else None,
        )
    except Exception as e:
        # Same JSON error shape as the POST handler.
        return jsonify({"error": str(e)}), 500
    return jsonify({"ok": True})


@app.route("/api/firewall/<int:rule_id>", methods=["DELETE"])
@_require_login
def api_firewall_delete_id(rule_id):
    """Delete a stored firewall rule."""
    if not can_access_firewall(_current_user().get("role")):
        return jsonify({"error": "Forbidden"}), 403
    try:
        iptables_delete(rule_id)
    except Exception as e:
        return jsonify({"error": str(e)}), 500
    return jsonify({"ok": True})


@app.route("/api/firewall/apply", methods=["POST"])
@_require_login
def api_firewall_apply():
    """Generate the ruleset from the DB, save it to IPTABLES_RULES and load it.

    The ruleset is dry-run through ``iptables-restore --test`` first so a
    rejected ruleset never overwrites the saved rules file.
    """
    if not can_access_firewall(_current_user().get("role")):
        return jsonify({"error": "Forbidden"}), 403
    try:
        content = _iptables_generate_from_db()
        # Parse-only pass: nothing is committed to the kernel here.
        subprocess.run(
            ["iptables-restore", "--test"],
            input=content,
            check=True,
            timeout=10,
            capture_output=True,
            text=True,
        )
        rules_path = Path(IPTABLES_RULES)
        rules_path.parent.mkdir(parents=True, exist_ok=True)
        rules_path.write_text(content)
        subprocess.run(
            ["iptables-restore", IPTABLES_RULES],
            check=True,
            timeout=10,
            capture_output=True,
            text=True,
        )
        return jsonify({"ok": True, "message": "Rules applied"})
    except subprocess.CalledProcessError as e:
        return jsonify({"error": "iptables-restore failed: " + (e.stderr or str(e))}), 500
    except Exception as e:
        return jsonify({"error": str(e)}), 500


# ---- API: 5G Connection Control (support + admin) ----


def _run_5g_task_in_background(name, task):
    """Run ``task`` on a daemon thread named ``name``, logging any exception."""

    def runner():
        try:
            task()
        except Exception:
            logging.exception("Background task %s failed", name)

    threading.Thread(target=runner, name=name, daemon=True).start()


@app.route("/api/5g/restart", methods=["POST"])
@_require_login
def api_5g_restart():
    """Restart 5G connection (disconnect then reconnect)."""
    if not can_restart_5g(_current_user().get("role")):
        return jsonify({"error": "Forbidden"}), 403

    def do_restart():
        _conn_manager.disconnect()
        _conn_manager.connect()

    # Run in background so API returns immediately
    _run_5g_task_in_background("5g-restart", do_restart)
    return jsonify({"ok": True, "message": "Restart initiated"})


@app.route("/api/5g/connect", methods=["POST"])
@_require_login
def api_5g_connect():
    """Manually connect 5G."""
    if not can_restart_5g(_current_user().get("role")):
        return jsonify({"error": "Forbidden"}), 403
    _run_5g_task_in_background("5g-connect", _conn_manager.connect)
    return jsonify({"ok": True, "message": "Connect initiated"})


@app.route("/api/5g/disconnect", methods=["POST"])
@_require_login
def api_5g_disconnect():
    """Manually disconnect 5G."""
    if not can_restart_5g(_current_user().get("role")):
        return jsonify({"error": "Forbidden"}), 403
    _run_5g_task_in_background("5g-disconnect", _conn_manager.disconnect)
    return jsonify({"ok": True, "message": "Disconnect initiated"})
@app.route("/api/5g/status")
@_require_login
def api_5g_connection_status():
    """Get detailed 5G connection status."""
    if not can_view_status(_current_user().get("role")):
        return jsonify({"error": "Forbidden"}), 403
    return jsonify(_conn_manager.get_full_status())


# ---- API: Speedtest (support + admin) ----


def _read_5g_router_config():
    """Read WAN_IF and FAILOVER_IF from /etc/5g-router.conf. Defaults: eth1, eth0.

    Uses the file's shared ``_read_config`` parser instead of a second copy
    of the same key=value parsing.
    """
    cfg = _read_config()
    return {
        "wan_if": (cfg.get("WAN_IF") or "eth1").strip(),
        "failover_if": (cfg.get("FAILOVER_IF") or "eth0").strip(),
    }


def _get_interface_ip(iface):
    """Get first IPv4 address of interface.

    Returns the first ``inet`` address printed by ``ip -4 addr show``, or
    None when the command fails or prints no address.
    """
    import re

    try:
        out = subprocess.run(
            ["ip", "-4", "addr", "show", iface],
            capture_output=True,
            text=True,
            timeout=5,
        )
        if out.returncode != 0:
            return None
        m = re.search(r"inet\s+(\d+\.\d+\.\d+\.\d+)", out.stdout)
        return m.group(1) if m else None
    except Exception:
        return None


def _get_public_ip(iface):
    """Get public IPv4 as seen when using this interface (curl --interface).

    Returns None when curl is unavailable, fails, times out or prints
    nothing.
    """
    try:
        out = subprocess.run(
            ["curl", "-s", "--max-time", "10", "--interface", iface, "https://api.ipify.org"],
            capture_output=True,
            text=True,
            timeout=12,
        )
        if out.returncode == 0 and out.stdout and out.stdout.strip():
            return out.stdout.strip()
    except (OSError, subprocess.SubprocessError):
        # The public IP is optional detail: any failure to launch or run curl
        # (missing binary, permissions, timeout) just means "unknown".
        pass
    return None


@app.route("/api/speedtest", methods=["POST"])
@_require_login
def api_speedtest():
    """Run speedtest-cli bound to the selected interface and return the result.

    JSON body: ``interface`` = ``5g`` (uses WAN_IF, the default) or anything
    else (uses FAILOVER_IF). Returns 503 when the 5G interface has no IP or
    speedtest-cli is missing, and 504 on timeout.
    """
    if not can_view_status(_current_user().get("role")):
        return jsonify({"error": "Forbidden"}), 403
    data = request.get_json() or {}
    if not isinstance(data, dict):
        data = {}
    interface = str(data.get("interface") or "5g").strip().lower()  # 5g | wan
    net_cfg = _read_5g_router_config()
    # 5g = modem WAN (WAN_IF), wan = failover/management (FAILOVER_IF)
    iface = net_cfg["wan_if"] if interface == "5g" else net_cfg["failover_if"]
    source_ip = _get_interface_ip(iface)
    # For 5G we must bind to the modem interface; otherwise traffic uses default route (WAN)
    if interface == "5g" and not source_ip:
        return jsonify({
            "error": f"5G interface ({iface}) has no IP. Connect 5G first, then run speedtest.",
        }), 503
    args = [SPEEDTEST_SCRIPT, "--simple"]
    if source_ip:
        args.extend(["--source", source_ip])
    try:
        out = subprocess.run(
            args,
            capture_output=True,
            text=True,
            timeout=120,
        )
        raw = (out.stdout or "").strip()
        err = (out.stderr or "").strip()
        if out.returncode != 0:
            return jsonify({"error": err or "speedtest failed", "raw": raw}), 500
        # Public IP as seen from this interface
        public_ip = _get_public_ip(iface)
        result = {
            "raw": raw,
            "interface": interface,
            "iface": iface,
            "source_ip": source_ip,
            "public_ip": public_ip,
        }
        # Map each "--simple" output label to the key it fills in the response.
        labels = {"Ping:": "ping", "Download:": "download", "Upload:": "upload"}
        for line in raw.split("\n"):
            line = line.strip()
            for label, key in labels.items():
                if line.startswith(label):
                    result[key] = line.replace(label, "").strip()
                    break
        return jsonify(result)
    except subprocess.TimeoutExpired:
        return jsonify({"error": "speedtest timeout (120s)"}), 504
    except FileNotFoundError:
        return jsonify({"error": "speedtest-cli not installed (apk add speedtest-cli)"}), 503
    except Exception as e:
        return jsonify({"error": str(e)}), 500


# ---- API: Routes (admin only, SQLite) ----


@app.route("/api/routes", methods=["GET"])
@_require_login
def api_routes_get():
    """List the stored routes."""
    if not can_access_firewall(_current_user().get("role")):
        return jsonify({"error": "Forbidden"}), 403
    out = [
        {
            "id": r["id"],
            "destination": r["destination"] or "",
            "gateway": r["gateway"] or "",
            "dev": r["dev"] or "",
            "metric": r["metric"],
            "enabled": bool(r["enabled"]),
            "created_at": r["created_at"] or "",
        }
        for r in routes_list()
    ]
    return jsonify({"routes": out})


@app.route("/api/routes/live", methods=["GET"])
@_require_login
def api_routes_live():
    """Return the output lines of ``ip route show``."""
    if not can_access_firewall(_current_user().get("role")):
        return jsonify({"error": "Forbidden"}), 403
    try:
        out = subprocess.run(["ip", "route", "show"], capture_output=True, text=True, timeout=5)
        # splitlines() gives [] (not [""]) when there is no output.
        lines = (out.stdout or "").strip().splitlines()
        return jsonify({"routes": lines})
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@app.route("/api/routes", methods=["POST"])
@_require_login
def api_routes_post():
    """Store a new route.

    JSON body: ``destination`` (required), optional ``gateway``, ``dev`` and
    integer ``metric``.
    """
    if not can_access_firewall(_current_user().get("role")):
        return jsonify({"error": "Forbidden"}), 403
    data = request.get_json() or {}
    if not isinstance(data, dict):
        return jsonify({"error": "JSON object required"}), 400
    destination = str(data.get("destination") or "").strip()
    if not destination:
        return jsonify({"error": "destination required"}), 400
    gateway = str(data.get("gateway") or "").strip() or None
    dev = str(data.get("dev") or "").strip() or None
    metric = data.get("metric")
    if metric is not None:
        try:
            metric = int(metric)
        except (TypeError, ValueError):
            return jsonify({"error": "metric must be an integer"}), 400
    try:
        rid = routes_add(destination, gateway=gateway, dev=dev, metric=metric)
        return jsonify({"ok": True, "id": rid})
    except Exception as e:
        return jsonify({"error": str(e)}), 500


# The view takes ``route_id``, so the rule must declare that URL variable.
@app.route("/api/routes/<int:route_id>", methods=["PUT"])
@_require_login
def api_routes_put_id(route_id):
    """Update the fields of a stored route that are present in the JSON body."""
    if not can_access_firewall(_current_user().get("role")):
        return jsonify({"error": "Forbidden"}), 403
    data = request.get_json() or {}
    if not isinstance(data, dict):
        return jsonify({"error": "JSON object required"}), 400
    # Pass through only the keys the client actually sent.
    kwargs = {
        field: data[field]
        for field in ("destination", "gateway", "dev", "enabled")
        if field in data
    }
    if "metric" in data:
        if data["metric"] is None:
            kwargs["metric"] = None
        else:
            try:
                kwargs["metric"] = int(data["metric"])
            except (TypeError, ValueError):
                return jsonify({"error": "metric must be an integer"}), 400
    try:
        routes_update(route_id, **kwargs)
    except Exception as e:
        # Same JSON error shape as the POST handler.
        return jsonify({"error": str(e)}), 500
    return jsonify({"ok": True})


@app.route("/api/routes/<int:route_id>", methods=["DELETE"])
@_require_login
def api_routes_delete_id(route_id):
    """Delete a stored route."""
    if not can_access_firewall(_current_user().get("role")):
        return jsonify({"error": "Forbidden"}), 403
    try:
        routes_delete(route_id)
    except Exception as e:
        return jsonify({"error": str(e)}), 500
    return jsonify({"ok": True})
@app.route("/api/routes/apply", methods=["POST"])
@_require_login
def api_routes_apply():
    """Install every enabled stored route with ``ip route add``.

    Returns ``{"ok": True, ...}`` when all routes were added or already
    existed. If any other ``ip`` failure occurs, returns 500 with a
    ``failed`` list naming each destination and its error text.
    """
    if not can_access_firewall(_current_user().get("role")):
        return jsonify({"error": "Forbidden"}), 403
    try:
        failed = []
        for r in routes_list():
            # Index rather than .get(): works for plain dicts and for
            # sqlite3.Row, which has no .get() method.
            if not r["enabled"]:
                continue
            dest = (r["destination"] or "").strip()
            if not dest:
                continue
            args = ["ip", "route", "add", dest]
            gateway = (r["gateway"] or "").strip()
            if gateway:
                args.extend(["via", gateway])
            dev = (r["dev"] or "").strip()
            if dev:
                args.extend(["dev", dev])
            if r["metric"] is not None:
                args.extend(["metric", str(int(r["metric"]))])
            proc = subprocess.run(args, timeout=5, capture_output=True, text=True)
            if proc.returncode == 0:
                continue
            err = (proc.stderr or "").strip()
            # "File exists" is what ip reports for a route that is already
            # installed, so re-applying stays a no-op rather than an error.
            if "File exists" in err:
                continue
            failed.append({
                "destination": dest,
                "error": err or f"ip exited with status {proc.returncode}",
            })
        if failed:
            return jsonify({
                "error": "Some routes could not be applied",
                "failed": failed,
            }), 500
        return jsonify({"ok": True, "message": "Routes applied"})
    except Exception as e:
        return jsonify({"error": str(e)}), 500


# ---- API: Users (admin only) ----


@app.route("/api/users", methods=["GET"])
@_require_login
@_require_role("admin")
def api_users_list():
    """Return all users and the list of available roles."""
    return jsonify({"users": list_users(), "roles": list(ROLES)})


@app.route("/api/users", methods=["PUT"])
@_require_login
@_require_role("admin")
def api_users_put():
    """Add a user, change a password or delete a user.

    JSON body: ``action`` = ``add`` | ``password`` | ``delete``, plus
    ``username`` and, depending on the action, ``password`` and ``role``
    (default ``support``). Returns 400 for bad input and 404 when the
    password target does not exist.
    """
    data = request.get_json()
    if not data or not isinstance(data, dict):
        return jsonify({"error": "JSON required"}), 400
    action = data.get("action")  # add | password | delete
    # A null or non-string username counts as missing instead of crashing
    # on .strip().
    raw_username = data.get("username")
    username = raw_username.strip() if isinstance(raw_username, str) else ""
    password = data.get("password") or ""

    if action == "add":
        ok, err = add_user(username, password, data.get("role", "support"))
        if not ok:
            return jsonify({"error": err or "Failed"}), 400
        return jsonify({"ok": True})

    if action == "password":
        if not username or not password:
            return jsonify({"error": "username and password required"}), 400
        if get_user(username) is None:
            return jsonify({"error": "User not found"}), 404
        set_password(username, password)
        return jsonify({"ok": True})

    if action == "delete":
        if not username:
            return jsonify({"error": "username required"}), 400
        ok, err = delete_user(username)
        if not ok:
            return jsonify({"error": err or "Failed"}), 400
        return jsonify({"ok": True})

    return jsonify({"error": "Invalid action"}), 400


# ---- Entry (default users created on first request or at import) ----
init_default_users()

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000, debug=False)