267 lines
8.0 KiB
Python
267 lines
8.0 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Flask Buzzer Control Application for reTerminal DM4
|
|
Provides REST API for controlling the built-in buzzer
|
|
"""
|
|
|
|
from flask import Flask, jsonify, request
|
|
import subprocess
|
|
import time
|
|
import threading
|
|
|
|
# Flask application object; the @app.route decorators in this file register
# their handlers on it.
app = Flask(__name__)
# sysfs file used to drive the buzzer: this module writes 1 to switch it on and
# 0 to switch it off, and reads the file back to report the current state.
BUZZER_PATH = '/sys/class/leds/usr-buzzer/brightness'
|
|
|
|
class BuzzerController:
    """Drive the buzzer through its sysfs file, with background pattern support.

    Single writes (``on``/``off``/``beep``) run synchronously in the caller.
    Patterns run on a daemon thread, and at most one pattern plays at a time.
    """

    # (on_seconds, off_seconds) steps played once at the start of the theme.
    _STARWARS_INTRO = (
        (0.05, 0.05),
        (0.05, 0.05),
        (0.05, 0.1),
        (0.1, 0.1),
        (0.15, 0.15),
    )
    # (on_seconds, off_seconds) steps repeated until the requested duration ends.
    _STARWARS_LOOP = (
        (0.08, 0.05),
        (0.08, 0.05),
        (0.08, 0.1),
        (0.12, 0.1),
    )

    def __init__(self):
        # True while a background pattern is playing.
        self.is_playing = False
        # Most recently started playback thread, or None if none was started.
        self.play_thread = None
        # Guards the check-and-set of is_playing so that two concurrent
        # callers cannot both start a pattern.
        self._state_lock = threading.Lock()

    def _write_buzzer(self, value):
        """Write ``value`` to the buzzer file via ``sudo tee``.

        Returns:
            True if the command succeeded; False if it exited non-zero or
            could not be launched at all (e.g. ``sudo`` is not installed).
        """
        try:
            subprocess.run(['sudo', 'tee', BUZZER_PATH],
                           input=str(value), text=True,
                           stdout=subprocess.DEVNULL,
                           stderr=subprocess.DEVNULL,
                           check=True)
        except (subprocess.CalledProcessError, OSError):
            return False
        return True

    def on(self):
        """Turn buzzer ON. Returns True on success, False otherwise."""
        return self._write_buzzer(1)

    def off(self):
        """Turn buzzer OFF. Returns True on success, False otherwise."""
        return self._write_buzzer(0)

    def beep(self, duration=0.2):
        """Play a single beep, blocking the caller for ``duration`` seconds.

        The buzzer is switched off even if the sleep fails (``time.sleep``
        raises ValueError for a negative duration), so an error cannot leave
        it sounding.
        """
        self.on()
        try:
            time.sleep(duration)
        finally:
            self.off()

    def _play_steps(self, steps):
        """Play each (on_seconds, off_seconds) step in order, blocking."""
        for on_time, off_time in steps:
            self.beep(on_time)
            time.sleep(off_time)

    def _start_playback(self, sequence):
        """Run ``sequence`` (a zero-argument callable) on a daemon thread.

        Returns:
            True if playback was started, False if a pattern is already playing.
        """
        # Claim the "playing" slot before the thread exists; setting the flag
        # inside the thread would leave a window in which a second caller
        # still sees is_playing == False.
        with self._state_lock:
            if self.is_playing:
                return False
            self.is_playing = True

        def _run():
            try:
                sequence()
            finally:
                # Always silence the buzzer and release the slot, otherwise a
                # failure mid-pattern would leave the controller busy forever.
                try:
                    self.off()
                finally:
                    self.is_playing = False

        worker = threading.Thread(target=_run, daemon=True)
        self.play_thread = worker
        try:
            worker.start()
        except RuntimeError:
            # The thread could not be started, so nothing will reset the flag.
            self.is_playing = False
            raise
        return True

    def play_pattern(self, pattern):
        """Play a pattern in the background.

        Args:
            pattern: list of tuples [(duration_on, duration_off), ...], e.g.
                [(0.1, 0.1), (0.1, 0.1), (0.1, 0.3)] for three short beeps
                with a longer pause after the last one.

        Returns:
            True if playback started, False if a pattern is already playing.
        """
        return self._start_playback(lambda: self._play_steps(pattern))

    def starwars_theme(self, duration=5):
        """Play the Star Wars theme pattern in the background.

        The intro is always played in full; the loop section then repeats
        while less than ``duration`` seconds have elapsed. The elapsed time is
        only checked between loop repetitions, so playback can overrun
        ``duration`` by up to one repetition.

        Returns:
            True if playback started, False if a pattern is already playing.
        """
        def _sequence():
            # monotonic() is unaffected by wall-clock adjustments.
            started = time.monotonic()
            self._play_steps(self._STARWARS_INTRO)
            while time.monotonic() - started < duration:
                self._play_steps(self._STARWARS_LOOP)

        return self._start_playback(_sequence)
|
|
|
|
# Create global buzzer controller.
# A single module-level instance is shared by all route handlers, so its
# is_playing flag reflects playback started by any request.
buzzer = BuzzerController()
|
|
|
|
@app.route('/')
def index():
    """API documentation endpoint.

    Returns a JSON summary of the available routes, the HTTP methods each one
    accepts, and their parameters.
    """
    return jsonify({
        'status': 'Buzzer Control API',
        'version': '1.0',
        'endpoints': {
            '/buzzer/on': {
                'method': 'POST, GET',
                'description': 'Turn buzzer ON'
            },
            '/buzzer/off': {
                'method': 'POST, GET',
                'description': 'Turn buzzer OFF'
            },
            '/buzzer/beep': {
                # The route is registered for both POST and GET, so advertise both.
                'method': 'POST, GET',
                'description': 'Play a beep',
                'parameters': {'duration': 'float (0-5 seconds)'}
            },
            '/buzzer/pattern': {
                'method': 'POST',
                'description': 'Play custom pattern',
                'body': {'pattern': '[[on_time, off_time], ...]'}
            },
            '/buzzer/starwars': {
                'method': 'GET',
                'description': 'Play Star Wars theme',
                'parameters': {'duration': 'float (0-10 seconds)'}
            },
            '/buzzer/status': {
                'method': 'GET',
                'description': 'Get buzzer status'
            }
        }
    })
|
|
|
|
@app.route('/buzzer/on', methods=['POST', 'GET'])
def turn_on():
    """Switch the buzzer on; reply 200 on success or 500 if the write failed."""
    switched = buzzer.on()
    if not switched:
        failure = {'status': 'error', 'message': 'Failed to turn buzzer ON'}
        return jsonify(failure), 500
    return jsonify({'status': 'success', 'message': 'Buzzer turned ON'})
|
|
|
|
@app.route('/buzzer/off', methods=['POST', 'GET'])
def turn_off():
    """Switch the buzzer off; reply 200 on success or 500 if the write failed."""
    switched = buzzer.off()
    if not switched:
        failure = {'status': 'error', 'message': 'Failed to turn buzzer OFF'}
        return jsonify(failure), 500
    return jsonify({'status': 'success', 'message': 'Buzzer turned OFF'})
|
|
|
|
@app.route('/buzzer/beep', methods=['POST', 'GET'])
def beep():
    """Play a beep for the duration given in the ``duration`` query parameter.

    ``duration`` defaults to 0.2 and must be a number between 0 and 5 seconds;
    anything else yields a 400 response. The request blocks until the beep
    has finished.
    """
    raw_duration = request.args.get('duration', 0.2)
    try:
        duration = float(raw_duration)
    except (TypeError, ValueError):
        # A non-numeric value is a client error, not a server failure.
        return jsonify({
            'status': 'error',
            'message': 'Duration must be a number'
        }), 400

    # Written as a positive range check so that NaN, which compares False
    # against everything, is rejected as well.
    if not 0 <= duration <= 5:
        return jsonify({
            'status': 'error',
            'message': 'Duration must be between 0 and 5 seconds'
        }), 400

    buzzer.beep(duration)
    return jsonify({
        'status': 'success',
        'message': f'Beeped for {duration} seconds'
    })
|
|
|
|
@app.route('/buzzer/pattern', methods=['POST'])
def play_pattern():
    """Play a custom pattern supplied as JSON.

    Expects a JSON object of the form ``{"pattern": [[on_time, off_time], ...]}``
    where every time is a finite, non-negative number of seconds.

    Responses:
        200 if playback started, 400 if the body or pattern is invalid,
        409 if another pattern is already playing.
    """
    # silent=True makes a missing or malformed JSON body come back as None so
    # it is reported through the JSON error response below.
    data = request.get_json(silent=True)
    # The body must be a JSON object; indexing a list or string with
    # 'pattern' would raise instead of producing a 400.
    if not isinstance(data, dict) or 'pattern' not in data:
        return jsonify({
            'status': 'error',
            'message': 'Pattern required in JSON body'
        }), 400

    pattern = data['pattern']
    if not isinstance(pattern, list):
        return jsonify({
            'status': 'error',
            'message': 'Pattern must be a list'
        }), 400

    # Validate pattern format
    validated_pattern = []
    for item in pattern:
        if not (isinstance(item, list) and len(item) == 2):
            return jsonify({
                'status': 'error',
                'message': 'Each pattern item must be [on_time, off_time]'
            }), 400
        try:
            on_time, off_time = float(item[0]), float(item[1])
        except (ValueError, TypeError) as e:
            return jsonify({
                'status': 'error',
                'message': f'Invalid pattern format: {str(e)}'
            }), 400
        # Negative or NaN values make time.sleep raise inside the playback
        # thread, and infinity would block it forever; reject them up front.
        if not all(0 <= seconds < float('inf') for seconds in (on_time, off_time)):
            return jsonify({
                'status': 'error',
                'message': 'Pattern durations must be finite, non-negative numbers'
            }), 400
        validated_pattern.append((on_time, off_time))

    if buzzer.play_pattern(validated_pattern):
        return jsonify({
            'status': 'success',
            'message': 'Pattern playing'
        })
    return jsonify({
        'status': 'error',
        'message': 'Buzzer is already playing'
    }), 409
|
|
|
|
@app.route('/buzzer/starwars', methods=['GET'])
def starwars():
    """Play the Star Wars theme for the ``duration`` query parameter.

    ``duration`` defaults to 5 and must be a number between 0 and 10 seconds.

    Responses:
        200 if playback started, 400 if ``duration`` is invalid,
        409 if another pattern is already playing.
    """
    raw_duration = request.args.get('duration', 5)
    try:
        duration = float(raw_duration)
    except (TypeError, ValueError):
        # A non-numeric value is a client error, not a server failure.
        return jsonify({
            'status': 'error',
            'message': 'Duration must be a number'
        }), 400

    # Written as a positive range check so that NaN is rejected as well.
    if not 0 <= duration <= 10:
        return jsonify({
            'status': 'error',
            'message': 'Duration must be between 0 and 10 seconds'
        }), 400

    if buzzer.starwars_theme(duration):
        return jsonify({
            'status': 'success',
            'message': f'Playing Star Wars theme for {duration} seconds'
        })
    return jsonify({
        'status': 'error',
        'message': 'Buzzer is already playing'
    }), 409
|
|
|
|
@app.route('/buzzer/status', methods=['GET'])
def status():
    """Get buzzer status.

    Reads the current value from the buzzer file and reports it together with
    whether a background pattern is playing. Replies 500 with the error text
    if the file cannot be read.
    """
    try:
        # Read the file directly instead of spawning `cat`: same permissions,
        # no extra process.
        with open(BUZZER_PATH, encoding='utf-8') as state_file:
            state = state_file.read().strip()
    except (OSError, UnicodeError) as e:
        return jsonify({
            'status': 'error',
            'message': str(e)
        }), 500

    return jsonify({
        'status': 'success',
        'buzzer': 'ON' if state == '1' else 'OFF',
        'state': state,
        'is_playing': buzzer.is_playing
    })
|
|
|
|
if __name__ == '__main__':
    print("Starting Buzzer Control API...")
    print("API available at http://0.0.0.0:5000")
    print("Documentation at http://0.0.0.0:5000/")
    # The server listens on every interface, so debug mode must stay off:
    # the interactive debugger it enables lets anyone who can reach the port
    # execute arbitrary Python code on this machine.
    app.run(host='0.0.0.0', port=5000, debug=False)
|