diff --git a/app.py b/app.py index 7407745..fe125b3 100644 --- a/app.py +++ b/app.py @@ -9,6 +9,34 @@ app = Flask(__name__) app.secret_key = config.SECRET_KEY +@app.route("/health") +def health(): + """Public endpoint to verify database connection (no login required).""" + try: + with get_cursor() as cur: + cur.execute("SELECT current_database() AS db") + row = cur.fetchone() + db_name = row["db"] if row else "?" + cur.execute("SELECT username, role, is_active FROM users WHERE role = 'admin' AND is_active = TRUE") + admins = [r["username"] for r in cur.fetchall()] + return { + "status": "ok", + "database": "connected", + "db_name": db_name, + "db_host": config.DB_AUTH_HOST, + "admin_users": admins, + "message": "Portal is connected to the database. Admin users (for login): " + ", ".join(admins) if admins else "Portal is connected. No admin users in database.", + }, 200 + except Exception as e: + return { + "status": "error", + "database": "disconnected", + "error": str(e), + "db_host": config.DB_AUTH_HOST, + "db_name": config.DB_AUTH_NAME, + }, 503 + + @app.errorhandler(500) def handle_500(e): tb = traceback.format_exc() @@ -45,10 +73,11 @@ def login(): if not username or not password: flash("Username and password required.", "error") return render_template("login.html") - if not verify_admin(username, password): + admin_username = verify_admin(username, password) + if not admin_username: flash("Invalid credentials or not an admin user.", "error") return render_template("login.html") - session["admin_username"] = username + session["admin_username"] = admin_username session.permanent = True return redirect(url_for("index")) diff --git a/auth_helpers.py b/auth_helpers.py index e9f3e3e..bc816a3 100644 --- a/auth_helpers.py +++ b/auth_helpers.py @@ -1,28 +1,132 @@ import bcrypt +import hashlib +import sys from db import get_cursor -def verify_admin(username: str, password: str) -> bool: - """Verify that the user exists, is admin, is active, and password matches.""" + +def _verify_legacy_salt_hash(password: str, salt: str, expected: str) -> bool: + """Try common salt:hash schemes used by other portals (same DB).""" + password_b = password.encode("utf-8") + salt_b = salt.encode("utf-8") if isinstance(salt, str) else salt + # SHA256(salt + password) + if hashlib.sha256(salt_b + password_b).hexdigest() == expected: + return True + # SHA256(password + salt) + if hashlib.sha256(password_b + salt_b).hexdigest() == expected: + return True + # MD5(salt + password) + if hashlib.md5(salt_b + password_b).hexdigest() == expected: + return True + # MD5(password + salt) + if hashlib.md5(password_b + salt_b).hexdigest() == expected: + return True + # MD5(password) only (no salt) + if salt == "" and hashlib.md5(password_b).hexdigest() == expected: + return True + # SHA1(salt + password) / SHA1(password + salt) + if hashlib.sha1(salt_b + password_b).hexdigest() == expected: + return True + if hashlib.sha1(password_b + salt_b).hexdigest() == expected: + return True + return False + + +def verify_admin(username: str, password: str): + """ + Verify that the user exists, is admin, is active, and password matches. + Returns the actual username (from DB) on success, None on failure. + Uses case-insensitive username lookup. + Supports: bcrypt ($2*), passlib/crypt formats, and legacy salt:hash (SHA256/MD5 variants). + """ with get_cursor() as cur: cur.execute( """ SELECT username, password_hash FROM users - WHERE username = %s AND role = 'admin' AND is_active = TRUE + WHERE LOWER(TRIM(username)) = LOWER(TRIM(%s)) + AND role = 'admin' AND is_active = TRUE """, (username,), ) row = cur.fetchone() if not row: - return False - stored = row["password_hash"] - # Support bcrypt (e.g. $2b$...) or legacy salt:hash - if stored.startswith("$2"): - return bcrypt.checkpw(password.encode("utf-8"), stored.encode("utf-8")) - # Legacy: "salt:hash" (e.g. md5 or similar) - optional simple check - if ":" in stored: - salt, expected = stored.split(":", 1) - import hashlib - got = hashlib.sha256((salt + password).encode()).hexdigest() - return got == expected - return False + print("login_failed: no admin user matching username", file=sys.stderr, flush=True) + return None + + # Normalize to str and strip (form/DB may add whitespace) + raw = row["password_hash"] + if raw is None: + raw = "" + if isinstance(raw, bytes): + raw = raw.decode("utf-8", errors="replace") + stored = raw.strip() + password = (password or "").strip() + + try: + # FreePBX-API / shared portal format: "bcrypt:" + bcrypt hash + if stored.startswith("bcrypt:"): + hash_part = stored[7:].strip() + hash_bytes = hash_part.encode("ascii") if isinstance(hash_part, str) else hash_part + + def _check(pwd: str): + return bcrypt.checkpw(pwd.encode("utf-8"), hash_bytes) + + if _check(password): + return row["username"] + # Form encoding quirks: when the password was *set* (e.g. in another portal), + may have + # been decoded as space, so the stored hash is for " *1V" not "+*1V". Try both variants. + if " " in password and _check(password.replace(" ", "+")): + return row["username"] + if "+" in password and _check(password.replace("+", " ")): + return row["username"] + + print("login_failed: password mismatch (bcrypt: hash)", file=sys.stderr, flush=True) + return None + + # Bcrypt: $2a$, $2b$, $2y$ (raw, no prefix) + if stored.startswith("$2"): + pwd_b = password.encode("utf-8") + stored_b = stored.encode("utf-8") + if bcrypt.checkpw(pwd_b, stored_b): + return row["username"] + if " " in password and bcrypt.checkpw(password.replace(" ", "+").encode("utf-8"), stored_b): + return row["username"] + if "+" in password and bcrypt.checkpw(password.replace("+", " ").encode("utf-8"), stored_b): + return row["username"] + print("login_failed: password mismatch for admin user", file=sys.stderr, flush=True) + return None + + # Passlib/crypt-style ($scheme$... e.g. pbkdf2-sha256, sha256_crypt, django) + if stored.startswith("$"): + try: + from passlib.hash import pbkdf2_sha256 + if pbkdf2_sha256.verify(password, stored): + return row["username"] + except Exception: + pass + try: + from passlib.context import CryptContext + ctx = CryptContext(schemes=["pbkdf2_sha256", "sha256_crypt", "sha512_crypt", "bcrypt", "md5_crypt"]) + if ctx.verify(password, stored): + return row["username"] + except Exception: + pass + print("login_failed: password mismatch (passlib hash)", file=sys.stderr, flush=True) + return None + + # Legacy: "salt:hash" or "hash:salt" – try multiple common algorithms + if ":" in stored: + parts = stored.split(":", 1) + salt, expected = parts[0], parts[1] + if _verify_legacy_salt_hash(password, salt, expected): + return row["username"] + # Try reversed (hash:salt) + if _verify_legacy_salt_hash(password, expected, salt): + return row["username"] + print("login_failed: password mismatch (legacy hash)", file=sys.stderr, flush=True) + return None + except Exception as e: + print("login_failed: exception during verify", str(e), file=sys.stderr, flush=True) + return None + print("login_failed: unknown password hash format", file=sys.stderr, flush=True) + return None diff --git a/requirements.txt b/requirements.txt index 1b88171..611ce57 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,3 +3,4 @@ psycopg2-binary>=2.9 bcrypt>=4.0 python-dotenv>=1.0 gunicorn>=21.0 +passlib>=1.7 diff --git a/scripts/set_admin_password.py b/scripts/set_admin_password.py new file mode 100644 index 0000000..3dfbdd6 --- /dev/null +++ b/scripts/set_admin_password.py @@ -0,0 +1,31 @@ +#!/usr/bin/env python3 +""" +Generate a bcrypt hash for a password. Use this to set or reset an admin user's +password in the database when you can't log in. + + python3 scripts/set_admin_password.py 'your_new_password' + +Then on the server (or any client with DB access): + + psql -U postgres -d portal_auth -c "UPDATE users SET password_hash = '' WHERE username = 'admin';" + +Or from the Auth LXC: + sudo -u postgres psql -d portal_auth -c "UPDATE users SET password_hash = '', role = 'admin', is_active = TRUE WHERE username = 'admin';" +""" +import sys +import bcrypt + +def main(): + if len(sys.argv) < 2: + print(__doc__, file=sys.stderr) + sys.exit(1) + password = sys.argv[1] + if len(password) < 8: + print("Password must be at least 8 characters.", file=sys.stderr) + sys.exit(1) + h = bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8") + print(h) + + +if __name__ == "__main__": + main() diff --git a/templates/login.html b/templates/login.html index c28e1ad..75404e8 100644 --- a/templates/login.html +++ b/templates/login.html @@ -4,7 +4,7 @@