""" 5G Connection Manager - manages modem connection lifecycle. Migrates functionality from connect-5g.sh to native Python. """ import logging import os import subprocess import threading import time from dataclasses import dataclass, field from datetime import datetime from enum import Enum from typing import Any, Dict, List, Optional, Callable from at_client import ATClient logger = logging.getLogger(__name__) class ConnectionState(Enum): DISCONNECTED = "disconnected" CONNECTING = "connecting" CONNECTED = "connected" FAILED = "failed" RECONNECTING = "reconnecting" @dataclass class ConnectionConfig: """Configuration for 5G connection.""" at_port: str = "/dev/ttyUSB1" apn: str = "internet" wan_if: str = "eth1" lan_if: str = "eth0.100" failover_enabled: bool = False failover_if: str = "eth0" failover_metric: int = 202 watchdog_interval: int = 0 # 0 = disabled log_signal: bool = True weak_signal_csq: int = 10 dns_servers: Optional[str] = None # Comma-separated fallback DNS @classmethod def from_file(cls, path: str = "/etc/5g-router.conf") -> "ConnectionConfig": """Load config from shell-style config file.""" config = cls() if not os.path.exists(path): return config try: with open(path, "r") as f: for line in f: line = line.strip() if not line or line.startswith("#"): continue if "=" in line: key, value = line.split("=", 1) key = key.strip() value = value.strip().strip('"').strip("'") if key == "AT_PORT": config.at_port = value elif key == "APN": config.apn = value elif key == "WAN_IF": config.wan_if = value elif key == "LAN_IF": config.lan_if = value elif key == "FAILOVER_ENABLED": config.failover_enabled = value.lower() == "yes" elif key == "FAILOVER_IF": config.failover_if = value elif key == "FAILOVER_METRIC": config.failover_metric = int(value) elif key == "WATCHDOG_INTERVAL": config.watchdog_interval = int(value) elif key == "LOG_SIGNAL": config.log_signal = value.lower() == "yes" elif key == "WEAK_SIGNAL_CSQ": config.weak_signal_csq = int(value) elif key == "DNS_SERVERS": config.dns_servers = value except Exception as e: logger.warning(f"Error reading config {path}: {e}") return config @dataclass class ConnectionStatus: """Current connection status.""" state: ConnectionState = ConnectionState.DISCONNECTED ip_address: Optional[str] = None dns1: Optional[str] = None dns2: Optional[str] = None signal_csq: Optional[int] = None last_connect_time: Optional[datetime] = None last_error: Optional[str] = None connect_attempts: int = 0 class ConnectionManager: """ Manages 5G modem connection lifecycle. Thread-safe, supports background watchdog. """ LOG_FILE = "/var/log/5g-router.log" def __init__(self, at_client: ATClient, config: Optional[ConnectionConfig] = None): self.at = at_client self.config = config or ConnectionConfig() self.status = ConnectionStatus() self._lock = threading.Lock() self._watchdog_thread: Optional[threading.Thread] = None self._watchdog_stop = threading.Event() self._log_callbacks: List[Callable[[str], None]] = [] def _log(self, message: str): """Log message to file and callbacks.""" timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") line = f"[{timestamp}] {message}" logger.info(message) # Write to log file try: with open(self.LOG_FILE, "a") as f: f.write(line + "\n") except Exception: pass # Notify callbacks for cb in self._log_callbacks: try: cb(line) except Exception: pass def add_log_callback(self, callback: Callable[[str], None]): """Add callback to receive log messages.""" self._log_callbacks.append(callback) def _run_cmd(self, cmd: List[str], check: bool = False) -> subprocess.CompletedProcess: """Run shell command.""" try: return subprocess.run(cmd, capture_output=True, text=True, timeout=30, check=check) except subprocess.TimeoutExpired: logger.warning(f"Command timed out: {' '.join(cmd)}") return subprocess.CompletedProcess(cmd, 1, "", "timeout") except subprocess.CalledProcessError as e: return subprocess.CompletedProcess(cmd, e.returncode, e.stdout or "", e.stderr or "") except Exception as e: logger.error(f"Command failed: {e}") return subprocess.CompletedProcess(cmd, 1, "", str(e)) def _check_usb_mode(self) -> bool: """Check if modem is in correct USB mode (40).""" result = self._run_cmd(["lsusb"]) if "0e8d:7127" in result.stdout: self._log("WARN: Modem is in Mode 41 (7127). AT commands may not work.") return False return True def _wait_for_modem(self, max_wait: int = 30) -> bool: """Wait for modem AT port to be available.""" self._log("Waiting for modem AT port...") for i in range(max_wait // 2): # Check if port exists as char device if os.path.exists(self.config.at_port): import stat mode = os.stat(self.config.at_port).st_mode if stat.S_ISCHR(mode): # Try to communicate if self.at.test(): self._log("Modem AT port available") return True # Try to find working port working = self.at.find_working_port() if working: self.config.at_port = working self._log(f"Found working AT port: {working}") return True time.sleep(2) self._log("Modem AT port not available") return False def _configure_interface(self, ip: str) -> bool: """Configure network interface with IP from modem.""" wan_if = self.config.wan_if self._log(f"Configuring {wan_if} with IP {ip}") try: # Bring up interface self._run_cmd(["ip", "link", "set", wan_if, "up"]) # Flush existing addresses self._run_cmd(["ip", "addr", "flush", "dev", wan_if]) # Add IP address result = self._run_cmd(["ip", "addr", "add", f"{ip}/32", "dev", wan_if]) if result.returncode != 0 and "RTNETLINK answers: File exists" not in result.stderr: self._log(f"Failed to add IP: {result.stderr}") return False # Add default route self._run_cmd(["ip", "route", "add", "default", "dev", wan_if, "metric", "50"]) self._log("Interface configured") return True except Exception as e: self._log(f"Interface configuration failed: {e}") return False def _configure_nat(self) -> bool: """Setup NAT/masquerading for LAN clients.""" self._log("Setting up NAT...") wan_if = self.config.wan_if lan_if = self.config.lan_if try: # Enable IP forwarding with open("/proc/sys/net/ipv4/ip_forward", "w") as f: f.write("1") # Masquerade outgoing traffic self._run_cmd([ "iptables", "-t", "nat", "-C", "POSTROUTING", "-o", wan_if, "-j", "MASQUERADE" ]) if self._run_cmd([ "iptables", "-t", "nat", "-C", "POSTROUTING", "-o", wan_if, "-j", "MASQUERADE" ]).returncode != 0: self._run_cmd([ "iptables", "-t", "nat", "-A", "POSTROUTING", "-o", wan_if, "-j", "MASQUERADE" ]) # Forward from LAN to WAN if self._run_cmd([ "iptables", "-C", "FORWARD", "-i", lan_if, "-o", wan_if, "-j", "ACCEPT" ]).returncode != 0: self._run_cmd([ "iptables", "-A", "FORWARD", "-i", lan_if, "-o", wan_if, "-j", "ACCEPT" ]) # Allow established/related back if self._run_cmd([ "iptables", "-C", "FORWARD", "-i", wan_if, "-o", lan_if, "-m", "state", "--state", "RELATED,ESTABLISHED", "-j", "ACCEPT" ]).returncode != 0: self._run_cmd([ "iptables", "-A", "FORWARD", "-i", wan_if, "-o", lan_if, "-m", "state", "--state", "RELATED,ESTABLISHED", "-j", "ACCEPT" ]) self._log("NAT configured") return True except Exception as e: self._log(f"NAT configuration failed: {e}") return False def _configure_dns(self, dns1: Optional[str], dns2: Optional[str]): """Configure /etc/resolv.conf with DNS servers.""" if not dns1: # Try fallback from config if self.config.dns_servers: parts = self.config.dns_servers.split(",") dns1 = parts[0].strip() if len(parts) > 0 else None dns2 = parts[1].strip() if len(parts) > 1 else None if dns1: try: with open("/etc/resolv.conf", "w") as f: f.write(f"nameserver {dns1}\n") if dns2: f.write(f"nameserver {dns2}\n") self._log("DNS configured") except Exception as e: self._log(f"DNS configuration failed: {e}") def _do_failover(self): """Setup failover route if enabled.""" if not self.config.failover_enabled: return self._run_cmd([ "ip", "route", "add", "default", "dev", self.config.failover_if, "metric", str(self.config.failover_metric) ]) self._log(f"Failover route via {self.config.failover_if} configured") def connect(self) -> bool: """ Establish 5G connection. Returns True on success. """ with self._lock: self.status.state = ConnectionState.CONNECTING self.status.connect_attempts += 1 self.status.last_error = None self._log("Starting 5G connection...") try: # Check USB mode if not self._check_usb_mode(): self._set_failed("Modem in wrong USB mode") self._do_failover() return False # Wait for modem if not self._wait_for_modem(): self._set_failed("Modem AT port not available") self._do_failover() return False # Test AT communication with retries at_ok = False for attempt in range(1, 4): if self.at.test(): at_ok = True break if attempt < 3: self._log(f"AT not OK, retry {attempt}/3 in 5s...") time.sleep(5) if not at_ok: self._set_failed("AT communication failed") self._do_failover() return False # Log signal strength csq = self.at.get_signal_csq() if csq is not None: self.status.signal_csq = csq if self.config.log_signal: self._log(f"Signal CSQ={csq}") if csq < self.config.weak_signal_csq: self._log(f"WARN: weak signal CSQ={csq}") # Configure APN self._log(f"Configuring APN: {self.config.apn}") self.at.configure_apn(self.config.apn) # Activate PDP context self._log("Activating PDP context...") self.at.activate_pdp() # Get IP address with retries ip = None for attempt in range(1, 6): ip = self.at.get_pdp_address() if ip: break if attempt < 5: self._log(f"No IP yet, retry {attempt}/5 in 3s...") time.sleep(3) if not ip: self._set_failed("Could not get modem IP") self._do_failover() return False # Get DNS from modem params = self.at.get_connection_params() dns1 = params.get("dns1") dns2 = params.get("dns2") # Configure DNS self._configure_dns(dns1, dns2) # Configure interface if not self._configure_interface(ip): self._set_failed("Interface configuration failed") self._do_failover() return False # Setup NAT if not self._configure_nat(): self._set_failed("NAT configuration failed") # Don't fail completely, connection might still work pass # Success! with self._lock: self.status.state = ConnectionState.CONNECTED self.status.ip_address = ip self.status.dns1 = dns1 self.status.dns2 = dns2 self.status.last_connect_time = datetime.now() self._log("5G connection successful!") return True except Exception as e: self._set_failed(f"Connection error: {e}") self._do_failover() return False def _set_failed(self, error: str): """Set connection state to failed.""" self._log(error) with self._lock: self.status.state = ConnectionState.FAILED self.status.last_error = error def disconnect(self) -> bool: """ Disconnect 5G connection. """ self._log("Disconnecting 5G...") with self._lock: self.status.state = ConnectionState.DISCONNECTED self.status.ip_address = None try: # Deactivate PDP self.at.deactivate_pdp() # Bring down interface self._run_cmd(["ip", "link", "set", self.config.wan_if, "down"]) # Remove routes self._run_cmd(["ip", "route", "del", "default", "dev", self.config.wan_if]) self._log("5G disconnected") return True except Exception as e: self._log(f"Disconnect error: {e}") return False def check_health(self) -> bool: """ Check if connection is healthy. Returns True if connected and working. """ if self.status.state != ConnectionState.CONNECTED: return False # Check if interface is up with IP result = self._run_cmd(["ip", "addr", "show", self.config.wan_if]) if self.status.ip_address not in result.stdout: return False # Ping test (optional) result = self._run_cmd(["ping", "-c", "1", "-W", "3", "8.8.8.8"]) if result.returncode != 0: self._log("Health check: ping failed") return False return True def _watchdog_loop(self): """Background thread: periodically check and reconnect if needed.""" self._log(f"Watchdog started (interval: {self.config.watchdog_interval}s)") while not self._watchdog_stop.is_set(): try: if not self.check_health(): self._log("Watchdog: connection unhealthy, reconnecting...") with self._lock: self.status.state = ConnectionState.RECONNECTING self.connect() except Exception as e: logger.error(f"Watchdog error: {e}") # Wait for interval or stop signal self._watchdog_stop.wait(timeout=self.config.watchdog_interval) self._log("Watchdog stopped") def start_watchdog(self): """Start background watchdog thread.""" if self.config.watchdog_interval <= 0: return if self._watchdog_thread and self._watchdog_thread.is_alive(): return self._watchdog_stop.clear() self._watchdog_thread = threading.Thread( target=self._watchdog_loop, name="5g-watchdog", daemon=True ) self._watchdog_thread.start() def stop_watchdog(self): """Stop background watchdog thread.""" self._watchdog_stop.set() if self._watchdog_thread: self._watchdog_thread.join(timeout=5) self._watchdog_thread = None def get_status(self) -> Dict[str, Any]: """Get current connection status as dict.""" with self._lock: return { "state": self.status.state.value, "ip_address": self.status.ip_address, "dns1": self.status.dns1, "dns2": self.status.dns2, "signal_csq": self.status.signal_csq, "last_connect_time": self.status.last_connect_time.isoformat() if self.status.last_connect_time else None, "last_error": self.status.last_error, "connect_attempts": self.status.connect_attempts, "watchdog_running": self._watchdog_thread is not None and self._watchdog_thread.is_alive(), } def get_full_status(self) -> Dict[str, Any]: """Get combined connection and modem status.""" conn_status = self.get_status() modem_status = self.at.get_full_status() return { "connection": conn_status, "modem": modem_status, }