#!/usr/bin/env python3
"""
Telegram Bot — SeaFare Montana maritime AI agent on Telegram.

Polling-based, runs as standalone systemd service (seafare-telegram.service).

Features:
  - Full AI agent access (26 tools)
  - Group chat support (@mention / reply / commands)
  - Vessel watch alerts (/watch, /unwatch, /watches)
  - Auto language detection (EN/RU/ES)

Run:
  python telegram_bot.py
"""

import html
import logging
import math
import os
import re
import threading
import time
from collections import defaultdict
from datetime import datetime, timedelta

import config

logger = logging.getLogger('telegram_bot')

# ---------------------------------------------------------------------------
# CONFIGURATION
# ---------------------------------------------------------------------------

RATE_LIMIT_MAX = config.TELEGRAM_RATE_LIMIT_MAX
RATE_LIMIT_PERIOD = config.TELEGRAM_RATE_LIMIT_PERIOD
MAX_HISTORY = config.TELEGRAM_MAX_HISTORY
POLL_TIMEOUT = config.TELEGRAM_POLL_TIMEOUT
BOT_USERNAME = config.TELEGRAM_BOT_USERNAME

WATCH_MAX = config.TELEGRAM_WATCH_MAX_PER_USER
WATCH_CHECK_INTERVAL = config.TELEGRAM_WATCH_CHECK_INTERVAL
WATCH_EXPIRY_HOURS = config.TELEGRAM_WATCH_EXPIRY_HOURS

# ---------------------------------------------------------------------------
# IN-MEMORY STATE (per-process)
# ---------------------------------------------------------------------------

_conversations = defaultdict(list)     # chat_id -> [{role, message}]
_user_langs = {}                       # chat_id -> 'en'|'ru'|'es'
_linked_users = {}                     # chat_id -> user dict (cached)
_rate_buckets = defaultdict(list)      # rate_key -> [timestamp, ...]
_watches = defaultdict(list)           # chat_id -> [watch_dict, ...]
_bot_user_id = None                    # set at startup via bot.get_me()

# ---------------------------------------------------------------------------
# i18n
# ---------------------------------------------------------------------------

_WELCOME = {
    'en': (
        "SeaFare Montana — Maritime freight operations specialist.\n\n"
        "Ask me anything about vessels, routes, freight rates, or sanctions.\n"
        "Just type naturally in English, Russian, or Spanish.\n\n"
        "Tap a button below to get started:"
    ),
    'ru': (
        "SeaFare Montana — Специалист по морским грузоперевозкам.\n\n"
        "Спросите о судах, маршрутах, фрахте или санкциях.\n"
        "Пишите на русском, английском или испанском.\n\n"
        "Нажмите кнопку чтобы начать:"
    ),
    'es': (
        "SeaFare Montana — Especialista en operaciones marítimas.\n\n"
        "Pregunte sobre buques, rutas, fletes o sanciones.\n"
        "Escribe en español, inglés o ruso.\n\n"
        "Pulse un botón para empezar:"
    ),
}

_HELP = {
    'en': (
        "Commands:\n"
        "/start — Welcome\n"
        "/help — This help\n"
        "/lang en|ru|es — Switch language\n"
        "/watch VESSEL to PORT — Alert on arrival\n"
        "/watch VESSEL status — Alert on status change\n"
        "/watches — List active watches\n"
        "/unwatch N — Remove watch #N\n\n"
        "Just type naturally:\n"
        '- "Track EVER GIVEN"\n'
        '- "Vessels near Rotterdam"\n'
        '- "Route from Singapore to Rotterdam"\n'
        '- "Freight rate 50k DWT bulk Shanghai to Santos"\n\n'
        "In groups: mention @seafare_montana_bot or reply to my message"
    ),
    'ru': (
        "Команды:\n"
        "/start — Приветствие\n"
        "/help — Эта справка\n"
        "/lang en|ru|es — Сменить язык\n"
        "/watch СУДНО to ПОРТ — Оповестить о приходе\n"
        "/watch СУДНО status — Оповестить о смене статуса\n"
        "/watches — Список отслеживаний\n"
        "/unwatch N — Удалить отслеживание #N\n\n"
        "Просто пишите:\n"
        '- "Найди EVER GIVEN"\n'
        '- "Суда в порту Роттердам"\n'
        '- "Маршрут из Сингапура в Роттердам"\n'
        '- "Фрахт 50т DWT балк Шанхай - Сантос"\n\n'
        "В группах: упомяните @seafare_montana_bot или ответьте на моё сообщение"
    ),
    'es': (
        "Comandos:\n"
        "/start — Bienvenida\n"
        "/help — Esta ayuda\n"
        "/lang en|ru|es — Cambiar idioma\n"
        "/watch BUQUE to PUERTO — Alerta de llegada\n"
        "/watch BUQUE status — Alerta de cambio de estado\n"
        "/watches — Lista de seguimientos\n"
        "/unwatch N — Eliminar seguimiento #N\n\n"
        "Escribe naturalmente:\n"
        '- "Rastrear EVER GIVEN"\n'
        '- "Buques cerca de Rotterdam"\n'
        '- "Ruta de Singapur a Rotterdam"\n'
        '- "Flete 50k DWT bulk Shanghai a Santos"\n\n'
        "En grupos: menciona @seafare_montana_bot o responde a mi mensaje"
    ),
}

_RATE_LIMITED = {
    'en': "Too many messages. Please wait a moment.",
    'ru': "Слишком много сообщений. Подождите немного.",
    'es': "Demasiados mensajes. Espere un momento.",
}

_ERROR = {
    'en': "Service is temporarily busy. Please try again.",
    'ru': "Сервис временно занят. Попробуйте ещё раз.",
    'es': "El servicio está temporalmente ocupado. Inténtelo de nuevo.",
}

# Hints shown when user taps an inline button (teaches by example)
_HINTS = {
    'hint_track': {
        'en': 'Type a vessel name, IMO or MMSI. Example:\n\nTrack EVER GIVEN',
        'ru': 'Введите имя судна, IMO или MMSI. Пример:\n\nНайди EVER GIVEN',
        'es': 'Escriba nombre, IMO o MMSI. Ejemplo:\n\nRastrear EVER GIVEN',
    },
    'hint_nearby': {
        'en': 'Type a port name. Example:\n\nVessels near Rotterdam',
        'ru': 'Введите название порта. Пример:\n\nСуда в порту Роттердам',
        'es': 'Escriba un puerto. Ejemplo:\n\nBuques cerca de Rotterdam',
    },
    'hint_route': {
        'en': 'Example:\n\nRoute from Singapore to Rotterdam',
        'ru': 'Пример:\n\nМаршрут из Сингапура в Роттердам',
        'es': 'Ejemplo:\n\nRuta de Singapur a Rotterdam',
    },
    'hint_freight': {
        'en': 'Example:\n\nFreight rate 50k DWT bulk Shanghai to Santos',
        'ru': 'Пример:\n\nФрахт 50т DWT балк Шанхай - Сантос',
        'es': 'Ejemplo:\n\nFlete 50k DWT bulk Shanghai a Santos',
    },
    'hint_watch': {
        'en': 'Example:\n\n/watch EVER GIVEN to Rotterdam\n/watch 9811000 status',
        'ru': 'Пример:\n\n/watch EVER GIVEN to Rotterdam\n/watch 9811000 status',
        'es': 'Ejemplo:\n\n/watch EVER GIVEN to Rotterdam\n/watch 9811000 status',
    },
}


def _t(table: dict, lang: str) -> str:
    """Return the ``lang`` entry of a message table, falling back to English."""
    return table.get(lang, table['en'])


def _esc(value) -> str:
    """Escape ``value`` for interpolation into a ``parse_mode='HTML'`` message.

    Telegram rejects HTML messages containing stray ``<``, ``>`` or ``&``, and
    vessel/port names come from external data or user input.
    """
    return html.escape(str(value), quote=False)


# ---------------------------------------------------------------------------
# TELEGRAM ACCOUNT LINKING (website-first model)
# ---------------------------------------------------------------------------

_REGISTER_FIRST = {
    'en': (
        "Welcome to SeaFare Montana!\n\n"
        "To use this bot, please register at:\n"
        "https://seafare.efir.org\n\n"
        "Then link your Telegram in Profile settings."
    ),
    'ru': (
        "Добро пожаловать в SeaFare Montana!\n\n"
        "Для работы с ботом зарегистрируйтесь на сайте:\n"
        "https://seafare.efir.org\n\n"
        "Затем привяжите Telegram в настройках профиля."
    ),
    'es': (
        "Bienvenido a SeaFare Montana!\n\n"
        "Para usar este bot, regístrese en:\n"
        "https://seafare.efir.org\n\n"
        "Luego vincule su Telegram en la configuración del perfil."
    ),
}

_LINK_SUCCESS = {
    'en': "Account linked successfully!",
    'ru': "Аккаунт успешно привязан!",
    'es': "Cuenta vinculada exitosamente!",
}

_LINK_FAILED = {
    'en': "Invalid or expired link. Please generate a new one on the website.",
    'ru': "Недействительная или просроченная ссылка. Создайте новую на сайте.",
    'es': "Enlace inválido o expirado. Genere uno nuevo en el sitio web.",
}


def _get_linked_user(chat_id: int):
    """Return the website user linked to ``chat_id``, or a falsy value.

    Successful lookups are cached in ``_linked_users``; misses are not cached,
    so a user who links later is picked up on the next message.
    """
    if chat_id in _linked_users:
        return _linked_users[chat_id]
    import maritime_db as mdb
    user = mdb.get_user_by_telegram_chat_id(chat_id)
    if user:
        _linked_users[chat_id] = user
    return user


def _handle_start(bot, message):
    """Handle /start, with an optional ``link_<code>`` deep-link payload.

    Flow: (1) a ``link_`` payload is verified and, on success, links the chat;
    (2) an already linked chat gets the language picker; (3) otherwise the
    user is told to register on the website first.
    """
    import maritime_db as mdb

    chat_id = message.chat.id
    text = (message.text or '').strip()
    # Language remembered from earlier messages, if any (English otherwise).
    lang = _user_langs.get(chat_id, 'en')

    # 1. Deep-link verification: /start link_XXXX
    payload = text.partition(' ')[2].strip()
    if payload.startswith('link_'):
        user = mdb.verify_telegram_link(payload[5:], chat_id)
        if user:
            # ``or`` also covers a profile whose stored lang is None.
            lang = user.get('lang') or 'en'
            _linked_users[chat_id] = user
            _user_langs[chat_id] = lang
            bot.send_message(chat_id, _t(_LINK_SUCCESS, lang), parse_mode='HTML')
            _send_lang_picker(bot, chat_id)
        else:
            bot.send_message(chat_id, _t(_LINK_FAILED, lang), parse_mode='HTML')
        return

    # 2. Already linked?
    user = _get_linked_user(chat_id)
    if user:
        _user_langs[chat_id] = user.get('lang') or 'en'
        _send_lang_picker(bot, chat_id)
        return

    # 3. Not linked — register first
    bot.send_message(chat_id, _t(_REGISTER_FIRST, lang), parse_mode='HTML')


# ---------------------------------------------------------------------------
# LANGUAGE PICKER (first step on /start)
# ---------------------------------------------------------------------------

def _send_lang_picker(bot, chat_id: int):
    """Send the language selection keyboard (first step of /start)."""
    from telebot.types import InlineKeyboardMarkup, InlineKeyboardButton

    kb = InlineKeyboardMarkup(row_width=3)
    kb.add(
        InlineKeyboardButton("English", callback_data="lang_en"),
        InlineKeyboardButton("Русский", callback_data="lang_ru"),
        InlineKeyboardButton("Español", callback_data="lang_es"),
    )
    bot.send_message(
        chat_id,
        "SeaFare Montana\n\nChoose language / Выберите язык / Elija idioma:",
        parse_mode='HTML',
        reply_markup=kb,
    )


# ---------------------------------------------------------------------------
# INLINE KEYBOARD (shown on /start)
# ---------------------------------------------------------------------------

def _miniapp_url() -> str:
    """Return the Mini App URL, versioned with ``config.APP_VERSION``."""
    from config import APP_VERSION
    return f'https://seafare.efir.org/miniapp?v={APP_VERSION}'


def _start_keyboard(lang: str):
    """Build the inline keyboard attached to the welcome message."""
    from telebot.types import InlineKeyboardMarkup, InlineKeyboardButton, WebAppInfo

    kb = InlineKeyboardMarkup(row_width=2)

    # Mini App button (top row, full width)
    app_label = {'en': 'Open Mini App', 'ru': 'Открыть приложение', 'es': 'Abrir aplicación'}
    kb.add(InlineKeyboardButton(_t(app_label, lang), web_app=WebAppInfo(url=_miniapp_url())))

    labels = {
        'en': [("Track vessel", "hint_track"), ("Vessels nearby", "hint_nearby"),
               ("Route", "hint_route"), ("Freight rate", "hint_freight"),
               ("Watch vessel", "hint_watch"), ("Help", "hint_help")],
        'ru': [("Найти судно", "hint_track"), ("Суда в порту", "hint_nearby"),
               ("Маршрут", "hint_route"), ("Фрахт", "hint_freight"),
               ("Следить за судном", "hint_watch"), ("Помощь", "hint_help")],
        'es': [("Rastrear buque", "hint_track"), ("Buques en puerto", "hint_nearby"),
               ("Ruta", "hint_route"), ("Flete", "hint_freight"),
               ("Seguir buque", "hint_watch"), ("Ayuda", "hint_help")],
    }
    btns = _t(labels, lang)
    # Two hint buttons per row.
    for i in range(0, len(btns), 2):
        kb.add(*[InlineKeyboardButton(label, callback_data=data)
                 for label, data in btns[i:i + 2]])
    return kb


# ---------------------------------------------------------------------------
# RATE LIMITING
# ---------------------------------------------------------------------------

def _check_rate_limit(rate_key: int) -> bool:
    """Return True if a request for ``rate_key`` is allowed right now.

    Sliding window: at most ``RATE_LIMIT_MAX`` requests per
    ``RATE_LIMIT_PERIOD`` seconds. An allowed request is recorded.
    """
    now = time.time()
    recent = [t for t in _rate_buckets[rate_key] if now - t < RATE_LIMIT_PERIOD]
    allowed = len(recent) < RATE_LIMIT_MAX
    if allowed:
        recent.append(now)
    _rate_buckets[rate_key] = recent
    return allowed


def _get_rate_key(message) -> int:
    """Per-user rate limit key in groups, per-chat key in private chats."""
    if message.chat.type in ('group', 'supergroup'):
        return message.from_user.id if message.from_user else message.chat.id
    return message.chat.id


# ---------------------------------------------------------------------------
# LANGUAGE DETECTION
# ---------------------------------------------------------------------------

_CYRILLIC_RE = re.compile(r'[а-яёА-ЯЁ]')
_SPANISH_WORDS = {'hola', 'buenos', 'buenas', 'buscar', 'buque', 'puerto', 'flete'}


def _detect_lang(text: str, chat_id: int) -> str:
    """Auto-detect the language of ``text`` and remember it per chat.

    Cyrillic letters select Russian; any whitespace-separated word from
    ``_SPANISH_WORDS`` selects Spanish. Otherwise the previously remembered
    language is kept (English if none).
    """
    if _CYRILLIC_RE.search(text):
        _user_langs[chat_id] = 'ru'
    elif not _SPANISH_WORDS.isdisjoint(text.lower().split()):
        _user_langs[chat_id] = 'es'
    return _user_langs.get(chat_id, 'en')


# ---------------------------------------------------------------------------
# GROUP CHAT SUPPORT
# ---------------------------------------------------------------------------

def _should_respond(message) -> bool:
    """Decide whether the bot should react to ``message``.

    Private chats always get a reply. In groups the bot reacts to /commands,
    replies to its own messages and @mentions.
    """
    # Always respond in private chats
    if message.chat.type == 'private':
        return True

    text = message.text or ''

    # /commands in groups
    if text.startswith('/'):
        return True

    # Reply to bot's own message
    reply = message.reply_to_message
    if (reply and reply.from_user and _bot_user_id
            and reply.from_user.id == _bot_user_id):
        return True

    mention_tag = f'@{BOT_USERNAME}'.lower()

    # @mention in entities
    for entity in message.entities or ():
        if entity.type == 'mention':
            mention = text[entity.offset:entity.offset + entity.length]
            if mention.lower() == mention_tag:
                return True
        elif entity.type == 'text_mention' and entity.user:
            if _bot_user_id and entity.user.id == _bot_user_id:
                return True

    # Plain text @mention fallback
    return mention_tag in text.lower()


def _clean_message_text(text: str) -> str:
    """Remove the bot's @mention from ``text``.

    A mention attached to a leading command (``/watch@bot ARGS``) is removed
    without swallowing the space that separates the command from its
    arguments; every other mention is removed together with the whitespace
    that follows it.
    """
    mention = re.escape(BOT_USERNAME)
    text = re.sub(rf'^(/\w+)@{mention}\b', r'\1', text.lstrip(), flags=re.IGNORECASE)
    return re.sub(rf'@{mention}\s*', '', text, flags=re.IGNORECASE).strip()


def _parse_command(text: str) -> str:
    """Return the lower-cased leading ``/command`` of ``text`` ('' if none).

    Any ``@botname`` suffix is dropped, so ``/watches@bot`` yields
    ``/watches``. Matching the whole token keeps ``/watches`` from being
    mistaken for ``/watch``.
    """
    if not text.startswith('/'):
        return ''
    token = text.split(maxsplit=1)[0]
    return token.partition('@')[0].lower()


# ---------------------------------------------------------------------------
# RESPONSE FORMATTING (Telegram-specific)
# ---------------------------------------------------------------------------

_SHOWMAP_RE = re.compile(r'\{\{SHOWMAP~([^~]+)~([^~]+)~([^~]+)~([^}]+)\}\}')
_MD_BOLD_RE = re.compile(r'\*\*(.+?)\*\*')
# Single-asterisk emphasis. The delimiters must hug the text so that
# "2 * 3 * 4" or "* bullet" are left alone, and the body may not contain
# '<' / '>' so a match can never straddle a tag produced by an earlier pass.
_MD_ITALIC_RE = re.compile(r'(?<!\*)\*(?!\s)([^*\n<>]+?)(?<!\s)\*(?!\*)')
_MD_CODE_RE = re.compile(r'`([^`<>]+)`')
_TABLE_SEPARATOR_RE = re.compile(r'^\|[\s\-:|]+\|$')

_TELEGRAM_TEXT_LIMIT = 4000
_TRUNCATION_SUFFIX = '\n\n... (truncated)'


def _inline_markdown_to_html(line: str) -> str:
    """Convert **bold**, *italic* and `code` spans of one escaped line to HTML."""
    line = _MD_BOLD_RE.sub(r'<b>\1</b>', line)
    line = _MD_ITALIC_RE.sub(r'<i>\1</i>', line)
    return _MD_CODE_RE.sub(r'<code>\1</code>', line)


def _format_for_telegram(text: str):
    """Convert an agent response to Telegram HTML.

    ``{{SHOWMAP~lat~lon~zoom~label}}`` markers are replaced by their label and
    collected as locations. The text is HTML-escaped, a safe Markdown subset
    (bold, italic, inline code) is converted to tags, and Markdown tables are
    wrapped in ``<pre>`` because Telegram cannot render tables.

    Returns:
        (html_text, [(lat, lon, label), ...])
    """
    locations = []

    def _extract_map(match):
        lat_s, lon_s, _zoom, label = match.groups()
        label = label.strip()
        try:
            locations.append((float(lat_s), float(lon_s), label))
        except ValueError:
            pass  # unparsable coordinates: keep the label, drop the pin
        return label

    text = _SHOWMAP_RE.sub(_extract_map, text or '')

    # Truncate BEFORE adding markup: Telegram's length limit applies to the
    # text after entity parsing, and cutting afterwards could slice a tag or
    # an entity in half and make the whole message unparsable.
    truncated = len(text) > _TELEGRAM_TEXT_LIMIT
    if truncated:
        text = text[:_TELEGRAM_TEXT_LIMIT - 10]

    # Escape first so that literal <, > and & in the agent output cannot
    # break Telegram's HTML parser; only tags added below stay unescaped.
    text = html.escape(text, quote=False)

    out = []
    table_rows = []

    def _flush_table():
        if table_rows:
            out.append('<pre>' + '\n'.join(table_rows) + '</pre>')
            table_rows.clear()

    for line in text.split('\n'):
        stripped = line.strip()
        if stripped.startswith('|') and stripped.endswith('|'):
            if _TABLE_SEPARATOR_RE.match(stripped):
                continue  # skip separator row
            # Table rows stay free of inline tags: they end up inside <pre>.
            table_rows.append(stripped)
        else:
            _flush_table()
            out.append(_inline_markdown_to_html(line))
    _flush_table()

    html_text = '\n'.join(out)
    if truncated:
        html_text += _TRUNCATION_SUFFIX
    return html_text, locations


# ---------------------------------------------------------------------------
# VESSEL WATCH — COMMANDS
# ---------------------------------------------------------------------------

def _active_watches(chat_id: int) -> list:
    """Return the not-yet-notified watches of ``chat_id`` (never creates a key)."""
    return [w for w in _watches.get(chat_id, []) if not w.get('notified')]


def _parse_watch_args(args: str):
    """Split /watch arguments into ``(watch_type, vessel_query, port_query)``.

    ``"VESSEL status"`` gives ``('status', VESSEL, None)``;
    ``"VESSEL to PORT"`` gives ``('arrival', VESSEL, PORT)``;
    anything else gives ``(None, None, None)``.
    """
    lowered = args.lower()
    if lowered.endswith(' status'):
        return 'status', args[:-7].strip(), None
    idx = lowered.find(' to ')
    if idx != -1:
        return 'arrival', args[:idx].strip(), args[idx + 4:].strip()
    return None, None, None


def _lookup_vessel(db, vessel_query: str):
    """Resolve ``vessel_query`` to a vessel dict, or None.

    Tries the local database first, then a live AIS lookup by name. A live
    hit that is unknown to the database yields a minimal
    ``{'mmsi', 'name'}`` dict.
    """
    vessels = db.search_vessels(vessel_query, limit=1)
    if vessels:
        return vessels[0]
    try:
        from ais_provider import get_provider
        pos = get_provider().get_vessel_position(name=vessel_query)
        if pos and pos.get('mmsi'):
            return (db.get_vessel(mmsi=pos['mmsi'])
                    or {'mmsi': pos['mmsi'], 'name': vessel_query.upper()})
    except Exception as e:
        print(f"[Telegram] Watch vessel lookup error: {e}")
    return None


def _cmd_watch(bot, message, text: str, lang: str):
    """Handle /watch. Creates an arrival or a status watch.

    Syntax: ``/watch VESSEL to PORT`` or ``/watch VESSEL status``. The watch
    is stored in the database and in ``_watches``; the vessel's MMSI is also
    subscribed on AISStream when that provider is available (best effort).
    """
    import maritime_db as db
    from marinetraffic_parser import resolve_port

    chat_id = message.chat.id

    # Check watch limit
    if len(_active_watches(chat_id)) >= WATCH_MAX:
        msgs = {
            'en': f"Maximum {WATCH_MAX} active watches. Use /unwatch to remove one.",
            'ru': f"Максимум {WATCH_MAX} активных отслеживаний. Используйте /unwatch.",
            'es': f"Máximo {WATCH_MAX} seguimientos activos. Use /unwatch para eliminar uno.",
        }
        bot.send_message(chat_id, _t(msgs, lang))
        return

    # Parse: /watch VESSEL to PORT  or  /watch VESSEL status
    parts = text.split(maxsplit=1)
    if len(parts) < 2:
        msgs = {
            'en': "Usage:\n/watch VESSEL to PORT — arrival alert\n/watch VESSEL status — status change alert",
            'ru': "Использование:\n/watch СУДНО to ПОРТ — оповещение о приходе\n/watch СУДНО status — смена статуса",
            'es': "Uso:\n/watch BUQUE to PUERTO — alerta de llegada\n/watch BUQUE status — cambio de estado",
        }
        bot.send_message(chat_id, _t(msgs, lang))
        return

    watch_type, vessel_query, port_query = _parse_watch_args(parts[1].strip())
    if watch_type is None:
        msgs = {
            'en': "Use: /watch VESSEL to PORT or /watch VESSEL status",
            'ru': "Формат: /watch СУДНО to ПОРТ или /watch СУДНО status",
            'es': "Formato: /watch BUQUE to PUERTO o /watch BUQUE status",
        }
        bot.send_message(chat_id, _t(msgs, lang))
        return

    if not vessel_query:
        bot.send_message(chat_id, "Vessel name required.")
        return

    bot.send_chat_action(chat_id, 'typing')

    # Resolve vessel
    vessel = _lookup_vessel(db, vessel_query)
    if not vessel or not vessel.get('mmsi'):
        msgs = {
            'en': f"Vessel not found: {vessel_query}. Try exact name, IMO, or MMSI.",
            'ru': f"Судно не найдено: {vessel_query}. Попробуйте точное имя, IMO или MMSI.",
            'es': f"Buque no encontrado: {vessel_query}. Pruebe nombre exacto, IMO o MMSI.",
        }
        bot.send_message(chat_id, _t(msgs, lang))
        return

    mmsi = vessel['mmsi']
    # ``or`` also covers a record whose stored name is None/empty.
    vessel_name = vessel.get('name') or vessel_query.upper()

    # For arrival watches: resolve port
    dest_port = None
    dest_lat = None
    dest_lon = None
    port_radius = 15.0
    if watch_type == 'arrival':
        if not port_query:
            bot.send_message(chat_id, "Port name required after 'to'.")
            return
        port = resolve_port(port_query)
        if not port:
            msgs = {
                'en': f"Port not found: {port_query}",
                'ru': f"Порт не найден: {port_query}",
                'es': f"Puerto no encontrado: {port_query}",
            }
            bot.send_message(chat_id, _t(msgs, lang))
            return
        dest_port = port.get('name', port_query)
        dest_lat = port['lat']
        dest_lon = port['lon']
        port_radius = port.get('radius_nm', 15.0)

    # For status watches: get current status as the comparison baseline
    last_status = None
    if watch_type == 'status':
        pos = db.get_last_position(mmsi)
        if pos:
            last_status = pos.get('status')

    # Save to DB
    watch_id = db.add_vessel_watch(
        chat_id=chat_id,
        mmsi=mmsi,
        vessel_name=vessel_name,
        watch_type=watch_type,
        destination_port=dest_port,
        destination_lat=dest_lat,
        destination_lon=dest_lon,
        port_radius_nm=port_radius,
        last_status=last_status
    )

    # Save to memory
    _watches[chat_id].append({
        'id': watch_id,
        'mmsi': mmsi,
        'vessel_name': vessel_name,
        'watch_type': watch_type,
        'destination_port': dest_port,
        'dest_lat': dest_lat,
        'dest_lon': dest_lon,
        'port_radius_nm': port_radius,
        'last_status': last_status,
        'last_lat': None,
        'last_lon': None,
        'notified': False,
        'created_at': time.time(),
    })

    # Subscribe to AISStream for fresh data (best effort: the watch itself
    # is already stored, so a failure here must not abort the command)
    try:
        from ais_provider import get_provider
        prov = get_provider()
        if hasattr(prov, 'aisstream') and prov.aisstream:
            prov.aisstream.subscribe_mmsi(mmsi)
    except Exception as e:
        print(f"[Telegram] AISStream subscribe failed for {mmsi}: {e}")

    # Confirm (HTML message, so interpolated values are escaped)
    name_html = _esc(vessel_name)
    if watch_type == 'arrival':
        port_html = _esc(dest_port)
        msgs = {
            'en': f"Watching {name_html} — will notify when it arrives at {port_html}.",
            'ru': f"Отслеживаю {name_html} — оповещу когда придёт в {port_html}.",
            'es': f"Siguiendo {name_html} — notificaré cuando llegue a {port_html}.",
        }
    else:
        status_str = _esc(last_status or 'unknown')
        msgs = {
            'en': f"Watching {name_html} status (current: {status_str}).",
            'ru': f"Отслеживаю статус {name_html} (текущий: {status_str}).",
            'es': f"Siguiendo estado de {name_html} (actual: {status_str}).",
        }
    bot.send_message(chat_id, _t(msgs, lang), parse_mode='HTML')


def _cmd_watches(bot, message, lang: str):
    """Handle /watches — list the chat's active watches, numbered from 1."""
    chat_id = message.chat.id
    active = _active_watches(chat_id)

    if not active:
        msgs = {
            'en': "No active watches. Use /watch to create one.",
            'ru': "Нет активных отслеживаний. Используйте /watch.",
            'es': "Sin seguimientos activos. Use /watch para crear uno.",
        }
        bot.send_message(chat_id, _t(msgs, lang))
        return

    headers = {'en': 'Active watches:', 'ru': 'Отслеживания:', 'es': 'Seguimientos:'}
    lines = [_t(headers, lang)]
    for i, w in enumerate(active, 1):
        if w['watch_type'] == 'arrival':
            lines.append(f"{i}. {_esc(w['vessel_name'])} → {_esc(w['destination_port'])}")
        else:
            lines.append(f"{i}. {_esc(w['vessel_name'])} (status)")

    bot.send_message(chat_id, '\n'.join(lines), parse_mode='HTML')


def _cmd_unwatch(bot, message, text: str, lang: str):
    """Handle /unwatch N — remove watch number N as listed by /watches."""
    import maritime_db as db

    chat_id = message.chat.id
    active = _active_watches(chat_id)

    parts = text.split()
    if len(parts) < 2:
        if not active:
            msgs = {'en': "No active watches.", 'ru': "Нет отслеживаний.", 'es': "Sin seguimientos."}
            bot.send_message(chat_id, _t(msgs, lang))
        else:
            _cmd_watches(bot, message, lang)
            bot.send_message(chat_id, "Usage: /unwatch N")
        return

    try:
        idx = int(parts[1]) - 1
    except ValueError:
        bot.send_message(chat_id, "Usage: /unwatch N (number)")
        return

    if idx < 0 or idx >= len(active):
        bot.send_message(chat_id, f"Invalid number. You have {len(active)} watches.")
        return

    watch = active[idx]
    db.delete_watch(watch['id'], chat_id)
    _watches[chat_id] = [w for w in _watches[chat_id] if w['id'] != watch['id']]

    msgs = {
        'en': f"Removed: {watch['vessel_name']}",
        'ru': f"Удалено: {watch['vessel_name']}",
        'es': f"Eliminado: {watch['vessel_name']}",
    }
    bot.send_message(chat_id, _t(msgs, lang))


# ---------------------------------------------------------------------------
# VESSEL WATCH — BACKGROUND CHECKER
# ---------------------------------------------------------------------------

def _load_watches_from_db():
    """Load active watches from the DB into ``_watches`` (called at startup)."""
    try:
        import maritime_db as db
        for w in db.get_active_watches():
            _watches[w['chat_id']].append({
                'id': w['id'],
                'mmsi': w['mmsi'],
                'vessel_name': w.get('vessel_name', ''),
                'watch_type': w['watch_type'],
                'destination_port': w.get('destination_port'),
                'dest_lat': w.get('destination_lat'),
                'dest_lon': w.get('destination_lon'),
                'port_radius_nm': w.get('port_radius_nm', 15),
                'last_status': w.get('last_status'),
                'last_lat': w.get('last_lat'),
                'last_lon': w.get('last_lon'),
                'notified': False,
                # NOTE(review): the expiry clock restarts on every process
                # start because the load time is used here; if the DB row
                # carries its creation time, prefer that — TODO confirm schema.
                'created_at': time.time(),
            })
        total = sum(len(v) for v in _watches.values())
        print(f"[Telegram] Loaded {total} active watches from DB")
    except Exception as e:
        print(f"[Telegram] Failed to load watches: {e}")


def _subscribe_watched_vessels():
    """Subscribe every watched MMSI to AISStream, if that provider exists."""
    try:
        from ais_provider import get_provider
        prov = get_provider()
        if not (hasattr(prov, 'aisstream') and prov.aisstream):
            return
        mmsis = {w['mmsi'] for watches in _watches.values() for w in watches}
        for mmsi in mmsis:
            prov.aisstream.subscribe_mmsi(mmsi)
        if mmsis:
            print(f"[Telegram] Subscribed {len(mmsis)} watched MMSIs to AISStream")
    except Exception as e:
        print(f"[Telegram] AISStream subscribe error: {e}")


def _calc_distance_nm(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Simple flat-earth distance in nautical miles.

    One degree of latitude is taken as 60 NM; longitude is scaled by the
    cosine of ``lat1``. The longitude difference is wrapped so that points on
    either side of the antimeridian (e.g. 179.9 and -179.9) count as close.
    """
    dlon_deg = abs(lon2 - lon1)
    if dlon_deg > 180:
        dlon_deg = 360 - dlon_deg
    dlat = abs(lat2 - lat1) * 60
    dlon = dlon_deg * 60 * max(math.cos(math.radians(lat1)), 0.01)
    return math.sqrt(dlat ** 2 + dlon ** 2)


def _map_url(lat, lon) -> str:
    """Return a Google Maps link for the given coordinates."""
    return f"https://www.google.com/maps?q={lat},{lon}"


def _map_keyboard(label: str, lat, lon):
    """Build a one-button inline keyboard linking to the position on a map."""
    from telebot.types import InlineKeyboardMarkup, InlineKeyboardButton
    kb = InlineKeyboardMarkup()
    kb.add(InlineKeyboardButton(f"\U0001F4CD {label}", url=_map_url(lat, lon)))
    return kb


def _check_arrival(bot, chat_id: int, watch: dict, pos: dict):
    """Notify ``chat_id`` once the vessel is within the port radius.

    On arrival the watch is flagged ``notified`` (in memory and in the DB)
    even if sending the alert fails, so the alert is never repeated.
    """
    lat = pos.get('latitude')
    lon = pos.get('longitude')
    if (lat is None or lon is None
            or watch.get('dest_lat') is None or watch.get('dest_lon') is None):
        return

    dist = _calc_distance_nm(lat, lon, watch['dest_lat'], watch['dest_lon'])
    # ``or 15`` also covers a stored radius of None.
    if dist > (watch.get('port_radius_nm') or 15):
        return

    lang = _user_langs.get(chat_id, 'en')
    name_html = _esc(watch['vessel_name'])
    port_html = _esc(watch['destination_port'])
    msgs = {
        'en': f"Vessel Alert: {name_html} arrived at {port_html} ({dist:.1f} NM)",
        'ru': f"Оповещение: {name_html} прибыл в {port_html} ({dist:.1f} NM)",
        'es': f"Alerta: {name_html} llegó a {port_html} ({dist:.1f} NM)",
    }
    try:
        bot.send_message(chat_id, _t(msgs, lang), parse_mode='HTML',
                         reply_markup=_map_keyboard(watch['vessel_name'], lat, lon))
    except Exception as e:
        print(f"[Telegram] Arrival notification failed: {e}")

    watch['notified'] = True
    try:
        import maritime_db as db
        db.mark_watch_notified(watch['id'])
    except Exception as e:
        print(f"[Telegram] mark_watch_notified failed for watch {watch['id']}: {e}")


def _check_status_change(bot, chat_id: int, watch: dict, pos: dict):
    """Notify ``chat_id`` when the vessel's navigation status changes.

    The first status ever seen only establishes the baseline. After a change
    the new status and position are stored on the watch and in the DB.
    """
    current_status = pos.get('status')
    if not current_status:
        return
    if not watch.get('last_status'):
        watch['last_status'] = current_status  # baseline, nothing to compare yet
        return
    if current_status == watch['last_status']:
        return

    lang = _user_langs.get(chat_id, 'en')
    name_html = _esc(watch['vessel_name'])
    old = _esc(watch['last_status'])
    new = _esc(current_status)
    msgs = {
        'en': f"Status Alert: {name_html} changed: {old} → {new}",
        'ru': f"Статус: {name_html} изменился: {old} → {new}",
        'es': f"Estado: {name_html} cambió: {old} → {new}",
    }
    lat = pos.get('latitude')
    lon = pos.get('longitude')
    try:
        reply_kb = None
        # ``is not None``: 0.0 is a valid latitude/longitude.
        if lat is not None and lon is not None:
            reply_kb = _map_keyboard(watch['vessel_name'], lat, lon)
        bot.send_message(chat_id, _t(msgs, lang), parse_mode='HTML',
                         reply_markup=reply_kb)
    except Exception as e:
        print(f"[Telegram] Status notification failed: {e}")

    watch['last_status'] = current_status
    try:
        import maritime_db as db
        db.update_watch_position(
            watch['id'],
            last_status=current_status,
            last_lat=lat,
            last_lon=lon
        )
    except Exception as e:
        print(f"[Telegram] update_watch_position failed for watch {watch['id']}: {e}")


def _check_chat_watches(bot, db, chat_id: int, now: float):
    """Run one check pass over the watches of ``chat_id``.

    Returns ``(checked, expired)`` counts. Watches older than
    ``WATCH_EXPIRY_HOURS`` are expired; finished watches are pruned.
    """
    checked = 0
    expired = 0
    # Iterate over a snapshot: command handlers run on other threads and may
    # append to or replace the chat's list while DB/network calls block here.
    for watch in list(_watches.get(chat_id, [])):
        if watch.get('notified'):
            continue

        # Auto-expire old watches
        age_hours = (now - watch.get('created_at', now)) / 3600
        if age_hours > WATCH_EXPIRY_HOURS:
            watch['notified'] = True
            try:
                db.mark_watch_notified(watch['id'])
            except Exception as e:
                print(f"[Telegram] mark_watch_notified failed for watch {watch['id']}: {e}")
            expired += 1
            continue

        pos = db.get_last_position(watch['mmsi'])
        if not pos:
            continue

        # Update position in watch
        watch['last_lat'] = pos.get('latitude')
        watch['last_lon'] = pos.get('longitude')

        if watch['watch_type'] == 'arrival':
            _check_arrival(bot, chat_id, watch, pos)
        elif watch['watch_type'] == 'status':
            _check_status_change(bot, chat_id, watch, pos)
        checked += 1

    # Cleanup notified watches from memory. Prune the list that is current
    # NOW, in place, so a watch added or removed during the pass is not
    # undone by writing back a stale snapshot.
    live = _watches.get(chat_id)
    if live is not None:
        live[:] = [w for w in live if not w.get('notified')]
    return checked, expired


def _watch_checker_loop(bot):
    """Background thread: check all active watches every WATCH_CHECK_INTERVAL s."""
    import maritime_db as db
    print(f"[Telegram] Watch checker started (interval={WATCH_CHECK_INTERVAL}s)")

    while True:
        time.sleep(WATCH_CHECK_INTERVAL)
        try:
            checked = 0
            expired = 0
            now = time.time()
            for chat_id in list(_watches.keys()):
                done, gone = _check_chat_watches(bot, db, chat_id, now)
                checked += done
                expired += gone

            if checked > 0 or expired > 0:
                print(f"[Telegram] Watch check: {checked} checked, {expired} expired")
        except Exception as e:
            print(f"[Telegram] Watch checker error: {e}")


# ---------------------------------------------------------------------------
# MESSAGE HANDLER
# ---------------------------------------------------------------------------

def _handle_message(bot, message):
    """Process an incoming Telegram message (private + group).

    Order: group filtering, bot commands, account-link check (private chats
    only), rate limit, agent call, history bookkeeping, formatted reply with
    a plain-text fallback.
    """
    # Group filtering
    if not _should_respond(message):
        return

    chat_id = message.chat.id
    text = (message.text or '').strip()
    if not text:
        return

    is_group = message.chat.type in ('group', 'supergroup')

    # Clean @mention from text in groups
    if is_group:
        text = _clean_message_text(text)
        if not text:
            return

    # --- Commands ---
    # Dispatch on the exact command token: with prefix matching, '/watches'
    # would be swallowed by the '/watch' branch.
    command = _parse_command(text)

    if command == '/start':
        _handle_start(bot, message)
        return

    if command == '/help':
        lang = _user_langs.get(chat_id, 'en')
        bot.send_message(chat_id, _t(_HELP, lang), parse_mode='HTML')
        return

    if command == '/lang':
        parts = text.split()
        if len(parts) >= 2 and parts[1] in ('en', 'ru', 'es'):
            _user_langs[chat_id] = parts[1]
            ack = {'en': 'Language set to English', 'ru': 'Язык: Русский', 'es': 'Idioma: Español'}
            bot.send_message(chat_id, ack[parts[1]])
        else:
            bot.send_message(chat_id, 'Usage: /lang en|ru|es')
        return

    lang = _detect_lang(text, chat_id)

    if command == '/watch':
        _cmd_watch(bot, message, text, lang)
        return

    if command == '/watches':
        _cmd_watches(bot, message, lang)
        return

    if command == '/unwatch':
        _cmd_unwatch(bot, message, text, lang)
        return

    # --- Check account linked (private chats only) ---
    user = _get_linked_user(chat_id) if not is_group else None
    if not is_group and not user:
        bot.send_message(chat_id, _t(_REGISTER_FIRST, lang), parse_mode='HTML')
        return

    # --- Rate limit ---
    if not _check_rate_limit(_get_rate_key(message)):
        bot.send_message(chat_id, _t(_RATE_LIMITED, lang))
        return

    # --- Typing indicator ---
    bot.send_chat_action(chat_id, 'typing')

    # --- Build user context for agent ---
    user_context = None
    user_id = None
    is_admin = False
    if user:
        user_id = user.get('id')
        is_admin = bool(user.get('is_admin'))
        try:
            import maritime_db as mdb
            user_context = mdb.get_profile_summary(user['id'])
            # Load DB history on first message (cross-client sync)
            if not _conversations[chat_id] and user_id:
                for msg in mdb.get_chat_history(user_id, limit=10):
                    _conversations[chat_id].append({
                        'role': msg['role'],
                        'message': msg['message']
                    })
        except Exception as e:
            # Best effort: the agent still works without profile/history.
            print(f"[Telegram] Profile/history load failed chat={chat_id}: {e}")

    # --- Build conversation history ---
    history = _conversations[chat_id][-(MAX_HISTORY * 2):]

    # --- Call agent ---
    try:
        from seafare_agent import generate_response
        response = generate_response(
            user_message=text,
            conversation_history=history,
            user_context=user_context,
            is_admin=is_admin,
            user_id=user_id,
            lang=lang,
        )
    except Exception as e:
        print(f"[Telegram] Agent error chat={chat_id}: {e}")
        response = _t(_ERROR, lang)

    # --- Save to in-memory history ---
    if is_group and message.from_user:
        sender = message.from_user.first_name or 'User'
        history_text = f"[{sender}]: {text}"
    else:
        history_text = text

    _conversations[chat_id].append({'role': 'user', 'message': history_text})
    _conversations[chat_id].append({'role': 'assistant', 'message': response})

    if len(_conversations[chat_id]) > MAX_HISTORY * 2:
        _conversations[chat_id] = _conversations[chat_id][-(MAX_HISTORY * 2):]

    # Save to DB for cross-client sync (web ↔ miniapp ↔ telegram)
    if user_id:
        try:
            import maritime_db as mdb
            mdb.add_chat_message(user_id, 'user', text)
            mdb.add_chat_message(user_id, 'assistant', response)
        except Exception as e:
            print(f"[Telegram] Chat history save failed chat={chat_id}: {e}")

    # --- Format and send ---
    formatted_text, locations = _format_for_telegram(response)

    # Build inline keyboard with map links (instead of separate location messages)
    reply_markup = None
    if locations:
        from telebot.types import InlineKeyboardMarkup, InlineKeyboardButton
        kb = InlineKeyboardMarkup(row_width=1)
        for lat, lon, label in locations[:3]:
            kb.add(InlineKeyboardButton(f"\U0001F4CD {label[:30]}", url=_map_url(lat, lon)))
        reply_markup = kb

    try:
        bot.send_message(chat_id, formatted_text, parse_mode='HTML',
                         disable_web_page_preview=True,
                         reply_markup=reply_markup)
    except Exception as e:
        # Most likely unparsable HTML: retry as plain text.
        print(f"[Telegram] HTML send failed: {e}")
        try:
            bot.send_message(chat_id, response[:4000])
        except Exception as e2:
            print(f"[Telegram] Plain send failed: {e2}")


# ---------------------------------------------------------------------------
# BOT POLLING LOOP
# ---------------------------------------------------------------------------

def _bot_polling(token: str):
    """Blocking polling loop. Runs as main process or daemon thread.

    Sets up the bot identity and menu button, loads watches, starts the watch
    checker thread, registers the handlers and then polls forever, restarting
    10 s after any polling error.
    """
    global _bot_user_id
    import telebot

    bot = telebot.TeleBot(token, parse_mode=None)

    # Get bot user ID for group reply detection
    try:
        me = bot.get_me()
        _bot_user_id = me.id
        print(f"[Telegram] Bot identity: @{me.username} (id={me.id})")
    except Exception as e:
        print(f"[Telegram] get_me() failed: {e}")

    # Set Mini App menu button
    try:
        from telebot.types import MenuButtonWebApp, WebAppInfo
        webapp_url = _miniapp_url()
        bot.set_chat_menu_button(menu_button=MenuButtonWebApp(
            type='web_app', text='SeaFare', web_app=WebAppInfo(url=webapp_url)))
        print(f"[Telegram] Menu button set → {webapp_url}")
    except Exception as e:
        print(f"[Telegram] Menu button error: {e}")

    # Load watches from DB
    _load_watches_from_db()
    _subscribe_watched_vessels()

    # Start watch checker thread
    checker = threading.Thread(target=_watch_checker_loop, args=(bot,),
                               daemon=True, name='watch-checker')
    checker.start()

    @bot.callback_query_handler(func=lambda call: True)
    def on_callback(call):
        try:
            chat_id = call.message.chat.id
            data = call.data or ''

            # Language selection from /start
            if data.startswith('lang_'):
                lang_code = data[5:]  # 'en', 'ru', 'es'
                if lang_code in ('en', 'ru', 'es'):
                    _user_langs[chat_id] = lang_code
                    bot.send_message(chat_id, _t(_WELCOME, lang_code),
                                     parse_mode='HTML',
                                     reply_markup=_start_keyboard(lang_code))
                return

            lang = _user_langs.get(chat_id, 'en')
            if data == 'hint_help':
                bot.send_message(chat_id, _t(_HELP, lang), parse_mode='HTML')
            elif data in _HINTS:
                bot.send_message(chat_id, _t(_HINTS[data], lang), parse_mode='HTML')
        except Exception as e:
            print(f"[Telegram] Callback error: {e}")
        finally:
            # Always answer, even after an error: otherwise the client keeps
            # showing a progress indicator on the tapped button.
            try:
                bot.answer_callback_query(call.id)
            except Exception as e:
                print(f"[Telegram] Callback answer failed: {e}")

    @bot.message_handler(func=lambda m: True)
    def on_message(message):
        try:
            _handle_message(bot, message)
        except Exception as e:
            print(f"[Telegram] Handler error: {e}")

    print("[Telegram] Bot polling started")
    while True:
        try:
            bot.polling(
                non_stop=True,
                interval=1,
                timeout=POLL_TIMEOUT,
                long_polling_timeout=POLL_TIMEOUT,
            )
        except Exception as e:
            print(f"[Telegram] Polling error: {e}, restarting in 10s...")
            time.sleep(10)


# ---------------------------------------------------------------------------
# STARTUP (called from seafare_api.py or standalone)
# ---------------------------------------------------------------------------

def start_telegram_bot():
    """Start the Telegram bot as a background daemon thread.

    Does nothing when the bot was already started (tracked through the
    ``_TELEGRAM_STARTED`` environment variable) or when the bot token or the
    Anthropic API key is missing from the environment.
    """
    if os.environ.get('_TELEGRAM_STARTED'):
        print("[Telegram] Bot already started — skipping")
        return

    token = os.environ.get(config.ENV_TELEGRAM_BOT_TOKEN)
    if not token:
        print("[Telegram] TELEGRAM_BOT_TOKEN not set — bot disabled")
        return

    anthropic_key = os.environ.get(config.ENV_ANTHROPIC_API_KEY)
    if not anthropic_key:
        print("[Telegram] ANTHROPIC_API_KEY not set — bot disabled")
        return

    # Mark as started only once the bot will really start, so a disabled bot
    # is not later reported as "already started".
    os.environ['_TELEGRAM_STARTED'] = '1'

    t = threading.Thread(target=_bot_polling, args=(token,), daemon=True)
    t.start()
    print("[Telegram] Bot thread started (polling mode)")


# ---------------------------------------------------------------------------
# STANDALONE
# ---------------------------------------------------------------------------

if __name__ == '__main__':
    from dotenv import load_dotenv
    load_dotenv()

    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s [%(name)s] %(levelname)s: %(message)s'
    )

    token = os.environ.get(config.ENV_TELEGRAM_BOT_TOKEN)
    if not token:
        print("Set TELEGRAM_BOT_TOKEN in .env")
        raise SystemExit(1)

    print("[Telegram] Starting bot in standalone mode...")
    _bot_polling(token)