Files
nearxos 808fbf5c7c Refactor golden image handling in backup upload process
Update the _set_golden_from_path function to improve the handling of existing golden image files. Replace the existing unlink logic with a more robust method that safely removes files or broken symlinks using the missing_ok parameter. This change enhances the reliability of the backup upload process by ensuring that stale references are properly cleared before setting a new golden image path.
2026-02-24 00:19:40 +02:00

679 lines
31 KiB
Python

#!/usr/bin/env python3
"""
GNSS Guard - Main orchestrator
Coordinates data collection from multiple GPS sources and validation
"""
import asyncio
import json
import logging
import os
import signal
import sys
import threading
import time
from datetime import datetime, timezone
from typing import Dict, Any, Optional
from config import Config
from sources.tm_ais_gps import TMAISGPSFetcher
from sources.starlink_gps import StarlinkGPSFetcher
from sources.nmea_gps import NMEAGPSCollector
from storage.database import Database
from storage.logger import StructuredLogger
from storage.cleanup import CleanupManager
from validation.coordinate_validator import CoordinateValidator
from web.server import WebServer
from services.server_sync import ServerSync
from services.buzzer import get_buzzer_service
logger = logging.getLogger("gnss_guard.main")
class GNSSGuard:
"""Main orchestrator for GNSS Guard system"""
def __init__(self, config: Config):
    """Initialize GNSS Guard.

    Wires together every subsystem: persistent storage, structured
    logging, the enabled GPS data sources, the coordinate validator,
    the hardware buzzer, the optional web dashboard, the retention
    cleanup manager, and optional server synchronization. Finally
    installs SIGINT/SIGTERM handlers so the main loop can shut down
    gracefully.

    Args:
        config: Fully populated application configuration.
    """
    self.config = config
    # Main-loop flag; cleared by the signal handler to stop iteration.
    self.running = False
    # Initialize components
    self.database = Database(config.database_path)
    self.structured_logger = StructuredLogger(
        config.logs_base_path,
        config.log_retention_days
    )
    # Path to injected positions file (in same directory as main.py).
    # When present, this JSON file overrides live source data per source.
    script_dir = os.path.dirname(os.path.abspath(__file__))
    self.injected_positions_path = os.path.join(script_dir, "injected_positions.json")
    # Initialize data sources (each may be disabled via config -> None)
    self.tm_ais_fetcher = TMAISGPSFetcher(config) if config.tm_ais_enabled else None
    self.starlink_fetcher = StarlinkGPSFetcher(config) if config.starlink_enabled else None
    # Initialize NMEA collectors; each requires a non-empty IP and a positive port
    self.nmea_primary_collector = None
    if config.nmea_primary_enabled and config.nmea_primary_ip and config.nmea_primary_port > 0:
        self.nmea_primary_collector = NMEAGPSCollector(
            config,
            "nmea_primary",
            config.nmea_primary_ip,
            config.nmea_primary_port,
            structured_logger=self.structured_logger
        )
    self.nmea_secondary_collector = None
    if config.nmea_secondary_enabled and config.nmea_secondary_ip and config.nmea_secondary_port > 0:
        self.nmea_secondary_collector = NMEAGPSCollector(
            config,
            "nmea_secondary",
            config.nmea_secondary_ip,
            config.nmea_secondary_port,
            structured_logger=self.structured_logger
        )
    # Initialize validator with the set of sources it should expect to see
    expected_sources = config.get_enabled_sources()
    self.validator = CoordinateValidator(
        config.validation_threshold_meters,
        config.stale_threshold_seconds,
        expected_sources
    )
    # Initialize buzzer service for hardware alarm (must be before web server)
    # Buzzer sounds with 1 second on / 1 second off pattern during GNSS alerts
    self.buzzer_service = get_buzzer_service(on_duration=1.0, off_duration=1.0)
    # Track previous alert level to detect status changes
    # Alert levels: "healthy", "degraded", "at_risk"
    self._previous_alert_level = "healthy"
    # Initialize web server (if enabled); failure here is non-fatal
    self.web_server = None
    self.web_thread = None
    if config.web_enabled:
        try:
            self.web_server = WebServer(config, self.database, self.buzzer_service)
            logger.info("Web server initialized")
        except Exception as e:
            logger.warning(f"Failed to initialize web server: {e}")
            self.web_server = None
    # Initialize cleanup manager
    # In demo mode, skip database cleanup since data isn't growing
    # (demo mode creates and deletes records, maintaining a fixed dataset)
    self.cleanup_manager = CleanupManager(
        database_path=config.database_path,
        logs_base_path=config.logs_base_path,
        positions_raw_retention_days=config.positions_raw_retention_days,
        positions_validation_retention_days=config.positions_validation_retention_days,
        logs_retention_days=config.log_retention_days,
        demo_mode=config.demo_unit
    )
    if config.demo_unit:
        logger.info(
            f"Cleanup manager initialized in DEMO mode (logs only: {config.log_retention_days}d)"
        )
    else:
        logger.info(
            f"Cleanup manager initialized (raw: {config.positions_raw_retention_days}d, "
            f"validation: {config.positions_validation_retention_days}d, logs: {config.log_retention_days}d)"
        )
    # Initialize server sync (if enabled); requires both URL and auth token
    self.server_sync = None
    if config.server_enabled and config.server_url and config.server_token:
        try:
            self.server_sync = ServerSync(
                database_path=config.database_path,
                server_url=config.server_url,
                server_token=config.server_token,
                asset_name=config.asset_name,
                batch_size=config.server_sync_batch_size,
                max_queue_size=config.server_sync_max_queue
            )
            logger.info(f"Server sync enabled -> {config.server_url}")
        except Exception as e:
            logger.warning(f"Failed to initialize server sync: {e}")
            self.server_sync = None
    # Setup signal handlers for graceful shutdown of the main loop
    signal.signal(signal.SIGINT, self._signal_handler)
    signal.signal(signal.SIGTERM, self._signal_handler)
def _signal_handler(self, signum, frame):
    """Signal callback: request a graceful stop of the main loop.

    Clearing ``self.running`` lets the current iteration finish; the
    loop in ``start()`` then exits on its own instead of being killed.
    """
    logger.info(f"Received signal {signum}, shutting down gracefully...")
    self.running = False
def _load_injected_positions(self) -> Optional[Dict[str, Dict[str, Any]]]:
    """
    Load injected positions from JSON file if it exists.

    The file maps source names to position dicts and is used to
    override live source data per source. Special keys:
      - keys starting with "_" are metadata and are skipped
      - keys starting with "//" are treated as commented out
      - a null value marks the source as deliberately absent

    Each position is normalized in place so that both "timestamp"
    (ISO 8601 string) and "timestamp_unix" (epoch seconds) are present,
    falling back to the current UTC time when parsing fails or when
    neither field was provided.

    Returns:
        Dictionary mapping source names to position dictionaries
        (or None values for deliberately absent sources), or None if
        the file doesn't exist or can't be read/parsed.
    """
    if not os.path.exists(self.injected_positions_path):
        return None
    try:
        with open(self.injected_positions_path, 'r') as f:
            data = json.load(f)
        # Validate and normalize positions
        injected = {}
        for source, position in data.items():
            # Skip metadata fields (those starting with underscore)
            if source.startswith("_"):
                continue
            # Skip commented-out sources (those starting with //)
            if source.startswith("//"):
                continue
            if position is None:
                # Null value means this source should be absent
                # Store it as None so we know to skip fetching for this source
                injected[source] = None
                continue
            # Ensure required fields are present
            if not isinstance(position, dict):
                logger.warning(f"Invalid position format for {source} in injected_positions.json")
                continue
            # Ensure source field matches the key
            position["source"] = source
            # Ensure timestamp_unix is set if timestamp is provided
            if "timestamp" in position and "timestamp_unix" not in position:
                try:
                    # "Z" suffix is not accepted by fromisoformat on older
                    # Pythons, so rewrite it as an explicit UTC offset
                    ts = datetime.fromisoformat(position["timestamp"].replace("Z", "+00:00"))
                    if ts.tzinfo is None:
                        # Naive timestamps are interpreted as UTC
                        ts = ts.replace(tzinfo=timezone.utc)
                    position["timestamp_unix"] = ts.timestamp()
                except Exception as e:
                    logger.warning(f"Failed to parse timestamp for {source}: {e}")
                    # Use current time as fallback
                    now = datetime.now(timezone.utc)
                    position["timestamp"] = now.isoformat()
                    position["timestamp_unix"] = now.timestamp()
            # Ensure timestamp is set if timestamp_unix is provided
            if "timestamp_unix" in position and "timestamp" not in position:
                try:
                    ts = datetime.fromtimestamp(position["timestamp_unix"], tz=timezone.utc)
                    position["timestamp"] = ts.isoformat()
                except Exception as e:
                    logger.warning(f"Failed to convert timestamp_unix for {source}: {e}")
                    position["timestamp"] = datetime.now(timezone.utc).isoformat()
            # Ensure both exist (use current time if neither provided)
            if "timestamp_unix" not in position:
                now = datetime.now(timezone.utc)
                position["timestamp"] = now.isoformat()
                position["timestamp_unix"] = now.timestamp()
            injected[source] = position
        logger.info(f"Loaded {len(injected)} injected position(s) from {self.injected_positions_path}")
        return injected
    except json.JSONDecodeError as e:
        logger.error(f"Failed to parse injected_positions.json: {e}")
        return None
    except Exception as e:
        logger.error(f"Error loading injected positions: {e}")
        return None
def _store_demo_validation(self, validation_result: Dict[str, Any]):
"""
Store validation in DEMO_UNIT mode.
Keeps only the latest validation record to show live status on dashboard,
while preserving historical data for route display.
Deletes any validation records from the last 5 minutes before inserting new one.
"""
import sqlite3 as sqlite3_module
try:
conn = sqlite3_module.connect(str(self.database.database_path), timeout=5.0)
cursor = conn.cursor()
# Delete recent "live" records (last 5 minutes) to prevent accumulation
# This keeps historical data intact while allowing fresh dashboard display
five_minutes_ago = time.time() - 300
cursor.execute(
"DELETE FROM positions_validation WHERE validation_timestamp_unix > ?",
(five_minutes_ago,)
)
conn.commit()
conn.close()
# Now store the new validation record
self.database.store_validation(validation_result)
except Exception as e:
logger.error(f"Error storing demo validation: {e}")
def _handle_buzzer_alarm(self, is_valid: bool, missing_sources: list, stale_sources: list, distance_exceeded: bool):
    """
    Handle buzzer alarm based on validation status.

    Buzzer triggers when GNSS status is:
    - "at_risk" (GPS jamming/spoofing detected - distance exceeds threshold)
    - "degraded" (sources missing or stale)
    - "no connection" (all sources missing)

    Buzzer stops when:
    - Status returns to healthy (validation passes)
    - User acknowledges the alarm via the dashboard button

    Buzzer restarts when:
    - Alert level changes (e.g., degraded -> at_risk or vice versa)

    Any buzzer/service failure is logged and swallowed so the hardware
    alarm can never break the validation loop.

    Args:
        is_valid: Whether validation passed
        missing_sources: List of missing source names
        stale_sources: List of stale source names
        distance_exceeded: Whether coordinate distance exceeded threshold
    """
    try:
        # Determine current alert level
        # "at_risk" = GPS spoofing/jamming (distance exceeded)
        # "degraded" = sources missing or stale but no distance issue
        # "healthy" = validation passed
        if is_valid:
            current_alert_level = "healthy"
        elif distance_exceeded:
            current_alert_level = "at_risk"
        else:
            current_alert_level = "degraded"
        # Check if alert level changed since the previous iteration
        alert_level_changed = current_alert_level != self._previous_alert_level
        if alert_level_changed:
            logger.info(f"Alert level changed: {self._previous_alert_level}{current_alert_level}")
            # Reset acknowledged state when alert level changes
            # This allows buzzer to restart even if previously acknowledged
            if self.buzzer_service.is_alarm_acknowledged():
                logger.info("Resetting alarm acknowledged state (alert level changed)")
                self.buzzer_service.reset_acknowledged()
            # Stop current alarm if running (will restart below if needed)
            if self.buzzer_service.is_alarm_active():
                self.buzzer_service.stop_alarm()
        # Handle alarm based on current alert level
        if current_alert_level != "healthy":
            # Status is degraded or at risk
            # Start alarm if not already active and not acknowledged
            if not self.buzzer_service.is_alarm_active():
                if not self.buzzer_service.is_alarm_acknowledged():
                    # Determine alarm reason for logging (most severe first)
                    if current_alert_level == "at_risk":
                        reason = "GPS jamming/spoofing detected (distance exceeded threshold)"
                    elif missing_sources:
                        reason = f"Sources missing: {', '.join(missing_sources)}"
                    elif stale_sources:
                        reason = f"Sources stale: {', '.join(stale_sources)}"
                    else:
                        reason = "Validation failed"
                    logger.warning(f"Starting buzzer alarm: {reason}")
                    self.structured_logger.warning("buzzer", f"Alarm started: {reason}")
                    self.buzzer_service.start_alarm()
                else:
                    logger.debug("Alarm acknowledged, not restarting until alert level changes")
        else:
            # Status is healthy
            # Stop alarm if active
            if self.buzzer_service.is_alarm_active():
                logger.info("Status returned to healthy, stopping buzzer alarm")
                self.structured_logger.info("buzzer", "Alarm stopped (status healthy)")
                self.buzzer_service.stop_alarm()
            # Reset acknowledged state when healthy
            if self.buzzer_service.is_alarm_acknowledged():
                logger.debug("Resetting alarm acknowledged state (status healthy)")
                self.buzzer_service.reset_acknowledged()
        # Track alert level for next iteration
        self._previous_alert_level = current_alert_level
    except Exception as e:
        logger.error(f"Error handling buzzer alarm: {e}")
async def start(self):
    """
    Start GNSS Guard system.

    Brings up the web server (daemon thread) and the NMEA collectors,
    waits the configured warm-up period so sources can connect, then
    runs the collect/validate loop at a fixed cadence until
    ``self.running`` is cleared (normally by the signal handler).
    """
    logger.info("Starting GNSS Guard system")
    self.structured_logger.info("system", "GNSS Guard starting", {"config": self.config.to_dict()})
    # Start web server in separate thread
    if self.web_server:
        self.web_thread = threading.Thread(
            target=self.web_server.run,
            kwargs={
                'host': self.config.web_host,
                'port': self.config.web_port,
                'debug': False
            },
            daemon=True  # must not block process exit
        )
        self.web_thread.start()
        logger.info(f"Web server started on {self.config.web_host}:{self.config.web_port}")
    # Log DEMO_UNIT mode if enabled
    if self.config.demo_unit:
        logger.info("DEMO_UNIT mode enabled - data collection active but database writes disabled")
        self.structured_logger.info("system", "DEMO_UNIT mode - no database writes")
    # Start NMEA collectors
    if self.nmea_primary_collector:
        await self.nmea_primary_collector.start()
        logger.info("Started NMEA primary collector")
    if self.nmea_secondary_collector:
        await self.nmea_secondary_collector.start()
        logger.info("Started NMEA secondary collector")
    # Startup warm-up period: wait for data sources to connect and receive initial data
    # This prevents false "missing" alerts on first validation after restart/deploy
    if self.config.startup_warmup_seconds > 0:
        logger.info(f"Waiting {self.config.startup_warmup_seconds}s for data sources to initialize...")
        self.structured_logger.info(
            "system",
            "Startup warm-up period",
            {"warmup_seconds": self.config.startup_warmup_seconds}
        )
        await asyncio.sleep(self.config.startup_warmup_seconds)
        logger.info("Warm-up complete, starting validation cycle")
    self.running = True
    # Main collection loop - ensure iterations start at regular intervals
    while self.running:
        iteration_start = time.time()
        try:
            await self._iteration()
        except Exception as e:
            # Keep the loop alive on per-iteration failures
            logger.error(f"Error in main loop: {e}")
            self.structured_logger.error("system", f"Error in main loop: {e}")
        # Calculate how long the iteration took
        iteration_duration = time.time() - iteration_start
        # Sleep for the remaining time to maintain the iteration period
        sleep_time = self.config.iteration_period_seconds - iteration_duration
        if sleep_time > 0:
            logger.debug(f"Iteration took {iteration_duration:.2f}s, sleeping for {sleep_time:.2f}s")
            await asyncio.sleep(sleep_time)
        else:
            logger.warning(
                f"Iteration took {iteration_duration:.2f}s, which exceeds the configured period "
                f"of {self.config.iteration_period_seconds}s. Starting next iteration immediately."
            )
            # No sleep, start next iteration immediately
async def _iteration(self):
    """
    Execute one iteration of data collection and validation.

    Steps:
      1. Run the daily retention cleanup if it is due.
      2. Collect one position per enabled source, honoring any
         per-source injected positions from injected_positions.json
         (a null entry suppresses the source entirely).
      3. Validate the collected set, persist the result (demo-aware),
         sync it to the server when configured, log the outcome, and
         drive the buzzer alarm.
    """
    # Run daily cleanup if needed (runs once per day)
    self.cleanup_manager.run_cleanup_if_needed()
    logger.info("Starting data collection iteration")
    positions = {}
    # Check for injected positions (per-source injection)
    injected_positions = self._load_injected_positions() or {}
    # Announce which sources are being overridden (if any)
    if injected_positions:
        injected_sources = [s for s, p in injected_positions.items() if p is not None]
        if injected_sources:
            logger.info(f"Using injected positions for: {', '.join(injected_sources)}")
    # DEMO_UNIT mode: skip database writes
    skip_db_writes = self.config.demo_unit
    # Fetch from TM AIS GPS (skip if injected)
    if "tm_ais" not in injected_positions and self.tm_ais_fetcher:
        try:
            position = self.tm_ais_fetcher.fetch()
            if position:
                self._ingest_position(positions, position, skip_db_writes, "tm_ais", "Fetched position")
        except Exception as e:
            logger.error(f"Error fetching TM AIS GPS: {e}")
            self.structured_logger.error("tm_ais", f"Fetch error: {e}")
    elif "tm_ais" in injected_positions:
        # Use injected position for tm_ais
        self._use_injected(positions, injected_positions, "tm_ais", skip_db_writes)
    # Fetch from Starlink GPS (fetched sources are overridden by injected ones)
    if self.starlink_fetcher:
        # Only fetch if at least one Starlink source is not injected
        if "starlink_location" not in injected_positions or "starlink_gps" not in injected_positions:
            logger.info("Fetching from Starlink GPS...")
            try:
                starlink_positions = self.starlink_fetcher.fetch()
                for position in starlink_positions:
                    # Only use fetched position if this source is not injected
                    if position["source"] not in injected_positions:
                        self._ingest_position(positions, position, skip_db_writes, position["source"], "Fetched position")
            except Exception as e:
                logger.error(f"Error fetching Starlink GPS: {e}")
                self.structured_logger.error("starlink", f"Fetch error: {e}")
    # Use injected positions for Starlink sources (if any).
    # Applied regardless of whether the fetcher is enabled, for
    # consistency with the tm_ais / NMEA injection handling.
    for starlink_source in ("starlink_location", "starlink_gps"):
        self._use_injected(positions, injected_positions, starlink_source, skip_db_writes)
    # Get latest positions from NMEA collectors (skip if injected).
    # Primary and secondary share identical handling, so drive both
    # from one table instead of two copy-pasted branches.
    for label, name, collector in (
        ("primary", "nmea_primary", self.nmea_primary_collector),
        ("secondary", "nmea_secondary", self.nmea_secondary_collector),
    ):
        if name not in injected_positions and collector:
            try:
                position = await collector.get_latest_position()
                if position:
                    self._ingest_position(positions, position, skip_db_writes, name, "Updated position")
            except Exception as e:
                logger.error(f"Error getting NMEA {label} position: {e}")
                self.structured_logger.error(name, f"Position error: {e}")
        elif name in injected_positions:
            self._use_injected(positions, injected_positions, name, skip_db_writes)
    # Run validation
    logger.info(f"Collected {len(positions)} positions, running validation")
    try:
        validation_result = self.validator.validate_positions(positions)
        if skip_db_writes:
            # DEMO_UNIT mode: store validation for live dashboard display
            # but delete recent "live" records to prevent accumulation
            # (keeps only last few minutes of live data, historical data untouched)
            self._store_demo_validation(validation_result)
        else:
            self.database.store_validation(validation_result)
            # Sync to server if enabled (only when not in DEMO_UNIT mode)
            if self.server_sync:
                try:
                    if self.server_sync.sync_validation(validation_result):
                        logger.debug("Validation synced to server")
                    else:
                        logger.debug("Validation queued for later sync")
                except Exception as e:
                    # Sync failures are non-fatal; records stay queued locally
                    logger.warning(f"Server sync error: {e}")
        # Log the outcome and learn whether the distance threshold was exceeded
        distance_exceeded = self._report_validation(validation_result)
        # Handle buzzer alarm based on validation status
        # Status mapping:
        # - "at_risk" (crit): validation failed AND distance exceeds threshold
        # - "degraded" (warn): validation failed but no distance alert
        # - "healthy": validation passed
        self._handle_buzzer_alarm(
            validation_result["is_valid"],
            validation_result.get("sources_missing", []),
            validation_result.get("sources_stale", []),
            distance_exceeded
        )
    except Exception as e:
        logger.error(f"Error during validation: {e}")
        self.structured_logger.error("validation", f"Validation error: {e}")
    logger.info("Iteration complete")

def _ingest_position(self, positions, position, skip_db_writes, log_source, message):
    """Record one collected position: add it to the in-memory map,
    persist it unless in DEMO_UNIT mode, and emit a structured log
    entry under `log_source` with the given message."""
    positions[position["source"]] = position
    if not skip_db_writes:
        self.database.store_position(position)
    self.structured_logger.info(log_source, message, {"position": position})

def _use_injected(self, positions, injected_positions, source, skip_db_writes):
    """Apply the injected position for `source`, if one is present.
    A missing key or an explicit null entry records nothing (null
    marks the source as deliberately absent)."""
    position = injected_positions.get(source)
    if position is not None:
        self._ingest_position(positions, position, skip_db_writes, source, "Injected position")

def _report_validation(self, validation_result):
    """Log the validation outcome to the terminal and structured log.

    Returns:
        True when the maximum inter-source distance exceeded the
        configured threshold (the GPS jamming/spoofing condition);
        the caller feeds this to the buzzer handler.
    """
    is_valid = validation_result["is_valid"]
    missing_sources = validation_result.get("sources_missing", [])
    stale_sources = validation_result.get("sources_stale", [])
    coordinate_differences = validation_result.get("coordinate_differences", {})
    validation_details = validation_result.get("validation_details", {})
    max_distance = validation_details.get("max_distance_meters", 0.0)
    # Compute the threshold once (the original evaluated it twice)
    threshold = validation_details.get('threshold_meters', 0)
    if is_valid:
        logger.info("✓ Validation PASSED")
        if missing_sources:
            logger.info(f" Missing sources: {', '.join(missing_sources)}")
        if stale_sources:
            logger.info(f" Stale sources: {', '.join(stale_sources)}")
        if coordinate_differences:
            logger.info(f" Max distance difference: {max_distance:.2f}m")
        else:
            logger.info(" All sources within threshold")
    else:
        logger.warning("✗ Validation FAILED")
        # Check if failure is due to distance (GPS jamming/spoofing alert)
        if max_distance > threshold:
            distance_km = max_distance / 1000.0
            logger.warning("")
            logger.warning("=" * 60)
            logger.warning("🚨 GPS Jamming or Spoofing Alert! 🚨")
            logger.warning(f" Location Distance: {distance_km:.1f} km")
            logger.warning("=" * 60)
            logger.warning("")
        if missing_sources:
            logger.warning(f" Missing sources: {', '.join(missing_sources)}")
        if stale_sources:
            logger.warning(f" Stale sources: {', '.join(stale_sources)}")
        if coordinate_differences:
            logger.warning(f" Max distance difference: {max_distance:.2f}m (threshold: {threshold}m)")
            # Log individual differences if there are any
            for pair, diff_info in coordinate_differences.items():
                logger.warning(f" {pair}: {diff_info.get('distance_meters', 0):.2f}m")
    # Mirror the outcome into the structured log
    if is_valid:
        self.structured_logger.info(
            "validation",
            "Validation passed",
            {"validation": validation_result}
        )
    else:
        self.structured_logger.warning(
            "validation",
            "Validation failed",
            {"validation": validation_result}
        )
    return max_distance > threshold
async def stop(self):
    """Shut down all subsystems in order.

    Silences the buzzer first, stops both NMEA collectors, records the
    shutdown event, and finally releases the structured log files.
    """
    logger.info("Stopping GNSS Guard system")
    self.running = False
    # Silence the hardware alarm before anything else
    if self.buzzer_service:
        self.buzzer_service.shutdown()
    # Stop NMEA collectors (primary, then secondary)
    for collector in (self.nmea_primary_collector, self.nmea_secondary_collector):
        if collector:
            await collector.stop()
    # Record the shutdown, then close the logger it was written through
    self.structured_logger.info("system", "GNSS Guard stopped")
    self.structured_logger.close()
async def main():
    """Program entry point.

    Configures root logging, builds a GNSSGuard from the loaded
    configuration, runs it until interrupted, and always performs a
    clean shutdown on the way out.
    """
    # Setup logging
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    )
    # Load configuration and create the orchestrator
    guard = GNSSGuard(Config())
    try:
        await guard.start()
    except KeyboardInterrupt:
        logger.info("Received keyboard interrupt")
    finally:
        # Guaranteed cleanup regardless of how start() exited
        await guard.stop()


if __name__ == "__main__":
    asyncio.run(main())