Initial commit: Portal Auth Admin Dashboard
Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
202
app.py
Normal file
202
app.py
Normal file
@@ -0,0 +1,202 @@
|
||||
from flask import Flask, render_template, request, redirect, url_for, session, flash
|
||||
import config
|
||||
from auth_helpers import verify_admin
|
||||
from db import get_cursor
|
||||
|
||||
app = Flask(__name__)
|
||||
app.secret_key = config.SECRET_KEY
|
||||
|
||||
|
||||
def login_required(f):
|
||||
from functools import wraps
|
||||
@wraps(f)
|
||||
def inner(*args, **kwargs):
|
||||
if not session.get("admin_username"):
|
||||
flash("Please log in as an admin.", "error")
|
||||
return redirect(url_for("login"))
|
||||
return f(*args, **kwargs)
|
||||
return inner
|
||||
|
||||
|
||||
@app.route("/login", methods=["GET", "POST"])
|
||||
def login():
|
||||
if request.method == "GET":
|
||||
if session.get("admin_username"):
|
||||
return redirect(url_for("index"))
|
||||
return render_template("login.html")
|
||||
username = (request.form.get("username") or "").strip()
|
||||
password = request.form.get("password") or ""
|
||||
if not username or not password:
|
||||
flash("Username and password required.", "error")
|
||||
return render_template("login.html")
|
||||
if not verify_admin(username, password):
|
||||
flash("Invalid credentials or not an admin user.", "error")
|
||||
return render_template("login.html")
|
||||
session["admin_username"] = username
|
||||
session.permanent = True
|
||||
return redirect(url_for("index"))
|
||||
|
||||
|
||||
@app.route("/logout")
|
||||
def logout():
|
||||
session.pop("admin_username", None)
|
||||
flash("Logged out.", "info")
|
||||
return redirect(url_for("login"))
|
||||
|
||||
|
||||
@app.route("/")
|
||||
@login_required
|
||||
def index():
|
||||
return render_template("index.html")
|
||||
|
||||
|
||||
TABLE_COLUMNS = {
|
||||
"users": ["id", "username", "password_hash", "role", "created_at", "created_by", "is_active"],
|
||||
"sessions": ["session_id", "username", "created_at", "expires_at"],
|
||||
"auth_logs": ["id", "username", "event_type", "timestamp", "ip_address", "user_agent", "details"],
|
||||
"api_tokens": ["id", "token_hash", "token_prefix", "username", "description", "created_at", "created_by", "last_used_at", "expires_at", "is_active"],
|
||||
}
|
||||
|
||||
|
||||
@app.route("/table/<name>")
|
||||
@login_required
|
||||
def table_view(name):
|
||||
allowed = {"users", "sessions", "auth_logs", "api_tokens"}
|
||||
if name not in allowed:
|
||||
flash("Unknown table.", "error")
|
||||
return redirect(url_for("index"))
|
||||
with get_cursor() as cur:
|
||||
cur.execute(f'SELECT * FROM "{name}" ORDER BY 1 DESC LIMIT 500')
|
||||
rows = cur.fetchall()
|
||||
# Convert dict rows to list of dicts with string values for display; keep raw row for actions (e.g. session_id, id)
|
||||
data = []
|
||||
for r in rows:
|
||||
data.append({k: (str(v) if v is not None else "") for k, v in r.items()})
|
||||
# Keep original rows for action URLs (session_id, id) so we don't rely on truncated display
|
||||
raw_rows = rows
|
||||
columns = list(rows[0].keys()) if rows else TABLE_COLUMNS.get(name, [])
|
||||
return render_template("table.html", table_name=name, rows=data, raw_rows=raw_rows, columns=columns)
|
||||
|
||||
|
||||
@app.route("/table/<name>/edit/<int:pk>", methods=["GET", "POST"])
|
||||
@login_required
|
||||
def table_edit_user(name, pk):
|
||||
if name != "users" or pk < 1:
|
||||
flash("Invalid edit target.", "error")
|
||||
return redirect(url_for("index"))
|
||||
with get_cursor() as cur:
|
||||
if request.method == "GET":
|
||||
cur.execute('SELECT id, username, role, is_active FROM users WHERE id = %s', (pk,))
|
||||
row = cur.fetchone()
|
||||
if not row:
|
||||
flash("User not found.", "error")
|
||||
return redirect(url_for("table_view", name="users"))
|
||||
return render_template("edit_user.html", row=row)
|
||||
role = (request.form.get("role") or "support").strip()
|
||||
if role not in ("admin", "support"):
|
||||
role = "support"
|
||||
is_active = request.form.get("is_active") == "on"
|
||||
cur.execute(
|
||||
"UPDATE users SET role = %s, is_active = %s WHERE id = %s",
|
||||
(role, is_active, pk),
|
||||
)
|
||||
if cur.rowcount:
|
||||
flash("User updated.", "success")
|
||||
return redirect(url_for("table_view", name="users"))
|
||||
|
||||
|
||||
@app.route("/table/users/new", methods=["GET", "POST"])
|
||||
@login_required
|
||||
def user_new():
|
||||
import bcrypt
|
||||
from datetime import datetime, timezone
|
||||
if request.method == "GET":
|
||||
return render_template("new_user.html")
|
||||
username = (request.form.get("username") or "").strip()
|
||||
password = request.form.get("password") or ""
|
||||
role = (request.form.get("role") or "support").strip()
|
||||
if role not in ("admin", "support"):
|
||||
role = "support"
|
||||
if not username:
|
||||
flash("Username required.", "error")
|
||||
return render_template("new_user.html")
|
||||
if len(password) < 8:
|
||||
flash("Password must be at least 8 characters.", "error")
|
||||
return render_template("new_user.html", username=username, role=role)
|
||||
hashed = bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8")
|
||||
now = datetime.now(timezone.utc)
|
||||
admin = session.get("admin_username") or ""
|
||||
with get_cursor() as cur:
|
||||
try:
|
||||
cur.execute(
|
||||
"""INSERT INTO users (username, password_hash, role, created_at, created_by, is_active)
|
||||
VALUES (%s, %s, %s, %s, %s, TRUE)""",
|
||||
(username, hashed, role, now, admin if admin else None),
|
||||
)
|
||||
except Exception as e:
|
||||
if "unique" in str(e).lower() or "duplicate" in str(e).lower():
|
||||
flash("Username already exists.", "error")
|
||||
else:
|
||||
flash("Could not create user.", "error")
|
||||
return render_template("new_user.html", username=username, role=role)
|
||||
flash("User created.", "success")
|
||||
return redirect(url_for("table_view", name="users"))
|
||||
|
||||
|
||||
@app.route("/table/users/password/<int:pk>", methods=["GET", "POST"])
|
||||
@login_required
|
||||
def user_password(pk):
|
||||
import bcrypt
|
||||
with get_cursor() as cur:
|
||||
cur.execute("SELECT id, username FROM users WHERE id = %s", (pk,))
|
||||
row = cur.fetchone()
|
||||
if not row:
|
||||
flash("User not found.", "error")
|
||||
return redirect(url_for("table_view", name="users"))
|
||||
if request.method == "GET":
|
||||
return render_template("edit_password.html", row=row)
|
||||
password = request.form.get("password") or ""
|
||||
if len(password) < 8:
|
||||
flash("Password must be at least 8 characters.", "error")
|
||||
return render_template("edit_password.html", row=row)
|
||||
hashed = bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8")
|
||||
cur.execute("UPDATE users SET password_hash = %s WHERE id = %s", (hashed, pk))
|
||||
flash("Password updated.", "success")
|
||||
return redirect(url_for("table_view", name="users"))
|
||||
|
||||
|
||||
@app.route("/table/sessions/delete", methods=["POST"])
|
||||
@login_required
|
||||
def session_delete():
|
||||
session_id = (request.form.get("session_id") or "").strip()
|
||||
if not session_id:
|
||||
flash("Session ID required.", "error")
|
||||
return redirect(url_for("table_view", name="sessions"))
|
||||
with get_cursor() as cur:
|
||||
cur.execute("DELETE FROM sessions WHERE session_id = %s", (session_id,))
|
||||
flash("Session deleted.", "success")
|
||||
return redirect(url_for("table_view", name="sessions"))
|
||||
|
||||
|
||||
@app.route("/table/api_tokens/toggle/<int:pk>", methods=["POST"])
|
||||
@login_required
|
||||
def api_token_toggle(pk):
|
||||
with get_cursor() as cur:
|
||||
cur.execute("UPDATE api_tokens SET is_active = NOT is_active WHERE id = %s RETURNING is_active", (pk,))
|
||||
row = cur.fetchone()
|
||||
if row:
|
||||
flash("Token " + ("activated" if row["is_active"] else "deactivated") + ".", "success")
|
||||
return redirect(url_for("table_view", name="api_tokens"))
|
||||
|
||||
|
||||
@app.route("/table/api_tokens/delete/<int:pk>", methods=["POST"])
|
||||
@login_required
|
||||
def api_token_delete(pk):
|
||||
with get_cursor() as cur:
|
||||
cur.execute("DELETE FROM api_tokens WHERE id = %s", (pk,))
|
||||
flash("Token deleted.", "success")
|
||||
return redirect(url_for("table_view", name="api_tokens"))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
app.run(host="0.0.0.0", port=5000, debug=True)
|
||||
Reference in New Issue
Block a user