<message>Update the _set_golden_from_path function to improve the handling of existing golden image files. Replace the existing unlink logic with a more robust method that safely removes files or broken symlinks using the missing_ok parameter. This change enhances the reliability of the backup upload process by ensuring that stale references are properly cleared before setting a new golden image path.
409 lines
14 KiB
Python
409 lines
14 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
FastAPI main application for GNSS Guard Server
|
|
Centralized monitoring server for multiple GNSS Guard assets
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
import json
|
|
import random
|
|
from contextlib import asynccontextmanager
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
from fastapi import FastAPI, Request, Depends, HTTPException
|
|
from fastapi.staticfiles import StaticFiles
|
|
from fastapi.templating import Jinja2Templates
|
|
from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from sqlalchemy.orm import Session
|
|
from slowapi import Limiter, _rate_limit_exceeded_handler
|
|
from slowapi.util import get_remote_address
|
|
from slowapi.errors import RateLimitExceeded
|
|
|
|
from config import get_config
|
|
from database import init_db, get_db, get_session_factory
|
|
from routes import api, auth
|
|
from routes.auth import get_optional_user, get_current_user
|
|
from services.asset_service import AssetService
|
|
from services.telegram_service import get_telegram_service
|
|
from models import Asset, AssetNotificationState
|
|
|
|
# Initialize rate limiter
|
|
limiter = Limiter(key_func=get_remote_address)
|
|
|
|
# Configure logging
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
|
|
)
|
|
logger = logging.getLogger("gnss_guard.server")
|
|
|
|
# Create FastAPI app
|
|
app = FastAPI(
|
|
title="GNSS Guard Server",
|
|
description="Centralized monitoring server for GNSS Guard assets",
|
|
version="1.0.0"
|
|
)
|
|
|
|
# Setup rate limiting
|
|
app.state.limiter = limiter
|
|
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
|
|
|
|
# Add CORS middleware - restricted to same-origin only
|
|
# Since the dashboard is served from the same domain, we only need
|
|
# to allow requests from the same origin. This prevents CSRF attacks.
|
|
config = get_config()
|
|
allowed_origins = []
|
|
if config.server_domain:
|
|
allowed_origins = [
|
|
f"https://{config.server_domain}",
|
|
f"http://{config.server_domain}", # For initial setup before SSL
|
|
]
|
|
|
|
app.add_middleware(
|
|
CORSMiddleware,
|
|
allow_origins=allowed_origins,
|
|
allow_credentials=True,
|
|
allow_methods=["GET", "POST", "DELETE"],
|
|
allow_headers=["Content-Type", "Authorization", "Cookie"],
|
|
)
|
|
|
|
# Setup static files and templates
|
|
static_path = Path(__file__).parent / "static"
|
|
templates_path = Path(__file__).parent / "templates"
|
|
|
|
if static_path.exists():
|
|
app.mount("/static", StaticFiles(directory=str(static_path)), name="static")
|
|
|
|
templates = Jinja2Templates(directory=str(templates_path)) if templates_path.exists() else None
|
|
|
|
# Include routers
|
|
app.include_router(api.router)
|
|
app.include_router(auth.router)
|
|
|
|
|
|
# =============================================================================
|
|
# Health Check Endpoint (public, no auth required)
|
|
# =============================================================================
|
|
|
|
@app.get("/health")
|
|
async def health_check():
|
|
"""Health check endpoint - always accessible"""
|
|
return {"status": "ok", "timestamp": datetime.utcnow().isoformat()}
|
|
|
|
|
|
async def check_offline_assets():
|
|
"""Background task to check for assets that have gone offline"""
|
|
config = get_config()
|
|
telegram_service = get_telegram_service()
|
|
|
|
if not telegram_service.enabled:
|
|
return
|
|
|
|
threshold = datetime.utcnow() - timedelta(seconds=config.asset_offline_seconds)
|
|
|
|
SessionLocal = get_session_factory()
|
|
db = SessionLocal()
|
|
try:
|
|
# Find assets that are marked online but haven't reported recently
|
|
states = db.query(AssetNotificationState).join(Asset).filter(
|
|
AssetNotificationState.is_online == True,
|
|
AssetNotificationState.last_validation_at != None,
|
|
AssetNotificationState.last_validation_at < threshold,
|
|
Asset.is_active == True,
|
|
Asset.telegram_enabled == True
|
|
).all()
|
|
|
|
for state in states:
|
|
chat_id = state.asset.telegram_chat_id or telegram_service.default_chat_id
|
|
if chat_id:
|
|
logger.info(f"Asset '{state.asset.name}' detected as offline (last seen: {state.last_validation_at})")
|
|
telegram_service.send_asset_offline_alert(
|
|
chat_id=chat_id,
|
|
asset_name=state.asset.name,
|
|
last_seen=state.last_validation_at,
|
|
offline_threshold_seconds=config.asset_offline_seconds
|
|
)
|
|
state.is_online = False
|
|
|
|
if states:
|
|
db.commit()
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error checking offline assets: {e}")
|
|
db.rollback()
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
async def offline_checker_loop():
|
|
"""Background loop that periodically checks for offline assets"""
|
|
while True:
|
|
await asyncio.sleep(30) # Check every 30 seconds
|
|
try:
|
|
await check_offline_assets()
|
|
except Exception as e:
|
|
logger.error(f"Error in offline checker loop: {e}")
|
|
|
|
|
|
@app.on_event("startup")
|
|
async def startup_event():
|
|
"""Initialize database and background tasks on startup"""
|
|
logger.info("Starting GNSS Guard Server...")
|
|
init_db()
|
|
logger.info("Database initialized")
|
|
|
|
# Start background task for offline detection
|
|
asyncio.create_task(offline_checker_loop())
|
|
logger.info("Offline asset checker started")
|
|
|
|
|
|
# =============================================================================
|
|
# Web UI Routes
|
|
# =============================================================================
|
|
|
|
@app.get("/", response_class=HTMLResponse)
|
|
async def index(request: Request, user: Optional[str] = Depends(get_optional_user)):
|
|
"""Main dashboard page"""
|
|
if not user:
|
|
return RedirectResponse(url="/login", status_code=302)
|
|
|
|
if not templates:
|
|
return HTMLResponse("<h1>GNSS Guard Server</h1><p>Templates not configured</p>")
|
|
|
|
return templates.TemplateResponse("dashboard.html", {
|
|
"request": request,
|
|
"username": user,
|
|
"cache_buster": random.randint(100000, 999999)
|
|
})
|
|
|
|
|
|
@app.get("/login", response_class=HTMLResponse)
|
|
async def login_page(request: Request, user: Optional[str] = Depends(get_optional_user)):
|
|
"""Login page"""
|
|
if user:
|
|
return RedirectResponse(url="/", status_code=302)
|
|
|
|
if not templates:
|
|
return HTMLResponse("""
|
|
<h1>GNSS Guard Server - Login</h1>
|
|
<form method="post" action="/login">
|
|
<input name="username" placeholder="Username"><br>
|
|
<input name="password" type="password" placeholder="Password"><br>
|
|
<button type="submit">Login</button>
|
|
</form>
|
|
""")
|
|
|
|
return templates.TemplateResponse("login.html", {
|
|
"request": request,
|
|
"cache_buster": random.randint(100000, 999999)
|
|
})
|
|
|
|
|
|
@app.get("/api/dashboard/assets")
|
|
async def dashboard_assets(
|
|
user: str = Depends(get_current_user),
|
|
db: Session = Depends(get_db)
|
|
):
|
|
"""Get all assets status for dashboard"""
|
|
service = AssetService(db)
|
|
return service.get_all_assets_status()
|
|
|
|
|
|
@app.get("/api/dashboard/asset/{asset_name}/status")
|
|
async def dashboard_asset_status(
|
|
asset_name: str,
|
|
at: Optional[float] = None,
|
|
user: str = Depends(get_current_user),
|
|
db: Session = Depends(get_db)
|
|
):
|
|
"""
|
|
Get detailed status for a specific asset (for dashboard display).
|
|
Matches the format expected by the client dashboard.
|
|
|
|
Args:
|
|
at: Optional Unix timestamp to get historical data at that time.
|
|
If not provided, returns the latest data.
|
|
"""
|
|
service = AssetService(db)
|
|
asset = service.get_asset_by_name(asset_name)
|
|
|
|
if not asset:
|
|
raise HTTPException(status_code=404, detail=f"Asset '{asset_name}' not found")
|
|
|
|
if at is not None:
|
|
# Get historical validation at specified timestamp
|
|
latest = service.get_validation_at_timestamp(asset.id, at)
|
|
else:
|
|
latest = service.get_latest_validation(asset.id)
|
|
|
|
if not latest:
|
|
return {
|
|
"error": "No validation data available",
|
|
"timestamp": datetime.utcnow().isoformat()
|
|
}
|
|
|
|
# Parse JSON fields
|
|
sources_missing = json.loads(latest.sources_missing or "[]")
|
|
sources_stale = json.loads(latest.sources_stale or "[]")
|
|
coordinate_differences = json.loads(latest.coordinate_differences or "{}")
|
|
source_coordinates = json.loads(latest.source_coordinates or "{}")
|
|
validation_details = json.loads(latest.validation_details or "{}")
|
|
|
|
# Get enabled sources from validation_details
|
|
expected_sources = validation_details.get("expected_sources", [])
|
|
|
|
# Build sources status (matching client format)
|
|
source_display_names = {
|
|
"nmea_primary": "Primary GPS",
|
|
"nmea_secondary": "Secondary GPS",
|
|
"tm_ais": "TM AIS GPS",
|
|
"starlink_gps": "Starlink GPS",
|
|
"starlink_location": "Starlink Location"
|
|
}
|
|
|
|
sources = {}
|
|
all_source_names = ["nmea_primary", "nmea_secondary", "tm_ais", "starlink_gps", "starlink_location"]
|
|
|
|
for source_name in all_source_names:
|
|
display_name = source_display_names.get(source_name, source_name)
|
|
|
|
if source_name not in expected_sources:
|
|
sources[source_name] = {
|
|
"display_name": display_name,
|
|
"enabled": False,
|
|
"status": "not_configured",
|
|
"is_stale": False,
|
|
"coordinates": None,
|
|
"last_update": None,
|
|
"last_update_unix": None
|
|
}
|
|
continue
|
|
|
|
source_data = source_coordinates.get(source_name)
|
|
is_stale = source_name in sources_stale
|
|
|
|
if not source_data:
|
|
sources[source_name] = {
|
|
"display_name": display_name,
|
|
"enabled": True,
|
|
"status": "missing",
|
|
"is_stale": is_stale,
|
|
"coordinates": None,
|
|
"last_update": None,
|
|
"last_update_unix": None
|
|
}
|
|
else:
|
|
status = "stale" if is_stale else "ok"
|
|
sources[source_name] = {
|
|
"display_name": display_name,
|
|
"enabled": True,
|
|
"status": status,
|
|
"is_stale": is_stale,
|
|
"coordinates": {
|
|
"latitude": source_data.get("latitude"),
|
|
"longitude": source_data.get("longitude")
|
|
},
|
|
"last_update": source_data.get("timestamp"),
|
|
"last_update_unix": source_data.get("timestamp_unix")
|
|
}
|
|
|
|
# Calculate max distance
|
|
threshold_meters = validation_details.get("threshold_meters", 200.0)
|
|
max_distance_km = None
|
|
max_distance_m = 0.0
|
|
|
|
if not latest.is_valid and coordinate_differences:
|
|
for diff_data in coordinate_differences.values():
|
|
if isinstance(diff_data, dict):
|
|
distance = diff_data.get("distance_meters", diff_data.get("distance_m", 0))
|
|
if distance > max_distance_m:
|
|
max_distance_m = distance
|
|
|
|
if max_distance_m > threshold_meters:
|
|
max_distance_km = max_distance_m / 1000.0
|
|
|
|
has_alert = (not latest.is_valid and max_distance_km is not None) or len(sources_missing) > 0
|
|
|
|
# Find map center
|
|
map_center = None
|
|
for priority_source in ["nmea_primary", "tm_ais", "starlink_location"]:
|
|
if sources.get(priority_source, {}).get("coordinates"):
|
|
coords = sources[priority_source]["coordinates"]
|
|
if coords.get("latitude") and coords.get("longitude"):
|
|
map_center = coords
|
|
break
|
|
|
|
if not map_center:
|
|
for source_data in sources.values():
|
|
if source_data.get("coordinates"):
|
|
coords = source_data["coordinates"]
|
|
if coords.get("latitude") and coords.get("longitude"):
|
|
map_center = coords
|
|
break
|
|
|
|
return {
|
|
"timestamp": datetime.utcnow().isoformat(),
|
|
"validation_timestamp": latest.validation_timestamp,
|
|
"validation_timestamp_unix": latest.validation_timestamp_unix,
|
|
"is_valid": latest.is_valid,
|
|
"has_alert": has_alert,
|
|
"max_distance_km": max_distance_km,
|
|
"threshold_meters": threshold_meters,
|
|
"sources": sources,
|
|
"sources_stale": sources_stale,
|
|
"map_center": map_center,
|
|
"asset_name": asset_name
|
|
}
|
|
|
|
|
|
@app.get("/api/dashboard/asset/{asset_name}/route")
|
|
async def dashboard_asset_route(
|
|
asset_name: str,
|
|
hours: int = 72,
|
|
until: Optional[float] = None,
|
|
user: str = Depends(get_current_user),
|
|
db: Session = Depends(get_db)
|
|
):
|
|
"""
|
|
Get route data for map visualization.
|
|
|
|
Args:
|
|
hours: Number of hours of history (default 72)
|
|
until: Optional Unix timestamp to show route up to this time.
|
|
If not provided, shows route up to current time.
|
|
"""
|
|
service = AssetService(db)
|
|
asset = service.get_asset_by_name(asset_name)
|
|
|
|
if not asset:
|
|
raise HTTPException(status_code=404, detail=f"Asset '{asset_name}' not found")
|
|
|
|
return service.get_route_data(asset.id, hours, until_timestamp=until)
|
|
|
|
|
|
# =============================================================================
|
|
# Main entry point
|
|
# =============================================================================
|
|
|
|
def run_server():
|
|
"""Run the server using uvicorn"""
|
|
import uvicorn
|
|
config = get_config()
|
|
|
|
uvicorn.run(
|
|
"server.main:app",
|
|
host=config.server_host,
|
|
port=config.server_port,
|
|
reload=config.debug,
|
|
log_level="info"
|
|
)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
run_server()
|
|
|