Files
Alpine_5G/web/at_client.py
nearxos 9dc35a57a2 Enhance 5G modem management with integrated web GUI and connection control
- Introduced a web GUI for managing 5G connections, replacing the standalone 5g-router service.
- Updated scripts to ensure exclusive access to the AT port, preventing conflicts.
- Improved troubleshooting documentation in 5G_MODEM_TROUBLESHOOTING.md, adding checks for processes using the AT port.
- Enhanced connection management in the web app, including auto-connect and detailed status APIs.
- Updated installation scripts to reflect changes in service management and dependencies.
2026-02-02 10:34:25 +02:00

317 lines
11 KiB
Python

"""
AT Command Client for Fibocom FM350-GL modem using pyserial.
Thread-safe with exclusive port access via Lock.
"""
import os
import re
import threading
import time
import logging
from typing import Optional, Dict, Any, List
try:
import serial
except ImportError:
serial = None # Will raise error when used
logger = logging.getLogger(__name__)
class ATClient:
"""
Thread-safe AT command interface for Fibocom FM350-GL modem.
Uses pyserial for serial communication.
"""
DEFAULT_PORTS = ["/dev/ttyUSB1", "/dev/ttyUSB0", "/dev/ttyUSB2"]
DEFAULT_BAUDRATE = 115200
DEFAULT_TIMEOUT = 5
def __init__(self, port: str = "/dev/ttyUSB1", baudrate: int = DEFAULT_BAUDRATE):
if serial is None:
raise ImportError("pyserial is required. Install with: pip install pyserial")
self.port = port
self.baudrate = baudrate
self.lock = threading.Lock()
self._serial: Optional[serial.Serial] = None
def _open(self) -> bool:
"""Open serial port if not already open."""
if self._serial and self._serial.is_open:
return True
try:
self._serial = serial.Serial(
port=self.port,
baudrate=self.baudrate,
timeout=1,
write_timeout=1,
bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
)
# Disable echo
self._send_raw("ATE0\r", timeout=2)
return True
except (serial.SerialException, OSError) as e:
logger.warning(f"Failed to open {self.port}: {e}")
self._serial = None
return False
def _close(self):
"""Close serial port."""
if self._serial and self._serial.is_open:
try:
self._serial.close()
except Exception:
pass
self._serial = None
def _send_raw(self, cmd: str, timeout: float = DEFAULT_TIMEOUT) -> str:
"""
Send raw AT command and read response. Must hold lock.
Returns the response string (may contain OK, ERROR, or data).
"""
if not self._serial or not self._serial.is_open:
return ""
# Clear input buffer
self._serial.reset_input_buffer()
# Send command
if not cmd.endswith("\r"):
cmd += "\r"
self._serial.write(cmd.encode("utf-8"))
self._serial.flush()
# Read response until OK, ERROR, or timeout
response_lines = []
start = time.time()
while (time.time() - start) < timeout:
if self._serial.in_waiting > 0:
line = self._serial.readline().decode("utf-8", errors="replace").strip()
if line:
response_lines.append(line)
# Check for terminal responses
if line in ("OK", "ERROR") or line.startswith("+CME ERROR") or line.startswith("+CMS ERROR"):
break
else:
time.sleep(0.05)
return "\n".join(response_lines)
def send_command(self, cmd: str, timeout: float = DEFAULT_TIMEOUT) -> str:
"""
Send AT command and return response. Thread-safe.
"""
with self.lock:
if not self._open():
return ""
try:
return self._send_raw(cmd, timeout)
except Exception as e:
logger.error(f"AT command error: {e}")
self._close()
return ""
def test(self) -> bool:
"""Test AT communication. Returns True if modem responds with OK."""
response = self.send_command("AT", timeout=3)
return "OK" in response
def find_working_port(self) -> Optional[str]:
"""
Try to find a working AT port from the default list.
Updates self.port if found. Returns the working port or None.
"""
for port in self.DEFAULT_PORTS:
if not os.path.exists(port):
continue
old_port = self.port
self.port = port
with self.lock:
self._close()
if self.test():
logger.info(f"Found working AT port: {port}")
return port
self.port = old_port
with self.lock:
self._close()
return None
def get_manufacturer(self) -> str:
"""Get modem manufacturer (AT+CGMI)."""
response = self.send_command("AT+CGMI")
for line in response.split("\n"):
line = line.strip()
if line and line not in ("OK", "AT+CGMI") and not line.startswith("+"):
return line
return ""
def get_model(self) -> str:
"""Get modem model (AT+CGMM)."""
response = self.send_command("AT+CGMM")
for line in response.split("\n"):
line = line.strip()
if line and line not in ("OK", "AT+CGMM") and not line.startswith("+"):
return line
return ""
def get_revision(self) -> str:
"""Get firmware revision (AT+CGMR)."""
response = self.send_command("AT+CGMR")
for line in response.split("\n"):
line = line.strip()
if line and line not in ("OK", "AT+CGMR", "ERROR") and not line.startswith("+"):
return line
return ""
def get_imei(self) -> str:
"""Get IMEI (AT+CGSN)."""
response = self.send_command("AT+CGSN")
match = re.search(r"\d{15}", response)
return match.group(0) if match else ""
def get_iccid(self) -> str:
"""Get SIM ICCID (AT+CCID)."""
response = self.send_command("AT+CCID")
match = re.search(r"\+CCID:\s*\"?(\d+)\"?", response)
if match:
return match.group(1)
# Try alternate format
for line in response.split("\n"):
line = line.strip()
if line and re.match(r"^\d{18,22}$", line):
return line
return ""
def get_signal_csq(self) -> Optional[int]:
"""
Get signal strength as CSQ value (AT+CSQ).
CSQ 0-31 = signal, 99 = unknown.
Returns None if unavailable.
"""
response = self.send_command("AT+CSQ")
match = re.search(r"\+CSQ:\s*(\d+)", response)
if match:
csq = int(match.group(1))
return csq if csq <= 31 else None
return None
def get_registration_status(self) -> Dict[str, str]:
"""
Get network registration status (AT+CREG?, AT+CEREG?).
Returns dict with 'creg' and 'cereg' status strings.
"""
result = {"creg": "unknown", "cereg": "unknown"}
# Parse registration: ,1 = home, ,5 = roaming, ,2 = searching, ,0 = not registered
def parse_reg(response: str) -> str:
if ",1" in response:
return "registered_home"
elif ",5" in response:
return "registered_roaming"
elif ",2" in response:
return "searching"
elif ",0" in response:
return "not_registered"
return "unknown"
creg_resp = self.send_command("AT+CREG?")
cereg_resp = self.send_command("AT+CEREG?")
if "+CREG:" in creg_resp:
result["creg"] = parse_reg(creg_resp)
if "+CEREG:" in cereg_resp:
result["cereg"] = parse_reg(cereg_resp)
return result
def get_operator(self) -> str:
"""Get current operator name (AT+COPS?)."""
response = self.send_command("AT+COPS?")
# Format: +COPS: 0,0,"Vodafone",7
match = re.search(r'"([^"]+)"', response)
return match.group(1) if match else ""
def get_usb_mode(self) -> Optional[int]:
"""Get USB mode (AT+GTUSBMODE?). 40 = RNDIS, 41 = extended."""
response = self.send_command("AT+GTUSBMODE?")
match = re.search(r"\+GTUSBMODE:\s*(\d+)", response)
return int(match.group(1)) if match else None
def get_pdp_address(self, cid: int = 1) -> Optional[str]:
"""Get IP address assigned to PDP context (AT+CGPADDR=cid)."""
response = self.send_command(f"AT+CGPADDR={cid}")
match = re.search(r"(\d+\.\d+\.\d+\.\d+)", response)
return match.group(1) if match else None
def get_connection_params(self, cid: int = 1) -> Dict[str, Any]:
"""
Get connection dynamic parameters (AT+CGCONTRDP=cid).
Returns dict with 'ip', 'dns1', 'dns2', etc.
"""
response = self.send_command(f"AT+CGCONTRDP={cid}", timeout=5)
result = {"ip": None, "dns1": None, "dns2": None}
# Extract all IP addresses from response
ips = re.findall(r"(\d+\.\d+\.\d+\.\d+)", response)
if ips:
# First IP is usually the assigned IP, rest are DNS
result["ip"] = ips[0]
if len(ips) > 1:
result["dns1"] = ips[1]
if len(ips) > 2:
result["dns2"] = ips[2]
return result
def configure_apn(self, apn: str, cid: int = 1) -> bool:
"""Configure APN for PDP context (AT+CGDCONT)."""
response = self.send_command(f'AT+CGDCONT={cid},"IP","{apn}"', timeout=5)
return "OK" in response
def activate_pdp(self, cid: int = 1) -> bool:
"""Activate PDP context (AT+CGACT=1,cid)."""
response = self.send_command(f"AT+CGACT=1,{cid}", timeout=10)
return "OK" in response or "CGEV" in response
def deactivate_pdp(self, cid: int = 1) -> bool:
"""Deactivate PDP context (AT+CGACT=0,cid)."""
response = self.send_command(f"AT+CGACT=0,{cid}", timeout=5)
return "OK" in response
def reset_modem(self) -> bool:
"""Reset modem (AT+CFUN=1,1)."""
response = self.send_command("AT+CFUN=1,1", timeout=5)
self._close()
return "OK" in response
def get_full_status(self) -> Dict[str, Any]:
"""
Get complete modem status. Replaces modem-status-at.sh functionality.
"""
reg = self.get_registration_status()
return {
"at_port": self.port,
"manufacturer": self.get_manufacturer(),
"model": self.get_model(),
"revision": self.get_revision(),
"imei": self.get_imei(),
"signal_csq": self.get_signal_csq(),
"creg_status": reg["creg"],
"cereg_status": reg["cereg"],
"usb_mode": self.get_usb_mode(),
"iccid": self.get_iccid(),
"operator": self.get_operator(),
"modem_ip": self.get_pdp_address(1),
}
def close(self):
"""Close serial connection."""
with self.lock:
self._close()
def __del__(self):
self.close()