#!/usr/bin/env python3
"""
Server Sync Service for GNSS Guard Client

Syncs validation data to the central GNSS Guard Server.

Features:
- Immediate sync on each validation
- Offline queue for failed syncs
- Batch catchup for queued records
"""

import json
import logging
import sqlite3
import time
from contextlib import closing
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, ContextManager, Dict, List, Optional

import requests

logger = logging.getLogger("gnss_guard.server_sync")


class ServerSync:
    """
    Syncs validation data to the central GNSS Guard Server.

    Features:
    - Sends validation results to server after each iteration
    - Queues failed requests for retry
    - Batch sends queued records on successful connection

    All database and network helpers are best-effort: they log failures and
    return instead of raising, so a sync problem never interrupts the caller.
    """

    # Seconds to wait on a locked SQLite database before giving up.
    _DB_TIMEOUT = 5.0

    # Extra rows evicted when the queue is full, so that a full queue does not
    # trigger an eviction on every single insert.
    _QUEUE_TRIM_HEADROOM = 10

    def __init__(
        self,
        database_path: Path,
        server_url: str,
        server_token: str,
        asset_name: str,
        batch_size: int = 100,
        max_queue_size: int = 1000
    ):
        """
        Initialize server sync service.

        Args:
            database_path: Path to SQLite database (for sync queue)
            server_url: Base URL of GNSS Guard Server
            server_token: Authentication token for this asset
            asset_name: Name of this asset
            batch_size: Max records to send in batch catchup
            max_queue_size: Max records to keep in queue
        """
        self.database_path = database_path
        self.server_url = server_url.rstrip('/')
        self.server_token = server_token
        self.asset_name = asset_name
        self.batch_size = batch_size
        self.max_queue_size = max_queue_size

        # Request timeout (seconds)
        self.timeout = 10

        # Initialize sync queue table
        self._init_sync_queue_table()

        logger.info(
            "Server sync initialized for asset '%s' -> %s", asset_name, server_url
        )

    def _connect(self) -> ContextManager[sqlite3.Connection]:
        """
        Open the queue database as a context manager.

        The connection is closed when the ``with`` block exits, including when
        the body raises. Nothing is committed automatically; callers that
        write must call ``commit()`` themselves.
        """
        return closing(
            sqlite3.connect(str(self.database_path), timeout=self._DB_TIMEOUT)
        )

    @staticmethod
    def _utc_now_iso() -> str:
        """
        Return the current UTC time as a naive ISO-8601 string.

        Produces the same format as the deprecated
        ``datetime.utcnow().isoformat()`` (no UTC offset suffix), so values
        written to ``created_at`` / ``last_attempt_at`` keep their format.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    def _init_sync_queue_table(self):
        """Create sync_queue table if it doesn't exist"""
        try:
            with self._connect() as conn:
                conn.execute("""
                    CREATE TABLE IF NOT EXISTS sync_queue (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        validation_timestamp_unix REAL NOT NULL,
                        payload TEXT NOT NULL,
                        created_at TEXT NOT NULL,
                        attempts INTEGER DEFAULT 0,
                        last_attempt_at TEXT,
                        UNIQUE(validation_timestamp_unix)
                    )
                """)
                conn.execute("""
                    CREATE INDEX IF NOT EXISTS idx_sync_queue_timestamp
                    ON sync_queue(validation_timestamp_unix)
                """)
                conn.commit()
            logger.debug("Sync queue table initialized")
        except Exception as e:
            logger.error("Failed to initialize sync queue table: %s", e)

    def _get_headers(self) -> Dict[str, str]:
        """Get request headers with authentication"""
        return {
            "Authorization": f"Bearer {self.server_token}",
            "Content-Type": "application/json"
        }

    def sync_validation(self, validation_result: Dict[str, Any]) -> bool:
        """
        Sync a validation result to the server.

        If sync fails, the record is queued for later retry.
        If sync succeeds, attempt to send any queued records.

        Args:
            validation_result: Validation result from CoordinateValidator

        Returns:
            bool: True if sync succeeded, False if queued
        """
        # Prepare payload
        payload = {
            "validation_timestamp": validation_result.get("validation_timestamp"),
            "validation_timestamp_unix": validation_result.get("validation_timestamp_unix"),
            "is_valid": validation_result.get("is_valid", False),
            "sources_missing": validation_result.get("sources_missing", []),
            "sources_stale": validation_result.get("sources_stale", []),
            "coordinate_differences": validation_result.get("coordinate_differences", {}),
            "source_coordinates": validation_result.get("source_coordinates", {}),
            "validation_details": validation_result.get("validation_details", {}),
        }

        # Try to send
        success = self._send_validation(payload)

        if success:
            # On success, try to send queued records
            self._process_queue()
        else:
            # On failure, queue the record
            self._queue_record(payload)

        return success

    def _send_validation(self, payload: Dict[str, Any]) -> bool:
        """
        Send a single validation record to the server.

        Args:
            payload: Validation data to send

        Returns:
            bool: True if the server answered 201, False on any other status
            or on any request error (errors are logged, never raised)
        """
        try:
            url = f"{self.server_url}/api/v1/validation"
            response = requests.post(
                url,
                json=payload,
                headers=self._get_headers(),
                timeout=self.timeout
            )

            if response.status_code == 201:
                logger.debug("Validation synced to server")
                return True
            if response.status_code == 401:
                logger.error("Server auth failed - check SERVER_TOKEN")
                return False

            logger.warning(
                "Server returned %s: %s", response.status_code, response.text[:200]
            )
            return False

        except requests.exceptions.Timeout:
            logger.warning("Server request timed out")
            return False
        except requests.exceptions.ConnectionError:
            logger.warning("Cannot connect to server at %s", self.server_url)
            return False
        except Exception as e:
            logger.error("Server sync error: %s", e)
            return False

    def _send_batch(self, records: List[Dict[str, Any]]) -> bool:
        """
        Send a batch of validation records to the server.

        Args:
            records: List of validation payloads

        Returns:
            bool: True if the server answered 201, False otherwise
        """
        try:
            url = f"{self.server_url}/api/v1/validation/batch"
            response = requests.post(
                url,
                json={"records": records},
                headers=self._get_headers(),
                timeout=self.timeout * 3  # Longer timeout for batch
            )

            if response.status_code != 201:
                logger.warning("Batch sync failed: %s", response.status_code)
                return False

            # The 201 status alone means the batch was accepted. The body only
            # carries statistics for the log line, so an unreadable body must
            # not turn an accepted batch into a failure (which would make the
            # same records be re-sent on every pass).
            try:
                result = response.json()
            except ValueError:
                result = {}
            if not isinstance(result, dict):
                result = {}

            logger.info(
                "Batch sync: %s saved, %s skipped",
                result.get('saved', 0),
                result.get('skipped', 0),
            )
            return True

        except Exception as e:
            logger.error("Batch sync error: %s", e)
            return False

    def _queue_record(self, payload: Dict[str, Any]):
        """
        Add a validation record to the sync queue.

        When the queue already holds ``max_queue_size`` rows, the oldest rows
        (by validation timestamp) are evicted first. A record whose timestamp
        is already queued is left untouched.

        Args:
            payload: Validation data to queue
        """
        try:
            timestamp_unix = payload.get("validation_timestamp_unix")
            if timestamp_unix is None:
                # The column is NOT NULL and the insert uses OR IGNORE, so such
                # a row would be skipped silently; report it instead.
                logger.warning(
                    "Cannot queue validation record without validation_timestamp_unix"
                )
                return

            with self._connect() as conn:
                # Check queue size and remove oldest if full
                count = conn.execute("SELECT COUNT(*) FROM sync_queue").fetchone()[0]

                if count >= self.max_queue_size:
                    # Remove oldest records to make room
                    remove_count = (
                        count - self.max_queue_size + self._QUEUE_TRIM_HEADROOM
                    )
                    conn.execute("""
                        DELETE FROM sync_queue
                        WHERE id IN (
                            SELECT id FROM sync_queue
                            ORDER BY validation_timestamp_unix ASC
                            LIMIT ?
                        )
                    """, (remove_count,))
                    logger.warning(
                        "Sync queue full, removed %s oldest records", remove_count
                    )

                # Insert new record
                cursor = conn.execute("""
                    INSERT OR IGNORE INTO sync_queue
                    (validation_timestamp_unix, payload, created_at)
                    VALUES (?, ?, ?)
                """, (
                    timestamp_unix,
                    json.dumps(payload),
                    self._utc_now_iso()
                ))
                inserted = cursor.rowcount > 0
                conn.commit()

            if inserted:
                logger.debug("Queued validation record for later sync")
            else:
                logger.debug(
                    "Validation record for timestamp %s already queued", timestamp_unix
                )

        except Exception as e:
            logger.error("Failed to queue record: %s", e)

    def _process_queue(self):
        """
        Process queued records after successful connection.

        Sends up to ``batch_size`` of the oldest queued records in one batch.
        Rows whose payload cannot be decoded are deleted unconditionally; the
        remaining rows are deleted when the batch is accepted, or get their
        attempt counter bumped when it is not.
        """
        try:
            # Get queued records (oldest first)
            with self._connect() as conn:
                rows = conn.execute("""
                    SELECT id, payload FROM sync_queue
                    ORDER BY validation_timestamp_unix ASC
                    LIMIT ?
                """, (self.batch_size,)).fetchall()

            if not rows:
                return

            logger.info("Processing %d queued records", len(rows))

            # Parse payloads, separating undecodable rows from sendable ones
            records = []
            valid_ids = []
            corrupt_ids = []
            for row_id, payload_json in rows:
                try:
                    record = json.loads(payload_json)
                except (TypeError, ValueError):
                    corrupt_ids.append(row_id)
                else:
                    records.append(record)
                    valid_ids.append(row_id)

            # Corrupt rows can never be sent. Drop them even when nothing else
            # in the batch is sendable, otherwise they stay at the head of the
            # queue and are re-fetched on every pass.
            if corrupt_ids:
                logger.warning(
                    "Dropping %d corrupt records from sync queue", len(corrupt_ids)
                )
                self._remove_from_queue(corrupt_ids)

            if not records:
                return

            # Send batch
            if self._send_batch(records):
                # Remove sent records from queue
                self._remove_from_queue(valid_ids)
            else:
                # Update attempt count
                self._update_attempt_count(valid_ids)

        except Exception as e:
            logger.error("Error processing queue: %s", e)

    def _remove_from_queue(self, record_ids: List[int]):
        """
        Remove records from the queue by row id.

        Args:
            record_ids: ``sync_queue.id`` values to delete; empty is a no-op
        """
        if not record_ids:
            return

        try:
            with self._connect() as conn:
                # One parameterized statement per id (single transaction)
                # keeps the SQL static and independent of SQLite's limit on
                # bound variables per statement.
                conn.executemany(
                    "DELETE FROM sync_queue WHERE id = ?",
                    [(record_id,) for record_id in record_ids],
                )
                conn.commit()
            logger.debug("Removed %d records from sync queue", len(record_ids))
        except Exception as e:
            logger.error("Failed to remove records from queue: %s", e)

    def _update_attempt_count(self, record_ids: List[int]):
        """
        Increment the attempt counter and stamp the attempt time of records.

        Args:
            record_ids: ``sync_queue.id`` values to update; empty is a no-op
        """
        if not record_ids:
            return

        try:
            now = self._utc_now_iso()
            with self._connect() as conn:
                conn.executemany(
                    """
                    UPDATE sync_queue
                    SET attempts = attempts + 1, last_attempt_at = ?
                    WHERE id = ?
                    """,
                    [(now, record_id) for record_id in record_ids],
                )
                conn.commit()
        except Exception as e:
            logger.error("Failed to update attempt count: %s", e)

    def get_queue_status(self) -> Dict[str, Any]:
        """
        Get current sync queue status.

        Returns:
            Dictionary with ``queued_count``, ``oldest_timestamp``,
            ``newest_timestamp`` and ``queue_full``; or ``{"error": message}``
            when the database could not be read.
        """
        try:
            with self._connect() as conn:
                count = conn.execute("SELECT COUNT(*) FROM sync_queue").fetchone()[0]
                oldest, newest = conn.execute(
                    "SELECT MIN(validation_timestamp_unix), MAX(validation_timestamp_unix) FROM sync_queue"
                ).fetchone()

            return {
                "queued_count": count,
                "oldest_timestamp": oldest,
                "newest_timestamp": newest,
                "queue_full": count >= self.max_queue_size
            }
        except Exception as e:
            logger.error("Failed to get queue status: %s", e)
            return {"error": str(e)}

    def force_sync(self) -> bool:
        """
        Force a sync of all queued records.

        Processes the queue batch by batch until it is empty or a pass stops
        shrinking it.

        Returns:
            bool: True if all records synced successfully
        """
        logger.info("Starting forced sync of queued records")

        try:
            with self._connect() as conn:
                total = conn.execute("SELECT COUNT(*) FROM sync_queue").fetchone()[0]

            if total == 0:
                logger.info("No records to sync")
                return True

            synced = 0
            remaining = total

            while remaining > 0:
                # Process a batch
                self._process_queue()

                # A missing count means the status query itself failed; that
                # must be treated as a stall, not as an emptied queue.
                after_count = self.get_queue_status().get("queued_count")
                if after_count is None or after_count >= remaining:
                    # No progress, connection likely failed
                    logger.warning("Sync stalled, connection issue")
                    break

                synced += remaining - after_count
                remaining = after_count

            logger.info("Force sync completed: %s/%s records synced", synced, total)
            return synced == total

        except Exception as e:
            logger.error("Force sync error: %s", e)
            return False