#!/usr/bin/env python3
"""
SQLite database storage for GNSS Guard
Manages positions_raw and positions_validation tables
"""

import json
import logging
import sqlite3
import time
from contextlib import closing
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, Any, Optional, List

logger = logging.getLogger("gnss_guard.database")


class Database:
    """SQLite database manager for GNSS Guard.

    Every public operation opens its own short-lived connection and closes it
    again, even when the operation fails, so no connection object is shared
    between calls.
    """

    # Seconds sqlite3 waits on a locked database before raising.
    _CONNECT_TIMEOUT_S = 5.0
    # Pause before the single retry of a write that hit "database is locked".
    _LOCK_RETRY_DELAY_S = 0.01

    _INSERT_POSITION_SQL = """
        INSERT OR REPLACE INTO positions_raw
            (source, timestamp, timestamp_unix, latitude, longitude, altitude,
             position_uncertainty_m, supplementary_data, created_at)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
    """

    _INSERT_VALIDATION_SQL = """
        INSERT INTO positions_validation
            (validation_timestamp, validation_timestamp_unix, is_valid,
             sources_missing, sources_stale, coordinate_differences,
             source_coordinates, validation_details, created_at)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
    """

    def __init__(self, database_path: Path):
        """
        Initialize database

        Creates the parent directory if needed and ensures the schema exists.

        Args:
            database_path: Path to SQLite database file

        Raises:
            Exception: Whatever error schema initialization hit (re-raised
                after being logged).
        """
        self.database_path = Path(database_path)
        self.database_path.parent.mkdir(parents=True, exist_ok=True)
        self._init_database()

    def _connect(self) -> sqlite3.Connection:
        """Open a new connection to the database file."""
        return sqlite3.connect(
            str(self.database_path),
            check_same_thread=False,
            timeout=self._CONNECT_TIMEOUT_S,
        )

    def _execute_write(self, sql: str, params: tuple) -> None:
        """Run one write statement and commit, retrying once if the database is locked.

        The connection is always closed, whether the statement succeeds or not.

        Raises:
            sqlite3.Error: If the statement fails (after the single retry when
                the failure was "database is locked").
        """
        for attempt in range(2):
            try:
                with closing(self._connect()) as conn:
                    conn.execute(sql, params)
                    conn.commit()
                return
            except sqlite3.OperationalError as e:
                if attempt == 1 or "database is locked" not in str(e):
                    raise
                time.sleep(self._LOCK_RETRY_DELAY_S)

    @staticmethod
    def _decode_json(text: Optional[str], default: Any) -> Any:
        """Decode a JSON column, returning ``default`` for empty or JSON ``null`` values."""
        if not text:
            return default
        value = json.loads(text)
        # A stored "null" (written when the caller passed None explicitly)
        # would otherwise surface as None where a dict/list is expected.
        return default if value is None else value

    def _init_database(self):
        """Initialize database schema and configure SQLite.

        Creates both tables and their indexes if they are missing, and adds the
        ``position_uncertainty_m`` column to databases created before it existed.

        Raises:
            Exception: Any error raised during setup, after logging it.
        """
        try:
            with closing(self._connect()) as conn:
                cursor = conn.cursor()

                # WAL mode allows concurrent reads during writes; this setting
                # is stored in the database file.
                cursor.execute("PRAGMA journal_mode=WAL")
                # NOTE(review): the three pragmas below are connection-scoped in
                # SQLite, so they only affect this setup connection. Later
                # connections rely on the `timeout` argument of sqlite3.connect.
                cursor.execute("PRAGMA busy_timeout=30000")
                cursor.execute("PRAGMA synchronous=NORMAL")
                cursor.execute("PRAGMA foreign_keys=ON")

                cursor.execute(
                    """
                    CREATE TABLE IF NOT EXISTS positions_raw (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        source TEXT NOT NULL,
                        timestamp TEXT NOT NULL,
                        timestamp_unix REAL NOT NULL,
                        latitude REAL,
                        longitude REAL,
                        altitude REAL,
                        position_uncertainty_m REAL,
                        supplementary_data TEXT,
                        created_at REAL NOT NULL,
                        UNIQUE(source, timestamp_unix)
                    )
                    """
                )

                # Migration for existing databases: add the column only when it
                # is really missing, so unrelated errors are not swallowed.
                cursor.execute("PRAGMA table_info(positions_raw)")
                existing_columns = {row[1] for row in cursor.fetchall()}
                if "position_uncertainty_m" not in existing_columns:
                    cursor.execute(
                        "ALTER TABLE positions_raw ADD COLUMN position_uncertainty_m REAL"
                    )

                cursor.execute(
                    """
                    CREATE TABLE IF NOT EXISTS positions_validation (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        validation_timestamp TEXT NOT NULL,
                        validation_timestamp_unix REAL NOT NULL,
                        is_valid INTEGER NOT NULL,
                        sources_missing TEXT,
                        sources_stale TEXT,
                        coordinate_differences TEXT,
                        source_coordinates TEXT,
                        validation_details TEXT,
                        created_at REAL NOT NULL
                    )
                    """
                )

                cursor.execute(
                    """
                    CREATE INDEX IF NOT EXISTS idx_positions_raw_source_timestamp
                    ON positions_raw(source, timestamp_unix DESC)
                    """
                )
                cursor.execute(
                    """
                    CREATE INDEX IF NOT EXISTS idx_positions_validation_timestamp
                    ON positions_validation(validation_timestamp_unix DESC)
                    """
                )

                conn.commit()

            logger.info("Database initialized at %s", self.database_path)
        except Exception as e:
            logger.error("Failed to initialize database: %s", e)
            raise

    def store_position(self, position: Dict[str, Any]) -> bool:
        """
        Store a position in positions_raw table

        A row with the same (source, timestamp_unix) pair is replaced; any
        other position is inserted as a new row.

        Args:
            position: Dictionary with position data (source, latitude, longitude, etc.)

        Returns:
            True if successful, False otherwise
        """
        try:
            params = (
                position.get("source"),
                position.get("timestamp"),
                position.get("timestamp_unix"),
                position.get("latitude"),
                position.get("longitude"),
                position.get("altitude"),
                position.get("position_uncertainty_m"),
                json.dumps(position.get("supplementary_data", {})),
                time.time(),
            )
            self._execute_write(self._INSERT_POSITION_SQL, params)
            return True
        except Exception as e:
            logger.error("Failed to store position: %s", e)
            return False

    def store_validation(self, validation_result: Dict[str, Any]) -> bool:
        """
        Store validation result in positions_validation table

        Args:
            validation_result: Dictionary with validation data

        Returns:
            True if successful, False otherwise
        """
        try:
            params = (
                validation_result.get("validation_timestamp"),
                validation_result.get("validation_timestamp_unix"),
                1 if validation_result.get("is_valid") else 0,
                json.dumps(validation_result.get("sources_missing", [])),
                json.dumps(validation_result.get("sources_stale", [])),
                json.dumps(validation_result.get("coordinate_differences", {})),
                json.dumps(validation_result.get("source_coordinates", {})),
                json.dumps(validation_result.get("validation_details", {})),
                time.time(),
            )
            self._execute_write(self._INSERT_VALIDATION_SQL, params)
            return True
        except Exception as e:
            logger.error("Failed to store validation result: %s", e)
            return False

    def get_latest_positions(self) -> Dict[str, Dict[str, Any]]:
        """
        Get latest position for each source

        "Latest" means the row with the greatest timestamp_unix per source.

        Returns:
            Dictionary mapping source names to their latest positions;
            empty if there are no rows or the query fails.
        """
        try:
            with closing(self._connect()) as conn:
                rows = conn.execute(
                    """
                    SELECT source, timestamp, timestamp_unix, latitude, longitude,
                           altitude, position_uncertainty_m, supplementary_data
                    FROM positions_raw
                    WHERE (source, timestamp_unix) IN (
                        SELECT source, MAX(timestamp_unix)
                        FROM positions_raw
                        GROUP BY source
                    )
                    """
                ).fetchall()

            positions = {}
            for source, timestamp, timestamp_unix, lat, lon, alt, uncertainty, supp_data in rows:
                positions[source] = {
                    "source": source,
                    "timestamp": timestamp,
                    "timestamp_unix": timestamp_unix,
                    "latitude": lat,
                    "longitude": lon,
                    "altitude": alt,
                    "position_uncertainty_m": uncertainty,
                    "supplementary_data": self._decode_json(supp_data, {}),
                }
            return positions
        except Exception as e:
            logger.error("Failed to get latest positions: %s", e)
            return {}

    def get_latest_validation(self) -> Optional[Dict[str, Any]]:
        """
        Get the most recent validation result from the database.
        Used to restore state after app restart.

        Returns:
            Dictionary with validation data, or None if the table is empty
            or the query fails.
        """
        try:
            with closing(self._connect()) as conn:
                row = conn.execute(
                    """
                    SELECT validation_timestamp, validation_timestamp_unix, is_valid,
                           sources_missing, sources_stale, coordinate_differences,
                           source_coordinates, validation_details
                    FROM positions_validation
                    ORDER BY validation_timestamp_unix DESC
                    LIMIT 1
                    """
                ).fetchone()

            if row is None:
                return None

            return {
                "validation_timestamp": row[0],
                "validation_timestamp_unix": row[1],
                "is_valid": row[2] == 1,
                "sources_missing": self._decode_json(row[3], []),
                "sources_stale": self._decode_json(row[4], []),
                "coordinate_differences": self._decode_json(row[5], {}),
                "source_coordinates": self._decode_json(row[6], {}),
                "validation_details": self._decode_json(row[7], {}),
            }
        except Exception as e:
            logger.error("Failed to get latest validation: %s", e)
            return None