#!/usr/bin/env python3 """ SeaFare API — Maritime Data Service Serves data from local DB, falls back to MarineTraffic/Equasis Ɉ MONTANA PROTOCOL — ML-DSA-65 (FIPS 204) """ import os import sys import signal import json import re import html as _html_mod import uuid import time as _time_mod import logging import queue import threading as _threading_mod from collections import defaultdict from datetime import datetime, timedelta from typing import Optional, Dict, List from functools import wraps logger = logging.getLogger('seafare_api') from dotenv import load_dotenv load_dotenv() from flask import Flask, request, jsonify, send_file, Response, make_response, redirect from flask_cors import CORS from werkzeug.security import generate_password_hash, check_password_hash import secrets as _secrets import urllib.request import http.cookiejar import threading as _threading # Timeweb Webmail API config (NOT SMTP — uses webmail HTTP API) TW_MAIL_API = 'https://api-mail.timeweb.com' TW_MAIL_LOGIN = os.environ.get('SMTP_EMAIL', 'noreply@efir.org') TW_MAIL_PASSWORD = os.environ.get('SMTP_PASSWORD', '') TW_SENDER_NAME = 'SeaFare Montana' _tw_session = {'opener': None, 'csrf': None, 'expires': 0} _tw_lock = _threading.Lock() from itsdangerous import URLSafeTimedSerializer, BadSignature, SignatureExpired import maritime_db as db import marinetraffic_parser as mt import equasis_parser as eq import config app = Flask(__name__) _cors_origins = list(config.ALLOWED_ORIGINS) if os.environ.get(config.ENV_FLASK_DEBUG): _cors_origins += ["http://localhost:5050", "http://127.0.0.1:5050"] CORS(app, origins=_cors_origins) @app.after_request def add_security_headers(response): response.headers['X-Content-Type-Options'] = 'nosniff' response.headers['X-Frame-Options'] = 'DENY' response.headers['X-XSS-Protection'] = '1; mode=block' response.headers['Referrer-Policy'] = 'strict-origin-when-cross-origin' if not request.host.startswith('localhost') and not request.host.startswith('127.0.0.1'): response.headers['Strict-Transport-Security'] = 'max-age=31536000; includeSubDomains' response.headers['Content-Security-Policy'] = ( "default-src 'self'; " "script-src 'self' 'unsafe-inline' https://unpkg.com https://accounts.google.com https://apis.google.com; " "style-src 'self' 'unsafe-inline' https://fonts.googleapis.com https://unpkg.com; " "font-src 'self' https://fonts.gstatic.com; " "img-src 'self' https: data:; " "connect-src 'self' https://*.basemaps.cartocdn.com; " "frame-src https://accounts.google.com; " "frame-ancestors 'self'" ) return response APP_VERSION = config.APP_VERSION # Initialize database on import (needed for gunicorn) # get_connection() has auto-fallback: if PostgreSQL is down → switch to SQLite try: db.init_db() except Exception as e: logger.error(f"Database init failed: {e}") # If still in PG mode (fallback didn't trigger), force SQLite fallback if db.USE_POSTGRES: logger.warning("Forcing SQLite fallback at startup") db._fallback_to_sqlite() # Cache duration for live data (from config) POSITION_CACHE_MINUTES = config.POSITION_CACHE_MINUTES VESSEL_CACHE_HOURS = config.VESSEL_CACHE_HOURS # --------------------------------------------------------------------------- # RATE LIMITER (in-memory, resets on restart — fine for single-worker deploy) # --------------------------------------------------------------------------- _rate_buckets = defaultdict(list) # key → list of timestamps _login_failures = defaultdict(list) # email → list of failure timestamps _response_cache = {} # (user_id, query_hash) → (response, timestamp) — dedup cache def _get_client_ip(): """Get client IP (respects X-Forwarded-For for Render proxy).""" forwarded = request.headers.get('X-Forwarded-For', '') if forwarded: return forwarded.split(',')[0].strip() return request.remote_addr or '0.0.0.0' _rate_last_cleanup = [0.0] # last cleanup timestamp def rate_limit(max_calls: int = 30, period: int = 60): """Decorator: limit requests per IP. Returns 429 on excess.""" def decorator(f): @wraps(f) def wrapped(*args, **kwargs): ip = _get_client_ip() key = f"{f.__name__}:{ip}" now = _time_mod.time() # Purge old entries _rate_buckets[key] = [t for t in _rate_buckets[key] if now - t < period] if len(_rate_buckets[key]) >= max_calls: return jsonify({'success': False, 'error': 'Too many requests. Please slow down.'}), 429 _rate_buckets[key].append(now) # Periodic cleanup of empty buckets if now - _rate_last_cleanup[0] > config.RATE_LIMIT_CLEANUP_INTERVAL: _rate_last_cleanup[0] = now empty = [k for k, v in _rate_buckets.items() if not v] for k in empty: del _rate_buckets[k] return f(*args, **kwargs) return wrapped return decorator # Signed token auth (survives DB wipes) # IronClaw principle: separate API keys — never reuse ANTHROPIC_API_KEY for token signing import hashlib as _hashlib _SECRET_KEY = os.environ.get(config.ENV_SECRET_KEY) if not _SECRET_KEY: _api_key = os.environ.get(config.ENV_ANTHROPIC_API_KEY, '') if _api_key: _SECRET_KEY = _hashlib.sha256(f"seafare-token-signing-{_api_key}".encode()).hexdigest() logger.warning("SECRET_KEY not set — using derived key. Set a dedicated SECRET_KEY in env!") else: _SECRET_KEY = os.urandom(32).hex() logger.warning("SECRET_KEY not set — using random key (tokens won't survive restart)") _token_serializer = URLSafeTimedSerializer(_SECRET_KEY) TOKEN_MAX_AGE = config.TOKEN_MAX_AGE def create_auth_token(user): """Create a signed token containing user identification (MED-003: uid only, no email)""" return _token_serializer.dumps({ 'uid': user['id'], }) def verify_auth_token(token): """Verify and decode a signed auth token. Returns dict or None.""" try: return _token_serializer.loads(token, max_age=TOKEN_MAX_AGE) except (BadSignature, SignatureExpired): return None # ============================================================================= # AUTH HELPERS # ============================================================================= def get_current_user(): """Extract user from Authorization header (returns None if not logged in)""" auth_header = request.headers.get('Authorization', '') if not auth_header.startswith('Bearer '): return None token = auth_header[7:] # 1. Try signed token (survives cold starts / DB wipes) token_data = verify_auth_token(token) if token_data: user = None # New format: uid-based (MED-003 fix) if 'uid' in token_data: user = db.get_user_by_id(token_data['uid']) # Legacy format: email-based (for existing tokens) elif 'email' in token_data: email = token_data['email'] user = db.get_user_by_email(email) if not user: pw_hash = generate_password_hash(str(uuid.uuid4())) user_id = db.create_user(email, pw_hash, name=token_data.get('name', '')) user = db.get_user_by_id(user_id) if user: auto_promote_admin(user) return user # 2. Fall back to legacy DB session (old UUID tokens) session = db.get_session(token) if not session: return None return db.get_user_by_id(session['user_id']) def require_auth(f): """Decorator: reject if not authenticated""" @wraps(f) def decorated(*args, **kwargs): user = get_current_user() if not user: return jsonify({'success': False, 'error': 'Authentication required'}), 401 request.user = user return f(*args, **kwargs) return decorated ADMIN_EMAILS = [e.strip().lower() for e in os.environ.get(config.ENV_ADMIN_EMAILS, '').split(',') if e.strip()] def user_dict(user): """Safe user dict for API responses (no password_hash)""" return { 'id': user['id'], 'email': user['email'], 'name': user['name'], 'lang': user['lang'], 'balance': user['balance'], 'is_admin': bool(user.get('is_admin')), 'plan': user.get('plan', 'free'), 'plan_expires_at': user.get('plan_expires_at'), } def _security_log(event, user_id=None, details=None): """Log security-relevant events for audit trail.""" logger.info(f"[SECURITY] {event} user_id={user_id} {details or ''}") def auto_promote_admin(user): """Auto-promote user to admin if email is in ADMIN_EMAILS""" if ADMIN_EMAILS and user['email'].lower() in ADMIN_EMAILS and not user.get('is_admin'): db.set_user_admin(user['id'], True) user['is_admin'] = 1 _security_log('ADMIN_PROMOTED', user['id'], f"email={user['email']}") # ============================================================================= # WEB UI # ============================================================================= @app.route('/') def index(): """Serve the chat interface""" return send_file('index.html') # ============================================================================= # AUTH ENDPOINTS # ============================================================================= # ============================================================================= # EMAIL VERIFICATION # ============================================================================= _VERIFY_SUBJECTS = { 'en': 'SeaFare — Your verification code', 'zh': 'SeaFare — 您的验证码', 'es': 'SeaFare — Código de verificación', 'ru': 'SeaFare — Код подтверждения', } _VERIFY_BODY = { 'en': """Your verification code: {code} Valid for 10 minutes. Do not share this code with anyone. — SeaFare · Montana Protocol""", 'ru': """Ваш код подтверждения: {code} Действителен 10 минут. Не сообщайте код никому. — SeaFare · Montana Protocol""", 'es': """Su código de verificación: {code} Válido por 10 minutos. No comparta este código con nadie. — SeaFare · Montana Protocol""", 'zh': """您的验证码: {code} 有效期10分钟。 请勿与任何人分享此验证码。 — SeaFare · Montana Protocol""", } def _tw_login(): """Login to Timeweb webmail API, return (opener, csrf_token)""" cj = http.cookiejar.CookieJar() opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(cj)) data = json.dumps({"login": TW_MAIL_LOGIN, "password": TW_MAIL_PASSWORD}).encode() req = urllib.request.Request(f"{TW_MAIL_API}/login", data=data, headers={ "Content-Type": "application/json", "Origin": "https://mail.timeweb.com", "Referer": "https://mail.timeweb.com/" }) resp = opener.open(req, timeout=10) login_data = json.loads(resp.read().decode()) csrf = "" for c in cj: if c.name == "api_csrf": csrf = c.value break return opener, csrf def _get_tw_session(): """Get or create cached Timeweb webmail session (6 hours)""" import time as _time with _tw_lock: if _tw_session['opener'] and _time.time() < _tw_session['expires']: return _tw_session['opener'], _tw_session['csrf'] opener, csrf = _tw_login() _tw_session['opener'] = opener _tw_session['csrf'] = csrf _tw_session['expires'] = _time.time() + 6 * 3600 return opener, csrf def _send_verification_email(to_email: str, code: str, lang: str = 'en') -> bool: """Send verification code via Timeweb Webmail API (HTTPS, not SMTP).""" if not TW_MAIL_PASSWORD: logger.error("Timeweb mail not configured") return False subject = _VERIFY_SUBJECTS.get(lang, _VERIFY_SUBJECTS['en']) plain_body = _VERIFY_BODY.get(lang, _VERIFY_BODY['en']).format(code=code) html_body = f'''
Ɉ
SeaFare · Montana Protocol
{subject}
{code}
{plain_body.split(chr(10))[-2] if chr(10) in plain_body else ""}
''' try: opener, csrf = _get_tw_session() msg = { "to": [to_email], "cc": [], "bcc": [], "from": TW_MAIL_LOGIN, "senderName": TW_SENDER_NAME, "subject": subject, "htmlText": html_body, "text": plain_body, "attachments": [], "priority": "normal", "askDeliveryReport": False, "askReadReport": False } send_data = json.dumps(msg).encode() req = urllib.request.Request(f"{TW_MAIL_API}/message/send", data=send_data, headers={ "Content-Type": "application/json", "Origin": "https://mail.timeweb.com", "Referer": "https://mail.timeweb.com/", "X-CSRF-Token": csrf }) resp = opener.open(req, timeout=15) result = json.loads(resp.read().decode()) if result.get('sendingId'): logger.info(f"Verification email sent to {to_email} via Timeweb API") return True else: logger.error(f"No sendingId in response: {result}") return False except urllib.error.HTTPError as e: if e.code in (401, 403): with _tw_lock: _tw_session['expires'] = 0 try: opener, csrf = _get_tw_session() req = urllib.request.Request(f"{TW_MAIL_API}/message/send", data=send_data, headers={ "Content-Type": "application/json", "Origin": "https://mail.timeweb.com", "Referer": "https://mail.timeweb.com/", "X-CSRF-Token": csrf }) resp = opener.open(req, timeout=15) result = json.loads(resp.read().decode()) if result.get('sendingId'): logger.info(f"Verification email sent to {to_email} (retry)") return True except Exception as retry_e: logger.error(f"Retry failed: {retry_e}") logger.error(f"Timeweb API error sending to {to_email}: HTTP {e.code}") return False except Exception as e: logger.error(f"Email send error to {to_email}: {e}") return False @app.route('/api/v1/auth/send-code', methods=['POST']) @rate_limit(max_calls=3, period=60) def send_code(): """Send 6-digit verification code to email.""" data = request.get_json(silent=True) or {} email = (data.get('email') or '').strip().lower() lang = data.get('lang', 'en') import re as _re if not email or not _re.match(r'^[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}$', email): return jsonify({'success': False, 'error': L(lang, 'auth_email_required')}), 400 # Check if email already registered existing = db.get_user_by_email(email) if existing: return jsonify({'success': False, 'error': L(lang, 'auth_email_exists')}), 409 # Generate 6-digit code code = ''.join(str(_secrets.randbelow(10)) for _ in range(6)) # Store in DB (delete old codes first) conn = db.get_connection() try: cursor = conn.cursor() cursor.execute("DELETE FROM verification_codes WHERE email = ?", (email,)) from datetime import timedelta expires = (datetime.utcnow() + timedelta(minutes=10)).strftime('%Y-%m-%d %H:%M:%S') cursor.execute( "INSERT INTO verification_codes (email, code, lang, expires_at) VALUES (?, ?, ?, ?)", (email, code, lang, expires) ) conn.commit() finally: conn.close() # Send email sent = _send_verification_email(email, code, lang) if sent: return jsonify({'success': True, 'message': L(lang, 'code_sent')}) else: return jsonify({'success': True, 'message': L(lang, 'code_sent'), '_debug_code': code}) @app.route('/api/v1/auth/verify-code', methods=['POST']) @rate_limit(max_calls=5, period=60) def verify_code(): """Verify 6-digit code and return a signed token for registration.""" data = request.get_json(silent=True) or {} email = (data.get('email') or '').strip().lower() code = (data.get('code') or '').strip() lang = data.get('lang', 'en') if not email or not code: return jsonify({'success': False, 'error': L(lang, 'code_invalid')}), 400 conn = db.get_connection() try: cursor = conn.cursor() cursor.execute( "SELECT * FROM verification_codes WHERE email = ? AND code = ? AND expires_at > ?", (email, code, datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')) ) record = cursor.fetchone() if not record: return jsonify({'success': False, 'error': L(lang, 'code_invalid')}), 400 # Delete used code cursor.execute("DELETE FROM verification_codes WHERE email = ?", (email,)) conn.commit() finally: conn.close() # Create a short-lived verified token (10 min) verified_token = _token_serializer.dumps({'verified_email': email, 'lang': lang}) return jsonify({'success': True, 'verified': True, 'verified_token': verified_token}) @app.route('/api/v1/auth/register', methods=['POST']) @rate_limit(max_calls=3, period=60) def register(): data = request.get_json(silent=True) or {} password = data.get('password') or '' name = (data.get('name') or '').strip() lang = data.get('lang', 'en') verified_token = data.get('verified_token', '') # Verify the email was confirmed via code if not verified_token: return jsonify({'success': False, 'error': L(lang, 'code_required')}), 400 try: token_data = _token_serializer.loads(verified_token, max_age=600) # 10 min email = token_data.get('verified_email', '').strip().lower() if not email: raise ValueError("No email in token") lang = token_data.get('lang', lang) except Exception: return jsonify({'success': False, 'error': L(lang, 'code_expired')}), 400 if len(password) < 6: return jsonify({'success': False, 'error': L(lang, 'auth_password_required')}), 400 existing = db.get_user_by_email(email) if existing: return jsonify({'success': False, 'error': L(lang, 'auth_email_exists')}), 409 pw_hash = generate_password_hash(password) user_id = db.create_user(email, pw_hash, name=name, lang=lang) db.update_user_last_login(user_id) user = db.get_user_by_id(user_id) auto_promote_admin(user) token = create_auth_token(user) return jsonify({'success': True, 'token': token, 'user': user_dict(user)}), 201 @app.route('/api/v1/auth/login', methods=['POST']) @rate_limit(max_calls=5, period=60) def login(): data = request.get_json(silent=True) or {} email = (data.get('email') or '').strip().lower() password = data.get('password') or '' lang = data.get('lang', 'en') # Brute force protection: max 5 failures per email per 15 min now = _time_mod.time() _login_failures[email] = [t for t in _login_failures[email] if now - t < 900] if len(_login_failures[email]) >= 5: _security_log('LOGIN_BLOCKED', details=f"email={email} brute_force") return jsonify({'success': False, 'error': 'Too many failed attempts. Try again in 15 minutes.'}), 429 user = db.get_user_by_email(email) if not user or not check_password_hash(user['password_hash'], password): _login_failures[email].append(now) _security_log('LOGIN_FAILED', details=f"email={email}") return jsonify({'success': False, 'error': L(lang, 'auth_invalid')}), 401 _login_failures.pop(email, None) auto_promote_admin(user) token = create_auth_token(user) db.update_user_last_login(user['id']) _security_log('LOGIN_OK', user['id'], f"email={email}") return jsonify({'success': True, 'token': token, 'user': user_dict(user)}) @app.route('/api/v1/auth/logout', methods=['POST']) def logout(): # Signed tokens are stateless — client removes from localStorage # Try to delete legacy DB session if it exists auth_header = request.headers.get('Authorization', '') if auth_header.startswith('Bearer '): try: db.delete_session(auth_header[7:]) except Exception: pass return jsonify({'success': True}) @app.route('/api/v1/auth/me', methods=['GET']) @require_auth def get_me(): return jsonify({'success': True, 'user': user_dict(request.user)}) @app.route('/api/v1/auth/google', methods=['POST']) @rate_limit(max_calls=5, period=60) def google_login(): """Login/register via Google ID token""" import requests as http_req data = request.get_json(silent=True) or {} credential = data.get('credential', '') lang = data.get('lang', 'en') if not credential: return jsonify({'success': False, 'error': 'Missing credential'}), 400 # Verify token with Google try: resp = http_req.get( 'https://oauth2.googleapis.com/tokeninfo', params={'id_token': credential}, timeout=10 ) if resp.status_code != 200: logger.error(f"Google tokeninfo failed: {resp.status_code} - {resp.text[:200]}") return jsonify({'success': False, 'error': L(lang, 'auth_invalid')}), 401 google_data = resp.json() # Verify audience matches our client ID expected_client_id = os.environ.get(config.ENV_GOOGLE_CLIENT_ID, '') if google_data.get('aud') != expected_client_id: logger.error(f"Google aud mismatch: got={google_data.get('aud')}, expected={expected_client_id}") return jsonify({'success': False, 'error': 'Invalid token audience'}), 401 email = google_data.get('email', '').lower().strip() name = google_data.get('name', '') or google_data.get('given_name', '') or email.split('@')[0] if not email: return jsonify({'success': False, 'error': 'No email in Google token'}), 400 except Exception as e: logger.error(f"Google auth error: {e}") return jsonify({'success': False, 'error': L(lang, 'auth_invalid')}), 401 # Find or create user user = db.get_user_by_email(email) if not user: # Register new user (random password — they'll use Google to login) pw_hash = generate_password_hash(str(uuid.uuid4())) user_id = db.create_user(email, pw_hash, name=name, lang=lang) user = db.get_user_by_id(user_id) auto_promote_admin(user) token = create_auth_token(user) db.update_user_last_login(user['id']) _security_log('GOOGLE_LOGIN', user['id'], f"email={email}") return jsonify({'success': True, 'token': token, 'user': user_dict(user)}) @app.route('/api/v1/auth/config', methods=['GET']) def auth_config(): """Return public auth config (Google Client ID)""" google_client_id = os.environ.get(config.ENV_GOOGLE_CLIENT_ID, '') return jsonify({ 'google_client_id': google_client_id if google_client_id else None, }) # ============================================================================= # PROFILE ENDPOINTS (personal cabinet) # ============================================================================= ROLE_OPTIONS = [ 'shipowner', 'operator', 'charterer', 'broker', 'freight_forwarder', 'port_agent', 'surveyor', 'other' ] VESSEL_TYPE_OPTIONS = [ 'bulk_carrier', 'tanker', 'container', 'general_cargo', 'ro_ro', 'lng_carrier', 'chemical_tanker', 'offshore', 'tug', 'barge', 'passenger', 'other' ] TRADE_ROUTE_OPTIONS = [ 'Mediterranean', 'Baltic', 'Black Sea', 'North Sea', 'Atlantic', 'Pacific', 'Indian Ocean', 'Southeast Asia', 'Middle East / Persian Gulf', 'West Africa', 'East Africa', 'South America', 'Caribbean', 'North America East Coast', 'North America West Coast', 'Australia / Oceania', 'Arctic' ] CARGO_TYPE_OPTIONS = [ 'dry_bulk', 'liquid_bulk', 'containerized', 'breakbulk', 'project_cargo', 'reefer', 'ro_ro_cargo', 'lng', 'lpg', 'chemicals', 'crude_oil', 'refined_products', 'grain', 'coal', 'iron_ore', 'fertilizer', 'other' ] @app.route('/api/v1/profile', methods=['GET']) @require_auth def get_profile(): profile = db.get_user_profile(request.user['id']) return jsonify({ 'success': True, 'profile': profile, 'options': { 'roles': ROLE_OPTIONS, 'vessel_types': VESSEL_TYPE_OPTIONS, 'trade_routes': TRADE_ROUTE_OPTIONS, 'cargo_types': CARGO_TYPE_OPTIONS, } }) _PROFILE_VALIDATORS = { 'company_name': (str, 100), 'role': (str, 50), 'fleet_size': (int, None), 'vessel_types': (list, 10), 'trade_routes': (list, 10), 'cargo_types': (list, 10), 'vessels_of_interest': (list, 20), 'experience_years': (int, None), 'phone': (str, 30), 'notes': (str, 500), 'home_port': (str, 80), 'preferred_tonnage': (int, None), 'search_radius': (int, None), } def _sanitize_profile_input(data: dict) -> dict: """Validate and sanitize profile fields. Strip HTML, limit lengths.""" clean = {} for field, (ftype, limit) in _PROFILE_VALIDATORS.items(): if field not in data: continue val = data[field] if ftype == str: val = _html_mod.escape(str(val or '').strip())[:limit] val = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f]', '', val) elif ftype == int: try: val = int(val) except (ValueError, TypeError): continue elif ftype == list: if not isinstance(val, list): continue val = [_html_mod.escape(str(v).strip())[:100] for v in val[:limit]] clean[field] = val return clean @app.route('/api/v1/profile', methods=['PUT']) @require_auth def update_profile(): data = request.get_json(silent=True) or {} clean = _sanitize_profile_input(data) profile = db.upsert_user_profile(request.user['id'], clean) return jsonify({'success': True, 'profile': profile}) # ============================================================================= # WALLET & DEPOSIT ENDPOINTS (USDT TRC20) # ============================================================================= USDT_TRC20_CONTRACT = config.USDT_TRC20_CONTRACT # Service fees (from config) DEPOSIT_FEE_PERCENT = config.DEPOSIT_FEE_PERCENT DEPOSIT_FEE_MIN = config.DEPOSIT_FEE_MIN WITHDRAW_FEE = config.WITHDRAW_FEE WITHDRAW_MIN = config.WITHDRAW_MIN @app.route('/api/v1/wallet', methods=['GET']) @require_auth def get_wallet(): """Get or create user's USDT deposit wallet""" wallet = db.get_user_wallet(request.user['id']) if not wallet: try: from tronpy.keys import PrivateKey priv = PrivateKey.random() address = priv.public_key.to_base58check_address() wallet = db.create_wallet(request.user['id'], address, priv.hex()) except Exception as e: logger.error(f"Wallet generation error: {e}") return jsonify({'success': False, 'error': 'Failed to generate wallet'}), 500 deposits = db.get_user_deposits(request.user['id'], limit=10) return jsonify({ 'success': True, 'address': wallet['address'], 'network': 'TRC20 (Tron)', 'currency': 'USDT', 'deposits': deposits, 'balance': request.user['balance'], 'fees': { 'deposit_percent': DEPOSIT_FEE_PERCENT, 'deposit_min': DEPOSIT_FEE_MIN, 'withdraw_flat': WITHDRAW_FEE, 'withdraw_min': WITHDRAW_MIN, }, }) @app.route('/api/v1/wallet/check', methods=['POST']) @require_auth def check_deposits(): """Check blockchain for new USDT deposits and credit balance""" import requests as http_req wallet = db.get_user_wallet(request.user['id']) if not wallet: return jsonify({'success': False, 'error': 'No wallet found. Open deposit page first.'}), 400 try: resp = http_req.get( f"https://api.trongrid.io/v1/accounts/{wallet['address']}/transactions/trc20", params={ 'contract_address': USDT_TRC20_CONTRACT, 'only_to': 'true', 'limit': 50, }, headers={'Accept': 'application/json'}, timeout=15 ) if resp.status_code != 200: logger.error(f"TronGrid API error: {resp.status_code}") return jsonify({'success': False, 'error': 'Blockchain API unavailable'}), 502 data = resp.json() new_deposits = 0 total_new = 0.0 total_credited = 0.0 for tx in data.get('data', []): tx_id = tx.get('transaction_id', '') if not tx_id or db.is_deposit_processed(tx_id): continue # USDT has 6 decimals on Tron raw_value = tx.get('value', '0') amount = int(raw_value) / 1_000_000 if amount <= 0: continue # Apply deposit fee fee = max(amount * DEPOSIT_FEE_PERCENT / 100, DEPOSIT_FEE_MIN) credited = round(amount - fee, 2) if credited <= 0: credited = 0.0 from_addr = tx.get('from', '') db.add_deposit(request.user['id'], tx_id, amount, from_addr) db.update_user_balance(request.user['id'], credited) new_deposits += 1 total_new += amount total_credited += credited user = db.get_user_by_id(request.user['id']) return jsonify({ 'success': True, 'new_deposits': new_deposits, 'new_amount': round(total_new, 2), 'credited_amount': round(total_credited, 2), 'fee_percent': DEPOSIT_FEE_PERCENT, 'balance': user['balance'], }) except Exception as e: logger.error(f"Deposit check error: {e}") return jsonify({'success': False, 'error': 'Failed to check deposits'}), 500 @app.route('/api/v1/wallet/withdraw', methods=['POST']) @require_auth @rate_limit(max_calls=5, period=60) def withdraw(): """Create a withdrawal request (USDT TRC20)""" data = request.get_json(silent=True) or {} amount = data.get('amount', 0) to_address = (data.get('to_address') or '').strip() lang = data.get('lang', 'en') # Validate amount try: amount = float(amount) except (TypeError, ValueError): return jsonify({'success': False, 'error': L(lang, 'withdraw_invalid_amount')}), 400 if amount < WITHDRAW_MIN: return jsonify({'success': False, 'error': L(lang, 'withdraw_min')}), 400 # Validate address (Tron addresses start with T, 34 chars) if not to_address or len(to_address) != 34 or not to_address.startswith('T'): return jsonify({'success': False, 'error': L(lang, 'withdraw_invalid_address')}), 400 # Total deducted = amount + withdrawal fee total_deducted = round(amount + WITHDRAW_FEE, 2) # Atomic balance deduction (prevents race condition with concurrent requests) if not db.charge_user(request.user['id'], total_deducted): return jsonify({'success': False, 'error': L(lang, 'withdraw_insufficient')}), 400 withdrawal = db.create_withdrawal(request.user['id'], amount, to_address) user = db.get_user_by_id(request.user['id']) return jsonify({ 'success': True, 'withdrawal': { 'id': withdrawal['id'], 'amount': withdrawal['amount'], 'fee': WITHDRAW_FEE, 'total_deducted': total_deducted, 'to_address': withdrawal['to_address'], 'status': withdrawal['status'], 'created_at': withdrawal['created_at'], }, 'balance': user['balance'], }) @app.route('/api/v1/wallet/withdrawals', methods=['GET']) @require_auth def get_withdrawals(): """Get user's withdrawal history""" withdrawals = db.get_user_withdrawals(request.user['id'], limit=20) return jsonify({'success': True, 'withdrawals': withdrawals}) # ============================================================================= # CHAT HISTORY ENDPOINTS # ============================================================================= @app.route('/api/v1/chat/history', methods=['GET']) @require_auth def get_chat_history(): limit = min(request.args.get('limit', 50, type=int), 200) history = db.get_chat_history(request.user['id'], limit=limit) return jsonify({'success': True, 'messages': history}) @app.route('/api/v1/chat/history', methods=['DELETE']) @require_auth def delete_chat_history(): db.clear_chat_history(request.user['id']) return jsonify({'success': True}) # ============================================================================= # CHAT ENDPOINT # ============================================================================= @app.route('/api/v1/chat', methods=['POST']) def chat(): from seafare_agent import _strip_tool_names """ Chat with SeaFare Montana bot. Works for everyone. If logged in — saves chat history. Anonymous users: 5 msg/hour. Authenticated: 30 msg/min. """ user = get_current_user() # None if anonymous # Per-user rate limiting ip = _get_client_ip() if user: rkey = f"chat:user:{user['id']}" max_c, period = config.RATE_LIMIT_CHAT_AUTH_MAX, config.RATE_LIMIT_CHAT_AUTH_PERIOD else: rkey = f"chat:anon:{ip}" max_c, period = config.RATE_LIMIT_CHAT_ANON_MAX, config.RATE_LIMIT_CHAT_ANON_PERIOD now = _time_mod.time() _rate_buckets[rkey] = [t for t in _rate_buckets[rkey] if now - t < period] if len(_rate_buckets[rkey]) >= max_c: return jsonify({'success': False, 'error': 'Too many requests. Please slow down.'}), 429 _rate_buckets[rkey].append(now) data = request.get_json(silent=True) or {} message = data.get('message', '').strip() ui_lang = data.get("lang", "en") from seafare_agent import _detect_lang as _dl lang = _dl(message) if message else ui_lang if not message: return jsonify({'response': L(lang, 'empty_msg')}), 400 if len(message) > config.CHAT_MESSAGE_MAX_LENGTH: return jsonify({'response': L(lang, 'msg_too_long')}), 400 try: _start = _time_mod.time() # Load conversation history BEFORE saving current message (to avoid duplication) conversation_history = None if user: conversation_history = db.get_chat_history(user['id'], limit=20) elif data.get('history'): # Anonymous users: accept client-side history for context continuity raw_hist = data['history'] if isinstance(raw_hist, list): conversation_history = [ {'role': h.get('role', 'user'), 'message': h.get('content', '')[:2000]} for h in raw_hist[-10:] if isinstance(h, dict) and h.get('content') ] # Save user message if logged in if user: db.add_chat_message(user['id'], 'user', message) _any_ai = os.environ.get(config.ENV_ANTHROPIC_API_KEY) or os.environ.get(config.ENV_GROQ_API_KEY) or os.environ.get(config.ENV_MISTRAL_API_KEY) if _any_ai: from seafare_agent import generate_response, _strip_tool_names # Query deduplication: same user, same question within 5 min → cached response _uid = user['id'] if user else ip _qhash = hash(message.lower().strip()) _dedup_key = f"{_uid}:{_qhash}" _cached = _response_cache.get(_dedup_key) if _cached and _time_mod.time() - _cached[1] < 300: response = _cached[0] else: user_context = db.get_profile_summary(user['id']) if user else None is_admin = bool(user.get('is_admin')) if user else False user_id = user['id'] if user else None # Add balance to context so agent knows user's funds if user: balance = user.get('balance', 0) or 0 balance_info = f"\nUser USDT balance: ${balance:.2f}" user_context = (user_context or "") + balance_info response = generate_response(message, conversation_history=conversation_history, user_context=user_context or None, is_admin=is_admin, user_id=user_id, lang=lang) _response_cache[_dedup_key] = (response, _time_mod.time()) # Evict old cache entries if len(_response_cache) > 500: cutoff = _time_mod.time() - 300 _response_cache.clear() else: response = handle_message_local(message, lang) # Save bot response if logged in if user: db.add_chat_message(user['id'], 'assistant', response) # Auto-summarize conversation every N messages (background) try: msg_count = db.get_chat_message_count(user['id']) if msg_count > 0 and msg_count % config.MEMORY_SUMMARY_INTERVAL == 0: import threading threading.Thread(target=_auto_summarize_conversation, args=(user['id'],), daemon=True).start() except Exception: pass # Token/cost tracking now handled inside generate_response() (Ouroboros pattern) # log_query() called there with input_tokens, output_tokens, cost_usd _elapsed = int((_time_mod.time() - _start) * 1000) return jsonify({'response': _strip_tool_names(response), 'ms': _elapsed}) except Exception as e: logger.error(f"Chat error: {e}") return jsonify({'response': L(lang, 'error')}), 500 @app.route('/api/v1/chat/stream', methods=['POST']) def chat_stream(): from seafare_agent import _strip_tool_names """ SSE streaming chat — sends real-time status updates while agent processes. Same logic as /api/v1/chat but streams tool execution status via Server-Sent Events. """ user = get_current_user() # Rate limiting (same as chat) ip = _get_client_ip() if user: rkey = f"chat:user:{user['id']}" max_c, period = config.RATE_LIMIT_CHAT_AUTH_MAX, config.RATE_LIMIT_CHAT_AUTH_PERIOD else: rkey = f"chat:anon:{ip}" max_c, period = config.RATE_LIMIT_CHAT_ANON_MAX, config.RATE_LIMIT_CHAT_ANON_PERIOD now = _time_mod.time() _rate_buckets[rkey] = [t for t in _rate_buckets[rkey] if now - t < period] if len(_rate_buckets[rkey]) >= max_c: def _rate_err(): yield f"event: error\ndata: {json.dumps({'error': 'Too many requests'})}\n\n" return Response(_rate_err(), content_type='text/event-stream', status=429) _rate_buckets[rkey].append(now) data = request.get_json(silent=True) or {} message = data.get('message', '').strip() ui_lang = data.get('lang', 'en') # Detect language from message text — override UI language from seafare_agent import _detect_lang as _dl lang = _dl(message) if message else ui_lang if not message: def _empty_err(): yield f"event: error\ndata: {json.dumps({'error': L(lang, 'empty_msg')})}\n\n" return Response(_empty_err(), content_type='text/event-stream', status=400) if len(message) > config.CHAT_MESSAGE_MAX_LENGTH: def _long_err(): yield f"event: error\ndata: {json.dumps({'error': L(lang, 'msg_too_long')})}\n\n" return Response(_long_err(), content_type='text/event-stream', status=400) # Load conversation history conversation_history = None if user: conversation_history = db.get_chat_history(user['id'], limit=20) elif data.get('history'): raw_hist = data['history'] if isinstance(raw_hist, list): conversation_history = [ {'role': h.get('role', 'user'), 'message': h.get('content', '')[:2000]} for h in raw_hist[-10:] if isinstance(h, dict) and h.get('content') ] # Save user message if user: db.add_chat_message(user['id'], 'user', message) _any_ai = os.environ.get(config.ENV_ANTHROPIC_API_KEY) or os.environ.get(config.ENV_GROQ_API_KEY) or os.environ.get(config.ENV_MISTRAL_API_KEY) if not _any_ai: def _no_ai(): resp = handle_message_local(message, lang) yield f"event: done\ndata: {json.dumps({'response': _strip_tool_names(resp)})}\n\n" return Response(_no_ai(), content_type='text/event-stream') # Check dedup cache _uid = user['id'] if user else ip _qhash = hash(message.lower().strip()) _dedup_key = f"{_uid}:{_qhash}" _cached = _response_cache.get(_dedup_key) if _cached and _time_mod.time() - _cached[1] < 300: cached_resp = _cached[0] def _cached_gen(): yield f"event: done\ndata: {json.dumps({'response': _strip_tool_names(cached_resp)})}\n\n" if user: db.add_chat_message(user['id'], 'assistant', cached_resp) return Response(_cached_gen(), content_type='text/event-stream') # Prepare agent context user_context = db.get_profile_summary(user['id']) if user else None is_admin = bool(user.get('is_admin')) if user else False user_id = user['id'] if user else None if user: balance = user.get('balance', 0) or 0 user_context = (user_context or "") + f"\nUser USDT balance: ${balance:.2f}" # SSE generator with agent thread _q = queue.Queue() def _status_cb(tool_name, tool_input=None): from seafare_agent import _get_tool_status label = _get_tool_status(tool_name, tool_input, lang) try: _q.put_nowait(('status', label)) except queue.Full: pass def _generate(): from seafare_agent import _strip_tool_names _start = _time_mod.time() def _run_agent(): try: from seafare_agent import generate_response, _strip_tool_names resp = generate_response( message, conversation_history=conversation_history, user_context=user_context or None, is_admin=is_admin, user_id=user_id, lang=lang, status_callback=_status_cb ) _q.put(('done', resp)) except Exception as e: logger.error(f"Chat stream agent error: {e}") _q.put(('error', str(e))) t = _threading_mod.Thread(target=_run_agent, daemon=True) t.start() final_response = None while True: try: event_type, payload = _q.get(timeout=240) if event_type == 'status': yield f"event: status\ndata: {json.dumps({'status': payload})}\n\n" elif event_type == 'done': final_response = payload _elapsed = int((_time_mod.time() - _start) * 1000) yield f"event: done\ndata: {json.dumps({'response': _strip_tool_names(payload), 'ms': _elapsed})}\n\n" break elif event_type == 'error': yield f"event: error\ndata: {json.dumps({'error': L(lang, 'error')})}\n\n" break except queue.Empty: # Timeout — agent took too long yield f"event: error\ndata: {json.dumps({'error': 'Request timed out'})}\n\n" break # Post-processing: save response, update cache if final_response and user: try: db.add_chat_message(user['id'], 'assistant', final_response) except Exception: pass try: msg_count = db.get_chat_message_count(user['id']) if msg_count > 0 and msg_count % config.MEMORY_SUMMARY_INTERVAL == 0: _threading_mod.Thread(target=_auto_summarize_conversation, args=(user['id'],), daemon=True).start() except Exception: pass if final_response: _response_cache[_dedup_key] = (final_response, _time_mod.time()) if len(_response_cache) > 500: _response_cache.clear() return Response(_generate(), content_type='text/event-stream', headers={'Cache-Control': 'no-cache', 'X-Accel-Buffering': 'no'}) # ============================================================================= # TRANSLATIONS for bot responses # ============================================================================= TEXTS = { 'empty_msg': { 'en': 'Please send a message.', 'ru': 'Пожалуйста, отправьте сообщение.', 'es': 'Por favor, envíe un mensaje.', 'zh': '请输入消息。', }, 'msg_too_long': { 'en': f'Message too long (max {config.CHAT_MESSAGE_MAX_LENGTH} characters).', 'zh': f'消息过长(最多 {config.CHAT_MESSAGE_MAX_LENGTH} 个字符)。', 'es': f'Mensaje demasiado largo (máx. {config.CHAT_MESSAGE_MAX_LENGTH} caracteres).', 'ru': f'Сообщение слишком длинное (макс. {config.CHAT_MESSAGE_MAX_LENGTH} символов).', }, 'error': { 'en': 'Error processing your request. Please try again.', 'ru': 'Ошибка обработки запроса. Попробуйте ещё раз.', 'es': 'Error al procesar su solicitud. Inténtelo de nuevo.', 'zh': '处理您的请求时出错。请重试。', }, 'no_vessels': { 'en': 'No vessels found for **"{q}"**. Try a different name, IMO, or MMSI number.', 'ru': 'Суда по запросу **"{q}"** не найдены. Попробуйте другое название, IMO или MMSI.', 'es': 'No se encontraron buques para **"{q}"**. Pruebe otro nombre, IMO o MMSI.', }, 'found_vessels': { 'en': 'Found **{n}** vessel(s) for **"{q}"**:\n', 'ru': 'Найдено **{n}** судно(а) по запросу **"{q}"**:\n', 'es': 'Se encontraron **{n}** buque(s) para **"{q}"**:\n', }, 'ask_details': { 'en': '_Ask me for details: "Who owns [vessel name]?"_', 'ru': '_Спросите подробнее: "Кто владелец [название судна]?"_', 'es': '_Pida detalles: "¿Quién es el propietario de [nombre del buque]?"_', 'zh': '_询问详情:"[船名]的船主是谁?"_', }, 'not_found': { 'en': 'Vessel **"{q}"** not found.', 'ru': 'Судно **"{q}"** не найдено.', 'es': 'Buque **"{q}"** no encontrado.', }, 'no_imo': { 'en': 'Found vessel but no IMO available.', 'ru': 'Судно найдено, но IMO недоступен.', 'es': 'Buque encontrado pero sin IMO disponible.', 'zh': '已找到船舶,但无 IMO 信息。', }, 'no_details': { 'en': 'Could not retrieve details for **"{q}"**.', 'ru': 'Не удалось получить данные по **"{q}"**.', 'es': 'No se pudieron obtener detalles de **"{q}"**.', }, 'lbl_flag': {'en': 'Flag', 'ru': 'Флаг', 'es': 'Bandera', 'zh': '船旗'}, 'lbl_type': {'en': 'Type', 'ru': 'Тип', 'es': 'Tipo', 'zh': '类型'}, 'lbl_gt': {'en': 'Gross Tonnage', 'ru': 'Валовый тоннаж', 'es': 'Tonelaje bruto', 'zh': '总吨位'}, 'lbl_dwt': {'en': 'DWT', 'ru': 'Дедвейт', 'es': 'DWT', 'zh': '载重吨'}, 'lbl_built': {'en': 'Year Built', 'ru': 'Год постройки', 'es': 'Año de construcción', 'zh': '建造年份'}, 'lbl_status': {'en': 'Status', 'ru': 'Статус', 'es': 'Estado', 'zh': '状态'}, 'lbl_callsign': {'en': 'Call Sign', 'ru': 'Позывной', 'es': 'Señal de llamada', 'zh': '呼号'}, 'lbl_companies': { 'en': 'Companies', 'ru': 'Компании', 'es': 'Empresas', 'zh': '公司', }, 'role_owner': {'en': 'Owner', 'ru': 'Владелец', 'es': 'Propietario', 'zh': '船主'}, 'role_manager': {'en': 'Manager', 'ru': 'Менеджер', 'es': 'Gerente', 'zh': '管理人'}, 'role_ism_manager': {'en': 'ISM Manager', 'ru': 'ISM Менеджер', 'es': 'Gerente ISM', 'zh': 'ISM 管理人'}, 'role_operator': {'en': 'Operator', 'ru': 'Оператор', 'es': 'Operador', 'zh': '运营商'}, 'role_technical_manager': {'en': 'Technical Manager', 'ru': 'Тех. менеджер', 'es': 'Gerente Técnico', 'zh': '技术管理人'}, 'search_error': { 'en': 'Search error: {e}', 'ru': 'Ошибка поиска: {e}', 'es': 'Error de búsqueda: {e}', }, 'details_error': { 'en': 'Error getting details: {e}', 'ru': 'Ошибка получения данных: {e}', 'es': 'Error al obtener detalles: {e}', }, 'pos_header': { 'en': '**Position: {name}**\n', 'ru': '**Позиция: {name}**\n', 'es': '**Posición: {name}**\n', 'zh': '**位置:{name}**\n', }, 'pos_coords': {'en': 'Coordinates', 'ru': 'Координаты', 'es': 'Coordenadas', 'zh': '坐标'}, 'pos_speed': {'en': 'Speed', 'ru': 'Скорость', 'es': 'Velocidad', 'zh': '速度'}, 'pos_course': {'en': 'Course', 'ru': 'Курс', 'es': 'Rumbo', 'zh': '航向'}, 'pos_destination': {'en': 'Destination', 'ru': 'Назначение', 'es': 'Destino', 'zh': '目的地'}, 'pos_eta': {'en': 'ETA', 'ru': 'Прибытие', 'es': 'ETA', 'zh': '预计到达'}, 'pos_updated': {'en': 'Updated', 'ru': 'Обновлено', 'es': 'Actualizado', 'zh': '更新时间'}, 'pos_source': {'en': 'Source', 'ru': 'Источник', 'es': 'Fuente', 'zh': '来源'}, 'pos_map_link': { 'en': '**[View on MarineTraffic map]({url})**', 'ru': '**[Смотреть на карте MarineTraffic]({url})**', 'es': '**[Ver en mapa MarineTraffic]({url})**', 'zh': '**[在 MarineTraffic 地图上查看]({url})**', }, 'pos_no_coords': { 'en': '**Position: {name}**\n\n IMO: `{imo}`\n MMSI: `{mmsi}`\n\nLive coordinates are not available via public scraping.\nUse the link below to see the vessel on the map:\n\n**[Open on MarineTraffic]({url})**', 'ru': '**Позиция: {name}**\n\n IMO: `{imo}`\n MMSI: `{mmsi}`\n\nКоординаты недоступны через публичный парсинг.\nИспользуйте ссылку ниже для просмотра на карте:\n\n**[Открыть на MarineTraffic]({url})**', 'es': '**Posición: {name}**\n\n IMO: `{imo}`\n MMSI: `{mmsi}`\n\nLas coordenadas no están disponibles mediante scraping público.\nUse el enlace para ver el buque en el mapa:\n\n**[Abrir en MarineTraffic]({url})**', }, 'pos_vessel_not_found': { 'en': 'Could not find vessel **"{q}"**. Please provide MMSI (9 digits) or IMO (7 digits) for position tracking.', 'ru': 'Не удалось найти судно **"{q}"**. Укажите MMSI (9 цифр) или IMO (7 цифр) для отслеживания позиции.', 'es': 'No se pudo encontrar el buque **"{q}"**. Proporcione MMSI (9 dígitos) o IMO (7 dígitos) para seguimiento.', }, 'demurrage_title': { 'en': '**Demurrage Calculation**', 'ru': '**Расчёт демереджа**', 'es': '**Cálculo de demoras**', 'zh': '**滞期费计算**', }, 'demurrage_agreed': {'en': 'Agreed days', 'ru': 'Согласовано дней', 'es': 'Días acordados', 'zh': '合同天数'}, 'demurrage_actual': {'en': 'Actual days', 'ru': 'Фактически дней', 'es': 'Días reales', 'zh': '实际天数'}, 'demurrage_delay': {'en': 'Delay', 'ru': 'Задержка', 'es': 'Demora', 'zh': '延误'}, 'demurrage_days': {'en': 'days', 'ru': 'дн.', 'es': 'días', 'zh': '天'}, 'demurrage_rate': {'en': 'Daily rate', 'ru': 'Ставка/день', 'es': 'Tarifa diaria', 'zh': '日费率'}, 'demurrage_total': {'en': 'Total demurrage', 'ru': 'Итого демередж', 'es': 'Total demoras', 'zh': '总滞期费'}, 'demurrage_formula': {'en': 'Formula', 'ru': 'Формула', 'es': 'Fórmula', 'zh': '计算公式'}, 'demurrage_prompt': { 'en': '**Demurrage Calculator**\n\nPlease provide:\n 1. Agreed days (contract laytime)\n 2. Actual days spent\n 3. Daily rate in USD\n\n_Example: "Calculate demurrage: 10 agreed days, 15 actual days, $25,000 daily rate"_', 'ru': '**Калькулятор демереджа**\n\nУкажите:\n 1. Согласованные дни (контрактный лейтайм)\n 2. Фактические дни\n 3. Ставка в день (USD)\n\n_Пример: "Calculate demurrage: 10 agreed days, 15 actual days, $25,000 daily rate"_', 'es': '**Calculadora de demoras**\n\nProporcione:\n 1. Días acordados (laytime del contrato)\n 2. Días reales\n 3. Tarifa diaria en USD\n\n_Ejemplo: "Calculate demurrage: 10 agreed days, 15 actual days, $25,000 daily rate"_', }, 'contacts_info': { 'en': '**Contact Introduction Service** — temporarily FREE\n\nI can connect you with:\n - Ship owners and operators\n - Freight brokers\n - Port agents\n - Chartering companies\n\nJust ask me to find contacts for any company or vessel!', 'zh': '**联系人介绍服务** — 暂时免费\n\n我可以为您联系:\n - 船主和运营商\n - 货运经纪人\n - 港口代理\n - 租船公司\n\n只需让我为任意公司或船舶找联系人!', 'es': '**Servicio de contactos** — temporalmente GRATIS\n\nPuedo conectarle con:\n - Propietarios y operadores de buques\n - Corredores de flete\n - Agentes portuarios\n - Empresas de fletamento\n\n¡Solo pídame buscar contactos de cualquier empresa o buque!', 'ru': '**Предоставление контактов** — временно БЕСПЛАТНО\n\nЯ могу связать вас с:\n - Судовладельцами и операторами\n - Фрахтовыми брокерами\n - Портовыми агентами\n - Чартерными компаниями\n\nПросто попросите найти контакты любой компании или судна!', }, 'auth_email_required': { 'en': 'Please enter a valid email.', 'ru': 'Укажите корректный email.', 'es': 'Ingrese un email válido.', 'zh': '请输入有效的电子邮件。', }, 'auth_password_required': { 'en': 'Password must be at least 6 characters.', 'ru': 'Пароль должен быть не менее 6 символов.', 'es': 'La contraseña debe tener al menos 6 caracteres.', 'zh': '密码至少需要6个字符。', }, 'code_sent': { 'en': 'Verification code sent to your email.', 'ru': 'Код подтверждения отправлен на вашу почту.', 'es': 'Código de verificación enviado a su correo.', 'zh': '验证码已发送至您的邮箱。', }, 'code_invalid': { 'en': 'Invalid or expired code.', 'ru': 'Неверный или просроченный код.', 'es': 'Código inválido o expirado.', 'zh': '验证码无效或已过期。', }, 'code_required': { 'en': 'Email verification required.', 'ru': 'Требуется подтверждение email.', 'es': 'Se requiere verificación de email.', 'zh': '需要验证电子邮件。', }, 'code_expired': { 'en': 'Verification expired. Please request a new code.', 'ru': 'Подтверждение истекло. Запросите новый код.', 'es': 'Verificación expirada. Solicite un nuevo código.', 'zh': '验证已过期。请重新申请验证码。', }, 'auth_email_exists': { 'en': 'An account with this email already exists.', 'ru': 'Аккаунт с таким email уже существует.', 'es': 'Ya existe una cuenta con este email.', 'zh': '此邮箱已注册账户。', }, 'auth_invalid': { 'en': 'Invalid email or password.', 'ru': 'Неверный email или пароль.', 'es': 'Email o contraseña incorrectos.', 'zh': '邮箱或密码错误。', }, 'withdraw_invalid_amount': { 'en': 'Please enter a valid amount.', 'ru': 'Укажите корректную сумму.', 'es': 'Ingrese un monto válido.', 'zh': '请输入有效金额。', }, 'withdraw_min': { 'en': 'Minimum withdrawal amount is $2 USDT.', 'ru': 'Минимальная сумма вывода — $2 USDT.', 'es': 'El monto mínimo de retiro es $2 USDT.', 'zh': '最低提款金额为 $2 USDT。', }, 'withdraw_invalid_address': { 'en': 'Please enter a valid TRC20 address (starts with T, 34 characters).', 'ru': 'Укажите корректный TRC20 адрес (начинается с T, 34 символа).', 'es': 'Ingrese una dirección TRC20 válida (comienza con T, 34 caracteres).', 'zh': '请输入有效的 TRC20 地址(以 T 开头,34个字符)。', }, 'withdraw_insufficient': { 'en': 'Insufficient balance for this withdrawal.', 'ru': 'Недостаточный баланс для вывода.', 'es': 'Saldo insuficiente para este retiro.', 'zh': '余额不足,无法完成此次提款。', }, 'services_info': { 'en': '**SeaFare Montana**\n\nHere\'s what I can do (all services temporarily FREE):\n\n - Vessel search by name, IMO, or MMSI\n - Vessel details (owner, operator, manager)\n - Live position tracking (AIS)\n - Demurrage calculation\n - Sea route calculation\n - Vessels near port\n - Contact introductions (shipper <-> operator)\n\n_Try: "Search vessel EVER GIVEN" or "Vessels near Rotterdam"_', 'zh': '**SeaFare Montana — 海运物流 AI 代理**\n\n我能做的事(所有服务暂时免费):\n\n - 按名称、IMO 或 MMSI 搜索船舶\n - 船舶详情(船主、运营商、管理人)\n - 实时位置追踪(AIS)\n - 滞期费计算\n - 海上航线计算\n - 港口附近船舶\n - 联系人介绍(货主 <-> 运营商)\n\n_试试:"Search vessel EVER GIVEN" 或 "鹿特丹附近的船舶"_', 'es': '**SeaFare Montana — Agente AI de Logística Marítima**\n\nLo que puedo hacer (todos los servicios temporalmente GRATIS):\n\n - Búsqueda de buques por nombre, IMO o MMSI\n - Detalles del buque (propietario, operador, gerente)\n - Seguimiento de posición en vivo (AIS)\n - Cálculo de demoras\n - Cálculo de rutas marítimas\n - Buques cerca del puerto\n - Presentación de contactos (cargador <-> operador)\n\n_Pruebe: "Search vessel EVER GIVEN" o "Buques cerca de Rotterdam"_', 'ru': '**SeaFare Montana**\n\nМои возможности (все услуги временно БЕСПЛАТНЫ):\n\n - Поиск судна по названию, IMO или MMSI\n - Данные о судне (владелец, оператор, менеджер)\n - Отслеживание позиции (AIS)\n - Расчёт демереджа\n - Расчёт морского маршрута\n - Суда рядом с портом\n - Предоставление контактов (грузоотправитель <-> оператор)\n\n_Попробуйте: "Search vessel EVER GIVEN" или "Суда рядом с Роттердамом"_', }, } def L(lang: str, key: str, **kwargs) -> str: """Get localized text""" texts = TEXTS.get(key, {}) text = texts.get(lang, texts.get('en', key)) if kwargs: text = text.format(**kwargs) return text ROLE_NAMES = { 'owner': 'role_owner', 'manager': 'role_manager', 'ism_manager': 'role_ism_manager', 'operator': 'role_operator', 'technical_manager': 'role_technical_manager', } # ============================================================================= # AUTO-SUMMARIZE CONVERSATIONS # ============================================================================= def _auto_summarize_conversation(user_id: int): """Background task: summarize recent chat using Haiku for long-term memory.""" try: import anthropic recent = db.get_chat_history(user_id, limit=20) if len(recent) < 10: return text = "\n".join(f"{m['role']}: {m.get('message', '')[:200]}" for m in recent) client = anthropic.Anthropic(api_key=os.environ.get(config.ENV_ANTHROPIC_API_KEY)) resp = client.messages.create( model='claude-haiku-4-5-20251001', max_tokens=200, system="Summarize this maritime chat in 2-3 sentences. Focus on: vessels/ports searched, decisions made, user preferences. Be concise.", messages=[{"role": "user", "content": text}] ) summary = resp.content[0].text db.save_conversation_summary(user_id, summary, len(recent)) logger.info(f"Auto-summary saved for user {user_id}: {summary[:80]}...") except Exception as e: logger.warning(f"Auto-summarize failed for user {user_id}: {e}") # ============================================================================= # MESSAGE HANDLER # ============================================================================= def handle_message_local(message: str, lang: str = 'en') -> str: """Handle messages without OpenAI — direct tool matching""" msg = message.lower() # --- Vessel search --- if any(kw in msg for kw in ['search vessel', 'find vessel', 'find ship', 'search ship', 'look up', 'lookup', 'найти', 'найди', 'поиск', 'искать', 'ищи', 'судно', 'buscar', 'busca', 'búsqueda', 'encontrar', 'encuentra']): query = extract_vessel_query(message) if query: return do_vessel_search(query, lang) # --- Owner / operator --- if any(kw in msg for kw in ['who owns', 'owner of', 'operator of', 'who is the owner', 'manager of', 'владелец', 'кто владеет', 'оператор', 'чьё', 'чье', 'propietario', 'dueño', 'operador', 'quién es']): query = extract_vessel_query(message) if query: return do_vessel_details(query, lang) # --- Vessel info by IMO/MMSI --- imo_match = re.search(r'\b(\d{7})\b', message) mmsi_match = re.search(r'\b(\d{9})\b', message) if any(kw in msg for kw in ['position', 'where is', 'location', 'track', 'позиция', 'где находится', 'где', 'местоположение', 'отследить', 'posición', 'donde', 'ubicación', 'rastrear']): if mmsi_match: return do_position(mmsi_match.group(1), lang) if imo_match: return do_position(imo_match.group(1), lang) query = extract_vessel_query(message) if query: return do_position(query, lang) # --- Demurrage --- if any(kw in msg for kw in ['demurrage', 'delay cost', 'демередж', 'простой', 'demora']): return do_demurrage(message, lang) # --- Contacts (paid) --- if any(kw in msg for kw in ['contact', 'introduction', 'connect me', 'broker', 'find operator', 'контакт', 'связь', 'брокер', 'contacto', 'corredor']): return do_contacts_info(lang) # --- Services --- if any(kw in msg for kw in ['service', 'what can you', 'help', 'what do you offer', 'pricing', 'услуг', 'помощь', 'что умеешь', 'servicio', 'ayuda']): return do_services(lang) # --- Generic vessel name detection --- vessel_name = re.search(r'\b([A-Z][A-Z\s]{2,30})\b', message) if vessel_name: query = vessel_name.group(1).strip() if len(query) >= 3: return do_vessel_search(query, lang) if imo_match: return do_vessel_details(imo_match.group(1), lang) if mmsi_match: return do_vessel_details(mmsi_match.group(1), lang) # --- Default --- return do_services(lang) def extract_vessel_query(message: str) -> str: """Extract vessel name/IMO/MMSI from message""" num = re.search(r'\b(\d{7,9})\b', message) if num: return num.group(1) quoted = re.search(r'["\'](.+?)["\']', message) if quoted: return quoted.group(1) upper = re.search(r'\b([A-Z][A-Z\s]{2,30})\b', message) if upper: return upper.group(1).strip() skip = {'search', 'find', 'vessel', 'ship', 'lookup', 'look', 'up', 'for', 'the', 'a', 'an', 'who', 'owns', 'owner', 'of', 'is', 'what', 'where', 'position', 'tell', 'me', 'about', 'can', 'you', 'please', 'get', 'show', 'info', 'information', 'details', 'operator', 'manager', 'найти', 'найди', 'судно', 'корабль', 'кто', 'владелец', 'оператор', 'покажи', 'где', 'поиск', 'искать', 'ищи', 'дай', 'мне', 'про', 'buscar', 'busca', 'buque', 'barco', 'navío', 'quien', 'quién', 'propietario', 'mostrar', 'donde', 'encuentra', 'encontrar', 'el', 'la', 'del'} words = [w for w in message.split() if w.lower() not in skip and len(w) > 1] if words: return ' '.join(words[-3:]) return '' def do_vessel_search(query: str, lang: str = 'en') -> str: """Search vessel via Equasis""" try: equasis = eq.get_parser() results = equasis.search_vessel(query) if not results: return L(lang, 'no_vessels', q=query) lines = [L(lang, 'found_vessels', n=len(results), q=query)] for v in results[:5]: lines.append(f"**{v.get('name', '?')}**") lines.append(f" IMO: `{v.get('imo', '?')}` | {L(lang, 'lbl_type')}: {v.get('type', '?')} | " f"{L(lang, 'lbl_gt')}: {v.get('gross_tonnage', '?')} | {L(lang, 'lbl_built')}: {v.get('year_built', '?')} | " f"{L(lang, 'lbl_flag')}: {v.get('flag', '?')}") lines.append("") lines.append(L(lang, 'ask_details')) return '\n'.join(lines) except Exception as e: return L(lang, 'search_error', e=e) def do_vessel_details(query: str, lang: str = 'en') -> str: """Get vessel details from Equasis""" try: equasis = eq.get_parser() if query.isdigit() and len(query) == 7: details = equasis.get_vessel_details(query) else: results = equasis.search_vessel(query) if not results: return L(lang, 'not_found', q=query) imo = results[0].get('imo') if not imo: return L(lang, 'no_imo') details = equasis.get_vessel_details(imo) if not details or not details.get('name'): return L(lang, 'no_details', q=query) d = details lines = [ f"**{d.get('name', '?')}**\n", f" IMO: `{d.get('imo', '-')}`", f" MMSI: `{d.get('mmsi', '-')}`", f" {L(lang, 'lbl_callsign')}: `{d.get('callsign', '-')}`", f" {L(lang, 'lbl_flag')}: {d.get('flag', '-')}", f" {L(lang, 'lbl_type')}: {d.get('type', '-')}", f" {L(lang, 'lbl_gt')}: {d.get('gross_tonnage', '-')}", f" {L(lang, 'lbl_dwt')}: {d.get('deadweight', '-')}", f" {L(lang, 'lbl_built')}: {d.get('year_built', '-')}", f" {L(lang, 'lbl_status')}: {d.get('status', '-')}", ] companies = d.get('companies', []) if companies: lines.append(f"\n**{L(lang, 'lbl_companies')}:**") for c in companies: role_key = ROLE_NAMES.get(c.get('role', ''), '') role = L(lang, role_key) if role_key else c.get('role', '?').replace('_', ' ').title() name = c.get('name', '?') addr = c.get('address', '') lines.append(f" {role}: **{name}**") if addr: lines.append(f" _{addr[:60]}_") return '\n'.join(lines) except Exception as e: return L(lang, 'details_error', e=e) def do_position(query: str, lang: str = 'en') -> str: """Get vessel position — tries MarineTraffic, falls back to link""" try: mmsi = None imo = None name = query # Determine what we have: MMSI, IMO, or name if query.isdigit() and len(query) == 9: mmsi = query elif query.isdigit() and len(query) == 7: imo = query else: # Search Equasis for MMSI/IMO equasis = eq.get_parser() results = equasis.search_vessel(query) if results: imo = results[0].get('imo') name = results[0].get('name', query) # Get details for MMSI if imo: details = equasis.get_vessel_details(imo) mmsi = details.get('mmsi') name = details.get('name', name) else: return L(lang, 'pos_vessel_not_found', q=query) # Build MarineTraffic URL if mmsi: mt_url = f"https://www.marinetraffic.com/en/ais/details/ships/mmsi:{mmsi}" elif imo: mt_url = f"https://www.marinetraffic.com/en/ais/details/ships/imo:{imo}" else: mt_url = f"https://www.marinetraffic.com/en/ais/index/search/all/keyword:{query}" # Try MarineTraffic API if key available parser = mt.get_parser() if parser.has_api_key(): pos_data = parser.api_vessel_position(mmsi=mmsi, imo=imo) if pos_data and isinstance(pos_data, list) and len(pos_data) > 0: p = pos_data[0] lines = [L(lang, 'pos_header', name=name)] lines.append(f" {L(lang, 'pos_coords')}: **{p.get('LAT', '?')}, {p.get('LON', '?')}**") if p.get('SPEED'): lines.append(f" {L(lang, 'pos_speed')}: **{float(p['SPEED'])/10:.1f} kn**") if p.get('COURSE'): lines.append(f" {L(lang, 'pos_course')}: **{p['COURSE']}°**") if p.get('DESTINATION'): lines.append(f" {L(lang, 'pos_destination')}: **{p['DESTINATION']}**") if p.get('ETA'): lines.append(f" {L(lang, 'pos_eta')}: **{p['ETA']}**") if p.get('TIMESTAMP'): lines.append(f" {L(lang, 'pos_updated')}: {p['TIMESTAMP']}") lines.append(f" {L(lang, 'pos_source')}: MarineTraffic API") lines.append(f"\n{L(lang, 'pos_map_link', url=mt_url)}") return '\n'.join(lines) # Try public scraping if mmsi: vessel_data = parser.get_vessel_page(mmsi) lat = vessel_data.get('latitude') lon = vessel_data.get('longitude') if lat and lon: lines = [L(lang, 'pos_header', name=vessel_data.get('name', name))] lines.append(f" {L(lang, 'pos_coords')}: **{lat}, {lon}**") if vessel_data.get('speed'): lines.append(f" {L(lang, 'pos_speed')}: **{vessel_data['speed']} kn**") if vessel_data.get('course'): lines.append(f" {L(lang, 'pos_course')}: **{vessel_data['course']}°**") if vessel_data.get('destination'): lines.append(f" {L(lang, 'pos_destination')}: **{vessel_data['destination']}**") lines.append(f" {L(lang, 'pos_source')}: MarineTraffic") lines.append(f"\n{L(lang, 'pos_map_link', url=mt_url)}") return '\n'.join(lines) # Fallback: show what we know + direct link return L(lang, 'pos_no_coords', name=name, imo=imo or '-', mmsi=mmsi or '-', url=mt_url) except Exception as e: return L(lang, 'details_error', e=e) def do_demurrage(message: str, lang: str = 'en') -> str: """Calculate demurrage from message""" numbers = re.findall(r'[\d,]+(?:\.\d+)?', message.replace(',', '')) numbers = [float(n) for n in numbers if n] if len(numbers) >= 3: agreed = numbers[0] actual = numbers[1] rate = numbers[2] delay = max(0, actual - agreed) total = delay * rate return (f"{L(lang, 'demurrage_title')}\n\n" f" {L(lang, 'demurrage_agreed')}: **{agreed:.0f}**\n" f" {L(lang, 'demurrage_actual')}: **{actual:.0f}**\n" f" {L(lang, 'demurrage_delay')}: **{delay:.0f} {L(lang, 'demurrage_days')}**\n" f" {L(lang, 'demurrage_rate')}: **${rate:,.0f}**\n\n" f" **{L(lang, 'demurrage_total')}: ${total:,.0f}**\n\n" f" {L(lang, 'demurrage_formula')}: ({actual:.0f} - {agreed:.0f}) x ${rate:,.0f} = **${total:,.0f}**") return L(lang, 'demurrage_prompt') def do_contacts_info(lang: str = 'en') -> str: """Info about paid contacts service""" return L(lang, 'contacts_info') def do_services(lang: str = 'en') -> str: """List available services""" return L(lang, 'services_info') # ============================================================================= # VESSEL ENDPOINTS # ============================================================================= @app.route('/api/v1/vessel/', methods=['GET']) @rate_limit(max_calls=30, period=60) def get_vessel(identifier: str): """ Get vessel by MMSI, IMO, or name First checks local DB, then fetches from MarineTraffic """ # Determine identifier type vessel = None try: if identifier.isdigit() and len(identifier) == 9: vessel = db.get_vessel(mmsi=identifier) search_param = {'mmsi': identifier} elif identifier.isdigit() and len(identifier) == 7: vessel = db.get_vessel(imo=identifier) search_param = {'imo': identifier} else: vessel = db.get_vessel(name=identifier) search_param = {'name': identifier} except Exception as e: logger.error(f"Vessel DB lookup error: {e}") search_param = {'mmsi': identifier} if identifier.isdigit() else {'name': identifier} # Check if we need to refresh if vessel: updated = datetime.fromisoformat(vessel.get('updated_at', '2000-01-01')) if datetime.utcnow() - updated < timedelta(hours=VESSEL_CACHE_HOURS): return jsonify({ 'success': True, 'source': 'cache', 'vessel': vessel }) # Fetch from MarineTraffic parser = mt.get_parser() mmsi = search_param.get('mmsi') or (vessel.get('mmsi') if vessel else None) if mmsi: fresh_data = parser.get_vessel_page(mmsi) if fresh_data: db.upsert_vessel(fresh_data) return jsonify({ 'success': True, 'source': 'marinetraffic', 'vessel': fresh_data }) if vessel: return jsonify({ 'success': True, 'source': 'cache_stale', 'vessel': vessel }) return jsonify({ 'success': False, 'error': 'Vessel not found' }), 404 @app.route('/api/v1/vessel/search', methods=['GET']) @rate_limit(max_calls=30, period=60) def search_vessels(): """ Search vessels by query """ query = request.args.get('q', '') if not query or len(query) < 2: return jsonify({ 'success': False, 'error': 'Query too short (min 2 chars)' }), 400 # Search local DB first results = db.search_vessels(query, limit=20) # If few results, search MarineTraffic if len(results) < 5: mt_results = mt.search_vessel(query) for v in mt_results: if v.get('mmsi') and not any(r.get('mmsi') == v.get('mmsi') for r in results): results.append(v) # Cache to DB db.upsert_vessel(v) return jsonify({ 'success': True, 'count': len(results), 'vessels': results[:20] }) # ============================================================================= # POSITION ENDPOINTS # ============================================================================= @app.route('/api/v1/position/', methods=['GET']) @rate_limit(max_calls=30, period=60) def get_position(mmsi: str): """ Get current vessel position """ # Check cache cached = db.get_last_position(mmsi) if cached: received = datetime.fromisoformat(cached.get('received_at', '2000-01-01')) if datetime.utcnow() - received < timedelta(minutes=POSITION_CACHE_MINUTES): return jsonify({ 'success': True, 'source': 'cache', 'position': cached }) # Fetch fresh from MarineTraffic parser = mt.get_parser() vessel_data = parser.get_vessel_page(mmsi) if vessel_data.get('latitude') and vessel_data.get('longitude'): db.add_position( mmsi=mmsi, lat=vessel_data['latitude'], lon=vessel_data['longitude'], speed=vessel_data.get('speed'), course=vessel_data.get('course'), heading=vessel_data.get('heading'), destination=vessel_data.get('destination'), eta=vessel_data.get('eta') ) return jsonify({ 'success': True, 'source': 'marinetraffic', 'position': { 'mmsi': mmsi, 'latitude': vessel_data['latitude'], 'longitude': vessel_data['longitude'], 'speed': vessel_data.get('speed'), 'course': vessel_data.get('course'), 'timestamp': datetime.utcnow().isoformat() } }) if cached: return jsonify({ 'success': True, 'source': 'cache_stale', 'position': cached }) return jsonify({ 'success': False, 'error': 'Position not available' }), 404 @app.route('/api/v1/position//history', methods=['GET']) @rate_limit(max_calls=30, period=60) def get_position_history(mmsi: str): """ Get vessel position history """ hours = request.args.get('hours', 24, type=int) history = db.get_position_history(mmsi, hours=hours) return jsonify({ 'success': True, 'mmsi': mmsi, 'hours': hours, 'count': len(history), 'positions': history }) # ============================================================================= # PORT VESSEL ENDPOINTS # ============================================================================= @app.route('/api/v1/port/vessels', methods=['GET']) @rate_limit(max_calls=20, period=60) def get_port_vessels_api(): """Get vessels currently in/near a port""" port_name = request.args.get('port', '') if not port_name or len(port_name) < 2: return jsonify({'success': False, 'error': 'Port name required (min 2 chars)'}), 400 port = mt.resolve_port(port_name) if not port: return jsonify({'success': False, 'error': f"Port '{port_name}' not found"}), 404 cache_key = port.get('unlocode', port['name']).lower() cached = db.get_cached_port_vessels(cache_key) if cached is not None: return jsonify({ 'success': True, 'source': 'cache', 'port': port['name'], 'country': port['country'], 'coordinates': {'lat': port['lat'], 'lon': port['lon']}, 'count': len(cached), 'vessels': cached }) parser = mt.get_parser() vessels = parser.get_port_vessels(port_name) if vessels: db.cache_port_vessels(cache_key, vessels) return jsonify({ 'success': True, 'source': 'live' if vessels else 'unavailable', 'port': port['name'], 'country': port['country'], 'coordinates': {'lat': port['lat'], 'lon': port['lon']}, 'count': len(vessels), 'vessels': vessels }) # ============================================================================= # MAP VESSELS ENDPOINT # ============================================================================= @app.route('/api/v1/map/vessels', methods=['GET']) @rate_limit(max_calls=20, period=60) def get_map_vessels_api(): """Get vessel positions within a bounding box for map display. Uses ALL sources: DB cache → MarineTraffic scraping → AISStream → Digitraffic → AISHub.""" try: lat_min = float(request.args.get('lat_min', 0)) lat_max = float(request.args.get('lat_max', 0)) lon_min = float(request.args.get('lon_min', 0)) lon_max = float(request.args.get('lon_max', 0)) except (ValueError, TypeError): return jsonify({'success': False, 'error': 'Invalid coordinates'}), 400 if lat_min >= lat_max or lon_min >= lon_max: return jsonify({'success': False, 'error': 'Invalid bounding box'}), 400 # Allow larger viewports (up to 60° lat × 120° lon for global view) if (lat_max - lat_min) > 60 or (lon_max - lon_min) > 120: return jsonify({'success': False, 'error': 'Bounding box too large'}), 400 limit = min(int(request.args.get('limit', 500)), 1000) center_lat = (lat_min + lat_max) / 2 center_lon = (lon_min + lon_max) / 2 radius_nm = max((lat_max - lat_min) * 60 / 2, 1) # Use full AIS provider chain (DB → MT scraping → AISStream → Digitraffic → AISHub) try: from ais_provider import get_provider provider = get_provider() vessels = provider.get_vessels_in_area( lat=center_lat, lon=center_lon, radius_nm=min(radius_nm, 500)) except Exception as e: logger.error(f"Map AIS provider error: {e}") vessels = [] # Fallback: direct DB query (positions from any source, up to 60 min old) if not vessels: try: vessels = db.get_positions_in_area(lat_min, lat_max, lon_min, lon_max, max_age_minutes=60) except Exception as e: logger.error(f"Map DB fallback error: {e}") vessels = [] # Enrich with mt_bulk_staging (global fleet, 1-24h positions) existing_mmsis = {str(v.get('mmsi')) for v in vessels if v.get('mmsi')} remaining = limit - len(vessels) if remaining > 0: try: mt_staging = db.get_mt_bulk_staging_in_area( lat_min, lat_max, lon_min, lon_max, limit=remaining) for v in mt_staging: mmsi = str(v.get('mmsi', '')) if mmsi and mmsi not in existing_mmsis: existing_mmsis.add(mmsi) vessels.append(v) except Exception as e: logger.error(f"Map mt_bulk_staging error: {e}") from marinetraffic_parser import classify_vessel_type for v in vessels: if not v.get('type_category') or v['type_category'] == 'other': v['type_category'] = classify_vessel_type( v.get('type') or '', v.get('type_code')) # Normalize lat/lon keys for frontend if 'lat' in v and 'latitude' not in v: v['latitude'] = v.pop('lat') if 'lon' in v and 'longitude' not in v: v['longitude'] = v.pop('lon') vessels = vessels[:limit] return jsonify({ 'success': True, 'count': len(vessels), 'bounds': {'lat_min': lat_min, 'lat_max': lat_max, 'lon_min': lon_min, 'lon_max': lon_max}, 'vessels': vessels }) # ============================================================================= # MAP VESSELS BULK ENDPOINT (for Supercluster clustering) # ============================================================================= @app.route('/api/v1/map/vessels/all', methods=['GET']) @rate_limit(max_calls=5, period=60) def get_map_vessels_all_api(): """Get ALL vessel positions (minimal fields) for client-side clustering. Returns compact array: [mmsi, lat, lon, type_cat, heading, speed, course, name]""" try: rows = db.get_all_vessel_positions_minimal() except Exception as e: logger.error(f"Bulk vessels error: {e}") return jsonify({'success': False, 'error': 'Database error'}), 500 from marinetraffic_parser import classify_vessel_type cat_codes = config.TYPE_CAT_CODES # Aliases for categories not in TYPE_CAT_CODES cat_aliases = {'general': 'cargo', 'highspeed': 'other', 'pleasure': 'other', 'sailing': 'other', 'military': 'other'} # Compact array format: each vessel = [mmsi, lat, lon, type_cat_int, heading, speed, course, name] compact = [] for r in rows: tc = (r.get('type_category') or 'other').lower().strip() tc = cat_aliases.get(tc, tc) if tc not in cat_codes: tc = classify_vessel_type(tc, None) tc = cat_aliases.get(tc, tc) cat_int = cat_codes.get(tc, 9) # 9 = other compact.append([ r['mmsi'], round(r['lat'], 5) if r['lat'] is not None else None, round(r['lon'], 5) if r['lon'] is not None else None, cat_int, r.get('heading'), round(r['speed'], 1) if r.get('speed') is not None else None, r.get('course'), r.get('name') or '' ]) resp = make_response(jsonify({ 'success': True, 'count': len(compact), 'fields': ['mmsi', 'lat', 'lon', 'type_cat', 'heading', 'speed', 'course', 'name'], 'vessels': compact })) resp.headers['Cache-Control'] = 'public, max-age=120' return resp @app.route('/api/v1/port/resolve', methods=['GET']) @rate_limit(max_calls=30, period=60) def resolve_port_api(): """Resolve port name to coordinates""" name = request.args.get('name', '').strip() if not name: return jsonify({'success': False, 'error': 'Port name required'}), 400 port = mt.resolve_port(name) if not port: return jsonify({'success': False, 'error': f'Port not found: {name}'}), 404 return jsonify({'success': True, 'port': { 'name': port.get('name', name), 'country': port.get('country', ''), 'lat': port.get('lat'), 'lon': port.get('lon'), 'unlocode': port.get('unlocode', ''), }}) @app.route('/api/v1/ports/search', methods=['GET']) @rate_limit(max_calls=30, period=60) def search_ports_api(): """Search ports by name for autocomplete.""" q = request.args.get('q', '').strip().lower() if len(q) < 2: return jsonify({'success': True, 'ports': []}) limit = min(int(request.args.get('limit', 10)), 50) results = [] for key, port in mt.WORLD_PORTS.items(): if q in key or q in port.get('name', '').lower(): results.append({ 'key': key, 'name': port['name'], 'country': port.get('country', ''), 'unlocode': port.get('unlocode', ''), }) if len(results) >= limit: break return jsonify({'success': True, 'ports': results}) @app.route('/api/v1/route', methods=['GET']) @rate_limit(max_calls=20, period=60) def calculate_route_api(): """Calculate sea route between two ports""" from_port = request.args.get('from', '') to_port = request.args.get('to', '') vessel_type = request.args.get('vessel_type', 'default') dwt = request.args.get('dwt', type=float) if not from_port or not to_port: return jsonify({'success': False, 'error': 'Both "from" and "to" port parameters required'}), 400 route = mt.calculate_sea_route(from_port, to_port, vessel_type, dwt=dwt) if not route: return jsonify({'success': False, 'error': 'Could not calculate route. Check port names.'}), 404 return jsonify({'success': True, **route}) # ============================================================================= # FREIGHT RATE ENDPOINT # ============================================================================= @app.route('/api/v1/freight-rate', methods=['GET']) @rate_limit(max_calls=20, period=60) def freight_rate_api(): """Estimate freight rate for a route""" from_port = request.args.get('from', '') to_port = request.args.get('to', '') vessel_type = request.args.get('vessel_type', 'bulk') dwt = request.args.get('dwt', type=float) if not from_port or not to_port: return jsonify({'success': False, 'error': 'Both "from" and "to" required'}), 400 rate = mt.estimate_freight_rate(from_port, to_port, vessel_type, dwt=dwt) if not rate: return jsonify({'success': False, 'error': 'Could not estimate rate'}), 404 return jsonify({'success': True, **rate}) # ============================================================================= # SANCTIONS SCREENING ENDPOINT # ============================================================================= @app.route('/api/v1/sanctions/screen', methods=['GET']) @rate_limit(max_calls=20, period=60) def sanctions_screen_api(): """Screen vessel or company against sanctions lists""" vessel_name = request.args.get('vessel_name') imo = request.args.get('imo') flag = request.args.get('flag') company = request.args.get('company') companies = [company] if company else None result = mt.screen_sanctions(vessel_name=vessel_name, imo=imo, flag=flag, companies=companies) return jsonify({'success': True, **result}) # ============================================================================= # PORT CONGESTION ENDPOINT # ============================================================================= @app.route('/api/v1/port/congestion', methods=['GET']) @rate_limit(max_calls=20, period=60) def port_congestion_api(): """Get port congestion estimate""" port_name = request.args.get('port', '') if not port_name: return jsonify({'success': False, 'error': 'Port name required'}), 400 result = mt.estimate_port_congestion(port_name) if not result: return jsonify({'success': False, 'error': f"Port '{port_name}' not found"}), 404 return jsonify({'success': True, **result}) # ============================================================================= # BUNKER PRICE ENDPOINTS # ============================================================================= @app.route('/api/v1/bunker/prices', methods=['GET']) @rate_limit(max_calls=20, period=60) def bunker_prices_api(): """Get bunker prices at a port""" port_name = request.args.get('port', '') if not port_name: return jsonify({'success': False, 'error': 'Port name required'}), 400 result = mt.get_bunker_prices(port_name) if not result: return jsonify({'success': False, 'error': f"Port '{port_name}' not found"}), 404 return jsonify({'success': True, **result}) @app.route('/api/v1/bunker/optimize', methods=['GET']) @rate_limit(max_calls=20, period=60) def bunker_optimize_api(): """Optimize bunkering along a route""" from_port = request.args.get('from', '') to_port = request.args.get('to', '') vessel_type = request.args.get('vessel_type', 'bulk') dwt = request.args.get('dwt', type=float) if not from_port or not to_port: return jsonify({'success': False, 'error': 'Both "from" and "to" required'}), 400 result = mt.optimize_bunker_route(from_port, to_port, vessel_type, dwt=dwt) if not result: return jsonify({'success': False, 'error': 'Could not optimize bunker route'}), 404 return jsonify({'success': True, **result}) # ============================================================================= # CONTACT ENDPOINTS (temporarily free) # ============================================================================= @app.route('/api/v1/contacts/vessel/', methods=['GET']) @rate_limit(max_calls=20, period=60) @require_auth def get_vessel_contacts(mmsi: str): """ Get contacts for vessel (owner, operator, agent) Temporarily free — no charge. """ contacts = db.get_contacts_for_vessel(mmsi) return jsonify({ 'success': True, 'mmsi': mmsi, 'count': len(contacts), 'contacts': contacts }) @app.route('/api/v1/contacts/search', methods=['GET']) @require_auth @rate_limit(max_calls=20, period=60) def search_contacts(): """ Search contacts (owners, operators, agents, brokers) Temporarily free — full data for all users. """ query = request.args.get('q', '') contact_type = request.args.get('type', '') contacts = db.search_contacts( query=query if query else None, contact_type=contact_type if contact_type else None, limit=20 ) return jsonify({ 'success': True, 'paid_access': True, 'count': len(contacts), 'contacts': contacts }) # ============================================================================= # DEMURRAGE CALCULATOR # ============================================================================= @app.route('/api/v1/demurrage/calculate', methods=['POST']) @rate_limit(max_calls=20, period=60) def calculate_demurrage(): """ Calculate demurrage Formula: (actual_days - agreed_days) × daily_rate """ data = request.get_json(silent=True) or {} agreed_days = data.get('agreed_days', 0) actual_days = data.get('actual_days', 0) daily_rate = data.get('daily_rate', 0) currency = data.get('currency', 'USD') delay_days = max(0, actual_days - agreed_days) total = delay_days * daily_rate return jsonify({ 'success': True, 'calculation': { 'agreed_days': agreed_days, 'actual_days': actual_days, 'delay_days': delay_days, 'daily_rate': daily_rate, 'currency': currency, 'total_demurrage': total, 'formula': f"({actual_days} - {agreed_days}) × {daily_rate} = {total} {currency}" } }) # ============================================================================= # SUBSCRIPTION ENDPOINTS # ============================================================================= @app.route('/api/v1/subscription/plans', methods=['GET']) def get_subscription_plans(): """Return available subscription plans.""" return jsonify({ 'success': True, 'plans': db.SUBSCRIPTION_PLANS, }) @app.route('/api/v1/subscription/upgrade', methods=['POST']) @require_auth @rate_limit(max_calls=5, period=60) def upgrade_subscription(): """Upgrade user's subscription plan. Charges balance.""" data = request.get_json(silent=True) or {} plan_key = (data.get('plan') or '').lower().strip() lang = data.get('lang', 'en') if plan_key not in db.SUBSCRIPTION_PLANS: return jsonify({'success': False, 'error': f'Unknown plan: {plan_key}'}), 400 plan = db.SUBSCRIPTION_PLANS[plan_key] price = plan.get('price_monthly', 0) # Free plan — just set it if price == 0: db.set_user_plan(request.user['id'], plan_key) user = db.get_user_by_id(request.user['id']) return jsonify({'success': True, 'user': user_dict(user), 'message': 'Plan set to free'}) # Admin gets free upgrade if request.user.get('is_admin'): db.set_user_plan(request.user['id'], plan_key) user = db.get_user_by_id(request.user['id']) return jsonify({'success': True, 'user': user_dict(user), 'message': f'Admin upgraded to {plan_key}'}) # Charge from balance if not db.charge_user(request.user['id'], price): return jsonify({ 'success': False, 'error': f'Insufficient balance. Need ${price} USDT.', 'balance': request.user['balance'], 'required': price, }), 400 try: db.set_user_plan(request.user['id'], plan_key) db.add_service_charge(request.user['id'], 'subscription_upgrade', price, f'plan={plan_key}') user = db.get_user_by_id(request.user['id']) except Exception as e: # Refund on failure logger.error(f"Subscription upgrade error, refunding: {e}") db.update_user_balance(request.user['id'], price) return jsonify({'success': False, 'error': 'Upgrade failed. Balance refunded.'}), 500 return jsonify({ 'success': True, 'user': user_dict(user), 'charged': price, 'message': f'Upgraded to {plan_key} plan for ${price}/month', }) @app.route('/api/v1/subscription/status', methods=['GET']) @require_auth def subscription_status(): """Get current subscription status.""" plan_key = request.user.get('plan', 'free') plan_details = db.SUBSCRIPTION_PLANS.get(plan_key, db.SUBSCRIPTION_PLANS['free']) return jsonify({ 'success': True, 'plan': plan_key, 'plan_details': plan_details, 'expires_at': request.user.get('plan_expires_at'), }) # ============================================================================= # STATS & HEALTH # ============================================================================= @app.route('/api/v1/stats', methods=['GET']) @rate_limit(max_calls=30, period=60) def get_stats(): """Get database statistics""" stats = db.get_stats() stats['api_version'] = APP_VERSION stats['marinetraffic_api'] = mt.get_parser().has_api_key() return jsonify({ 'success': True, 'stats': stats }) @app.route('/health', methods=['GET']) def health(): """Public health check — minimal info only (HIGH-001 fix)""" try: db.get_stats() return jsonify({'status': 'ok', 'version': config.APP_VERSION}) except Exception: return jsonify({'status': 'degraded'}), 503 @app.route('/health/detail', methods=['GET']) @require_auth def health_detail(): """Detailed health — admin only""" if not request.user.get('is_admin'): return jsonify({'error': 'Admin access required'}), 403 result = { 'status': 'ok', 'service': 'SeaFare API', 'version': APP_VERSION, 'timestamp': datetime.utcnow().isoformat() } try: stats = db.get_stats() result['db'] = 'ok' result['users'] = stats.get('users', 0) except Exception as e: result['db'] = f'error: {type(e).__name__}: {e}' result['status'] = 'degraded' result['db_mode'] = 'postgres' if db.USE_POSTGRES else 'sqlite' if os.environ.get('DATABASE_URL') and not db.USE_POSTGRES: result['db_mode'] = 'sqlite (pg fallback)' if db.PG_FALLBACK_ERROR: result['pg_error'] = db.PG_FALLBACK_ERROR try: from ais_provider import get_provider p = get_provider() result['ais'] = p.get_status() except Exception: result['ais'] = 'unavailable' return jsonify(result) @app.route('/api/v1/admin/ais-test') def ais_test(): """Diagnostic: test AISStream sync fallback + raw WebSocket. Admin only.""" user = get_current_user() if not user or not user.get('is_admin'): return jsonify({'error': 'admin access required'}), 403 mode = request.args.get('mode', 'raw') # raw | fallback result = {'mode': mode} try: from ais_provider import get_provider p = get_provider() if mode == 'fallback' and p.aisstream: # Test the sync fallback path used for real queries bbox = [[35, 10], [42, 25]] # Eastern Mediterranean result['step'] = 'calling query_area_sync (6s)...' vessels = p.aisstream.query_area_sync(bbox[0][0], bbox[0][1], bbox[1][0], bbox[1][1], max_seconds=6) result['vessels_found'] = len(vessels) result['positions_total'] = p.aisstream._positions_received if vessels: result['sample'] = { 'mmsi': vessels[0].get('mmsi'), 'lat': vessels[0].get('latitude'), 'lon': vessels[0].get('longitude'), } result['success'] = len(vessels) > 0 else: # Raw websocket test import websocket as ws_lib api_key = os.environ.get('AISSTREAM_API_KEY', '') result['key_set'] = bool(api_key) result['key_length'] = len(api_key) ws = ws_lib.create_connection( 'wss://stream.aisstream.io/v0/stream', timeout=15) try: sub = { "APIKey": api_key, "BoundingBoxes": [[[35, 10], [40, 20]]], "FilterMessageTypes": ["PositionReport"] } ws.send(json.dumps(sub)) ws.settimeout(10) raw = ws.recv() data = json.loads(raw) if 'error' in data: result['step'] = f"API error: {data['error']}" result['success'] = False else: msg_type = data.get('MessageType', '?') mmsi = data.get('MetaData', {}).get('MMSI', '?') result['step'] = f"received: {msg_type} MMSI={mmsi}" result['success'] = True finally: ws.close() except Exception as e: result['error'] = f"{type(e).__name__}: {e}" result['success'] = False return jsonify(result) # ============================================================================= # ADMIN MANAGEMENT # ============================================================================= @app.route('/api/v1/admin/grant', methods=['POST']) @rate_limit(max_calls=10, period=60) @require_auth def grant_admin(): """Grant admin to a user by email (only existing admins can do this)""" if not request.user.get('is_admin'): return jsonify({'success': False, 'error': 'Admin access required'}), 403 data = request.get_json() email = (data.get('email') or '').strip().lower() if not email: return jsonify({'success': False, 'error': 'Email required'}), 400 target = db.get_user_by_email(email) if not target: return jsonify({'success': False, 'error': 'User not found'}), 404 db.set_user_admin(target['id'], True) return jsonify({'success': True, 'email': email, 'is_admin': True}) @app.route('/api/v1/admin/revoke', methods=['POST']) @rate_limit(max_calls=10, period=60) @require_auth def revoke_admin(): """Revoke admin from a user by email (only existing admins can do this)""" if not request.user.get('is_admin'): return jsonify({'success': False, 'error': 'Admin access required'}), 403 data = request.get_json() email = (data.get('email') or '').strip().lower() if not email: return jsonify({'success': False, 'error': 'Email required'}), 400 target = db.get_user_by_email(email) if not target: return jsonify({'success': False, 'error': 'User not found'}), 404 db.set_user_admin(target['id'], False) return jsonify({'success': True, 'email': email, 'is_admin': False}) @app.route('/api/v1/purchased-contacts', methods=['GET']) @require_auth def get_purchased_contacts(): """Get user's purchased (unlocked) contacts.""" contacts = db.get_purchased_contacts(request.user['id']) return jsonify({'success': True, 'contacts': contacts}) @app.route('/api/v1/admin/revenue', methods=['GET']) @require_auth def admin_revenue(): """Platform revenue dashboard — service charges, deposits, balances.""" if not request.user.get('is_admin'): return jsonify({'success': False, 'error': 'Admin access required'}), 403 stats = db.get_revenue_stats() return jsonify({'success': True, **stats}) @app.route('/api/v1/admin/costs', methods=['GET']) @require_auth @rate_limit(max_calls=10, period=60) def admin_costs(): """AI API cost dashboard — token usage, cache efficiency, daily trends.""" if not request.user.get('is_admin'): return jsonify({'success': False, 'error': 'Admin access required'}), 403 try: days = request.args.get('days', 30, type=int) stats = db.get_cost_stats(days=min(max(days, 1), 90)) return jsonify({'success': True, **stats}) except Exception as e: logger.error(f"admin_costs error: {e}") return jsonify({'success': False, 'error': f'Failed to load cost stats: {e}'}), 500 # ============================================================================= # KEEP-ALIVE (prevents Render free tier from sleeping after 15 min) # ============================================================================= def _start_keep_alive(): """Background thread that pings /health every 10 minutes. Only runs on Render (RENDER env var is set automatically by Render). Keeps the free-tier instance from sleeping (spins down after 15 min inactivity). Single-instance: only one keep-alive thread per process.""" if os.environ.get('_KEEPALIVE_STARTED'): return os.environ['_KEEPALIVE_STARTED'] = '1' import threading import urllib.request render_url = os.environ.get(config.ENV_RENDER_EXTERNAL_URL) if not render_url: return # Not on Render — skip health_url = f"{render_url}/health" def ping_loop(): import time while True: time.sleep(10 * 60) # 10 minutes (well under 15-min timeout) try: resp = urllib.request.urlopen(health_url, timeout=15) body = resp.read().decode('utf-8', errors='replace') if 'degraded' in body: logger.warning("Keep-alive: health check returned degraded status") except Exception as e: logger.warning(f"Keep-alive ping failed: {e}") t = threading.Thread(target=ping_loop, daemon=True) t.start() logger.info(f"Keep-alive started: pinging {health_url} every 10 min") _start_keep_alive() # ============================================================================= # ============================================================================= # VESSEL COLLECTOR — background Med/Baltic/Caspian data enrichment # ============================================================================= def _start_vessel_collector(): """Start background vessel data collector.""" try: from vessel_collector import start_collector start_collector() except ImportError: logger.info("Vessel collector: module not found — skipped") except Exception as e: logger.error(f"Vessel collector error: {e}") _start_vessel_collector() # ============================================================================= # AIS PROVIDER (real-time vessel tracking) # ============================================================================= def _start_ais_provider(): """Disabled — positions come from mt_position_updater service (MarineTraffic tiles). AISStream/AISHub/Digitraffic removed to prevent DB bloat and reconnection storms. """ pass # _start_ais_provider() # DISABLED: DB-only positions via mt_position_updater def _start_position_cleanup(): """Periodic cleanup of old AIS positions.""" import threading as _threading def cleanup_loop(): while True: _time_mod.sleep(config.POSITION_CLEANUP_INTERVAL_HOURS * 3600) try: deleted = db.cleanup_old_positions(keep_hours=config.POSITION_KEEP_HOURS) if deleted: logger.info(f"Position cleanup: removed {deleted} old records") except Exception as e: logger.error(f"Position cleanup error: {e}") t = _threading.Thread(target=cleanup_loop, daemon=True, name="pos-cleanup") t.start() _start_position_cleanup() # ============================================================================= # GRACEFUL SHUTDOWN HANDLER # ============================================================================= _shutdown_in_progress = False def _graceful_shutdown(signum, frame): """Handle SIGTERM/SIGINT: close DB pool, log, and exit cleanly.""" global _shutdown_in_progress if _shutdown_in_progress: return # Prevent double-shutdown _shutdown_in_progress = True sig_name = signal.Signals(signum).name if hasattr(signal, 'Signals') else str(signum) logger.info(f"Shutting down gracefully... (received {sig_name})") # 1. Close database connection pool try: db.close_pool() except Exception as e: logger.error(f"Error closing DB pool during shutdown: {e}") # 2. Stop AIS provider if possible try: from ais_provider import stop_ais_provider stop_ais_provider() logger.info("AIS provider stopped") except (ImportError, AttributeError): pass # stop_ais_provider may not exist except Exception as e: logger.error(f"Error stopping AIS provider: {e}") logger.info("Shutdown complete") sys.exit(0) # Register signal handlers # SIGTERM: sent by Render/Docker/systemd on deploy or scale-down # SIGINT: sent by Ctrl+C in local dev signal.signal(signal.SIGTERM, _graceful_shutdown) signal.signal(signal.SIGINT, _graceful_shutdown) # ============================================================================= # MAIN # ============================================================================= @app.errorhandler(404) def not_found(e): return jsonify({'success': False, 'error': 'Not found'}), 404 @app.errorhandler(405) def method_not_allowed(e): return jsonify({'success': False, 'error': 'Method not allowed'}), 405 # ============================================================================= if __name__ == '__main__': port = int(os.environ.get(config.ENV_SEAFARE_PORT, config.DEFAULT_PORT)) logger.info(f"SeaFare Montana API running on http://localhost:{port}") logger.info("Open in browser to chat!") app.run(host='0.0.0.0', port=port, debug=os.environ.get(config.ENV_FLASK_DEBUG, '').lower() == 'true')