#!/usr/bin/env python3
"""
Buzzer Service for reTerminal DM4
Controls the hardware buzzer using the Linux LED subsystem
"""

import logging
import os
import subprocess
import threading
import time
from typing import Optional

logger = logging.getLogger("gnss_guard.buzzer")

# Buzzer control path (Linux LED subsystem)
BUZZER_PATH = '/sys/class/leds/usr-buzzer/brightness'

# Upper bound (seconds) for a single `sudo tee` write to the sysfs file.
_SUDO_WRITE_TIMEOUT = 2.0

# How long (seconds) stop/acknowledge wait for the alarm thread to exit.
_THREAD_JOIN_TIMEOUT = 3.0


class BuzzerService:
    """
    Service to control the hardware buzzer on reTerminal DM4.

    The buzzer is controlled via the Linux LED subsystem:
    - Write "1" to turn ON
    - Write "0" to turn OFF

    Supports alarm patterns (on/off cycling) that run in a background thread.
    When BUZZER_PATH does not exist the service runs in simulation mode:
    writes are only logged and always report success.
    """

    def __init__(self, on_duration: float = 1.0, off_duration: float = 1.0):
        """
        Initialize the buzzer service.

        Args:
            on_duration: Duration in seconds for buzzer ON during alarm pattern
            off_duration: Duration in seconds for buzzer OFF during alarm pattern
        """
        self.on_duration = on_duration
        self.off_duration = off_duration

        # Alarm state
        self._alarm_active = False
        self._alarm_acknowledged = False
        self._alarm_thread: Optional[threading.Thread] = None
        # Each alarm run gets its own Event (see start_alarm); this attribute
        # always refers to the event of the most recently started alarm.
        self._stop_event = threading.Event()
        # Serializes alarm state transitions between callers and the
        # background alarm thread.
        self._state_lock = threading.Lock()

        # Check if buzzer is available
        self._buzzer_available = os.path.exists(BUZZER_PATH)

        if not self._buzzer_available:
            logger.warning(
                "Buzzer not available at %s - running in simulation mode",
                BUZZER_PATH,
            )
        else:
            logger.info("Buzzer service initialized (path: %s)", BUZZER_PATH)
            # Ensure buzzer is off on startup
            self.buzzer_off()

    def _write_buzzer(self, value: str) -> bool:
        """
        Write value to buzzer control file.

        A direct write is attempted first; only when the process lacks
        permission does it fall back to a non-interactive `sudo tee`.

        Args:
            value: "1" for ON, "0" for OFF

        Returns:
            True if successful (always True in simulation mode),
            False otherwise
        """
        if not self._buzzer_available:
            logger.debug("Buzzer simulation: %s", "ON" if value == "1" else "OFF")
            return True

        try:
            with open(BUZZER_PATH, "w") as sysfs_file:
                sysfs_file.write(value)
        except PermissionError:
            # Not allowed to write directly; escalate through sudo below.
            return self._write_buzzer_sudo(value)
        except OSError as e:
            logger.error("Error writing to buzzer: %s", e)
            return False
        return True

    def _write_buzzer_sudo(self, value: str) -> bool:
        """
        Write value to the buzzer control file through `sudo tee`.

        `sudo -n` is used so a missing passwordless-sudo rule fails
        immediately instead of waiting on a password prompt until the
        timeout expires.

        Args:
            value: "1" for ON, "0" for OFF

        Returns:
            True if the command exited with status 0, False otherwise
        """
        try:
            result = subprocess.run(
                ["sudo", "-n", "tee", BUZZER_PATH],
                input=value,
                text=True,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.PIPE,
                timeout=_SUDO_WRITE_TIMEOUT,
            )
        except subprocess.TimeoutExpired:
            logger.error("Timeout writing to buzzer")
            return False
        except (OSError, subprocess.SubprocessError) as e:
            logger.error("Error writing to buzzer: %s", e)
            return False

        if result.returncode != 0:
            logger.error("Failed to write to buzzer: %s", result.stderr)
            return False
        return True

    def buzzer_on(self) -> bool:
        """Turn buzzer ON"""
        return self._write_buzzer('1')

    def buzzer_off(self) -> bool:
        """Turn buzzer OFF"""
        return self._write_buzzer('0')

    def get_status(self) -> str:
        """
        Get current buzzer status.

        Returns:
            "SIMULATED" when no hardware buzzer is available, otherwise
            "ON" (any non-zero brightness), "OFF" (zero brightness), or
            "UNKNOWN" if the control file cannot be read or parsed
        """
        if not self._buzzer_available:
            return "SIMULATED"

        try:
            with open(BUZZER_PATH, 'r') as f:
                raw_value = f.read().strip()
            # LED-class brightness ranges from 0 to max_brightness, so any
            # positive value means the buzzer is driven.
            return "ON" if int(raw_value) > 0 else "OFF"
        except (OSError, ValueError) as e:
            logger.error("Error reading buzzer status: %s", e)
            return "UNKNOWN"

    def _alarm_loop(self, stop_event: Optional[threading.Event] = None):
        """
        Background thread loop for the alarm pattern (on_duration seconds
        ON, off_duration seconds OFF).

        Runs until the alarm is acknowledged or stopped.

        Args:
            stop_event: Event that ends this particular alarm run. Defaults
                to the service's current stop event.
        """
        if stop_event is None:
            stop_event = self._stop_event

        logger.info("Alarm pattern started")
        try:
            while not stop_event.is_set() and not self._alarm_acknowledged:
                # Buzzer ON
                self.buzzer_on()

                # Wait for on_duration or until stopped
                if stop_event.wait(self.on_duration):
                    break

                if self._alarm_acknowledged:
                    break

                # Buzzer OFF
                self.buzzer_off()

                # Wait for off_duration or until stopped
                if stop_event.wait(self.off_duration):
                    break
        finally:
            with self._state_lock:
                # Only the most recent alarm run may touch shared state; a
                # stale run (one that outlived a join timeout) must neither
                # silence nor deactivate the alarm that replaced it.
                if self._stop_event is stop_event:
                    # Ensure buzzer is off when alarm stops
                    self.buzzer_off()
                    self._alarm_active = False
            logger.info("Alarm pattern stopped")

    def start_alarm(self) -> bool:
        """
        Start the alarm pattern (on_duration seconds ON, off_duration
        seconds OFF) in a daemon thread.

        Returns:
            True if alarm started, False if already running
        """
        with self._state_lock:
            if (
                self._alarm_active
                and self._alarm_thread
                and self._alarm_thread.is_alive()
            ):
                logger.debug("Alarm already active")
                return False

            # Reset state
            self._alarm_acknowledged = False
            self._alarm_active = True
            # A fresh event per run: clearing a shared event could revive a
            # previous thread that has not finished exiting yet.
            self._stop_event = threading.Event()

            # Start alarm thread
            self._alarm_thread = threading.Thread(
                target=self._alarm_loop,
                args=(self._stop_event,),
                daemon=True,
            )
            self._alarm_thread.start()

        logger.info("Alarm started")
        return True

    def _halt_alarm(self, acknowledge: bool) -> bool:
        """
        Signal the running alarm to end and wait briefly for its thread.

        Args:
            acknowledge: When True, also mark the alarm as acknowledged.

        Returns:
            True if an active alarm was halted, False if none was active
        """
        with self._state_lock:
            if not self._alarm_active:
                return False
            if acknowledge:
                self._alarm_acknowledged = True
            stop_event = self._stop_event
            alarm_thread = self._alarm_thread
            stop_event.set()

        # Join outside the lock: the alarm thread takes the lock while
        # cleaning up, so holding it here would stall until the timeout.
        if alarm_thread and alarm_thread.is_alive():
            alarm_thread.join(timeout=_THREAD_JOIN_TIMEOUT)

        with self._state_lock:
            # Skip the cleanup if another caller already started a new alarm.
            if self._stop_event is stop_event:
                # Ensure buzzer is off
                self.buzzer_off()
                self._alarm_active = False
        return True

    def stop_alarm(self) -> bool:
        """
        Stop the alarm pattern.

        Returns:
            True if alarm was stopped, False if not running
        """
        if not self._halt_alarm(acknowledge=False):
            return False

        logger.info("Alarm stopped")
        return True

    def acknowledge_alarm(self) -> bool:
        """
        Acknowledge the alarm, stopping the buzzer.

        Returns:
            True if alarm was acknowledged, False if no alarm active
        """
        if not self._halt_alarm(acknowledge=True):
            logger.debug("No active alarm to acknowledge")
            return False

        logger.info("Alarm acknowledged")
        return True

    def is_alarm_active(self) -> bool:
        """Check if alarm is currently active"""
        return self._alarm_active

    def is_alarm_acknowledged(self) -> bool:
        """Check if current alarm has been acknowledged"""
        return self._alarm_acknowledged

    def reset_acknowledged(self):
        """
        Reset the acknowledged state.

        Called when status returns to healthy, allowing new alarms to trigger.
        """
        self._alarm_acknowledged = False

    def shutdown(self):
        """Shutdown the buzzer service, ensuring buzzer is off"""
        self.stop_alarm()
        # Explicit OFF even when no alarm was running.
        self.buzzer_off()
        logger.info("Buzzer service shutdown")


# Global buzzer service instance (singleton pattern)
_buzzer_instance: Optional[BuzzerService] = None


def get_buzzer_service(on_duration: float = 1.0, off_duration: float = 1.0) -> BuzzerService:
    """
    Get or create the global buzzer service instance.

    The durations are only applied when the instance is first created;
    later calls return the existing instance unchanged.

    Args:
        on_duration: Duration in seconds for buzzer ON during alarm
        off_duration: Duration in seconds for buzzer OFF during alarm

    Returns:
        BuzzerService instance
    """
    global _buzzer_instance

    if _buzzer_instance is None:
        _buzzer_instance = BuzzerService(on_duration, off_duration)

    return _buzzer_instance