Files
nearxos 808fbf5c7c Refactor golden image handling in backup upload process</message>
<message>Update the _set_golden_from_path function to improve the handling of existing golden image files. Replace the existing unlink logic with a more robust method that safely removes files or broken symlinks using the missing_ok parameter. This change enhances the reliability of the backup upload process by ensuring that stale references are properly cleared before setting a new golden image path.
2026-02-24 00:19:40 +02:00

409 lines
14 KiB
Python

#!/usr/bin/env python3
"""
FastAPI main application for GNSS Guard Server
Centralized monitoring server for multiple GNSS Guard assets
"""
import asyncio
import logging
import json
import random
from contextlib import asynccontextmanager
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional
from fastapi import FastAPI, Request, Depends, HTTPException
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy.orm import Session
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from config import get_config
from database import init_db, get_db, get_session_factory
from routes import api, auth
from routes.auth import get_optional_user, get_current_user
from services.asset_service import AssetService
from services.telegram_service import get_telegram_service
from models import Asset, AssetNotificationState
# Initialize rate limiter
limiter = Limiter(key_func=get_remote_address)
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("gnss_guard.server")
# Create FastAPI app
app = FastAPI(
title="GNSS Guard Server",
description="Centralized monitoring server for GNSS Guard assets",
version="1.0.0"
)
# Setup rate limiting
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
# Add CORS middleware - restricted to same-origin only
# Since the dashboard is served from the same domain, we only need
# to allow requests from the same origin. This prevents CSRF attacks.
config = get_config()
allowed_origins = []
if config.server_domain:
allowed_origins = [
f"https://{config.server_domain}",
f"http://{config.server_domain}", # For initial setup before SSL
]
app.add_middleware(
CORSMiddleware,
allow_origins=allowed_origins,
allow_credentials=True,
allow_methods=["GET", "POST", "DELETE"],
allow_headers=["Content-Type", "Authorization", "Cookie"],
)
# Setup static files and templates
static_path = Path(__file__).parent / "static"
templates_path = Path(__file__).parent / "templates"
if static_path.exists():
app.mount("/static", StaticFiles(directory=str(static_path)), name="static")
templates = Jinja2Templates(directory=str(templates_path)) if templates_path.exists() else None
# Include routers
app.include_router(api.router)
app.include_router(auth.router)
# =============================================================================
# Health Check Endpoint (public, no auth required)
# =============================================================================
@app.get("/health")
async def health_check():
"""Health check endpoint - always accessible"""
return {"status": "ok", "timestamp": datetime.utcnow().isoformat()}
async def check_offline_assets():
"""Background task to check for assets that have gone offline"""
config = get_config()
telegram_service = get_telegram_service()
if not telegram_service.enabled:
return
threshold = datetime.utcnow() - timedelta(seconds=config.asset_offline_seconds)
SessionLocal = get_session_factory()
db = SessionLocal()
try:
# Find assets that are marked online but haven't reported recently
states = db.query(AssetNotificationState).join(Asset).filter(
AssetNotificationState.is_online == True,
AssetNotificationState.last_validation_at != None,
AssetNotificationState.last_validation_at < threshold,
Asset.is_active == True,
Asset.telegram_enabled == True
).all()
for state in states:
chat_id = state.asset.telegram_chat_id or telegram_service.default_chat_id
if chat_id:
logger.info(f"Asset '{state.asset.name}' detected as offline (last seen: {state.last_validation_at})")
telegram_service.send_asset_offline_alert(
chat_id=chat_id,
asset_name=state.asset.name,
last_seen=state.last_validation_at,
offline_threshold_seconds=config.asset_offline_seconds
)
state.is_online = False
if states:
db.commit()
except Exception as e:
logger.error(f"Error checking offline assets: {e}")
db.rollback()
finally:
db.close()
async def offline_checker_loop():
"""Background loop that periodically checks for offline assets"""
while True:
await asyncio.sleep(30) # Check every 30 seconds
try:
await check_offline_assets()
except Exception as e:
logger.error(f"Error in offline checker loop: {e}")
@app.on_event("startup")
async def startup_event():
"""Initialize database and background tasks on startup"""
logger.info("Starting GNSS Guard Server...")
init_db()
logger.info("Database initialized")
# Start background task for offline detection
asyncio.create_task(offline_checker_loop())
logger.info("Offline asset checker started")
# =============================================================================
# Web UI Routes
# =============================================================================
@app.get("/", response_class=HTMLResponse)
async def index(request: Request, user: Optional[str] = Depends(get_optional_user)):
"""Main dashboard page"""
if not user:
return RedirectResponse(url="/login", status_code=302)
if not templates:
return HTMLResponse("<h1>GNSS Guard Server</h1><p>Templates not configured</p>")
return templates.TemplateResponse("dashboard.html", {
"request": request,
"username": user,
"cache_buster": random.randint(100000, 999999)
})
@app.get("/login", response_class=HTMLResponse)
async def login_page(request: Request, user: Optional[str] = Depends(get_optional_user)):
"""Login page"""
if user:
return RedirectResponse(url="/", status_code=302)
if not templates:
return HTMLResponse("""
<h1>GNSS Guard Server - Login</h1>
<form method="post" action="/login">
<input name="username" placeholder="Username"><br>
<input name="password" type="password" placeholder="Password"><br>
<button type="submit">Login</button>
</form>
""")
return templates.TemplateResponse("login.html", {
"request": request,
"cache_buster": random.randint(100000, 999999)
})
@app.get("/api/dashboard/assets")
async def dashboard_assets(
user: str = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""Get all assets status for dashboard"""
service = AssetService(db)
return service.get_all_assets_status()
@app.get("/api/dashboard/asset/{asset_name}/status")
async def dashboard_asset_status(
asset_name: str,
at: Optional[float] = None,
user: str = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""
Get detailed status for a specific asset (for dashboard display).
Matches the format expected by the client dashboard.
Args:
at: Optional Unix timestamp to get historical data at that time.
If not provided, returns the latest data.
"""
service = AssetService(db)
asset = service.get_asset_by_name(asset_name)
if not asset:
raise HTTPException(status_code=404, detail=f"Asset '{asset_name}' not found")
if at is not None:
# Get historical validation at specified timestamp
latest = service.get_validation_at_timestamp(asset.id, at)
else:
latest = service.get_latest_validation(asset.id)
if not latest:
return {
"error": "No validation data available",
"timestamp": datetime.utcnow().isoformat()
}
# Parse JSON fields
sources_missing = json.loads(latest.sources_missing or "[]")
sources_stale = json.loads(latest.sources_stale or "[]")
coordinate_differences = json.loads(latest.coordinate_differences or "{}")
source_coordinates = json.loads(latest.source_coordinates or "{}")
validation_details = json.loads(latest.validation_details or "{}")
# Get enabled sources from validation_details
expected_sources = validation_details.get("expected_sources", [])
# Build sources status (matching client format)
source_display_names = {
"nmea_primary": "Primary GPS",
"nmea_secondary": "Secondary GPS",
"tm_ais": "TM AIS GPS",
"starlink_gps": "Starlink GPS",
"starlink_location": "Starlink Location"
}
sources = {}
all_source_names = ["nmea_primary", "nmea_secondary", "tm_ais", "starlink_gps", "starlink_location"]
for source_name in all_source_names:
display_name = source_display_names.get(source_name, source_name)
if source_name not in expected_sources:
sources[source_name] = {
"display_name": display_name,
"enabled": False,
"status": "not_configured",
"is_stale": False,
"coordinates": None,
"last_update": None,
"last_update_unix": None
}
continue
source_data = source_coordinates.get(source_name)
is_stale = source_name in sources_stale
if not source_data:
sources[source_name] = {
"display_name": display_name,
"enabled": True,
"status": "missing",
"is_stale": is_stale,
"coordinates": None,
"last_update": None,
"last_update_unix": None
}
else:
status = "stale" if is_stale else "ok"
sources[source_name] = {
"display_name": display_name,
"enabled": True,
"status": status,
"is_stale": is_stale,
"coordinates": {
"latitude": source_data.get("latitude"),
"longitude": source_data.get("longitude")
},
"last_update": source_data.get("timestamp"),
"last_update_unix": source_data.get("timestamp_unix")
}
# Calculate max distance
threshold_meters = validation_details.get("threshold_meters", 200.0)
max_distance_km = None
max_distance_m = 0.0
if not latest.is_valid and coordinate_differences:
for diff_data in coordinate_differences.values():
if isinstance(diff_data, dict):
distance = diff_data.get("distance_meters", diff_data.get("distance_m", 0))
if distance > max_distance_m:
max_distance_m = distance
if max_distance_m > threshold_meters:
max_distance_km = max_distance_m / 1000.0
has_alert = (not latest.is_valid and max_distance_km is not None) or len(sources_missing) > 0
# Find map center
map_center = None
for priority_source in ["nmea_primary", "tm_ais", "starlink_location"]:
if sources.get(priority_source, {}).get("coordinates"):
coords = sources[priority_source]["coordinates"]
if coords.get("latitude") and coords.get("longitude"):
map_center = coords
break
if not map_center:
for source_data in sources.values():
if source_data.get("coordinates"):
coords = source_data["coordinates"]
if coords.get("latitude") and coords.get("longitude"):
map_center = coords
break
return {
"timestamp": datetime.utcnow().isoformat(),
"validation_timestamp": latest.validation_timestamp,
"validation_timestamp_unix": latest.validation_timestamp_unix,
"is_valid": latest.is_valid,
"has_alert": has_alert,
"max_distance_km": max_distance_km,
"threshold_meters": threshold_meters,
"sources": sources,
"sources_stale": sources_stale,
"map_center": map_center,
"asset_name": asset_name
}
@app.get("/api/dashboard/asset/{asset_name}/route")
async def dashboard_asset_route(
asset_name: str,
hours: int = 72,
until: Optional[float] = None,
user: str = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""
Get route data for map visualization.
Args:
hours: Number of hours of history (default 72)
until: Optional Unix timestamp to show route up to this time.
If not provided, shows route up to current time.
"""
service = AssetService(db)
asset = service.get_asset_by_name(asset_name)
if not asset:
raise HTTPException(status_code=404, detail=f"Asset '{asset_name}' not found")
return service.get_route_data(asset.id, hours, until_timestamp=until)
# =============================================================================
# Main entry point
# =============================================================================
def run_server():
"""Run the server using uvicorn"""
import uvicorn
config = get_config()
uvicorn.run(
"server.main:app",
host=config.server_host,
port=config.server_port,
reload=config.debug,
log_level="info"
)
if __name__ == "__main__":
run_server()