- Introduced a new public `/health` endpoint to verify database connectivity and list active admin users.
- Updated the `verify_admin` function to return the actual username on successful verification and to handle various password hashing schemes, including legacy formats.
- Modified the login logic to use the returned username for session management.
- Updated the `login.html` form to support file uploads by adding the `enctype` attribute.
133 lines
5.4 KiB
Python
133 lines
5.4 KiB
Python
import bcrypt
|
||
import hashlib
|
||
import sys
|
||
from db import get_cursor
|
||
|
||
|
||
def _verify_legacy_salt_hash(password: str, salt: str, expected: str) -> bool:
    """Check *password* against a legacy salted hex digest.

    Tries SHA-256, MD5 and SHA-1, each over ``salt + password`` and
    ``password + salt``. With an empty salt both orderings reduce to the
    plain ``HASH(password)``, so unsalted digests are covered as well.

    Args:
        password: Candidate plaintext password.
        salt: Salt stored next to the digest (``str`` is encoded as UTF-8;
            ``bytes`` is used as-is).
        expected: Stored hex digest to compare against.

    Returns:
        True if any of the schemes reproduces *expected*, False otherwise.
    """
    # Function-scope import keeps this block self-contained.
    import hmac

    password_b = password.encode("utf-8")
    salt_b = salt.encode("utf-8") if isinstance(salt, str) else salt
    # Compare as bytes: hmac.compare_digest rejects non-ASCII str arguments,
    # and a corrupt stored value must yield "no match", not an exception.
    expected_b = expected.encode("utf-8") if isinstance(expected, str) else expected

    orderings = (salt_b + password_b, password_b + salt_b)
    for algorithm in (hashlib.sha256, hashlib.md5, hashlib.sha1):
        for material in orderings:
            digest = algorithm(material).hexdigest().encode("ascii")
            # Constant-time comparison so the check does not leak how many
            # leading characters of the digest matched.
            if hmac.compare_digest(digest, expected_b):
                return True
    return False
|
||
|
||
|
||
def _bcrypt_matches(password: str, hash_bytes: bytes) -> bool:
    """Return True if *password* or a '+'/space-swapped variant matches the bcrypt hash."""
    # Form-encoding quirk: when the password was *set* (e.g. in another
    # portal), "+" may have been decoded as a space, so the stored hash can be
    # for " *1V" rather than "+*1V". Try the password as typed first, then
    # each swapped variant.
    candidates = [password]
    if " " in password:
        candidates.append(password.replace(" ", "+"))
    if "+" in password:
        candidates.append(password.replace("+", " "))
    # any() over a generator stops at the first match, so no extra bcrypt
    # rounds are spent once a candidate verifies.
    return any(
        bcrypt.checkpw(candidate.encode("utf-8"), hash_bytes)
        for candidate in candidates
    )


def _passlib_matches(password: str, stored: str) -> bool:
    """Return True if passlib verifies *password* against a ``$scheme$...`` hash."""
    try:
        # Imported lazily so passlib is only required for this hash family.
        from passlib.context import CryptContext
    except ImportError:
        print("login_failed: passlib is not installed", file=sys.stderr, flush=True)
        return False

    ctx = CryptContext(
        schemes=["pbkdf2_sha256", "sha256_crypt", "sha512_crypt", "bcrypt", "md5_crypt"]
    )
    try:
        return bool(ctx.verify(password, stored))
    except Exception as exc:
        # Unknown scheme, malformed hash or missing backend: fail closed, but
        # leave a trace instead of swallowing the error silently.
        print(
            "login_failed: passlib could not verify hash",
            type(exc).__name__,
            file=sys.stderr,
            flush=True,
        )
        return False


def verify_admin(username: str, password: str):
    """Verify that the user exists, is an active admin, and the password matches.

    The username lookup is case-insensitive and ignores surrounding
    whitespace. The stored hash format is detected from its prefix:

    * ``bcrypt:<hash>`` -- prefixed bcrypt hash (shared portal format)
    * ``$2...``         -- raw bcrypt hash
    * ``$...``          -- passlib/crypt-style hash (pbkdf2_sha256,
      sha256_crypt, sha512_crypt, md5_crypt)
    * ``a:b``           -- legacy ``salt:hash`` or ``hash:salt`` hex digest

    Args:
        username: Login name as submitted by the client.
        password: Plaintext password as submitted; ``None`` is treated as
            empty and surrounding whitespace is stripped before checking.

    Returns:
        The username as stored in the database on success, ``None`` on any
        failure (no such admin, mismatch, unknown format, or an error while
        verifying). Failure reasons are written to stderr.
    """
    with get_cursor() as cur:
        cur.execute(
            """
            SELECT username, password_hash
            FROM users
            WHERE LOWER(TRIM(username)) = LOWER(TRIM(%s))
            AND role = 'admin' AND is_active = TRUE
            """,
            (username,),
        )
        # NOTE(review): rows are accessed by column name below, so get_cursor
        # presumably yields a dict-style cursor -- confirm in db.py.
        row = cur.fetchone()

    # The row is fully fetched, so the cursor is released before the
    # (deliberately slow) hash checks run.
    if not row:
        print("login_failed: no admin user matching username", file=sys.stderr, flush=True)
        return None

    # Normalize the stored hash to a stripped str (the DB may hand back
    # bytes-like values or padded text).
    raw = row["password_hash"]
    if raw is None:
        raw = ""
    if isinstance(raw, (bytes, bytearray, memoryview)):
        raw = bytes(raw).decode("utf-8", errors="replace")
    stored = raw.strip()
    password = (password or "").strip()

    try:
        # Shared portal format: "bcrypt:" + bcrypt hash.
        if stored.startswith("bcrypt:"):
            hash_bytes = stored[len("bcrypt:"):].strip().encode("ascii")
            if _bcrypt_matches(password, hash_bytes):
                return row["username"]
            print("login_failed: password mismatch (bcrypt: hash)", file=sys.stderr, flush=True)
            return None

        # Raw bcrypt: $2a$, $2b$, $2y$ (no prefix).
        if stored.startswith("$2"):
            if _bcrypt_matches(password, stored.encode("utf-8")):
                return row["username"]
            print("login_failed: password mismatch for admin user", file=sys.stderr, flush=True)
            return None

        # Passlib/crypt-style ($scheme$...), checked after the bcrypt cases
        # above because those also start with "$".
        if stored.startswith("$"):
            if _passlib_matches(password, stored):
                return row["username"]
            print("login_failed: password mismatch (passlib hash)", file=sys.stderr, flush=True)
            return None

        # Legacy "salt:hash" or "hash:salt": try both readings.
        if ":" in stored:
            salt, expected = stored.split(":", 1)
            if _verify_legacy_salt_hash(password, salt, expected) or _verify_legacy_salt_hash(
                password, expected, salt
            ):
                return row["username"]
            print("login_failed: password mismatch (legacy hash)", file=sys.stderr, flush=True)
            return None
    except Exception as e:
        # Authentication boundary: any verifier error (malformed hash,
        # encoding problem, backend failure) must fail closed.
        print("login_failed: exception during verify", str(e), file=sys.stderr, flush=True)
        return None

    print("login_failed: unknown password hash format", file=sys.stderr, flush=True)
    return None
|