#!/usr/bin/env python3
"""
Telegram Alert Module for GNSS Guard

Sends alerts to Telegram for GPS validation failures:
- Distance differences exceeding threshold
- Missing GPS sources
- Stale GPS data
"""

import html
import logging
from datetime import datetime, timezone
from typing import Any, Dict, Iterable, Optional

import requests

logger = logging.getLogger("gnss_guard.telegram")

# Message pictographs. They are written as ASCII escape sequences so the
# alert text does not depend on the encoding this file is saved or read in.
_ALARM = "\U0001F6A8"        # police car light
_RED_CIRCLE = "\U0001F534"   # large red circle
_PASSED = "\u2705"           # white heavy check mark
_TICK = "\u2713"             # check mark
_CROSS = "\u274C"            # cross mark
_WARNING = "\u26A0\uFE0F"    # warning sign
_CLOCK = "\u23F0"            # alarm clock
_STOPWATCH = "\u23F1\uFE0F"  # stopwatch
_BETWEEN = "\u2194"          # left-right arrow
_BULLET = "\u2022"           # bullet
_PIN = "\U0001F4CC"          # pushpin
_LOCATION = "\U0001F4CD"     # round pushpin
_RULER = "\U0001F4CF"        # straight ruler
_CHART = "\U0001F4CA"        # bar chart
_CLIPBOARD = "\U0001F4CB"    # clipboard


class TelegramAlert:
    """Handle Telegram notifications for GNSS Guard validation alerts."""

    def __init__(self, bot_token: str, chat_id: str):
        """
        Initialize Telegram bot.

        Args:
            bot_token: Telegram bot token (from BotFather)
            chat_id: Telegram chat/group ID (can be negative for groups)
        """
        self.bot_token = bot_token
        self.chat_id = chat_id
        self.api_url = f"https://api.telegram.org/bot{bot_token}"

    @staticmethod
    def escape_html(text: str) -> str:
        """
        Escape HTML special characters for Telegram HTML parsing.

        Args:
            text: Text to escape (non-string values are converted with str())

        Returns:
            str: Text with '&', '<' and '>' replaced by HTML entities
        """
        # quote=False: only &, < and > are replaced; quotes are left as-is.
        return html.escape(str(text), quote=False)

    @staticmethod
    def _utc_timestamp() -> str:
        """Return the current time in UTC, formatted for message headers."""
        # The format string labels the value "UTC", so the clock must be UTC
        # rather than the host's local time.
        return datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")

    def _join_escaped(self, names: Iterable[Any]) -> str:
        """Return the names HTML-escaped and joined with ', '."""
        return ", ".join(self.escape_html(name) for name in names)

    def _format_altitude(self, altitude: Any) -> str:
        """Return altitude as '<value>m' with one decimal, or 'N/A' if None."""
        if altitude is None:
            return "N/A"
        try:
            return f"{float(altitude):.1f}m"
        except (TypeError, ValueError):
            # Non-numeric altitude: show it verbatim instead of failing
            # the whole alert.
            return self.escape_html(altitude)

    def _redact(self, text: Any) -> str:
        """Return str(text) with the bot token masked."""
        rendered = str(text)
        # Guard against an empty token: replacing "" would insert the mask
        # between every character.
        if self.bot_token:
            rendered = rendered.replace(self.bot_token, "***")
        return rendered

    def send_message(self, message: str, parse_mode: str = "HTML") -> bool:
        """
        Send a message to Telegram.

        Args:
            message: Message text to send
            parse_mode: Message formatting (HTML or Markdown)

        Returns:
            bool: True if the API answered with HTTP 200, False otherwise
        """
        try:
            url = f"{self.api_url}/sendMessage"
            payload = {
                "chat_id": self.chat_id,
                "text": message,
                "parse_mode": parse_mode,
                "disable_web_page_preview": True,
            }
            response = requests.post(url, json=payload, timeout=10)
            if response.status_code == 200:
                return True
            logger.error(
                "Telegram API error: %s - %s",
                response.status_code,
                response.text,
            )
            return False
        except Exception as e:
            # Best-effort delivery: alerting must never crash the caller.
            # The exception text can contain the request URL, which embeds
            # the bot token, so it is masked before logging.
            logger.error("Failed to send Telegram message: %s", self._redact(e))
            return False

    def send_validation_alert(
        self,
        validation_result: Dict[str, Any],
        asset_name: str,
        positions: Dict[str, Dict[str, Any]]
    ) -> bool:
        """
        Send alert when GPS validation fails.

        Args:
            validation_result: Validation result dictionary from CoordinateValidator
            asset_name: Asset identifier from config
            positions: Dictionary of positions from all sources
                (accepted for interface compatibility; not read here)

        Returns:
            bool: True if successful
        """
        timestamp = self._utc_timestamp()
        validation_details = validation_result.get("validation_details", {})
        threshold_meters = validation_details.get("threshold_meters", 0)
        max_distance_meters = validation_details.get("max_distance_meters", 0)

        # Build alert message
        message = (
            f"{_ALARM} GNSS VALIDATION FAILED\n\n"
            f"{_PIN} Asset: {self.escape_html(asset_name)}\n"
            f"{_CLOCK} Time: {timestamp}\n\n"
        )

        # Missing sources
        missing_sources = validation_result.get("sources_missing", [])
        if missing_sources:
            message += (
                f"{_CROSS} Missing Sources: {self._join_escaped(missing_sources)}\n\n"
            )

        # Stale sources
        stale_sources = validation_result.get("sources_stale", [])
        if stale_sources:
            stale_after = validation_details.get("stale_threshold_seconds", 60)
            message += (
                f"{_STOPWATCH} Stale Sources: {self._join_escaped(stale_sources)}\n"
                f" (Data older than {stale_after}s)\n\n"
            )

        # Distance differences
        coordinate_differences = validation_result.get("coordinate_differences", {})
        if coordinate_differences:
            message += f"{_RULER} Distance Differences:\n"
            for diff_data in coordinate_differences.values():
                source1 = diff_data.get("source1", "unknown")
                source2 = diff_data.get("source2", "unknown")
                distance_m = diff_data.get("distance_meters", 0)
                distance_km = distance_m / 1000.0
                marker = _ALARM if distance_m > threshold_meters else _TICK
                message += (
                    f" {marker} {self.escape_html(source1)} {_BETWEEN} "
                    f"{self.escape_html(source2)}: "
                    f"{distance_m:.1f}m ({distance_km:.3f}km)\n"
                )
            if max_distance_meters > threshold_meters:
                message += (
                    f"\n{_WARNING} Threshold Exceeded: "
                    f"{max_distance_meters:.1f}m > {threshold_meters:.1f}m\n"
                )
            message += "\n"

        # Source coordinates summary
        source_coordinates = validation_result.get("source_coordinates", {})
        if source_coordinates:
            message += f"{_LOCATION} Source Coordinates:\n"
            for source, coords in source_coordinates.items():
                lat = self.escape_html(coords.get("latitude", "N/A"))
                lon = self.escape_html(coords.get("longitude", "N/A"))
                alt_str = self._format_altitude(coords.get("altitude"))
                timestamp_str = coords.get("timestamp", "N/A")
                # Truncate timestamp for display
                if isinstance(timestamp_str, str) and len(timestamp_str) > 19:
                    timestamp_str = timestamp_str[:19] + "Z"
                message += (
                    f" {_BULLET} {self.escape_html(source)}\n"
                    f" Lat: {lat}, Lon: {lon}, Alt: {alt_str}\n"
                    f" Time: {self.escape_html(timestamp_str)}\n\n"
                )

        # Expected vs found sources
        expected_sources = validation_details.get("expected_sources", [])
        sources_found = validation_details.get("sources_found", [])
        if expected_sources:
            found_text = self._join_escaped(sources_found) if sources_found else "None"
            message += (
                f"{_CHART} Sources: {len(sources_found)}/{len(expected_sources)} found\n"
                f" Expected: {self._join_escaped(expected_sources)}\n"
                f" Found: {found_text}\n"
            )

        return self.send_message(message)

    def send_validation_success(
        self,
        validation_result: Dict[str, Any],
        asset_name: str,
        positions: Dict[str, Dict[str, Any]]
    ) -> bool:
        """
        Send notification when GPS validation passes (only if TELEGRAM_SEND_ALL=true).

        Args:
            validation_result: Validation result dictionary from CoordinateValidator
            asset_name: Asset identifier from config
            positions: Dictionary of positions from all sources
                (accepted for interface compatibility; not read here)

        Returns:
            bool: True if successful
        """
        timestamp = self._utc_timestamp()
        validation_details = validation_result.get("validation_details", {})
        max_distance_meters = validation_details.get("max_distance_meters", 0)
        sources_found = validation_details.get("sources_found", [])

        # Build success message
        message = (
            f"{_PASSED} GNSS VALIDATION PASSED\n\n"
            f"{_PIN} Asset: {self.escape_html(asset_name)}\n"
            f"{_CLOCK} Time: {timestamp}\n\n"
        )

        # Source coordinates summary
        source_coordinates = validation_result.get("source_coordinates", {})
        if source_coordinates:
            message += f"{_LOCATION} Source Coordinates:\n"
            for source, coords in source_coordinates.items():
                lat = self.escape_html(coords.get("latitude", "N/A"))
                lon = self.escape_html(coords.get("longitude", "N/A"))
                alt_str = self._format_altitude(coords.get("altitude"))
                message += (
                    f" {_BULLET} {self.escape_html(source)}: "
                    f"{lat}, {lon}, Alt: {alt_str}\n"
                )
            message += "\n"

        # Distance summary
        coordinate_differences = validation_result.get("coordinate_differences", {})
        if coordinate_differences:
            message += (
                f"{_RULER} Max Distance Difference: {max_distance_meters:.1f}m\n\n"
            )

        active_text = self._join_escaped(sources_found) if sources_found else "None"
        message += (
            f"{_CHART} Sources: {len(sources_found)} active\n"
            f" {active_text}\n"
        )

        return self.send_message(message)

    def send_error_alert(
        self,
        error_message: str,
        asset_name: str,
        error_details: Optional[str] = None
    ) -> bool:
        """
        Send alert when there's an error during validation or data collection.

        Args:
            error_message: Main error message
            asset_name: Asset identifier from config
            error_details: Detailed error information (only the first
                1000 characters are sent)

        Returns:
            bool: True if successful
        """
        timestamp = self._utc_timestamp()

        # Escape error message to prevent HTML parsing issues
        escaped_error = self.escape_html(error_message)

        message = (
            f"{_RED_CIRCLE} GNSS GUARD ERROR\n\n"
            f"{_PIN} Asset: {self.escape_html(asset_name)}\n"
            f"{_CLOCK} Time: {timestamp}\n"
            f"{_CROSS} Error: {escaped_error}\n"
        )

        if error_details:
            # str() first so non-string details (e.g. an exception object)
            # can be truncated; truncate before escaping so an entity is
            # never cut in half.
            escaped_details = self.escape_html(str(error_details)[:1000])
            message += f"\n{_CLIPBOARD} Details:\n{escaped_details}"

        return self.send_message(message)

    def send_state_change_alert(
        self,
        asset_name: str,
        missing_added: set,
        missing_removed: set,
        stale_added: set,
        stale_removed: set,
        threshold_breached: bool,
        threshold_was_breached: bool,
        max_distance_meters: float,
        threshold_meters: float,
        source_coordinates: Dict[str, Any]
    ) -> bool:
        """
        Send alert when validation state changes.

        Args:
            asset_name: Asset identifier from config
            missing_added: Sources that became missing
            missing_removed: Sources that recovered from missing
            stale_added: Sources that became stale
            stale_removed: Sources that recovered from stale
            threshold_breached: Current threshold breach state
            threshold_was_breached: Previous threshold breach state
            max_distance_meters: Current max distance between sources
            threshold_meters: Configured threshold
            source_coordinates: Current source coordinates

        Returns:
            bool: True if successful
        """
        timestamp = self._utc_timestamp()

        breach_started = threshold_breached and not threshold_was_breached
        breach_cleared = threshold_was_breached and not threshold_breached

        # Determine if this is a degradation or recovery
        is_degradation = bool(missing_added or stale_added or breach_started)
        is_recovery = bool(missing_removed or stale_removed or breach_cleared)

        if is_degradation and not is_recovery:
            emoji = _ALARM
            title = "GNSS STATE DEGRADED"
        elif is_recovery and not is_degradation:
            emoji = _PASSED
            title = "GNSS STATE RECOVERED"
        else:
            # Mixed changes (or none at all) get the neutral heading.
            emoji = _WARNING
            title = "GNSS STATE CHANGED"

        message = (
            f"{emoji} {title}\n\n"
            f"{_PIN} Asset: {self.escape_html(asset_name)}\n"
            f"{_CLOCK} Time: {timestamp}\n\n"
        )

        # Missing sources changes
        if missing_added:
            message += (
                f"{_CROSS} Sources now MISSING: "
                f"{self._join_escaped(sorted(missing_added))}\n"
            )
        if missing_removed:
            message += (
                f"{_PASSED} Sources RECOVERED (was missing): "
                f"{self._join_escaped(sorted(missing_removed))}\n"
            )

        # Stale sources changes
        if stale_added:
            message += (
                f"{_STOPWATCH} Sources now STALE: "
                f"{self._join_escaped(sorted(stale_added))}\n"
            )
        if stale_removed:
            message += (
                f"{_PASSED} Sources RECOVERED (was stale): "
                f"{self._join_escaped(sorted(stale_removed))}\n"
            )

        # Threshold breach changes
        if breach_started:
            message += (
                f"\n{_ALARM} DISTANCE THRESHOLD BREACHED!\n"
                f" Max distance: {max_distance_meters:.1f}m "
                f"(threshold: {threshold_meters:.1f}m)\n"
                f" {_WARNING} Possible GPS jamming or spoofing!\n"
            )
        elif breach_cleared:
            message += (
                f"\n{_PASSED} Distance threshold OK\n"
                f" Max distance: {max_distance_meters:.1f}m "
                f"(threshold: {threshold_meters:.1f}m)\n"
            )

        # Current coordinates summary
        if source_coordinates:
            message += f"\n{_LOCATION} Current Coordinates:\n"
            for source, coords in source_coordinates.items():
                lat = self.escape_html(coords.get("latitude", "N/A"))
                lon = self.escape_html(coords.get("longitude", "N/A"))
                message += f" {_BULLET} {self.escape_html(source)}: {lat}, {lon}\n"

        return self.send_message(message)

    def test_connection(self) -> bool:
        """
        Test Telegram bot connection.

        Returns:
            bool: True if connection successful
        """
        try:
            url = f"{self.api_url}/getMe"
            response = requests.get(url, timeout=10)
            if response.status_code == 200:
                bot_info = response.json()
                logger.info(
                    "Telegram bot connected: @%s",
                    bot_info['result']['username'],
                )
                return True
            logger.error("Telegram connection failed: %s", response.status_code)
            return False
        except Exception as e:
            # Covers network failures as well as an unexpected response
            # body; the token embedded in the URL is masked before logging.
            logger.error("Telegram connection error: %s", self._redact(e))
            return False