#!/usr/bin/env python3
"""
Server Sync Service for GNSS Guard Client
Syncs validation data to the central GNSS Guard Server.
Features:
- Immediate sync on each validation
- Offline queue for failed syncs
- Batch catchup for queued records
"""
import json
import logging
import sqlite3
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List

import requests

logger = logging.getLogger("gnss_guard.server_sync")
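
# Server API assumed by this client, inferred from the requests made below
# (the authoritative contract lives on the GNSS Guard Server side):
#   POST {server_url}/api/v1/validation        -> 201 on success
#   POST {server_url}/api/v1/validation/batch  -> 201, body {"saved": N, "skipped": M}
# Both endpoints require an "Authorization: Bearer <token>" header.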


class ServerSync:
    """
    Syncs validation data to the central GNSS Guard Server.

    Features:
    - Sends validation results to server after each iteration
    - Queues failed requests for retry
    - Batch sends queued records on successful connection
    """

    def __init__(
        self,
        database_path: Path,
        server_url: str,
        server_token: str,
        asset_name: str,
        batch_size: int = 100,
        max_queue_size: int = 1000
    ):
        """
        Initialize server sync service.

        Args:
            database_path: Path to SQLite database (for sync queue)
            server_url: Base URL of GNSS Guard Server
            server_token: Authentication token for this asset
            asset_name: Name of this asset
            batch_size: Max records to send in batch catchup
            max_queue_size: Max records to keep in queue
        """
        self.database_path = database_path
        self.server_url = server_url.rstrip('/')
        self.server_token = server_token
        self.asset_name = asset_name
        self.batch_size = batch_size
        self.max_queue_size = max_queue_size

        # Request timeout (seconds)
        self.timeout = 10

        # Initialize sync queue table
        self._init_sync_queue_table()

        logger.info(f"Server sync initialized for asset '{asset_name}' -> {server_url}")

    def _init_sync_queue_table(self):
        """Create sync_queue table if it doesn't exist"""
        try:
            conn = sqlite3.connect(str(self.database_path), timeout=5.0)
            cursor = conn.cursor()
            # UNIQUE(validation_timestamp_unix) lets INSERT OR IGNORE dedupe
            # records that get queued more than once
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS sync_queue (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    validation_timestamp_unix REAL NOT NULL,
                    payload TEXT NOT NULL,
                    created_at TEXT NOT NULL,
                    attempts INTEGER DEFAULT 0,
                    last_attempt_at TEXT,
                    UNIQUE(validation_timestamp_unix)
                )
            """)
            cursor.execute("""
                CREATE INDEX IF NOT EXISTS idx_sync_queue_timestamp
                ON sync_queue(validation_timestamp_unix)
            """)
            conn.commit()
            conn.close()
            logger.debug("Sync queue table initialized")
        except Exception as e:
            logger.error(f"Failed to initialize sync queue table: {e}")

    def _get_headers(self) -> Dict[str, str]:
        """Get request headers with authentication"""
        return {
            "Authorization": f"Bearer {self.server_token}",
            "Content-Type": "application/json"
        }

    def sync_validation(self, validation_result: Dict[str, Any]) -> bool:
        """
        Sync a validation result to the server.

        If sync fails, the record is queued for later retry.
        If sync succeeds, attempt to send any queued records.

        Args:
            validation_result: Validation result from CoordinateValidator

        Returns:
            bool: True if sync succeeded, False if queued
        """
        # Prepare payload
        payload = {
            "validation_timestamp": validation_result.get("validation_timestamp"),
            "validation_timestamp_unix": validation_result.get("validation_timestamp_unix"),
            "is_valid": validation_result.get("is_valid", False),
            "sources_missing": validation_result.get("sources_missing", []),
            "sources_stale": validation_result.get("sources_stale", []),
            "coordinate_differences": validation_result.get("coordinate_differences", {}),
            "source_coordinates": validation_result.get("source_coordinates", {}),
            "validation_details": validation_result.get("validation_details", {}),
        }

        # Try to send
        success = self._send_validation(payload)

        if success:
            # On success, try to send queued records
            self._process_queue()
        else:
            # On failure, queue the record
            self._queue_record(payload)

        return success

    def _send_validation(self, payload: Dict[str, Any]) -> bool:
        """
        Send a single validation record to the server.

        Args:
            payload: Validation data to send

        Returns:
            bool: True if successful
        """
        try:
            url = f"{self.server_url}/api/v1/validation"
            response = requests.post(
                url,
                json=payload,
                headers=self._get_headers(),
                timeout=self.timeout
            )

            if response.status_code == 201:
                logger.debug("Validation synced to server")
                return True
            elif response.status_code == 401:
                logger.error("Server auth failed - check SERVER_TOKEN")
                return False
            else:
                logger.warning(f"Server returned {response.status_code}: {response.text[:200]}")
                return False
        except requests.exceptions.Timeout:
            logger.warning("Server request timed out")
            return False
        except requests.exceptions.ConnectionError:
            logger.warning(f"Cannot connect to server at {self.server_url}")
            return False
        except Exception as e:
            logger.error(f"Server sync error: {e}")
            return False

    def _send_batch(self, records: List[Dict[str, Any]]) -> bool:
        """
        Send a batch of validation records to the server.

        Args:
            records: List of validation payloads

        Returns:
            bool: True if successful
        """
        try:
            url = f"{self.server_url}/api/v1/validation/batch"
            response = requests.post(
                url,
                json={"records": records},
                headers=self._get_headers(),
                timeout=self.timeout * 3  # Longer timeout for batch
            )

            if response.status_code == 201:
                result = response.json()
                logger.info(f"Batch sync: {result.get('saved', 0)} saved, {result.get('skipped', 0)} skipped")
                return True
            else:
                logger.warning(f"Batch sync failed: {response.status_code}")
                return False
        except Exception as e:
            logger.error(f"Batch sync error: {e}")
            return False

    def _queue_record(self, payload: Dict[str, Any]):
        """
        Add a validation record to the sync queue.

        Args:
            payload: Validation data to queue
        """
        try:
            conn = sqlite3.connect(str(self.database_path), timeout=5.0)
            cursor = conn.cursor()

            # Check queue size and remove oldest if full
            cursor.execute("SELECT COUNT(*) FROM sync_queue")
            count = cursor.fetchone()[0]

            if count >= self.max_queue_size:
                # Remove oldest records to make room, plus a little headroom
                remove_count = count - self.max_queue_size + 10
                cursor.execute("""
                    DELETE FROM sync_queue
                    WHERE id IN (
                        SELECT id FROM sync_queue
                        ORDER BY validation_timestamp_unix ASC
                        LIMIT ?
                    )
                """, (remove_count,))
                logger.warning(f"Sync queue full, removed {remove_count} oldest records")

            # Insert new record
            cursor.execute("""
                INSERT OR IGNORE INTO sync_queue
                (validation_timestamp_unix, payload, created_at)
                VALUES (?, ?, ?)
            """, (
                payload["validation_timestamp_unix"],
                json.dumps(payload),
                datetime.utcnow().isoformat()
            ))

            conn.commit()
            conn.close()
            logger.debug("Queued validation record for later sync")
        except Exception as e:
            logger.error(f"Failed to queue record: {e}")

    def _process_queue(self):
        """Process queued records after successful connection"""
        try:
            conn = sqlite3.connect(str(self.database_path), timeout=5.0)
            cursor = conn.cursor()

            # Get queued records (oldest first)
            cursor.execute("""
                SELECT id, payload FROM sync_queue
                ORDER BY validation_timestamp_unix ASC
                LIMIT ?
            """, (self.batch_size,))
            rows = cursor.fetchall()
            conn.close()

            if not rows:
                return

            logger.info(f"Processing {len(rows)} queued records")

            # Parse payloads
            records = []
            record_ids = []
            corrupt_ids = []
            for row_id, payload_json in rows:
                try:
                    records.append(json.loads(payload_json))
                    record_ids.append(row_id)
                except json.JSONDecodeError:
                    corrupt_ids.append(row_id)

            # Drop corrupt records regardless of send outcome, so they
            # cannot clog the front of the queue
            if corrupt_ids:
                self._remove_from_queue(corrupt_ids)

            if not records:
                return

            # Send batch
            if self._send_batch(records):
                # Remove sent records from queue
                self._remove_from_queue(record_ids)
            else:
                # Update attempt count
                self._update_attempt_count(record_ids)
        except Exception as e:
            logger.error(f"Error processing queue: {e}")

    def _remove_from_queue(self, record_ids: List[int]):
        """Remove records from the sync queue by id"""
        if not record_ids:
            return
        try:
            conn = sqlite3.connect(str(self.database_path), timeout=5.0)
            cursor = conn.cursor()
            placeholders = ','.join('?' * len(record_ids))
            cursor.execute(f"DELETE FROM sync_queue WHERE id IN ({placeholders})", record_ids)
            conn.commit()
            conn.close()
            logger.debug(f"Removed {len(record_ids)} records from sync queue")
        except Exception as e:
            logger.error(f"Failed to remove records from queue: {e}")

    def _update_attempt_count(self, record_ids: List[int]):
        """Update attempt count for failed records"""
        if not record_ids:
            return
        try:
            conn = sqlite3.connect(str(self.database_path), timeout=5.0)
            cursor = conn.cursor()
            now = datetime.utcnow().isoformat()
            placeholders = ','.join('?' * len(record_ids))
            cursor.execute(f"""
                UPDATE sync_queue
                SET attempts = attempts + 1, last_attempt_at = ?
                WHERE id IN ({placeholders})
            """, [now] + record_ids)
            conn.commit()
            conn.close()
        except Exception as e:
            logger.error(f"Failed to update attempt count: {e}")

    def get_queue_status(self) -> Dict[str, Any]:
        """
        Get current sync queue status.

        Returns:
            Dictionary with queue stats
        """
        try:
            conn = sqlite3.connect(str(self.database_path), timeout=5.0)
            cursor = conn.cursor()
            cursor.execute("SELECT COUNT(*) FROM sync_queue")
            count = cursor.fetchone()[0]
            cursor.execute("SELECT MIN(validation_timestamp_unix), MAX(validation_timestamp_unix) FROM sync_queue")
            oldest, newest = cursor.fetchone()
            conn.close()

            return {
                "queued_count": count,
                "oldest_timestamp": oldest,
                "newest_timestamp": newest,
                "queue_full": count >= self.max_queue_size
            }
        except Exception as e:
            logger.error(f"Failed to get queue status: {e}")
            return {"error": str(e)}

    def force_sync(self) -> bool:
        """
        Force a sync of all queued records.

        Returns:
            bool: True if all records synced successfully
        """
        logger.info("Starting forced sync of queued records")
        try:
            conn = sqlite3.connect(str(self.database_path), timeout=5.0)
            cursor = conn.cursor()
            cursor.execute("SELECT COUNT(*) FROM sync_queue")
            total = cursor.fetchone()[0]
            conn.close()

            if total == 0:
                logger.info("No records to sync")
                return True

            synced = 0
            while True:
                # Check if queue is empty
                status = self.get_queue_status()
                if status.get("queued_count", 0) == 0:
                    break

                # Process a batch
                before_count = status["queued_count"]
                self._process_queue()

                # Check if we made progress
                after_status = self.get_queue_status()
                if after_status.get("queued_count", 0) >= before_count:
                    # No progress; the connection has likely failed
                    logger.warning("Sync stalled; connection issue likely")
                    break
                synced += before_count - after_status.get("queued_count", 0)

            logger.info(f"Force sync completed: {synced}/{total} records synced")
            return synced == total
        except Exception as e:
            logger.error(f"Force sync error: {e}")
            return False
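

if __name__ == "__main__":
    # Minimal usage sketch (illustrative only). The URL, token, asset name,
    # and database path below are placeholders, and the demo payload simply
    # mirrors the fields this module reads; a real client would pass results
    # from its CoordinateValidator instead.
    import time

    logging.basicConfig(level=logging.DEBUG)

    sync = ServerSync(
        database_path=Path("/tmp/gnss_guard_demo.db"),
        server_url="http://localhost:8000",
        server_token="example-token",
        asset_name="demo-asset",
    )

    demo_result = {
        "validation_timestamp": datetime.utcnow().isoformat(),
        "validation_timestamp_unix": time.time(),
        "is_valid": True,
        "sources_missing": [],
        "sources_stale": [],
        "coordinate_differences": {},
        "source_coordinates": {},
        "validation_details": {},
    }

    if not sync.sync_validation(demo_result):
        # Server unreachable: the record was queued for a later retry
        print("Queued; status:", sync.get_queue_status())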