<message>Update the _set_golden_from_path function to improve the handling of existing golden image files. Replace the existing unlink logic with a more robust method that safely removes files or broken symlinks using the missing_ok parameter. This change enhances the reliability of the backup upload process by ensuring that stale references are properly cleared before setting a new golden image path.
734 lines
32 KiB
Python
734 lines
32 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
NMEA GPS data collector
|
|
Continuously collects GPS coordinates from NMEA devices via TCP connection
|
|
Filters for GGA sentences only and maintains latest position per source
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
import os
|
|
import time
|
|
from datetime import datetime, timezone
|
|
from typing import Dict, Any, Optional, List
|
|
from queue import Queue
|
|
|
|
from config import Config
|
|
from storage.logger import StructuredLogger
|
|
|
|
logger = logging.getLogger("gnss_guard.nmea_gps")
|
|
|
|
|
|
def strip_telnet_iac(data: bytes, diagnostic_mode: bool = False) -> bytes:
    """Remove Telnet IAC (Interpret As Command) sequences from a data chunk.

    Telnet IAC sequences are 0xFF followed by command bytes:
    - 0xFF 0xFB (WILL) + option byte
    - 0xFF 0xFC (WONT) + option byte
    - 0xFF 0xFD (DO)   + option byte
    - 0xFF 0xFE (DONT) + option byte
    - 0xFF 0xFA (SB) ... 0xFF 0xF0 (SE)  subnegotiation block
    - 0xFF 0xFF (IAC escape - becomes single 0xFF)

    These sequences are negotiation bytes and should be stripped before
    processing NMEA data.

    The function is stateless: it only sees one chunk at a time, so a
    sequence that is split across two reads cannot be fully recognised.
    A lone trailing 0xFF is dropped, and a subnegotiation whose terminator
    is not inside this chunk is handled like any other unknown command
    (only the two leading bytes are removed).

    Args:
        data: Raw bytes as read from the socket.
        diagnostic_mode: When True, each stripped sequence is reported at
            DEBUG level through the module logger.

    Returns:
        The input with Telnet command sequences removed. Empty input is
        returned unchanged.
    """
    if not data:
        return data

    iac = 0xFF
    sb = 0xFA
    se = 0xF0
    cmd_names = {0xFB: "WILL", 0xFC: "WONT", 0xFD: "DO", 0xFE: "DONT"}

    result = bytearray()
    size = len(data)
    i = 0

    while i < size:
        if data[i] != iac:
            # Ordinary payload byte
            result.append(data[i])
            i += 1
            continue

        if i + 1 >= size:
            # Incomplete IAC at end of buffer - skip it
            i += 1
            continue

        cmd = data[i + 1]

        # IAC IAC (0xFF 0xFF) is escaped IAC - keep single 0xFF
        if cmd == iac:
            result.append(iac)
            i += 2
            continue

        # IAC command sequences (WILL/WONT/DO/DONT)
        if cmd in cmd_names:
            if diagnostic_mode:
                logger.debug(f"[DIAGNOSTIC] Telnet IAC: 0xFF 0x{cmd:02X} ({cmd_names.get(cmd, 'UNKNOWN')})")

            i += 2  # Skip IAC + command

            # These commands carry one option byte (if it made it into this chunk)
            if i < size:
                opt = data[i]
                if diagnostic_mode:
                    logger.debug(f"[DIAGNOSTIC] Option: 0x{opt:02X}")
                i += 1
            continue

        # IAC SB <option> <payload...> IAC SE: drop the whole block so the
        # subnegotiation payload does not end up in the NMEA stream.
        if cmd == sb:
            j = i + 2
            terminated = False
            while j < size:
                if data[j] == iac and j + 1 < size:
                    following = data[j + 1]
                    if following == se:
                        terminated = True
                        j += 2
                        break
                    if following == iac:
                        # Escaped 0xFF inside the payload, not a terminator
                        j += 2
                        continue
                j += 1

            if terminated:
                if diagnostic_mode:
                    logger.debug(f"[DIAGNOSTIC] Telnet IAC: subnegotiation skipped ({j - i} bytes)")
                i = j
                continue
            # No terminator in this chunk: fall through and treat it like an
            # unknown command rather than discarding the rest of the buffer.

        # Unknown IAC command - skip it
        if diagnostic_mode:
            logger.debug(f"[DIAGNOSTIC] Telnet IAC: 0xFF 0x{cmd:02X} (unknown, skipped)")
        i += 2

    return bytes(result)
|
|
|
|
|
|
class NMEAParser:
    """Parser for NMEA 0183 sentences

    All methods are static; the class only groups the parsing helpers.
    """

    @staticmethod
    def validate_checksum(sentence: str) -> bool:
        """Validate NMEA sentence checksum

        The checksum is the XOR of every character between the leading
        ``$`` and the ``*``, compared (case-insensitively) against the two
        hex digits that follow the ``*``.

        Args:
            sentence: Sentence text starting with ``$``, without surrounding
                whitespace.

        Returns:
            True when a checksum is present and matches, False when it is
            missing, malformed, or does not match.
        """
        if "*" not in sentence:
            return False

        try:
            # More than one '*' makes the unpacking fail -> treated as invalid
            data, checksum = sentence.split("*")
        except ValueError:
            return False

        calculated = 0
        for char in data[1:]:  # Skip the '$'
            calculated ^= ord(char)
        return format(calculated, "02X") == checksum.upper()

    @staticmethod
    def parse_sentence(sentence: str) -> Dict[str, Any]:
        """Parse NMEA sentence into structured data

        Args:
            sentence: One raw sentence; surrounding whitespace is ignored.

        Returns:
            On success a dict with ``sentence_type``, ``talker_id``,
            ``checksum_valid`` and ``fields`` (everything after the
            identifier), plus the keys produced by ``parse_gga`` for GGA
            sentences. A failed checksum does not abort parsing; it is only
            reported through ``checksum_valid``.
            On failure a dict with a single ``error`` key.
        """
        sentence = sentence.strip()

        if not sentence.startswith("$"):
            return {"error": "Invalid sentence format"}

        # Validate checksum (must happen before the checksum is cut off below)
        checksum_valid = NMEAParser.validate_checksum(sentence)

        try:
            # Remove checksum if present
            if "*" in sentence:
                sentence = sentence.split("*")[0]

            # Split into fields
            fields = sentence[1:].split(",")  # Remove $ and split

            # str.split always yields at least one element, so an empty
            # sentence shows up as an empty identifier, not an empty list.
            identifier = fields[0]
            if not identifier:
                return {"error": "Empty sentence"}

            # Extract talker ID and sentence type
            if len(identifier) >= 5:
                # Handle special cases like SHEROT (should be S + HEROT)
                if identifier.startswith("SHEROT"):
                    talker_id = "S"
                    sentence_type = "HEROT"
                else:
                    talker_id = identifier[:2]
                    sentence_type = identifier[2:]
            else:
                talker_id = "UN"
                sentence_type = identifier

            parsed_data = {
                "sentence_type": sentence_type,
                "talker_id": talker_id,
                "checksum_valid": checksum_valid,
                "fields": fields[1:] if len(fields) > 1 else [],
            }

            # Parse specific sentence types for enhanced data extraction;
            # every other sentence type keeps just the basic parsing above.
            if sentence_type == "GGA":
                parsed_data.update(NMEAParser.parse_gga(fields))

            return parsed_data

        except Exception as e:
            return {"error": f"Parse error: {str(e)}"}

    @staticmethod
    def parse_gga(fields: List[str]) -> Dict[str, Any]:
        """Parse GGA (Global Positioning System Fix Data) sentence

        Args:
            fields: The comma-separated fields of the sentence INCLUDING the
                identifier at index 0 (so time is ``fields[1]``, latitude
                ``fields[2]`` and so on).

        Returns:
            A dict containing whichever of ``time``, ``latitude``,
            ``longitude`` (signed decimal degrees), ``quality``,
            ``satellites``, ``hdop`` and ``altitude`` were present. An empty
            dict is returned when any field is malformed or when fewer than
            six fields are supplied.
        """
        result = {}
        try:
            # Time
            if fields[1]:
                result["time"] = fields[1]

            # Latitude: ddmm.mmmm -> decimal degrees, negative in the south
            if fields[2] and fields[3]:
                lat_deg = float(fields[2][:2])
                lat_min = float(fields[2][2:])
                latitude = lat_deg + lat_min / 60
                if fields[3] == "S":
                    latitude = -latitude
                result["latitude"] = latitude

            # Longitude: dddmm.mmmm -> decimal degrees, negative in the west
            if fields[4] and fields[5]:
                lon_deg = float(fields[4][:3])
                lon_min = float(fields[4][3:])
                longitude = lon_deg + lon_min / 60
                if fields[5] == "W":
                    longitude = -longitude
                result["longitude"] = longitude

            # Quality and satellites (optional trailing fields)
            if len(fields) > 6 and fields[6]:
                result["quality"] = int(fields[6])
            if len(fields) > 7 and fields[7]:
                result["satellites"] = int(fields[7])
            if len(fields) > 8 and fields[8]:
                result["hdop"] = float(fields[8])
            if len(fields) > 9 and fields[9]:
                result["altitude"] = float(fields[9])

            return result
        except (ValueError, IndexError):
            # All-or-nothing: one malformed field discards the whole fix
            return {}
|
|
|
|
|
|
class DeviceConnection:
    """Handles connection to a single NMEA device

    Keeps a TCP connection to the device open, reconnecting after failures,
    and pushes one record per received ``$`` sentence onto ``data_queue``.
    """

    def __init__(
        self,
        device_config: Dict[str, Any],
        data_queue: Queue,
        parser: NMEAParser,
        vessel_info: Dict[str, Any],
        diagnostic_mode: bool = False,
        structured_logger: Optional[StructuredLogger] = None,
        source_name: Optional[str] = None,
        verbose_logging: bool = False,
    ):
        """
        Initialize the device connection

        Args:
            device_config: Dict with "ip", "port" and "id"; "reconnect_delay"
                (seconds) is optional and defaults to 5 where it is read
            data_queue: Queue that receives one record dict per sentence
            parser: Object providing parse_sentence(sentence)
            vessel_info: Copied verbatim into every record under "vessel"
            diagnostic_mode: Passed to strip_telnet_iac; also enables the
                extra connection logging
            structured_logger: Optional StructuredLogger instance for JSON logging
            source_name: Name used for structured log entries; falls back to
                the device id, then "unknown"
            verbose_logging: Enables the "[DEBUG]" log lines
        """
        self.device_config = device_config
        self.data_queue = data_queue
        self.parser = parser
        self.vessel_info = vessel_info
        self.diagnostic_mode = diagnostic_mode
        self.structured_logger = structured_logger
        self.source_name = source_name or device_config.get("id", "unknown")
        self.verbose_logging = verbose_logging
        self.running = False
        self.sequence_number = 1
        self.sentences_received = 0
        self.last_sentence_log_time = time.time()

    @staticmethod
    def _split_lines(buffer: bytes) -> tuple:
        """Split buffer into complete lines plus the unterminated remainder.

        CRLF, LF and CR are all accepted as terminators, and the earliest
        terminator always wins, so mixed line endings never merge two
        sentences into one line.

        Returns:
            (lines, remainder): list of complete lines without terminators,
            and the trailing bytes that are not yet terminated.
        """
        parts = buffer.replace(b"\r\n", b"\n").replace(b"\r", b"\n").split(b"\n")
        return parts[:-1], parts[-1]

    @staticmethod
    def _read_delay_seconds() -> float:
        """Return the pause between reads from READ_DELAY_SECONDS (default 0.1)."""
        raw_value = os.getenv("READ_DELAY_SECONDS", "0.1")
        try:
            return float(raw_value)
        except ValueError:
            # A malformed setting must not take the collector down
            logger.warning(f"Invalid READ_DELAY_SECONDS value {raw_value!r}, using 0.1")
            return 0.1

    async def _wait_before_reconnect(self, device_id: str, log_structured: bool = False):
        """Sleep for the configured reconnect delay unless the connection was stopped."""
        if not self.running:
            return

        reconnect_delay = self.device_config.get("reconnect_delay", 5)
        logger.info(f"Retrying connection to device {device_id} in {reconnect_delay} seconds...")
        if log_structured and self.structured_logger:
            self.structured_logger.info(
                self.source_name,
                f"Retrying connection to device {device_id}",
                {"reconnect_delay": reconnect_delay}
            )
        await asyncio.sleep(reconnect_delay)

    async def _handle_line(self, line: bytes, device_ip: str, device_port: int, device_id: str):
        """Decode one received line and forward it if it is an NMEA sentence."""
        try:
            line_str = line.decode("ascii", errors="ignore").strip()
            if not line_str.startswith("$"):
                return

            self.sentences_received += 1
            # Log first sentence and every 10th sentence to show activity (unless verbose logging is enabled)
            # Verbose logging will be handled in the processing task
            if not self.verbose_logging:
                if self.sentences_received == 1:
                    logger.info(f"NMEA {device_id}: First sentence received: {line_str[:80]}")
                elif self.sentences_received % 10 == 0:
                    logger.debug(f"NMEA {device_id}: Received sentence #{self.sentences_received}: {line_str[:50]}...")
            await self.process_nmea_sentence(line_str, device_ip, device_port, device_id)
        except Exception as e:
            logger.debug(f"Error decoding line: {e}")

    async def _read_until_disconnect(self, reader, device_ip: str, device_port: int, device_id: str, read_delay: float):
        """Read from an open connection until EOF, a read error, or stop().

        A 30 second read timeout is only logged; the connection is kept.
        """
        # Buffer for accumulating data and extracting complete lines
        buffer = b""

        # Keep connection alive and read continuously
        while self.running:
            try:
                # Read raw bytes from device with timeout
                data = await asyncio.wait_for(reader.read(4096), timeout=30.0)

                if not data:
                    logger.warning(f"No data received from device {device_id}, connection may be closed")
                    if self.verbose_logging:
                        logger.warning("[DEBUG] TCP read returned empty data - server closed connection or EOF")
                    if self.structured_logger:
                        self.structured_logger.warning(
                            self.source_name,
                            f"No data received from device {device_id}, connection may be closed"
                        )
                    break

                # Strip Telnet IAC sequences before processing
                cleaned_data = strip_telnet_iac(data, self.diagnostic_mode)

                # Log data reception periodically (every 10 seconds) to show activity
                current_time = time.time()
                if current_time - self.last_sentence_log_time >= 10:
                    logger.debug(f"Received {len(cleaned_data)} bytes from {device_id} (total sentences: {self.sentences_received})")
                    self.last_sentence_log_time = current_time

                # Add cleaned data to buffer and process the complete lines
                lines, buffer = self._split_lines(buffer + cleaned_data)
                for line in lines:
                    await self._handle_line(line, device_ip, device_port, device_id)

                # Small delay to avoid overwhelming the system
                await asyncio.sleep(read_delay)

            except asyncio.TimeoutError:
                logger.warning(f"Timeout reading from device {device_id} (30s no data)")
                if self.verbose_logging:
                    logger.warning("[DEBUG] Read timeout - device may be disconnected or not sending data")
                    logger.warning(f"[DEBUG] Total sentences received this session: {self.sentences_received}")
                if self.structured_logger:
                    self.structured_logger.warning(
                        self.source_name,
                        f"Timeout reading from device {device_id}"
                    )
                continue
            except Exception as e:
                logger.error(f"Error reading from device {device_id}: {e}")
                if self.verbose_logging:
                    logger.error(f"[DEBUG] Read error type: {type(e).__name__}")
                    logger.error(f"[DEBUG] Read error details: {e}")
                if self.structured_logger:
                    self.structured_logger.error(
                        self.source_name,
                        f"Error reading from device {device_id}",
                        {"error": str(e)}
                    )
                break

    async def connect_and_collect(self):
        """Connect to device and start collecting data

        Runs until stop() is called. Every failed or finished connection is
        followed by the configured reconnect delay before the next attempt.
        """
        self.running = True
        device_ip = self.device_config["ip"]
        device_port = self.device_config["port"]
        device_id = self.device_config["id"]

        logger.info(f"Starting connection to device {device_id} at {device_ip}:{device_port}")
        if self.structured_logger:
            self.structured_logger.info(
                self.source_name,
                f"Starting connection to device {device_id}",
                {"device_ip": device_ip, "device_port": device_port}
            )

        if self.diagnostic_mode or self.verbose_logging:
            logger.info(f"[DEBUG] Enhanced connection logging enabled for device {device_id}")
            logger.info(f"[DEBUG] Target: {device_ip}:{device_port}")

        # Parsed once instead of on every read
        read_delay = self._read_delay_seconds()

        while self.running:
            try:
                # Connect to device with timeout
                connection_timeout = 10  # 10 seconds timeout for connection
                if self.verbose_logging:
                    logger.info(f"[DEBUG] Attempting TCP connection to {device_ip}:{device_port} (timeout: {connection_timeout}s)...")

                try:
                    reader, writer = await asyncio.wait_for(
                        asyncio.open_connection(device_ip, device_port),
                        timeout=connection_timeout
                    )
                except asyncio.TimeoutError:
                    logger.error(f"Connection TIMEOUT for device {device_id} at {device_ip}:{device_port} (no response in {connection_timeout}s)")
                    if self.verbose_logging:
                        logger.error(f"[DEBUG] TCP connection attempt timed out after {connection_timeout} seconds")
                        logger.error("[DEBUG] Possible causes: wrong IP, firewall blocking, device offline, network issue")
                    if self.structured_logger:
                        self.structured_logger.error(
                            self.source_name,
                            f"Connection timeout for device {device_id}",
                            {"device_ip": device_ip, "device_port": device_port, "timeout": connection_timeout}
                        )
                    await self._wait_before_reconnect(device_id)
                    continue

                try:
                    # Log socket details if verbose
                    if self.verbose_logging:
                        sock = writer.get_extra_info('socket')
                        if sock:
                            local_addr = sock.getsockname()
                            peer_addr = sock.getpeername()
                            logger.info(f"[DEBUG] TCP connection established: local={local_addr} -> remote={peer_addr}")

                    logger.info(f"Connected to device {device_id} at {device_ip}:{device_port}")
                    if self.structured_logger:
                        self.structured_logger.info(
                            self.source_name,
                            f"Connected to device {device_id}",
                            {"device_ip": device_ip, "device_port": device_port}
                        )

                    await self._read_until_disconnect(reader, device_ip, device_port, device_id, read_delay)
                finally:
                    # Release the socket on every exit path, including
                    # task cancellation and unexpected errors.
                    writer.close()

                await writer.wait_closed()
                logger.info(f"Disconnected from device {device_id}")
                if self.structured_logger:
                    self.structured_logger.info(
                        self.source_name,
                        f"Disconnected from device {device_id}"
                    )

                # Back off here as well, otherwise a peer that accepts and
                # immediately closes causes a tight reconnect loop.
                await self._wait_before_reconnect(device_id)

            except ConnectionRefusedError as e:
                logger.error(f"Connection REFUSED for device {device_id} at {device_ip}:{device_port} - Is the device running?")
                if self.verbose_logging:
                    logger.error(f"[DEBUG] ConnectionRefusedError: {e}")
                    logger.error("[DEBUG] This usually means: port is closed, no service listening, or firewall blocking")
                if self.structured_logger:
                    self.structured_logger.error(
                        self.source_name,
                        f"Connection refused for device {device_id}",
                        {"error": str(e), "device_ip": device_ip, "device_port": device_port}
                    )

                await self._wait_before_reconnect(device_id)

            except asyncio.TimeoutError:
                # Must precede OSError: since Python 3.11 asyncio.TimeoutError
                # is the built-in TimeoutError, which is an OSError subclass.
                logger.error(f"Connection TIMEOUT for device {device_id} at {device_ip}:{device_port}")
                if self.verbose_logging:
                    logger.error("[DEBUG] Connection attempt timed out")
                    logger.error("[DEBUG] This may indicate: wrong IP, firewall, or device not responding")
                if self.structured_logger:
                    self.structured_logger.error(
                        self.source_name,
                        f"Connection timeout for device {device_id}",
                        {"device_ip": device_ip, "device_port": device_port}
                    )

                await self._wait_before_reconnect(device_id)

            except OSError as e:
                # Catch network-level errors (no route, network unreachable, etc.)
                logger.error(f"Network error for device {device_id} at {device_ip}:{device_port}: {e}")
                if self.verbose_logging:
                    logger.error(f"[DEBUG] OSError: {e}")
                    logger.error(f"[DEBUG] Error code: {e.errno if hasattr(e, 'errno') else 'N/A'}")
                    logger.error("[DEBUG] This may indicate: wrong IP, network unreachable, or routing issue")
                if self.structured_logger:
                    self.structured_logger.error(
                        self.source_name,
                        f"Network error for device {device_id}",
                        {"error": str(e), "device_ip": device_ip, "device_port": device_port}
                    )

                await self._wait_before_reconnect(device_id)

            except Exception as e:
                logger.error(f"Connection error for device {device_id}: {e}")
                if self.verbose_logging:
                    logger.error(f"[DEBUG] Exception type: {type(e).__name__}")
                    logger.error(f"[DEBUG] Exception details: {e}")
                if self.structured_logger:
                    self.structured_logger.error(
                        self.source_name,
                        f"Connection error for device {device_id}",
                        {"error": str(e), "error_type": type(e).__name__, "device_ip": device_ip, "device_port": device_port}
                    )

                await self._wait_before_reconnect(device_id, log_structured=True)

    async def process_nmea_sentence(self, sentence: str, source_ip: str, source_port: int, device_id: str):
        """Process a single NMEA sentence

        Parses the sentence, wraps it in a record with timestamps, source
        details and validation flags, and puts the record on the data queue.
        Errors are logged, never raised.
        """
        try:
            start_time = time.time()

            # Parse the sentence
            parsed_data = self.parser.parse_sentence(sentence)

            # Create record
            now = datetime.now(timezone.utc)
            record = {
                "timestamp": now.isoformat(),
                "timestamp_unix": now.timestamp() * 1000,  # milliseconds
                "vessel": self.vessel_info,
                "source_ip": source_ip,
                "source_port": source_port,
                "device_id": device_id,
                "raw_nmea": sentence,
                "parsed_data": parsed_data,
                "validation": {
                    "checksum_valid": parsed_data.get("checksum_valid", False),
                    "parse_successful": "error" not in parsed_data,
                    "errors": ([parsed_data.get("error")] if "error" in parsed_data else []),
                },
                "collection_metadata": {
                    "collector_version": "1.0.0",
                    "processing_delay_ms": int((time.time() - start_time) * 1000),
                    "sequence_number": self.sequence_number,
                },
            }

            self.sequence_number += 1

            # Add to queue for processing
            self.data_queue.put(record)

        except Exception as e:
            logger.error(f"Error processing NMEA sentence from device {device_id}: {e}")

    def stop(self):
        """Stop device connection

        Only clears the running flag; the loops exit the next time they
        check it (after the current read, timeout or sleep completes).
        """
        self.running = False
|
|
|
|
|
|
class NMEAGPSCollector:
    """Collector for NMEA GPS coordinates from vessel GPS devices

    Owns one DeviceConnection and a queue; a background task drains the
    queue and keeps the most recent GGA position available through
    get_latest_position().
    """

    def __init__(
        self,
        config: Config,
        source_name: str,
        device_ip: str,
        device_port: int,
        structured_logger: Optional[StructuredLogger] = None
    ):
        """
        Initialize NMEA GPS collector

        Args:
            config: Configuration object
            source_name: Source identifier (e.g., "nmea_primary", "nmea_secondary")
            device_ip: IP address of NMEA device
            device_port: Port of NMEA device
            structured_logger: Optional StructuredLogger instance for JSON logging
        """
        self.config = config
        self.source_name = source_name
        self.device_ip = device_ip
        self.device_port = device_port
        self.structured_logger = structured_logger
        self.latest_position: Optional[Dict[str, Any]] = None
        self.lock = asyncio.Lock()

        self.parser = NMEAParser()
        self.data_queue = Queue()
        self.device_config = {
            "id": source_name,
            "ip": device_ip,
            "port": device_port,
            "reconnect_delay": 5
        }
        self.vessel_info = {"serial": source_name}
        self.connection = None
        self.running = False
        self.gga_count_period = 0
        self.last_activity_log_time = time.time()
        # Strong references to the background tasks; the event loop itself
        # only keeps weak references to tasks.
        self._tasks = []

    @staticmethod
    def _format_altitude(altitude) -> str:
        """Format an altitude for log output; GGA sentences may omit it."""
        if altitude is None:
            return "N/A"
        return f"{altitude:.1f}m"

    async def start(self):
        """Start the NMEA collector as an async task

        Does nothing except log a warning when the IP is empty or the port
        is 0. Otherwise creates the connection and schedules the connection
        and processing tasks on the running event loop.
        """
        if not self.device_ip or self.device_port == 0:
            logger.warning(f"NMEA collector {self.source_name} not configured (missing IP/port)")
            if self.structured_logger:
                self.structured_logger.warning(
                    self.source_name,
                    "NMEA collector not configured",
                    {"reason": "missing IP/port"}
                )
            return

        self.running = True

        # Log verbose mode settings
        if self.config.nmea_verbose_logging:
            logger.info(f"[DEBUG] ========== NMEA DEBUG MODE ENABLED for {self.source_name} ==========")
            logger.info("[DEBUG] Device configuration:")
            logger.info(f"[DEBUG] IP: {self.device_ip}")
            logger.info(f"[DEBUG] Port: {self.device_port}")
            logger.info(f"[DEBUG] Source name: {self.source_name}")
            logger.info("[DEBUG] Will show: connection attempts, TCP details, all NMEA sentences, errors")

        self.connection = DeviceConnection(
            device_config=self.device_config,
            data_queue=self.data_queue,
            parser=self.parser,
            vessel_info=self.vessel_info,
            diagnostic_mode=self.config.nmea_verbose_logging,  # Enable diagnostic mode when verbose
            structured_logger=self.structured_logger,
            source_name=self.source_name,
            verbose_logging=self.config.nmea_verbose_logging
        )

        # Start connection and processing tasks; keep the Task objects so
        # they cannot be garbage-collected while still running.
        self._tasks = [
            asyncio.create_task(self._connection_task()),
            asyncio.create_task(self._processing_task()),
        ]

    async def _connection_task(self):
        """Task that manages the device connection"""
        await self.connection.connect_and_collect()

    def _log_activity_summary(self):
        """Log the periodic (every 30 seconds) GGA activity summary when due."""
        current_time = time.time()
        if current_time - self.last_activity_log_time < 30:
            return

        if self.gga_count_period > 0:
            # Only log activity summary if verbose logging is enabled
            if self.config.nmea_verbose_logging:
                logger.info(f"NMEA {self.source_name}: Activity - {self.gga_count_period} GGA sentences processed in last 30s")
        else:
            # Always log warnings if no GGA sentences received (important for diagnostics)
            logger.warning(f"NMEA {self.source_name}: No GGA sentences received in last 30s (checking connection...)")
        self.gga_count_period = 0
        self.last_activity_log_time = current_time

    async def _processing_task(self):
        """Task that processes NMEA sentences from the queue

        Polls the queue without blocking; when it is empty the task emits the
        periodic activity summary and sleeps for 0.1 seconds.
        """
        from queue import Empty  # only Queue is imported at module level

        while self.running:
            try:
                # Check if queue has items (non-blocking)
                try:
                    record = self.data_queue.get_nowait()
                except Empty:
                    # Queue is empty, sleep and continue
                    self._log_activity_summary()
                    await asyncio.sleep(0.1)
                    continue

                # Process only GGA sentences
                parsed_data = record.get("parsed_data", {})
                sentence_type = parsed_data.get("sentence_type", "")

                # Log all sentences if verbose logging is enabled
                if self.config.nmea_verbose_logging:
                    raw_nmea = record.get("raw_nmea", "")
                    logger.info(f"NMEA {self.source_name}: [{sentence_type}] {raw_nmea[:100]}")

                if sentence_type == "GGA":
                    self.gga_count_period += 1
                    # Only log GGA count if verbose logging is enabled
                    if self.config.nmea_verbose_logging:
                        logger.info(f"NMEA {self.source_name}: Received GGA sentence (total this period: {self.gga_count_period})")
                    await self._process_gga(record)
                else:
                    # Log non-GGA sentences at debug level (unless verbose logging is enabled)
                    if not self.config.nmea_verbose_logging:
                        logger.debug(f"Received {sentence_type} sentence from {self.source_name} (not processing)")

            except Exception as e:
                logger.error(f"Error in NMEA processing task for {self.source_name}: {e}")
                if self.structured_logger:
                    self.structured_logger.error(
                        self.source_name,
                        "Error in NMEA processing task",
                        {"error": str(e)}
                    )
                await asyncio.sleep(1.0)

    async def _process_gga(self, record: Dict[str, Any]):
        """Process a GGA sentence and update latest position

        Records without latitude or longitude are ignored. Errors are logged,
        never raised.
        """
        # NOTE(review): the record's checksum_valid flag is not consulted
        # here, so a GGA sentence with a bad or missing checksum still
        # updates the position - confirm this is intended.
        try:
            parsed_data = record.get("parsed_data", {})

            # Extract coordinates from parsed GGA data
            latitude = parsed_data.get("latitude")
            longitude = parsed_data.get("longitude")
            altitude = parsed_data.get("altitude")

            if latitude is None or longitude is None:
                logger.debug(f"GGA sentence from {self.source_name} missing coordinates")
                return

            # Get timestamp; fall back to "now" when it is missing or unparsable
            timestamp_str = record.get("timestamp", "")
            try:
                timestamp = datetime.fromisoformat(timestamp_str.replace("Z", "+00:00"))
            except (ValueError, TypeError, AttributeError):
                timestamp = datetime.now(timezone.utc)

            # Update latest position
            async with self.lock:
                self.latest_position = {
                    "source": self.source_name,
                    "latitude": float(latitude),
                    "longitude": float(longitude),
                    "altitude": float(altitude) if altitude is not None else None,
                    "timestamp": timestamp.isoformat(),
                    "timestamp_unix": timestamp.timestamp(),
                    "supplementary_data": {
                        "satellites": parsed_data.get("satellites"),
                        "quality": parsed_data.get("quality"),
                        "hdop": parsed_data.get("hdop"),
                        "time": parsed_data.get("time"),
                        "raw_nmea": record.get("raw_nmea"),
                    }
                }

            # Log successful position update only if verbose logging is enabled
            if self.config.nmea_verbose_logging:
                logger.info(
                    f"NMEA {self.source_name}: Updated position - "
                    f"Lat: {latitude:.6f}, Lon: {longitude:.6f}, "
                    f"Alt: {self._format_altitude(altitude)}, Satellites: {parsed_data.get('satellites', 'N/A')}, "
                    f"Quality: {parsed_data.get('quality', 'N/A')}"
                )
                if self.structured_logger:
                    self.structured_logger.info(
                        self.source_name,
                        "Position updated from GGA sentence",
                        {
                            "latitude": latitude,
                            "longitude": longitude,
                            "altitude": altitude,
                            "satellites": parsed_data.get("satellites"),
                            "quality": parsed_data.get("quality"),
                            "hdop": parsed_data.get("hdop")
                        }
                    )

        except Exception as e:
            logger.error(f"Error processing GGA sentence from {self.source_name}: {e}")
            if self.structured_logger:
                self.structured_logger.error(
                    self.source_name,
                    "Error processing GGA sentence",
                    {"error": str(e)}
                )

    async def get_latest_position(self) -> Optional[Dict[str, Any]]:
        """Get the latest position from this collector

        Returns:
            A shallow copy of the latest position dict, or None when no
            position has been recorded yet.
        """
        async with self.lock:
            if self.latest_position:
                # Create a copy to avoid race conditions
                return dict(self.latest_position)
            return None

    async def stop(self):
        """Stop the collector

        Clears the running flags of the collector and its connection; the
        background tasks finish on their own once they observe the flags.
        """
        self.running = False
        if self.connection:
            self.connection.stop()
|