Files
nearxos 808fbf5c7c Refactor golden image handling in backup upload process</message>
<message>Update the _set_golden_from_path function to improve the handling of existing golden image files. Replace the existing unlink logic with a more robust method that safely removes files or broken symlinks using the missing_ok parameter. This change enhances the reliability of the backup upload process by ensuring that stale references are properly cleared before setting a new golden image path.
2026-02-24 00:19:40 +02:00

317 lines
12 KiB
Python

#!/usr/bin/env python3
"""
SQLite database storage for GNSS Guard
Manages positions_raw and positions_validation tables
"""
import json
import logging
import sqlite3
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, Any, Optional, List
logger = logging.getLogger("gnss_guard.database")
class Database:
"""SQLite database manager for GNSS Guard"""
def __init__(self, database_path: Path):
    """
    Create the manager and prepare the backing SQLite file.

    Args:
        database_path: Location of the SQLite database file; parent
            directories are created if they do not already exist.
    """
    resolved = Path(database_path)
    resolved.parent.mkdir(parents=True, exist_ok=True)
    self.database_path = resolved
    self._init_database()
def _init_database(self):
    """
    Create the database schema and tune SQLite for this workload.

    Configures WAL journaling (concurrent reads during writes), a 30 s
    busy timeout, NORMAL synchronous mode, and foreign keys, then
    creates the positions_raw and positions_validation tables and their
    lookup indexes if missing. Also migrates older databases by adding
    the position_uncertainty_m column.

    Raises:
        Exception: re-raised after logging if initialization fails.
    """
    try:
        conn = sqlite3.connect(str(self.database_path), check_same_thread=False)
        try:
            cursor = conn.cursor()
            # Configure SQLite for better performance and concurrency.
            # WAL mode allows concurrent reads during writes.
            cursor.execute("PRAGMA journal_mode=WAL")
            # Busy timeout is in milliseconds: wait up to 30 s on a locked db.
            cursor.execute("PRAGMA busy_timeout=30000")
            # NORMAL synchronous is faster than FULL while still safe with WAL.
            cursor.execute("PRAGMA synchronous=NORMAL")
            # Enable foreign key constraints (good practice).
            cursor.execute("PRAGMA foreign_keys=ON")
            # Raw per-source fixes; UNIQUE(source, timestamp_unix) lets
            # INSERT OR REPLACE act as an upsert.
            cursor.execute(
                """
                CREATE TABLE IF NOT EXISTS positions_raw (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    source TEXT NOT NULL,
                    timestamp TEXT NOT NULL,
                    timestamp_unix REAL NOT NULL,
                    latitude REAL,
                    longitude REAL,
                    altitude REAL,
                    position_uncertainty_m REAL,
                    supplementary_data TEXT,
                    created_at REAL NOT NULL,
                    UNIQUE(source, timestamp_unix)
                )
                """
            )
            # Migration for databases created before position_uncertainty_m
            # existed; ALTER fails harmlessly when the column is present.
            try:
                cursor.execute("ALTER TABLE positions_raw ADD COLUMN position_uncertainty_m REAL")
            except sqlite3.OperationalError:
                # Column already exists, ignore.
                pass
            # Validation results; JSON-encoded detail columns.
            cursor.execute(
                """
                CREATE TABLE IF NOT EXISTS positions_validation (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    validation_timestamp TEXT NOT NULL,
                    validation_timestamp_unix REAL NOT NULL,
                    is_valid INTEGER NOT NULL,
                    sources_missing TEXT,
                    sources_stale TEXT,
                    coordinate_differences TEXT,
                    source_coordinates TEXT,
                    validation_details TEXT,
                    created_at REAL NOT NULL
                )
                """
            )
            # Indexes matching the latest-per-source and latest-validation queries.
            cursor.execute(
                """
                CREATE INDEX IF NOT EXISTS idx_positions_raw_source_timestamp
                ON positions_raw(source, timestamp_unix DESC)
                """
            )
            cursor.execute(
                """
                CREATE INDEX IF NOT EXISTS idx_positions_validation_timestamp
                ON positions_validation(validation_timestamp_unix DESC)
                """
            )
            conn.commit()
        finally:
            # Always release the connection, even if schema setup fails
            # part-way through (the original leaked it on error).
            conn.close()
        logger.info(f"Database initialized at {self.database_path}")
    except Exception as e:
        logger.error(f"Failed to initialize database: {e}")
        raise
def store_position(self, position: Dict[str, Any]) -> bool:
    """
    Insert or update the latest position for a source in positions_raw.

    INSERT OR REPLACE combined with the UNIQUE(source, timestamp_unix)
    constraint makes this an upsert. A write that fails because the
    database is locked is retried once after a short delay.

    Args:
        position: Position fields (source, timestamp, timestamp_unix,
            latitude, longitude, altitude, position_uncertainty_m,
            supplementary_data).

    Returns:
        True if the row was written, False otherwise.
    """
    def _write() -> None:
        # One shared writer for both the first attempt and the retry
        # (the original duplicated this INSERT verbatim). The finally
        # guarantees the connection is closed even when execute raises.
        conn = sqlite3.connect(str(self.database_path), check_same_thread=False, timeout=5.0)
        try:
            conn.execute(
                """
                INSERT OR REPLACE INTO positions_raw
                (source, timestamp, timestamp_unix, latitude, longitude, altitude, position_uncertainty_m, supplementary_data, created_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    position.get("source"),
                    position.get("timestamp"),
                    position.get("timestamp_unix"),
                    position.get("latitude"),
                    position.get("longitude"),
                    position.get("altitude"),
                    position.get("position_uncertainty_m"),
                    json.dumps(position.get("supplementary_data", {})),
                    time.time(),
                ),
            )
            conn.commit()
        finally:
            conn.close()

    try:
        _write()
        return True
    except sqlite3.OperationalError as e:
        if "database is locked" in str(e):
            # Another writer held the lock; retry once after a short delay.
            time.sleep(0.01)
            try:
                _write()
                return True
            except Exception:
                pass  # fall through and report the original error
        logger.error(f"Failed to store position: {e}")
        return False
    except Exception as e:
        logger.error(f"Failed to store position: {e}")
        return False
def store_validation(self, validation_result: Dict[str, Any]) -> bool:
    """
    Store one validation result in positions_validation.

    List/dict fields are JSON-encoded; is_valid is stored as 0/1.

    Args:
        validation_result: Validation data (validation_timestamp,
            validation_timestamp_unix, is_valid, sources_missing,
            sources_stale, coordinate_differences, source_coordinates,
            validation_details).

    Returns:
        True if successful, False otherwise.
    """
    try:
        conn = sqlite3.connect(str(self.database_path), check_same_thread=False, timeout=5.0)
        try:
            conn.execute(
                """
                INSERT INTO positions_validation
                (validation_timestamp, validation_timestamp_unix, is_valid, sources_missing,
                 sources_stale, coordinate_differences, source_coordinates, validation_details, created_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    validation_result.get("validation_timestamp"),
                    validation_result.get("validation_timestamp_unix"),
                    1 if validation_result.get("is_valid") else 0,
                    json.dumps(validation_result.get("sources_missing", [])),
                    json.dumps(validation_result.get("sources_stale", [])),
                    json.dumps(validation_result.get("coordinate_differences", {})),
                    json.dumps(validation_result.get("source_coordinates", {})),
                    json.dumps(validation_result.get("validation_details", {})),
                    time.time(),
                ),
            )
            conn.commit()
        finally:
            # Close even when the INSERT raises (the original leaked here).
            conn.close()
        return True
    except Exception as e:
        logger.error(f"Failed to store validation result: {e}")
        return False
def get_latest_positions(self) -> Dict[str, Dict[str, Any]]:
    """
    Get the latest position for each source.

    Returns:
        Mapping of source name to its most recent position dict;
        empty dict on failure.
    """
    try:
        conn = sqlite3.connect(str(self.database_path), check_same_thread=False, timeout=5.0)
        try:
            cursor = conn.cursor()
            # The row-value subquery selects the newest timestamp_unix per source.
            cursor.execute(
                """
                SELECT source, timestamp, timestamp_unix, latitude, longitude, altitude, position_uncertainty_m, supplementary_data
                FROM positions_raw
                WHERE (source, timestamp_unix) IN (
                    SELECT source, MAX(timestamp_unix)
                    FROM positions_raw
                    GROUP BY source
                )
                """
            )
            positions = {}
            for row in cursor.fetchall():
                source, timestamp, timestamp_unix, lat, lon, alt, pos_uncertainty, supp_data = row
                positions[source] = {
                    "source": source,
                    "timestamp": timestamp,
                    "timestamp_unix": timestamp_unix,
                    "latitude": lat,
                    "longitude": lon,
                    "altitude": alt,
                    "position_uncertainty_m": pos_uncertainty,
                    # Stored as a JSON blob; NULL/empty maps to {}.
                    "supplementary_data": json.loads(supp_data) if supp_data else {},
                }
            return positions
        finally:
            # Close even if the query or JSON decoding raises
            # (the original leaked the connection in that case).
            conn.close()
    except Exception as e:
        logger.error(f"Failed to get latest positions: {e}")
        return {}
def get_latest_validation(self) -> Optional[Dict[str, Any]]:
    """
    Get the most recent validation result from the database.

    Used to restore state after app restart.

    Returns:
        Dictionary with validation data (JSON columns decoded,
        is_valid as bool) or None if the table is empty or on error.
    """
    try:
        conn = sqlite3.connect(str(self.database_path), check_same_thread=False, timeout=5.0)
        try:
            cursor = conn.cursor()
            cursor.execute(
                """
                SELECT validation_timestamp, validation_timestamp_unix, is_valid,
                       sources_missing, sources_stale, coordinate_differences,
                       source_coordinates, validation_details
                FROM positions_validation
                ORDER BY validation_timestamp_unix DESC
                LIMIT 1
                """
            )
            row = cursor.fetchone()
        finally:
            # Close even when the query raises (the original leaked here).
            conn.close()
        if row:
            return {
                "validation_timestamp": row[0],
                "validation_timestamp_unix": row[1],
                # Stored as 0/1; convert back to bool.
                "is_valid": row[2] == 1,
                # JSON columns may be NULL/empty; fall back to empty containers.
                "sources_missing": json.loads(row[3]) if row[3] else [],
                "sources_stale": json.loads(row[4]) if row[4] else [],
                "coordinate_differences": json.loads(row[5]) if row[5] else {},
                "source_coordinates": json.loads(row[6]) if row[6] else {},
                "validation_details": json.loads(row[7]) if row[7] else {},
            }
        return None
    except Exception as e:
        logger.error(f"Failed to get latest validation: {e}")
        return None