Files
nearxos 808fbf5c7c Refactor golden image handling in backup upload process</message>
<message>Update the _set_golden_from_path function to improve the handling of existing golden image files. Replace the existing unlink logic with a more robust method that safely removes files or broken symlinks using the missing_ok parameter. This change enhances the reliability of the backup upload process by ensuring that stale references are properly cleared before setting a new golden image path.
2026-02-24 00:19:40 +02:00

259 lines
7.8 KiB
Python

#!/usr/bin/env python3
"""
Buzzer Service for reTerminal DM4
Controls the hardware buzzer using the Linux LED subsystem
"""
import logging
import os
import subprocess
import threading
import time
from typing import Optional
logger = logging.getLogger("gnss_guard.buzzer")
# Buzzer control path (Linux LED subsystem)
BUZZER_PATH = '/sys/class/leds/usr-buzzer/brightness'
class BuzzerService:
"""
Service to control the hardware buzzer on reTerminal DM4.
The buzzer is controlled via the Linux LED subsystem:
- Write "1" to turn ON
- Write "0" to turn OFF
Supports alarm patterns (on/off cycling) that run in a background thread.
"""
def __init__(self, on_duration: float = 1.0, off_duration: float = 1.0):
"""
Initialize the buzzer service.
Args:
on_duration: Duration in seconds for buzzer ON during alarm pattern
off_duration: Duration in seconds for buzzer OFF during alarm pattern
"""
self.on_duration = on_duration
self.off_duration = off_duration
# Alarm state
self._alarm_active = False
self._alarm_acknowledged = False
self._alarm_thread: Optional[threading.Thread] = None
self._stop_event = threading.Event()
# Check if buzzer is available
self._buzzer_available = os.path.exists(BUZZER_PATH)
if not self._buzzer_available:
logger.warning(f"Buzzer not available at {BUZZER_PATH} - running in simulation mode")
else:
logger.info(f"Buzzer service initialized (path: {BUZZER_PATH})")
# Ensure buzzer is off on startup
self.buzzer_off()
def _write_buzzer(self, value: str) -> bool:
"""
Write value to buzzer control file.
Args:
value: "1" for ON, "0" for OFF
Returns:
True if successful, False otherwise
"""
if not self._buzzer_available:
logger.debug(f"Buzzer simulation: {'ON' if value == '1' else 'OFF'}")
return True
try:
# Use sudo tee to write to the sysfs file (requires sudo permissions)
result = subprocess.run(
['sudo', 'tee', BUZZER_PATH],
input=value,
text=True,
stdout=subprocess.DEVNULL,
stderr=subprocess.PIPE,
timeout=2.0
)
if result.returncode != 0:
logger.error(f"Failed to write to buzzer: {result.stderr}")
return False
return True
except subprocess.TimeoutExpired:
logger.error("Timeout writing to buzzer")
return False
except Exception as e:
logger.error(f"Error writing to buzzer: {e}")
return False
def buzzer_on(self) -> bool:
"""Turn buzzer ON"""
return self._write_buzzer('1')
def buzzer_off(self) -> bool:
"""Turn buzzer OFF"""
return self._write_buzzer('0')
def get_status(self) -> str:
"""
Get current buzzer status.
Returns:
"ON", "OFF", or "UNKNOWN"
"""
if not self._buzzer_available:
return "SIMULATED"
try:
with open(BUZZER_PATH, 'r') as f:
value = f.read().strip()
return "ON" if value in ['1', '255'] else "OFF"
except Exception as e:
logger.error(f"Error reading buzzer status: {e}")
return "UNKNOWN"
def _alarm_loop(self):
"""
Background thread loop for alarm pattern (1 second on, 1 second off).
Runs until alarm is acknowledged or stopped.
"""
logger.info("Alarm pattern started")
while not self._stop_event.is_set() and not self._alarm_acknowledged:
# Buzzer ON
self.buzzer_on()
# Wait for on_duration or until stopped
if self._stop_event.wait(self.on_duration):
break
if self._alarm_acknowledged:
break
# Buzzer OFF
self.buzzer_off()
# Wait for off_duration or until stopped
if self._stop_event.wait(self.off_duration):
break
# Ensure buzzer is off when alarm stops
self.buzzer_off()
self._alarm_active = False
logger.info("Alarm pattern stopped")
def start_alarm(self) -> bool:
"""
Start the alarm pattern (1 second on, 1 second off).
Returns:
True if alarm started, False if already running
"""
if self._alarm_active and self._alarm_thread and self._alarm_thread.is_alive():
logger.debug("Alarm already active")
return False
# Reset state
self._alarm_acknowledged = False
self._alarm_active = True
self._stop_event.clear()
# Start alarm thread
self._alarm_thread = threading.Thread(target=self._alarm_loop, daemon=True)
self._alarm_thread.start()
logger.info("Alarm started")
return True
def stop_alarm(self) -> bool:
"""
Stop the alarm pattern.
Returns:
True if alarm was stopped, False if not running
"""
if not self._alarm_active:
return False
self._stop_event.set()
# Wait for thread to finish
if self._alarm_thread and self._alarm_thread.is_alive():
self._alarm_thread.join(timeout=3.0)
# Ensure buzzer is off
self.buzzer_off()
self._alarm_active = False
logger.info("Alarm stopped")
return True
def acknowledge_alarm(self) -> bool:
"""
Acknowledge the alarm, stopping the buzzer.
Returns:
True if alarm was acknowledged, False if no alarm active
"""
if not self._alarm_active:
logger.debug("No active alarm to acknowledge")
return False
self._alarm_acknowledged = True
self._stop_event.set()
# Wait for thread to finish
if self._alarm_thread and self._alarm_thread.is_alive():
self._alarm_thread.join(timeout=3.0)
# Ensure buzzer is off
self.buzzer_off()
self._alarm_active = False
logger.info("Alarm acknowledged")
return True
def is_alarm_active(self) -> bool:
"""Check if alarm is currently active"""
return self._alarm_active
def is_alarm_acknowledged(self) -> bool:
"""Check if current alarm has been acknowledged"""
return self._alarm_acknowledged
def reset_acknowledged(self):
"""
Reset the acknowledged state.
Called when status returns to healthy, allowing new alarms to trigger.
"""
self._alarm_acknowledged = False
def shutdown(self):
"""Shutdown the buzzer service, ensuring buzzer is off"""
self.stop_alarm()
self.buzzer_off()
logger.info("Buzzer service shutdown")
# Global buzzer service instance (singleton pattern)
_buzzer_instance: Optional[BuzzerService] = None
def get_buzzer_service(on_duration: float = 1.0, off_duration: float = 1.0) -> BuzzerService:
"""
Get or create the global buzzer service instance.
Args:
on_duration: Duration in seconds for buzzer ON during alarm
off_duration: Duration in seconds for buzzer OFF during alarm
Returns:
BuzzerService instance
"""
global _buzzer_instance
if _buzzer_instance is None:
_buzzer_instance = BuzzerService(on_duration, off_duration)
return _buzzer_instance