montana/Русский/Логистика/seafare_api.py

2807 lines
110 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
SeaFare API — Maritime Data Service
Serves data from local DB, falls back to MarineTraffic/Equasis
Ɉ MONTANA PROTOCOL — ML-DSA-65 (FIPS 204)
"""
import os
import sys
import signal
import json
import re
import html as _html_mod
import uuid
import time as _time_mod
import logging
import queue
import threading as _threading_mod
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Optional, Dict, List
from functools import wraps
logger = logging.getLogger('seafare_api')
from dotenv import load_dotenv
load_dotenv()
from flask import Flask, request, jsonify, send_file, Response, make_response, redirect
from flask_cors import CORS
from werkzeug.security import generate_password_hash, check_password_hash
import secrets as _secrets
import urllib.request
import http.cookiejar
import threading as _threading
# Timeweb Webmail API config (NOT SMTP — uses webmail HTTP API).
# Login and password are read from the SMTP_EMAIL / SMTP_PASSWORD environment variables.
TW_MAIL_API = 'https://api-mail.timeweb.com'
TW_MAIL_LOGIN = os.environ.get('SMTP_EMAIL', 'noreply@efir.org')
TW_MAIL_PASSWORD = os.environ.get('SMTP_PASSWORD', '')
TW_SENDER_NAME = 'SeaFare Montana'
# Cached webmail session: URL opener, CSRF token, and expiry as an epoch timestamp.
_tw_session = {'opener': None, 'csrf': None, 'expires': 0}
# Lock taken around reads and writes of _tw_session.
_tw_lock = _threading.Lock()
from itsdangerous import URLSafeTimedSerializer, BadSignature, SignatureExpired
import maritime_db as db
import marinetraffic_parser as mt
import equasis_parser as eq
import config
# Flask application object and its CORS policy.
# The localhost origins are appended only when the debug environment flag is set.
app = Flask(__name__)
_cors_origins = [*config.ALLOWED_ORIGINS]
if os.environ.get(config.ENV_FLASK_DEBUG):
    _cors_origins.extend(["http://localhost:5050", "http://127.0.0.1:5050"])
CORS(app, origins=_cors_origins)
@app.after_request
def add_security_headers(response):
    """Attach the security headers to every outgoing response.

    HSTS is omitted when the request host starts with ``localhost`` or
    ``127.0.0.1``. Returns the same response object it was given.
    """
    csp_directives = (
        "default-src 'self'",
        "script-src 'self' 'unsafe-inline' https://unpkg.com https://accounts.google.com https://apis.google.com",
        "style-src 'self' 'unsafe-inline' https://fonts.googleapis.com https://unpkg.com",
        "font-src 'self' https://fonts.gstatic.com",
        "img-src 'self' https: data:",
        "connect-src 'self' https://*.basemaps.cartocdn.com",
        "frame-src https://accounts.google.com",
        "frame-ancestors 'self'",
    )
    headers = response.headers
    headers['X-Content-Type-Options'] = 'nosniff'
    headers['X-Frame-Options'] = 'DENY'
    headers['X-XSS-Protection'] = '1; mode=block'
    headers['Referrer-Policy'] = 'strict-origin-when-cross-origin'
    is_local_host = request.host.startswith(('localhost', '127.0.0.1'))
    if not is_local_host:
        headers['Strict-Transport-Security'] = 'max-age=31536000; includeSubDomains'
    headers['Content-Security-Policy'] = "; ".join(csp_directives)
    return response
# Application version string, taken from config.
APP_VERSION = config.APP_VERSION
# Initialize database on import (needed for gunicorn)
# get_connection() has auto-fallback: if PostgreSQL is down → switch to SQLite
try:
    db.init_db()
except Exception as e:
    logger.error(f"Database init failed: {e}")
    # If still in PG mode (fallback didn't trigger), force SQLite fallback
    if db.USE_POSTGRES:
        logger.warning("Forcing SQLite fallback at startup")
        db._fallback_to_sqlite()
# Cache duration for live data (from config)
POSITION_CACHE_MINUTES = config.POSITION_CACHE_MINUTES
VESSEL_CACHE_HOURS = config.VESSEL_CACHE_HOURS
# ---------------------------------------------------------------------------
# RATE LIMITER (in-memory, resets on restart — fine for single-worker deploy)
# ---------------------------------------------------------------------------
# All three containers are plain process-local state; nothing here is persisted.
_rate_buckets = defaultdict(list)  # key → list of timestamps
_login_failures = defaultdict(list)  # email → list of failure timestamps
_response_cache = {}  # "<user id or ip>:<query hash>" → (response, timestamp) — dedup cache
def _get_client_ip():
    """Return the caller's IP address.

    The first entry of ``X-Forwarded-For`` wins when the header is present;
    otherwise ``request.remote_addr`` is used, falling back to ``'0.0.0.0'``.
    """
    forwarded_for = request.headers.get('X-Forwarded-For', '')
    if not forwarded_for:
        return request.remote_addr or '0.0.0.0'
    first_hop, _, _ = forwarded_for.partition(',')
    return first_hop.strip()
_rate_last_cleanup = [0.0]  # last cleanup timestamp (one-element list so the closure can mutate it)


def rate_limit(max_calls: int = 30, period: int = 60):
    """Decorator factory: allow at most ``max_calls`` per ``period`` seconds per client IP.

    Buckets are keyed by the wrapped function's name plus the client IP.
    When the limit is reached the view is not called and a 429 JSON
    response is returned instead.
    """
    def decorator(f):
        @wraps(f)
        def wrapped(*args, **kwargs):
            bucket_key = f"{f.__name__}:{_get_client_ip()}"
            now = _time_mod.time()
            # Keep only the timestamps that are still inside the window.
            recent = [stamp for stamp in _rate_buckets[bucket_key] if now - stamp < period]
            _rate_buckets[bucket_key] = recent
            if len(recent) >= max_calls:
                return jsonify({'success': False, 'error': 'Too many requests. Please slow down.'}), 429
            recent.append(now)
            # Every cleanup interval, drop the buckets that are currently empty.
            if now - _rate_last_cleanup[0] > config.RATE_LIMIT_CLEANUP_INTERVAL:
                _rate_last_cleanup[0] = now
                for empty_key in [k for k, stamps in _rate_buckets.items() if not stamps]:
                    del _rate_buckets[empty_key]
            return f(*args, **kwargs)
        return wrapped
    return decorator
# Signed token auth (survives DB wipes)
# IronClaw principle: separate API keys — never reuse ANTHROPIC_API_KEY for token signing
import hashlib as _hashlib
# Key selection order: dedicated secret from env → key derived from the
# Anthropic API key → random per-process key.
_SECRET_KEY = os.environ.get(config.ENV_SECRET_KEY)
if not _SECRET_KEY:
    _api_key = os.environ.get(config.ENV_ANTHROPIC_API_KEY, '')
    if _api_key:
        # Derived key: SHA-256 of a fixed prefix plus the API key.
        _SECRET_KEY = _hashlib.sha256(f"seafare-token-signing-{_api_key}".encode()).hexdigest()
        logger.warning("SECRET_KEY not set — using derived key. Set a dedicated SECRET_KEY in env!")
    else:
        # Random key: every token signed with it becomes invalid after a restart.
        _SECRET_KEY = os.urandom(32).hex()
        logger.warning("SECRET_KEY not set — using random key (tokens won't survive restart)")
# Serializer used for auth tokens and for the email-verification tokens.
_token_serializer = URLSafeTimedSerializer(_SECRET_KEY)
# Maximum accepted age of an auth token, passed as max_age when loading.
TOKEN_MAX_AGE = config.TOKEN_MAX_AGE
def create_auth_token(user):
    """Return a signed token whose payload holds only the user's id (MED-003: no email)."""
    payload = {'uid': user['id']}
    return _token_serializer.dumps(payload)
def verify_auth_token(token):
    """Decode a signed auth token.

    Returns the decoded payload, or None when the signature is bad or the
    token is older than TOKEN_MAX_AGE.
    """
    try:
        payload = _token_serializer.loads(token, max_age=TOKEN_MAX_AGE)
    except (BadSignature, SignatureExpired):
        return None
    return payload
# =============================================================================
# AUTH HELPERS
# =============================================================================
def get_current_user():
    """Resolve the user from the ``Authorization: Bearer`` header, or return None.

    A signed token is tried first; if it does not verify, the raw token is
    looked up as a legacy DB session.
    """
    scheme = 'Bearer '
    auth_header = request.headers.get('Authorization', '')
    if not auth_header.startswith(scheme):
        return None
    token = auth_header[len(scheme):]

    token_data = verify_auth_token(token)
    if not token_data:
        # Not a valid signed token: fall back to legacy DB session (old UUID tokens).
        session = db.get_session(token)
        return db.get_user_by_id(session['user_id']) if session else None

    # Signed token (survives cold starts / DB wipes).
    user = None
    if 'uid' in token_data:
        # New format: uid-based (MED-003 fix)
        user = db.get_user_by_id(token_data['uid'])
    elif 'email' in token_data:
        # Legacy format: email-based (for existing tokens). A missing account
        # is re-created with a random password.
        email = token_data['email']
        user = db.get_user_by_email(email)
        if not user:
            pw_hash = generate_password_hash(str(uuid.uuid4()))
            user_id = db.create_user(email, pw_hash, name=token_data.get('name', ''))
            user = db.get_user_by_id(user_id)
    if user:
        auto_promote_admin(user)
    return user
def require_auth(f):
    """Decorator: run the view only for an authenticated caller.

    The resolved user is stored on ``request.user``; unauthenticated calls
    get a 401 JSON response.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        current = get_current_user()
        if current:
            request.user = current
            return f(*args, **kwargs)
        return jsonify({'success': False, 'error': 'Authentication required'}), 401
    return decorated
# Lower-cased, whitespace-trimmed admin emails parsed from a comma-separated env value; blanks are dropped.
ADMIN_EMAILS = [e.strip().lower() for e in os.environ.get(config.ENV_ADMIN_EMAILS, '').split(',') if e.strip()]
def user_dict(user):
    """Build the API-safe view of a user record (password_hash is never included).

    ``id``, ``email``, ``name``, ``lang`` and ``balance`` must be present;
    ``is_admin``, ``plan`` and ``plan_expires_at`` are optional.
    """
    public = {key: user[key] for key in ('id', 'email', 'name', 'lang', 'balance')}
    public['is_admin'] = bool(user.get('is_admin'))
    public['plan'] = user.get('plan', 'free')
    public['plan_expires_at'] = user.get('plan_expires_at')
    return public
def _security_log(event, user_id=None, details=None):
    """Write one audit-trail line for a security-relevant event at INFO level."""
    suffix = details or ''
    logger.info(f"[SECURITY] {event} user_id={user_id} {suffix}")
def auto_promote_admin(user):
    """Grant admin rights when the user's email is listed in ADMIN_EMAILS.

    Does nothing if the list is empty, the email is not listed, or the user
    is already an admin. On promotion the passed-in dict is updated too.
    """
    if not ADMIN_EMAILS:
        return
    if user['email'].lower() not in ADMIN_EMAILS:
        return
    if user.get('is_admin'):
        return
    db.set_user_admin(user['id'], True)
    user['is_admin'] = 1
    _security_log('ADMIN_PROMOTED', user['id'], f"email={user['email']}")
# =============================================================================
# WEB UI
# =============================================================================
@app.route('/')
def index():
    """Serve the chat interface (the index.html file) at the site root."""
    return send_file('index.html')
# =============================================================================
# AUTH ENDPOINTS
# =============================================================================
# =============================================================================
# EMAIL VERIFICATION
# =============================================================================
# Subject line of the verification email, keyed by language code.
_VERIFY_SUBJECTS = {
    'en': 'SeaFare — Your verification code',
    'zh': 'SeaFare — 您的验证码',
    'es': 'SeaFare — Código de verificación',
    'ru': 'SeaFare — Код подтверждения',
}
# Plain-text body templates keyed by language code; {code} is filled in via str.format.
# The second-to-last line of each body is reused as the hint line in the HTML version.
_VERIFY_BODY = {
    'en': """Your verification code:
{code}
Valid for 10 minutes.
Do not share this code with anyone.
— SeaFare · Montana Protocol""",
    'ru': """Ваш код подтверждения:
{code}
Действителен 10 минут.
Не сообщайте код никому.
— SeaFare · Montana Protocol""",
    'es': """Su código de verificación:
{code}
Válido por 10 minutos.
No comparta este código con nadie.
— SeaFare · Montana Protocol""",
    'zh': """您的验证码:
{code}
有效期10分钟。
请勿与任何人分享此验证码。
— SeaFare · Montana Protocol""",
}
def _tw_login():
    """Log in to the Timeweb webmail API.

    Returns ``(opener, csrf_token)``: the opener carries the session cookies,
    and the token is the value of the ``api_csrf`` cookie (empty string if
    that cookie was not set).
    """
    cookie_jar = http.cookiejar.CookieJar()
    opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(cookie_jar))
    credentials = {"login": TW_MAIL_LOGIN, "password": TW_MAIL_PASSWORD}
    login_request = urllib.request.Request(
        f"{TW_MAIL_API}/login",
        data=json.dumps(credentials).encode(),
        headers={
            "Content-Type": "application/json",
            "Origin": "https://mail.timeweb.com",
            "Referer": "https://mail.timeweb.com/"
        },
    )
    response = opener.open(login_request, timeout=10)
    # The body is parsed (so a non-JSON reply raises) but its content is not used.
    json.loads(response.read().decode())
    csrf = next((cookie.value for cookie in cookie_jar if cookie.name == "api_csrf"), "")
    return opener, csrf
def _get_tw_session():
    """Return ``(opener, csrf)`` for Timeweb webmail, logging in when the cache is stale.

    A fresh login is cached for 6 hours. The whole check-and-login sequence
    runs while holding ``_tw_lock``.
    """
    import time as _time
    with _tw_lock:
        cached_opener = _tw_session['opener']
        if cached_opener and _time.time() < _tw_session['expires']:
            return cached_opener, _tw_session['csrf']
        fresh_opener, fresh_csrf = _tw_login()
        _tw_session.update(
            opener=fresh_opener,
            csrf=fresh_csrf,
            expires=_time.time() + 6 * 3600,
        )
        return fresh_opener, fresh_csrf
def _tw_post_message(payload: bytes) -> dict:
    """POST an already-encoded message to the Timeweb send endpoint and return the decoded JSON reply."""
    opener, csrf = _get_tw_session()
    req = urllib.request.Request(f"{TW_MAIL_API}/message/send", data=payload, headers={
        "Content-Type": "application/json",
        "Origin": "https://mail.timeweb.com",
        "Referer": "https://mail.timeweb.com/",
        "X-CSRF-Token": csrf
    })
    with opener.open(req, timeout=15) as resp:
        return json.loads(resp.read().decode())


def _send_verification_email(to_email: str, code: str, lang: str = 'en') -> bool:
    """Send verification code via Timeweb Webmail API (HTTPS, not SMTP).

    Args:
        to_email: recipient address.
        code: the verification code to embed in the message.
        lang: language key for subject/body; unknown keys fall back to English.

    Returns:
        True when the API reply contains a ``sendingId``; False otherwise
        (mail not configured, HTTP error, or any other failure). Never raises.
    """
    if not TW_MAIL_PASSWORD:
        logger.error("Timeweb mail not configured")
        return False
    subject = _VERIFY_SUBJECTS.get(lang, _VERIFY_SUBJECTS['en'])
    plain_body = _VERIFY_BODY.get(lang, _VERIFY_BODY['en']).format(code=code)
    # The HTML version shows the second-to-last line of the plain body as a hint.
    body_lines = plain_body.split(chr(10))
    footer_line = body_lines[-2] if len(body_lines) > 1 else ""
    html_body = f'''<div style="font-family:sans-serif;background:#0a0a0a;padding:40px;text-align:center">
<div style="font-size:32px;color:#D4AF37;font-weight:700">&#x248;</div>
<div style="font-size:14px;color:#8e8e93;margin:4px 0 24px">SeaFare · Montana Protocol</div>
<div style="background:rgba(28,28,30,0.95);border-radius:16px;padding:32px;display:inline-block;border:1px solid rgba(255,255,255,0.08)">
<div style="font-size:15px;color:#e8e0d0;margin-bottom:16px">{subject}</div>
<div style="font-size:36px;font-weight:700;color:#D4AF37;letter-spacing:8px;font-family:monospace">{code}</div>
<div style="font-size:13px;color:#8e8e93;margin-top:16px">{footer_line}</div>
</div>
</div>'''
    # Built before any network call so the retry always has a payload to resend.
    payload = json.dumps({
        "to": [to_email], "cc": [], "bcc": [],
        "from": TW_MAIL_LOGIN,
        "senderName": TW_SENDER_NAME,
        "subject": subject,
        "htmlText": html_body,
        "text": plain_body,
        "attachments": [],
        "priority": "normal",
        "askDeliveryReport": False,
        "askReadReport": False
    }).encode()
    for attempt in (1, 2):
        try:
            result = _tw_post_message(payload)
        except urllib.error.HTTPError as e:
            if attempt == 1 and e.code in (401, 403):
                # Cached session was rejected: invalidate it and retry once with a fresh login.
                with _tw_lock:
                    _tw_session['expires'] = 0
                continue
            logger.error(f"Timeweb API error sending to {to_email}: HTTP {e.code}")
            return False
        except Exception as e:
            logger.error(f"Email send error to {to_email}: {e}")
            return False
        if isinstance(result, dict) and result.get('sendingId'):
            suffix = " (retry)" if attempt == 2 else " via Timeweb API"
            logger.info(f"Verification email sent to {to_email}{suffix}")
            return True
        logger.error(f"No sendingId in response: {result}")
        return False
    return False
@app.route('/api/v1/auth/send-code', methods=['POST'])
@rate_limit(max_calls=3, period=60)
def send_code():
    """Send 6-digit verification code to email.

    Request JSON: ``email`` (required), ``lang`` (optional, default ``en``).
    Responses: 400 for a missing/malformed email, 409 when the email is
    already registered, 502 when the mail could not be delivered, 200 on
    success. The code itself is only ever echoed back when the Flask debug
    environment flag is set.
    """
    data = request.get_json(silent=True) or {}
    email = (data.get('email') or '').strip().lower()
    lang = data.get('lang', 'en')
    if not email or not re.match(r'^[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}$', email):
        return jsonify({'success': False, 'error': L(lang, 'auth_email_required')}), 400
    # Check if email already registered
    existing = db.get_user_by_email(email)
    if existing:
        return jsonify({'success': False, 'error': L(lang, 'auth_email_exists')}), 409
    # Generate 6-digit code
    code = ''.join(str(_secrets.randbelow(10)) for _ in range(6))
    # Store in DB (delete old codes first)
    conn = db.get_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("DELETE FROM verification_codes WHERE email = ?", (email,))
        expires = (datetime.utcnow() + timedelta(minutes=10)).strftime('%Y-%m-%d %H:%M:%S')
        cursor.execute(
            "INSERT INTO verification_codes (email, code, lang, expires_at) VALUES (?, ?, ?, ?)",
            (email, code, lang, expires)
        )
        conn.commit()
    finally:
        conn.close()
    # Send email
    if _send_verification_email(email, code, lang):
        return jsonify({'success': True, 'message': L(lang, 'code_sent')})
    if os.environ.get(config.ENV_FLASK_DEBUG):
        # Development convenience only: expose the code when mail is not deliverable.
        return jsonify({'success': True, 'message': L(lang, 'code_sent'), '_debug_code': code})
    # Never hand the code to the caller in production — that would let anyone
    # "verify" an address they do not control.
    return jsonify({'success': False, 'error': L(lang, 'error')}), 502
@app.route('/api/v1/auth/verify-code', methods=['POST'])
@rate_limit(max_calls=5, period=60)
def verify_code():
    """Check a 6-digit code and, if it matches, issue a signed token for registration.

    A matching, unexpired code is deleted (together with any other codes for
    that email) before the token is returned.
    """
    payload = request.get_json(silent=True) or {}
    email = (payload.get('email') or '').strip().lower()
    code = (payload.get('code') or '').strip()
    lang = payload.get('lang', 'en')
    if not (email and code):
        return jsonify({'success': False, 'error': L(lang, 'code_invalid')}), 400

    conn = db.get_connection()
    try:
        cursor = conn.cursor()
        now_str = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')
        cursor.execute(
            "SELECT * FROM verification_codes WHERE email = ? AND code = ? AND expires_at > ?",
            (email, code, now_str)
        )
        match = cursor.fetchone()
        if not match:
            return jsonify({'success': False, 'error': L(lang, 'code_invalid')}), 400
        # Delete used code
        cursor.execute("DELETE FROM verification_codes WHERE email = ?", (email,))
        conn.commit()
    finally:
        conn.close()

    # Token consumed by the register endpoint, which accepts it for 10 minutes.
    claims = {'verified_email': email, 'lang': lang}
    verified_token = _token_serializer.dumps(claims)
    return jsonify({'success': True, 'verified': True, 'verified_token': verified_token})
@app.route('/api/v1/auth/register', methods=['POST'])
@rate_limit(max_calls=3, period=60)
def register():
    """Create an account for an email that was confirmed via verification code.

    Request JSON: ``verified_token`` (required), ``password`` (min 6 chars),
    ``name`` and ``lang`` (optional). Returns 201 with an auth token and the
    public user dict; 400 for a missing/expired token or short password;
    409 when the email is already registered.
    """
    body = request.get_json(silent=True) or {}
    password = body.get('password') or ''
    name = (body.get('name') or '').strip()
    lang = body.get('lang', 'en')
    verified_token = body.get('verified_token', '')

    # Verify the email was confirmed via code
    if not verified_token:
        return jsonify({'success': False, 'error': L(lang, 'code_required')}), 400
    try:
        claims = _token_serializer.loads(verified_token, max_age=600)  # 10 min
        email = claims.get('verified_email', '').strip().lower()
        if not email:
            raise ValueError("No email in token")
        # The language stored in the token overrides the one in the request.
        lang = claims.get('lang', lang)
    except Exception:
        return jsonify({'success': False, 'error': L(lang, 'code_expired')}), 400

    if len(password) < 6:
        return jsonify({'success': False, 'error': L(lang, 'auth_password_required')}), 400
    if db.get_user_by_email(email):
        return jsonify({'success': False, 'error': L(lang, 'auth_email_exists')}), 409

    user_id = db.create_user(email, generate_password_hash(password), name=name, lang=lang)
    db.update_user_last_login(user_id)
    user = db.get_user_by_id(user_id)
    auto_promote_admin(user)
    return jsonify({
        'success': True,
        'token': create_auth_token(user),
        'user': user_dict(user),
    }), 201
@app.route('/api/v1/auth/login', methods=['POST'])
@rate_limit(max_calls=5, period=60)
def login():
    """Authenticate with email + password and return a signed auth token.

    Responses: 200 with token and user on success, 401 for bad credentials,
    429 after 5 failures for the same email within 15 minutes.
    """
    data = request.get_json(silent=True) or {}
    raw_email = data.get('email')
    raw_password = data.get('password')
    # Non-string values can never match an account; treat them as empty
    # instead of letting .strip() / the hash check raise.
    email = raw_email.strip().lower() if isinstance(raw_email, str) else ''
    password = raw_password if isinstance(raw_password, str) else ''
    lang = data.get('lang', 'en')
    # Brute force protection: max 5 failures per email per 15 min
    now = _time_mod.time()
    recent_failures = [t for t in _login_failures.get(email, ()) if now - t < 900]
    # Only keep an entry while it holds live failures, so probing many
    # different emails cannot grow the dict without bound.
    if recent_failures:
        _login_failures[email] = recent_failures
    else:
        _login_failures.pop(email, None)
    if len(recent_failures) >= 5:
        _security_log('LOGIN_BLOCKED', details=f"email={email} brute_force")
        return jsonify({'success': False, 'error': 'Too many failed attempts. Try again in 15 minutes.'}), 429
    user = db.get_user_by_email(email)
    if not user or not check_password_hash(user['password_hash'], password):
        recent_failures.append(now)
        _login_failures[email] = recent_failures
        _security_log('LOGIN_FAILED', details=f"email={email}")
        return jsonify({'success': False, 'error': L(lang, 'auth_invalid')}), 401
    _login_failures.pop(email, None)
    auto_promote_admin(user)
    token = create_auth_token(user)
    db.update_user_last_login(user['id'])
    _security_log('LOGIN_OK', user['id'], f"email={email}")
    return jsonify({'success': True, 'token': token, 'user': user_dict(user)})
@app.route('/api/v1/auth/logout', methods=['POST'])
def logout():
    """Log out: best-effort removal of a legacy DB session; always returns success.

    Signed tokens are stateless — the client simply discards them — so the
    only server-side work is deleting an old-style session if one exists.
    """
    auth_header = request.headers.get('Authorization', '')
    if auth_header.startswith('Bearer '):
        try:
            db.delete_session(auth_header[7:])
        except Exception as e:
            # Still best-effort, but leave a trace instead of hiding DB problems.
            logger.warning(f"Logout: failed to delete legacy session: {e}")
    return jsonify({'success': True})
@app.route('/api/v1/auth/me', methods=['GET'])
@require_auth
def get_me():
    """Return the public dict of the authenticated user."""
    return jsonify({'success': True, 'user': user_dict(request.user)})
@app.route('/api/v1/auth/google', methods=['POST'])
@rate_limit(max_calls=5, period=60)
def google_login():
    """Login/register via Google ID token.

    The token is validated through Google's tokeninfo endpoint. It must be
    issued for our client ID and carry a verified email. Unknown emails are
    registered on the fly with a random password.

    Responses: 200 with token and user; 400 for a missing credential or
    email; 401 when the token is rejected.
    """
    import requests as http_req
    data = request.get_json(silent=True) or {}
    credential = data.get('credential', '')
    lang = data.get('lang', 'en')
    if not credential:
        return jsonify({'success': False, 'error': 'Missing credential'}), 400
    # Verify token with Google
    try:
        resp = http_req.get(
            'https://oauth2.googleapis.com/tokeninfo',
            params={'id_token': credential},
            timeout=10
        )
        if resp.status_code != 200:
            logger.error(f"Google tokeninfo failed: {resp.status_code} - {resp.text[:200]}")
            return jsonify({'success': False, 'error': L(lang, 'auth_invalid')}), 401
        google_data = resp.json()
        # Verify audience matches our client ID. An unset client ID must never
        # match, whatever the token contains.
        expected_client_id = os.environ.get(config.ENV_GOOGLE_CLIENT_ID, '')
        if not expected_client_id or google_data.get('aud') != expected_client_id:
            logger.error(f"Google aud mismatch: got={google_data.get('aud')}, expected={expected_client_id}")
            return jsonify({'success': False, 'error': 'Invalid token audience'}), 401
        # Only trust the email claim when Google has verified it; tokeninfo
        # reports the flag as the string "true".
        if str(google_data.get('email_verified', '')).lower() != 'true':
            logger.error("Google token rejected: email not verified")
            return jsonify({'success': False, 'error': 'Google email is not verified'}), 401
        email = google_data.get('email', '').lower().strip()
        name = google_data.get('name', '') or google_data.get('given_name', '') or email.split('@')[0]
        if not email:
            return jsonify({'success': False, 'error': 'No email in Google token'}), 400
    except Exception as e:
        logger.error(f"Google auth error: {e}")
        return jsonify({'success': False, 'error': L(lang, 'auth_invalid')}), 401
    # Find or create user
    user = db.get_user_by_email(email)
    if not user:
        # Register new user (random password — they'll use Google to login)
        pw_hash = generate_password_hash(str(uuid.uuid4()))
        user_id = db.create_user(email, pw_hash, name=name, lang=lang)
        user = db.get_user_by_id(user_id)
    auto_promote_admin(user)
    token = create_auth_token(user)
    db.update_user_last_login(user['id'])
    _security_log('GOOGLE_LOGIN', user['id'], f"email={email}")
    return jsonify({'success': True, 'token': token, 'user': user_dict(user)})
@app.route('/api/v1/auth/config', methods=['GET'])
def auth_config():
    """Expose the public auth settings: the Google Client ID, or null when unset."""
    client_id = os.environ.get(config.ENV_GOOGLE_CLIENT_ID, '')
    public_settings = {'google_client_id': client_id or None}
    return jsonify(public_settings)
# =============================================================================
# PROFILE ENDPOINTS (personal cabinet)
# =============================================================================
# The four lists below are returned to the client under "options" by the
# profile GET endpoint.
# Selectable user roles.
ROLE_OPTIONS = [
    'shipowner', 'operator', 'charterer', 'broker',
    'freight_forwarder', 'port_agent', 'surveyor', 'other'
]
# Selectable vessel types.
VESSEL_TYPE_OPTIONS = [
    'bulk_carrier', 'tanker', 'container', 'general_cargo',
    'ro_ro', 'lng_carrier', 'chemical_tanker', 'offshore',
    'tug', 'barge', 'passenger', 'other'
]
# Selectable trade routes / regions.
TRADE_ROUTE_OPTIONS = [
    'Mediterranean', 'Baltic', 'Black Sea', 'North Sea',
    'Atlantic', 'Pacific', 'Indian Ocean', 'Southeast Asia',
    'Middle East / Persian Gulf', 'West Africa', 'East Africa',
    'South America', 'Caribbean', 'North America East Coast',
    'North America West Coast', 'Australia / Oceania', 'Arctic'
]
# Selectable cargo types.
CARGO_TYPE_OPTIONS = [
    'dry_bulk', 'liquid_bulk', 'containerized', 'breakbulk',
    'project_cargo', 'reefer', 'ro_ro_cargo', 'lng', 'lpg',
    'chemicals', 'crude_oil', 'refined_products', 'grain', 'coal',
    'iron_ore', 'fertilizer', 'other'
]
@app.route('/api/v1/profile', methods=['GET'])
@require_auth
def get_profile():
    """Return the caller's stored profile together with the selectable option lists."""
    stored_profile = db.get_user_profile(request.user['id'])
    option_lists = {
        'roles': ROLE_OPTIONS,
        'vessel_types': VESSEL_TYPE_OPTIONS,
        'trade_routes': TRADE_ROUTE_OPTIONS,
        'cargo_types': CARGO_TYPE_OPTIONS,
    }
    return jsonify({'success': True, 'profile': stored_profile, 'options': option_lists})
_PROFILE_VALIDATORS = {
'company_name': (str, 100), 'role': (str, 50), 'fleet_size': (int, None),
'vessel_types': (list, 10), 'trade_routes': (list, 10), 'cargo_types': (list, 10),
'vessels_of_interest': (list, 20), 'experience_years': (int, None),
'phone': (str, 30), 'notes': (str, 500), 'home_port': (str, 80),
'preferred_tonnage': (int, None),
'search_radius': (int, None),
}
def _sanitize_profile_input(data: dict) -> dict:
"""Validate and sanitize profile fields. Strip HTML, limit lengths."""
clean = {}
for field, (ftype, limit) in _PROFILE_VALIDATORS.items():
if field not in data:
continue
val = data[field]
if ftype == str:
val = _html_mod.escape(str(val or '').strip())[:limit]
val = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f]', '', val)
elif ftype == int:
try:
val = int(val)
except (ValueError, TypeError):
continue
elif ftype == list:
if not isinstance(val, list):
continue
val = [_html_mod.escape(str(v).strip())[:100] for v in val[:limit]]
clean[field] = val
return clean
@app.route('/api/v1/profile', methods=['PUT'])
@require_auth
def update_profile():
    """Sanitize the submitted profile fields, upsert them for the caller and return the result."""
    data = request.get_json(silent=True) or {}
    # Only recognised, sanitized fields reach the database layer.
    clean = _sanitize_profile_input(data)
    profile = db.upsert_user_profile(request.user['id'], clean)
    return jsonify({'success': True, 'profile': profile})
# =============================================================================
# WALLET & DEPOSIT ENDPOINTS (USDT TRC20)
# =============================================================================
# Contract address used to filter TRC20 transfers when checking deposits.
USDT_TRC20_CONTRACT = config.USDT_TRC20_CONTRACT
# Service fees (from config)
DEPOSIT_FEE_PERCENT = config.DEPOSIT_FEE_PERCENT  # percentage taken from each deposit
DEPOSIT_FEE_MIN = config.DEPOSIT_FEE_MIN  # lower bound of the deposit fee
WITHDRAW_FEE = config.WITHDRAW_FEE  # flat fee added on top of each withdrawal
WITHDRAW_MIN = config.WITHDRAW_MIN  # smallest withdrawal amount accepted
@app.route('/api/v1/wallet', methods=['GET'])
@require_auth
def get_wallet():
    """Return the caller's USDT deposit wallet, creating one on first use.

    The response carries the deposit address, the 10 most recent deposits,
    the caller's balance and the fee schedule. A failure while generating a
    new wallet yields a 500.
    """
    uid = request.user['id']
    wallet = db.get_user_wallet(uid)
    if not wallet:
        try:
            from tronpy.keys import PrivateKey
            private_key = PrivateKey.random()
            address = private_key.public_key.to_base58check_address()
            # NOTE(review): the private key hex is handed to the DB layer as-is;
            # confirm it is encrypted at rest there.
            wallet = db.create_wallet(uid, address, private_key.hex())
        except Exception as e:
            logger.error(f"Wallet generation error: {e}")
            return jsonify({'success': False, 'error': 'Failed to generate wallet'}), 500

    recent_deposits = db.get_user_deposits(uid, limit=10)
    fee_schedule = {
        'deposit_percent': DEPOSIT_FEE_PERCENT,
        'deposit_min': DEPOSIT_FEE_MIN,
        'withdraw_flat': WITHDRAW_FEE,
        'withdraw_min': WITHDRAW_MIN,
    }
    return jsonify({
        'success': True,
        'address': wallet['address'],
        'network': 'TRC20 (Tron)',
        'currency': 'USDT',
        'deposits': recent_deposits,
        'balance': request.user['balance'],
        'fees': fee_schedule,
    })
@app.route('/api/v1/wallet/check', methods=['POST'])
@require_auth
def check_deposits():
    """Check blockchain for new USDT deposits and credit balance.

    Queries TronGrid for confirmed incoming USDT (TRC20) transfers to the
    caller's wallet, records every transaction not seen before, and credits
    the amount minus the deposit fee.

    Responses: 200 with a summary, 400 when the caller has no wallet, 502
    when TronGrid answers with a non-200 status, 500 on any other failure.
    """
    import requests as http_req
    user_id = request.user['id']
    wallet = db.get_user_wallet(user_id)
    if not wallet:
        return jsonify({'success': False, 'error': 'No wallet found. Open deposit page first.'}), 400
    try:
        resp = http_req.get(
            f"https://api.trongrid.io/v1/accounts/{wallet['address']}/transactions/trc20",
            params={
                'contract_address': USDT_TRC20_CONTRACT,
                'only_to': 'true',
                # Without this flag TronGrid also returns unconfirmed transfers,
                # which must not be credited.
                'only_confirmed': 'true',
                'limit': 50,
            },
            headers={'Accept': 'application/json'},
            timeout=15
        )
        if resp.status_code != 200:
            logger.error(f"TronGrid API error: {resp.status_code}")
            return jsonify({'success': False, 'error': 'Blockchain API unavailable'}), 502
        data = resp.json()
        new_deposits = 0
        total_new = 0.0
        total_credited = 0.0
        for tx in data.get('data', []):
            tx_id = tx.get('transaction_id', '')
            if not tx_id or db.is_deposit_processed(tx_id):
                continue
            # USDT has 6 decimals on Tron
            try:
                amount = int(tx.get('value', '0')) / 1_000_000
            except (TypeError, ValueError):
                # One malformed entry must not abort the rest of the batch.
                logger.warning(f"Skipping deposit {tx_id}: malformed value {tx.get('value')!r}")
                continue
            if amount <= 0:
                continue
            # Apply deposit fee
            fee = max(amount * DEPOSIT_FEE_PERCENT / 100, DEPOSIT_FEE_MIN)
            credited = round(amount - fee, 2)
            if credited <= 0:
                # Fee swallowed the whole deposit: still record it, credit nothing.
                credited = 0.0
            from_addr = tx.get('from', '')
            db.add_deposit(user_id, tx_id, amount, from_addr)
            db.update_user_balance(user_id, credited)
            new_deposits += 1
            total_new += amount
            total_credited += credited
        user = db.get_user_by_id(user_id)
        return jsonify({
            'success': True,
            'new_deposits': new_deposits,
            'new_amount': round(total_new, 2),
            'credited_amount': round(total_credited, 2),
            'fee_percent': DEPOSIT_FEE_PERCENT,
            'balance': user['balance'],
        })
    except Exception as e:
        logger.error(f"Deposit check error: {e}")
        return jsonify({'success': False, 'error': 'Failed to check deposits'}), 500
@app.route('/api/v1/wallet/withdraw', methods=['POST'])
@require_auth
@rate_limit(max_calls=5, period=60)
def withdraw():
    """Create a withdrawal request (USDT TRC20).

    Request JSON: ``amount`` (number), ``to_address`` (Tron address),
    ``lang`` (optional). The caller is charged ``amount + WITHDRAW_FEE``.

    Responses: 200 with the withdrawal record and new balance; 400 for an
    invalid amount or address, an amount below the minimum, or insufficient
    funds.
    """
    import math
    data = request.get_json(silent=True) or {}
    amount = data.get('amount', 0)
    to_address = (data.get('to_address') or '').strip()
    lang = data.get('lang', 'en')
    # Validate amount
    try:
        amount = float(amount)
    except (TypeError, ValueError):
        return jsonify({'success': False, 'error': L(lang, 'withdraw_invalid_amount')}), 400
    # NaN compares False against everything and would slip past the minimum
    # check below; infinity is equally meaningless as an amount.
    if not math.isfinite(amount):
        return jsonify({'success': False, 'error': L(lang, 'withdraw_invalid_amount')}), 400
    if amount < WITHDRAW_MIN:
        return jsonify({'success': False, 'error': L(lang, 'withdraw_min')}), 400
    # Validate address (Tron addresses start with T, 34 chars)
    if not to_address or len(to_address) != 34 or not to_address.startswith('T'):
        return jsonify({'success': False, 'error': L(lang, 'withdraw_invalid_address')}), 400
    # Total deducted = amount + withdrawal fee
    total_deducted = round(amount + WITHDRAW_FEE, 2)
    # Atomic balance deduction (prevents race condition with concurrent requests)
    if not db.charge_user(request.user['id'], total_deducted):
        return jsonify({'success': False, 'error': L(lang, 'withdraw_insufficient')}), 400
    withdrawal = db.create_withdrawal(request.user['id'], amount, to_address)
    user = db.get_user_by_id(request.user['id'])
    return jsonify({
        'success': True,
        'withdrawal': {
            'id': withdrawal['id'],
            'amount': withdrawal['amount'],
            'fee': WITHDRAW_FEE,
            'total_deducted': total_deducted,
            'to_address': withdrawal['to_address'],
            'status': withdrawal['status'],
            'created_at': withdrawal['created_at'],
        },
        'balance': user['balance'],
    })
@app.route('/api/v1/wallet/withdrawals', methods=['GET'])
@require_auth
def get_withdrawals():
    """Get user's withdrawal history (at most 20 entries are requested from the DB layer)."""
    withdrawals = db.get_user_withdrawals(request.user['id'], limit=20)
    return jsonify({'success': True, 'withdrawals': withdrawals})
# =============================================================================
# CHAT HISTORY ENDPOINTS
# =============================================================================
@app.route('/api/v1/chat/history', methods=['GET'])
@require_auth
def get_chat_history():
    """Return the caller's chat history.

    Query parameter ``limit`` (default 50) is clamped to the range 0..200.
    """
    requested = request.args.get('limit', 50, type=int)
    # Clamp from below as well: a negative value would pass the upper cap
    # untouched and reach the query as-is.
    limit = max(0, min(requested, 200))
    history = db.get_chat_history(request.user['id'], limit=limit)
    return jsonify({'success': True, 'messages': history})
@app.route('/api/v1/chat/history', methods=['DELETE'])
@require_auth
def delete_chat_history():
    """Clear the caller's chat history and report success."""
    db.clear_chat_history(request.user['id'])
    return jsonify({'success': True})
# =============================================================================
# CHAT ENDPOINT
# =============================================================================
@app.route('/api/v1/chat', methods=['POST'])
def chat():
    """
    Chat with SeaFare Montana bot.
    Works for everyone. If logged in — saves chat history.
    Rate limits differ for anonymous (per IP) and authenticated (per user)
    callers; both are taken from config.

    Request JSON: ``message`` (required), ``lang`` (UI language, used only
    when the message is empty), ``history`` (anonymous callers only: list of
    ``{role, content}`` dicts).

    Responses: 200 ``{response, ms}``; 400 for an empty or over-long
    message; 429 when rate-limited; 500 on an unexpected failure.
    """
    from seafare_agent import _detect_lang, _strip_tool_names
    user = get_current_user()  # None if anonymous
    # Per-user rate limiting
    ip = _get_client_ip()
    if user:
        rkey = f"chat:user:{user['id']}"
        max_c, period = config.RATE_LIMIT_CHAT_AUTH_MAX, config.RATE_LIMIT_CHAT_AUTH_PERIOD
    else:
        rkey = f"chat:anon:{ip}"
        max_c, period = config.RATE_LIMIT_CHAT_ANON_MAX, config.RATE_LIMIT_CHAT_ANON_PERIOD
    now = _time_mod.time()
    _rate_buckets[rkey] = [t for t in _rate_buckets[rkey] if now - t < period]
    if len(_rate_buckets[rkey]) >= max_c:
        return jsonify({'success': False, 'error': 'Too many requests. Please slow down.'}), 429
    _rate_buckets[rkey].append(now)
    data = request.get_json(silent=True) or {}
    raw_message = data.get('message', '')
    # A non-string message is treated as empty instead of crashing on .strip().
    message = raw_message.strip() if isinstance(raw_message, str) else ''
    ui_lang = data.get("lang", "en")
    lang = _detect_lang(message) if message else ui_lang
    if not message:
        return jsonify({'response': L(lang, 'empty_msg')}), 400
    if len(message) > config.CHAT_MESSAGE_MAX_LENGTH:
        return jsonify({'response': L(lang, 'msg_too_long')}), 400
    try:
        _start = _time_mod.time()
        # Load conversation history BEFORE saving current message (to avoid duplication)
        conversation_history = None
        if user:
            conversation_history = db.get_chat_history(user['id'], limit=20)
        elif data.get('history'):
            # Anonymous users: accept client-side history for context continuity
            raw_hist = data['history']
            if isinstance(raw_hist, list):
                # Only the last 10 entries are considered; entries whose
                # content is not a non-empty string are dropped.
                conversation_history = [
                    {'role': h.get('role', 'user'), 'message': h['content'][:2000]}
                    for h in raw_hist[-10:]
                    if isinstance(h, dict) and isinstance(h.get('content'), str) and h['content']
                ]
        # Save user message if logged in
        if user:
            db.add_chat_message(user['id'], 'user', message)
        _any_ai = os.environ.get(config.ENV_ANTHROPIC_API_KEY) or os.environ.get(config.ENV_GROQ_API_KEY) or os.environ.get(config.ENV_MISTRAL_API_KEY)
        if _any_ai:
            from seafare_agent import generate_response
            # Query deduplication: same user, same question within 5 min → cached response
            _uid = user['id'] if user else ip
            _qhash = hash(message.lower().strip())
            _dedup_key = f"{_uid}:{_qhash}"
            _cached = _response_cache.get(_dedup_key)
            if _cached and _time_mod.time() - _cached[1] < 300:
                response = _cached[0]
            else:
                user_context = db.get_profile_summary(user['id']) if user else None
                is_admin = bool(user.get('is_admin')) if user else False
                user_id = user['id'] if user else None
                # Add balance to context so agent knows user's funds
                if user:
                    balance = user.get('balance', 0) or 0
                    balance_info = f"\nUser USDT balance: ${balance:.2f}"
                    user_context = (user_context or "") + balance_info
                response = generate_response(message, conversation_history=conversation_history, user_context=user_context or None, is_admin=is_admin, user_id=user_id, lang=lang)
                _response_cache[_dedup_key] = (response, _time_mod.time())
                # Evict old cache entries: expired ones first, everything only
                # if the cache is still over its size bound afterwards.
                if len(_response_cache) > 500:
                    cutoff = _time_mod.time() - 300
                    expired = [k for k, (_, ts) in _response_cache.items() if ts < cutoff]
                    for k in expired:
                        _response_cache.pop(k, None)
                    if len(_response_cache) > 500:
                        _response_cache.clear()
        else:
            response = handle_message_local(message, lang)
        # Save bot response if logged in
        if user:
            db.add_chat_message(user['id'], 'assistant', response)
            # Auto-summarize conversation every N messages (background)
            try:
                msg_count = db.get_chat_message_count(user['id'])
                if msg_count > 0 and msg_count % config.MEMORY_SUMMARY_INTERVAL == 0:
                    import threading
                    threading.Thread(target=_auto_summarize_conversation, args=(user['id'],), daemon=True).start()
            except Exception as summary_err:
                # Summarization is best-effort and must not fail the chat reply.
                logger.warning(f"Auto-summarize scheduling failed: {summary_err}")
        # Token/cost tracking now handled inside generate_response() (Ouroboros pattern)
        # log_query() called there with input_tokens, output_tokens, cost_usd
        _elapsed = int((_time_mod.time() - _start) * 1000)
        return jsonify({'response': _strip_tool_names(response), 'ms': _elapsed})
    except Exception as e:
        logger.error(f"Chat error: {e}")
        return jsonify({'response': L(lang, 'error')}), 500
@app.route('/api/v1/chat/stream', methods=['POST'])
def chat_stream():
    """
    SSE streaming chat — sends real-time status updates while agent processes.

    Same logic as /api/v1/chat but streams tool execution status via
    Server-Sent Events. Emitted events:
        status — {"status": label} for each tool status callback,
        done   — {"response": str[, "ms": int]} with the final answer,
        error  — {"error": str} on validation, timeout or agent failure.
    Validation / rate-limit errors are sent as a single `error` event with
    HTTP status 400 / 429.
    """
    from seafare_agent import _detect_lang as _dl, _strip_tool_names

    dedup_ttl = 300          # seconds an identical question is answered from cache
    cache_max_entries = 500  # soft cap on the dedup cache size
    agent_timeout = 240      # max seconds to wait for the next agent event

    def _sse(event, payload):
        """Format one Server-Sent Event frame."""
        return f"event: {event}\ndata: {json.dumps(payload)}\n\n"

    def _single_event(event, payload, status=200):
        """Build a one-frame SSE response."""
        return Response(iter([_sse(event, payload)]),
                        content_type='text/event-stream', status=status)

    user = get_current_user()

    # Rate limiting (same as chat)
    ip = _get_client_ip()
    if user:
        rkey = f"chat:user:{user['id']}"
        max_c, period = config.RATE_LIMIT_CHAT_AUTH_MAX, config.RATE_LIMIT_CHAT_AUTH_PERIOD
    else:
        rkey = f"chat:anon:{ip}"
        max_c, period = config.RATE_LIMIT_CHAT_ANON_MAX, config.RATE_LIMIT_CHAT_ANON_PERIOD
    now = _time_mod.time()
    _rate_buckets[rkey] = [t for t in _rate_buckets[rkey] if now - t < period]
    if len(_rate_buckets[rkey]) >= max_c:
        return _single_event('error', {'error': 'Too many requests'}, status=429)
    _rate_buckets[rkey].append(now)

    # The body is untrusted: tolerate non-object JSON and non-string fields
    # instead of crashing with AttributeError/TypeError.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    raw_message = data.get('message')
    message = raw_message.strip() if isinstance(raw_message, str) else ''
    ui_lang = data.get('lang')
    if not isinstance(ui_lang, str) or not ui_lang:
        ui_lang = 'en'
    # Detect language from message text — override UI language
    lang = _dl(message) if message else ui_lang

    if not message:
        return _single_event('error', {'error': L(lang, 'empty_msg')}, status=400)
    if len(message) > config.CHAT_MESSAGE_MAX_LENGTH:
        return _single_event('error', {'error': L(lang, 'msg_too_long')}, status=400)

    try:
        # Load conversation history (before saving the current message)
        conversation_history = None
        if user:
            conversation_history = db.get_chat_history(user['id'], limit=20)
        elif data.get('history'):
            raw_hist = data['history']
            if isinstance(raw_hist, list):
                conversation_history = []
                for h in raw_hist[-10:]:
                    if not isinstance(h, dict):
                        continue
                    content = h.get('content')
                    if not isinstance(content, str) or not content:
                        continue
                    # Only the two roles the server itself stores are accepted,
                    # so a client cannot inject e.g. a "system" turn.
                    role = h.get('role')
                    if role not in ('user', 'assistant'):
                        role = 'user'
                    conversation_history.append({'role': role, 'message': content[:2000]})
        # Save user message
        if user:
            db.add_chat_message(user['id'], 'user', message)
    except Exception as e:
        logger.exception("Chat stream setup error: %s", e)
        return _single_event('error', {'error': L(lang, 'error')}, status=500)

    _any_ai = (os.environ.get(config.ENV_ANTHROPIC_API_KEY)
               or os.environ.get(config.ENV_GROQ_API_KEY)
               or os.environ.get(config.ENV_MISTRAL_API_KEY))
    if not _any_ai:
        def _no_ai():
            resp = handle_message_local(message, lang)
            yield _sse('done', {'response': _strip_tool_names(resp)})
        return Response(_no_ai(), content_type='text/event-stream')

    # Check dedup cache
    _uid = user['id'] if user else ip
    _qhash = hash(message.lower().strip())
    _dedup_key = f"{_uid}:{_qhash}"
    _cached = _response_cache.get(_dedup_key)
    if _cached and _time_mod.time() - _cached[1] < dedup_ttl:
        cached_resp = _cached[0]

        def _cached_gen():
            yield _sse('done', {'response': _strip_tool_names(cached_resp)})
            if user:
                try:
                    db.add_chat_message(user['id'], 'assistant', cached_resp)
                except Exception:
                    logger.exception("Could not save cached chat response for user %s", user['id'])
        return Response(_cached_gen(), content_type='text/event-stream')

    # Prepare agent context
    try:
        user_context = db.get_profile_summary(user['id']) if user else None
    except Exception as e:
        logger.exception("Chat stream context error: %s", e)
        return _single_event('error', {'error': L(lang, 'error')}, status=500)
    is_admin = bool(user.get('is_admin')) if user else False
    user_id = user['id'] if user else None
    if user:
        balance = user.get('balance', 0) or 0
        user_context = (user_context or "") + f"\nUser USDT balance: ${balance:.2f}"

    # Events flow from the agent thread to the SSE generator through this queue
    _q = queue.Queue()

    def _status_cb(tool_name, tool_input=None):
        from seafare_agent import _get_tool_status
        label = _get_tool_status(tool_name, tool_input, lang)
        try:
            _q.put_nowait(('status', label))
        except queue.Full:
            pass

    def _persist_result(final_response):
        """Save the final answer to history and the dedup cache (best-effort)."""
        if not final_response:
            return
        if user:
            try:
                db.add_chat_message(user['id'], 'assistant', final_response)
            except Exception:
                logger.exception("Could not save chat response for user %s", user['id'])
            try:
                msg_count = db.get_chat_message_count(user['id'])
                if msg_count > 0 and msg_count % config.MEMORY_SUMMARY_INTERVAL == 0:
                    _threading_mod.Thread(
                        target=_auto_summarize_conversation,
                        args=(user['id'],), daemon=True).start()
            except Exception:
                logger.warning("Could not schedule auto-summary for user %s",
                               user['id'], exc_info=True)
        _response_cache[_dedup_key] = (final_response, _time_mod.time())
        # Evict expired entries first; wipe everything only if still over the cap.
        if len(_response_cache) > cache_max_entries:
            cutoff = _time_mod.time() - dedup_ttl
            for key, entry in list(_response_cache.items()):
                if entry[1] < cutoff:
                    _response_cache.pop(key, None)
            if len(_response_cache) > cache_max_entries:
                _response_cache.clear()

    def _generate():
        _start = _time_mod.time()

        def _run_agent():
            try:
                from seafare_agent import generate_response
                resp = generate_response(
                    message, conversation_history=conversation_history,
                    user_context=user_context or None, is_admin=is_admin,
                    user_id=user_id, lang=lang, status_callback=_status_cb
                )
                _q.put(('done', resp))
            except Exception as e:
                logger.exception("Chat stream agent error: %s", e)
                _q.put(('error', str(e)))

        _threading_mod.Thread(target=_run_agent, daemon=True).start()

        final_response = None
        try:
            while True:
                try:
                    event_type, payload = _q.get(timeout=agent_timeout)
                except queue.Empty:
                    # Timeout — agent took too long
                    yield _sse('error', {'error': 'Request timed out'})
                    break
                if event_type == 'status':
                    yield _sse('status', {'status': payload})
                elif event_type == 'done':
                    final_response = payload
                    _elapsed = int((_time_mod.time() - _start) * 1000)
                    yield _sse('done', {'response': _strip_tool_names(payload), 'ms': _elapsed})
                    break
                elif event_type == 'error':
                    # The raw exception text stays in the log; the client gets a generic message
                    yield _sse('error', {'error': L(lang, 'error')})
                    break
        finally:
            # Runs even if the client disconnects while the final frame is
            # being sent, so a produced answer is not lost from the history.
            _persist_result(final_response)

    return Response(_generate(), content_type='text/event-stream',
                    headers={'Cache-Control': 'no-cache', 'X-Accel-Buffering': 'no'})
# =============================================================================
# TRANSLATIONS for bot responses
# =============================================================================
# Localized user-facing strings, indexed as TEXTS[key][lang].
# Not every key has every language; L() falls back to the 'en' entry.
# Placeholders in braces ({q}, {n}, {e}, {name}, {imo}, {mmsi}, {url}) are
# filled in by L() via str.format().
TEXTS = {
    # --- Chat validation / generic errors ---
    'empty_msg': {
        'en': 'Please send a message.',
        'ru': 'Пожалуйста, отправьте сообщение.',
        'es': 'Por favor, envíe un mensaje.',
        'zh': '请输入消息。',
    },
    'msg_too_long': {
        'en': f'Message too long (max {config.CHAT_MESSAGE_MAX_LENGTH} characters).',
        'zh': f'消息过长(最多 {config.CHAT_MESSAGE_MAX_LENGTH} 个字符)。',
        'es': f'Mensaje demasiado largo (máx. {config.CHAT_MESSAGE_MAX_LENGTH} caracteres).',
        'ru': f'Сообщение слишком длинное (макс. {config.CHAT_MESSAGE_MAX_LENGTH} символов).',
    },
    'error': {
        'en': 'Error processing your request. Please try again.',
        'ru': 'Ошибка обработки запроса. Попробуйте ещё раз.',
        'es': 'Error al procesar su solicitud. Inténtelo de nuevo.',
        'zh': '处理您的请求时出错。请重试。',
    },
    # --- Vessel search / details ---
    'no_vessels': {
        'en': 'No vessels found for **"{q}"**. Try a different name, IMO, or MMSI number.',
        'ru': 'Суда по запросу **"{q}"** не найдены. Попробуйте другое название, IMO или MMSI.',
        'es': 'No se encontraron buques para **"{q}"**. Pruebe otro nombre, IMO o MMSI.',
    },
    'found_vessels': {
        'en': 'Found **{n}** vessel(s) for **"{q}"**:\n',
        'ru': 'Найдено **{n}** судно(а) по запросу **"{q}"**:\n',
        'es': 'Se encontraron **{n}** buque(s) para **"{q}"**:\n',
    },
    'ask_details': {
        'en': '_Ask me for details: "Who owns [vessel name]?"_',
        'ru': '_Спросите подробнее: "Кто владелец [название судна]?"_',
        'es': '_Pida detalles: "¿Quién es el propietario de [nombre del buque]?"_',
        'zh': '_询问详情"[船名]的船主是谁?"_',
    },
    'not_found': {
        'en': 'Vessel **"{q}"** not found.',
        'ru': 'Судно **"{q}"** не найдено.',
        'es': 'Buque **"{q}"** no encontrado.',
    },
    'no_imo': {
        'en': 'Found vessel but no IMO available.',
        'ru': 'Судно найдено, но IMO недоступен.',
        'es': 'Buque encontrado pero sin IMO disponible.',
        'zh': '已找到船舶,但无 IMO 信息。',
    },
    'no_details': {
        'en': 'Could not retrieve details for **"{q}"**.',
        'ru': 'Не удалось получить данные по **"{q}"**.',
        'es': 'No se pudieron obtener detalles de **"{q}"**.',
    },
    # --- Field labels ---
    'lbl_flag': {'en': 'Flag', 'ru': 'Флаг', 'es': 'Bandera',
                 'zh': '船旗'},
    'lbl_type': {'en': 'Type', 'ru': 'Тип', 'es': 'Tipo',
                 'zh': '类型'},
    'lbl_gt': {'en': 'Gross Tonnage', 'ru': 'Валовый тоннаж', 'es': 'Tonelaje bruto',
               'zh': '总吨位'},
    'lbl_dwt': {'en': 'DWT', 'ru': 'Дедвейт', 'es': 'DWT',
                'zh': '载重吨'},
    'lbl_built': {'en': 'Year Built', 'ru': 'Год постройки', 'es': 'Año de construcción',
                  'zh': '建造年份'},
    'lbl_status': {'en': 'Status', 'ru': 'Статус', 'es': 'Estado',
                   'zh': '状态'},
    'lbl_callsign': {'en': 'Call Sign', 'ru': 'Позывной', 'es': 'Señal de llamada',
                     'zh': '呼号'},
    'lbl_companies': {
        'en': 'Companies',
        'ru': 'Компании',
        'es': 'Empresas',
        'zh': '公司',
    },
    # --- Company roles (see ROLE_NAMES) ---
    'role_owner': {'en': 'Owner', 'ru': 'Владелец', 'es': 'Propietario',
                   'zh': '船主'},
    'role_manager': {'en': 'Manager', 'ru': 'Менеджер', 'es': 'Gerente',
                     'zh': '管理人'},
    'role_ism_manager': {'en': 'ISM Manager', 'ru': 'ISM Менеджер', 'es': 'Gerente ISM',
                         'zh': 'ISM 管理人'},
    'role_operator': {'en': 'Operator', 'ru': 'Оператор', 'es': 'Operador',
                      'zh': '运营商'},
    'role_technical_manager': {'en': 'Technical Manager', 'ru': 'Тех. менеджер', 'es': 'Gerente Técnico',
                               'zh': '技术管理人'},
    'search_error': {
        'en': 'Search error: {e}',
        'ru': 'Ошибка поиска: {e}',
        'es': 'Error de búsqueda: {e}',
    },
    'details_error': {
        'en': 'Error getting details: {e}',
        'ru': 'Ошибка получения данных: {e}',
        'es': 'Error al obtener detalles: {e}',
    },
    # --- Position tracking ---
    'pos_header': {
        'en': '**Position: {name}**\n',
        'ru': '**Позиция: {name}**\n',
        'es': '**Posición: {name}**\n',
        'zh': '**位置:{name}**\n',
    },
    'pos_coords': {'en': 'Coordinates', 'ru': 'Координаты', 'es': 'Coordenadas',
                   'zh': '坐标'},
    'pos_speed': {'en': 'Speed', 'ru': 'Скорость', 'es': 'Velocidad',
                  'zh': '速度'},
    'pos_course': {'en': 'Course', 'ru': 'Курс', 'es': 'Rumbo',
                   'zh': '航向'},
    'pos_destination': {'en': 'Destination', 'ru': 'Назначение', 'es': 'Destino',
                        'zh': '目的地'},
    'pos_eta': {'en': 'ETA', 'ru': 'Прибытие', 'es': 'ETA',
                'zh': '预计到达'},
    'pos_updated': {'en': 'Updated', 'ru': 'Обновлено', 'es': 'Actualizado',
                    'zh': '更新时间'},
    'pos_source': {'en': 'Source', 'ru': 'Источник', 'es': 'Fuente',
                   'zh': '来源'},
    'pos_map_link': {
        'en': '**[View on MarineTraffic map]({url})**',
        'ru': '**[Смотреть на карте MarineTraffic]({url})**',
        'es': '**[Ver en mapa MarineTraffic]({url})**',
        'zh': '**[在 MarineTraffic 地图上查看]({url})**',
    },
    'pos_no_coords': {
        'en': '**Position: {name}**\n\n IMO: `{imo}`\n MMSI: `{mmsi}`\n\nLive coordinates are not available via public scraping.\nUse the link below to see the vessel on the map:\n\n**[Open on MarineTraffic]({url})**',
        'ru': '**Позиция: {name}**\n\n IMO: `{imo}`\n MMSI: `{mmsi}`\n\nКоординаты недоступны через публичный парсинг.\nИспользуйте ссылку ниже для просмотра на карте:\n\n**[Открыть на MarineTraffic]({url})**',
        'es': '**Posición: {name}**\n\n IMO: `{imo}`\n MMSI: `{mmsi}`\n\nLas coordenadas no están disponibles mediante scraping público.\nUse el enlace para ver el buque en el mapa:\n\n**[Abrir en MarineTraffic]({url})**',
    },
    'pos_vessel_not_found': {
        'en': 'Could not find vessel **"{q}"**. Please provide MMSI (9 digits) or IMO (7 digits) for position tracking.',
        'ru': 'Не удалось найти судно **"{q}"**. Укажите MMSI (9 цифр) или IMO (7 цифр) для отслеживания позиции.',
        'es': 'No se pudo encontrar el buque **"{q}"**. Proporcione MMSI (9 dígitos) o IMO (7 dígitos) para seguimiento.',
    },
    # --- Demurrage calculator ---
    'demurrage_title': {
        'en': '**Demurrage Calculation**',
        'ru': '**Расчёт демереджа**',
        'es': '**Cálculo de demoras**',
        'zh': '**滞期费计算**',
    },
    'demurrage_agreed': {'en': 'Agreed days', 'ru': 'Согласовано дней', 'es': 'Días acordados',
                         'zh': '合同天数'},
    'demurrage_actual': {'en': 'Actual days', 'ru': 'Фактически дней', 'es': 'Días reales',
                         'zh': '实际天数'},
    'demurrage_delay': {'en': 'Delay', 'ru': 'Задержка', 'es': 'Demora',
                        'zh': '延误'},
    # zh unit was an empty string, which rendered the delay as a bare number
    'demurrage_days': {'en': 'days', 'ru': 'дн.', 'es': 'días',
                       'zh': '天'},
    'demurrage_rate': {'en': 'Daily rate', 'ru': 'Ставка/день', 'es': 'Tarifa diaria',
                       'zh': '日费率'},
    'demurrage_total': {'en': 'Total demurrage', 'ru': 'Итого демередж', 'es': 'Total demoras',
                        'zh': '总滞期费'},
    'demurrage_formula': {'en': 'Formula', 'ru': 'Формула', 'es': 'Fórmula',
                          'zh': '计算公式'},
    'demurrage_prompt': {
        'en': '**Demurrage Calculator**\n\nPlease provide:\n 1. Agreed days (contract laytime)\n 2. Actual days spent\n 3. Daily rate in USD\n\n_Example: "Calculate demurrage: 10 agreed days, 15 actual days, $25,000 daily rate"_',
        # Restored the leading "_П" so the italic markup is balanced like en/es
        'ru': '**Калькулятор демереджа**\n\nУкажите:\n 1. Согласованные дни (контрактный лейтайм)\n 2. Фактические дни\n 3. Ставка в день (USD)\n\n_Пример: "Calculate demurrage: 10 agreed days, 15 actual days, $25,000 daily rate"_',
        'es': '**Calculadora de demoras**\n\nProporcione:\n 1. Días acordados (laytime del contrato)\n 2. Días reales\n 3. Tarifa diaria en USD\n\n_Ejemplo: "Calculate demurrage: 10 agreed days, 15 actual days, $25,000 daily rate"_',
    },
    # --- Contacts ---
    'contacts_info': {
        'en': '**Contact Introduction Service** — temporarily FREE\n\nI can connect you with:\n - Ship owners and operators\n - Freight brokers\n - Port agents\n - Chartering companies\n\nJust ask me to find contacts for any company or vessel!',
        'zh': '**联系人介绍服务** — 暂时免费\n\n我可以为您联系:\n - 船主和运营商\n - 货运经纪人\n - 港口代理\n - 租船公司\n\n只需让我为任意公司或船舶找联系人!',
        'es': '**Servicio de contactos** — temporalmente GRATIS\n\nPuedo conectarle con:\n - Propietarios y operadores de buques\n - Corredores de flete\n - Agentes portuarios\n - Empresas de fletamento\n\n¡Solo pídame buscar contactos de cualquier empresa o buque!',
        'ru': '**Предоставление контактов** — временно БЕСПЛАТНО\n\nЯ могу связать вас с:\n - Судовладельцами и операторами\n - Фрахтовыми брокерами\n - Портовыми агентами\n - Чартерными компаниями\n\nПросто попросите найти контакты любой компании или судна!',
    },
    # --- Auth / email verification ---
    'auth_email_required': {
        'en': 'Please enter a valid email.',
        'ru': 'Укажите корректный email.',
        'es': 'Ingrese un email válido.',
        'zh': '请输入有效的电子邮件。',
    },
    'auth_password_required': {
        'en': 'Password must be at least 6 characters.',
        'ru': 'Пароль должен быть не менее 6 символов.',
        'es': 'La contraseña debe tener al menos 6 caracteres.',
        'zh': '密码至少需要6个字符。',
    },
    'code_sent': {
        'en': 'Verification code sent to your email.',
        'ru': 'Код подтверждения отправлен на вашу почту.',
        'es': 'Código de verificación enviado a su correo.',
        'zh': '验证码已发送至您的邮箱。',
    },
    'code_invalid': {
        'en': 'Invalid or expired code.',
        'ru': 'Неверный или просроченный код.',
        'es': 'Código inválido o expirado.',
        'zh': '验证码无效或已过期。',
    },
    'code_required': {
        'en': 'Email verification required.',
        'ru': 'Требуется подтверждение email.',
        'es': 'Se requiere verificación de email.',
        'zh': '需要验证电子邮件。',
    },
    'code_expired': {
        'en': 'Verification expired. Please request a new code.',
        'ru': 'Подтверждение истекло. Запросите новый код.',
        'es': 'Verificación expirada. Solicite un nuevo código.',
        'zh': '验证已过期。请重新申请验证码。',
    },
    'auth_email_exists': {
        'en': 'An account with this email already exists.',
        'ru': 'Аккаунт с таким email уже существует.',
        'es': 'Ya existe una cuenta con este email.',
        'zh': '此邮箱已注册账户。',
    },
    'auth_invalid': {
        'en': 'Invalid email or password.',
        'ru': 'Неверный email или пароль.',
        'es': 'Email o contraseña incorrectos.',
        'zh': '邮箱或密码错误。',
    },
    # --- Withdrawals ---
    'withdraw_invalid_amount': {
        'en': 'Please enter a valid amount.',
        'ru': 'Укажите корректную сумму.',
        'es': 'Ingrese un monto válido.',
        'zh': '请输入有效金额。',
    },
    'withdraw_min': {
        'en': 'Minimum withdrawal amount is $2 USDT.',
        'ru': 'Минимальная сумма вывода — $2 USDT.',
        'es': 'El monto mínimo de retiro es $2 USDT.',
        'zh': '最低提款金额为 $2 USDT。',
    },
    'withdraw_invalid_address': {
        'en': 'Please enter a valid TRC20 address (starts with T, 34 characters).',
        'ru': 'Укажите корректный TRC20 адрес (начинается с T, 34 символа).',
        'es': 'Ingrese una dirección TRC20 válida (comienza con T, 34 caracteres).',
        # Was truncated: separator, closing parenthesis and full stop were missing
        'zh': '请输入有效的 TRC20 地址(以 T 开头,34个字符)。',
    },
    'withdraw_insufficient': {
        'en': 'Insufficient balance for this withdrawal.',
        'ru': 'Недостаточный баланс для вывода.',
        'es': 'Saldo insuficiente para este retiro.',
        'zh': '余额不足,无法完成此次提款。',
    },
    # --- Service overview (default reply) ---
    'services_info': {
        'en': '**SeaFare Montana**\n\nHere\'s what I can do (all services temporarily FREE):\n\n - Vessel search by name, IMO, or MMSI\n - Vessel details (owner, operator, manager)\n - Live position tracking (AIS)\n - Demurrage calculation\n - Sea route calculation\n - Vessels near port\n - Contact introductions (shipper <-> operator)\n\n_Try: "Search vessel EVER GIVEN" or "Vessels near Rotterdam"_',
        # Restored the parentheses around AIS and the "或" between the two examples
        'zh': '**SeaFare Montana — 海运物流 AI 代理**\n\n我能做的事(所有服务暂时免费):\n\n - 按名称、IMO 或 MMSI 搜索船舶\n - 船舶详情(船主、运营商、管理人)\n - 实时位置追踪(AIS)\n - 滞期费计算\n - 海上航线计算\n - 港口附近船舶\n - 联系人介绍(货主 <-> 运营商)\n\n_试试"Search vessel EVER GIVEN" 或 "鹿特丹附近的船舶"_',
        'es': '**SeaFare Montana — Agente AI de Logística Marítima**\n\nLo que puedo hacer (todos los servicios temporalmente GRATIS):\n\n - Búsqueda de buques por nombre, IMO o MMSI\n - Detalles del buque (propietario, operador, gerente)\n - Seguimiento de posición en vivo (AIS)\n - Cálculo de demoras\n - Cálculo de rutas marítimas\n - Buques cerca del puerto\n - Presentación de contactos (cargador <-> operador)\n\n_Pruebe: "Search vessel EVER GIVEN" o "Buques cerca de Rotterdam"_',
        # Restored the leading "_П" so the italic markup is balanced like en/es
        'ru': '**SeaFare Montana**\n\nМои возможности (все услуги временно БЕСПЛАТНЫ):\n\n - Поиск судна по названию, IMO или MMSI\n - Данные о судне (владелец, оператор, менеджер)\n - Отслеживание позиции (AIS)\n - Расчёт демереджа\n - Расчёт морского маршрута\n - Суда рядом с портом\n - Предоставление контактов (грузоотправитель <-> оператор)\n\n_Попробуйте: "Search vessel EVER GIVEN" или "Суда рядом с Роттердамом"_',
    },
}
def L(lang: str, key: str, **kwargs) -> str:
    """
    Get localized text for `key` in language `lang`.

    Lookup order: exact `lang` → base language of a regional tag
    ("ru-RU", "zh_CN", "EN" → "ru", "zh", "en") → English → the key itself.
    `lang` may come straight from request JSON, so any non-string value
    (None, list, ...) is tolerated and treated as "no preference".

    Keyword arguments, if any, are substituted into the text with str.format().
    """
    texts = TEXTS.get(key, {})
    text = None
    if isinstance(lang, str):
        text = texts.get(lang)
        if text is None:
            # "ru-RU" / "zh_CN" / "EN" → "ru" / "zh" / "en"
            base = lang.strip().lower().replace('_', '-').split('-', 1)[0]
            text = texts.get(base)
    if text is None:
        text = texts.get('en', key)
    if kwargs:
        text = text.format(**kwargs)
    return text
# Company role identifier -> TEXTS key holding its localized label.
# Every label key is simply the role identifier prefixed with "role_".
ROLE_NAMES = {
    role: f'role_{role}'
    for role in ('owner', 'manager', 'ism_manager', 'operator', 'technical_manager')
}
# =============================================================================
# AUTO-SUMMARIZE CONVERSATIONS
# =============================================================================
def _auto_summarize_conversation(user_id: int):
    """
    Background task: summarize recent chat using Haiku for long-term memory.

    Reads the last 20 chat messages of `user_id`, asks the Anthropic API for a
    short summary and stores it via db.save_conversation_summary().
    Does nothing when no Anthropic API key is configured or when fewer than
    10 messages are available. Never raises: failures are logged as warnings,
    because this runs in a daemon thread with nobody to handle an exception.
    """
    try:
        api_key = os.environ.get(config.ENV_ANTHROPIC_API_KEY)
        if not api_key:
            # Chat can run on other providers only; skip instead of failing
            # (and logging a warning) on every summary interval.
            return
        import anthropic

        recent = db.get_chat_history(user_id, limit=20)
        if not recent or len(recent) < 10:
            return
        # Each message is truncated to keep the summarization prompt small
        text = "\n".join(
            f"{m.get('role', 'user')}: {(m.get('message') or '')[:200]}" for m in recent
        )
        client = anthropic.Anthropic(api_key=api_key)
        resp = client.messages.create(
            model='claude-haiku-4-5-20251001',
            max_tokens=200,
            system="Summarize this maritime chat in 2-3 sentences. Focus on: vessels/ports searched, decisions made, user preferences. Be concise.",
            messages=[{"role": "user", "content": text}]
        )
        if not resp.content:
            logger.warning("Auto-summarize returned no content for user %s", user_id)
            return
        summary = resp.content[0].text
        db.save_conversation_summary(user_id, summary, len(recent))
        logger.info("Auto-summary saved for user %s: %s...", user_id, summary[:80])
    except Exception as e:
        logger.warning("Auto-summarize failed for user %s: %s", user_id, e)
# =============================================================================
# MESSAGE HANDLER
# =============================================================================
def handle_message_local(message: str, lang: str = 'en') -> str:
    """
    Handle a chat message without an LLM — direct keyword → tool matching.

    The message is matched (case-insensitively, by substring) against keyword
    lists in English, Russian and Spanish, in this priority order:
    vessel search, owner/operator details, position, demurrage, contacts,
    services. If nothing matches, a bare IMO/MMSI number or an upper-case
    vessel name in the message is looked up; otherwise the service overview
    is returned.

    Args:
        message: raw user message.
        lang: language code used for the localized reply.
    Returns:
        Markdown-formatted reply text.
    """
    msg = message.lower()

    def _mentions(keywords) -> bool:
        return any(kw in msg for kw in keywords)

    # --- Vessel search ---
    if _mentions(('search vessel', 'find vessel', 'find ship', 'search ship', 'look up', 'lookup',
                  'найти', 'найди', 'поиск', 'искать', 'ищи', 'судно',
                  'buscar', 'busca', 'búsqueda', 'encontrar', 'encuentra')):
        query = extract_vessel_query(message)
        if query:
            return do_vessel_search(query, lang)

    # --- Owner / operator ---
    if _mentions(('who owns', 'owner of', 'operator of', 'who is the owner', 'manager of',
                  'владелец', 'кто владеет', 'оператор', 'чьё', 'чье',
                  'propietario', 'dueño', 'operador', 'quién es')):
        query = extract_vessel_query(message)
        if query:
            return do_vessel_details(query, lang)

    # --- Vessel identifiers: IMO is 7 digits, MMSI is 9 digits ---
    imo_match = re.search(r'\b(\d{7})\b', message)
    mmsi_match = re.search(r'\b(\d{9})\b', message)

    # --- Position ---
    if _mentions(('position', 'where is', 'location', 'track',
                  'позиция', 'где находится', 'где', 'местоположение', 'отследить',
                  'posición', 'donde', 'ubicación', 'rastrear')):
        if mmsi_match:
            return do_position(mmsi_match.group(1), lang)
        if imo_match:
            return do_position(imo_match.group(1), lang)
        query = extract_vessel_query(message)
        if query:
            return do_position(query, lang)

    # --- Demurrage ---
    if _mentions(('demurrage', 'delay cost', 'демередж', 'простой', 'demora')):
        return do_demurrage(message, lang)

    # --- Contacts ---
    if _mentions(('contact', 'introduction', 'connect me', 'broker', 'find operator',
                  'контакт', 'связь', 'брокер', 'contacto', 'corredor')):
        return do_contacts_info(lang)

    # --- Services ---
    if _mentions(('service', 'what can you', 'help', 'what do you offer', 'pricing',
                  'услуг', 'помощь', 'что умеешь', 'servicio', 'ayuda')):
        return do_services(lang)

    # --- Bare IMO / MMSI ---
    # Checked BEFORE the upper-case name heuristic: otherwise "IMO 9811000"
    # matched the word "IMO" as a vessel name and the number was never used.
    if imo_match:
        return do_vessel_details(imo_match.group(1), lang)
    if mmsi_match:
        return do_vessel_details(mmsi_match.group(1), lang)

    # --- Generic vessel name detection (run of upper-case ASCII words) ---
    vessel_name = re.search(r'\b([A-Z][A-Z\s]{2,30})\b', message)
    if vessel_name:
        query = vessel_name.group(1).strip()
        if len(query) >= 3:
            return do_vessel_search(query, lang)

    # --- Default ---
    return do_services(lang)
# Words that are part of the question, not of a vessel name (en / ru / es).
_VESSEL_QUERY_STOPWORDS = frozenset({
    'search', 'find', 'vessel', 'ship', 'lookup', 'look', 'up', 'for',
    'the', 'a', 'an', 'who', 'owns', 'owner', 'of', 'is', 'what',
    'where', 'position', 'tell', 'me', 'about', 'can', 'you', 'please',
    'get', 'show', 'info', 'information', 'details', 'operator', 'manager',
    'найти', 'найди', 'судно', 'корабль', 'кто', 'владелец', 'оператор',
    'покажи', 'где', 'поиск', 'искать', 'ищи', 'дай', 'мне', 'про',
    'buscar', 'busca', 'buque', 'barco', 'navío', 'quien', 'quién',
    'propietario', 'mostrar', 'donde', 'encuentra', 'encontrar', 'el', 'la', 'del',
    # identifier labels: "IMO" / "MMSI" alone are never a vessel name
    'imo', 'mmsi',
})
# Punctuation stripped from both ends of a word before stop-word filtering.
_VESSEL_QUERY_PUNCTUATION = '.,!?;:()[]{}"\'«»“”„'


def extract_vessel_query(message: str) -> str:
    """
    Extract vessel name/IMO/MMSI from message.

    Heuristics, first hit wins:
      1. a standalone 7-9 digit number (IMO / MMSI);
      2. text inside matching quotes ("..." / '...' / «...» / “...”) — an
         apostrophe inside a word ("who's") is not treated as a quote;
      3. a run of upper-case ASCII words ("EVER GIVEN"), unless it consists
         only of stop words / identifier labels such as "IMO";
      4. the last three words left after removing stop words, with
         surrounding punctuation stripped.

    Returns:
        The extracted query, or '' when nothing usable is found.
    """
    if not message:
        return ''

    # 1. Numeric identifier
    num = re.search(r'\b(\d{7,9})\b', message)
    if num:
        return num.group(1)

    # 2. Quoted name. The lookarounds keep in-word apostrophes from opening
    #    or closing a quote; \1 requires the same quote character on both ends.
    quoted = re.search(r'(?<!\w)(["\'])(.+?)\1(?!\w)', message)
    if quoted and quoted.group(2).strip():
        return quoted.group(2).strip()
    fancy = re.search(r'[«“„]([^«»“”„]+)[»”“]', message)
    if fancy and fancy.group(1).strip():
        return fancy.group(1).strip()

    # 3. Upper-case run
    for run in re.finditer(r'\b([A-Z][A-Z\s]{2,30})\b', message):
        candidate = run.group(1).strip()
        if any(w.lower() not in _VESSEL_QUERY_STOPWORDS for w in candidate.split()):
            return candidate

    # 4. Fallback: trailing non-stop words
    words = []
    for raw in message.split():
        token = raw.strip(_VESSEL_QUERY_PUNCTUATION)
        if len(token) > 1 and token.lower() not in _VESSEL_QUERY_STOPWORDS:
            words.append(token)
    return ' '.join(words[-3:])
def do_vessel_search(query: str, lang: str = 'en') -> str:
    """
    Look up vessels matching `query` through the Equasis parser.

    Returns a localized markdown list of at most five matches (name, IMO,
    type, gross tonnage, year built, flag), a localized "nothing found"
    message, or a localized error message if the lookup raises.
    """
    try:
        matches = eq.get_parser().search_vessel(query)
        if not matches:
            return L(lang, 'no_vessels', q=query)

        out = [L(lang, 'found_vessels', n=len(matches), q=query)]
        for vessel in matches[:5]:
            title = f"**{vessel.get('name', '?')}**"
            summary = (
                f" IMO: `{vessel.get('imo', '?')}` | {L(lang, 'lbl_type')}: {vessel.get('type', '?')} | "
                f"{L(lang, 'lbl_gt')}: {vessel.get('gross_tonnage', '?')} | {L(lang, 'lbl_built')}: {vessel.get('year_built', '?')} | "
                f"{L(lang, 'lbl_flag')}: {vessel.get('flag', '?')}"
            )
            # Title line, summary line, then a blank separator line
            out.extend((title, summary, ""))
        out.append(L(lang, 'ask_details'))
        return '\n'.join(out)
    except Exception as e:
        return L(lang, 'search_error', e=e)
def do_vessel_details(query: str, lang: str = 'en') -> str:
    """
    Get vessel details from Equasis.

    A 7-digit `query` is used directly as the IMO number; anything else is
    searched by name/identifier and the first match's IMO is used.

    Returns a localized markdown card (identifiers, flag, type, tonnage,
    build year, status, and the list of related companies with their roles),
    or a localized "not found" / error message.
    """
    try:
        equasis = eq.get_parser()
        if query.isdigit() and len(query) == 7:
            details = equasis.get_vessel_details(query)
        else:
            results = equasis.search_vessel(query)
            if not results:
                return L(lang, 'not_found', q=query)
            imo = results[0].get('imo')
            if not imo:
                return L(lang, 'no_imo')
            details = equasis.get_vessel_details(imo)
        if not details or not details.get('name'):
            return L(lang, 'no_details', q=query)

        d = details
        lines = [
            f"**{d.get('name', '?')}**\n",
            f" IMO: `{d.get('imo', '-')}`",
            f" MMSI: `{d.get('mmsi', '-')}`",
            f" {L(lang, 'lbl_callsign')}: `{d.get('callsign', '-')}`",
            f" {L(lang, 'lbl_flag')}: {d.get('flag', '-')}",
            f" {L(lang, 'lbl_type')}: {d.get('type', '-')}",
            f" {L(lang, 'lbl_gt')}: {d.get('gross_tonnage', '-')}",
            f" {L(lang, 'lbl_dwt')}: {d.get('deadweight', '-')}",
            f" {L(lang, 'lbl_built')}: {d.get('year_built', '-')}",
            f" {L(lang, 'lbl_status')}: {d.get('status', '-')}",
        ]
        companies = d.get('companies') or []
        if companies:
            lines.append(f"\n**{L(lang, 'lbl_companies')}:**")
            for c in companies:
                # `or` also covers an explicit None, which previously broke the
                # whole reply with AttributeError on .replace()
                raw_role = c.get('role') or ''
                role_key = ROLE_NAMES.get(raw_role, '')
                if role_key:
                    role = L(lang, role_key)
                else:
                    # Unknown role: show it prettified ("ship_builder" → "Ship Builder")
                    role = str(raw_role or '?').replace('_', ' ').title()
                name = c.get('name') or '?'
                addr = c.get('address') or ''
                lines.append(f" {role}: **{name}**")
                if addr:
                    lines.append(f" _{str(addr)[:60]}_")
        return '\n'.join(lines)
    except Exception as e:
        logger.exception("Vessel details lookup failed for %r", query)
        return L(lang, 'details_error', e=e)
def do_position(query: str, lang: str = 'en') -> str:
    """
    Get vessel position — tries MarineTraffic, falls back to link.

    `query` may be an MMSI (9 digits), an IMO (7 digits) or a vessel name;
    names are resolved to IMO/MMSI through Equasis first.

    Resolution order:
      1. MarineTraffic API (only if the parser reports an API key),
      2. public MarineTraffic vessel page (needs an MMSI),
      3. a localized message with known identifiers and a MarineTraffic link.

    Returns a localized markdown reply; on any exception a localized error
    message is returned instead of raising.
    """
    from urllib.parse import quote

    def _present(value) -> bool:
        # 0 / 0.0 are valid coordinates (equator, prime meridian)
        return value is not None and value != ''

    try:
        mmsi = None
        imo = None
        name = query

        # Determine what we have: MMSI, IMO, or name
        if query.isdigit() and len(query) == 9:
            mmsi = query
        elif query.isdigit() and len(query) == 7:
            imo = query
        else:
            # Search Equasis for MMSI/IMO
            equasis = eq.get_parser()
            results = equasis.search_vessel(query)
            if not results:
                return L(lang, 'pos_vessel_not_found', q=query)
            imo = results[0].get('imo')
            name = results[0].get('name') or query
            # Get details for MMSI
            if imo:
                # An empty/None details result must not abort the link fallback
                details = equasis.get_vessel_details(imo) or {}
                mmsi = details.get('mmsi')
                name = details.get('name') or name

        # Build MarineTraffic URL
        if mmsi:
            mt_url = f"https://www.marinetraffic.com/en/ais/details/ships/mmsi:{mmsi}"
        elif imo:
            mt_url = f"https://www.marinetraffic.com/en/ais/details/ships/imo:{imo}"
        else:
            # Free text from the user goes into a URL → percent-encode it
            mt_url = f"https://www.marinetraffic.com/en/ais/index/search/all/keyword:{quote(query, safe='')}"

        # Try MarineTraffic API if key available
        parser = mt.get_parser()
        if parser.has_api_key():
            pos_data = parser.api_vessel_position(mmsi=mmsi, imo=imo)
            if pos_data and isinstance(pos_data, list) and len(pos_data) > 0:
                p = pos_data[0]
                lines = [L(lang, 'pos_header', name=name)]
                lines.append(f" {L(lang, 'pos_coords')}: **{p.get('LAT', '?')}, {p.get('LON', '?')}**")
                if p.get('SPEED'):
                    # NOTE(review): SPEED is divided by 10 — presumably the API
                    # reports tenths of a knot; confirm against the API docs.
                    lines.append(f" {L(lang, 'pos_speed')}: **{float(p['SPEED'])/10:.1f} kn**")
                if p.get('COURSE'):
                    lines.append(f" {L(lang, 'pos_course')}: **{p['COURSE']}°**")
                if p.get('DESTINATION'):
                    lines.append(f" {L(lang, 'pos_destination')}: **{p['DESTINATION']}**")
                if p.get('ETA'):
                    lines.append(f" {L(lang, 'pos_eta')}: **{p['ETA']}**")
                if p.get('TIMESTAMP'):
                    lines.append(f" {L(lang, 'pos_updated')}: {p['TIMESTAMP']}")
                lines.append(f" {L(lang, 'pos_source')}: MarineTraffic API")
                lines.append(f"\n{L(lang, 'pos_map_link', url=mt_url)}")
                return '\n'.join(lines)

        # Try public scraping
        if mmsi:
            vessel_data = parser.get_vessel_page(mmsi) or {}
            lat = vessel_data.get('latitude')
            lon = vessel_data.get('longitude')
            if _present(lat) and _present(lon):
                lines = [L(lang, 'pos_header', name=vessel_data.get('name') or name)]
                lines.append(f" {L(lang, 'pos_coords')}: **{lat}, {lon}**")
                if vessel_data.get('speed'):
                    lines.append(f" {L(lang, 'pos_speed')}: **{vessel_data['speed']} kn**")
                if vessel_data.get('course'):
                    lines.append(f" {L(lang, 'pos_course')}: **{vessel_data['course']}°**")
                if vessel_data.get('destination'):
                    lines.append(f" {L(lang, 'pos_destination')}: **{vessel_data['destination']}**")
                lines.append(f" {L(lang, 'pos_source')}: MarineTraffic")
                lines.append(f"\n{L(lang, 'pos_map_link', url=mt_url)}")
                return '\n'.join(lines)

        # Fallback: show what we know + direct link
        return L(lang, 'pos_no_coords',
                 name=name,
                 imo=imo or '-',
                 mmsi=mmsi or '-',
                 url=mt_url)
    except Exception as e:
        logger.exception("Position lookup failed for %r", query)
        return L(lang, 'details_error', e=e)
def do_demurrage(message: str, lang: str = 'en') -> str:
    """
    Calculate demurrage from message.

    The first three numbers found in `message` are taken, in order, as
    agreed days, actual days and daily rate (USD):

        demurrage = max(0, actual - agreed) * rate

    Thousands separators ("25,000") are understood; a comma followed by
    anything other than exactly three digits separates two numbers
    ("10,15,25000" → 10, 15, 25000). If fewer than three numbers are found,
    a localized usage prompt is returned.
    """
    def _days(value: float) -> str:
        # Up to two decimals, no trailing zeros: 10.0 → "10", 10.5 → "10.5"
        return f"{value:.2f}".rstrip('0').rstrip('.')

    def _money(value: float) -> str:
        # Whole amounts without cents, otherwise two decimals
        text = f"{value:,.2f}"
        return text[:-3] if text.endswith('.00') else text

    # Match on the ORIGINAL text: deleting every comma first glued
    # comma-separated numbers into one.
    tokens = re.findall(r'\d{1,3}(?:,\d{3})+(?!\d)(?:\.\d+)?|\d+(?:\.\d+)?', message)
    numbers = [float(t.replace(',', '')) for t in tokens]

    if len(numbers) >= 3:
        agreed, actual, rate = numbers[0], numbers[1], numbers[2]
        delay = max(0.0, actual - agreed)
        total = delay * rate
        # When the vessel finished early the plain difference is negative;
        # show the clamp so the formula line stays a true equation.
        difference = f"{_days(actual)} - {_days(agreed)}"
        formula_lhs = f"({difference})" if actual >= agreed else f"max(0, {difference})"
        return (f"{L(lang, 'demurrage_title')}\n\n"
                f" {L(lang, 'demurrage_agreed')}: **{_days(agreed)}**\n"
                f" {L(lang, 'demurrage_actual')}: **{_days(actual)}**\n"
                f" {L(lang, 'demurrage_delay')}: **{_days(delay)} {L(lang, 'demurrage_days')}**\n"
                f" {L(lang, 'demurrage_rate')}: **${_money(rate)}**\n\n"
                f" **{L(lang, 'demurrage_total')}: ${_money(total)}**\n\n"
                f" {L(lang, 'demurrage_formula')}: {formula_lhs} x ${_money(rate)} = **${_money(total)}**")

    return L(lang, 'demurrage_prompt')
def do_contacts_info(lang: str = 'en') -> str:
    """Return the localized description of the contact-introduction service."""
    text_key = 'contacts_info'
    return L(lang, text_key)
def do_services(lang: str = 'en') -> str:
    """Return the localized overview of what the bot can do."""
    text_key = 'services_info'
    return L(lang, text_key)
# =============================================================================
# VESSEL ENDPOINTS
# =============================================================================
@app.route('/api/v1/vessel/<identifier>', methods=['GET'])
@rate_limit(max_calls=30, period=60)
def get_vessel(identifier: str):
    """
    Get vessel by MMSI, IMO, or name.

    First checks local DB; a record younger than VESSEL_CACHE_HOURS is
    returned as-is (source "cache"). Otherwise the vessel page is fetched
    from MarineTraffic when an MMSI is known (source "marinetraffic") and
    stored in the DB. If that yields nothing, a stale DB record is returned
    (source "cache_stale"); with no data at all the response is 404.

    `identifier` is treated as MMSI when it is 9 digits, as IMO when it is
    7 digits, and as a vessel name otherwise.
    """
    # Classify the identifier up front so the fallback path after a DB error
    # uses the same interpretation (a 7-digit IMO must not be sent as MMSI).
    if identifier.isdigit() and len(identifier) == 9:
        search_param = {'mmsi': identifier}
    elif identifier.isdigit() and len(identifier) == 7:
        search_param = {'imo': identifier}
    else:
        search_param = {'name': identifier}

    vessel = None
    try:
        vessel = db.get_vessel(**search_param)
    except Exception as e:
        logger.error(f"Vessel DB lookup error: {e}")

    # Check if we need to refresh
    if vessel:
        try:
            updated = datetime.fromisoformat(vessel.get('updated_at') or '2000-01-01')
            is_fresh = datetime.utcnow() - updated < timedelta(hours=VESSEL_CACHE_HOURS)
        except (TypeError, ValueError):
            # Missing / malformed / timezone-aware timestamp → treat as stale
            is_fresh = False
        if is_fresh:
            return jsonify({
                'success': True,
                'source': 'cache',
                'vessel': vessel
            })

    # Fetch from MarineTraffic
    mmsi = search_param.get('mmsi') or (vessel.get('mmsi') if vessel else None)
    if mmsi:
        try:
            fresh_data = mt.get_parser().get_vessel_page(mmsi)
        except Exception:
            if not vessel:
                raise  # nothing to fall back to
            logger.exception("MarineTraffic fetch failed for MMSI %s; serving stale cache", mmsi)
            fresh_data = None
        if fresh_data:
            try:
                db.upsert_vessel(fresh_data)
            except Exception:
                # A cache write failure must not discard a successful fetch
                logger.exception("Could not cache vessel data for MMSI %s", mmsi)
            return jsonify({
                'success': True,
                'source': 'marinetraffic',
                'vessel': fresh_data
            })

    if vessel:
        return jsonify({
            'success': True,
            'source': 'cache_stale',
            'vessel': vessel
        })

    return jsonify({
        'success': False,
        'error': 'Vessel not found'
    }), 404
@app.route('/api/v1/vessel/search', methods=['GET'])
@rate_limit(max_calls=30, period=60)
def search_vessels():
    """
    Search vessels by query.

    Query parameter `q` (min 2 characters after trimming). The local DB is
    searched first; when it returns fewer than 5 vessels, MarineTraffic
    results not already present (by MMSI) are appended and cached to the DB.

    Returns: {"success": True, "count": int, "vessels": [...]} with at most
    20 vessels, or 400 when the query is too short.
    """
    query = request.args.get('q', '').strip()
    if len(query) < 2:
        return jsonify({
            'success': False,
            'error': 'Query too short (min 2 chars)'
        }), 400

    # Search local DB first
    results = list(db.search_vessels(query, limit=20) or [])

    # If few results, search MarineTraffic
    if len(results) < 5:
        # MMSIs compared as strings so an int from one source matches a str
        # from the other
        seen = {str(r.get('mmsi')) for r in results if r.get('mmsi')}
        for v in (mt.search_vessel(query) or []):
            mmsi = v.get('mmsi')
            if not mmsi or str(mmsi) in seen:
                continue
            seen.add(str(mmsi))
            results.append(v)
            # Cache to DB (best-effort: a write failure must not fail the search)
            try:
                db.upsert_vessel(v)
            except Exception:
                logger.exception("Could not cache vessel with MMSI %s", mmsi)

    return jsonify({
        'success': True,
        'count': len(results),
        'vessels': results[:20]
    })
# =============================================================================
# POSITION ENDPOINTS
# =============================================================================
@app.route('/api/v1/position/<mmsi>', methods=['GET'])
@rate_limit(max_calls=30, period=60)
def get_position(mmsi: str):
    """
    Get current vessel position.

    Serves the last stored position while it is younger than
    POSITION_CACHE_MINUTES. Otherwise the vessel page is fetched from
    MarineTraffic; if it carries coordinates they are stored and returned.
    When the fetch fails or yields no coordinates, the stale cached position
    is returned if one exists, else 404.
    """
    # Check cache
    cached = db.get_last_position(mmsi)
    if cached:
        try:
            received = datetime.fromisoformat(cached.get('received_at') or '2000-01-01')
            is_fresh = datetime.utcnow() - received < timedelta(minutes=POSITION_CACHE_MINUTES)
        except (TypeError, ValueError):
            # Missing/unparseable timestamp: treat the entry as stale rather
            # than failing the whole request.
            is_fresh = False
        if is_fresh:
            return jsonify({
                'success': True,
                'source': 'cache',
                'position': cached
            })

    # Fetch fresh from MarineTraffic. The parser may return nothing, and a
    # fetch error should still allow the stale-cache fallback below.
    try:
        vessel_data = mt.get_parser().get_vessel_page(mmsi) or {}
    except Exception as e:
        logger.error(f"Position fetch error for MMSI {mmsi!r}: {e}")
        vessel_data = {}

    # Truthiness check: a missing, None or 0 coordinate counts as "no position".
    if vessel_data.get('latitude') and vessel_data.get('longitude'):
        db.add_position(
            mmsi=mmsi,
            lat=vessel_data['latitude'],
            lon=vessel_data['longitude'],
            speed=vessel_data.get('speed'),
            course=vessel_data.get('course'),
            heading=vessel_data.get('heading'),
            destination=vessel_data.get('destination'),
            eta=vessel_data.get('eta')
        )
        return jsonify({
            'success': True,
            'source': 'marinetraffic',
            'position': {
                'mmsi': mmsi,
                'latitude': vessel_data['latitude'],
                'longitude': vessel_data['longitude'],
                'speed': vessel_data.get('speed'),
                'course': vessel_data.get('course'),
                'timestamp': datetime.utcnow().isoformat()
            }
        })

    if cached:
        return jsonify({
            'success': True,
            'source': 'cache_stale',
            'position': cached
        })

    return jsonify({
        'success': False,
        'error': 'Position not available'
    }), 404
@app.route('/api/v1/position/<mmsi>/history', methods=['GET'])
@rate_limit(max_calls=30, period=60)
def get_position_history(mmsi: str):
    """
    Return the stored position track for one vessel.

    Query params:
        hours: size of the look-back window passed to the DB (default 24).
    """
    window_hours = request.args.get('hours', 24, type=int)
    track = db.get_position_history(mmsi, hours=window_hours)
    payload = {
        'success': True,
        'mmsi': mmsi,
        'hours': window_hours,
        'count': len(track),
        'positions': track,
    }
    return jsonify(payload)
# =============================================================================
# PORT VESSEL ENDPOINTS
# =============================================================================
@app.route('/api/v1/port/vessels', methods=['GET'])
@rate_limit(max_calls=20, period=60)
def get_port_vessels_api():
    """Get vessels currently in/near a port.

    Query params:
        port: port name, at least 2 characters.

    Returns the cached vessel list for the resolved port when one exists;
    otherwise asks the MarineTraffic parser and caches a non-empty result.
    ``source`` is 'cache', 'live', or 'unavailable' (parser returned nothing).
    """
    port_name = request.args.get('port', '')
    if not port_name or len(port_name) < 2:
        return jsonify({'success': False, 'error': 'Port name required (min 2 chars)'}), 400

    port = mt.resolve_port(port_name)
    if not port:
        return jsonify({'success': False, 'error': f"Port '{port_name}' not found"}), 404

    # Fall back to the port name when the UN/LOCODE is missing *or empty*, so
    # ports without a code do not all share the '' cache key.
    cache_key = (port.get('unlocode') or port['name']).lower()

    def _payload(source, vessels):
        # Single place that shapes the response for both cache and live paths.
        return {
            'success': True, 'source': source,
            'port': port['name'], 'country': port['country'],
            'coordinates': {'lat': port['lat'], 'lon': port['lon']},
            'count': len(vessels), 'vessels': vessels
        }

    cached = db.get_cached_port_vessels(cache_key)
    if cached is not None:
        return jsonify(_payload('cache', cached))

    # Normalise a None result to an empty list so len() below cannot fail.
    vessels = mt.get_parser().get_port_vessels(port_name) or []
    if vessels:
        db.cache_port_vessels(cache_key, vessels)

    return jsonify(_payload('live' if vessels else 'unavailable', vessels))
# =============================================================================
# MAP VESSELS ENDPOINT
# =============================================================================
@app.route('/api/v1/map/vessels', methods=['GET'])
@rate_limit(max_calls=20, period=60)
def get_map_vessels_api():
    """Get vessel positions within a bounding box for map display.

    Query params:
        lat_min, lat_max, lon_min, lon_max: bounding box in degrees; must be
            finite, strictly ordered, and at most 60° lat × 120° lon.
        limit: maximum vessels to return (default 500, clamped to 0..1000;
            an unparseable value falls back to the default).

    Sources, in order: the AIS provider chain, then a direct DB query for
    positions up to 60 minutes old, then enrichment from mt_bulk_staging up
    to the limit. Each source failure is logged and skipped.
    """
    import math

    try:
        lat_min = float(request.args.get('lat_min', 0))
        lat_max = float(request.args.get('lat_max', 0))
        lon_min = float(request.args.get('lon_min', 0))
        lon_max = float(request.args.get('lon_max', 0))
    except (ValueError, TypeError):
        return jsonify({'success': False, 'error': 'Invalid coordinates'}), 400

    # float() accepts 'nan'/'inf'; NaN would slip through every comparison below.
    if not all(math.isfinite(c) for c in (lat_min, lat_max, lon_min, lon_max)):
        return jsonify({'success': False, 'error': 'Invalid coordinates'}), 400

    if lat_min >= lat_max or lon_min >= lon_max:
        return jsonify({'success': False, 'error': 'Invalid bounding box'}), 400

    # Allow larger viewports (up to 60° lat × 120° lon for global view)
    if (lat_max - lat_min) > 60 or (lon_max - lon_min) > 120:
        return jsonify({'success': False, 'error': 'Bounding box too large'}), 400

    # type=int yields the default on bad input instead of raising; the lower
    # clamp keeps a negative value from turning vessels[:limit] into a tail-drop.
    limit = request.args.get('limit', 500, type=int)
    limit = max(0, min(limit, 1000))

    center_lat = (lat_min + lat_max) / 2
    center_lon = (lon_min + lon_max) / 2
    radius_nm = max((lat_max - lat_min) * 60 / 2, 1)

    # Use full AIS provider chain (DB → MT scraping → AISStream → Digitraffic → AISHub)
    try:
        from ais_provider import get_provider
        provider = get_provider()
        vessels = provider.get_vessels_in_area(
            lat=center_lat, lon=center_lon, radius_nm=min(radius_nm, 500))
    except Exception as e:
        logger.error(f"Map AIS provider error: {e}")
        vessels = []

    # Fallback: direct DB query (positions from any source, up to 60 min old)
    if not vessels:
        try:
            vessels = db.get_positions_in_area(lat_min, lat_max, lon_min, lon_max,
                                               max_age_minutes=60)
        except Exception as e:
            logger.error(f"Map DB fallback error: {e}")
            vessels = []

    # Enrich with mt_bulk_staging (global fleet, 1-24h positions)
    existing_mmsis = {str(v.get('mmsi')) for v in vessels if v.get('mmsi')}
    remaining = limit - len(vessels)
    if remaining > 0:
        try:
            mt_staging = db.get_mt_bulk_staging_in_area(
                lat_min, lat_max, lon_min, lon_max, limit=remaining)
            for v in mt_staging:
                mmsi = str(v.get('mmsi', ''))
                if mmsi and mmsi not in existing_mmsis:
                    existing_mmsis.add(mmsi)
                    vessels.append(v)
        except Exception as e:
            logger.error(f"Map mt_bulk_staging error: {e}")

    from marinetraffic_parser import classify_vessel_type
    for v in vessels:
        if not v.get('type_category') or v['type_category'] == 'other':
            v['type_category'] = classify_vessel_type(
                v.get('type') or '', v.get('type_code'))
        # Normalize lat/lon keys for frontend
        if 'lat' in v and 'latitude' not in v:
            v['latitude'] = v.pop('lat')
        if 'lon' in v and 'longitude' not in v:
            v['longitude'] = v.pop('lon')

    vessels = vessels[:limit]
    return jsonify({
        'success': True,
        'count': len(vessels),
        'bounds': {'lat_min': lat_min, 'lat_max': lat_max,
                   'lon_min': lon_min, 'lon_max': lon_max},
        'vessels': vessels
    })
# =============================================================================
# MAP VESSELS BULK ENDPOINT (for Supercluster clustering)
# =============================================================================
@app.route('/api/v1/map/vessels/all', methods=['GET'])
@rate_limit(max_calls=5, period=60)
def get_map_vessels_all_api():
    """Return every stored vessel position in compact form for client-side clustering.

    Each vessel is a list: [mmsi, lat, lon, type_cat, heading, speed, course, name],
    where type_cat is the integer from config.TYPE_CAT_CODES (9 when unknown).
    The response is sent with a 120-second public Cache-Control header.
    """
    try:
        rows = db.get_all_vessel_positions_minimal()
    except Exception as e:
        logger.error(f"Bulk vessels error: {e}")
        return jsonify({'success': False, 'error': 'Database error'}), 500

    from marinetraffic_parser import classify_vessel_type
    code_by_category = config.TYPE_CAT_CODES
    # Categories that have no code of their own are folded into one that does.
    aliases = {'general': 'cargo', 'highspeed': 'other', 'pleasure': 'other',
               'sailing': 'other', 'military': 'other'}

    def _rounded(value, digits):
        # Round when a value is present; pass None through untouched.
        return round(value, digits) if value is not None else None

    def _compact_row(row):
        # Resolve the category first, then lay the fields out in wire order.
        category = (row.get('type_category') or 'other').lower().strip()
        category = aliases.get(category, category)
        if category not in code_by_category:
            category = classify_vessel_type(category, None)
            category = aliases.get(category, category)
        category_code = code_by_category.get(category, 9)  # 9 = other
        return [
            row['mmsi'],
            _rounded(row['lat'], 5),
            _rounded(row['lon'], 5),
            category_code,
            row.get('heading'),
            _rounded(row.get('speed'), 1),
            row.get('course'),
            row.get('name') or '',
        ]

    compact = [_compact_row(row) for row in rows]

    body = {
        'success': True,
        'count': len(compact),
        'fields': ['mmsi', 'lat', 'lon', 'type_cat', 'heading', 'speed', 'course', 'name'],
        'vessels': compact
    }
    resp = make_response(jsonify(body))
    resp.headers['Cache-Control'] = 'public, max-age=120'
    return resp
@app.route('/api/v1/port/resolve', methods=['GET'])
@rate_limit(max_calls=30, period=60)
def resolve_port_api():
    """Look up a port by name and return its name, country, coordinates and UN/LOCODE."""
    name = request.args.get('name', '').strip()
    if not name:
        return jsonify({'success': False, 'error': 'Port name required'}), 400

    resolved = mt.resolve_port(name)
    if not resolved:
        return jsonify({'success': False, 'error': f'Port not found: {name}'}), 404

    port_info = {
        'name': resolved.get('name', name),
        'country': resolved.get('country', ''),
        'lat': resolved.get('lat'),
        'lon': resolved.get('lon'),
        'unlocode': resolved.get('unlocode', ''),
    }
    return jsonify({'success': True, 'port': port_info})
@app.route('/api/v1/ports/search', methods=['GET'])
@rate_limit(max_calls=30, period=60)
def search_ports_api():
    """Search ports by name for autocomplete.

    Query params:
        q: case-insensitive substring matched against the port key and name;
            fewer than 2 characters yields an empty list.
        limit: maximum matches to return (default 10, clamped to 1..50; an
            unparseable value falls back to the default).
    """
    q = request.args.get('q', '').strip().lower()
    if len(q) < 2:
        return jsonify({'success': True, 'ports': []})

    # type=int returns the default on bad input instead of raising ValueError.
    limit = request.args.get('limit', 10, type=int)
    limit = max(1, min(limit, 50))

    results = []
    for key, port in mt.WORLD_PORTS.items():
        port_name = port.get('name', '')
        if q in key or q in port_name.lower():
            results.append({
                'key': key, 'name': port_name,
                'country': port.get('country', ''),
                'unlocode': port.get('unlocode', ''),
            })
            if len(results) >= limit:
                break
    return jsonify({'success': True, 'ports': results})
@app.route('/api/v1/route', methods=['GET'])
@rate_limit(max_calls=20, period=60)
def calculate_route_api():
    """Compute a sea route between the 'from' and 'to' ports.

    Optional query params: vessel_type (default 'default') and dwt (float).
    """
    args = request.args
    origin = args.get('from', '')
    destination = args.get('to', '')
    vessel_type = args.get('vessel_type', 'default')
    dwt = args.get('dwt', type=float)

    if not (origin and destination):
        return jsonify({'success': False, 'error': 'Both "from" and "to" port parameters required'}), 400

    route = mt.calculate_sea_route(origin, destination, vessel_type, dwt=dwt)
    if route:
        payload = {'success': True}
        payload.update(route)
        return jsonify(payload)
    return jsonify({'success': False, 'error': 'Could not calculate route. Check port names.'}), 404
# =============================================================================
# FREIGHT RATE ENDPOINT
# =============================================================================
@app.route('/api/v1/freight-rate', methods=['GET'])
@rate_limit(max_calls=20, period=60)
def freight_rate_api():
    """Estimate the freight rate between the 'from' and 'to' ports.

    Optional query params: vessel_type (default 'bulk') and dwt (float).
    """
    args = request.args
    origin = args.get('from', '')
    destination = args.get('to', '')
    vessel_type = args.get('vessel_type', 'bulk')
    dwt = args.get('dwt', type=float)

    if not (origin and destination):
        return jsonify({'success': False, 'error': 'Both "from" and "to" required'}), 400

    estimate = mt.estimate_freight_rate(origin, destination, vessel_type, dwt=dwt)
    if estimate:
        payload = {'success': True}
        payload.update(estimate)
        return jsonify(payload)
    return jsonify({'success': False, 'error': 'Could not estimate rate'}), 404
# =============================================================================
# SANCTIONS SCREENING ENDPOINT
# =============================================================================
@app.route('/api/v1/sanctions/screen', methods=['GET'])
@rate_limit(max_calls=20, period=60)
def sanctions_screen_api():
    """Run a sanctions screening for the given vessel and/or company.

    Query params (all optional): vessel_name, imo, flag, company. A supplied
    company is passed to the screener as a one-element list.
    """
    params = request.args
    company = params.get('company')
    screening = mt.screen_sanctions(
        vessel_name=params.get('vessel_name'),
        imo=params.get('imo'),
        flag=params.get('flag'),
        companies=[company] if company else None,
    )
    payload = {'success': True}
    payload.update(screening)
    return jsonify(payload)
# =============================================================================
# PORT CONGESTION ENDPOINT
# =============================================================================
@app.route('/api/v1/port/congestion', methods=['GET'])
@rate_limit(max_calls=20, period=60)
def port_congestion_api():
    """Return the congestion estimate for the port named in the 'port' query param."""
    port_name = request.args.get('port', '')
    if not port_name:
        return jsonify({'success': False, 'error': 'Port name required'}), 400

    estimate = mt.estimate_port_congestion(port_name)
    if estimate:
        payload = {'success': True}
        payload.update(estimate)
        return jsonify(payload)
    return jsonify({'success': False, 'error': f"Port '{port_name}' not found"}), 404
# =============================================================================
# BUNKER PRICE ENDPOINTS
# =============================================================================
@app.route('/api/v1/bunker/prices', methods=['GET'])
@rate_limit(max_calls=20, period=60)
def bunker_prices_api():
    """Return bunker fuel prices for the port named in the 'port' query param."""
    port_name = request.args.get('port', '')
    if not port_name:
        return jsonify({'success': False, 'error': 'Port name required'}), 400

    prices = mt.get_bunker_prices(port_name)
    if prices:
        payload = {'success': True}
        payload.update(prices)
        return jsonify(payload)
    return jsonify({'success': False, 'error': f"Port '{port_name}' not found"}), 404
@app.route('/api/v1/bunker/optimize', methods=['GET'])
@rate_limit(max_calls=20, period=60)
def bunker_optimize_api():
    """Return a bunkering plan for the route between the 'from' and 'to' ports.

    Optional query params: vessel_type (default 'bulk') and dwt (float).
    """
    args = request.args
    origin = args.get('from', '')
    destination = args.get('to', '')
    vessel_type = args.get('vessel_type', 'bulk')
    dwt = args.get('dwt', type=float)

    if not (origin and destination):
        return jsonify({'success': False, 'error': 'Both "from" and "to" required'}), 400

    plan = mt.optimize_bunker_route(origin, destination, vessel_type, dwt=dwt)
    if plan:
        payload = {'success': True}
        payload.update(plan)
        return jsonify(payload)
    return jsonify({'success': False, 'error': 'Could not optimize bunker route'}), 404
# =============================================================================
# CONTACT ENDPOINTS (temporarily free)
# =============================================================================
@app.route('/api/v1/contacts/vessel/<mmsi>', methods=['GET'])
@rate_limit(max_calls=20, period=60)
@require_auth
def get_vessel_contacts(mmsi: str):
    """
    List the contacts (owner, operator, agent) stored for a vessel.
    Temporarily free — no charge is applied.
    """
    vessel_contacts = db.get_contacts_for_vessel(mmsi)
    payload = {
        'success': True,
        'mmsi': mmsi,
        'count': len(vessel_contacts),
        'contacts': vessel_contacts,
    }
    return jsonify(payload)
@app.route('/api/v1/contacts/search', methods=['GET'])
@require_auth
@rate_limit(max_calls=20, period=60)
def search_contacts():
    """
    Search contacts (owners, operators, agents, brokers).
    Temporarily free — full data for all users.

    Query params (both optional): q (search text) and type (contact type).
    Empty values are passed to the DB as None; at most 20 rows are requested.
    """
    search_text = request.args.get('q', '')
    type_filter = request.args.get('type', '')

    matches = db.search_contacts(
        query=search_text or None,
        contact_type=type_filter or None,
        limit=20,
    )
    payload = {
        'success': True,
        'paid_access': True,
        'count': len(matches),
        'contacts': matches,
    }
    return jsonify(payload)
# =============================================================================
# DEMURRAGE CALCULATOR
# =============================================================================
@app.route('/api/v1/demurrage/calculate', methods=['POST'])
@rate_limit(max_calls=20, period=60)
def calculate_demurrage():
    """
    Calculate demurrage.

    Formula: max(0, actual_days - agreed_days) × daily_rate

    JSON body (all optional): agreed_days, actual_days, daily_rate (numbers,
    default 0) and currency (default 'USD'). A missing or non-object body is
    treated as empty. Returns 400 when any of the three numeric fields is not
    a finite JSON number.
    """
    import math

    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # e.g. a JSON array or scalar body: fall back to all defaults.
        data = {}

    def _is_valid_number(value):
        # bool is an int subclass but is not a meaningful quantity here;
        # floats must be finite so NaN/Infinity never reach the response.
        if isinstance(value, bool):
            return False
        if isinstance(value, int):
            return True
        return isinstance(value, float) and math.isfinite(value)

    agreed_days = data.get('agreed_days', 0)
    actual_days = data.get('actual_days', 0)
    daily_rate = data.get('daily_rate', 0)
    currency = data.get('currency', 'USD')

    numeric_fields = (
        ('agreed_days', agreed_days),
        ('actual_days', actual_days),
        ('daily_rate', daily_rate),
    )
    invalid = [name for name, value in numeric_fields if not _is_valid_number(value)]
    if invalid:
        return jsonify({
            'success': False,
            'error': f"Invalid numeric value for: {', '.join(invalid)}"
        }), 400

    delay_days = max(0, actual_days - agreed_days)
    total = delay_days * daily_rate

    return jsonify({
        'success': True,
        'calculation': {
            'agreed_days': agreed_days,
            'actual_days': actual_days,
            'delay_days': delay_days,
            'daily_rate': daily_rate,
            'currency': currency,
            'total_demurrage': total,
            'formula': f"({actual_days} - {agreed_days}) × {daily_rate} = {total} {currency}"
        }
    })
# =============================================================================
# SUBSCRIPTION ENDPOINTS
# =============================================================================
@app.route('/api/v1/subscription/plans', methods=['GET'])
def get_subscription_plans():
    """List the subscription plans defined in db.SUBSCRIPTION_PLANS."""
    payload = {
        'success': True,
        'plans': db.SUBSCRIPTION_PLANS,
    }
    return jsonify(payload)
@app.route('/api/v1/subscription/upgrade', methods=['POST'])
@require_auth
@rate_limit(max_calls=5, period=60)
def upgrade_subscription():
    """Upgrade user's subscription plan. Charges balance.

    JSON body: {"plan": "<plan key>"}.

    - Unknown plan -> 400.
    - Plan with price 0, or an admin user -> plan is set without charging.
    - Otherwise the monthly price is charged from the user's balance (400 if
      the charge is refused), the plan is set and a service charge recorded.
      If any of those steps raises, the charged amount is credited back and
      500 is returned; a failing refund is logged as critical.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # Missing body or non-object JSON: behaves like an empty request.
        data = {}
    # str() guards against a non-string "plan" value (e.g. a number).
    plan_key = str(data.get('plan') or '').lower().strip()

    if plan_key not in db.SUBSCRIPTION_PLANS:
        return jsonify({'success': False, 'error': f'Unknown plan: {plan_key}'}), 400

    user_id = request.user['id']
    price = db.SUBSCRIPTION_PLANS[plan_key].get('price_monthly', 0)

    # Free plan — just set it. Admin gets a free upgrade to any plan.
    if price == 0 or request.user.get('is_admin'):
        db.set_user_plan(user_id, plan_key)
        user = db.get_user_by_id(user_id)
        message = 'Plan set to free' if price == 0 else f'Admin upgraded to {plan_key}'
        return jsonify({'success': True, 'user': user_dict(user), 'message': message})

    # Charge from balance
    if not db.charge_user(user_id, price):
        return jsonify({
            'success': False,
            'error': f'Insufficient balance. Need ${price} USDT.',
            'balance': request.user['balance'],
            'required': price,
        }), 400

    try:
        db.set_user_plan(user_id, plan_key)
        db.add_service_charge(user_id, 'subscription_upgrade', price, f'plan={plan_key}')
        user = db.get_user_by_id(user_id)
    except Exception as e:
        # Refund on failure.
        # NOTE(review): if set_user_plan succeeded before the failure, the plan
        # is not rolled back here — confirm whether that needs a revert.
        logger.error(f"Subscription upgrade error, refunding: {e}")
        try:
            db.update_user_balance(user_id, price)
        except Exception as refund_error:
            # The user has been charged and could not be refunded: make sure
            # this is visible for manual correction.
            logger.critical(
                f"Subscription refund FAILED for user {user_id}, amount {price}: {refund_error}")
            return jsonify({
                'success': False,
                'error': 'Upgrade failed. Refund could not be completed, please contact support.'
            }), 500
        return jsonify({'success': False, 'error': 'Upgrade failed. Balance refunded.'}), 500

    return jsonify({
        'success': True,
        'user': user_dict(user),
        'charged': price,
        'message': f'Upgraded to {plan_key} plan for ${price}/month',
    })
@app.route('/api/v1/subscription/status', methods=['GET'])
@require_auth
def subscription_status():
    """Report the authenticated user's plan, its details and expiry.

    Plan details fall back to the 'free' plan when the user's plan key is not
    present in db.SUBSCRIPTION_PLANS.
    """
    current_user = request.user
    plan_key = current_user.get('plan', 'free')
    plans = db.SUBSCRIPTION_PLANS
    payload = {
        'success': True,
        'plan': plan_key,
        'plan_details': plans.get(plan_key, plans['free']),
        'expires_at': current_user.get('plan_expires_at'),
    }
    return jsonify(payload)
# =============================================================================
# STATS & HEALTH
# =============================================================================
@app.route('/api/v1/stats', methods=['GET'])
@rate_limit(max_calls=30, period=60)
def get_stats():
    """Return DB statistics plus the API version and MarineTraffic API-key flag."""
    db_stats = db.get_stats()
    db_stats['api_version'] = APP_VERSION
    db_stats['marinetraffic_api'] = mt.get_parser().has_api_key()
    payload = {'success': True, 'stats': db_stats}
    return jsonify(payload)
@app.route('/health', methods=['GET'])
def health():
    """Public health check — minimal info only (HIGH-001 fix).

    Answers 200 with status 'ok' and the version when db.get_stats() succeeds,
    and 503 with status 'degraded' when it raises.
    """
    try:
        db.get_stats()
        body, status_code = {'status': 'ok', 'version': config.APP_VERSION}, 200
    except Exception:
        body, status_code = {'status': 'degraded'}, 503
    return jsonify(body), status_code
@app.route('/health/detail', methods=['GET'])
@require_auth
def health_detail():
    """Detailed health report — admin only.

    Includes DB reachability and user count, the DB backend in use (with the
    Postgres fallback error when one was recorded) and the AIS provider status.
    """
    if not request.user.get('is_admin'):
        return jsonify({'error': 'Admin access required'}), 403

    report = {
        'status': 'ok',
        'service': 'SeaFare API',
        'version': APP_VERSION,
        'timestamp': datetime.utcnow().isoformat()
    }

    # Database reachability
    try:
        db_stats = db.get_stats()
        report['db'] = 'ok'
        report['users'] = db_stats.get('users', 0)
    except Exception as exc:
        report['db'] = f'error: {type(exc).__name__}: {exc}'
        report['status'] = 'degraded'

    # Backend in use; DATABASE_URL set without Postgres active means we fell back.
    report['db_mode'] = 'postgres' if db.USE_POSTGRES else 'sqlite'
    fell_back = os.environ.get('DATABASE_URL') and not db.USE_POSTGRES
    if fell_back:
        report['db_mode'] = 'sqlite (pg fallback)'
        if db.PG_FALLBACK_ERROR:
            report['pg_error'] = db.PG_FALLBACK_ERROR

    # AIS provider status (best effort)
    try:
        from ais_provider import get_provider
        report['ais'] = get_provider().get_status()
    except Exception:
        report['ais'] = 'unavailable'

    return jsonify(report)
@app.route('/api/v1/admin/ais-test')
def ais_test():
    """Diagnostic: test AISStream sync fallback + raw WebSocket. Admin only.

    Query params:
        mode: 'fallback' runs the provider's query_area_sync over a fixed
            Eastern Mediterranean box; anything else (default 'raw') opens a
            WebSocket to AISStream and reads one message.

    Any exception is reported in the JSON result rather than raised.
    """
    user = get_current_user()
    if not (user and user.get('is_admin')):
        return jsonify({'error': 'admin access required'}), 403

    mode = request.args.get('mode', 'raw')  # raw | fallback
    result = {'mode': mode}
    try:
        from ais_provider import get_provider
        provider = get_provider()
        if mode == 'fallback' and provider.aisstream:
            # Test the sync fallback path used for real queries
            (lat_a, lon_a), (lat_b, lon_b) = (35, 10), (42, 25)  # Eastern Mediterranean
            # Recorded before the call so it is visible if the call raises.
            result['step'] = 'calling query_area_sync (6s)...'
            found = provider.aisstream.query_area_sync(lat_a, lon_a,
                                                       lat_b, lon_b,
                                                       max_seconds=6)
            result['vessels_found'] = len(found)
            result['positions_total'] = provider.aisstream._positions_received
            if found:
                first = found[0]
                result['sample'] = {
                    'mmsi': first.get('mmsi'),
                    'lat': first.get('latitude'),
                    'lon': first.get('longitude'),
                }
            result['success'] = len(found) > 0
        else:
            # Raw websocket test
            import websocket as ws_lib
            api_key = os.environ.get('AISSTREAM_API_KEY', '')
            result['key_set'] = bool(api_key)
            result['key_length'] = len(api_key)
            conn = ws_lib.create_connection(
                'wss://stream.aisstream.io/v0/stream', timeout=15)
            try:
                subscription = {
                    "APIKey": api_key,
                    "BoundingBoxes": [[[35, 10], [40, 20]]],
                    "FilterMessageTypes": ["PositionReport"]
                }
                conn.send(json.dumps(subscription))
                conn.settimeout(10)
                message = json.loads(conn.recv())
                if 'error' not in message:
                    msg_type = message.get('MessageType', '?')
                    mmsi = message.get('MetaData', {}).get('MMSI', '?')
                    result['step'] = f"received: {msg_type} MMSI={mmsi}"
                    result['success'] = True
                else:
                    result['step'] = f"API error: {message['error']}"
                    result['success'] = False
            finally:
                conn.close()
    except Exception as exc:
        result['error'] = f"{type(exc).__name__}: {exc}"
        result['success'] = False
    return jsonify(result)
# =============================================================================
# ADMIN MANAGEMENT
# =============================================================================
@app.route('/api/v1/admin/grant', methods=['POST'])
@rate_limit(max_calls=10, period=60)
@require_auth
def grant_admin():
    """Grant admin to a user by email (only existing admins can do this).

    JSON body: {"email": "<address>"}. Returns 403 for non-admin callers,
    400 when the email is missing or not a string, 404 when no such user.
    """
    if not request.user.get('is_admin'):
        return jsonify({'success': False, 'error': 'Admin access required'}), 403

    # silent=True: a missing or malformed body yields None instead of raising;
    # any non-object body is treated as empty so the email check below rejects it.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    raw_email = data.get('email')
    email = raw_email.strip().lower() if isinstance(raw_email, str) else ''
    if not email:
        return jsonify({'success': False, 'error': 'Email required'}), 400

    target = db.get_user_by_email(email)
    if not target:
        return jsonify({'success': False, 'error': 'User not found'}), 404

    db.set_user_admin(target['id'], True)
    return jsonify({'success': True, 'email': email, 'is_admin': True})
@app.route('/api/v1/admin/revoke', methods=['POST'])
@rate_limit(max_calls=10, period=60)
@require_auth
def revoke_admin():
    """Revoke admin from a user by email (only existing admins can do this).

    JSON body: {"email": "<address>"}. Returns 403 for non-admin callers,
    400 when the email is missing or not a string, 404 when no such user.
    """
    if not request.user.get('is_admin'):
        return jsonify({'success': False, 'error': 'Admin access required'}), 403

    # silent=True: a missing or malformed body yields None instead of raising;
    # any non-object body is treated as empty so the email check below rejects it.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    raw_email = data.get('email')
    email = raw_email.strip().lower() if isinstance(raw_email, str) else ''
    if not email:
        return jsonify({'success': False, 'error': 'Email required'}), 400

    target = db.get_user_by_email(email)
    if not target:
        return jsonify({'success': False, 'error': 'User not found'}), 404

    db.set_user_admin(target['id'], False)
    return jsonify({'success': True, 'email': email, 'is_admin': False})
@app.route('/api/v1/purchased-contacts', methods=['GET'])
@require_auth
def get_purchased_contacts():
    """List the contacts the authenticated user has purchased (unlocked)."""
    user_id = request.user['id']
    unlocked = db.get_purchased_contacts(user_id)
    return jsonify({'success': True, 'contacts': unlocked})
@app.route('/api/v1/admin/revenue', methods=['GET'])
@require_auth
def admin_revenue():
    """Platform revenue dashboard — service charges, deposits, balances. Admin only."""
    is_admin = request.user.get('is_admin')
    if not is_admin:
        return jsonify({'success': False, 'error': 'Admin access required'}), 403

    payload = {'success': True}
    payload.update(db.get_revenue_stats())
    return jsonify(payload)
@app.route('/api/v1/admin/costs', methods=['GET'])
@require_auth
@rate_limit(max_calls=10, period=60)
def admin_costs():
    """AI API cost dashboard — token usage, cache efficiency, daily trends. Admin only.

    Query params:
        days: reporting window (default 30), clamped to 1..90.
    """
    if not request.user.get('is_admin'):
        return jsonify({'success': False, 'error': 'Admin access required'}), 403

    try:
        requested_days = request.args.get('days', 30, type=int)
        window_days = max(1, min(requested_days, 90))
        payload = {'success': True}
        payload.update(db.get_cost_stats(days=window_days))
        return jsonify(payload)
    except Exception as exc:
        logger.error(f"admin_costs error: {exc}")
        return jsonify({'success': False, 'error': f'Failed to load cost stats: {exc}'}), 500
# =============================================================================
# KEEP-ALIVE (prevents Render free tier from sleeping after 15 min)
# =============================================================================
def _start_keep_alive():
    """Start a background thread that pings /health every 10 minutes.

    Does nothing unless the environment variable named by
    config.ENV_RENDER_EXTERNAL_URL is set (i.e. when not running on Render).
    Intended to keep a free-tier instance from sleeping after 15 minutes of
    inactivity. The '_KEEPALIVE_STARTED' environment flag prevents a second
    thread from being started in the same process.
    """
    if os.environ.get('_KEEPALIVE_STARTED'):
        return
    os.environ['_KEEPALIVE_STARTED'] = '1'

    import threading
    import time
    import urllib.error
    import urllib.request

    render_url = os.environ.get(config.ENV_RENDER_EXTERNAL_URL)
    if not render_url:
        return  # Not on Render — skip

    # rstrip avoids a '//health' path if the base URL carries a trailing slash.
    health_url = f"{render_url.rstrip('/')}/health"
    interval_seconds = 10 * 60  # 10 minutes (well under 15-min timeout)

    def ping_loop():
        while True:
            time.sleep(interval_seconds)
            try:
                # Context manager closes the response so connections do not leak.
                with urllib.request.urlopen(health_url, timeout=15) as resp:
                    body = resp.read().decode('utf-8', errors='replace')
                if 'degraded' in body:
                    logger.warning("Keep-alive: health check returned degraded status")
            except urllib.error.HTTPError as e:
                # /health answers 503 when degraded, and urlopen raises for
                # non-2xx statuses, so the degraded case arrives here.
                if e.code == 503:
                    logger.warning("Keep-alive: health check returned degraded status")
                else:
                    logger.warning(f"Keep-alive ping failed: {e}")
                e.close()
            except Exception as e:
                logger.warning(f"Keep-alive ping failed: {e}")

    t = threading.Thread(target=ping_loop, daemon=True)
    t.start()
    logger.info(f"Keep-alive started: pinging {health_url} every 10 min")


_start_keep_alive()
# =============================================================================
# =============================================================================
# VESSEL COLLECTOR — background Med/Baltic/Caspian data enrichment
# =============================================================================
def _start_vessel_collector():
    """Launch the background vessel data collector, if the module is available.

    A missing module is logged at info level and skipped; any other failure
    is logged as an error. Neither case propagates.
    """
    try:
        from vessel_collector import start_collector as launch_collector
        launch_collector()
    except ImportError:
        logger.info("Vessel collector: module not found — skipped")
    except Exception as exc:
        logger.error(f"Vessel collector error: {exc}")


_start_vessel_collector()
# =============================================================================
# AIS PROVIDER (real-time vessel tracking)
# =============================================================================
def _start_ais_provider():
    """Intentional no-op: the AIS provider is disabled.

    Positions come from the mt_position_updater service (MarineTraffic tiles).
    AISStream/AISHub/Digitraffic were removed to prevent DB bloat and
    reconnection storms.
    """
    return None
# _start_ais_provider() # DISABLED: DB-only positions via mt_position_updater
def _start_position_cleanup():
    """Start a daemon thread that periodically deletes old AIS positions.

    The thread sleeps config.POSITION_CLEANUP_INTERVAL_HOURS between runs and
    keeps the last config.POSITION_KEEP_HOURS of data. Errors are logged and
    the loop continues.
    """
    import threading as _threading

    def _purge_forever():
        while True:
            # Config is read on every cycle, inside the worker thread.
            _time_mod.sleep(config.POSITION_CLEANUP_INTERVAL_HOURS * 3600)
            try:
                removed = db.cleanup_old_positions(keep_hours=config.POSITION_KEEP_HOURS)
                if removed:
                    logger.info(f"Position cleanup: removed {removed} old records")
            except Exception as exc:
                logger.error(f"Position cleanup error: {exc}")

    worker = _threading.Thread(target=_purge_forever, daemon=True, name="pos-cleanup")
    worker.start()


_start_position_cleanup()
# =============================================================================
# GRACEFUL SHUTDOWN HANDLER
# =============================================================================
_shutdown_in_progress = False


def _graceful_shutdown(signum, frame):
    """Signal handler for SIGTERM/SIGINT: release resources, then exit with code 0.

    Closes the DB pool and stops the AIS provider when that hook exists.
    Failures in either step are logged and do not prevent the exit. A second
    signal received while shutdown is already running is ignored.
    """
    global _shutdown_in_progress
    if _shutdown_in_progress:
        return  # Prevent double-shutdown
    _shutdown_in_progress = True

    if hasattr(signal, 'Signals'):
        sig_name = signal.Signals(signum).name
    else:
        sig_name = str(signum)
    logger.info(f"Shutting down gracefully... (received {sig_name})")

    # 1. Close database connection pool
    try:
        db.close_pool()
    except Exception as exc:
        logger.error(f"Error closing DB pool during shutdown: {exc}")

    # 2. Stop AIS provider if possible
    try:
        from ais_provider import stop_ais_provider
        stop_ais_provider()
        logger.info("AIS provider stopped")
    except (ImportError, AttributeError):
        # stop_ais_provider may not exist
        pass
    except Exception as exc:
        logger.error(f"Error stopping AIS provider: {exc}")

    logger.info("Shutdown complete")
    sys.exit(0)


# Register signal handlers
#   SIGTERM: sent by Render/Docker/systemd on deploy or scale-down
#   SIGINT:  sent by Ctrl+C in local dev
signal.signal(signal.SIGTERM, _graceful_shutdown)
signal.signal(signal.SIGINT, _graceful_shutdown)
# =============================================================================
# MAIN
# =============================================================================
@app.errorhandler(404)
def not_found(e):
    """Render 404 errors as a JSON body."""
    payload = {'success': False, 'error': 'Not found'}
    return jsonify(payload), 404
@app.errorhandler(405)
def method_not_allowed(e):
    """Render 405 errors as a JSON body."""
    payload = {'success': False, 'error': 'Method not allowed'}
    return jsonify(payload), 405
# =============================================================================
if __name__ == '__main__':
    # Local/dev entry point: port comes from the environment, with the
    # configured default as fallback; debug is on only when the flag is 'true'.
    port = int(os.environ.get(config.ENV_SEAFARE_PORT, config.DEFAULT_PORT))
    logger.info(f"SeaFare Montana API running on http://localhost:{port}")
    logger.info("Open in browser to chat!")
    debug_enabled = os.environ.get(config.ENV_FLASK_DEBUG, '').lower() == 'true'
    app.run(host='0.0.0.0', port=port, debug=debug_enabled)