Files
Alpine_5G/web/db.py
nearxos 160ad641ce Add web GUI, docs, scripts, and 5G router config
- Web app (Flask): status, config, firewall, logs, users, restart
- Docs: AT commands, deploy, DNS, quickstart, web GUI
- Scripts: connect, deploy, diag, healthcheck, modem-status, speedtest, status, troubleshoot
- Init and iptables: 5g-router, 5g-webgui, rules.v4
- CHANGELOG, TODO, REVISION; config and README updates
2026-02-02 09:38:23 +02:00

233 lines
7.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Alpine 5G Web GUI - SQLite database.
Tables: users, iptables_rules, static_routes.
"""
import sqlite3
from contextlib import contextmanager
from datetime import datetime, timezone
from pathlib import Path
# Directory holding the web GUI's persistent data; lives next to this module.
DATA_DIR = Path(__file__).resolve().parent.joinpath("data")
# Location of the SQLite database file inside DATA_DIR.
DB_PATH = DATA_DIR.joinpath("alpine5g.db")
def _ensure_data_dir():
    """Create DATA_DIR, including missing parents; no-op if it already exists."""
    DATA_DIR.mkdir(exist_ok=True, parents=True)
def get_conn():
    """Open a new SQLite connection to the database at DB_PATH.

    The data directory is created first if needed.  The connection's row
    factory is set to ``sqlite3.Row`` so columns can be accessed by name.

    Returns:
        An open ``sqlite3.Connection``; closing it is the caller's job.
    """
    _ensure_data_dir()
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    return connection
@contextmanager
def cursor():
    """Yield a cursor on a fresh connection, committing on clean exit.

    The commit only runs when the ``with`` body finishes without raising;
    the connection is closed in every case.
    """
    db = get_conn()
    try:
        yield db.cursor()
        # Reached only if the caller's block did not raise.
        db.commit()
    finally:
        db.close()
def init_db():
    """Create the users, iptables_rules and static_routes tables if absent."""
    _ensure_data_dir()
    # One DDL statement per table, executed in this order in one transaction.
    schema = (
        """
        CREATE TABLE IF NOT EXISTS users (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        username TEXT UNIQUE NOT NULL,
        password_hash TEXT NOT NULL,
        role TEXT NOT NULL CHECK(role IN ('admin', 'support')),
        created_at TEXT NOT NULL DEFAULT ''
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS iptables_rules (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        table_name TEXT NOT NULL DEFAULT 'filter',
        rule_line TEXT NOT NULL,
        enabled INTEGER NOT NULL DEFAULT 1,
        order_idx INTEGER NOT NULL DEFAULT 0,
        created_at TEXT NOT NULL DEFAULT ''
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS static_routes (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        destination TEXT NOT NULL,
        gateway TEXT,
        dev TEXT,
        metric INTEGER,
        enabled INTEGER NOT NULL DEFAULT 1,
        created_at TEXT NOT NULL DEFAULT ''
        )
        """,
    )
    with cursor() as cur:
        for ddl in schema:
            cur.execute(ddl)
# ---- Users ----
def user_get(username):
    """Look up a user by name.

    Returns:
        A dict with ``username`` and ``role``, or None if there is no match.
    """
    with cursor() as cur:
        record = cur.execute(
            "SELECT username, role FROM users WHERE username = ?", (username,)
        ).fetchone()
    if not record:
        return None
    return dict(record)
def user_verify(username, password_hash_check_fn):
    """Check a user's credentials with a caller-supplied hash checker.

    Args:
        username: Name of the user to look up.
        password_hash_check_fn: Callable that receives the stored password
            hash and returns a truthy value when the credentials are valid.

    Returns:
        A dict with ``username`` and ``role`` when the user exists and the
        checker accepts the stored hash; otherwise None.
    """
    with cursor() as cur:
        record = cur.execute(
            "SELECT username, password_hash, role FROM users WHERE username = ?", (username,)
        ).fetchone()
    if record and password_hash_check_fn(record["password_hash"]):
        return {"username": record["username"], "role": record["role"]}
    return None
def user_list():
    """Return all users as dicts of ``username`` and ``role``, sorted by name."""
    with cursor() as cur:
        records = cur.execute("SELECT username, role FROM users ORDER BY username").fetchall()
    return list(map(dict, records))
def user_add(username, password_hash, role):
    """Insert a new user row.

    Args:
        username: Unique user name.
        password_hash: Already-hashed password to store.
        role: Role string stored for the user.

    Returns:
        ``(True, None)`` on success; ``(False, "User exists")`` when the
        insert violates a UNIQUE constraint; ``(False, <error text>)`` for
        any other ``sqlite3.IntegrityError`` (e.g. a failed CHECK).
    """
    # datetime.utcnow() is deprecated since Python 3.12.  Build the same
    # naive-UTC value from an aware "now" so the stored text keeps its format.
    created_at = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
    try:
        with cursor() as cur:
            cur.execute(
                "INSERT INTO users (username, password_hash, role, created_at) VALUES (?, ?, ?, ?)",
                (username, password_hash, role, created_at),
            )
    except sqlite3.IntegrityError as e:
        if "UNIQUE" in str(e):
            return False, "User exists"
        return False, str(e)
    return True, None
def user_set_password(username, password_hash):
    """Replace the stored password hash of *username*.

    Returns:
        True if a row was updated, False if no such user exists.
    """
    with cursor() as cur:
        cur.execute("UPDATE users SET password_hash = ? WHERE username = ?", (password_hash, username))
        affected = cur.rowcount
    return affected > 0
def user_delete(username):
    """Delete the user named *username*.

    Returns:
        True if a row was removed, False otherwise.
    """
    with cursor() as cur:
        cur.execute("DELETE FROM users WHERE username = ?", (username,))
        affected = cur.rowcount
    return affected > 0
# ---- iptables_rules ----
def iptables_list():
    """Return every stored iptables rule as a dict.

    Rows are ordered by table name, then ``order_idx``, then ``id``.
    """
    with cursor() as cur:
        records = cur.execute(
            "SELECT id, table_name, rule_line, enabled, order_idx, created_at FROM iptables_rules ORDER BY table_name, order_idx, id"
        ).fetchall()
    return list(map(dict, records))
def iptables_add(table_name, rule_line, enabled=1, order_idx=0):
    """Store a new iptables rule.

    Args:
        table_name: Value for the ``table_name`` column.
        rule_line: The rule text to store.
        enabled: Truthy to store 1, falsy to store 0.
        order_idx: Sort position stored in ``order_idx``.

    Returns:
        The ``id`` of the inserted row.
    """
    # datetime.utcnow() is deprecated since Python 3.12.  Build the same
    # naive-UTC value from an aware "now" so the stored text keeps its format.
    created_at = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
    with cursor() as cur:
        cur.execute(
            "INSERT INTO iptables_rules (table_name, rule_line, enabled, order_idx, created_at) VALUES (?, ?, ?, ?, ?)",
            (table_name, rule_line, 1 if enabled else 0, order_idx, created_at),
        )
        return cur.lastrowid
def iptables_update(rule_id, table_name=None, rule_line=None, enabled=None, order_idx=None):
    """Update selected columns of one iptables rule.

    Only arguments that are not None are written; ``enabled`` is normalized
    to 1 or 0.

    Returns:
        True if a row was updated; False if nothing was requested or no row
        has the given ``rule_id``.
    """
    with cursor() as cur:
        # Column order here determines the order of the SET clause.
        candidates = (
            ("table_name", table_name),
            ("rule_line", rule_line),
            ("enabled", None if enabled is None else (1 if enabled else 0)),
            ("order_idx", order_idx),
        )
        changes = [(column, value) for column, value in candidates if value is not None]
        if not changes:
            return False
        set_clause = ", ".join(column + " = ?" for column, _ in changes)
        params = [value for _, value in changes]
        params.append(rule_id)
        cur.execute("UPDATE iptables_rules SET " + set_clause + " WHERE id = ?", params)
        return cur.rowcount > 0
def iptables_delete(rule_id):
    """Delete the iptables rule with the given id.

    Returns:
        True if a row was removed, False otherwise.
    """
    with cursor() as cur:
        cur.execute("DELETE FROM iptables_rules WHERE id = ?", (rule_id,))
        affected = cur.rowcount
    return affected > 0
def iptables_get(rule_id):
    """Fetch a single iptables rule by id.

    Returns:
        The rule as a dict, or None if no row matches.
    """
    with cursor() as cur:
        record = cur.execute(
            "SELECT id, table_name, rule_line, enabled, order_idx, created_at FROM iptables_rules WHERE id = ?", (rule_id,)
        ).fetchone()
    if not record:
        return None
    return dict(record)
# ---- static_routes ----
def routes_list():
    """Return every stored static route as a dict, ordered by ``id``."""
    with cursor() as cur:
        records = cur.execute(
            "SELECT id, destination, gateway, dev, metric, enabled, created_at FROM static_routes ORDER BY id"
        ).fetchall()
    return list(map(dict, records))
def routes_add(destination, gateway=None, dev=None, metric=None, enabled=1):
    """Store a new static route.

    Args:
        destination: Value for the ``destination`` column.
        gateway: Gateway value; a falsy value is stored as an empty string.
        dev: Device value; a falsy value is stored as an empty string.
        metric: Metric value, stored as given (may be None).
        enabled: Truthy to store 1, falsy to store 0.

    Returns:
        The ``id`` of the inserted row.
    """
    # datetime.utcnow() is deprecated since Python 3.12.  Build the same
    # naive-UTC value from an aware "now" so the stored text keeps its format.
    created_at = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
    with cursor() as cur:
        cur.execute(
            "INSERT INTO static_routes (destination, gateway, dev, metric, enabled, created_at) VALUES (?, ?, ?, ?, ?, ?)",
            (destination, gateway or "", dev or "", metric, 1 if enabled else 0, created_at),
        )
        return cur.lastrowid
def routes_update(route_id, destination=None, gateway=None, dev=None, metric=None, enabled=None):
    """Update selected columns of one static route.

    Only arguments that are not None are written; ``enabled`` is normalized
    to 1 or 0.

    Returns:
        True if a row was updated; False if nothing was requested or no row
        has the given ``route_id``.
    """
    with cursor() as cur:
        # Insertion order of this dict fixes the order of the SET clause.
        requested = {
            "destination": destination,
            "gateway": gateway,
            "dev": dev,
            "metric": metric,
            "enabled": None if enabled is None else (1 if enabled else 0),
        }
        changes = {column: value for column, value in requested.items() if value is not None}
        if not changes:
            return False
        set_clause = ", ".join(column + " = ?" for column in changes)
        params = list(changes.values())
        params.append(route_id)
        cur.execute("UPDATE static_routes SET " + set_clause + " WHERE id = ?", params)
        return cur.rowcount > 0
def routes_delete(route_id):
    """Delete the static route with the given id.

    Returns:
        True if a row was removed, False otherwise.
    """
    with cursor() as cur:
        cur.execute("DELETE FROM static_routes WHERE id = ?", (route_id,))
        affected = cur.rowcount
    return affected > 0
def routes_get(route_id):
    """Fetch a single static route by id.

    Returns:
        The route as a dict, or None if no row matches.
    """
    with cursor() as cur:
        record = cur.execute(
            "SELECT id, destination, gateway, dev, metric, enabled, created_at FROM static_routes WHERE id = ?", (route_id,)
        ).fetchone()
    if not record:
        return None
    return dict(record)