#!/usr/bin/env python3 """ TM AIS GPS data fetcher Fetches GPS coordinates from TM AIS GPS antenna via HTTP API """ import logging import requests from datetime import datetime, timezone from typing import Dict, Any, Optional from config import Config # Suppress SSL warnings for self-signed certificates import urllib3 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) logger = logging.getLogger("gnss_guard.tm_ais_gps") class TMAISGPSFetcher: """Fetcher for TM AIS GPS coordinates""" def __init__(self, config: Config): self.config = config self.url = config.tm_ais_url self.token = config.tm_ais_token # Already trimmed in Config self.last_fetch_failed = False # Warn if token is empty if not self.token: logger.warning("TM AIS GPS token is empty - authentication will fail") def fetch(self) -> Optional[Dict[str, Any]]: """ Fetch GPS coordinates from TM AIS GPS antenna Returns: Dictionary with position data or None if fetch fails """ if not self.config.tm_ais_enabled: return None headers = {"Authorization": f"Bearer {self.token}"} max_retries = self.config.tm_ais_max_retries last_error = None # Log request details (mask token for security) token_preview = f"{self.token[:4]}..." if len(self.token) > 4 else "***" logger.debug(f"TM AIS GPS request: URL={self.url}, Token={token_preview}") # Try up to max_retries times for attempt_number in range(1, max_retries + 1): logger.info(f"TM AIS GPS fetch attempt {attempt_number}/{max_retries}") try: # Disable SSL verification for self-signed certificates (equivalent to curl -k) response = requests.get( self.url, headers=headers, verify=False, # Equivalent to curl -k flag timeout=5.0 ) # Log response status for debugging logger.debug(f"TM AIS GPS response status: {response.status_code}") response.raise_for_status() data = response.json() # Extract coordinates latitude = data.get("latitude") longitude = data.get("longitude") gps_timestamp = data.get("gps_timestamp") response_timestamp = data.get("response_timestamp") if latitude is None or longitude is None: logger.warning("TM AIS GPS response missing latitude or longitude") self.last_fetch_failed = True return None # Parse timestamps and convert to UTC gps_ts = None if gps_timestamp: try: # Parse timestamp (handles both Z and timezone offsets) parsed_ts = datetime.fromisoformat(gps_timestamp.replace("Z", "+00:00")) # Convert to UTC if timezone-aware, otherwise assume UTC if parsed_ts.tzinfo is not None: gps_ts = parsed_ts.astimezone(timezone.utc) else: gps_ts = parsed_ts.replace(tzinfo=timezone.utc) except Exception as e: logger.debug(f"Failed to parse GPS timestamp: {e}") response_ts = datetime.now(timezone.utc) if response_timestamp: try: # Parse timestamp (handles both Z and timezone offsets) parsed_ts = datetime.fromisoformat(response_timestamp.replace("Z", "+00:00")) # Convert to UTC if timezone-aware, otherwise assume UTC if parsed_ts.tzinfo is not None: response_ts = parsed_ts.astimezone(timezone.utc) else: response_ts = parsed_ts.replace(tzinfo=timezone.utc) except Exception as e: logger.debug(f"Failed to parse response timestamp: {e}") # Success - reset failure flag if self.last_fetch_failed: logger.info("TM AIS GPS connection restored") self.last_fetch_failed = False return { "source": "tm_ais", "latitude": float(latitude), "longitude": float(longitude), "altitude": None, "timestamp": gps_ts.isoformat() if gps_ts else response_ts.isoformat(), "timestamp_unix": (gps_ts or response_ts).timestamp(), "supplementary_data": { "gps_timestamp": gps_timestamp, "response_timestamp": response_timestamp, } } except requests.exceptions.HTTPError as e: # Log response body for 401 errors to help debug authentication issues if hasattr(e.response, 'status_code') and e.response.status_code == 401: try: error_body = e.response.text[:200] # Limit to first 200 chars logger.debug(f"TM AIS GPS 401 response body: {error_body}") except Exception: pass last_error = str(e) logger.info(f"TM AIS GPS attempt {attempt_number}/{max_retries} failed: {e}") # Continue to next attempt except requests.exceptions.RequestException as e: last_error = str(e) logger.info(f"TM AIS GPS attempt {attempt_number}/{max_retries} failed: {e}") # Continue to next attempt except Exception as e: last_error = str(e) logger.info(f"TM AIS GPS attempt {attempt_number}/{max_retries} unexpected error: {e}") # Continue to next attempt # All attempts failed if not self.last_fetch_failed: logger.error(f"Failed to fetch TM AIS GPS data after {max_retries} attempts. Last error: {last_error}") else: logger.debug(f"TM AIS GPS still unavailable after {max_retries} attempts") self.last_fetch_failed = True return None