""" Alpine 5G Web GUI – SQLite database. Tables: users, iptables_rules, static_routes. """ import sqlite3 from contextlib import contextmanager from pathlib import Path from datetime import datetime DATA_DIR = Path(__file__).resolve().parent / "data" DB_PATH = DATA_DIR / "alpine5g.db" def _ensure_data_dir(): DATA_DIR.mkdir(parents=True, exist_ok=True) def get_conn(): _ensure_data_dir() conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row return conn @contextmanager def cursor(): conn = get_conn() try: cur = conn.cursor() yield cur conn.commit() finally: conn.close() def init_db(): """Create tables if they don't exist.""" _ensure_data_dir() with cursor() as cur: cur.execute(""" CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT UNIQUE NOT NULL, password_hash TEXT NOT NULL, role TEXT NOT NULL CHECK(role IN ('admin', 'support')), created_at TEXT NOT NULL DEFAULT '' ) """) cur.execute(""" CREATE TABLE IF NOT EXISTS iptables_rules ( id INTEGER PRIMARY KEY AUTOINCREMENT, table_name TEXT NOT NULL DEFAULT 'filter', rule_line TEXT NOT NULL, enabled INTEGER NOT NULL DEFAULT 1, order_idx INTEGER NOT NULL DEFAULT 0, created_at TEXT NOT NULL DEFAULT '' ) """) cur.execute(""" CREATE TABLE IF NOT EXISTS static_routes ( id INTEGER PRIMARY KEY AUTOINCREMENT, destination TEXT NOT NULL, gateway TEXT, dev TEXT, metric INTEGER, enabled INTEGER NOT NULL DEFAULT 1, created_at TEXT NOT NULL DEFAULT '' ) """) # ---- Users ---- def user_get(username): with cursor() as cur: cur.execute("SELECT username, role FROM users WHERE username = ?", (username,)) row = cur.fetchone() return dict(row) if row else None def user_verify(username, password_hash_check_fn): with cursor() as cur: cur.execute("SELECT username, password_hash, role FROM users WHERE username = ?", (username,)) row = cur.fetchone() if not row: return None if not password_hash_check_fn(row["password_hash"]): return None return {"username": row["username"], "role": row["role"]} def user_list(): with cursor() as cur: cur.execute("SELECT username, role FROM users ORDER BY username") return [dict(r) for r in cur.fetchall()] def user_add(username, password_hash, role): try: with cursor() as cur: cur.execute( "INSERT INTO users (username, password_hash, role, created_at) VALUES (?, ?, ?, ?)", (username, password_hash, role, datetime.utcnow().isoformat()), ) return True, None except sqlite3.IntegrityError as e: if "UNIQUE" in str(e): return False, "User exists" return False, str(e) def user_set_password(username, password_hash): with cursor() as cur: cur.execute("UPDATE users SET password_hash = ? WHERE username = ?", (password_hash, username)) return cur.rowcount > 0 def user_delete(username): with cursor() as cur: cur.execute("DELETE FROM users WHERE username = ?", (username,)) return cur.rowcount > 0 # ---- iptables_rules ---- def iptables_list(): with cursor() as cur: cur.execute( "SELECT id, table_name, rule_line, enabled, order_idx, created_at FROM iptables_rules ORDER BY table_name, order_idx, id" ) return [dict(r) for r in cur.fetchall()] def iptables_add(table_name, rule_line, enabled=1, order_idx=0): with cursor() as cur: cur.execute( "INSERT INTO iptables_rules (table_name, rule_line, enabled, order_idx, created_at) VALUES (?, ?, ?, ?, ?)", (table_name, rule_line, 1 if enabled else 0, order_idx, datetime.utcnow().isoformat()), ) return cur.lastrowid def iptables_update(rule_id, table_name=None, rule_line=None, enabled=None, order_idx=None): with cursor() as cur: updates = [] args = [] if table_name is not None: updates.append("table_name = ?") args.append(table_name) if rule_line is not None: updates.append("rule_line = ?") args.append(rule_line) if enabled is not None: updates.append("enabled = ?") args.append(1 if enabled else 0) if order_idx is not None: updates.append("order_idx = ?") args.append(order_idx) if not updates: return False args.append(rule_id) cur.execute("UPDATE iptables_rules SET " + ", ".join(updates) + " WHERE id = ?", args) return cur.rowcount > 0 def iptables_delete(rule_id): with cursor() as cur: cur.execute("DELETE FROM iptables_rules WHERE id = ?", (rule_id,)) return cur.rowcount > 0 def iptables_get(rule_id): with cursor() as cur: cur.execute("SELECT id, table_name, rule_line, enabled, order_idx, created_at FROM iptables_rules WHERE id = ?", (rule_id,)) row = cur.fetchone() return dict(row) if row else None # ---- static_routes ---- def routes_list(): with cursor() as cur: cur.execute( "SELECT id, destination, gateway, dev, metric, enabled, created_at FROM static_routes ORDER BY id" ) return [dict(r) for r in cur.fetchall()] def routes_add(destination, gateway=None, dev=None, metric=None, enabled=1): with cursor() as cur: cur.execute( "INSERT INTO static_routes (destination, gateway, dev, metric, enabled, created_at) VALUES (?, ?, ?, ?, ?, ?)", (destination, gateway or "", dev or "", metric, 1 if enabled else 0, datetime.utcnow().isoformat()), ) return cur.lastrowid def routes_update(route_id, destination=None, gateway=None, dev=None, metric=None, enabled=None): with cursor() as cur: updates = [] args = [] if destination is not None: updates.append("destination = ?") args.append(destination) if gateway is not None: updates.append("gateway = ?") args.append(gateway) if dev is not None: updates.append("dev = ?") args.append(dev) if metric is not None: updates.append("metric = ?") args.append(metric) if enabled is not None: updates.append("enabled = ?") args.append(1 if enabled else 0) if not updates: return False args.append(route_id) cur.execute("UPDATE static_routes SET " + ", ".join(updates) + " WHERE id = ?", args) return cur.rowcount > 0 def routes_delete(route_id): with cursor() as cur: cur.execute("DELETE FROM static_routes WHERE id = ?", (route_id,)) return cur.rowcount > 0 def routes_get(route_id): with cursor() as cur: cur.execute("SELECT id, destination, gateway, dev, metric, enabled, created_at FROM static_routes WHERE id = ?", (route_id,)) row = cur.fetchone() return dict(row) if row else None