#!/usr/bin/env python3
"""
Telegram Alert Module for GNSS Guard

Sends alerts to Telegram for GPS validation failures:
- Distance differences exceeding threshold
- Missing GPS sources
- Stale GPS data
"""

import html
import logging
from datetime import datetime, timezone
from typing import Any, Dict, Iterable, Optional

import requests

logger = logging.getLogger("gnss_guard.telegram")


class TelegramAlert:
    """Handle Telegram notifications for GNSS Guard validation alerts."""

    def __init__(self, bot_token: str, chat_id: str):
        """
        Initialize Telegram bot.

        Args:
            bot_token: Telegram bot token (from BotFather)
            chat_id: Telegram chat/group ID (can be negative for groups)
        """
        self.bot_token = bot_token
        self.chat_id = chat_id
        self.api_url = f"https://api.telegram.org/bot{bot_token}"

    @staticmethod
    def escape_html(text: str) -> str:
        """
        Escape HTML special characters for Telegram HTML parsing.

        Only ``&``, ``<`` and ``>`` are replaced (quotes are left untouched).
        Non-string input is converted with ``str()`` first.

        Args:
            text: Text to escape

        Returns:
            str: Escaped text safe for Telegram HTML
        """
        return html.escape(str(text), quote=False)

    @staticmethod
    def _utc_timestamp() -> str:
        """Return the current time in UTC, formatted for message headers."""
        # The messages label the time as "UTC", so it must not be local time.
        return datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")

    @staticmethod
    def _format_altitude(alt: Any) -> str:
        """Format an altitude as ``"<value>m"``; ``None`` becomes ``"N/A"``."""
        if alt is None:
            return "N/A"
        try:
            return f"{float(alt):.1f}m"
        except (TypeError, ValueError):
            # Non-numeric altitude: show it as-is rather than fail the alert.
            return TelegramAlert.escape_html(alt)

    def _join_escaped(self, items: Iterable[Any]) -> str:
        """Return *items* HTML-escaped and joined with ``", "``."""
        return ", ".join(self.escape_html(item) for item in items)

    def _redact(self, text: Any) -> str:
        """Return ``str(text)`` with the bot token replaced by a placeholder."""
        # Exception messages raised by requests may include the request URL,
        # and the URL embeds the bot token - keep it out of the logs.
        redacted = str(text)
        if self.bot_token:
            redacted = redacted.replace(self.bot_token, "<redacted>")
        return redacted

    def send_message(self, message: str, parse_mode: str = "HTML") -> bool:
        """
        Send a message to Telegram.

        Failures are logged and reported through the return value; no
        exception is propagated to the caller.

        Args:
            message: Message text to send
            parse_mode: Message formatting (HTML or Markdown)

        Returns:
            bool: True if successful, False otherwise
        """
        try:
            url = f"{self.api_url}/sendMessage"
            payload = {
                "chat_id": self.chat_id,
                "text": message,
                "parse_mode": parse_mode,
                "disable_web_page_preview": True,
            }
            response = requests.post(url, json=payload, timeout=10)
            if response.status_code == 200:
                return True
            logger.error(
                "Telegram API error: %s - %s",
                response.status_code,
                response.text,
            )
            return False
        except Exception as e:
            # Deliberately broad: alerting is best-effort and must not crash
            # the caller.
            logger.error("Failed to send Telegram message: %s", self._redact(e))
            return False

    def send_validation_alert(
        self,
        validation_result: Dict[str, Any],
        asset_name: str,
        positions: Dict[str, Dict[str, Any]]
    ) -> bool:
        """
        Send alert when GPS validation fails.

        Args:
            validation_result: Validation result dictionary from CoordinateValidator
            asset_name: Asset identifier from config
            positions: Dictionary of positions from all sources
                (not read by this method; kept for interface compatibility)

        Returns:
            bool: True if successful
        """
        timestamp = self._utc_timestamp()
        validation_details = validation_result.get("validation_details", {})
        threshold_meters = validation_details.get("threshold_meters", 0)
        max_distance_meters = validation_details.get("max_distance_meters", 0)

        # Build alert message
        message = (
            "🚨 GNSS VALIDATION FAILED\n\n"
            f"📍 Asset: {self.escape_html(asset_name)}\n"
            f"⏰ Time: {timestamp}\n\n"
        )

        # Missing sources
        missing_sources = validation_result.get("sources_missing", [])
        if missing_sources:
            message += (
                f"❌ Missing Sources: {self._join_escaped(missing_sources)}\n\n"
            )

        # Stale sources
        stale_sources = validation_result.get("sources_stale", [])
        if stale_sources:
            stale_after = validation_details.get("stale_threshold_seconds", 60)
            message += (
                f"⏱️ Stale Sources: {self._join_escaped(stale_sources)}\n"
                f" (Data older than {self.escape_html(stale_after)}s)\n\n"
            )

        # Distance differences
        coordinate_differences = validation_result.get("coordinate_differences", {})
        if coordinate_differences:
            message += "📏 Distance Differences:\n"
            for diff_data in coordinate_differences.values():
                source1 = diff_data.get("source1", "unknown")
                source2 = diff_data.get("source2", "unknown")
                distance_m = diff_data.get("distance_meters", 0)
                distance_km = distance_m / 1000.0
                threshold_exceeded = "🚨" if distance_m > threshold_meters else "✓"
                message += (
                    f" {threshold_exceeded} {self.escape_html(source1)} ↔ {self.escape_html(source2)}: "
                    f"{distance_m:.1f}m ({distance_km:.3f}km)\n"
                )
            if max_distance_meters > threshold_meters:
                message += (
                    f"\n⚠️ Threshold Exceeded: {max_distance_meters:.1f}m > {threshold_meters:.1f}m\n"
                )
            message += "\n"

        # Source coordinates summary
        source_coordinates = validation_result.get("source_coordinates", {})
        if source_coordinates:
            message += "📍 Source Coordinates:\n"
            for source, coords in source_coordinates.items():
                lat = coords.get("latitude", "N/A")
                lon = coords.get("longitude", "N/A")
                alt_str = self._format_altitude(coords.get("altitude"))
                timestamp_str = coords.get("timestamp", "N/A")
                # Truncate timestamp for display
                if isinstance(timestamp_str, str) and len(timestamp_str) > 19:
                    timestamp_str = timestamp_str[:19] + "Z"
                message += (
                    f" • {self.escape_html(source)}\n"
                    f" Lat: {self.escape_html(lat)}, Lon: {self.escape_html(lon)}, Alt: {alt_str}\n"
                    f" Time: {self.escape_html(timestamp_str)}\n\n"
                )

        # Expected vs found sources
        expected_sources = validation_details.get("expected_sources", [])
        sources_found = validation_details.get("sources_found", [])
        if expected_sources:
            found_text = self._join_escaped(sources_found) if sources_found else "None"
            message += (
                f"📊 Sources: {len(sources_found)}/{len(expected_sources)} found\n"
                f" Expected: {self._join_escaped(expected_sources)}\n"
                f" Found: {found_text}\n"
            )

        return self.send_message(message)

    def send_validation_success(
        self,
        validation_result: Dict[str, Any],
        asset_name: str,
        positions: Dict[str, Dict[str, Any]]
    ) -> bool:
        """
        Send notification when GPS validation passes (only if TELEGRAM_SEND_ALL=true).

        Args:
            validation_result: Validation result dictionary from CoordinateValidator
            asset_name: Asset identifier from config
            positions: Dictionary of positions from all sources
                (not read by this method; kept for interface compatibility)

        Returns:
            bool: True if successful
        """
        timestamp = self._utc_timestamp()
        validation_details = validation_result.get("validation_details", {})
        max_distance_meters = validation_details.get("max_distance_meters", 0)
        sources_found = validation_details.get("sources_found", [])

        # Build success message
        message = (
            "✅ GNSS VALIDATION PASSED\n\n"
            f"📍 Asset: {self.escape_html(asset_name)}\n"
            f"⏰ Time: {timestamp}\n\n"
        )

        # Source coordinates summary
        source_coordinates = validation_result.get("source_coordinates", {})
        if source_coordinates:
            message += "📍 Source Coordinates:\n"
            for source, coords in source_coordinates.items():
                lat = coords.get("latitude", "N/A")
                lon = coords.get("longitude", "N/A")
                alt_str = self._format_altitude(coords.get("altitude"))
                message += (
                    f" • {self.escape_html(source)}: "
                    f"{self.escape_html(lat)}, {self.escape_html(lon)}, Alt: {alt_str}\n"
                )
            message += "\n"

        # Distance summary
        coordinate_differences = validation_result.get("coordinate_differences", {})
        if coordinate_differences:
            message += f"📏 Max Distance Difference: {max_distance_meters:.1f}m\n\n"

        active_text = self._join_escaped(sources_found) if sources_found else "None"
        message += (
            f"📊 Sources: {len(sources_found)} active\n"
            f" {active_text}\n"
        )

        return self.send_message(message)

    def send_error_alert(
        self,
        error_message: str,
        asset_name: str,
        error_details: Optional[str] = None
    ) -> bool:
        """
        Send alert when there's an error during validation or data collection.

        Args:
            error_message: Main error message
            asset_name: Asset identifier from config
            error_details: Detailed error information (only the first 1000
                characters are included in the message)

        Returns:
            bool: True if successful
        """
        timestamp = self._utc_timestamp()

        # Escape error message to prevent HTML parsing issues
        escaped_error = self.escape_html(error_message)
        message = (
            "🔴 GNSS GUARD ERROR\n\n"
            f"📍 Asset: {self.escape_html(asset_name)}\n"
            f"⏰ Time: {timestamp}\n"
            f"❌ Error: {escaped_error}\n"
        )

        if error_details:
            # Truncate before escaping so an entity is never cut in half.
            escaped_details = self.escape_html(str(error_details)[:1000])
            message += f"\n📋 Details:\n\n{escaped_details}\n"

        return self.send_message(message)

    def send_state_change_alert(
        self,
        asset_name: str,
        missing_added: set,
        missing_removed: set,
        stale_added: set,
        stale_removed: set,
        threshold_breached: bool,
        threshold_was_breached: bool,
        max_distance_meters: float,
        threshold_meters: float,
        source_coordinates: Dict[str, Any]
    ) -> bool:
        """
        Send alert when validation state changes.

        Args:
            asset_name: Asset identifier from config
            missing_added: Sources that became missing
            missing_removed: Sources that recovered from missing
            stale_added: Sources that became stale
            stale_removed: Sources that recovered from stale
            threshold_breached: Current threshold breach state
            threshold_was_breached: Previous threshold breach state
            max_distance_meters: Current max distance between sources
            threshold_meters: Configured threshold
            source_coordinates: Current source coordinates

        Returns:
            bool: True if successful
        """
        timestamp = self._utc_timestamp()

        newly_breached = threshold_breached and not threshold_was_breached
        newly_cleared = threshold_was_breached and not threshold_breached

        # Determine if this is a degradation or recovery
        is_degradation = bool(missing_added or stale_added or newly_breached)
        is_recovery = bool(missing_removed or stale_removed or newly_cleared)

        if is_degradation and not is_recovery:
            emoji = "🚨"
            title = "GNSS STATE DEGRADED"
        elif is_recovery and not is_degradation:
            emoji = "✅"
            title = "GNSS STATE RECOVERED"
        else:
            # Mixed change (or no recognised change at all).
            emoji = "⚠️"
            title = "GNSS STATE CHANGED"

        message = (
            f"{emoji} {title}\n\n"
            f"📍 Asset: {self.escape_html(asset_name)}\n"
            f"⏰ Time: {timestamp}\n\n"
        )

        # Missing sources changes (sorted for a stable, readable order)
        if missing_added:
            message += f"❌ Sources now MISSING: {self._join_escaped(sorted(missing_added))}\n"
        if missing_removed:
            message += f"✅ Sources RECOVERED (was missing): {self._join_escaped(sorted(missing_removed))}\n"

        # Stale sources changes
        if stale_added:
            message += f"⏱️ Sources now STALE: {self._join_escaped(sorted(stale_added))}\n"
        if stale_removed:
            message += f"✅ Sources RECOVERED (was stale): {self._join_escaped(sorted(stale_removed))}\n"

        # Threshold breach changes
        if newly_breached:
            message += (
                "\n🚨 DISTANCE THRESHOLD BREACHED!\n"
                f" Max distance: {max_distance_meters:.1f}m (threshold: {threshold_meters:.1f}m)\n"
                " ⚠️ Possible GPS jamming or spoofing!\n"
            )
        elif newly_cleared:
            message += (
                "\n✅ Distance threshold OK\n"
                f" Max distance: {max_distance_meters:.1f}m (threshold: {threshold_meters:.1f}m)\n"
            )

        # Current coordinates summary
        if source_coordinates:
            message += "\n📍 Current Coordinates:\n"
            for source, coords in source_coordinates.items():
                lat = coords.get("latitude", "N/A")
                lon = coords.get("longitude", "N/A")
                message += (
                    f" • {self.escape_html(source)}: "
                    f"{self.escape_html(lat)}, {self.escape_html(lon)}\n"
                )

        return self.send_message(message)

    def test_connection(self) -> bool:
        """
        Test Telegram bot connection.

        Calls the Bot API ``getMe`` method and logs the bot's username on
        success. Failures are logged and reported through the return value.

        Returns:
            bool: True if connection successful
        """
        try:
            url = f"{self.api_url}/getMe"
            response = requests.get(url, timeout=10)
            if response.status_code == 200:
                bot_info = response.json()
                logger.info(
                    "Telegram bot connected: @%s",
                    bot_info['result']['username'],
                )
                return True
            logger.error("Telegram connection failed: %s", response.status_code)
            return False
        except Exception as e:
            # Also covers an unexpected response shape (missing keys, bad JSON).
            logger.error("Telegram connection error: %s", self._redact(e))
            return False