- Introduced a web GUI for managing 5G connections, replacing the standalone 5g-router service. - Updated scripts to ensure exclusive access to the AT port, preventing conflicts. - Improved troubleshooting documentation in 5G_MODEM_TROUBLESHOOTING.md, adding checks for processes using the AT port. - Enhanced connection management in the web app, including auto-connect and detailed status APIs. - Updated installation scripts to reflect changes in service management and dependencies.
317 lines
11 KiB
Python
317 lines
11 KiB
Python
"""
|
|
AT Command Client for Fibocom FM350-GL modem using pyserial.
|
|
Thread-safe with exclusive port access via Lock.
|
|
"""
|
|
|
|
import os
|
|
import re
|
|
import threading
|
|
import time
|
|
import logging
|
|
from typing import Optional, Dict, Any, List
|
|
|
|
try:
|
|
import serial
|
|
except ImportError:
|
|
serial = None # Will raise error when used
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class ATClient:
|
|
"""
|
|
Thread-safe AT command interface for Fibocom FM350-GL modem.
|
|
Uses pyserial for serial communication.
|
|
"""
|
|
|
|
DEFAULT_PORTS = ["/dev/ttyUSB1", "/dev/ttyUSB0", "/dev/ttyUSB2"]
|
|
DEFAULT_BAUDRATE = 115200
|
|
DEFAULT_TIMEOUT = 5
|
|
|
|
def __init__(self, port: str = "/dev/ttyUSB1", baudrate: int = DEFAULT_BAUDRATE):
|
|
if serial is None:
|
|
raise ImportError("pyserial is required. Install with: pip install pyserial")
|
|
self.port = port
|
|
self.baudrate = baudrate
|
|
self.lock = threading.Lock()
|
|
self._serial: Optional[serial.Serial] = None
|
|
|
|
def _open(self) -> bool:
|
|
"""Open serial port if not already open."""
|
|
if self._serial and self._serial.is_open:
|
|
return True
|
|
try:
|
|
self._serial = serial.Serial(
|
|
port=self.port,
|
|
baudrate=self.baudrate,
|
|
timeout=1,
|
|
write_timeout=1,
|
|
bytesize=serial.EIGHTBITS,
|
|
parity=serial.PARITY_NONE,
|
|
stopbits=serial.STOPBITS_ONE,
|
|
)
|
|
# Disable echo
|
|
self._send_raw("ATE0\r", timeout=2)
|
|
return True
|
|
except (serial.SerialException, OSError) as e:
|
|
logger.warning(f"Failed to open {self.port}: {e}")
|
|
self._serial = None
|
|
return False
|
|
|
|
def _close(self):
|
|
"""Close serial port."""
|
|
if self._serial and self._serial.is_open:
|
|
try:
|
|
self._serial.close()
|
|
except Exception:
|
|
pass
|
|
self._serial = None
|
|
|
|
def _send_raw(self, cmd: str, timeout: float = DEFAULT_TIMEOUT) -> str:
|
|
"""
|
|
Send raw AT command and read response. Must hold lock.
|
|
Returns the response string (may contain OK, ERROR, or data).
|
|
"""
|
|
if not self._serial or not self._serial.is_open:
|
|
return ""
|
|
|
|
# Clear input buffer
|
|
self._serial.reset_input_buffer()
|
|
|
|
# Send command
|
|
if not cmd.endswith("\r"):
|
|
cmd += "\r"
|
|
self._serial.write(cmd.encode("utf-8"))
|
|
self._serial.flush()
|
|
|
|
# Read response until OK, ERROR, or timeout
|
|
response_lines = []
|
|
start = time.time()
|
|
while (time.time() - start) < timeout:
|
|
if self._serial.in_waiting > 0:
|
|
line = self._serial.readline().decode("utf-8", errors="replace").strip()
|
|
if line:
|
|
response_lines.append(line)
|
|
# Check for terminal responses
|
|
if line in ("OK", "ERROR") or line.startswith("+CME ERROR") or line.startswith("+CMS ERROR"):
|
|
break
|
|
else:
|
|
time.sleep(0.05)
|
|
|
|
return "\n".join(response_lines)
|
|
|
|
def send_command(self, cmd: str, timeout: float = DEFAULT_TIMEOUT) -> str:
|
|
"""
|
|
Send AT command and return response. Thread-safe.
|
|
"""
|
|
with self.lock:
|
|
if not self._open():
|
|
return ""
|
|
try:
|
|
return self._send_raw(cmd, timeout)
|
|
except Exception as e:
|
|
logger.error(f"AT command error: {e}")
|
|
self._close()
|
|
return ""
|
|
|
|
def test(self) -> bool:
|
|
"""Test AT communication. Returns True if modem responds with OK."""
|
|
response = self.send_command("AT", timeout=3)
|
|
return "OK" in response
|
|
|
|
def find_working_port(self) -> Optional[str]:
|
|
"""
|
|
Try to find a working AT port from the default list.
|
|
Updates self.port if found. Returns the working port or None.
|
|
"""
|
|
for port in self.DEFAULT_PORTS:
|
|
if not os.path.exists(port):
|
|
continue
|
|
old_port = self.port
|
|
self.port = port
|
|
with self.lock:
|
|
self._close()
|
|
if self.test():
|
|
logger.info(f"Found working AT port: {port}")
|
|
return port
|
|
self.port = old_port
|
|
with self.lock:
|
|
self._close()
|
|
return None
|
|
|
|
def get_manufacturer(self) -> str:
|
|
"""Get modem manufacturer (AT+CGMI)."""
|
|
response = self.send_command("AT+CGMI")
|
|
for line in response.split("\n"):
|
|
line = line.strip()
|
|
if line and line not in ("OK", "AT+CGMI") and not line.startswith("+"):
|
|
return line
|
|
return ""
|
|
|
|
def get_model(self) -> str:
|
|
"""Get modem model (AT+CGMM)."""
|
|
response = self.send_command("AT+CGMM")
|
|
for line in response.split("\n"):
|
|
line = line.strip()
|
|
if line and line not in ("OK", "AT+CGMM") and not line.startswith("+"):
|
|
return line
|
|
return ""
|
|
|
|
def get_revision(self) -> str:
|
|
"""Get firmware revision (AT+CGMR)."""
|
|
response = self.send_command("AT+CGMR")
|
|
for line in response.split("\n"):
|
|
line = line.strip()
|
|
if line and line not in ("OK", "AT+CGMR", "ERROR") and not line.startswith("+"):
|
|
return line
|
|
return ""
|
|
|
|
def get_imei(self) -> str:
|
|
"""Get IMEI (AT+CGSN)."""
|
|
response = self.send_command("AT+CGSN")
|
|
match = re.search(r"\d{15}", response)
|
|
return match.group(0) if match else ""
|
|
|
|
def get_iccid(self) -> str:
|
|
"""Get SIM ICCID (AT+CCID)."""
|
|
response = self.send_command("AT+CCID")
|
|
match = re.search(r"\+CCID:\s*\"?(\d+)\"?", response)
|
|
if match:
|
|
return match.group(1)
|
|
# Try alternate format
|
|
for line in response.split("\n"):
|
|
line = line.strip()
|
|
if line and re.match(r"^\d{18,22}$", line):
|
|
return line
|
|
return ""
|
|
|
|
def get_signal_csq(self) -> Optional[int]:
|
|
"""
|
|
Get signal strength as CSQ value (AT+CSQ).
|
|
CSQ 0-31 = signal, 99 = unknown.
|
|
Returns None if unavailable.
|
|
"""
|
|
response = self.send_command("AT+CSQ")
|
|
match = re.search(r"\+CSQ:\s*(\d+)", response)
|
|
if match:
|
|
csq = int(match.group(1))
|
|
return csq if csq <= 31 else None
|
|
return None
|
|
|
|
def get_registration_status(self) -> Dict[str, str]:
|
|
"""
|
|
Get network registration status (AT+CREG?, AT+CEREG?).
|
|
Returns dict with 'creg' and 'cereg' status strings.
|
|
"""
|
|
result = {"creg": "unknown", "cereg": "unknown"}
|
|
|
|
# Parse registration: ,1 = home, ,5 = roaming, ,2 = searching, ,0 = not registered
|
|
def parse_reg(response: str) -> str:
|
|
if ",1" in response:
|
|
return "registered_home"
|
|
elif ",5" in response:
|
|
return "registered_roaming"
|
|
elif ",2" in response:
|
|
return "searching"
|
|
elif ",0" in response:
|
|
return "not_registered"
|
|
return "unknown"
|
|
|
|
creg_resp = self.send_command("AT+CREG?")
|
|
cereg_resp = self.send_command("AT+CEREG?")
|
|
|
|
if "+CREG:" in creg_resp:
|
|
result["creg"] = parse_reg(creg_resp)
|
|
if "+CEREG:" in cereg_resp:
|
|
result["cereg"] = parse_reg(cereg_resp)
|
|
|
|
return result
|
|
|
|
def get_operator(self) -> str:
|
|
"""Get current operator name (AT+COPS?)."""
|
|
response = self.send_command("AT+COPS?")
|
|
# Format: +COPS: 0,0,"Vodafone",7
|
|
match = re.search(r'"([^"]+)"', response)
|
|
return match.group(1) if match else ""
|
|
|
|
def get_usb_mode(self) -> Optional[int]:
|
|
"""Get USB mode (AT+GTUSBMODE?). 40 = RNDIS, 41 = extended."""
|
|
response = self.send_command("AT+GTUSBMODE?")
|
|
match = re.search(r"\+GTUSBMODE:\s*(\d+)", response)
|
|
return int(match.group(1)) if match else None
|
|
|
|
def get_pdp_address(self, cid: int = 1) -> Optional[str]:
|
|
"""Get IP address assigned to PDP context (AT+CGPADDR=cid)."""
|
|
response = self.send_command(f"AT+CGPADDR={cid}")
|
|
match = re.search(r"(\d+\.\d+\.\d+\.\d+)", response)
|
|
return match.group(1) if match else None
|
|
|
|
def get_connection_params(self, cid: int = 1) -> Dict[str, Any]:
|
|
"""
|
|
Get connection dynamic parameters (AT+CGCONTRDP=cid).
|
|
Returns dict with 'ip', 'dns1', 'dns2', etc.
|
|
"""
|
|
response = self.send_command(f"AT+CGCONTRDP={cid}", timeout=5)
|
|
result = {"ip": None, "dns1": None, "dns2": None}
|
|
|
|
# Extract all IP addresses from response
|
|
ips = re.findall(r"(\d+\.\d+\.\d+\.\d+)", response)
|
|
if ips:
|
|
# First IP is usually the assigned IP, rest are DNS
|
|
result["ip"] = ips[0]
|
|
if len(ips) > 1:
|
|
result["dns1"] = ips[1]
|
|
if len(ips) > 2:
|
|
result["dns2"] = ips[2]
|
|
|
|
return result
|
|
|
|
def configure_apn(self, apn: str, cid: int = 1) -> bool:
|
|
"""Configure APN for PDP context (AT+CGDCONT)."""
|
|
response = self.send_command(f'AT+CGDCONT={cid},"IP","{apn}"', timeout=5)
|
|
return "OK" in response
|
|
|
|
def activate_pdp(self, cid: int = 1) -> bool:
|
|
"""Activate PDP context (AT+CGACT=1,cid)."""
|
|
response = self.send_command(f"AT+CGACT=1,{cid}", timeout=10)
|
|
return "OK" in response or "CGEV" in response
|
|
|
|
def deactivate_pdp(self, cid: int = 1) -> bool:
|
|
"""Deactivate PDP context (AT+CGACT=0,cid)."""
|
|
response = self.send_command(f"AT+CGACT=0,{cid}", timeout=5)
|
|
return "OK" in response
|
|
|
|
def reset_modem(self) -> bool:
|
|
"""Reset modem (AT+CFUN=1,1)."""
|
|
response = self.send_command("AT+CFUN=1,1", timeout=5)
|
|
self._close()
|
|
return "OK" in response
|
|
|
|
def get_full_status(self) -> Dict[str, Any]:
|
|
"""
|
|
Get complete modem status. Replaces modem-status-at.sh functionality.
|
|
"""
|
|
reg = self.get_registration_status()
|
|
return {
|
|
"at_port": self.port,
|
|
"manufacturer": self.get_manufacturer(),
|
|
"model": self.get_model(),
|
|
"revision": self.get_revision(),
|
|
"imei": self.get_imei(),
|
|
"signal_csq": self.get_signal_csq(),
|
|
"creg_status": reg["creg"],
|
|
"cereg_status": reg["cereg"],
|
|
"usb_mode": self.get_usb_mode(),
|
|
"iccid": self.get_iccid(),
|
|
"operator": self.get_operator(),
|
|
"modem_ip": self.get_pdp_address(1),
|
|
}
|
|
|
|
def close(self):
|
|
"""Close serial connection."""
|
|
with self.lock:
|
|
self._close()
|
|
|
|
def __del__(self):
|
|
self.close()
|