montana/Русский/Логистика/telegram_bot.py

1158 lines
44 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
"""
Telegram Bot SeaFare Montana maritime AI agent on Telegram.
Polling-based, runs as standalone systemd service (seafare-telegram.service).
Features:
- Full AI agent access (26 tools)
- Group chat support (@mention / reply / commands)
- Vessel watch alerts (/watch, /unwatch, /watches)
- Auto language detection (EN/RU/ES)
Run: python telegram_bot.py
"""
import os
import re
import math
import logging
import threading
import time
from collections import defaultdict
from datetime import datetime, timedelta
import config
logger = logging.getLogger('telegram_bot')
# ---------------------------------------------------------------------------
# CONFIGURATION
# ---------------------------------------------------------------------------
RATE_LIMIT_MAX = config.TELEGRAM_RATE_LIMIT_MAX
RATE_LIMIT_PERIOD = config.TELEGRAM_RATE_LIMIT_PERIOD
MAX_HISTORY = config.TELEGRAM_MAX_HISTORY
POLL_TIMEOUT = config.TELEGRAM_POLL_TIMEOUT
BOT_USERNAME = config.TELEGRAM_BOT_USERNAME
WATCH_MAX = config.TELEGRAM_WATCH_MAX_PER_USER
WATCH_CHECK_INTERVAL = config.TELEGRAM_WATCH_CHECK_INTERVAL
WATCH_EXPIRY_HOURS = config.TELEGRAM_WATCH_EXPIRY_HOURS
# ---------------------------------------------------------------------------
# IN-MEMORY STATE (per-process)
# ---------------------------------------------------------------------------
_conversations = defaultdict(list) # chat_id -> [{role, message}]
_user_langs = {} # chat_id -> 'en'|'ru'|'es'
_linked_users = {} # chat_id -> user dict (cached)
_rate_buckets = defaultdict(list) # rate_key -> [timestamp, ...]
_watches = defaultdict(list) # chat_id -> [watch_dict, ...]
_bot_user_id = None # set at startup via bot.get_me()
# ---------------------------------------------------------------------------
# i18n
# ---------------------------------------------------------------------------
_WELCOME = {
'en': (
"<b>SeaFare Montana</b> — Maritime freight operations specialist.\n\n"
"Ask me anything about vessels, routes, freight rates, or sanctions.\n"
"Just type naturally in English, Russian, or Spanish.\n\n"
"Tap a button below to get started:"
),
'ru': (
"<b>SeaFare Montana</b> — Специалист по морским грузоперевозкам.\n\n"
"Спросите о судах, маршрутах, фрахте или санкциях.\n"
"Пишите на русском, английском или испанском.\n\n"
"Нажмите кнопку чтобы начать:"
),
'es': (
"<b>SeaFare Montana</b> — Especialista en operaciones marítimas.\n\n"
"Pregunte sobre buques, rutas, fletes o sanciones.\n"
"Escribe en español, inglés o ruso.\n\n"
"Pulse un botón para empezar:"
),
}
_HELP = {
'en': (
"<b>Commands:</b>\n"
"/start — Welcome\n"
"/help — This help\n"
"/lang en|ru|es — Switch language\n"
"/watch VESSEL to PORT — Alert on arrival\n"
"/watch VESSEL status — Alert on status change\n"
"/watches — List active watches\n"
"/unwatch N — Remove watch #N\n\n"
"<b>Just type naturally:</b>\n"
'- "Track EVER GIVEN"\n'
'- "Vessels near Rotterdam"\n'
'- "Route from Singapore to Rotterdam"\n'
'- "Freight rate 50k DWT bulk Shanghai to Santos"\n\n'
"<b>In groups:</b> mention @seafare_montana_bot or reply to my message"
),
'ru': (
"<b>Команды:</b>\n"
"/start — Приветствие\n"
"/help — Эта справка\n"
"/lang en|ru|es — Сменить язык\n"
"/watch СУДНО to ПОРТ — Оповестить о приходе\n"
"/watch СУДНО status — Оповестить о смене статуса\n"
"/watches — Список отслеживаний\n"
"/unwatch N — Удалить отслеживание #N\n\n"
"<b>Просто пишите:</b>\n"
'- "Найди EVER GIVEN"\n'
'- "Суда в порту Роттердам"\n'
'- "Маршрут из Сингапура в Роттердам"\n'
'- "Фрахт 50т DWT балк Шанхай - Сантос"\n\n'
"<b>В группах:</b> упомяните @seafare_montana_bot или ответьте на моё сообщение"
),
'es': (
"<b>Comandos:</b>\n"
"/start — Bienvenida\n"
"/help — Esta ayuda\n"
"/lang en|ru|es — Cambiar idioma\n"
"/watch BUQUE to PUERTO — Alerta de llegada\n"
"/watch BUQUE status — Alerta de cambio de estado\n"
"/watches — Lista de seguimientos\n"
"/unwatch N — Eliminar seguimiento #N\n\n"
"<b>Escribe naturalmente:</b>\n"
'- "Rastrear EVER GIVEN"\n'
'- "Buques cerca de Rotterdam"\n'
'- "Ruta de Singapur a Rotterdam"\n'
'- "Flete 50k DWT bulk Shanghai a Santos"\n\n'
"<b>En grupos:</b> menciona @seafare_montana_bot o responde a mi mensaje"
),
}
_RATE_LIMITED = {
'en': "Too many messages. Please wait a moment.",
'ru': "Слишком много сообщений. Подождите немного.",
'es': "Demasiados mensajes. Espere un momento.",
}
_ERROR = {
'en': "Service is temporarily busy. Please try again.",
'ru': "Сервис временно занят. Попробуйте ещё раз.",
'es': "El servicio está temporalmente ocupado. Inténtelo de nuevo.",
}
# Hints shown when user taps an inline button (teaches by example)
_HINTS = {
'hint_track': {
'en': 'Type a vessel name, IMO or MMSI. Example:\n\n<code>Track EVER GIVEN</code>',
'ru': 'Введите имя судна, IMO или MMSI. Пример:\n\n<code>Найди EVER GIVEN</code>',
'es': 'Escriba nombre, IMO o MMSI. Ejemplo:\n\n<code>Rastrear EVER GIVEN</code>',
},
'hint_nearby': {
'en': 'Type a port name. Example:\n\n<code>Vessels near Rotterdam</code>',
'ru': 'Введите название порта. Пример:\n\n<code>Суда в порту Роттердам</code>',
'es': 'Escriba un puerto. Ejemplo:\n\n<code>Buques cerca de Rotterdam</code>',
},
'hint_route': {
'en': 'Example:\n\n<code>Route from Singapore to Rotterdam</code>',
'ru': 'Пример:\n\n<code>Маршрут из Сингапура в Роттердам</code>',
'es': 'Ejemplo:\n\n<code>Ruta de Singapur a Rotterdam</code>',
},
'hint_freight': {
'en': 'Example:\n\n<code>Freight rate 50k DWT bulk Shanghai to Santos</code>',
'ru': 'Пример:\n\n<code>Фрахт 50т DWT балк Шанхай - Сантос</code>',
'es': 'Ejemplo:\n\n<code>Flete 50k DWT bulk Shanghai a Santos</code>',
},
'hint_watch': {
'en': 'Example:\n\n<code>/watch EVER GIVEN to Rotterdam</code>\n<code>/watch 9811000 status</code>',
'ru': 'Пример:\n\n<code>/watch EVER GIVEN to Rotterdam</code>\n<code>/watch 9811000 status</code>',
'es': 'Ejemplo:\n\n<code>/watch EVER GIVEN to Rotterdam</code>\n<code>/watch 9811000 status</code>',
},
}
# ---------------------------------------------------------------------------
# TELEGRAM ACCOUNT LINKING (website-first model)
# ---------------------------------------------------------------------------
_REGISTER_FIRST = {
'en': (
"Welcome to <b>SeaFare Montana</b>!\n\n"
"To use this bot, please register at:\n"
"https://seafare.efir.org\n\n"
"Then link your Telegram in Profile settings."
),
'ru': (
"Добро пожаловать в <b>SeaFare Montana</b>!\n\n"
"Для работы с ботом зарегистрируйтесь на сайте:\n"
"https://seafare.efir.org\n\n"
"Затем привяжите Telegram в настройках профиля."
),
'es': (
"Bienvenido a <b>SeaFare Montana</b>!\n\n"
"Para usar este bot, regístrese en:\n"
"https://seafare.efir.org\n\n"
"Luego vincule su Telegram en la configuración del perfil."
),
}
_LINK_SUCCESS = {
'en': "Account linked successfully!",
'ru': "Аккаунт успешно привязан!",
'es': "Cuenta vinculada exitosamente!",
}
_LINK_FAILED = {
'en': "Invalid or expired link. Please generate a new one on the website.",
'ru': "Недействительная или просроченная ссылка. Создайте новую на сайте.",
'es': "Enlace inválido o expirado. Genere uno nuevo en el sitio web.",
}
def _get_linked_user(chat_id: int):
"""Get website user linked to this chat_id (with cache)."""
if chat_id in _linked_users:
return _linked_users[chat_id]
import maritime_db as mdb
user = mdb.get_user_by_telegram_chat_id(chat_id)
if user:
_linked_users[chat_id] = user
return user
def _handle_start(bot, message):
"""Handle /start with optional deep-link payload."""
import maritime_db as mdb
chat_id = message.chat.id
text = (message.text or '').strip()
# 1. Deep-link verification: /start link_XXXX
if ' ' in text:
payload = text.split(' ', 1)[1].strip()
if payload.startswith('link_'):
code = payload[5:]
user = mdb.verify_telegram_link(code, chat_id)
if user:
_linked_users[chat_id] = user
_user_langs[chat_id] = user.get('lang', 'en')
lang = user.get('lang', 'en')
bot.send_message(chat_id,
f"<b>{_LINK_SUCCESS.get(lang, _LINK_SUCCESS['en'])}</b>",
parse_mode='HTML')
_send_lang_picker(bot, chat_id)
else:
bot.send_message(chat_id,
_LINK_FAILED.get('en', _LINK_FAILED['en']),
parse_mode='HTML')
return
# 2. Already linked?
user = _get_linked_user(chat_id)
if user:
_user_langs[chat_id] = user.get('lang', 'en')
_send_lang_picker(bot, chat_id)
return
# 3. Not linked — register first
bot.send_message(chat_id, _REGISTER_FIRST.get('en', _REGISTER_FIRST['en']),
parse_mode='HTML')
# ---------------------------------------------------------------------------
# LANGUAGE PICKER (first step on /start)
# ---------------------------------------------------------------------------
def _send_lang_picker(bot, chat_id: int):
"""Send language selection keyboard as first /start step."""
from telebot.types import InlineKeyboardMarkup, InlineKeyboardButton
kb = InlineKeyboardMarkup(row_width=3)
kb.add(
InlineKeyboardButton("English", callback_data="lang_en"),
InlineKeyboardButton("Русский", callback_data="lang_ru"),
InlineKeyboardButton("Español", callback_data="lang_es"),
)
bot.send_message(
chat_id,
"<b>SeaFare Montana</b>\n\nChoose language / Выберите язык / Elija idioma:",
parse_mode='HTML',
reply_markup=kb,
)
# ---------------------------------------------------------------------------
# INLINE KEYBOARD (shown on /start)
# ---------------------------------------------------------------------------
def _start_keyboard(lang: str):
"""Build inline keyboard for /start message."""
from telebot.types import InlineKeyboardMarkup, InlineKeyboardButton, WebAppInfo
kb = InlineKeyboardMarkup(row_width=2)
# Mini App button (top row, full width)
from config import APP_VERSION
webapp_url = f'https://seafare.efir.org/miniapp?v={APP_VERSION}'
app_label = {'en': 'Open Mini App', 'ru': 'Открыть приложение', 'es': 'Abrir aplicación'}
kb.add(InlineKeyboardButton(
app_label.get(lang, app_label['en']),
web_app=WebAppInfo(url=webapp_url)))
labels = {
'en': [("Track vessel", "hint_track"), ("Vessels nearby", "hint_nearby"),
("Route", "hint_route"), ("Freight rate", "hint_freight"),
("Watch vessel", "hint_watch"), ("Help", "hint_help")],
'ru': [("Найти судно", "hint_track"), ("Суда в порту", "hint_nearby"),
("Маршрут", "hint_route"), ("Фрахт", "hint_freight"),
("Следить за судном", "hint_watch"), ("Помощь", "hint_help")],
'es': [("Rastrear buque", "hint_track"), ("Buques en puerto", "hint_nearby"),
("Ruta", "hint_route"), ("Flete", "hint_freight"),
("Seguir buque", "hint_watch"), ("Ayuda", "hint_help")],
}
btns = labels.get(lang, labels['en'])
for i in range(0, len(btns), 2):
row = [InlineKeyboardButton(b[0], callback_data=b[1]) for b in btns[i:i+2]]
kb.add(*row)
return kb
# ---------------------------------------------------------------------------
# RATE LIMITING
# ---------------------------------------------------------------------------
def _check_rate_limit(rate_key: int) -> bool:
"""Returns True if request is allowed."""
now = time.time()
bucket = _rate_buckets[rate_key]
_rate_buckets[rate_key] = [t for t in bucket if now - t < RATE_LIMIT_PERIOD]
if len(_rate_buckets[rate_key]) >= RATE_LIMIT_MAX:
return False
_rate_buckets[rate_key].append(now)
return True
def _get_rate_key(message) -> int:
"""Per-user rate limit in groups, per-chat in private."""
if message.chat.type in ('group', 'supergroup'):
return message.from_user.id if message.from_user else message.chat.id
return message.chat.id
# ---------------------------------------------------------------------------
# LANGUAGE DETECTION
# ---------------------------------------------------------------------------
_CYRILLIC_RE = re.compile(r'[а-яёА-ЯЁ]')
_SPANISH_WORDS = {'hola', 'buenos', 'buenas', 'buscar', 'buque', 'puerto', 'flete'}
def _detect_lang(text: str, chat_id: int) -> str:
"""Auto-detect language from message text. Remembers per chat."""
if _CYRILLIC_RE.search(text):
_user_langs[chat_id] = 'ru'
elif any(w in text.lower().split() for w in _SPANISH_WORDS):
_user_langs[chat_id] = 'es'
return _user_langs.get(chat_id, 'en')
# ---------------------------------------------------------------------------
# GROUP CHAT SUPPORT
# ---------------------------------------------------------------------------
def _should_respond(message) -> bool:
"""Determine if bot should respond to this message."""
# Always respond in private chats
if message.chat.type == 'private':
return True
text = message.text or ''
# /commands in groups
if text.startswith('/'):
return True
# Reply to bot's own message
if (message.reply_to_message
and message.reply_to_message.from_user
and _bot_user_id
and message.reply_to_message.from_user.id == _bot_user_id):
return True
# @mention in entities
if message.entities:
for entity in message.entities:
if entity.type == 'mention':
mention = text[entity.offset:entity.offset + entity.length]
if mention.lower() == f'@{BOT_USERNAME}'.lower():
return True
elif entity.type == 'text_mention' and entity.user:
if _bot_user_id and entity.user.id == _bot_user_id:
return True
# Plain text @mention fallback
if f'@{BOT_USERNAME}'.lower() in text.lower():
return True
return False
def _clean_message_text(text: str) -> str:
"""Remove bot @mention from message text."""
return re.sub(rf'@{re.escape(BOT_USERNAME)}\s*', '', text, flags=re.IGNORECASE).strip()
# ---------------------------------------------------------------------------
# RESPONSE FORMATTING (Telegram-specific)
# ---------------------------------------------------------------------------
_SHOWMAP_RE = re.compile(r'\{\{SHOWMAP~([^~]+)~([^~]+)~([^~]+)~([^}]+)\}\}')
def _format_for_telegram(text: str):
"""Convert agent response to Telegram HTML format.
Returns: (html_text, [(lat, lon, label), ...])
"""
locations = []
def _extract_map(match):
lat_s, lon_s, _zoom, label = match.group(1), match.group(2), match.group(3), match.group(4)
try:
locations.append((float(lat_s), float(lon_s), label.strip()))
except ValueError:
pass
return label.strip()
text = _SHOWMAP_RE.sub(_extract_map, text)
# Markdown → HTML (safe subset)
text = re.sub(r'\*\*(.+?)\*\*', r'<b>\1</b>', text)
text = re.sub(r'(?<!\w)_([^_]+)_(?!\w)', r'<i>\1</i>', text)
text = re.sub(r'`([^`]+)`', r'<code>\1</code>', text)
# Markdown tables → <pre> (Telegram has no table rendering)
lines = text.split('\n')
result = []
table_lines = []
in_table = False
for line in lines:
stripped = line.strip()
if stripped.startswith('|') and stripped.endswith('|'):
if re.match(r'^\|[\s\-:|]+\|$', stripped):
continue # skip separator row
if not in_table:
in_table = True
table_lines = []
table_lines.append(stripped)
else:
if in_table:
result.append('<pre>' + '\n'.join(table_lines) + '</pre>')
in_table = False
table_lines = []
result.append(line)
if in_table:
result.append('<pre>' + '\n'.join(table_lines) + '</pre>')
text = '\n'.join(result)
# Telegram message limit
if len(text) > 4000:
text = text[:3990] + '\n\n... (truncated)'
return text, locations
# ---------------------------------------------------------------------------
# VESSEL WATCH — COMMANDS
# ---------------------------------------------------------------------------
def _cmd_watch(bot, message, text: str, lang: str):
"""Handle /watch command. Creates arrival or status watch."""
import maritime_db as db
from marinetraffic_parser import resolve_port
chat_id = message.chat.id
# Check watch limit
active = _watches.get(chat_id, [])
active = [w for w in active if not w.get('notified')]
if len(active) >= WATCH_MAX:
msgs = {
'en': f"Maximum {WATCH_MAX} active watches. Use /unwatch to remove one.",
'ru': f"Максимум {WATCH_MAX} активных отслеживаний. Используйте /unwatch.",
'es': f"Máximo {WATCH_MAX} seguimientos activos. Use /unwatch para eliminar uno.",
}
bot.send_message(chat_id, msgs.get(lang, msgs['en']))
return
# Parse: /watch VESSEL to PORT or /watch VESSEL status
parts = text.split(maxsplit=1)
if len(parts) < 2:
msgs = {
'en': "Usage:\n/watch VESSEL to PORT — arrival alert\n/watch VESSEL status — status change alert",
'ru': "Использование:\n/watch СУДНО to ПОРТ — оповещение о приходе\n/watch СУДНО status — смена статуса",
'es': "Uso:\n/watch BUQUE to PUERTO — alerta de llegada\n/watch BUQUE status — cambio de estado",
}
bot.send_message(chat_id, msgs.get(lang, msgs['en']))
return
args = parts[1].strip()
# Determine watch type
watch_type = None
vessel_query = None
port_query = None
if args.lower().endswith(' status'):
watch_type = 'status'
vessel_query = args[:-7].strip()
elif ' to ' in args.lower():
watch_type = 'arrival'
idx = args.lower().index(' to ')
vessel_query = args[:idx].strip()
port_query = args[idx + 4:].strip()
else:
msgs = {
'en': "Use: /watch VESSEL to PORT or /watch VESSEL status",
'ru': "Формат: /watch СУДНО to ПОРТ или /watch СУДНО status",
'es': "Formato: /watch BUQUE to PUERTO o /watch BUQUE status",
}
bot.send_message(chat_id, msgs.get(lang, msgs['en']))
return
if not vessel_query:
bot.send_message(chat_id, "Vessel name required.")
return
bot.send_chat_action(chat_id, 'typing')
# Resolve vessel
vessel = None
vessels = db.search_vessels(vessel_query, limit=1)
if vessels:
vessel = vessels[0]
else:
# Try live lookup
try:
from ais_provider import get_provider
pos = get_provider().get_vessel_position(name=vessel_query)
if pos and pos.get('mmsi'):
vessel = db.get_vessel(mmsi=pos['mmsi'])
if not vessel:
vessel = {'mmsi': pos['mmsi'], 'name': vessel_query.upper()}
except Exception as e:
print(f"[Telegram] Watch vessel lookup error: {e}")
if not vessel or not vessel.get('mmsi'):
msgs = {
'en': f"Vessel not found: {vessel_query}. Try exact name, IMO, or MMSI.",
'ru': f"Судно не найдено: {vessel_query}. Попробуйте точное имя, IMO или MMSI.",
'es': f"Buque no encontrado: {vessel_query}. Pruebe nombre exacto, IMO o MMSI.",
}
bot.send_message(chat_id, msgs.get(lang, msgs['en']))
return
mmsi = vessel['mmsi']
vessel_name = vessel.get('name', vessel_query.upper())
# For arrival watches: resolve port
dest_port = None
dest_lat = None
dest_lon = None
port_radius = 15.0
if watch_type == 'arrival':
if not port_query:
bot.send_message(chat_id, "Port name required after 'to'.")
return
port = resolve_port(port_query)
if not port:
msgs = {
'en': f"Port not found: {port_query}",
'ru': f"Порт не найден: {port_query}",
'es': f"Puerto no encontrado: {port_query}",
}
bot.send_message(chat_id, msgs.get(lang, msgs['en']))
return
dest_port = port.get('name', port_query)
dest_lat = port['lat']
dest_lon = port['lon']
port_radius = port.get('radius_nm', 15.0)
# For status watches: get current status
last_status = None
if watch_type == 'status':
pos = db.get_last_position(mmsi)
if pos:
last_status = pos.get('status')
# Save to DB
watch_id = db.add_vessel_watch(
chat_id=chat_id, mmsi=mmsi, vessel_name=vessel_name,
watch_type=watch_type, destination_port=dest_port,
destination_lat=dest_lat, destination_lon=dest_lon,
port_radius_nm=port_radius, last_status=last_status
)
# Save to memory
watch_dict = {
'id': watch_id, 'mmsi': mmsi, 'vessel_name': vessel_name,
'watch_type': watch_type, 'destination_port': dest_port,
'dest_lat': dest_lat, 'dest_lon': dest_lon,
'port_radius_nm': port_radius, 'last_status': last_status,
'last_lat': None, 'last_lon': None, 'notified': False,
'created_at': time.time(),
}
_watches[chat_id].append(watch_dict)
# Subscribe to AISStream for fresh data
try:
from ais_provider import get_provider
prov = get_provider()
if hasattr(prov, 'aisstream') and prov.aisstream:
prov.aisstream.subscribe_mmsi(mmsi)
except Exception:
pass
# Confirm
if watch_type == 'arrival':
msgs = {
'en': f"Watching <b>{vessel_name}</b> — will notify when it arrives at <b>{dest_port}</b>.",
'ru': f"Отслеживаю <b>{vessel_name}</b> — оповещу когда придёт в <b>{dest_port}</b>.",
'es': f"Siguiendo <b>{vessel_name}</b> — notificaré cuando llegue a <b>{dest_port}</b>.",
}
else:
status_str = last_status or 'unknown'
msgs = {
'en': f"Watching <b>{vessel_name}</b> status (current: {status_str}).",
'ru': f"Отслеживаю статус <b>{vessel_name}</b> (текущий: {status_str}).",
'es': f"Siguiendo estado de <b>{vessel_name}</b> (actual: {status_str}).",
}
bot.send_message(chat_id, msgs.get(lang, msgs['en']), parse_mode='HTML')
def _cmd_watches(bot, message, lang: str):
"""Handle /watches — list active watches."""
chat_id = message.chat.id
active = [w for w in _watches.get(chat_id, []) if not w.get('notified')]
if not active:
msgs = {
'en': "No active watches. Use /watch to create one.",
'ru': "Нет активных отслеживаний. Используйте /watch.",
'es': "Sin seguimientos activos. Use /watch para crear uno.",
}
bot.send_message(chat_id, msgs.get(lang, msgs['en']))
return
headers = {'en': '<b>Active watches:</b>', 'ru': '<b>Отслеживания:</b>', 'es': '<b>Seguimientos:</b>'}
lines = [headers.get(lang, headers['en'])]
for i, w in enumerate(active, 1):
if w['watch_type'] == 'arrival':
lines.append(f"{i}. {w['vessel_name']}{w['destination_port']}")
else:
lines.append(f"{i}. {w['vessel_name']} (status)")
bot.send_message(chat_id, '\n'.join(lines), parse_mode='HTML')
def _cmd_unwatch(bot, message, text: str, lang: str):
"""Handle /unwatch N — remove watch by number."""
import maritime_db as db
chat_id = message.chat.id
active = [w for w in _watches.get(chat_id, []) if not w.get('notified')]
parts = text.split()
if len(parts) < 2:
if not active:
msgs = {'en': "No active watches.", 'ru': "Нет отслеживаний.", 'es': "Sin seguimientos."}
bot.send_message(chat_id, msgs.get(lang, msgs['en']))
else:
_cmd_watches(bot, message, lang)
bot.send_message(chat_id, "Usage: /unwatch N")
return
try:
idx = int(parts[1]) - 1
except ValueError:
bot.send_message(chat_id, "Usage: /unwatch N (number)")
return
if idx < 0 or idx >= len(active):
bot.send_message(chat_id, f"Invalid number. You have {len(active)} watches.")
return
watch = active[idx]
db.delete_watch(watch['id'], chat_id)
_watches[chat_id] = [w for w in _watches[chat_id] if w['id'] != watch['id']]
msgs = {
'en': f"Removed: {watch['vessel_name']}",
'ru': f"Удалено: {watch['vessel_name']}",
'es': f"Eliminado: {watch['vessel_name']}",
}
bot.send_message(chat_id, msgs.get(lang, msgs['en']))
# ---------------------------------------------------------------------------
# VESSEL WATCH — BACKGROUND CHECKER
# ---------------------------------------------------------------------------
def _load_watches_from_db():
"""Load active watches from DB into memory (called at startup)."""
try:
import maritime_db as db
all_watches = db.get_active_watches()
for w in all_watches:
watch_dict = {
'id': w['id'], 'mmsi': w['mmsi'],
'vessel_name': w.get('vessel_name', ''),
'watch_type': w['watch_type'],
'destination_port': w.get('destination_port'),
'dest_lat': w.get('destination_lat'),
'dest_lon': w.get('destination_lon'),
'port_radius_nm': w.get('port_radius_nm', 15),
'last_status': w.get('last_status'),
'last_lat': w.get('last_lat'),
'last_lon': w.get('last_lon'),
'notified': False,
'created_at': time.time(),
}
_watches[w['chat_id']].append(watch_dict)
total = sum(len(v) for v in _watches.values())
print(f"[Telegram] Loaded {total} active watches from DB")
except Exception as e:
print(f"[Telegram] Failed to load watches: {e}")
def _subscribe_watched_vessels():
"""Subscribe all watched MMSIs to AISStream."""
try:
from ais_provider import get_provider
prov = get_provider()
if not (hasattr(prov, 'aisstream') and prov.aisstream):
return
mmsis = set()
for watches in _watches.values():
for w in watches:
mmsis.add(w['mmsi'])
for mmsi in mmsis:
prov.aisstream.subscribe_mmsi(mmsi)
if mmsis:
print(f"[Telegram] Subscribed {len(mmsis)} watched MMSIs to AISStream")
except Exception as e:
print(f"[Telegram] AISStream subscribe error: {e}")
def _calc_distance_nm(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
"""Simple flat-earth distance in nautical miles."""
dlat = abs(lat2 - lat1) * 60
dlon = abs(lon2 - lon1) * 60 * max(math.cos(math.radians(lat1)), 0.01)
return math.sqrt(dlat ** 2 + dlon ** 2)
def _check_arrival(bot, chat_id: int, watch: dict, pos: dict):
"""Check if vessel has arrived at destination port."""
lat = pos.get('latitude')
lon = pos.get('longitude')
if lat is None or lon is None or watch.get('dest_lat') is None:
return
dist = _calc_distance_nm(lat, lon, watch['dest_lat'], watch['dest_lon'])
if dist <= watch.get('port_radius_nm', 15):
lang = _user_langs.get(chat_id, 'en')
msgs = {
'en': f"<b>Vessel Alert:</b> <b>{watch['vessel_name']}</b> arrived at <b>{watch['destination_port']}</b> ({dist:.1f} NM)",
'ru': f"<b>Оповещение:</b> <b>{watch['vessel_name']}</b> прибыл в <b>{watch['destination_port']}</b> ({dist:.1f} NM)",
'es': f"<b>Alerta:</b> <b>{watch['vessel_name']}</b> llegó a <b>{watch['destination_port']}</b> ({dist:.1f} NM)",
}
try:
from telebot.types import InlineKeyboardMarkup, InlineKeyboardButton
kb = InlineKeyboardMarkup()
kb.add(InlineKeyboardButton(
f"\U0001F4CD {watch['vessel_name']}",
url=f"https://www.google.com/maps?q={lat},{lon}"
))
bot.send_message(chat_id, msgs.get(lang, msgs['en']),
parse_mode='HTML', reply_markup=kb)
except Exception as e:
print(f"[Telegram] Arrival notification failed: {e}")
watch['notified'] = True
try:
import maritime_db as db
db.mark_watch_notified(watch['id'])
except Exception:
pass
def _check_status_change(bot, chat_id: int, watch: dict, pos: dict):
"""Check if vessel nav status has changed."""
current_status = pos.get('status')
if not current_status or not watch.get('last_status'):
if current_status and not watch.get('last_status'):
watch['last_status'] = current_status
return
if current_status != watch['last_status']:
lang = _user_langs.get(chat_id, 'en')
old = watch['last_status']
msgs = {
'en': f"<b>Status Alert:</b> <b>{watch['vessel_name']}</b> changed: {old}{current_status}",
'ru': f"<b>Статус:</b> <b>{watch['vessel_name']}</b> изменился: {old}{current_status}",
'es': f"<b>Estado:</b> <b>{watch['vessel_name']}</b> cambió: {old}{current_status}",
}
try:
reply_kb = None
lat = pos.get('latitude')
lon = pos.get('longitude')
if lat and lon:
from telebot.types import InlineKeyboardMarkup, InlineKeyboardButton
reply_kb = InlineKeyboardMarkup()
reply_kb.add(InlineKeyboardButton(
f"\U0001F4CD {watch['vessel_name']}",
url=f"https://www.google.com/maps?q={lat},{lon}"
))
bot.send_message(chat_id, msgs.get(lang, msgs['en']),
parse_mode='HTML', reply_markup=reply_kb)
except Exception as e:
print(f"[Telegram] Status notification failed: {e}")
watch['last_status'] = current_status
try:
import maritime_db as db
db.update_watch_position(
watch['id'], last_status=current_status,
last_lat=pos.get('latitude'), last_lon=pos.get('longitude')
)
except Exception:
pass
def _watch_checker_loop(bot):
"""Background thread: check all active watches periodically."""
import maritime_db as db
print(f"[Telegram] Watch checker started (interval={WATCH_CHECK_INTERVAL}s)")
while True:
time.sleep(WATCH_CHECK_INTERVAL)
try:
checked = 0
expired = 0
now = time.time()
for chat_id in list(_watches.keys()):
watches = _watches[chat_id]
for watch in watches:
if watch.get('notified'):
continue
# Auto-expire old watches
age_hours = (now - watch.get('created_at', now)) / 3600
if age_hours > WATCH_EXPIRY_HOURS:
watch['notified'] = True
try:
db.mark_watch_notified(watch['id'])
except Exception:
pass
expired += 1
continue
pos = db.get_last_position(watch['mmsi'])
if not pos:
continue
# Update position in watch
watch['last_lat'] = pos.get('latitude')
watch['last_lon'] = pos.get('longitude')
if watch['watch_type'] == 'arrival':
_check_arrival(bot, chat_id, watch, pos)
elif watch['watch_type'] == 'status':
_check_status_change(bot, chat_id, watch, pos)
checked += 1
# Cleanup notified watches from memory
_watches[chat_id] = [w for w in watches if not w.get('notified')]
if checked > 0 or expired > 0:
print(f"[Telegram] Watch check: {checked} checked, {expired} expired")
except Exception as e:
print(f"[Telegram] Watch checker error: {e}")
# ---------------------------------------------------------------------------
# MESSAGE HANDLER
# ---------------------------------------------------------------------------
def _handle_message(bot, message):
"""Process incoming Telegram message (private + group)."""
# Group filtering
if not _should_respond(message):
return
chat_id = message.chat.id
text = (message.text or '').strip()
if not text:
return
is_group = message.chat.type in ('group', 'supergroup')
# Clean @mention from text in groups
if is_group:
text = _clean_message_text(text)
if not text:
return
# --- Commands ---
if text.startswith('/start'):
_handle_start(bot, message)
return
if text.startswith('/help'):
lang = _user_langs.get(chat_id, 'en')
bot.send_message(chat_id, _HELP.get(lang, _HELP['en']), parse_mode='HTML')
return
if text.startswith('/lang'):
parts = text.split()
if len(parts) >= 2 and parts[1] in ('en', 'ru', 'es'):
_user_langs[chat_id] = parts[1]
ack = {'en': 'Language set to English', 'ru': 'Язык: Русский', 'es': 'Idioma: Español'}
bot.send_message(chat_id, ack[parts[1]])
else:
bot.send_message(chat_id, 'Usage: /lang en|ru|es')
return
lang = _detect_lang(text, chat_id)
if text.startswith('/watch'):
_cmd_watch(bot, message, text, lang)
return
if text.startswith('/watches'):
_cmd_watches(bot, message, lang)
return
if text.startswith('/unwatch'):
_cmd_unwatch(bot, message, text, lang)
return
# --- Check account linked (private chats only) ---
user = _get_linked_user(chat_id) if not is_group else None
if not is_group and not user:
bot.send_message(chat_id, _REGISTER_FIRST.get(lang, _REGISTER_FIRST['en']),
parse_mode='HTML')
return
# --- Rate limit ---
rate_key = _get_rate_key(message)
if not _check_rate_limit(rate_key):
bot.send_message(chat_id, _RATE_LIMITED.get(lang, _RATE_LIMITED['en']))
return
# --- Typing indicator ---
bot.send_chat_action(chat_id, 'typing')
# --- Build user context for agent ---
user_context = None
user_id = None
is_admin = False
if user:
user_id = user.get('id')
is_admin = bool(user.get('is_admin'))
try:
import maritime_db as mdb
user_context = mdb.get_profile_summary(user['id'])
# Load DB history on first message (cross-client sync)
if not _conversations[chat_id] and user_id:
db_hist = mdb.get_chat_history(user_id, limit=10)
for msg in db_hist:
_conversations[chat_id].append({
'role': msg['role'],
'message': msg['message']
})
except Exception:
pass
# --- Build conversation history ---
history = _conversations[chat_id][-(MAX_HISTORY * 2):]
# --- Call agent ---
try:
from seafare_agent import generate_response
response = generate_response(
user_message=text,
conversation_history=history,
user_context=user_context,
is_admin=is_admin,
user_id=user_id,
lang=lang,
)
except Exception as e:
print(f"[Telegram] Agent error chat={chat_id}: {e}")
response = _ERROR.get(lang, _ERROR['en'])
# --- Save to in-memory history ---
if is_group and message.from_user:
sender = message.from_user.first_name or 'User'
history_text = f"[{sender}]: {text}"
else:
history_text = text
_conversations[chat_id].append({'role': 'user', 'message': history_text})
_conversations[chat_id].append({'role': 'assistant', 'message': response})
if len(_conversations[chat_id]) > MAX_HISTORY * 2:
_conversations[chat_id] = _conversations[chat_id][-(MAX_HISTORY * 2):]
# Save to DB for cross-client sync (web ↔ miniapp ↔ telegram)
if user_id:
try:
import maritime_db as mdb
mdb.add_chat_message(user_id, 'user', text)
mdb.add_chat_message(user_id, 'assistant', response)
except Exception:
pass
# --- Format and send ---
formatted_text, locations = _format_for_telegram(response)
# Build inline keyboard with map links (instead of separate location messages)
reply_markup = None
if locations:
from telebot.types import InlineKeyboardMarkup, InlineKeyboardButton
kb = InlineKeyboardMarkup(row_width=1)
for lat, lon, label in locations[:3]:
short = label[:30] if len(label) > 30 else label
url = f"https://www.google.com/maps?q={lat},{lon}"
kb.add(InlineKeyboardButton(f"\U0001F4CD {short}", url=url))
reply_markup = kb
try:
bot.send_message(chat_id, formatted_text, parse_mode='HTML',
disable_web_page_preview=True, reply_markup=reply_markup)
except Exception as e:
print(f"[Telegram] HTML send failed: {e}")
try:
bot.send_message(chat_id, response[:4000])
except Exception as e2:
print(f"[Telegram] Plain send failed: {e2}")
# ---------------------------------------------------------------------------
# BOT POLLING LOOP
# ---------------------------------------------------------------------------
def _bot_polling(token: str):
"""Blocking polling loop. Runs as main process or daemon thread."""
global _bot_user_id
import telebot
bot = telebot.TeleBot(token, parse_mode=None)
# Get bot user ID for group reply detection
try:
me = bot.get_me()
_bot_user_id = me.id
print(f"[Telegram] Bot identity: @{me.username} (id={me.id})")
except Exception as e:
print(f"[Telegram] get_me() failed: {e}")
# Set Mini App menu button
try:
from telebot.types import MenuButtonWebApp, WebAppInfo
from config import APP_VERSION
webapp_url = f'https://seafare.efir.org/miniapp?v={APP_VERSION}'
bot.set_chat_menu_button(menu_button=MenuButtonWebApp(
type='web_app', text='SeaFare', web_app=WebAppInfo(url=webapp_url)))
print(f"[Telegram] Menu button set → {webapp_url}")
except Exception as e:
print(f"[Telegram] Menu button error: {e}")
# Load watches from DB
_load_watches_from_db()
_subscribe_watched_vessels()
# Start watch checker thread
checker = threading.Thread(target=_watch_checker_loop, args=(bot,),
daemon=True, name='watch-checker')
checker.start()
@bot.callback_query_handler(func=lambda call: True)
def on_callback(call):
try:
chat_id = call.message.chat.id
data = call.data
# Language selection from /start
if data.startswith('lang_'):
lang_code = data[5:] # 'en', 'ru', 'es'
if lang_code in ('en', 'ru', 'es'):
_user_langs[chat_id] = lang_code
kb = _start_keyboard(lang_code)
bot.send_message(chat_id, _WELCOME.get(lang_code, _WELCOME['en']),
parse_mode='HTML', reply_markup=kb)
bot.answer_callback_query(call.id)
return
lang = _user_langs.get(chat_id, 'en')
if data == 'hint_help':
bot.send_message(chat_id, _HELP.get(lang, _HELP['en']), parse_mode='HTML')
elif data in _HINTS:
hint = _HINTS[data]
bot.send_message(chat_id, hint.get(lang, hint['en']), parse_mode='HTML')
bot.answer_callback_query(call.id)
except Exception as e:
print(f"[Telegram] Callback error: {e}")
@bot.message_handler(func=lambda m: True)
def on_message(message):
try:
_handle_message(bot, message)
except Exception as e:
print(f"[Telegram] Handler error: {e}")
print("[Telegram] Bot polling started")
while True:
try:
bot.polling(
non_stop=True,
interval=1,
timeout=POLL_TIMEOUT,
long_polling_timeout=POLL_TIMEOUT,
)
except Exception as e:
print(f"[Telegram] Polling error: {e}, restarting in 10s...")
time.sleep(10)
# ---------------------------------------------------------------------------
# STARTUP (called from seafare_api.py or standalone)
# ---------------------------------------------------------------------------
def start_telegram_bot():
"""Start Telegram bot as background daemon thread."""
if os.environ.get('_TELEGRAM_STARTED'):
print("[Telegram] Bot already started — skipping")
return
os.environ['_TELEGRAM_STARTED'] = '1'
token = os.environ.get(config.ENV_TELEGRAM_BOT_TOKEN)
if not token:
print("[Telegram] TELEGRAM_BOT_TOKEN not set — bot disabled")
return
anthropic_key = os.environ.get(config.ENV_ANTHROPIC_API_KEY)
if not anthropic_key:
print("[Telegram] ANTHROPIC_API_KEY not set — bot disabled")
return
t = threading.Thread(target=_bot_polling, args=(token,), daemon=True)
t.start()
print("[Telegram] Bot thread started (polling mode)")
# ---------------------------------------------------------------------------
# STANDALONE
# ---------------------------------------------------------------------------
if __name__ == '__main__':
from dotenv import load_dotenv
load_dotenv()
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(name)s] %(levelname)s: %(message)s'
)
token = os.environ.get(config.ENV_TELEGRAM_BOT_TOKEN)
if not token:
print("Set TELEGRAM_BOT_TOKEN in .env")
exit(1)
print("[Telegram] Starting bot in standalone mode...")
_bot_polling(token)