<message>Update the _set_golden_from_path function to improve the handling of existing golden image files. Replace the existing unlink logic with a more robust method that safely removes files or broken symlinks using the missing_ok parameter. This change enhances the reliability of the backup upload process by ensuring that stale references are properly cleared before setting a new golden image path.
259 lines
7.8 KiB
Python
259 lines
7.8 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Buzzer Service for reTerminal DM4
|
|
Controls the hardware buzzer using the Linux LED subsystem
|
|
"""
|
|
|
|
import logging
|
|
import os
|
|
import subprocess
|
|
import threading
|
|
import time
|
|
from typing import Optional
|
|
|
|
logger = logging.getLogger("gnss_guard.buzzer")
|
|
|
|
# Buzzer control path (Linux LED subsystem)
|
|
BUZZER_PATH = '/sys/class/leds/usr-buzzer/brightness'
|
|
|
|
|
|
class BuzzerService:
|
|
"""
|
|
Service to control the hardware buzzer on reTerminal DM4.
|
|
|
|
The buzzer is controlled via the Linux LED subsystem:
|
|
- Write "1" to turn ON
|
|
- Write "0" to turn OFF
|
|
|
|
Supports alarm patterns (on/off cycling) that run in a background thread.
|
|
"""
|
|
|
|
def __init__(self, on_duration: float = 1.0, off_duration: float = 1.0):
|
|
"""
|
|
Initialize the buzzer service.
|
|
|
|
Args:
|
|
on_duration: Duration in seconds for buzzer ON during alarm pattern
|
|
off_duration: Duration in seconds for buzzer OFF during alarm pattern
|
|
"""
|
|
self.on_duration = on_duration
|
|
self.off_duration = off_duration
|
|
|
|
# Alarm state
|
|
self._alarm_active = False
|
|
self._alarm_acknowledged = False
|
|
self._alarm_thread: Optional[threading.Thread] = None
|
|
self._stop_event = threading.Event()
|
|
|
|
# Check if buzzer is available
|
|
self._buzzer_available = os.path.exists(BUZZER_PATH)
|
|
if not self._buzzer_available:
|
|
logger.warning(f"Buzzer not available at {BUZZER_PATH} - running in simulation mode")
|
|
else:
|
|
logger.info(f"Buzzer service initialized (path: {BUZZER_PATH})")
|
|
# Ensure buzzer is off on startup
|
|
self.buzzer_off()
|
|
|
|
def _write_buzzer(self, value: str) -> bool:
|
|
"""
|
|
Write value to buzzer control file.
|
|
|
|
Args:
|
|
value: "1" for ON, "0" for OFF
|
|
|
|
Returns:
|
|
True if successful, False otherwise
|
|
"""
|
|
if not self._buzzer_available:
|
|
logger.debug(f"Buzzer simulation: {'ON' if value == '1' else 'OFF'}")
|
|
return True
|
|
|
|
try:
|
|
# Use sudo tee to write to the sysfs file (requires sudo permissions)
|
|
result = subprocess.run(
|
|
['sudo', 'tee', BUZZER_PATH],
|
|
input=value,
|
|
text=True,
|
|
stdout=subprocess.DEVNULL,
|
|
stderr=subprocess.PIPE,
|
|
timeout=2.0
|
|
)
|
|
if result.returncode != 0:
|
|
logger.error(f"Failed to write to buzzer: {result.stderr}")
|
|
return False
|
|
return True
|
|
except subprocess.TimeoutExpired:
|
|
logger.error("Timeout writing to buzzer")
|
|
return False
|
|
except Exception as e:
|
|
logger.error(f"Error writing to buzzer: {e}")
|
|
return False
|
|
|
|
def buzzer_on(self) -> bool:
|
|
"""Turn buzzer ON"""
|
|
return self._write_buzzer('1')
|
|
|
|
def buzzer_off(self) -> bool:
|
|
"""Turn buzzer OFF"""
|
|
return self._write_buzzer('0')
|
|
|
|
def get_status(self) -> str:
|
|
"""
|
|
Get current buzzer status.
|
|
|
|
Returns:
|
|
"ON", "OFF", or "UNKNOWN"
|
|
"""
|
|
if not self._buzzer_available:
|
|
return "SIMULATED"
|
|
|
|
try:
|
|
with open(BUZZER_PATH, 'r') as f:
|
|
value = f.read().strip()
|
|
return "ON" if value in ['1', '255'] else "OFF"
|
|
except Exception as e:
|
|
logger.error(f"Error reading buzzer status: {e}")
|
|
return "UNKNOWN"
|
|
|
|
def _alarm_loop(self):
|
|
"""
|
|
Background thread loop for alarm pattern (1 second on, 1 second off).
|
|
Runs until alarm is acknowledged or stopped.
|
|
"""
|
|
logger.info("Alarm pattern started")
|
|
|
|
while not self._stop_event.is_set() and not self._alarm_acknowledged:
|
|
# Buzzer ON
|
|
self.buzzer_on()
|
|
|
|
# Wait for on_duration or until stopped
|
|
if self._stop_event.wait(self.on_duration):
|
|
break
|
|
if self._alarm_acknowledged:
|
|
break
|
|
|
|
# Buzzer OFF
|
|
self.buzzer_off()
|
|
|
|
# Wait for off_duration or until stopped
|
|
if self._stop_event.wait(self.off_duration):
|
|
break
|
|
|
|
# Ensure buzzer is off when alarm stops
|
|
self.buzzer_off()
|
|
self._alarm_active = False
|
|
logger.info("Alarm pattern stopped")
|
|
|
|
def start_alarm(self) -> bool:
|
|
"""
|
|
Start the alarm pattern (1 second on, 1 second off).
|
|
|
|
Returns:
|
|
True if alarm started, False if already running
|
|
"""
|
|
if self._alarm_active and self._alarm_thread and self._alarm_thread.is_alive():
|
|
logger.debug("Alarm already active")
|
|
return False
|
|
|
|
# Reset state
|
|
self._alarm_acknowledged = False
|
|
self._alarm_active = True
|
|
self._stop_event.clear()
|
|
|
|
# Start alarm thread
|
|
self._alarm_thread = threading.Thread(target=self._alarm_loop, daemon=True)
|
|
self._alarm_thread.start()
|
|
|
|
logger.info("Alarm started")
|
|
return True
|
|
|
|
def stop_alarm(self) -> bool:
|
|
"""
|
|
Stop the alarm pattern.
|
|
|
|
Returns:
|
|
True if alarm was stopped, False if not running
|
|
"""
|
|
if not self._alarm_active:
|
|
return False
|
|
|
|
self._stop_event.set()
|
|
|
|
# Wait for thread to finish
|
|
if self._alarm_thread and self._alarm_thread.is_alive():
|
|
self._alarm_thread.join(timeout=3.0)
|
|
|
|
# Ensure buzzer is off
|
|
self.buzzer_off()
|
|
self._alarm_active = False
|
|
|
|
logger.info("Alarm stopped")
|
|
return True
|
|
|
|
def acknowledge_alarm(self) -> bool:
|
|
"""
|
|
Acknowledge the alarm, stopping the buzzer.
|
|
|
|
Returns:
|
|
True if alarm was acknowledged, False if no alarm active
|
|
"""
|
|
if not self._alarm_active:
|
|
logger.debug("No active alarm to acknowledge")
|
|
return False
|
|
|
|
self._alarm_acknowledged = True
|
|
self._stop_event.set()
|
|
|
|
# Wait for thread to finish
|
|
if self._alarm_thread and self._alarm_thread.is_alive():
|
|
self._alarm_thread.join(timeout=3.0)
|
|
|
|
# Ensure buzzer is off
|
|
self.buzzer_off()
|
|
self._alarm_active = False
|
|
|
|
logger.info("Alarm acknowledged")
|
|
return True
|
|
|
|
def is_alarm_active(self) -> bool:
|
|
"""Check if alarm is currently active"""
|
|
return self._alarm_active
|
|
|
|
def is_alarm_acknowledged(self) -> bool:
|
|
"""Check if current alarm has been acknowledged"""
|
|
return self._alarm_acknowledged
|
|
|
|
def reset_acknowledged(self):
|
|
"""
|
|
Reset the acknowledged state.
|
|
Called when status returns to healthy, allowing new alarms to trigger.
|
|
"""
|
|
self._alarm_acknowledged = False
|
|
|
|
def shutdown(self):
|
|
"""Shutdown the buzzer service, ensuring buzzer is off"""
|
|
self.stop_alarm()
|
|
self.buzzer_off()
|
|
logger.info("Buzzer service shutdown")
|
|
|
|
|
|
# Global buzzer service instance (singleton pattern)
|
|
_buzzer_instance: Optional[BuzzerService] = None
|
|
|
|
|
|
def get_buzzer_service(on_duration: float = 1.0, off_duration: float = 1.0) -> BuzzerService:
|
|
"""
|
|
Get or create the global buzzer service instance.
|
|
|
|
Args:
|
|
on_duration: Duration in seconds for buzzer ON during alarm
|
|
off_duration: Duration in seconds for buzzer OFF during alarm
|
|
|
|
Returns:
|
|
BuzzerService instance
|
|
"""
|
|
global _buzzer_instance
|
|
if _buzzer_instance is None:
|
|
_buzzer_instance = BuzzerService(on_duration, off_duration)
|
|
return _buzzer_instance
|