<message>Update the _set_golden_from_path function to improve the handling of existing golden image files. Replace the existing unlink logic with a more robust method that safely removes files or broken symlinks using the missing_ok parameter. This change enhances the reliability of the backup upload process by ensuring that stale references are properly cleared before setting a new golden image path.
135 lines
5.8 KiB
Python
135 lines
5.8 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Starlink GPS data fetcher
|
|
Fetches GPS coordinates from Starlink terminal via gRPC
|
|
Reuses logic from _old_project/starlink_location.py
|
|
"""
|
|
|
|
import sys
|
|
import logging
|
|
from pathlib import Path
|
|
from datetime import datetime, timezone
|
|
from typing import Dict, Any, Optional, List
|
|
from config import Config
|
|
|
|
logger = logging.getLogger("gnss_guard.starlink_gps")
|
|
|
|
# Add starlink-grpc-tools to path
|
|
starlink_tools_path = Path(__file__).parent.parent / "starlink-grpc-tools"
|
|
if str(starlink_tools_path) not in sys.path:
|
|
sys.path.insert(0, str(starlink_tools_path))
|
|
|
|
try:
|
|
import starlink_grpc
|
|
except ImportError:
|
|
logger.error("Failed to import starlink_grpc. Make sure starlink-grpc-tools is available.")
|
|
starlink_grpc = None
|
|
|
|
|
|
class StarlinkGPSFetcher:
|
|
"""Fetcher for Starlink GPS coordinates"""
|
|
|
|
def __init__(self, config: Config):
|
|
self.config = config
|
|
self.target_ip = f"{config.starlink_ip}:{config.starlink_port}"
|
|
|
|
def fetch(self) -> List[Dict[str, Any]]:
|
|
"""
|
|
Fetch GPS coordinates from Starlink terminal
|
|
|
|
Returns:
|
|
List of dictionaries with position data (starlink_location and starlink_gps)
|
|
Returns empty list if fetch fails
|
|
"""
|
|
if not self.config.starlink_enabled:
|
|
return []
|
|
|
|
if starlink_grpc is None:
|
|
logger.error("starlink_grpc module not available")
|
|
return []
|
|
|
|
max_retries = self.config.starlink_max_retries
|
|
results = []
|
|
|
|
for attempt in range(1, max_retries + 1):
|
|
try:
|
|
# Create channel context
|
|
context = starlink_grpc.ChannelContext(target=self.target_ip)
|
|
|
|
# Get location data
|
|
try:
|
|
raw_location = starlink_grpc.get_location(context)
|
|
location_info = starlink_grpc.location_data(context)
|
|
|
|
# Extract Starlink Location coordinates
|
|
if location_info.get("latitude") is not None and location_info.get("longitude") is not None:
|
|
timestamp = datetime.now(timezone.utc)
|
|
position_uncertainty = None
|
|
if hasattr(raw_location, 'sigma_m'):
|
|
try:
|
|
position_uncertainty = float(raw_location.sigma_m)
|
|
except (ValueError, TypeError):
|
|
pass
|
|
|
|
results.append({
|
|
"source": "starlink_location",
|
|
"latitude": float(location_info.get("latitude")),
|
|
"longitude": float(location_info.get("longitude")),
|
|
"altitude": float(location_info.get("altitude", 0)),
|
|
"position_uncertainty_m": position_uncertainty,
|
|
"timestamp": timestamp.isoformat(),
|
|
"timestamp_unix": timestamp.timestamp(),
|
|
"supplementary_data": {
|
|
"location_source": str(raw_location.source) if hasattr(raw_location, 'source') else None,
|
|
"horizontal_speed_mps": raw_location.horizontal_speed_mps if hasattr(raw_location, 'horizontal_speed_mps') else None,
|
|
"vertical_speed_mps": raw_location.vertical_speed_mps if hasattr(raw_location, 'vertical_speed_mps') else None,
|
|
}
|
|
})
|
|
|
|
# Extract Starlink GPS (LLA) coordinates
|
|
if hasattr(raw_location, 'lla'):
|
|
lla = raw_location.lla
|
|
lla_data = {}
|
|
for attr in dir(lla):
|
|
if not attr.startswith('_') and not callable(getattr(lla, attr)):
|
|
try:
|
|
lla_data[attr] = getattr(lla, attr)
|
|
except:
|
|
pass
|
|
|
|
if lla_data.get('lat') is not None and lla_data.get('lon') is not None:
|
|
timestamp = datetime.now(timezone.utc)
|
|
results.append({
|
|
"source": "starlink_gps",
|
|
"latitude": float(lla_data.get('lat')),
|
|
"longitude": float(lla_data.get('lon')),
|
|
"altitude": float(lla_data.get('alt', 0)),
|
|
"timestamp": timestamp.isoformat(),
|
|
"timestamp_unix": timestamp.timestamp(),
|
|
"supplementary_data": {
|
|
**{k: v for k, v in lla_data.items() if k not in ['lat', 'lon', 'alt', 'DESCRIPTOR']}
|
|
}
|
|
})
|
|
|
|
except starlink_grpc.GrpcError as e:
|
|
if attempt < max_retries:
|
|
logger.debug(f"Starlink GPS fetch attempt {attempt}/{max_retries} failed: {e}, retrying...")
|
|
continue
|
|
else:
|
|
logger.error(f"Failed to fetch Starlink location data after {max_retries} attempts: {e}")
|
|
return []
|
|
|
|
# Success - return results
|
|
return results
|
|
|
|
except Exception as e:
|
|
if attempt < max_retries:
|
|
logger.debug(f"Starlink GPS fetch attempt {attempt}/{max_retries} failed: {e}, retrying...")
|
|
continue
|
|
else:
|
|
logger.error(f"Unexpected error fetching Starlink GPS data after {max_retries} attempts: {e}")
|
|
return []
|
|
|
|
return []
|
|
|