Files
nearxos 808fbf5c7c Refactor golden image handling in backup upload process
Update the _set_golden_from_path function to improve the handling of existing golden image files. Replace the existing unlink logic with a more robust method that safely removes files or broken symlinks using the missing_ok parameter. This change enhances the reliability of the backup upload process by ensuring that stale references are properly cleared before setting a new golden image path.
2026-02-24 00:19:40 +02:00

387 lines
14 KiB
Python

#!/usr/bin/env python3
"""
Telegram Alert Module for GNSS Guard
Sends alerts to Telegram for GPS validation failures:
- Distance differences exceeding threshold
- Missing GPS sources
- Stale GPS data
"""
import logging
from datetime import datetime, timezone
from typing import Any, Dict, Optional

import requests
# Module-level logger; TelegramAlert reports Telegram API and connection failures through it.
logger = logging.getLogger("gnss_guard.telegram")
class TelegramAlert:
    """Handle Telegram notifications for GNSS Guard validation alerts.

    Messages are posted to the Telegram Bot API ``sendMessage`` endpoint using
    HTML parse mode by default, so every dynamic value interpolated into a
    message is passed through :meth:`escape_html` first. All ``send_*``
    methods return ``True`` on success and ``False`` on failure.
    """

    def __init__(self, bot_token: str, chat_id: str):
        """
        Initialize Telegram bot.

        Args:
            bot_token: Telegram bot token (from BotFather)
            chat_id: Telegram chat/group ID (can be negative for groups)
        """
        self.bot_token = bot_token
        self.chat_id = chat_id
        self.api_url = f"https://api.telegram.org/bot{bot_token}"

    @staticmethod
    def escape_html(text: str) -> str:
        """
        Escape HTML special characters for Telegram HTML parsing.

        Args:
            text: Text to escape (non-string values are converted with str())

        Returns:
            str: Escaped text safe for Telegram HTML
        """
        text = str(text)
        # '&' must be replaced first so the entities produced below are not
        # escaped a second time.
        text = text.replace('&', '&amp;')
        text = text.replace('<', '&lt;')
        text = text.replace('>', '&gt;')
        return text

    @staticmethod
    def _utc_timestamp() -> str:
        """Return the current time in UTC, formatted for alert headers."""
        # The label says "UTC", so the clock must be read in UTC rather than
        # in the host's local timezone.
        return datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")

    @classmethod
    def _join_escaped(cls, items: Any) -> str:
        """Return the items HTML-escaped and joined with ', '."""
        return ", ".join(cls.escape_html(item) for item in items)

    @classmethod
    def _format_altitude(cls, alt: Any) -> str:
        """Format an altitude as '<x.x>m', or 'N/A' when it is None."""
        if alt is None:
            return "N/A"
        try:
            return f"{alt:.1f}m"
        except (TypeError, ValueError):
            # Non-numeric altitude: show it as-is instead of failing the alert.
            return cls.escape_html(alt)

    def _redact(self, text: Any) -> str:
        """Return str(text) with the bot token masked, for safe logging."""
        text = str(text)
        token = getattr(self, "bot_token", "")
        if token:
            text = text.replace(str(token), "<redacted>")
        return text

    def send_message(self, message: str, parse_mode: str = "HTML") -> bool:
        """
        Send a message to Telegram.

        Args:
            message: Message text to send
            parse_mode: Message formatting (HTML or Markdown)

        Returns:
            bool: True if successful, False otherwise
        """
        try:
            url = f"{self.api_url}/sendMessage"
            payload = {
                "chat_id": self.chat_id,
                "text": message,
                "parse_mode": parse_mode,
                "disable_web_page_preview": True
            }
            response = requests.post(url, json=payload, timeout=10)
            if response.status_code == 200:
                return True
            logger.error(
                "Telegram API error: %s - %s",
                response.status_code,
                self._redact(response.text),
            )
            return False
        except Exception as e:
            # Deliberately broad: sending is best-effort and callers only get
            # a bool. Exception text from the HTTP layer can contain the
            # request URL (which embeds the bot token), so redact before
            # logging.
            logger.error("Failed to send Telegram message: %s", self._redact(e))
            return False

    def send_validation_alert(
        self,
        validation_result: Dict[str, Any],
        asset_name: str,
        positions: Dict[str, Dict[str, Any]]
    ) -> bool:
        """
        Send alert when GPS validation fails.

        Args:
            validation_result: Validation result dictionary from CoordinateValidator
            asset_name: Asset identifier from config
            positions: Dictionary of positions from all sources (currently
                unused; kept so the signature stays stable for callers)

        Returns:
            bool: True if successful
        """
        esc = self.escape_html
        validation_details = validation_result.get("validation_details", {})
        threshold_meters = validation_details.get("threshold_meters", 0)
        max_distance_meters = validation_details.get("max_distance_meters", 0)

        # Build alert message as a list of fragments, joined once at the end.
        parts = [
            "🚨 <b>GNSS VALIDATION FAILED</b>\n\n"
            f"📍 <b>Asset:</b> {esc(asset_name)}\n"
            f"⏰ <b>Time:</b> {self._utc_timestamp()}\n\n"
        ]

        # Missing sources
        missing_sources = validation_result.get("sources_missing", [])
        if missing_sources:
            parts.append(
                f"❌ <b>Missing Sources:</b> {self._join_escaped(missing_sources)}\n\n"
            )

        # Stale sources
        stale_sources = validation_result.get("sources_stale", [])
        if stale_sources:
            stale_after = validation_details.get('stale_threshold_seconds', 60)
            parts.append(
                f"⏱️ <b>Stale Sources:</b> {self._join_escaped(stale_sources)}\n"
                f" (Data older than {esc(stale_after)}s)\n\n"
            )

        # Distance differences
        coordinate_differences = validation_result.get("coordinate_differences", {})
        if coordinate_differences:
            parts.append("📏 <b>Distance Differences:</b>\n")
            for diff_data in coordinate_differences.values():
                source1 = diff_data.get("source1", "unknown")
                source2 = diff_data.get("source2", "unknown")
                distance_m = diff_data.get("distance_meters", 0)
                distance_km = distance_m / 1000.0
                threshold_exceeded = "🚨" if distance_m > threshold_meters else ""
                # Separator keeps the two source names readable as a pair.
                parts.append(
                    f" {threshold_exceeded} {esc(source1)} ↔ {esc(source2)}: "
                    f"{distance_m:.1f}m ({distance_km:.3f}km)\n"
                )
            if max_distance_meters > threshold_meters:
                parts.append(
                    f"\n⚠️ <b>Threshold Exceeded:</b> "
                    f"{max_distance_meters:.1f}m > {threshold_meters:.1f}m\n"
                )
            parts.append("\n")

        # Source coordinates summary
        source_coordinates = validation_result.get("source_coordinates", {})
        if source_coordinates:
            parts.append("📍 <b>Source Coordinates:</b>\n")
            for source, coords in source_coordinates.items():
                lat = coords.get("latitude", "N/A")
                lon = coords.get("longitude", "N/A")
                alt_str = self._format_altitude(coords.get("altitude"))
                timestamp_str = coords.get("timestamp", "N/A")
                # Truncate timestamp for display
                if isinstance(timestamp_str, str) and len(timestamp_str) > 19:
                    timestamp_str = timestamp_str[:19] + "Z"
                parts.append(
                    f" • <b>{esc(source)}</b>\n"
                    f" Lat: {esc(lat)}, Lon: {esc(lon)}, Alt: {alt_str}\n"
                    f" Time: {esc(timestamp_str)}\n\n"
                )

        # Expected vs found sources
        expected_sources = validation_details.get("expected_sources", [])
        sources_found = validation_details.get("sources_found", [])
        if expected_sources:
            found_str = self._join_escaped(sources_found) if sources_found else 'None'
            parts.append(
                f"📊 <b>Sources:</b> {len(sources_found)}/{len(expected_sources)} found\n"
                f" Expected: {self._join_escaped(expected_sources)}\n"
                f" Found: {found_str}\n"
            )

        return self.send_message("".join(parts))

    def send_validation_success(
        self,
        validation_result: Dict[str, Any],
        asset_name: str,
        positions: Dict[str, Dict[str, Any]]
    ) -> bool:
        """
        Send notification when GPS validation passes (only if TELEGRAM_SEND_ALL=true).

        Args:
            validation_result: Validation result dictionary from CoordinateValidator
            asset_name: Asset identifier from config
            positions: Dictionary of positions from all sources (currently
                unused; kept so the signature stays stable for callers)

        Returns:
            bool: True if successful
        """
        esc = self.escape_html
        validation_details = validation_result.get("validation_details", {})
        max_distance_meters = validation_details.get("max_distance_meters", 0)
        sources_found = validation_details.get("sources_found", [])

        # Build success message
        parts = [
            "✅ <b>GNSS VALIDATION PASSED</b>\n\n"
            f"📍 <b>Asset:</b> {esc(asset_name)}\n"
            f"⏰ <b>Time:</b> {self._utc_timestamp()}\n\n"
        ]

        # Source coordinates summary
        source_coordinates = validation_result.get("source_coordinates", {})
        if source_coordinates:
            parts.append("📍 <b>Source Coordinates:</b>\n")
            for source, coords in source_coordinates.items():
                lat = coords.get("latitude", "N/A")
                lon = coords.get("longitude", "N/A")
                alt_str = self._format_altitude(coords.get("altitude"))
                parts.append(
                    f" • <b>{esc(source)}</b>: "
                    f"{esc(lat)}, {esc(lon)}, Alt: {alt_str}\n"
                )
            parts.append("\n")

        # Distance summary
        coordinate_differences = validation_result.get("coordinate_differences", {})
        if coordinate_differences:
            parts.append(
                f"📏 <b>Max Distance Difference:</b> {max_distance_meters:.1f}m\n\n"
            )

        found_str = self._join_escaped(sources_found) if sources_found else 'None'
        parts.append(
            f"📊 <b>Sources:</b> {len(sources_found)} active\n"
            f" {found_str}\n"
        )

        return self.send_message("".join(parts))

    def send_error_alert(
        self,
        error_message: str,
        asset_name: str,
        error_details: Optional[str] = None
    ) -> bool:
        """
        Send alert when there's an error during validation or data collection.

        Args:
            error_message: Main error message
            asset_name: Asset identifier from config
            error_details: Detailed error information (only the first 1000
                characters are included in the message)

        Returns:
            bool: True if successful
        """
        # Escape every dynamic value to prevent HTML parsing issues
        message = (
            "🔴 <b>GNSS GUARD ERROR</b>\n\n"
            f"📍 <b>Asset:</b> {self.escape_html(asset_name)}\n"
            f"⏰ <b>Time:</b> {self._utc_timestamp()}\n"
            f"❌ <b>Error:</b> {self.escape_html(error_message)}\n"
        )

        if error_details:
            # Truncate before escaping so an entity is never cut in half.
            escaped_details = self.escape_html(str(error_details)[:1000])
            message += f"\n📋 <b>Details:</b>\n<pre>{escaped_details}</pre>"

        return self.send_message(message)

    def send_state_change_alert(
        self,
        asset_name: str,
        missing_added: set,
        missing_removed: set,
        stale_added: set,
        stale_removed: set,
        threshold_breached: bool,
        threshold_was_breached: bool,
        max_distance_meters: float,
        threshold_meters: float,
        source_coordinates: Dict[str, Any]
    ) -> bool:
        """
        Send alert when validation state changes.

        Args:
            asset_name: Asset identifier from config
            missing_added: Sources that became missing
            missing_removed: Sources that recovered from missing
            stale_added: Sources that became stale
            stale_removed: Sources that recovered from stale
            threshold_breached: Current threshold breach state
            threshold_was_breached: Previous threshold breach state
            max_distance_meters: Current max distance between sources
            threshold_meters: Configured threshold
            source_coordinates: Current source coordinates

        Returns:
            bool: True if successful
        """
        esc = self.escape_html
        newly_breached = threshold_breached and not threshold_was_breached
        newly_cleared = threshold_was_breached and not threshold_breached

        # Determine if this is a degradation or recovery
        is_degradation = bool(missing_added or stale_added or newly_breached)
        is_recovery = bool(missing_removed or stale_removed or newly_cleared)

        if is_degradation and not is_recovery:
            emoji, title = "🚨", "GNSS STATE DEGRADED"
        elif is_recovery and not is_degradation:
            # Same marker the per-source recovery lines below use.
            emoji, title = "✅", "GNSS STATE RECOVERED"
        else:
            emoji, title = "⚠️", "GNSS STATE CHANGED"

        parts = [
            f"{emoji} <b>{title}</b>\n\n"
            f"📍 <b>Asset:</b> {esc(asset_name)}\n"
            f"⏰ <b>Time:</b> {self._utc_timestamp()}\n\n"
        ]

        # Missing sources changes (names are sorted for a stable message)
        if missing_added:
            parts.append(
                f"❌ <b>Sources now MISSING:</b> "
                f"{self._join_escaped(sorted(missing_added))}\n"
            )
        if missing_removed:
            parts.append(
                f"✅ <b>Sources RECOVERED (was missing):</b> "
                f"{self._join_escaped(sorted(missing_removed))}\n"
            )

        # Stale sources changes
        if stale_added:
            parts.append(
                f"⏱️ <b>Sources now STALE:</b> "
                f"{self._join_escaped(sorted(stale_added))}\n"
            )
        if stale_removed:
            parts.append(
                f"✅ <b>Sources RECOVERED (was stale):</b> "
                f"{self._join_escaped(sorted(stale_removed))}\n"
            )

        # Threshold breach changes
        if newly_breached:
            parts.append(
                "\n🚨 <b>DISTANCE THRESHOLD BREACHED!</b>\n"
                f" Max distance: {max_distance_meters:.1f}m (threshold: {threshold_meters:.1f}m)\n"
                " ⚠️ Possible GPS jamming or spoofing!\n"
            )
        elif newly_cleared:
            parts.append(
                "\n✅ <b>Distance threshold OK</b>\n"
                f" Max distance: {max_distance_meters:.1f}m (threshold: {threshold_meters:.1f}m)\n"
            )

        # Current coordinates summary
        if source_coordinates:
            parts.append("\n📍 <b>Current Coordinates:</b>\n")
            for source, coords in source_coordinates.items():
                lat = coords.get("latitude", "N/A")
                lon = coords.get("longitude", "N/A")
                parts.append(f"{esc(source)}: {esc(lat)}, {esc(lon)}\n")

        return self.send_message("".join(parts))

    def test_connection(self) -> bool:
        """
        Test Telegram bot connection.

        Calls the Bot API ``getMe`` endpoint and logs the bot username on
        success.

        Returns:
            bool: True if connection successful
        """
        try:
            url = f"{self.api_url}/getMe"
            response = requests.get(url, timeout=10)
            if response.status_code == 200:
                bot_info = response.json()
                logger.info(
                    "Telegram bot connected: @%s", bot_info['result']['username']
                )
                return True
            logger.error("Telegram connection failed: %s", response.status_code)
            return False
        except Exception as e:
            # Broad on purpose: network errors, bad JSON and an unexpected
            # response shape all mean "not connected". Redact the token that
            # HTTP-layer exception text may carry via the request URL.
            logger.error("Telegram connection error: %s", self._redact(e))
            return False