#!/usr/bin/env python3
"""
Starlink GPS data fetcher
Fetches GPS coordinates from Starlink terminal via gRPC
Reuses logic from _old_project/starlink_location.py
"""

import sys
import logging
from pathlib import Path
from datetime import datetime, timezone
from typing import Dict, Any, Optional, List

from config import Config

logger = logging.getLogger("gnss_guard.starlink_gps")

# Add starlink-grpc-tools to path
starlink_tools_path = Path(__file__).parent.parent / "starlink-grpc-tools"
if str(starlink_tools_path) not in sys.path:
    sys.path.insert(0, str(starlink_tools_path))

try:
    import starlink_grpc
except ImportError:
    logger.error("Failed to import starlink_grpc. Make sure starlink-grpc-tools is available.")
    starlink_grpc = None


class StarlinkGPSFetcher:
    """Fetcher for Starlink GPS coordinates.

    Reads ``starlink_enabled``, ``starlink_ip``, ``starlink_port`` and
    ``starlink_max_retries`` from the supplied config object.
    """

    # LLA attributes that are either promoted to top-level keys of the
    # result or are not position data at all; everything else found on the
    # LLA object is passed through as supplementary data.
    _LLA_CORE_ATTRS = ("lat", "lon", "alt", "DESCRIPTOR")

    def __init__(self, config: Config):
        self.config = config
        # "<ip>:<port>" target string handed to starlink_grpc.ChannelContext.
        self.target_ip = f"{config.starlink_ip}:{config.starlink_port}"

    def fetch(self) -> List[Dict[str, Any]]:
        """
        Fetch GPS coordinates from Starlink terminal

        Each attempt opens its own channel context, builds a fresh result
        list, and releases the context again, so a failed attempt can never
        leak partial results or an open channel into the next one.

        Returns:
            List of dictionaries with position data (starlink_location and starlink_gps)
            Returns empty list if fetch fails, if Starlink is disabled in the
            config, if starlink_grpc could not be imported, or if
            ``starlink_max_retries`` is less than 1 (no attempt is made).
        """
        if not self.config.starlink_enabled:
            return []

        if starlink_grpc is None:
            logger.error("starlink_grpc module not available")
            return []

        max_retries = self.config.starlink_max_retries

        for attempt in range(1, max_retries + 1):
            context = None
            try:
                context = starlink_grpc.ChannelContext(target=self.target_ip)
                return self._read_positions(context)
            except starlink_grpc.GrpcError as e:
                error: Optional[BaseException] = e
                final_message = "Failed to fetch Starlink location data after %s attempts: %s"
            except Exception as e:
                # Best-effort boundary: any failure is reported as "no data".
                error = e
                final_message = "Unexpected error fetching Starlink GPS data after %s attempts: %s"
            finally:
                # Runs on success and failure alike, before the return above
                # takes effect, so no channel outlives its attempt.
                self._close_context(context)

            if attempt < max_retries:
                logger.debug(
                    "Starlink GPS fetch attempt %s/%s failed: %s, retrying...",
                    attempt, max_retries, error,
                )
                continue

            logger.error(final_message, max_retries, error)
            return []

        return []

    def _read_positions(self, context: Any) -> List[Dict[str, Any]]:
        """Query the terminal once and build the list of position records.

        Returns up to two records: one with source ``starlink_location``
        (from ``starlink_grpc.location_data``) and one with source
        ``starlink_gps`` (from the ``lla`` attribute of the raw location).
        Exceptions from starlink_grpc or from value conversion propagate to
        the caller, which decides whether to retry.
        """
        raw_location = starlink_grpc.get_location(context)
        location_info = starlink_grpc.location_data(context)

        positions: List[Dict[str, Any]] = []

        # Starlink Location coordinates
        latitude = location_info.get("latitude")
        longitude = location_info.get("longitude")
        if latitude is not None and longitude is not None:
            timestamp = datetime.now(timezone.utc)

            # sigma_m is optional; a missing or non-numeric value means
            # "uncertainty unknown", not a failed fetch.
            position_uncertainty: Optional[float] = None
            try:
                position_uncertainty = float(getattr(raw_location, "sigma_m", None))
            except (ValueError, TypeError):
                pass

            positions.append({
                "source": "starlink_location",
                "latitude": float(latitude),
                "longitude": float(longitude),
                "altitude": self._float_or_default(location_info.get("altitude"), 0.0),
                "position_uncertainty_m": position_uncertainty,
                "timestamp": timestamp.isoformat(),
                "timestamp_unix": timestamp.timestamp(),
                "supplementary_data": {
                    "location_source": (
                        str(raw_location.source) if hasattr(raw_location, "source") else None
                    ),
                    "horizontal_speed_mps": getattr(raw_location, "horizontal_speed_mps", None),
                    "vertical_speed_mps": getattr(raw_location, "vertical_speed_mps", None),
                },
            })

        # Starlink GPS (LLA) coordinates
        if hasattr(raw_location, "lla"):
            lla_data = self._public_data_attrs(raw_location.lla)
            if lla_data.get("lat") is not None and lla_data.get("lon") is not None:
                timestamp = datetime.now(timezone.utc)
                positions.append({
                    "source": "starlink_gps",
                    "latitude": float(lla_data["lat"]),
                    "longitude": float(lla_data["lon"]),
                    "altitude": self._float_or_default(lla_data.get("alt"), 0.0),
                    "timestamp": timestamp.isoformat(),
                    "timestamp_unix": timestamp.timestamp(),
                    "supplementary_data": {
                        key: value
                        for key, value in lla_data.items()
                        if key not in self._LLA_CORE_ATTRS
                    },
                })

        return positions

    @staticmethod
    def _float_or_default(value: Any, default: float) -> float:
        """Return ``float(value)``, or ``default`` when ``value`` is None."""
        return default if value is None else float(value)

    @staticmethod
    def _public_data_attrs(obj: Any) -> Dict[str, Any]:
        """Collect the non-underscore, non-callable attributes of ``obj``.

        Each attribute is read exactly once; attributes whose access raises
        are skipped so one bad attribute cannot discard the whole reading.
        """
        data: Dict[str, Any] = {}
        for name in dir(obj):
            if name.startswith("_"):
                continue
            try:
                value = getattr(obj, name)
            except Exception as e:
                logger.debug("Skipping unreadable LLA attribute %s: %s", name, e)
                continue
            if not callable(value):
                data[name] = value
        return data

    @staticmethod
    def _close_context(context: Any) -> None:
        """Release the channel held by ``context``, if it exposes ``close()``.

        Never raises: a failure while closing must not turn a completed
        fetch into an error.
        """
        close = getattr(context, "close", None)
        if not callable(close):
            return
        try:
            close()
        except Exception as e:
            logger.debug("Error closing Starlink channel context: %s", e)