203 lines
7.8 KiB
Python
203 lines
7.8 KiB
Python
from flask import Flask, render_template, request, redirect, url_for, session, flash
|
|
import config
|
|
from auth_helpers import verify_admin
|
|
from db import get_cursor
|
|
|
|
# The Flask application object; every route below is registered on it.
app = Flask(__name__)

# Flask signs the session cookie with this key; it comes from the project's
# config module rather than being hard-coded here.
app.secret_key = config.SECRET_KEY
|
|
|
|
|
|
def login_required(f):
    """Decorate a view so that only a logged-in admin can reach it.

    If the session holds no truthy ``admin_username``, the request gets an
    error flash and a redirect to the ``login`` endpoint. Otherwise the
    wrapped view is called with the arguments it was given and its result is
    returned unchanged.
    """
    from functools import wraps

    @wraps(f)
    def guarded(*args, **kwargs):
        if session.get("admin_username"):
            return f(*args, **kwargs)
        flash("Please log in as an admin.", "error")
        return redirect(url_for("login"))

    return guarded
|
|
|
|
|
|
@app.route("/login", methods=["GET", "POST"])
def login():
    """Show the login form (GET) or authenticate an admin (POST).

    GET: an already logged-in admin is redirected to ``index``; anyone else
    gets ``login.html``.

    POST: the stripped ``username`` and the raw ``password`` form fields are
    checked with ``verify_admin``. On failure an error is flashed and the form
    is rendered again. On success the username is stored in the session, the
    session is marked permanent, and the client is redirected to ``index``.
    """
    if request.method == "GET":
        if session.get("admin_username"):
            return redirect(url_for("index"))
        return render_template("login.html")

    form = request.form
    username = (form.get("username") or "").strip()
    password = form.get("password") or ""

    # Work out the single reason (if any) the attempt must be rejected.
    # verify_admin is only consulted when both fields are present.
    problem = None
    if not (username and password):
        problem = "Username and password required."
    elif not verify_admin(username, password):
        problem = "Invalid credentials or not an admin user."

    if problem is not None:
        flash(problem, "error")
        return render_template("login.html")

    session["admin_username"] = username
    session.permanent = True
    return redirect(url_for("index"))
|
|
|
|
|
|
@app.route("/logout")
def logout():
    """Remove the admin login from the session and go back to the login page."""
    # pop() with a default makes this safe when nobody is logged in.
    session.pop("admin_username", None)
    flash("Logged out.", "info")
    login_url = url_for("login")
    return redirect(login_url)
|
|
|
|
|
|
@app.route("/")
@login_required
def index():
    """Render the landing page (``index.html``) for a logged-in admin."""
    landing_page = render_template("index.html")
    return landing_page
|
|
|
|
|
|
# Column names for each table, in display order. table_view falls back to
# these as the header row when a table currently has no rows.
TABLE_COLUMNS = {
    "users": [
        "id",
        "username",
        "password_hash",
        "role",
        "created_at",
        "created_by",
        "is_active",
    ],
    "sessions": [
        "session_id",
        "username",
        "created_at",
        "expires_at",
    ],
    "auth_logs": [
        "id",
        "username",
        "event_type",
        "timestamp",
        "ip_address",
        "user_agent",
        "details",
    ],
    "api_tokens": [
        "id",
        "token_hash",
        "token_prefix",
        "username",
        "description",
        "created_at",
        "created_by",
        "last_used_at",
        "expires_at",
        "is_active",
    ],
}
|
|
|
|
|
|
@app.route("/table/<name>")
@login_required
def table_view(name):
    """Render up to 500 rows of one known table.

    Args:
        name: Table name taken from the URL. It must be a key of
            ``TABLE_COLUMNS``; anything else flashes "Unknown table." and
            redirects to ``index``.

    Returns:
        The rendered ``table.html`` with:
        ``rows``     - each row as a dict of display strings (``None`` -> ""),
        ``raw_rows`` - the rows exactly as fetched, for building action URLs
                       from untruncated values such as ``session_id`` / ``id``,
        ``columns``  - the keys of the first row, or the ``TABLE_COLUMNS``
                       entry when the table is empty.

    Rows are expected to be mapping-like, since ``.items()`` and ``.keys()``
    are called on them.
    """
    # TABLE_COLUMNS is the single whitelist of viewable tables, so the set of
    # allowed names cannot drift from the set of tables with a column list.
    if name not in TABLE_COLUMNS:
        flash("Unknown table.", "error")
        return redirect(url_for("index"))

    with get_cursor() as cur:
        # An identifier cannot be passed as a bound parameter. Interpolating
        # it is acceptable only because of the whitelist check above.
        cur.execute(f'SELECT * FROM "{name}" ORDER BY 1 DESC LIMIT 500')
        rows = cur.fetchall()

    # Stringify every value for display; NULLs become empty strings.
    data = [
        {key: ("" if value is None else str(value)) for key, value in row.items()}
        for row in rows
    ]
    columns = list(rows[0].keys()) if rows else TABLE_COLUMNS[name]
    return render_template(
        "table.html",
        table_name=name,
        rows=data,
        raw_rows=rows,
        columns=columns,
    )
|
|
|
|
|
|
@app.route("/table/<name>/edit/<int:pk>", methods=["GET", "POST"])
@login_required
def table_edit_user(name, pk):
    """Edit the role and active flag of one user.

    Args:
        name: Table name from the URL; only ``"users"`` is editable.
        pk: Primary key of the user row; must be >= 1.

    GET renders ``edit_user.html`` for the user, or flashes "User not found."
    and redirects to the users table.

    POST writes the submitted ``role`` (anything other than ``admin`` or
    ``support`` is coerced to ``support``) and ``is_active`` (true only when
    the checkbox value is ``"on"``), then redirects to the users table. If the
    UPDATE matched no row, "User not found." is flashed instead of silently
    redirecting, mirroring the GET path.
    """
    if name != "users" or pk < 1:
        flash("Invalid edit target.", "error")
        return redirect(url_for("index"))

    with get_cursor() as cur:
        if request.method == "GET":
            cur.execute('SELECT id, username, role, is_active FROM users WHERE id = %s', (pk,))
            row = cur.fetchone()
            if not row:
                flash("User not found.", "error")
                return redirect(url_for("table_view", name="users"))
            return render_template("edit_user.html", row=row)

        role = (request.form.get("role") or "support").strip()
        if role not in ("admin", "support"):
            role = "support"
        # An unchecked HTML checkbox is simply absent from the form data.
        is_active = request.form.get("is_active") == "on"
        cur.execute(
            "UPDATE users SET role = %s, is_active = %s WHERE id = %s",
            (role, is_active, pk),
        )
        updated = cur.rowcount

    if updated:
        flash("User updated.", "success")
    else:
        # The id did not match any row (for example, the user was removed in
        # the meantime); say so rather than returning with no feedback.
        flash("User not found.", "error")
    return redirect(url_for("table_view", name="users"))
|
|
|
|
|
|
@app.route("/table/users/new", methods=["GET", "POST"])
@login_required
def user_new():
    """Create a new user from the submitted form.

    GET renders the empty ``new_user.html`` form.

    POST validates the input, stores a bcrypt hash of the password, and
    redirects to the users table on success. On any validation or database
    error an error is flashed and the form is rendered again with the values
    entered so far (never the password).

    Validation rules:
      * ``username`` is required (surrounding whitespace is stripped);
      * ``role`` falls back to ``support`` unless it is ``admin``/``support``;
      * ``password`` must be at least 8 characters and at most 72 bytes once
        UTF-8 encoded.
    """
    import bcrypt
    from datetime import datetime, timezone

    # bcrypt only uses the first 72 bytes of its input. Depending on the
    # library version, longer passwords are silently truncated or rejected
    # with an exception, so they are refused up front.
    max_password_bytes = 72

    if request.method == "GET":
        return render_template("new_user.html")

    username = (request.form.get("username") or "").strip()
    password = request.form.get("password") or ""
    role = (request.form.get("role") or "support").strip()
    if role not in ("admin", "support"):
        role = "support"

    if not username:
        flash("Username required.", "error")
        # Keep the chosen role on the re-rendered form, as the other error
        # paths do.
        return render_template("new_user.html", role=role)
    if len(password) < 8:
        flash("Password must be at least 8 characters.", "error")
        return render_template("new_user.html", username=username, role=role)
    password_bytes = password.encode("utf-8")
    if len(password_bytes) > max_password_bytes:
        flash("Password is too long (maximum 72 bytes).", "error")
        return render_template("new_user.html", username=username, role=role)

    hashed = bcrypt.hashpw(password_bytes, bcrypt.gensalt()).decode("utf-8")
    now = datetime.now(timezone.utc)
    admin = session.get("admin_username") or ""

    with get_cursor() as cur:
        try:
            cur.execute(
                """INSERT INTO users (username, password_hash, role, created_at, created_by, is_active)
                VALUES (%s, %s, %s, %s, %s, TRUE)""",
                (username, hashed, role, now, admin or None),
            )
        except Exception as e:
            # The database driver is not visible from this module, so a
            # uniqueness violation is recognised by its message text.
            message = str(e).lower()
            if "unique" in message or "duplicate" in message:
                flash("Username already exists.", "error")
            else:
                # Record the unexpected failure instead of swallowing it; the
                # user still only sees a generic message.
                app.logger.exception("Could not create user %r", username)
                flash("Could not create user.", "error")
            return render_template("new_user.html", username=username, role=role)

    flash("User created.", "success")
    return redirect(url_for("table_view", name="users"))
|
|
|
|
|
|
@app.route("/table/users/password/<int:pk>", methods=["GET", "POST"])
@login_required
def user_password(pk):
    """Set a new password for the user with primary key ``pk``.

    If the user does not exist, "User not found." is flashed and the client
    is redirected to the users table, for both GET and POST.

    GET renders ``edit_password.html`` for the user.

    POST requires a password of at least 8 characters and at most 72 bytes
    once UTF-8 encoded. It stores the bcrypt hash and redirects to the users
    table. A rejected password re-renders the form with an error flash.
    """
    import bcrypt

    # bcrypt only uses the first 72 bytes of its input. Depending on the
    # library version, longer passwords are silently truncated or rejected
    # with an exception, so they are refused up front.
    max_password_bytes = 72

    with get_cursor() as cur:
        cur.execute("SELECT id, username FROM users WHERE id = %s", (pk,))
        row = cur.fetchone()
        if not row:
            flash("User not found.", "error")
            return redirect(url_for("table_view", name="users"))
        if request.method == "GET":
            return render_template("edit_password.html", row=row)

        password = request.form.get("password") or ""
        if len(password) < 8:
            flash("Password must be at least 8 characters.", "error")
            return render_template("edit_password.html", row=row)
        password_bytes = password.encode("utf-8")
        if len(password_bytes) > max_password_bytes:
            flash("Password is too long (maximum 72 bytes).", "error")
            return render_template("edit_password.html", row=row)

        hashed = bcrypt.hashpw(password_bytes, bcrypt.gensalt()).decode("utf-8")
        cur.execute("UPDATE users SET password_hash = %s WHERE id = %s", (hashed, pk))

    flash("Password updated.", "success")
    return redirect(url_for("table_view", name="users"))
|
|
|
|
|
|
@app.route("/table/sessions/delete", methods=["POST"])
@login_required
def session_delete():
    """Delete the session row whose id is given in the ``session_id`` form field.

    Always redirects back to the sessions table. The flash message reports
    what happened: a missing id, a successful delete, or an id that matched
    no row.
    """
    session_id = (request.form.get("session_id") or "").strip()
    if not session_id:
        flash("Session ID required.", "error")
        return redirect(url_for("table_view", name="sessions"))

    with get_cursor() as cur:
        cur.execute("DELETE FROM sessions WHERE session_id = %s", (session_id,))
        deleted = cur.rowcount

    if deleted:
        flash("Session deleted.", "success")
    else:
        # Nothing matched (already expired or removed); do not claim success.
        flash("Session not found.", "error")
    return redirect(url_for("table_view", name="sessions"))
|
|
|
|
|
|
@app.route("/table/api_tokens/toggle/<int:pk>", methods=["POST"])
@login_required
def api_token_toggle(pk):
    """Flip the ``is_active`` flag of the API token with primary key ``pk``.

    The UPDATE returns the new flag value, which selects the "activated" or
    "deactivated" flash message. If no token has that id, an error is flashed
    instead. Always redirects back to the api_tokens table.
    """
    with get_cursor() as cur:
        cur.execute("UPDATE api_tokens SET is_active = NOT is_active WHERE id = %s RETURNING is_active", (pk,))
        row = cur.fetchone()

    if row:
        flash("Token " + ("activated" if row["is_active"] else "deactivated") + ".", "success")
    else:
        # RETURNING produced no row, so the id matched nothing; tell the admin
        # rather than redirecting with no feedback.
        flash("Token not found.", "error")
    return redirect(url_for("table_view", name="api_tokens"))
|
|
|
|
|
|
@app.route("/table/api_tokens/delete/<int:pk>", methods=["POST"])
@login_required
def api_token_delete(pk):
    """Delete the API token with primary key ``pk``.

    Always redirects back to the api_tokens table. The flash message reports
    whether a row was actually removed.
    """
    with get_cursor() as cur:
        cur.execute("DELETE FROM api_tokens WHERE id = %s", (pk,))
        deleted = cur.rowcount

    if deleted:
        flash("Token deleted.", "success")
    else:
        # Nothing matched the id; do not claim success.
        flash("Token not found.", "error")
    return redirect(url_for("table_view", name="api_tokens"))
|
|
|
|
|
|
if __name__ == "__main__":
    import os

    # The interactive debugger that ships with Flask's development server lets
    # anyone who can reach it run arbitrary Python. This server listens on all
    # interfaces, so debug mode is opt-in (FLASK_DEBUG=1) rather than always
    # on.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in ("1", "true", "yes", "on")
    app.run(host="0.0.0.0", port=5000, debug=debug_enabled)
|