Files
nearxos 808fbf5c7c Refactor golden image handling in backup upload process</message>
<message>Update the _set_golden_from_path function to improve the handling of existing golden image files. Replace the existing unlink logic with a more robust method that safely removes files or broken symlinks using the missing_ok parameter. This change enhances the reliability of the backup upload process by ensuring that stale references are properly cleared before setting a new golden image path.
2026-02-24 00:19:40 +02:00

489 lines
17 KiB
Python

#!/usr/bin/env python3
"""
REST API endpoints for GNSS Guard Server
Handles validation data submission and retrieval
"""
import json
import logging
from datetime import datetime, timedelta, timezone
from typing import List, Optional
from fastapi import APIRouter, Depends, HTTPException, Header, Query
from sqlalchemy.orm import Session
from sqlalchemy import desc
from database import get_db
from models import (
Asset, ValidationHistory, AssetNotificationState,
ValidationSubmission, ValidationBatchSubmission,
ValidationResponse, AssetStatus, AssetResponse, AssetCreate, AssetWithToken,
AssetImport, AssetBatchImport
)
from routes.auth import get_current_user
from services.telegram_service import get_telegram_service
logger = logging.getLogger("gnss_guard.server.api")
router = APIRouter(prefix="/api/v1", tags=["api"])
# =============================================================================
# Asset Token Authentication Dependency
# =============================================================================
async def get_current_asset(
authorization: str = Header(..., description="Bearer token for asset authentication"),
db: Session = Depends(get_db)
) -> Asset:
"""
Dependency to authenticate asset using Bearer token.
Returns the authenticated asset or raises 401.
"""
if not authorization.startswith("Bearer "):
raise HTTPException(status_code=401, detail="Invalid authorization header format")
token = authorization[7:] # Remove "Bearer " prefix
token_hash = Asset.hash_token(token)
asset = db.query(Asset).filter(
Asset.token_hash == token_hash,
Asset.is_active == True
).first()
if not asset:
raise HTTPException(status_code=401, detail="Invalid or inactive token")
return asset
# =============================================================================
# Validation Endpoints (Asset Authentication Required)
# =============================================================================
@router.post("/validation", status_code=201)
async def submit_validation(
data: ValidationSubmission,
asset: Asset = Depends(get_current_asset),
db: Session = Depends(get_db)
) -> dict:
"""
Submit a single validation record from an asset.
Also triggers Telegram notifications if state changed.
"""
try:
validation = ValidationHistory(
asset_id=asset.id,
validation_timestamp=data.validation_timestamp,
validation_timestamp_unix=data.validation_timestamp_unix,
is_valid=data.is_valid,
sources_missing=json.dumps(data.sources_missing),
sources_stale=json.dumps(data.sources_stale),
coordinate_differences=json.dumps(data.coordinate_differences),
source_coordinates=json.dumps(data.source_coordinates),
validation_details=json.dumps(data.validation_details),
)
db.add(validation)
db.commit()
logger.info(f"Validation received from asset '{asset.name}' at {data.validation_timestamp}")
# Process Telegram notification (will only send if state changed)
try:
telegram_service = get_telegram_service()
validation_data = {
"sources_missing": data.sources_missing,
"sources_stale": data.sources_stale,
"validation_details": data.validation_details,
"source_coordinates": data.source_coordinates,
}
telegram_service.process_validation(db, asset, validation_data)
except Exception as e:
logger.warning(f"Telegram notification error for {asset.name}: {e}")
return {
"status": "success",
"message": "Validation record saved",
"id": validation.id
}
except Exception as e:
logger.error(f"Error saving validation from {asset.name}: {e}")
db.rollback()
raise HTTPException(status_code=500, detail=str(e))
@router.post("/validation/batch", status_code=201)
async def submit_validation_batch(
data: ValidationBatchSubmission,
asset: Asset = Depends(get_current_asset),
db: Session = Depends(get_db)
) -> dict:
"""
Submit multiple validation records (for catching up after offline period).
Only sends Telegram notification for the most recent record to avoid spam.
"""
try:
saved_count = 0
skipped_count = 0
latest_record = None
latest_timestamp = 0
for record in data.records:
# Check if this timestamp already exists for this asset
existing = db.query(ValidationHistory).filter(
ValidationHistory.asset_id == asset.id,
ValidationHistory.validation_timestamp_unix == record.validation_timestamp_unix
).first()
if existing:
skipped_count += 1
continue
validation = ValidationHistory(
asset_id=asset.id,
validation_timestamp=record.validation_timestamp,
validation_timestamp_unix=record.validation_timestamp_unix,
is_valid=record.is_valid,
sources_missing=json.dumps(record.sources_missing),
sources_stale=json.dumps(record.sources_stale),
coordinate_differences=json.dumps(record.coordinate_differences),
source_coordinates=json.dumps(record.source_coordinates),
validation_details=json.dumps(record.validation_details),
)
db.add(validation)
saved_count += 1
# Track the most recent record for notification
if record.validation_timestamp_unix > latest_timestamp:
latest_timestamp = record.validation_timestamp_unix
latest_record = record
db.commit()
logger.info(f"Batch validation from '{asset.name}': {saved_count} saved, {skipped_count} skipped")
# Process Telegram notification for the most recent record only
if latest_record:
try:
telegram_service = get_telegram_service()
validation_data = {
"sources_missing": latest_record.sources_missing,
"sources_stale": latest_record.sources_stale,
"validation_details": latest_record.validation_details,
"source_coordinates": latest_record.source_coordinates,
}
telegram_service.process_validation(db, asset, validation_data)
except Exception as e:
logger.warning(f"Telegram notification error for {asset.name}: {e}")
return {
"status": "success",
"saved": saved_count,
"skipped": skipped_count
}
except Exception as e:
logger.error(f"Error saving batch validation from {asset.name}: {e}")
db.rollback()
raise HTTPException(status_code=500, detail=str(e))
# =============================================================================
# Read Endpoints (Session Authentication Required)
# =============================================================================
@router.get("/assets", response_model=List[AssetResponse])
async def list_assets(
user: str = Depends(get_current_user),
db: Session = Depends(get_db)
) -> List[AssetResponse]:
"""
List all registered assets.
Requires user session authentication.
"""
assets = db.query(Asset).filter(Asset.is_active == True).all()
return assets
@router.get("/assets/{asset_name}/status")
async def get_asset_status(
asset_name: str,
user: str = Depends(get_current_user),
db: Session = Depends(get_db)
) -> AssetStatus:
"""
Get current status of an asset (latest validation).
Requires user session authentication.
"""
asset = db.query(Asset).filter(
Asset.name == asset_name,
Asset.is_active == True
).first()
if not asset:
raise HTTPException(status_code=404, detail=f"Asset '{asset_name}' not found")
# Get latest validation
latest = db.query(ValidationHistory).filter(
ValidationHistory.asset_id == asset.id
).order_by(desc(ValidationHistory.validation_timestamp_unix)).first()
# Get online status from notification state (consistent with Telegram alerts)
notification_state = db.query(AssetNotificationState).filter(
AssetNotificationState.asset_id == asset.id
).first()
is_online = notification_state.is_online if notification_state else False
last_seen = notification_state.last_validation_at if notification_state else None
# Fall back to validation timestamp if no notification state
if not last_seen and latest and latest.received_at:
last_seen = latest.received_at
latest_validation = None
if latest:
latest_validation = ValidationResponse(
id=latest.id,
asset_name=asset.name,
validation_timestamp=latest.validation_timestamp,
validation_timestamp_unix=latest.validation_timestamp_unix,
is_valid=latest.is_valid,
sources_missing=json.loads(latest.sources_missing or "[]"),
sources_stale=json.loads(latest.sources_stale or "[]"),
coordinate_differences=json.loads(latest.coordinate_differences or "{}"),
source_coordinates=json.loads(latest.source_coordinates or "{}"),
validation_details=json.loads(latest.validation_details or "{}"),
received_at=latest.received_at
)
return AssetStatus(
asset_name=asset.name,
is_online=is_online,
last_seen=last_seen,
latest_validation=latest_validation
)
@router.get("/assets/{asset_name}/history")
async def get_asset_history(
asset_name: str,
hours: int = Query(default=72, ge=1, le=168, description="Hours of history (max 168 = 7 days)"),
user: str = Depends(get_current_user),
db: Session = Depends(get_db)
) -> List[ValidationResponse]:
"""
Get validation history for an asset (default: 72 hours).
Requires user session authentication.
"""
asset = db.query(Asset).filter(
Asset.name == asset_name,
Asset.is_active == True
).first()
if not asset:
raise HTTPException(status_code=404, detail=f"Asset '{asset_name}' not found")
# Calculate cutoff timestamp
cutoff = datetime.utcnow() - timedelta(hours=hours)
cutoff_unix = cutoff.timestamp()
# Get validation history
validations = db.query(ValidationHistory).filter(
ValidationHistory.asset_id == asset.id,
ValidationHistory.validation_timestamp_unix >= cutoff_unix
).order_by(desc(ValidationHistory.validation_timestamp_unix)).all()
return [
ValidationResponse(
id=v.id,
asset_name=asset.name,
validation_timestamp=v.validation_timestamp,
validation_timestamp_unix=v.validation_timestamp_unix,
is_valid=v.is_valid,
sources_missing=json.loads(v.sources_missing or "[]"),
sources_stale=json.loads(v.sources_stale or "[]"),
coordinate_differences=json.loads(v.coordinate_differences or "{}"),
source_coordinates=json.loads(v.source_coordinates or "{}"),
validation_details=json.loads(v.validation_details or "{}"),
received_at=v.received_at
)
for v in validations
]
# =============================================================================
# Admin Endpoints (Session Authentication Required)
# =============================================================================
@router.post("/admin/assets", response_model=AssetWithToken, status_code=201)
async def create_asset(
data: AssetCreate,
user: str = Depends(get_current_user),
db: Session = Depends(get_db)
) -> AssetWithToken:
"""
Create a new asset and return its token.
Requires user session authentication.
"""
# Check if asset already exists
existing = db.query(Asset).filter(Asset.name == data.name).first()
if existing:
raise HTTPException(status_code=400, detail=f"Asset '{data.name}' already exists")
# Generate token
token = Asset.generate_token()
token_hash = Asset.hash_token(token)
asset = Asset(
name=data.name,
token_hash=token_hash,
description=data.description,
telegram_chat_id=data.telegram_chat_id,
telegram_enabled=data.telegram_enabled
)
db.add(asset)
db.commit()
db.refresh(asset)
logger.info(f"Created new asset: {data.name}")
# Return asset with the unhashed token (only shown once!)
return AssetWithToken(
id=asset.id,
name=asset.name,
is_active=asset.is_active,
created_at=asset.created_at,
description=asset.description,
telegram_chat_id=asset.telegram_chat_id,
telegram_enabled=asset.telegram_enabled,
token=token
)
@router.delete("/admin/assets/{asset_name}")
async def deactivate_asset(
asset_name: str,
user: str = Depends(get_current_user),
db: Session = Depends(get_db)
) -> dict:
"""
Deactivate an asset (soft delete).
Requires user session authentication.
"""
asset = db.query(Asset).filter(Asset.name == asset_name).first()
if not asset:
raise HTTPException(status_code=404, detail=f"Asset '{asset_name}' not found")
asset.is_active = False
db.commit()
logger.info(f"Deactivated asset: {asset_name}")
return {"status": "success", "message": f"Asset '{asset_name}' deactivated"}
@router.post("/admin/assets/import", response_model=AssetResponse, status_code=201)
async def import_asset(
data: AssetImport,
user: str = Depends(get_current_user),
db: Session = Depends(get_db)
) -> AssetResponse:
"""
Import an asset with a specific token.
If asset exists, updates its token. If not, creates it.
Requires user session authentication.
"""
# Hash the provided token
token_hash = Asset.hash_token(data.token)
# Check if asset already exists
existing = db.query(Asset).filter(Asset.name == data.name).first()
if existing:
# Update existing asset's token
existing.token_hash = token_hash
existing.is_active = True
if data.description:
existing.description = data.description
if data.telegram_chat_id is not None:
existing.telegram_chat_id = data.telegram_chat_id
existing.telegram_enabled = data.telegram_enabled
db.commit()
db.refresh(existing)
logger.info(f"Updated token for existing asset: {data.name}")
return existing
else:
# Create new asset with provided token
asset = Asset(
name=data.name,
token_hash=token_hash,
description=data.description,
telegram_chat_id=data.telegram_chat_id,
telegram_enabled=data.telegram_enabled
)
db.add(asset)
db.commit()
db.refresh(asset)
logger.info(f"Imported new asset: {data.name}")
return asset
@router.post("/admin/assets/import/batch")
async def import_assets_batch(
data: AssetBatchImport,
user: str = Depends(get_current_user),
db: Session = Depends(get_db)
) -> dict:
"""
Batch import assets with specific tokens.
Creates new assets or updates existing ones.
Requires user session authentication.
"""
created = 0
updated = 0
errors = []
for asset_data in data.assets:
try:
token_hash = Asset.hash_token(asset_data.token)
existing = db.query(Asset).filter(Asset.name == asset_data.name).first()
if existing:
existing.token_hash = token_hash
existing.is_active = True
if asset_data.description:
existing.description = asset_data.description
if asset_data.telegram_chat_id is not None:
existing.telegram_chat_id = asset_data.telegram_chat_id
existing.telegram_enabled = asset_data.telegram_enabled
updated += 1
logger.info(f"Updated token for asset: {asset_data.name}")
else:
asset = Asset(
name=asset_data.name,
token_hash=token_hash,
description=asset_data.description,
telegram_chat_id=asset_data.telegram_chat_id,
telegram_enabled=asset_data.telegram_enabled
)
db.add(asset)
created += 1
logger.info(f"Created asset: {asset_data.name}")
except Exception as e:
errors.append({"name": asset_data.name, "error": str(e)})
logger.error(f"Failed to import asset {asset_data.name}: {e}")
db.commit()
return {
"status": "success",
"created": created,
"updated": updated,
"errors": errors
}