montana/Русский/Логистика/maritime_db.py

3784 lines
145 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
"""
Maritime Database Schema and Operations
SeaFare_Montana Data Layer
Supports PostgreSQL (production) and SQLite (development).
Wallet private keys are encrypted at rest via Fernet (AES-128-CBC).
Ɉ MONTANA PROTOCOL ML-DSA-65 (FIPS 204)
"""
import os
import re
import json
import base64
import hashlib
import logging
import sqlite3
import threading
from datetime import datetime, timedelta
from pathlib import Path
logger = logging.getLogger('maritime_db')
import config
# =============================================================================
# DATABASE BACKEND DETECTION
# =============================================================================
# Backend selection: a non-empty DATABASE_URL env var means PostgreSQL,
# otherwise the app runs on a local SQLite file (see DB_PATH below).
DATABASE_URL = (os.environ.get(config.ENV_DATABASE_URL) or '').strip() or None
USE_POSTGRES = bool(DATABASE_URL)
PG_FALLBACK_ERROR = None  # Stores the reason PG failed (for /health diagnostics)
if USE_POSTGRES:
    import psycopg2
    import psycopg2.extras
    import psycopg2.pool
    import psycopg2.extensions
    from psycopg2.pool import ThreadedConnectionPool

    # Make psycopg2 return strings instead of Python datetime/Decimal objects.
    # This ensures all query results are JSON-serializable without custom encoders.
    def _cast_to_str(value, cursor):
        # Typecaster: SQL NULL stays None, everything else becomes str().
        return str(value) if value is not None else None

    def _cast_numeric(value, cursor):
        # Typecaster: NUMERIC/DECIMAL arrive as strings; coerce to float.
        return float(value) if value is not None else None

    # OIDs: 1114=TIMESTAMP, 1184=TIMESTAMPTZ, 1082=DATE, 1083=TIME, 1266=TIMETZ
    for _oid, _name in [(1114, 'TS'), (1184, 'TSTZ'), (1082, 'DT'), (1083, 'TM'), (1266, 'TMTZ')]:
        psycopg2.extensions.register_type(
            psycopg2.extensions.new_type((_oid,), f'_STR_{_name}', _cast_to_str))
    # OID 1700=NUMERIC → float
    psycopg2.extensions.register_type(
        psycopg2.extensions.new_type((1700,), '_FLOAT_NUM', _cast_numeric))

# SQLite database file location (dev primary / production fallback).
DB_PATH = Path(__file__).parent / "data" / "maritime.db"
# PostgreSQL connection pool (lazy-initialized on first get_connection() call)
_pg_pool = None
_pg_pool_lock = threading.Lock()
# =============================================================================
# POSTGRESQL ↔ SQLITE COMPATIBILITY WRAPPERS
# =============================================================================
class _PgCursorWrapper:
"""Wraps psycopg2 cursor: ? → %s placeholders, lastrowid via RETURNING."""
def __init__(self, cursor):
self._cursor = cursor
self._last_id = None
def execute(self, sql, params=None):
sql = sql.replace('?', '%s')
stripped = sql.strip().upper()
# Auto-add RETURNING id for plain INSERT statements
if stripped.startswith('INSERT') and 'RETURNING' not in stripped:
sql = sql.rstrip().rstrip(';') + ' RETURNING id'
self._cursor.execute(sql, params or ())
try:
row = self._cursor.fetchone()
self._last_id = row[0] if row else None
except Exception:
self._last_id = None
else:
self._last_id = None
self._cursor.execute(sql, params or ())
def fetchone(self):
return self._cursor.fetchone()
def fetchall(self):
return self._cursor.fetchall()
@property
def lastrowid(self):
return self._last_id
@property
def rowcount(self):
return self._cursor.rowcount
class _PgConnectionWrapper:
"""Wraps psycopg2 connection for SQLite-compatible API.
When backed by a connection pool, close() returns the connection
to the pool instead of destroying it."""
def __init__(self, conn, pool=None):
self._conn = conn
self._pool = pool
def cursor(self):
return _PgCursorWrapper(
self._conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
)
def execute(self, sql, params=None):
cursor = self.cursor()
cursor.execute(sql, params)
return cursor
def commit(self):
self._conn.commit()
def rollback(self):
self._conn.rollback()
def close(self):
if self._pool is not None:
# Rollback any open transaction before returning to pool
# (prevents "idle in transaction" state in pool)
try:
self._conn.rollback()
except Exception:
pass
self._pool.putconn(self._conn)
else:
self._conn.close()
def _make_external_url(url):
"""Convert Render internal DB URL to external if internal DNS fails.
Internal: postgresql://user:pass@dpg-xxx-a/db
External: postgresql://user:pass@dpg-xxx-a.oregon-postgres.render.com/db?sslmode=require
"""
from urllib.parse import urlparse, urlunparse
parsed = urlparse(url)
host = parsed.hostname or ''
# Only convert Render internal hostnames (dpg-xxx-a pattern without dots)
if host.startswith('dpg-') and '.' not in host:
# Try common Render regions
for region in ['oregon-postgres.render.com', 'ohio-postgres.render.com',
'frankfurt-postgres.render.com', 'singapore-postgres.render.com']:
ext_host = f"{host}.{region}"
new_netloc = parsed.netloc.replace(host, ext_host)
ext_url = urlunparse(parsed._replace(netloc=new_netloc))
# External connections require SSL
if '?' not in ext_url:
ext_url += '?sslmode=require'
elif 'sslmode' not in ext_url:
ext_url += '&sslmode=require'
try:
import socket
socket.getaddrinfo(ext_host, parsed.port or 5432)
logger.info(f"Resolved external PostgreSQL host: {ext_host}")
return ext_url
except socket.gaierror:
continue
return None
def _fallback_to_sqlite(reason='unknown'):
    """Switch from PostgreSQL to SQLite when PG is completely unavailable.

    SQLite data is ephemeral on Render (lost on redeploy) but keeps the
    app functional. Records the failure reason for /health diagnostics.
    """
    global USE_POSTGRES, PG_FALLBACK_ERROR
    USE_POSTGRES = False
    PG_FALLBACK_ERROR = str(reason)
    DB_PATH.parent.mkdir(parents=True, exist_ok=True)
    logger.warning("FALLBACK: Switched to SQLite mode (PostgreSQL unavailable)")
    try:
        init_db()
        logger.info("SQLite database initialized successfully")
    except sqlite3.DatabaseError as e:
        corrupted = any(token in str(e).lower()
                        for token in ('malformed', 'corrupt', 'disk i/o'))
        if not corrupted:
            logger.error(f"SQLite init failed: {e}")
            return
        # Corrupted DB file: delete it and try once more from scratch
        logger.warning(f"SQLite corrupted during fallback: {e}. Recreating...")
        try:
            DB_PATH.unlink(missing_ok=True)
        except Exception:
            pass
        try:
            init_db()
            logger.info("SQLite recreated after corruption during fallback")
        except Exception as e2:
            logger.error(f"SQLite re-init also failed: {e2}")
    except Exception as e:
        logger.error(f"SQLite init failed: {e}")
def _get_pg_pool():
    """Lazily initialize the PostgreSQL ThreadedConnectionPool (thread-safe).

    Connection strategy, in order:
      1. DSN as given (scheme normalized to postgresql://), with a 10s
         statement timeout unless the endpoint looks like a pooler.
      2. Same DSN without startup options, if the server rejected them.
      3. Render external URL derived via _make_external_url(), if the
         internal hostname failed DNS resolution.
    Raises the original psycopg2.OperationalError when nothing works.
    """
    global _pg_pool
    if _pg_pool is not None:
        return _pg_pool
    with _pg_pool_lock:
        # Double-check after acquiring lock
        if _pg_pool is not None:
            return _pg_pool
        url = DATABASE_URL
        if url.startswith('postgres://'):
            # psycopg2 only accepts the long 'postgresql://' scheme
            url = url.replace('postgres://', 'postgresql://', 1)
        # Detect pooler endpoints (Neon, Supabase) — they don't support startup options
        is_pooler = '-pooler' in url or 'pgbouncer' in url
        pool_opts = {} if is_pooler else {'options': '-c statement_timeout=10000'}
        # Try internal URL first, fall back to external if DNS fails
        try:
            _pg_pool = ThreadedConnectionPool(
                minconn=config.PG_POOL_MIN_CONN,
                maxconn=config.PG_POOL_MAX_CONN,
                dsn=url,
                connect_timeout=5,
                **pool_opts,
            )
            return _pg_pool
        except psycopg2.OperationalError as e:
            err_msg = str(e)
            if 'unsupported startup parameter' in err_msg and not is_pooler:
                # Retry without options for pooler compatibility
                logger.warning("Retrying PostgreSQL without startup options (pooler detected)")
                _pg_pool = ThreadedConnectionPool(
                    minconn=config.PG_POOL_MIN_CONN,
                    maxconn=config.PG_POOL_MAX_CONN,
                    dsn=url,
                    connect_timeout=5,
                )
                return _pg_pool
            if 'could not translate host name' in err_msg or 'Name or service not known' in err_msg:
                logger.warning(f"Internal PostgreSQL DNS failed, trying external URL: {e}")
                ext_url = _make_external_url(url)
                if ext_url:
                    try:
                        _pg_pool = ThreadedConnectionPool(
                            minconn=config.PG_POOL_MIN_CONN,
                            maxconn=config.PG_POOL_MAX_CONN,
                            dsn=ext_url,
                            connect_timeout=10,
                        )
                        logger.info("Connected to PostgreSQL via external URL")
                        return _pg_pool
                    except psycopg2.OperationalError as ext_e:
                        logger.error(f"External PostgreSQL also failed: {ext_e}")
            # NOTE(review): indentation was reconstructed — this re-raise is
            # placed at the end of the outer except so every failed path
            # propagates an error instead of returning None; confirm upstream.
            raise
def close_pool():
    """Close the PostgreSQL connection pool (if it exists).

    Safe to call on SQLite (no-op). Used during graceful shutdown.
    """
    global _pg_pool
    with _pg_pool_lock:
        if _pg_pool is None:
            logger.info("No connection pool to close (SQLite mode or pool not initialized)")
            return
        try:
            _pg_pool.closeall()
            logger.info("PostgreSQL connection pool closed")
        except Exception as e:
            logger.error(f"Error closing PostgreSQL pool: {e}")
        finally:
            # Always clear the global so a later call can rebuild the pool
            _pg_pool = None
def _reset_pg_pool():
    """Reset the PostgreSQL connection pool (e.g., after OperationalError)."""
    global _pg_pool
    with _pg_pool_lock:
        if _pg_pool is None:
            return
        # Best-effort teardown; a broken pool may fail to close cleanly
        try:
            _pg_pool.closeall()
        except Exception:
            pass
        _pg_pool = None
        logger.warning("PostgreSQL connection pool reset (will reconnect on next query)")
def get_connection():
    """Get database connection (PostgreSQL in production, SQLite locally).

    PostgreSQL connections come from a ThreadedConnectionPool (min=1, max=5).
    Each checkout is health-checked with SELECT 1; a dead connection is
    discarded, the pool reset once, and the checkout retried. If PostgreSQL
    is completely unreachable after the retry, auto-falls back to SQLite.
    Callers MUST call .close() to return the connection to the pool.
    """
    if USE_POSTGRES:
        for attempt in range(2):
            try:
                pool = _get_pg_pool()
            except Exception as e:
                logger.error(f"PostgreSQL pool creation failed (attempt {attempt+1}): {e}")
                if attempt == 0:
                    _reset_pg_pool()
                    continue
                # All PostgreSQL attempts exhausted — fall back to SQLite
                logger.error("PostgreSQL completely unreachable, falling back to SQLite")
                _fallback_to_sqlite(reason=f"pool creation failed: {e}")
                return _get_sqlite_connection()
            try:
                conn = pool.getconn()
                # Verify connection is alive (fast check)
                conn.cursor().execute("SELECT 1")
                conn.rollback()  # Close the implicit transaction from health check
                return _PgConnectionWrapper(conn, pool=pool)
            except psycopg2.pool.PoolError as e:
                # Pool exhausted (no free connections)
                if attempt == 0:
                    _reset_pg_pool()
                    continue
                logger.error("PostgreSQL pool exhausted, falling back to SQLite")
                _fallback_to_sqlite(reason=f"pool exhausted: {e}")
                return _get_sqlite_connection()
            except (psycopg2.OperationalError, psycopg2.InterfaceError) as e:
                logger.warning(f"PostgreSQL connection failed (attempt {attempt+1}): {e}")
                # Discard the dead connection rather than returning it healthy
                try:
                    pool.putconn(conn, close=True)
                except Exception:
                    pass
                if attempt == 0:
                    _reset_pg_pool()
                    continue
                # All retries exhausted — fall back to SQLite
                logger.error("PostgreSQL connection failed after retries, falling back to SQLite")
                _fallback_to_sqlite(reason=f"connection failed: {e}")
                return _get_sqlite_connection()
    return _get_sqlite_connection()
def _get_sqlite_connection():
    """Get a SQLite connection (used as primary in dev or fallback in production).

    Auto-heals corrupted databases by deleting and recreating them.
    Rows are returned as sqlite3.Row (dict-like access by column name).
    """
    DB_PATH.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(str(DB_PATH))
    conn.row_factory = sqlite3.Row
    # Quick integrity check — detect corruption early
    try:
        conn.execute("SELECT 1")
    except sqlite3.DatabaseError as e:
        if any(x in str(e).lower() for x in ('malformed', 'corrupt', 'disk i/o')):
            logger.warning(f"SQLite DB corrupted: {e}. Deleting and recreating...")
            try:
                conn.close()
            except Exception:
                pass
            try:
                DB_PATH.unlink(missing_ok=True)
            except Exception as del_e:
                logger.error(f"Failed to delete corrupted DB: {del_e}")
            # Reconnect against the freshly created (empty) file
            conn = sqlite3.connect(str(DB_PATH))
            conn.row_factory = sqlite3.Row
            try:
                init_db()
                logger.info("SQLite database recreated after corruption")
            except Exception as init_e:
                logger.error(f"Failed to reinitialize DB: {init_e}")
        else:
            # Unrelated DatabaseError: let the caller see it
            raise
    return conn
def _adapt_schema(sql):
    """Adapt CREATE TABLE SQL for PostgreSQL.

    SQLite's ``INTEGER PRIMARY KEY AUTOINCREMENT`` becomes PostgreSQL's
    ``SERIAL PRIMARY KEY``; in SQLite mode the SQL passes through unchanged.
    """
    if not USE_POSTGRES:
        return sql
    return re.sub(
        r'INTEGER\s+PRIMARY\s+KEY\s+AUTOINCREMENT',
        'SERIAL PRIMARY KEY',
        sql,
        flags=re.IGNORECASE,
    )
# =============================================================================
# WALLET KEY ENCRYPTION (Fernet / AES-128-CBC)
# =============================================================================
def _get_encryption_key():
    """Derive Fernet key from SECRET_KEY env var.

    Falls back to the Anthropic API key env var; raises RuntimeError
    when neither is set. Returns a urlsafe-base64 32-byte key.
    """
    secret = os.environ.get(config.ENV_SECRET_KEY) or os.environ.get(config.ENV_ANTHROPIC_API_KEY)
    if not secret:
        raise RuntimeError("SECRET_KEY or ANTHROPIC_API_KEY must be set for wallet encryption")
    digest = hashlib.sha256(secret.encode()).digest()
    return base64.urlsafe_b64encode(digest)
def _encrypt_private_key(plaintext: str) -> str:
    """Encrypt private key for storage. Returns 'enc:<ciphertext>'."""
    from cryptography.fernet import Fernet
    cipher = Fernet(_get_encryption_key())
    token = cipher.encrypt(plaintext.encode()).decode()
    return 'enc:' + token
def _decrypt_private_key(stored: str) -> str:
"""Decrypt private key from storage. Handles legacy unencrypted keys."""
if not stored or not stored.startswith('enc:'):
return stored # Legacy unencrypted key
from cryptography.fernet import Fernet
f = Fernet(_get_encryption_key())
return f.decrypt(stored[4:].encode()).decode()
# =============================================================================
# SCHEMA INITIALIZATION
# =============================================================================
def init_db():
    """Initialize database schema.

    Idempotent: every CREATE uses IF NOT EXISTS, and every column
    migration runs in its own try/commit/rollback so a failed ALTER
    cannot abort the PostgreSQL transaction for later statements.
    After schema setup, seeds Caspian reference data and builds the
    fuzzy vessel-name index (both best-effort).

    Returns:
        True on completion. Schema errors propagate to the caller.
    """
    conn = get_connection()
    try:
        if not USE_POSTGRES:
            # WAL + busy timeout reduce SQLITE_BUSY under concurrent access
            conn.execute("PRAGMA journal_mode=WAL")
            conn.execute("PRAGMA busy_timeout=5000")
        cursor = conn.cursor()
        # Vessels table
        cursor.execute(_adapt_schema("""
            CREATE TABLE IF NOT EXISTS vessels (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                mmsi TEXT,
                imo TEXT,
                name TEXT,
                type TEXT,
                type_code INTEGER,
                flag TEXT,
                flag_code TEXT,
                callsign TEXT,
                length REAL,
                width REAL,
                draught REAL,
                gross_tonnage INTEGER,
                deadweight INTEGER,
                year_built INTEGER,
                status TEXT,
                owner TEXT,
                owner_country TEXT,
                operator TEXT,
                operator_country TEXT,
                companies_json TEXT,
                engine_type TEXT,
                speed_max REAL,
                website TEXT,
                beneficial_owner TEXT,
                beneficial_owner_country TEXT,
                registered_owner TEXT,
                registered_owner_country TEXT,
                commercial_manager TEXT,
                commercial_manager_country TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                source TEXT DEFAULT 'internal'
            )
        """))
        # Migration: add missing columns
        if USE_POSTGRES:
            cursor.execute(
                "SELECT column_name FROM information_schema.columns WHERE table_name = 'vessels'"
            )
            existing_cols = {row[0] for row in cursor.fetchall()}
        else:
            # PRAGMA table_info rows: (cid, name, type, notnull, dflt, pk)
            cursor.execute("PRAGMA table_info(vessels)")
            existing_cols = {row[1] for row in cursor.fetchall()}
        for col_name, col_def in [('companies_json', 'TEXT'), ('status', 'TEXT')]:
            if col_name not in existing_cols:
                try:
                    if USE_POSTGRES:
                        cursor.execute(f"ALTER TABLE vessels ADD COLUMN IF NOT EXISTS {col_name} {col_def}")
                    else:
                        cursor.execute(f"ALTER TABLE vessels ADD COLUMN {col_name} {col_def}")
                except Exception:
                    pass
        # Positions table
        cursor.execute(_adapt_schema("""
            CREATE TABLE IF NOT EXISTS positions (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                mmsi TEXT NOT NULL,
                latitude REAL NOT NULL,
                longitude REAL NOT NULL,
                speed REAL,
                course REAL,
                heading REAL,
                status TEXT,
                destination TEXT,
                eta TEXT,
                timestamp TIMESTAMP NOT NULL,
                received_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """))
        # Contacts table
        cursor.execute(_adapt_schema("""
            CREATE TABLE IF NOT EXISTS contacts (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                type TEXT NOT NULL,
                company_name TEXT NOT NULL,
                country TEXT,
                city TEXT,
                address TEXT,
                phone TEXT,
                email TEXT,
                website TEXT,
                contact_person TEXT,
                notes TEXT,
                vessels_mmsi TEXT,
                source TEXT,
                verified INTEGER DEFAULT 0,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """))
        # Port calls
        cursor.execute(_adapt_schema("""
            CREATE TABLE IF NOT EXISTS port_calls (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                mmsi TEXT NOT NULL,
                port_name TEXT NOT NULL,
                port_code TEXT,
                country TEXT,
                arrival TIMESTAMP,
                departure TIMESTAMP,
                berth TEXT,
                cargo_type TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """))
        # Demurrage records
        cursor.execute(_adapt_schema("""
            CREATE TABLE IF NOT EXISTS demurrage (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                mmsi TEXT NOT NULL,
                port_name TEXT NOT NULL,
                agreed_days REAL NOT NULL,
                actual_days REAL NOT NULL,
                daily_rate REAL,
                currency TEXT DEFAULT 'USD',
                status TEXT DEFAULT 'pending',
                notes TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """))
        # Users table
        cursor.execute(_adapt_schema("""
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                email TEXT NOT NULL UNIQUE,
                password_hash TEXT NOT NULL,
                name TEXT NOT NULL DEFAULT '',
                lang TEXT NOT NULL DEFAULT 'en',
                balance REAL NOT NULL DEFAULT 0.0,
                is_admin INTEGER NOT NULL DEFAULT 0,
                plan TEXT NOT NULL DEFAULT 'free',
                plan_expires_at TIMESTAMP,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                last_login TIMESTAMP
            )
        """))
        # Migration: add new columns to existing users table
        if USE_POSTGRES:
            cursor.execute(
                "SELECT column_name FROM information_schema.columns WHERE table_name = 'users'"
            )
            user_cols = {row[0] for row in cursor.fetchall()}
        else:
            cursor.execute("PRAGMA table_info(users)")
            user_cols = {row[1] for row in cursor.fetchall()}
        for col_name, col_def in [
            ('is_admin', "INTEGER NOT NULL DEFAULT 0"),
            ('plan', "TEXT NOT NULL DEFAULT 'free'"),
            ('plan_expires_at', "TIMESTAMP"),
            ('telegram_chat_id', "INTEGER"),
        ]:
            if col_name not in user_cols:
                try:
                    if USE_POSTGRES:
                        cursor.execute(f"ALTER TABLE users ADD COLUMN IF NOT EXISTS {col_name} {col_def}")
                    else:
                        cursor.execute(f"ALTER TABLE users ADD COLUMN {col_name} {col_def}")
                except Exception:
                    pass
        # Sessions table
        cursor.execute(_adapt_schema("""
            CREATE TABLE IF NOT EXISTS sessions (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id INTEGER NOT NULL,
                token TEXT NOT NULL UNIQUE,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                expires_at TIMESTAMP NOT NULL
            )
        """))
        # Chat history table
        cursor.execute(_adapt_schema("""
            CREATE TABLE IF NOT EXISTS chat_history (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id INTEGER NOT NULL,
                role TEXT NOT NULL,
                message TEXT NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """))
        # Migration: user_profiles needs 'id' column for PG wrapper RETURNING id
        if USE_POSTGRES:
            try:
                cursor.execute("""
                    DO $$ BEGIN
                    IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name='user_profiles')
                    AND NOT EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name='user_profiles' AND column_name='id')
                    THEN
                    ALTER TABLE user_profiles ADD COLUMN id SERIAL;
                    ALTER TABLE user_profiles DROP CONSTRAINT user_profiles_pkey;
                    ALTER TABLE user_profiles ADD PRIMARY KEY (id);
                    ALTER TABLE user_profiles ADD CONSTRAINT user_profiles_user_id_key UNIQUE (user_id);
                    END IF;
                    END $$;
                """)
            except Exception:
                pass
        # User profiles (maritime questionnaire)
        cursor.execute(_adapt_schema("""
            CREATE TABLE IF NOT EXISTS user_profiles (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id INTEGER UNIQUE NOT NULL,
                company_name TEXT NOT NULL DEFAULT '',
                role TEXT NOT NULL DEFAULT '',
                fleet_size INTEGER DEFAULT 0,
                vessel_types TEXT NOT NULL DEFAULT '[]',
                trade_routes TEXT NOT NULL DEFAULT '[]',
                cargo_types TEXT NOT NULL DEFAULT '[]',
                vessels_of_interest TEXT NOT NULL DEFAULT '[]',
                experience_years INTEGER DEFAULT 0,
                phone TEXT NOT NULL DEFAULT '',
                telegram TEXT NOT NULL DEFAULT '',
                notes TEXT NOT NULL DEFAULT '',
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """))
        # Migration: telegram_links needs 'id' column for PG wrapper RETURNING id
        # Table only holds temp codes (30min TTL), safe to recreate
        if USE_POSTGRES:
            try:
                cursor.execute("""
                    DO $$ BEGIN
                    IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name='telegram_links')
                    AND NOT EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name='telegram_links' AND column_name='id')
                    THEN
                    DROP TABLE telegram_links;
                    END IF;
                    END $$;
                """)
            except Exception:
                pass
        # Telegram account linking codes
        cursor.execute(_adapt_schema("""
            CREATE TABLE IF NOT EXISTS telegram_links (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                code TEXT UNIQUE NOT NULL,
                user_id INTEGER NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                expires_at TIMESTAMP NOT NULL
            )
        """))
        # Unique index on telegram_chat_id
        try:
            cursor.execute("CREATE UNIQUE INDEX IF NOT EXISTS idx_users_telegram_chat_id ON users(telegram_chat_id)")
        except Exception:
            pass
        # Wallets table (private keys encrypted via Fernet)
        cursor.execute(_adapt_schema("""
            CREATE TABLE IF NOT EXISTS wallets (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id INTEGER NOT NULL UNIQUE,
                address TEXT NOT NULL UNIQUE,
                private_key_hex TEXT NOT NULL,
                network TEXT NOT NULL DEFAULT 'trc20',
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """))
        # Deposits table
        cursor.execute(_adapt_schema("""
            CREATE TABLE IF NOT EXISTS deposits (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id INTEGER NOT NULL,
                tx_id TEXT NOT NULL UNIQUE,
                amount REAL NOT NULL,
                from_address TEXT NOT NULL DEFAULT '',
                confirmed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """))
        # Withdrawals table
        cursor.execute(_adapt_schema("""
            CREATE TABLE IF NOT EXISTS withdrawals (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id INTEGER NOT NULL,
                amount REAL NOT NULL,
                to_address TEXT NOT NULL,
                status TEXT NOT NULL DEFAULT 'pending',
                tx_id TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                processed_at TIMESTAMP
            )
        """))
        # Service charges — revenue tracking
        cursor.execute(_adapt_schema("""
            CREATE TABLE IF NOT EXISTS service_charges (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id INTEGER NOT NULL,
                service TEXT NOT NULL,
                amount REAL NOT NULL,
                details TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """))
        # Purchased contacts
        cursor.execute(_adapt_schema("""
            CREATE TABLE IF NOT EXISTS purchased_contacts (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id INTEGER NOT NULL,
                contact_data TEXT NOT NULL,
                query TEXT,
                contact_type TEXT,
                amount_paid REAL NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """))
        # Port vessel cache
        cursor.execute(_adapt_schema("""
            CREATE TABLE IF NOT EXISTS port_vessel_cache (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                port_key TEXT NOT NULL,
                vessel_data TEXT NOT NULL,
                vessel_count INTEGER DEFAULT 0,
                cached_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """))
        # Equasis cache (vessel details, search results)
        cursor.execute(_adapt_schema("""
            CREATE TABLE IF NOT EXISTS equasis_cache (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                cache_key TEXT NOT NULL,
                cache_type TEXT NOT NULL,
                data TEXT NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """))
        # Equasis daily query counter
        cursor.execute(_adapt_schema("""
            CREATE TABLE IF NOT EXISTS equasis_daily_counter (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                date_str TEXT NOT NULL UNIQUE,
                count INTEGER NOT NULL DEFAULT 0
            )
        """))
        # Query analytics log
        cursor.execute(_adapt_schema("""
            CREATE TABLE IF NOT EXISTS query_log (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id INTEGER,
                query_text TEXT NOT NULL,
                query_type TEXT,
                tool_used TEXT,
                response_time_ms INTEGER,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """))
        # Indexes
        cursor.execute("CREATE UNIQUE INDEX IF NOT EXISTS idx_vessels_imo_unique ON vessels(imo) WHERE imo IS NOT NULL")
        cursor.execute("CREATE UNIQUE INDEX IF NOT EXISTS idx_vessels_mmsi_unique ON vessels(mmsi) WHERE mmsi IS NOT NULL")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_vessels_name ON vessels(name)")
        cursor.execute("CREATE UNIQUE INDEX IF NOT EXISTS idx_positions_mmsi_unique ON positions(mmsi)")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_positions_timestamp ON positions(timestamp)")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_positions_lat_lon ON positions(latitude, longitude)")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_positions_destination ON positions(destination)")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_contacts_type ON contacts(type)")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_port_calls_mmsi ON port_calls(mmsi)")
        cursor.execute("CREATE UNIQUE INDEX IF NOT EXISTS idx_users_email ON users(email)")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_sessions_token ON sessions(token)")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_sessions_user ON sessions(user_id)")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_chat_history_user ON chat_history(user_id)")
        cursor.execute("CREATE UNIQUE INDEX IF NOT EXISTS idx_wallets_user ON wallets(user_id)")
        cursor.execute("CREATE UNIQUE INDEX IF NOT EXISTS idx_wallets_address ON wallets(address)")
        cursor.execute("CREATE UNIQUE INDEX IF NOT EXISTS idx_deposits_tx ON deposits(tx_id)")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_deposits_user ON deposits(user_id)")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_withdrawals_user ON withdrawals(user_id)")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_service_charges_user ON service_charges(user_id)")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_purchased_contacts_user ON purchased_contacts(user_id)")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_port_vessel_cache_key ON port_vessel_cache(port_key)")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_equasis_cache_key ON equasis_cache(cache_key)")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_query_log_user ON query_log(user_id)")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_query_log_type ON query_log(query_type)")
        # FIX: mt_bulk_staging is not created by init_db (presumably by the
        # bulk-import pipeline elsewhere — TODO confirm). On a fresh database
        # these two statements used to raise "no such table" and abort the
        # entire init. Commit the schema first (so a PG rollback can't discard
        # the uncommitted CREATE TABLEs above), then attempt each defensively.
        conn.commit()
        for staging_idx_sql in (
            "CREATE INDEX IF NOT EXISTS idx_mt_staging_lat_lon ON mt_bulk_staging(lat, lon)",
            "CREATE INDEX IF NOT EXISTS idx_mt_staging_mmsi ON mt_bulk_staging(mmsi)",
        ):
            try:
                cursor.execute(staging_idx_sql)
                conn.commit()
            except Exception:
                try:
                    conn.rollback()
                except Exception:
                    pass
        # v3.21.0: Token/cost tracking columns for Ouroboros-style budget awareness
        for col, coltype in [('input_tokens', 'INTEGER'), ('output_tokens', 'INTEGER'),
                             ('cached_tokens', 'INTEGER'), ('cost_usd', 'REAL'),
                             ('iterations', 'INTEGER')]:
            try:
                cursor.execute(f"ALTER TABLE query_log ADD COLUMN {col} {coltype}")
                conn.commit()
            except Exception:
                try:
                    conn.rollback()
                except Exception:
                    pass
        # Moltbook bot comments log
        cursor.execute(_adapt_schema("""
            CREATE TABLE IF NOT EXISTS moltbook_comments (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                post_id TEXT NOT NULL UNIQUE,
                post_title TEXT,
                comment_body TEXT,
                commented_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """))
        cursor.execute("CREATE UNIQUE INDEX IF NOT EXISTS idx_moltbook_post ON moltbook_comments(post_id)")
        # Agent memory (3-tier: short-term=chat_history, working=user_memory, long-term=permanent+summaries)
        cursor.execute(_adapt_schema("""
            CREATE TABLE IF NOT EXISTS user_memory (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id INTEGER NOT NULL,
                memory_type TEXT NOT NULL,
                content TEXT NOT NULL,
                entity TEXT,
                is_permanent BOOLEAN DEFAULT FALSE,
                relevance_score REAL DEFAULT 1.0,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                last_accessed TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """))
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_user_memory_user ON user_memory(user_id)")
        cursor.execute(_adapt_schema("""
            CREATE TABLE IF NOT EXISTS conversation_summaries (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id INTEGER NOT NULL,
                summary TEXT NOT NULL,
                message_count INTEGER,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """))
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_conv_summaries_user ON conversation_summaries(user_id)")
        # Vessel watches (Telegram bot: /watch command)
        cursor.execute(_adapt_schema("""
            CREATE TABLE IF NOT EXISTS vessel_watches (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                chat_id BIGINT NOT NULL,
                mmsi TEXT NOT NULL,
                vessel_name TEXT,
                watch_type TEXT NOT NULL,
                destination_port TEXT,
                destination_lat REAL,
                destination_lon REAL,
                port_radius_nm REAL DEFAULT 15,
                last_status TEXT,
                last_lat REAL,
                last_lon REAL,
                last_check TIMESTAMP,
                notified INTEGER DEFAULT 0,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """))
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_watches_active ON vessel_watches(notified, chat_id)")
        # Verification codes for email confirmation (v3.32.0)
        cursor.execute(_adapt_schema("""CREATE TABLE IF NOT EXISTS verification_codes (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            email TEXT NOT NULL,
            code TEXT NOT NULL,
            lang TEXT DEFAULT 'en',
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            expires_at TIMESTAMP NOT NULL
        )"""))
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_vc_email ON verification_codes(email)")
        conn.commit()
        # --- Migrations (each in its own transaction for PostgreSQL) ---
        # Add 'source' column to positions (v2.2.0)
        try:
            cursor.execute("ALTER TABLE positions ADD COLUMN source TEXT DEFAULT 'unknown'")
            conn.commit()
        except Exception:
            try:
                conn.rollback()
            except Exception:
                pass
        # Add 'website' column to vessels (v3.19.0 — bulk carrier ownership scraper)
        try:
            cursor.execute("ALTER TABLE vessels ADD COLUMN website TEXT")
            conn.commit()
        except Exception:
            try:
                conn.rollback()
            except Exception:
                pass
        # Add 'home_port' column to user_profiles (v3.29.0)
        try:
            cursor.execute("ALTER TABLE user_profiles ADD COLUMN home_port TEXT NOT NULL DEFAULT ''")
            conn.commit()
        except Exception:
            try:
                conn.rollback()
            except Exception:
                pass
        # Add 'preferred_tonnage' column to user_profiles (v3.39.2)
        # FIX: this migration previously had no commit/rollback — on
        # PostgreSQL a failed ALTER left the transaction aborted, which
        # made the following 'search_radius' migration fail as well.
        try:
            cursor.execute("ALTER TABLE user_profiles ADD COLUMN preferred_tonnage INTEGER DEFAULT 0")
            conn.commit()
        except Exception:
            try:
                conn.rollback()
            except Exception:
                pass
        # Add 'search_radius' column to user_profiles (v3.40.7)
        try:
            cursor.execute("ALTER TABLE user_profiles ADD COLUMN search_radius INTEGER DEFAULT 0")
            conn.commit()
        except Exception:
            try:
                conn.rollback()
            except Exception:
                pass
    finally:
        conn.close()
    db_name = 'PostgreSQL' if USE_POSTGRES else f'SQLite: {DB_PATH}'
    logger.info(f"Database initialized: {db_name}")
    # Seed Caspian intelligence data (idempotent)
    try:
        _seed_caspian_data()
    except Exception as e:
        logger.warning(f"Caspian seed data: {e}")
    # Build fuzzy vessel name index (background-safe)
    try:
        build_vessel_name_index()
    except Exception as e:
        logger.warning(f"Fuzzy index build: {e}")
    return True
def _seed_caspian_data():
    """Seed Caspian Sea shipping companies and known vessels (idempotent).

    Inserts a curated set of Caspian operators into ``contacts`` and the
    known Khazar Shipping fleet into ``vessels``.  Existing rows (matched
    by company_name+source, or by vessel IMO) are left untouched, so this
    is safe to run on every startup.

    Raises:
        Exception: re-raised after a best-effort rollback on any DB error.
    """
    conn = get_connection()
    try:
        cursor = conn.cursor()
        # --- Caspian Shipping Companies ---
        _CASPIAN_COMPANIES = [
            {
                'type': 'operator', 'company_name': 'Azerbaijan Caspian Shipping Company (ASCO)',
                'country': 'Azerbaijan', 'city': 'Baku',
                'address': 'Mikayil Useynov str. 2, AZ1003, Baku',
                'phone': '+994 12 404 37 00', 'email': 'contact@asco.az',
                'website': 'https://www.asco.az',
                'notes': 'State-owned. 256 vessels (56 merchant + 200 offshore). Dominant Caspian operator. Ferries, tankers, dry cargo, Ro-Ro. Routes: Alat-Kuryk/Aktau, Alat-Turkmenbashi.',
                'source': 'caspian_intel', 'verified': 1,
            },
            {
                'type': 'operator', 'company_name': 'KazMorTransFlot (KMTF)',
                'country': 'Kazakhstan', 'city': 'Aktau',
                'address': 'Building 55, 13 Micro District, Aktau',
                'phone': '+7 7292 535-890', 'email': 'info@kmtf.kz',
                'website': 'https://www.kmtf.kz',
                'notes': '100% KazMunayGas subsidiary. 18 vessels: 8 tankers, 3 dry cargo, 3 container, 4 tugs. Routes: Aktau/Kuryk-Baku, Trans-Caspian oil shuttle.',
                'source': 'caspian_intel', 'verified': 1,
            },
            {
                'type': 'operator', 'company_name': 'Caspian Integrated Maritime Solutions (CIMS)',
                'country': 'Kazakhstan', 'city': 'Aktau',
                'website': 'https://www.caspianims.com',
                'notes': 'JV: AD Ports Group (UAE 51%) + KMTF (49%). 2 tankers: Liwa & Taraz (8000 DWT each, Damen). Kashagan/Tengiz crude via BTC pipeline.',
                'source': 'caspian_intel', 'verified': 1,
            },
            {
                'type': 'operator', 'company_name': 'Volga Shipping Company (Volgaflot)',
                'country': 'Russia', 'city': 'Nizhny Novgorod',
                'address': '15A Markin Square, Nizhny Novgorod, 603001',
                'website': 'https://www.volgaflot.com',
                'notes': 'UCL Holding (Vladimir Lisin). 236+ river-sea vessels, DWT >1.4M tons. 15+ mln tons/year. Volga-Caspian routes: grain, metals, fertilizers, timber.',
                'source': 'caspian_intel', 'verified': 1,
            },
            {
                'type': 'operator', 'company_name': 'Khazar Sea Shipping Lines (KSSL)',
                'country': 'Iran', 'city': 'Bandar Anzali',
                'address': 'Shahid Khomeini St., Bandar Anzali, Gilan, Iran',
                'website': 'https://khazarshipping.ir',
                'notes': 'IRISL subsidiary. 16-22 general cargo vessels. Offices: Astrakhan, Aktau, Baku, Turkmenbashi. Under EU/UK sanctions. Routes: Anzali-Astrakhan, Amirabad-Aktau.',
                'source': 'caspian_intel', 'verified': 1,
            },
            {
                'type': 'operator', 'company_name': 'Turkmen Marine Merchant Fleet',
                'country': 'Turkmenistan', 'city': 'Turkmenbashi',
                'website': 'https://tmrl.gov.tm',
                'notes': 'State-owned CJSC. 20 vessels: tankers, dry cargo, ferries, passenger. Routes: Turkmenbashi-Alat, Turkmenbashi-Aktau/Kuryk. 3 new vessels under construction (OPEC Fund + IsDB).',
                'source': 'caspian_intel', 'verified': 1,
            },
            {
                'type': 'operator', 'company_name': 'Silver Shipping Caspian',
                'country': 'Kazakhstan', 'city': 'Aktau',
                'website': 'https://silver-caspian.com',
                'notes': 'Private logistics operator. Barges, tugs, crew boats. Oversized/project cargo and offshore support.',
                'source': 'caspian_intel', 'verified': 1,
            },
            {
                'type': 'operator', 'company_name': 'Volgotanker',
                'country': 'Russia', 'city': 'Samara',
                'notes': 'Oil/product tanker operator. Inland waterway + Caspian coastal tankers. Volga River system to Caspian Sea.',
                'source': 'caspian_intel', 'verified': 1,
            },
        ]
        companies_added = 0
        for company in _CASPIAN_COMPANIES:
            # Idempotency check: skip rows seeded on a previous run (by company_name)
            cursor.execute(
                "SELECT id FROM contacts WHERE company_name = ? AND source = ?",
                (company['company_name'], 'caspian_intel')
            )
            if cursor.fetchone():
                continue  # Already seeded
            # Whitelist columns so only known schema fields reach the SQL string
            cols = {k: v for k, v in company.items() if k in _CONTACT_ALLOWED_COLS}
            columns = ', '.join(cols.keys())
            placeholders = ', '.join(['?' for _ in cols])
            cursor.execute(f"INSERT INTO contacts ({columns}) VALUES ({placeholders})",
                           list(cols.values()))
            companies_added += 1
        # --- Khazar Shipping Fleet (known IMO numbers) ---
        _KHAZAR_FLEET = [
            {'imo': '8203608', 'name': 'SARINA', 'flag': 'Iran', 'type': 'General Cargo', 'owner': 'Khazar Sea Shipping Lines', 'operator': 'Khazar Sea Shipping Lines'},
            {'imo': '8215742', 'name': 'SABRINA', 'flag': 'Iran', 'type': 'General Cargo', 'owner': 'Khazar Sea Shipping Lines', 'operator': 'Khazar Sea Shipping Lines'},
            {'imo': '8605234', 'name': 'DORITA', 'flag': 'Iran', 'type': 'General Cargo', 'owner': 'Khazar Sea Shipping Lines', 'operator': 'Khazar Sea Shipping Lines'},
            {'imo': '8721351', 'name': 'KASMA', 'flag': 'Iran', 'type': 'General Cargo', 'owner': 'Khazar Sea Shipping Lines', 'operator': 'Khazar Sea Shipping Lines'},
            {'imo': '9010711', 'name': 'VISTA', 'flag': 'Iran', 'type': 'General Cargo', 'owner': 'Khazar Sea Shipping Lines', 'operator': 'Khazar Sea Shipping Lines'},
            {'imo': '9010723', 'name': 'VIANA', 'flag': 'Iran', 'type': 'General Cargo', 'owner': 'Khazar Sea Shipping Lines', 'operator': 'Khazar Sea Shipping Lines'},
            {'imo': '9076478', 'name': 'PARIN', 'flag': 'Iran', 'type': 'General Cargo', 'owner': 'Khazar Sea Shipping Lines', 'operator': 'Khazar Sea Shipping Lines'},
            {'imo': '9118551', 'name': 'PARAND', 'flag': 'Iran', 'type': 'General Cargo', 'owner': 'Khazar Sea Shipping Lines', 'operator': 'Khazar Sea Shipping Lines'},
            {'imo': '9137210', 'name': 'PATRIS', 'flag': 'Iran', 'type': 'General Cargo', 'owner': 'Khazar Sea Shipping Lines', 'operator': 'Khazar Sea Shipping Lines'},
            {'imo': '9137246', 'name': 'NARDIS', 'flag': 'Iran', 'type': 'General Cargo', 'owner': 'Khazar Sea Shipping Lines', 'operator': 'Khazar Sea Shipping Lines'},
            {'imo': '9137258', 'name': 'KADOS', 'flag': 'Iran', 'type': 'General Cargo', 'owner': 'Khazar Sea Shipping Lines', 'operator': 'Khazar Sea Shipping Lines'},
            {'imo': '9245304', 'name': 'TARADIS', 'flag': 'Iran', 'type': 'General Cargo', 'owner': 'Khazar Sea Shipping Lines', 'operator': 'Khazar Sea Shipping Lines'},
            {'imo': '9245316', 'name': 'PARMIS', 'flag': 'Iran', 'type': 'General Cargo', 'owner': 'Khazar Sea Shipping Lines', 'operator': 'Khazar Sea Shipping Lines'},
            {'imo': '9367982', 'name': 'GILDA', 'flag': 'Iran', 'type': 'General Cargo', 'owner': 'Khazar Sea Shipping Lines', 'operator': 'Khazar Sea Shipping Lines'},
            {'imo': '9367994', 'name': 'SANIA', 'flag': 'Iran', 'type': 'General Cargo', 'owner': 'Khazar Sea Shipping Lines', 'operator': 'Khazar Sea Shipping Lines'},
            {'imo': '9368003', 'name': 'SARIR', 'flag': 'Iran', 'type': 'General Cargo', 'owner': 'Khazar Sea Shipping Lines', 'operator': 'Khazar Sea Shipping Lines'},
            {'imo': '9368015', 'name': 'SOMIA', 'flag': 'Iran', 'type': 'General Cargo', 'owner': 'Khazar Sea Shipping Lines', 'operator': 'Khazar Sea Shipping Lines'},
        ]
        vessels_added = 0
        for vessel in _KHAZAR_FLEET:
            cursor.execute("SELECT id FROM vessels WHERE imo = ?", (vessel['imo'],))
            if cursor.fetchone():
                continue  # Already exists
            vessel['source'] = 'caspian_intel'
            vessel['created_at'] = datetime.utcnow().isoformat()
            vessel['updated_at'] = datetime.utcnow().isoformat()
            cols = {k: v for k, v in vessel.items() if k in _VESSEL_ALLOWED_COLS}
            columns = ', '.join(cols.keys())
            placeholders = ', '.join(['?' for _ in cols])
            cursor.execute(f"INSERT INTO vessels ({columns}) VALUES ({placeholders})",
                           list(cols.values()))
            vessels_added += 1
        conn.commit()
        # Report actual inserts — the previous message hard-coded "8 + 17 loaded",
        # which was misleading on every run after the first (all rows skipped).
        logger.info(f"Caspian seed data: {companies_added} companies + "
                    f"{vessels_added} Khazar fleet vessels inserted "
                    f"({len(_CASPIAN_COMPANIES)} companies / {len(_KHAZAR_FLEET)} vessels known)")
    except Exception:
        try:
            conn.rollback()
        except Exception:
            pass
        raise
    finally:
        conn.close()
# =============================================================================
# VESSEL OPERATIONS
# =============================================================================
# Column whitelists.  The dict-driven INSERT/UPDATE builders below interpolate
# dict KEYS directly into SQL strings, so only keys listed here may ever reach
# a query (values always go through `?` placeholders).  Add a name here only
# after the matching ALTER TABLE migration exists.
_VESSEL_ALLOWED_COLS = {
    'mmsi', 'imo', 'name', 'type', 'type_code', 'flag', 'flag_code',
    'callsign', 'length', 'width', 'draught', 'gross_tonnage', 'deadweight',
    'year_built', 'status', 'owner', 'owner_country', 'operator',
    'operator_country', 'companies_json', 'engine_type', 'speed_max',
    'created_at', 'updated_at', 'source',
    'website',  # shipowner/operator company website (from MT Pro scraping)
    'beneficial_owner', 'beneficial_owner_country',
    'registered_owner', 'registered_owner_country',
    'commercial_manager', 'commercial_manager_country',
}
# Whitelist for the `contacts` table (used by the Caspian seed data loader).
_CONTACT_ALLOWED_COLS = {
    'type', 'company_name', 'country', 'city', 'address', 'phone', 'email',
    'website', 'contact_person', 'notes', 'vessels_mmsi', 'source', 'verified',
}
def upsert_vessel(data: dict) -> int:
    """Insert or update a vessel row, matching by IMO first, then MMSI.

    Args:
        data: Vessel attributes.  A 'companies' list is serialized into the
            companies_json column; 'source' is dropped (managed elsewhere);
            keys outside _VESSEL_ALLOWED_COLS are silently discarded.
            The caller's dict is NOT modified (it is copied defensively —
            the previous version popped/rewrote keys in place).

    Returns:
        int: Row id of the matched vessel, or the new row id on insert.
            NOTE(review): insert path relies on cursor.lastrowid, which is
            well-defined for SQLite; for PostgreSQL the get_connection()
            cursor wrapper must emulate it — confirm against that adapter.
    """
    # Work on a copy so the caller's dict is left untouched.
    data = dict(data)
    data.pop('source', None)
    data['updated_at'] = datetime.utcnow().isoformat()
    if 'companies' in data:
        data['companies_json'] = json.dumps(data.pop('companies'), ensure_ascii=False)
    # Filter to allowed columns only (prevent SQL injection via dict keys)
    data = {k: v for k, v in data.items() if k in _VESSEL_ALLOWED_COLS}
    conn = get_connection()
    try:
        cursor = conn.cursor()
        # Prefer IMO (stable hull identifier) over MMSI (changes with flag/owner).
        existing = None
        if data.get('imo'):
            cursor.execute("SELECT id FROM vessels WHERE imo = ?", (data['imo'],))
            existing = cursor.fetchone()
        if not existing and data.get('mmsi'):
            cursor.execute("SELECT id FROM vessels WHERE mmsi = ?", (data['mmsi'],))
            existing = cursor.fetchone()
        if existing:
            vessel_id = existing[0]
            updates = ', '.join(f"{k}=?" for k in data)
            cursor.execute(f"UPDATE vessels SET {updates} WHERE id = ?",
                           list(data.values()) + [vessel_id])
        else:
            if not data.get('created_at'):
                data['created_at'] = datetime.utcnow().isoformat()
            columns = ', '.join(data.keys())
            placeholders = ', '.join(['?'] * len(data))
            cursor.execute(f"INSERT INTO vessels ({columns}) VALUES ({placeholders})",
                           list(data.values()))
            vessel_id = cursor.lastrowid
        conn.commit()
        return vessel_id
    finally:
        conn.close()
def get_vessel(mmsi: str = None, imo: str = None, name: str = None) -> dict:
    """Fetch a single vessel by IMO (preferred), MMSI, or name substring.

    Returns a dict with 'companies' decoded from the companies_json column,
    or None when no identifier is given or nothing matches.
    """
    conn = get_connection()
    try:
        cursor = conn.cursor()
        if imo:
            cursor.execute("SELECT * FROM vessels WHERE imo = ?", (imo,))
        elif mmsi:
            cursor.execute("SELECT * FROM vessels WHERE mmsi = ?", (mmsi,))
        elif name:
            cursor.execute("SELECT * FROM vessels WHERE name LIKE ?", (f"%{name}%",))
        else:
            return None
        row = cursor.fetchone()
        if row is None:
            return None
        vessel = dict(row)
        # Expose companies as a decoded list; hide raw JSON and internal source tag
        raw_companies = vessel.pop('companies_json', None)
        if raw_companies:
            vessel['companies'] = json.loads(raw_companies)
        vessel.pop('source', None)
        return vessel
    finally:
        conn.close()
def search_vessels(query: str, limit: int = 20) -> list:
    """Substring search across name, MMSI and IMO; most recently updated first."""
    pattern = f"%{query}%"
    conn = get_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(
            "SELECT * FROM vessels"
            " WHERE name LIKE ? OR mmsi LIKE ? OR imo LIKE ?"
            " ORDER BY updated_at DESC"
            " LIMIT ?",
            (pattern, pattern, pattern, limit))
        return [dict(r) for r in cursor.fetchall()]
    finally:
        conn.close()
# =============================================================================
# FUZZY VESSEL SEARCH (in-memory index + trigram matching)
# =============================================================================
import difflib as _difflib  # aliased: difflib is used only by the fuzzy-search helpers below

# Module-level fuzzy-search state, (re)populated by build_vessel_name_index().
_VESSEL_NAME_INDEX = {}  # {normalized_name: (original_name, mmsi, imo)}
_VESSEL_TRIGRAMS = {}  # {trigram: set(normalized_names)} — inverted index for pre-filtering
_FUZZY_INDEX_BUILT = False  # guards the lazy build inside fuzzy_search_vessel()
def _normalize_vessel_name(name: str) -> str:
"""Normalize vessel name for indexing: uppercase, strip punctuation, collapse spaces."""
if not name:
return ''
n = name.upper().strip()
n = re.sub(r'[^\w\s]', '', n)
n = re.sub(r'\s+', ' ', n).strip()
return n
def _make_trigrams(s: str) -> set:
"""Generate character trigrams from string."""
if len(s) < 3:
return {s} if s else set()
return {s[i:i+3] for i in range(len(s) - 2)}
def build_vessel_name_index():
    """Build in-memory vessel name index for fuzzy search.

    Reads names from the vessels table first, then mt_bulk_staging
    (vessels wins on duplicate normalized names), and swaps the results
    into the module-level maps. ~200ms, ~4MB RAM for 41K vessels.
    """
    global _VESSEL_NAME_INDEX, _VESSEL_TRIGRAMS, _FUZZY_INDEX_BUILT
    name_map = {}
    tri_map = {}

    def _ingest(record, allow_overwrite):
        # Skip blanks and 1-char names; optionally keep first-seen entry
        norm = _normalize_vessel_name(record['name'])
        if len(norm) < 2 or (not allow_overwrite and norm in name_map):
            return
        name_map[norm] = (record['name'], record.get('mmsi', ''), record.get('imo', ''))
        for tri in _make_trigrams(norm):
            tri_map.setdefault(tri, set()).add(norm)

    conn = get_connection()
    try:
        cursor = conn.cursor()
        sources = (
            ('vessels', True, 'vessels table'),
            ('mt_bulk_staging', False, 'mt_bulk_staging'),
        )
        for table, overwrite, err_label in sources:
            try:
                cursor.execute(
                    f"SELECT name, mmsi, imo FROM {table} WHERE name IS NOT NULL AND name != ''")
                for row in cursor.fetchall():
                    _ingest(dict(row), overwrite)
            except Exception as e:
                logger.debug(f"Fuzzy index: {err_label} error: {e}")
    finally:
        conn.close()
    _VESSEL_NAME_INDEX = name_map
    _VESSEL_TRIGRAMS = tri_map
    _FUZZY_INDEX_BUILT = True
    logger.info(f"Fuzzy vessel index built: {len(name_map)} names, {len(tri_map)} trigrams")
def fuzzy_search_vessel(query: str, threshold: float = 0.65, limit: int = 5) -> list:
    """Fuzzy search vessel by name using trigram pre-filter + difflib scoring.

    Args:
        query: Raw vessel name (any case/punctuation; normalized internally).
        threshold: Minimum SequenceMatcher ratio to keep a candidate.
        limit: Maximum number of results returned.

    Returns:
        [(original_name, mmsi, imo, score), ...] sorted by score desc (~10-20ms).
        Empty list for too-short queries or if the index cannot be built.
    """
    if not _FUZZY_INDEX_BUILT:
        try:
            build_vessel_name_index()
        except Exception:
            return []
    norm_q = _normalize_vessel_name(query)
    if not norm_q or len(norm_q) < 2:
        return []
    # Exact match shortcut
    if norm_q in _VESSEL_NAME_INDEX:
        entry = _VESSEL_NAME_INDEX[norm_q]
        return [(entry[0], entry[1], entry[2], 1.0)]
    # Trigram pre-filter: count shared trigrams per indexed name
    q_trigrams = _make_trigrams(norm_q)
    candidate_counts = {}
    for tri in q_trigrams:
        for name in _VESSEL_TRIGRAMS.get(tri, set()):
            candidate_counts[name] = candidate_counts.get(name, 0) + 1
    # Need at least 2 shared trigrams (or 1 for short queries)
    min_shared = 1 if len(norm_q) < 5 else 2
    candidates = [n for n, c in candidate_counts.items() if c >= min_shared]
    # Score candidates with SequenceMatcher.  difflib caches preprocessing of
    # seq2, so the constant query goes in seq2 and each candidate in seq1 —
    # the previous set_seq2()-per-candidate order defeated that cache and
    # recomputed the query's index on every iteration.
    scored = []
    matcher = _difflib.SequenceMatcher(None, '', norm_q)
    for cand in candidates:
        matcher.set_seq1(cand)
        score = matcher.ratio()
        if score >= threshold:
            entry = _VESSEL_NAME_INDEX[cand]
            scored.append((entry[0], entry[1], entry[2], round(score, 3)))
    scored.sort(key=lambda x: x[3], reverse=True)
    return scored[:limit]
# Country name → ISO 2-letter flag code mapping
# Country name → ISO 3166-1 alpha-2 flag code mapping (covers the flags most
# common in this app's AIS traffic, including open-registry flag states).
# Not exhaustive — lookups must tolerate a miss.
FLAG_NAME_TO_CODE = {
    'Russia': 'RU', 'Turkey': 'TR', 'Panama': 'PA', 'Liberia': 'LR',
    'Marshall Islands': 'MH', 'China': 'CN', 'Greece': 'GR', 'Singapore': 'SG',
    'India': 'IN', 'Japan': 'JP', 'South Korea': 'KR', 'Germany': 'DE',
    'Norway': 'NO', 'Italy': 'IT', 'United Kingdom': 'GB', 'United States': 'US',
    'Iran': 'IR', 'Saudi Arabia': 'SA', 'UAE': 'AE', 'Egypt': 'EG',
    'Indonesia': 'ID', 'Philippines': 'PH', 'Vietnam': 'VN', 'Brazil': 'BR',
    'Denmark': 'DK', 'Netherlands': 'NL', 'France': 'FR', 'Spain': 'ES',
    'Malta': 'MT', 'Cyprus': 'CY', 'Azerbaijan': 'AZ', 'Kazakhstan': 'KZ',
    'Turkmenistan': 'TM', 'Georgia': 'GE', 'Ukraine': 'UA', 'Romania': 'RO',
    'Bulgaria': 'BG', 'Croatia': 'HR', 'Poland': 'PL', 'Finland': 'FI',
    'Sweden': 'SE', 'Estonia': 'EE', 'Latvia': 'LV', 'Lithuania': 'LT',
    'Hong Kong': 'HK', 'Bahamas': 'BS', 'Bermuda': 'BM', 'Portugal': 'PT',
    'Cayman Islands': 'KY', 'Isle of Man': 'IM', 'Antigua and Barbuda': 'AG',
    'Saint Vincent': 'VC', 'Comoros': 'KM', 'Tanzania': 'TZ', 'Togo': 'TG',
    'Cameroon': 'CM', 'Sierra Leone': 'SL', 'Moldova': 'MD', 'Palau': 'PW',
    'Belize': 'BZ', 'Bolivia': 'BO', 'Cook Islands': 'CK', 'Tuvalu': 'TV',
}
# Reverse: code → name (for display)
FLAG_CODE_TO_NAME = {v: k for k, v in FLAG_NAME_TO_CODE.items()}
def search_vessels_by_flag(flag_country: str, vessel_type: str = None, limit: int = 20) -> list:
    """Search vessels by flag country name, optionally filtered by vessel type.

    Accepts either a country name (mapped via FLAG_NAME_TO_CODE) or a raw
    two-letter ISO code.  Queries mt_bulk_staging first (richer owner data),
    then tops up from the vessels table, deduplicating by MMSI.
    """
    fc_upper = flag_country.upper().strip()
    if len(fc_upper) == 2 and fc_upper.isalpha():
        # Caller passed an ISO alpha-2 code directly
        code = fc_upper
    else:
        # Case-insensitive name → code lookup; None if unknown
        code = next((c for n, c in FLAG_NAME_TO_CODE.items() if n.upper() == fc_upper), None)
    conn = get_connection()
    try:
        cursor = conn.cursor()
        results = []
        # mt_bulk_staging has more vessels with owner data
        try:
            if code:
                sql = "SELECT * FROM mt_bulk_staging WHERE UPPER(flag) = ?"
                params = [code]
            else:
                sql = "SELECT * FROM mt_bulk_staging WHERE UPPER(flag) LIKE ?"
                params = [f"%{flag_country.upper()}%"]
            if vessel_type:
                sql += " AND UPPER(type_category) = ?"
                params.append(vessel_type.upper())
            sql += " ORDER BY dwt DESC NULLS LAST LIMIT ?"
            params.append(limit)
            cursor.execute(sql, params)
            results = [dict(r) for r in cursor.fetchall()]
        except Exception as e:
            logger.debug(f"Flag search staging error: {e}")
        if len(results) < limit:
            # Top up from the vessels table, skipping MMSIs already found
            try:
                cursor.execute(
                    "SELECT * FROM vessels WHERE UPPER(flag) LIKE ? LIMIT ?",
                    [f"%{flag_country.upper()}%", limit - len(results)])
                seen_mmsi = {r.get('mmsi') for r in results}
                for row in cursor.fetchall():
                    rec = dict(row)
                    if rec.get('mmsi') not in seen_mmsi:
                        results.append(rec)
            except Exception as e:
                logger.debug(f"Flag search vessels error: {e}")
        return results
    finally:
        conn.close()
def search_vessels_by_owner(owner_query: str, limit: int = 20) -> list:
    """Search vessels by owner/operator name (LIKE match across owner fields).
    Best-effort: any DB error is logged at debug level and yields []."""
    pattern = f"%{owner_query.upper()}%"
    conn = get_connection()
    try:
        cursor = conn.cursor()
        try:
            cursor.execute(
                "SELECT * FROM mt_bulk_staging"
                " WHERE UPPER(owner) LIKE ? OR UPPER(registered_owner) LIKE ?"
                " OR UPPER(operator) LIKE ? OR UPPER(commercial_manager) LIKE ?"
                " ORDER BY dwt DESC NULLS LAST LIMIT ?",
                (pattern, pattern, pattern, pattern, limit))
            return [dict(r) for r in cursor.fetchall()]
        except Exception as e:
            logger.debug(f"Owner search error: {e}")
            return []
    finally:
        conn.close()
def search_vessels_by_type(vessel_type: str, region_bbox: tuple = None, limit: int = 20) -> list:
    """Search vessels by type category, optionally restricted to a bounding box
    (lat_min, lat_max, lon_min, lon_max).  Best-effort: DB errors yield []."""
    clauses = ["UPPER(type_category) = ?"]
    params = [vessel_type.upper()]
    if region_bbox:
        lat_min, lat_max, lon_min, lon_max = region_bbox
        clauses.append("lat BETWEEN ? AND ? AND lon BETWEEN ? AND ?")
        params += [lat_min, lat_max, lon_min, lon_max]
    params.append(limit)
    conn = get_connection()
    try:
        cursor = conn.cursor()
        try:
            cursor.execute(
                "SELECT * FROM mt_bulk_staging WHERE " + " AND ".join(clauses) +
                " ORDER BY dwt DESC NULLS LAST LIMIT ?",
                params)
            return [dict(r) for r in cursor.fetchall()]
        except Exception as e:
            logger.debug(f"Type search error: {e}")
            return []
    finally:
        conn.close()
# =============================================================================
# POSITION OPERATIONS
# =============================================================================
def _normalize_timestamp(ts) -> str:
"""Normalize timestamp to 'YYYY-MM-DD HH:MM:SS.ffffff' for consistent DB sorting.
Handles: ISO with T, AISStream '+0000 UTC' suffix, unix ms (Digitraffic)."""
if ts is None:
return datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')
if isinstance(ts, (int, float)):
# Unix timestamp (seconds or milliseconds)
if ts > 1e12:
ts = ts / 1000.0 # milliseconds → seconds
return datetime.utcfromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S.%f')
s = str(ts).strip()
# Remove AISStream timezone suffix like ' +0000 UTC'
for suffix in (' +0000 UTC', ' UTC', '+00:00', 'Z'):
if s.endswith(suffix):
s = s[:-len(suffix)]
# Replace T separator with space for consistent SQLite sorting
s = s.replace('T', ' ')
return s
def add_position(mmsi: str, lat: float, lon: float, speed: float = None,
                 course: float = None, heading: float = None, status: str = None,
                 destination: str = None, eta: str = None, timestamp: str = None,
                 source: str = None):
    """Upsert vessel position — keeps only the latest position per MMSI."""
    ts_norm = _normalize_timestamp(timestamp)
    origin = source or 'unknown'
    row = (mmsi, lat, lon, speed, course, heading, status, destination, eta, ts_norm, origin)
    conn = get_connection()
    try:
        cursor = conn.cursor()
        if USE_POSTGRES:
            # Native upsert; COALESCE preserves the previous voyage fields
            # when the incoming AIS message has no status/destination/eta.
            cursor.execute("""
                INSERT INTO positions (mmsi, latitude, longitude, speed, course,
                    heading, status, destination, eta, timestamp, received_at, source)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP, ?)
                ON CONFLICT (mmsi) DO UPDATE SET
                    latitude = EXCLUDED.latitude,
                    longitude = EXCLUDED.longitude,
                    speed = EXCLUDED.speed,
                    course = EXCLUDED.course,
                    heading = EXCLUDED.heading,
                    status = COALESCE(EXCLUDED.status, positions.status),
                    destination = COALESCE(EXCLUDED.destination, positions.destination),
                    eta = COALESCE(EXCLUDED.eta, positions.eta),
                    timestamp = EXCLUDED.timestamp,
                    received_at = CURRENT_TIMESTAMP,
                    source = EXCLUDED.source
            """, row)
        else:
            # SQLite path: REPLACE semantics drop the old row entirely
            cursor.execute("""
                INSERT OR REPLACE INTO positions (mmsi, latitude, longitude, speed, course,
                    heading, status, destination, eta, timestamp, source)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, row)
        conn.commit()
    finally:
        conn.close()
def get_last_position(mmsi: str) -> dict:
    """Return the most recent position row for *mmsi*, or None."""
    conn = get_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(
            "SELECT * FROM positions WHERE mmsi = ?"
            " ORDER BY timestamp DESC LIMIT 1",
            (mmsi,))
        row = cursor.fetchone()
        return dict(row) if row else None
    finally:
        conn.close()
def update_position_destination(mmsi: str, destination: str, eta: str = None):
    """Update destination (and eta, when provided) on the newest position
    row for *mmsi*; rows for other timestamps are left untouched."""
    if eta:
        set_clause = "destination = ?, eta = ?"
        params = [destination, eta, mmsi, mmsi]
    else:
        set_clause = "destination = ?"
        params = [destination, mmsi, mmsi]
    # Subquery pins the update to the single latest row for this MMSI
    sql = (
        f"UPDATE positions SET {set_clause}"
        " WHERE mmsi = ? AND id = ("
        "SELECT id FROM positions WHERE mmsi = ? ORDER BY timestamp DESC LIMIT 1)"
    )
    conn = get_connection()
    try:
        conn.cursor().execute(sql, params)
        conn.commit()
    finally:
        conn.close()
def get_position_history(mmsi: str, hours: int = 24) -> list:
    """Return positions for *mmsi* newer than *hours* ago, newest first."""
    # Space-separated format keeps string comparison consistent in SQLite
    cutoff = (datetime.utcnow() - timedelta(hours=hours)).strftime('%Y-%m-%d %H:%M:%S')
    conn = get_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(
            "SELECT * FROM positions WHERE mmsi = ? AND timestamp > ?"
            " ORDER BY timestamp DESC",
            (mmsi, cutoff))
        return [dict(r) for r in cursor.fetchall()]
    finally:
        conn.close()
def get_positions_in_area(lat_min: float, lat_max: float,
                          lon_min: float, lon_max: float,
                          max_age_minutes: int = 30) -> list:
    """Get most recent positions within a geographic bounding box.

    Joins each position with its vessel record and deduplicates by MMSI,
    keeping the newest row per vessel (rows come back timestamp-descending).
    """
    cutoff = (datetime.utcnow() - timedelta(minutes=max_age_minutes)).strftime('%Y-%m-%d %H:%M:%S')
    # Optionally exclude non-freight AIS types (type_code below threshold);
    # FREIGHT_MIN_TYPE_CODE is a config int, safe to inline in the SQL.
    type_filter = (
        f"AND (v.type_code IS NULL OR v.type_code >= {config.FREIGHT_MIN_TYPE_CODE})"
        if config.FREIGHT_VESSELS_ONLY else ""
    )
    conn = get_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(f"""
            SELECT p.mmsi, p.latitude, p.longitude, p.speed, p.course,
                   p.heading, p.status, p.destination, p.eta, p.timestamp, p.source,
                   v.name, v.type, v.flag, v.deadweight AS dwt, v.imo,
                   v.type_code, v.callsign,
                   v.owner, v.owner_country, v.operator, v.operator_country,
                   v.website, v.year_built, v.companies_json
            FROM positions p
            LEFT JOIN vessels v ON p.mmsi = v.mmsi
            WHERE p.latitude BETWEEN ? AND ?
              AND p.longitude BETWEEN ? AND ?
              AND p.timestamp > ?
            {type_filter}
            ORDER BY p.timestamp DESC
        """, (lat_min, lat_max, lon_min, lon_max, cutoff))
        seen_mmsi = set()
        latest = []
        for row in cursor.fetchall():
            rec = dict(row)
            if rec['mmsi'] in seen_mmsi:
                continue  # an older duplicate of a vessel already collected
            seen_mmsi.add(rec['mmsi'])
            latest.append(rec)
        return latest
    finally:
        conn.close()
def _normalize_mt_staging_row(row) -> dict:
"""Normalize a mt_bulk_staging DB row to standard vessel dict format."""
r = dict(row) if not isinstance(row, dict) else row
v = {
'mmsi': r.get('mmsi'),
'imo': r.get('imo'),
'name': r.get('name', ''),
'flag': r.get('flag', ''),
'dwt': r.get('dwt'),
'latitude': r.get('lat'),
'longitude': r.get('lon'),
'speed': r.get('speed'),
'course': r.get('course'),
'destination': r.get('destination', ''),
'type_category': r.get('type_category', ''),
'type': r.get('gt_shiptype', ''),
'owner': r.get('owner'),
'operator': r.get('operator'),
'registered_owner': r.get('registered_owner'),
'year_built': r.get('year_built'),
'commercial_manager': r.get('commercial_manager'),
'owner_website': r.get('owner_website'),
'owner_country': r.get('owner_country'),
'operator_country': r.get('operator_country'),
'registered_owner_country': r.get('registered_owner_country'),
'source': 'mt_staging',
'_ship_id': r.get('ship_id'),
}
age_h = r.get('position_age_hours')
if age_h is not None:
v['position_age_hours'] = round(age_h, 1)
if age_h < 1:
v['position_freshness'] = 'live'
elif age_h < 6:
v['position_freshness'] = 'recent'
elif age_h < 24:
v['position_freshness'] = 'today'
else:
v['position_freshness'] = 'stale'
return v
# Shared SELECT prefix for mt_bulk_staging list queries; callers append their
# own WHERE / ORDER BY / LIMIT clauses.
# NOTE(review): EXTRACT(EPOCH FROM ...) and NOW() are PostgreSQL syntax; the
# call sites wrap execution in try/except and return [] on failure, which
# presumably covers the SQLite dev backend — confirm against get_connection().
_MT_STAGING_SELECT = """
SELECT ship_id, name, flag, dwt, gt_shiptype, type_category,
lat, lon, speed, course, destination, mmsi, imo, owner, operator,
scraped_at, registered_owner, year_built,
commercial_manager, owner_website, owner_country,
operator_country, registered_owner_country,
EXTRACT(EPOCH FROM (NOW() - scraped_at)) / 3600 AS position_age_hours
FROM mt_bulk_staging
"""
# Full-detail SELECT for a single staging row; previously this 36-column list
# was copy-pasted into three nearly-identical queries (imo/mmsi/name lookups).
_STAGING_DETAIL_SELECT = """SELECT ship_id, name, flag, dwt, gt_shiptype, type_category,
lat, lon, speed, course, destination, mmsi, imo, draught, loa, beam, year_built,
owner, owner_country, owner_address, owner_website, owner_phone, owner_email,
registered_owner, registered_owner_country, registered_owner_address,
registered_owner_website, registered_owner_phone, registered_owner_email,
commercial_manager, commercial_manager_country, commercial_manager_address, commercial_manager_website,
operator, operator_country, operator_address, operator_website
FROM mt_bulk_staging WHERE {where} LIMIT 1"""


def _staging_company(r: dict, prefix: str, role: str, fields: tuple) -> dict:
    """Build one companies[] entry from '<prefix>' / '<prefix>_<field>' columns.
    Only truthy field values are included; the company name is r[prefix]."""
    entry = {'role': role, 'name': r[prefix]}
    for field in fields:
        value = r.get(f"{prefix}_{field}")
        if value:
            entry[field] = value
    return entry


def get_vessel_from_staging(mmsi: str = None, imo: str = None, name: str = None) -> dict:
    """Get vessel from mt_bulk_staging with owner data formatted as a companies array.

    Lookup priority: imo > mmsi > exact (case-insensitive) name.

    Returns:
        dict with static vessel data, last scraped position, and a
        'companies' list (beneficial_owner / registered_owner /
        commercial_manager / operator roles); 'owner_status' is set instead
        when the scrape hit the MT Pro paywall.  None when nothing matches
        or no identifier was supplied.
    """
    if imo:
        where, param = "imo = ?", imo
    elif mmsi:
        where, param = "mmsi = ?", mmsi
    elif name:
        where, param = "UPPER(name) = UPPER(?)", name
    else:
        return None
    conn = get_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(_STAGING_DETAIL_SELECT.format(where=where), (param,))
        row = cursor.fetchone()
        if not row:
            return None
        r = dict(row)
        vessel = {
            'name': r.get('name', ''),
            'mmsi': r.get('mmsi'),
            'imo': r.get('imo'),
            'flag': r.get('flag', ''),
            'type': r.get('gt_shiptype', ''),
            'type_category': r.get('type_category', ''),
            'deadweight': r.get('dwt'),
            'length': r.get('loa'),
            'width': r.get('beam'),
            'draught': r.get('draught'),
            'year_built': r.get('year_built'),
            'latitude': r.get('lat'),
            'longitude': r.get('lon'),
            'speed': r.get('speed'),
            'course': r.get('course'),
            'destination': r.get('destination', ''),
        }
        # Build companies array from owner fields
        companies = []
        _pro = 'MT Pro required'  # placeholder the scraper stores when paywalled
        if r.get('owner') and r['owner'] != _pro:
            companies.append(_staging_company(
                r, 'owner', 'beneficial_owner',
                ('country', 'address', 'website', 'phone', 'email')))
        if r.get('registered_owner'):
            companies.append(_staging_company(
                r, 'registered_owner', 'registered_owner',
                ('country', 'address', 'website', 'phone', 'email')))
        if r.get('commercial_manager'):
            companies.append(_staging_company(
                r, 'commercial_manager', 'commercial_manager',
                ('country', 'address', 'website')))
        if r.get('operator'):
            companies.append(_staging_company(
                r, 'operator', 'operator',
                ('country', 'address', 'website')))
        if companies:
            vessel['companies'] = companies
        elif r.get('owner') == _pro:
            vessel['owner_status'] = 'MT Pro subscription required for owner data'
        return vessel
    finally:
        conn.close()
def get_mt_bulk_staging_near_port(lat: float, lon: float, radius_nm: float,
                                  vessel_type: str = None) -> list:
    """Query mt_bulk_staging table for vessels near a port.

    Builds a bounding box around (lat, lon) sized radius_nm nautical miles;
    the longitude span is widened by 1/cos(lat) (clamped near the poles) to
    keep the box roughly square.  Returns vessel dicts compatible with
    search_vessels_near_port output; any DB error yields [] (best-effort).
    """
    import math
    half_lat = radius_nm / 60.0  # 1 degree of latitude ≈ 60 nm
    half_lon = radius_nm / (60.0 * max(math.cos(math.radians(lat)), 0.01))
    bbox_params = [lat - half_lat, lat + half_lat, lon - half_lon, lon + half_lon]
    # Type filters keyed by category; unknown/None → no extra clause
    type_clause = {
        'bulk': "AND (type_category = 'bulk' OR gt_shiptype = '6')",
        'tanker': "AND type_category = 'tanker'",
        'container': "AND (type_category = 'container' OR gt_shiptype = '11')",
        'roro': "AND (type_category = 'roro' OR gt_shiptype = '12')",
        'general': "AND (type_category = 'general' OR gt_shiptype IN ('9','70','139'))",
    }.get(vessel_type, '')
    conn = get_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(f"""
            {_MT_STAGING_SELECT}
            WHERE lat BETWEEN ? AND ?
              AND lon BETWEEN ? AND ?
              AND lat IS NOT NULL AND lon IS NOT NULL
            {type_clause}
            ORDER BY scraped_at DESC NULLS LAST, dwt DESC NULLS LAST
            LIMIT 100
        """, bbox_params)
        return [_normalize_mt_staging_row(row) for row in cursor.fetchall()]
    except Exception:
        return []
    finally:
        conn.close()
def get_mt_bulk_staging_in_area(lat_min: float, lat_max: float,
                                lon_min: float, lon_max: float,
                                limit: int = 500) -> list:
    """Return mt_bulk_staging vessels inside a bounding box for map display.

    Rows are normalized (latitude/longitude keys) and ordered newest-scraped
    first; any DB error yields [] (best-effort).
    """
    conn = get_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(
            _MT_STAGING_SELECT +
            " WHERE lat BETWEEN ? AND ?"
            " AND lon BETWEEN ? AND ?"
            " AND lat IS NOT NULL AND lon IS NOT NULL"
            " AND mmsi IS NOT NULL"
            " ORDER BY scraped_at DESC NULLS LAST"
            " LIMIT ?",
            [lat_min, lat_max, lon_min, lon_max, limit])
        return [_normalize_mt_staging_row(row) for row in cursor.fetchall()]
    except Exception:
        return []
    finally:
        conn.close()
def get_all_vessel_positions_minimal() -> list:
    """Get all vessel positions for map clustering (minimal fields).

    Unions fresh AIS positions (< 60 min old) with MT bulk staging rows,
    deduped by MMSI with AIS rows winning (PostgreSQL DISTINCT ON with
    priority 1 for AIS, 2 for staging).

    Returns:
        list of dicts: mmsi, lat, lon, type_category, heading, speed,
        course, name. Empty list on any database error.
    """
    conn = get_connection()
    try:
        cursor = conn.cursor()
        # Fresh AIS positions (< 60 min) + MT bulk staging, deduped by MMSI (PostgreSQL DISTINCT ON)
        cursor.execute("""
            SELECT DISTINCT ON (mmsi)
                   mmsi, lat, lon, type_category, heading, speed, course, name
            FROM (
                SELECT p.mmsi, p.latitude AS lat, p.longitude AS lon,
                       COALESCE(v.type, '') AS type_category,
                       p.heading, p.speed, p.course,
                       COALESCE(v.name, '') AS name,
                       1 AS priority,
                       p.timestamp AS ts
                FROM positions p
                LEFT JOIN vessels v ON p.mmsi = v.mmsi
                WHERE p.timestamp > NOW() - INTERVAL '60 minutes'
                  AND p.latitude IS NOT NULL AND p.longitude IS NOT NULL
                  AND p.mmsi IS NOT NULL
                UNION ALL
                SELECT mmsi, lat, lon,
                       COALESCE(type_category, 'other') AS type_category,
                       NULL AS heading, speed, course,
                       COALESCE(name, '') AS name,
                       2 AS priority,
                       scraped_at AS ts
                FROM mt_bulk_staging
                WHERE lat IS NOT NULL AND lon IS NOT NULL AND mmsi IS NOT NULL
            ) combined
            ORDER BY mmsi, priority ASC, ts DESC
        """)
        rows = cursor.fetchall()
        results = []
        for row in rows:
            r = dict(row)
            mmsi = str(r.get('mmsi', ''))
            if not mmsi:
                continue
            results.append({
                'mmsi': mmsi,
                # FIX: compare against None, not truthiness -- a valid 0.0
                # coordinate (equator / prime meridian) was collapsed to None.
                'lat': float(r['lat']) if r['lat'] is not None else None,
                'lon': float(r['lon']) if r['lon'] is not None else None,
                'type_category': r.get('type_category') or 'other',
                'heading': r.get('heading'),
                'speed': r.get('speed'),
                'course': r.get('course'),
                'name': r.get('name') or '',
            })
        return results
    except Exception as e:
        logger.error(f"get_all_vessel_positions_minimal error: {e}")
        return []
    finally:
        conn.close()
def update_mt_bulk_staging_position(mmsi: str, lat: float, lon: float,
                                    speed: float = None, course: float = None):
    """Push a fresh AIS fix into mt_bulk_staging for an already-known vessel.

    No-op when mmsi/lat/lon are missing or the MMSI is absent from the
    table; speed/course are only overwritten when provided. Errors are
    swallowed (best-effort cross-update).
    """
    if not mmsi or lat is None or lon is None:
        return
    conn = get_connection()
    try:
        conn.cursor().execute("""
            UPDATE mt_bulk_staging
            SET lat = ?, lon = ?, speed = COALESCE(?, speed),
                 course = COALESCE(?, course), scraped_at = NOW()
            WHERE mmsi = ?
        """, [lat, lon, speed, course, mmsi])
        conn.commit()
    except Exception:
        pass
    finally:
        conn.close()
def insert_mt_discovery(vessel: dict, source: str = 'ais'):
    """Record a newly discovered cargo/tanker vessel in mt_bulk_staging.

    ON CONFLICT DO NOTHING keeps any existing row untouched. The vessel
    dict may carry either lat/latitude and lon/longitude key variants.
    Silently skips vessels without an MMSI; errors are swallowed.
    """
    mmsi = vessel.get('mmsi')
    if not mmsi:
        return
    values = [
        f"ais_{mmsi}",                      # ship_id (synthetic, AIS-sourced)
        vessel.get('name', ''),
        vessel.get('flag', ''),
        vessel.get('dwt'),
        vessel.get('type', ''),             # gt_shiptype
        vessel.get('type_category', ''),
        vessel.get('lat') or vessel.get('latitude'),
        vessel.get('lon') or vessel.get('longitude'),
        vessel.get('speed'),
        vessel.get('course'),
        mmsi,
        vessel.get('imo'),
    ]
    conn = get_connection()
    try:
        conn.cursor().execute("""
            INSERT INTO mt_bulk_staging
            (ship_id, name, flag, dwt, gt_shiptype, type_category,
             lat, lon, speed, course, mmsi, imo, scraped_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NOW())
            ON CONFLICT (ship_id) DO NOTHING
        """, values)
        conn.commit()
    except Exception:
        pass
    finally:
        conn.close()
def get_vessels_by_destination(destination_patterns: list, max_age_minutes: int = 360) -> list:
    """Find vessels whose reported AIS destination matches any pattern.

    Args:
        destination_patterns: uppercase substrings, e.g. ["BOSTON", "USBOS", "US BOS"]
        max_age_minutes: maximum position age (default 6 hours)

    Returns:
        list of vessel dicts (position + static info), newest row per MMSI
    """
    if not destination_patterns:
        return []
    conn = get_connection()
    try:
        cursor = conn.cursor()
        since = (datetime.utcnow() - timedelta(minutes=max_age_minutes)).strftime('%Y-%m-%d %H:%M:%S')
        # One LIKE clause per pattern, OR-ed together; params line up 1:1.
        where_dest = " OR ".join("UPPER(p.destination) LIKE ?" for _ in destination_patterns)
        params = [f"%{pat.upper()}%" for pat in destination_patterns]
        params.append(since)
        type_filter = ""
        if config.FREIGHT_VESSELS_ONLY:
            type_filter = f"AND (v.type_code IS NULL OR v.type_code >= {config.FREIGHT_MIN_TYPE_CODE})"
        cursor.execute(f"""
            SELECT p.mmsi, p.latitude, p.longitude, p.speed, p.course,
                   p.heading, p.status, p.destination, p.eta, p.timestamp, p.source,
                   v.name, v.type, v.flag, v.deadweight AS dwt, v.imo,
                   v.type_code, v.callsign,
                   v.owner, v.owner_country, v.operator, v.operator_country,
                   v.website, v.year_built, v.companies_json
            FROM positions p
            LEFT JOIN vessels v ON p.mmsi = v.mmsi
            WHERE ({where_dest})
              AND p.destination IS NOT NULL AND p.destination != ''
              AND p.timestamp > ?
              {type_filter}
            ORDER BY p.timestamp DESC
        """, params)
        # Rows come newest-first; keep only the first (latest) row per MMSI.
        seen = set()
        results = []
        for row in cursor.fetchall():
            r = dict(row)
            if r['mmsi'] not in seen:
                seen.add(r['mmsi'])
                results.append(r)
        return results
    finally:
        conn.close()
def cleanup_old_positions(keep_hours: int = 48) -> int:
    """Purge position rows older than keep_hours; return the deleted count."""
    cutoff = (datetime.utcnow() - timedelta(hours=keep_hours)).strftime('%Y-%m-%d %H:%M:%S')
    conn = get_connection()
    try:
        cur = conn.cursor()
        cur.execute("DELETE FROM positions WHERE timestamp < ?", (cutoff,))
        removed = cur.rowcount
        conn.commit()
        return removed
    finally:
        conn.close()
# =============================================================================
# CONTACT OPERATIONS
# =============================================================================
def add_contact(contact_type: str, company_name: str, **kwargs) -> int:
    """Insert a contact row; extra kwargs are limited to whitelisted columns.

    Returns the new contact id.
    """
    fields = {'type': contact_type, 'company_name': company_name, **kwargs}
    # Whitelist columns so arbitrary dict keys can never reach the SQL text.
    fields = {k: v for k, v in fields.items() if k in _CONTACT_ALLOWED_COLS}
    conn = get_connection()
    try:
        cur = conn.cursor()
        col_sql = ', '.join(fields)
        ph_sql = ', '.join('?' * len(fields))
        cur.execute(f"INSERT INTO contacts ({col_sql}) VALUES ({ph_sql})",
                    list(fields.values()))
        conn.commit()
        return cur.lastrowid
    finally:
        conn.close()
def search_contacts(query: str = None, contact_type: str = None, limit: int = 20) -> list:
    """Search stored contacts by free-text query and/or contact type.

    The query matches company_name or contact_person (substring); results
    are newest-updated first, capped at `limit`.
    """
    conn = get_connection()
    try:
        cur = conn.cursor()
        like = f"%{query}%"
        if query and contact_type:
            cur.execute("""
                SELECT * FROM contacts
                WHERE type = ? AND (company_name LIKE ? OR contact_person LIKE ?)
                ORDER BY updated_at DESC LIMIT ?
            """, (contact_type, like, like, limit))
        elif contact_type:
            cur.execute("""
                SELECT * FROM contacts WHERE type = ?
                ORDER BY updated_at DESC LIMIT ?
            """, (contact_type, limit))
        elif query:
            cur.execute("""
                SELECT * FROM contacts
                WHERE company_name LIKE ? OR contact_person LIKE ?
                ORDER BY updated_at DESC LIMIT ?
            """, (like, like, limit))
        else:
            cur.execute("SELECT * FROM contacts ORDER BY updated_at DESC LIMIT ?", (limit,))
        return [dict(r) for r in cur.fetchall()]
    finally:
        conn.close()
def _mask_email(email):
"""Mask email: john.doe@maersk.com -> j***@maersk.com"""
if not email:
return ''
parts = email.split('@')
if len(parts) != 2:
return '***@***.com'
local = parts[0]
masked_local = local[0] + '***' if local else '***'
return f"{masked_local}@{parts[1]}"
def _mask_phone(phone):
"""Mask phone: +1-555-123-4567 -> +1-***-**67"""
if not phone:
return ''
if len(phone) > 6:
return phone[:3] + '***' + phone[-2:]
return '***'
def search_contacts_in_staging(query: str, limit: int = 10, mask: bool = True) -> list:
    """Search mt_bulk_staging for vessel/owner contacts.

    Matches vessel name, owner/operator/registered-owner name (substring)
    or exact MMSI/IMO. mask=True masks emails/phones (free preview);
    mask=False returns full data (paid unlock).

    Returns:
        list of contact dicts (company_name, vessel, role, email, phone,
        website, country, address, has_email, has_phone), deduplicated by
        (company_name, role) and capped at `limit`.
    """
    def _contact(company: str, vessel_info: str, role: str, email: str = '',
                 phone: str = '', website: str = '', country: str = '',
                 address: str = '') -> dict:
        # Build one output row; masking applies only to email/phone.
        # (Refactor: this replaces four near-identical inline dict literals.)
        return {
            'company_name': company,
            'vessel': vessel_info,
            'role': role,
            'email': _mask_email(email) if mask else email,
            'phone': _mask_phone(phone) if mask else phone,
            'website': website,
            'country': country,
            'address': address,
            'has_email': bool(email),
            'has_phone': bool(phone),
        }

    conn = get_connection()
    try:
        cursor = conn.cursor()
        q = f"%{query}%"
        q_exact = query.strip()
        # Search by name, owner, operator, registered_owner, MMSI, IMO
        cursor.execute("""
            SELECT name, imo, mmsi, flag, dwt, type_category,
                   owner, owner_email, owner_phone, owner_website,
                   owner_address, owner_country,
                   operator, operator_website, operator_country,
                   registered_owner, registered_owner_email, registered_owner_website,
                   registered_owner_country,
                   commercial_manager, commercial_manager_website, commercial_manager_country
            FROM mt_bulk_staging
            WHERE (name ILIKE ? OR owner ILIKE ? OR operator ILIKE ?
                   OR registered_owner ILIKE ?
                   OR CAST(mmsi AS TEXT) = ? OR CAST(imo AS TEXT) = ?)
              AND (owner IS NOT NULL OR operator IS NOT NULL)
            LIMIT ?
        """, (q, q, q, q, q_exact, q_exact, limit))
        results = []
        for row in cursor.fetchall():
            r = dict(row)
            vessel_info = f"{r.get('name', '')} (IMO: {r.get('imo', 'N/A')}, MMSI: {r.get('mmsi', 'N/A')})"
            owner = r.get('owner')
            if owner:
                results.append(_contact(
                    owner, vessel_info, 'Beneficial Owner',
                    email=r.get('owner_email') or '',
                    phone=r.get('owner_phone') or '',
                    website=r.get('owner_website') or '',
                    country=r.get('owner_country') or '',
                    # Street address is part of the paid unlock only.
                    address='' if mask else (r.get('owner_address') or ''),
                ))
            reg = r.get('registered_owner')
            if reg and reg != owner:
                results.append(_contact(
                    reg, vessel_info, 'Registered Owner',
                    email=r.get('registered_owner_email') or '',
                    website=r.get('registered_owner_website') or '',
                    country=r.get('registered_owner_country') or '',
                ))
            op = r.get('operator')
            if op and op != owner:
                results.append(_contact(
                    op, vessel_info, 'Operator',
                    website=r.get('operator_website') or '',
                    country=r.get('operator_country') or '',
                ))
            cm = r.get('commercial_manager')
            if cm and cm != owner:
                results.append(_contact(
                    cm, vessel_info, 'Commercial Manager',
                    website=r.get('commercial_manager_website') or '',
                    country=r.get('commercial_manager_country') or '',
                ))
        # Deduplicate by (company_name, role), keeping first occurrence.
        seen = set()
        unique = []
        for c in results:
            key = (c['company_name'], c['role'])
            if key not in seen:
                seen.add(key)
                unique.append(c)
        return unique[:limit]
    finally:
        conn.close()
def get_contacts_for_vessel(mmsi: str) -> list:
    """Return every contact whose vessels_mmsi list references this MMSI."""
    conn = get_connection()
    try:
        cur = conn.cursor()
        # Match the quoted MMSI inside the JSON-encoded vessels_mmsi column.
        cur.execute("SELECT * FROM contacts WHERE vessels_mmsi LIKE ?", (f'%"{mmsi}"%',))
        return [dict(r) for r in cur.fetchall()]
    finally:
        conn.close()
# =============================================================================
# USER OPERATIONS
# =============================================================================
def create_user(email: str, password_hash: str, name: str = '', lang: str = 'en') -> int:
    """Create a user row (email lower-cased/trimmed); return the new id."""
    conn = get_connection()
    try:
        cur = conn.cursor()
        cur.execute(
            "INSERT INTO users (email, password_hash, name, lang) VALUES (?, ?, ?, ?)",
            (email.lower().strip(), password_hash, name.strip(), lang)
        )
        conn.commit()
        return cur.lastrowid
    finally:
        conn.close()
def get_user_by_telegram_chat_id(chat_id: int):
    """Return the user bound to this Telegram chat_id, or None."""
    conn = get_connection()
    try:
        cur = conn.cursor()
        cur.execute("SELECT * FROM users WHERE telegram_chat_id = ?", (chat_id,))
        row = cur.fetchone()
    finally:
        conn.close()
    return dict(row) if row else None
def create_telegram_link(user_id: int) -> str:
    """Create a one-time Telegram deep-link code for this user.

    Any previous codes for the user and all expired codes are purged
    first. Returns the freshly generated code.
    """
    import secrets
    code = secrets.token_urlsafe(16)
    expires = (datetime.utcnow()
               + timedelta(minutes=config.TELEGRAM_LINK_TTL_MINUTES)).isoformat()
    conn = get_connection()
    try:
        cur = conn.cursor()
        # Drop this user's earlier codes plus anything already expired.
        cur.execute("DELETE FROM telegram_links WHERE user_id = ?", (user_id,))
        cur.execute("DELETE FROM telegram_links WHERE expires_at < ?",
                    (datetime.utcnow().isoformat(),))
        cur.execute(
            "INSERT INTO telegram_links (code, user_id, expires_at) VALUES (?, ?, ?)",
            (code, user_id, expires))
        conn.commit()
    finally:
        conn.close()
    return code
def verify_telegram_link(code: str, chat_id: int):
    """Validate a link code, bind chat_id to its user and consume the code.

    Returns the user dict on success, None for unknown/expired codes.
    """
    conn = get_connection()
    try:
        cur = conn.cursor()
        cur.execute(
            "SELECT user_id FROM telegram_links WHERE code = ? AND expires_at > ?",
            (code, datetime.utcnow().isoformat()))
        row = cur.fetchone()
        if not row:
            return None
        # Row may be a tuple (SQLite) or a mapping (PG dict cursor).
        user_id = row[0] if isinstance(row, (list, tuple)) else row['user_id']
        # Attach the chat to the account, then burn the one-time code.
        cur.execute("UPDATE users SET telegram_chat_id = ? WHERE id = ?",
                    (chat_id, user_id))
        cur.execute("DELETE FROM telegram_links WHERE code = ?", (code,))
        conn.commit()
    finally:
        conn.close()
    return get_user_by_id(user_id)
def unlink_telegram(user_id: int):
    """Clear the user's Telegram chat binding."""
    conn = get_connection()
    try:
        conn.cursor().execute("UPDATE users SET telegram_chat_id = NULL WHERE id = ?",
                              (user_id,))
        conn.commit()
    finally:
        conn.close()
def get_user_by_email(email: str) -> dict:
    """Look up a user by normalized (lower/trimmed) email; None if absent."""
    conn = get_connection()
    try:
        cur = conn.cursor()
        cur.execute("SELECT * FROM users WHERE email = ?", (email.lower().strip(),))
        row = cur.fetchone()
    finally:
        conn.close()
    return dict(row) if row else None
def get_user_by_id(user_id: int) -> dict:
    """Look up a user by primary key; None if absent."""
    conn = get_connection()
    try:
        cur = conn.cursor()
        cur.execute("SELECT * FROM users WHERE id = ?", (user_id,))
        row = cur.fetchone()
    finally:
        conn.close()
    return dict(row) if row else None
def update_user_last_login(user_id: int):
    """Stamp the user's last_login with the current UTC time."""
    conn = get_connection()
    try:
        conn.cursor().execute("UPDATE users SET last_login = ? WHERE id = ?",
                              (datetime.utcnow().isoformat(), user_id))
        conn.commit()
    finally:
        conn.close()
def update_user_balance(user_id: int, amount: float):
    """Credit a positive amount (rounded to cents) to the user's balance.

    Raises:
        ValueError: if the rounded amount is not strictly positive.
    """
    amount = round(amount, 2)
    if amount <= 0:
        raise ValueError(f"update_user_balance: amount must be positive, got {amount}")
    conn = get_connection()
    try:
        conn.cursor().execute(
            "UPDATE users SET balance = round(balance + ?, 2) WHERE id = ?",
            (amount, user_id))
        conn.commit()
    finally:
        conn.close()
def charge_user(user_id: int, amount: float) -> bool:
    """Atomically debit the user's balance; True only if funds sufficed."""
    amount = round(amount, 2)
    if amount <= 0:
        return False
    conn = get_connection()
    try:
        cur = conn.cursor()
        # The WHERE guard makes check-and-debit a single atomic statement.
        cur.execute(
            "UPDATE users SET balance = round(balance - ?, 2) WHERE id = ? AND balance >= ?",
            (amount, user_id, amount)
        )
        charged = cur.rowcount > 0
        conn.commit()
        return charged
    finally:
        conn.close()
def add_service_charge(user_id: int, service: str, amount: float, details: str = None):
    """Record one service charge row for revenue tracking."""
    conn = get_connection()
    try:
        conn.cursor().execute(
            "INSERT INTO service_charges (user_id, service, amount, details) VALUES (?, ?, ?, ?)",
            (user_id, service, amount, details)
        )
        conn.commit()
    finally:
        conn.close()
def save_purchased_contacts(user_id: int, contacts: list, query: str, contact_type: str, amount: float):
    """Persist unlocked contacts for the user, one row per contact.

    The contact dict is stored as a JSON payload; all rows commit together.
    """
    conn = get_connection()
    try:
        cur = conn.cursor()
        for contact in contacts:
            payload = json.dumps(contact, ensure_ascii=False)
            cur.execute(
                "INSERT INTO purchased_contacts (user_id, contact_data, query, contact_type, amount_paid) VALUES (?, ?, ?, ?, ?)",
                (user_id, payload, query, contact_type, amount)
            )
        conn.commit()
    finally:
        conn.close()
def charge_and_log(user_id: int, amount: float, service: str, details: str = None,
                   contacts: list = None, query: str = None, contact_type: str = None) -> bool:
    """Charge the user, log the service charge, optionally store contacts.

    Runs as one transaction -- either every step commits or everything
    rolls back. Returns False (no side effects) for non-positive amounts
    or insufficient balance.
    """
    amount = round(amount, 2)
    if amount <= 0:
        return False
    conn = get_connection()
    try:
        cur = conn.cursor()
        # Step 1: conditional debit -- rowcount 0 means insufficient funds.
        cur.execute(
            "UPDATE users SET balance = round(balance - ?, 2) WHERE id = ? AND balance >= ?",
            (amount, user_id, amount)
        )
        if cur.rowcount == 0:
            conn.rollback()
            return False
        # Step 2: revenue log entry.
        cur.execute(
            "INSERT INTO service_charges (user_id, service, amount, details) VALUES (?, ?, ?, ?)",
            (user_id, service, amount, details)
        )
        # Step 3: persist any unlocked contacts in the same transaction.
        for c in (contacts or []):
            cur.execute(
                "INSERT INTO purchased_contacts (user_id, contact_data, query, contact_type, amount_paid) VALUES (?, ?, ?, ?, ?)",
                (user_id, json.dumps(c, ensure_ascii=False), query or '', contact_type or '', amount)
            )
        conn.commit()
        return True
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()
def get_purchased_contacts(user_id: int) -> list:
    """Return all purchased contacts for a user, newest first.

    Unparseable contact_data JSON degrades to an empty dict.
    """
    conn = get_connection()
    try:
        cur = conn.cursor()
        cur.execute(
            "SELECT id, contact_data, query, contact_type, amount_paid, created_at FROM purchased_contacts WHERE user_id = ? ORDER BY id DESC",
            (user_id,)
        )
        rows = cur.fetchall()
    finally:
        conn.close()
    out = []
    for row in rows:
        try:
            payload = json.loads(row[1])
        except (json.JSONDecodeError, TypeError):
            payload = {}
        out.append({
            'id': row[0],
            'contact': payload,
            'query': row[2],
            'contact_type': row[3],
            'amount_paid': row[4],
            'purchased_at': row[5],
        })
    return out
def has_purchased_contact(user_id: int, query: str, contact_type: str = None) -> list:
    """Return contacts the user already bought for this query (case-insensitive).

    Optionally narrowed by contact_type; unparseable rows are skipped.
    """
    conn = get_connection()
    try:
        cur = conn.cursor()
        if contact_type:
            cur.execute(
                "SELECT contact_data FROM purchased_contacts WHERE user_id = ? AND LOWER(query) = LOWER(?) AND contact_type = ?",
                (user_id, query, contact_type)
            )
        else:
            cur.execute(
                "SELECT contact_data FROM purchased_contacts WHERE user_id = ? AND LOWER(query) = LOWER(?)",
                (user_id, query)
            )
        rows = cur.fetchall()
    finally:
        conn.close()
    found = []
    for row in rows:
        try:
            found.append(json.loads(row[0]))
        except (json.JSONDecodeError, TypeError):
            continue
    return found
def get_revenue_stats() -> dict:
    """Compute platform revenue stats; the connection lifetime lives here."""
    conn = get_connection()
    try:
        return _get_revenue_stats_inner(conn)
    finally:
        conn.close()
def _get_revenue_stats_inner(conn) -> dict:
    """Build the revenue/user-metrics dashboard dict on an open connection.

    Aggregates service_charges, deposits, withdrawals and user balances
    into totals, per-period revenue, daily breakdowns and recent-activity
    lists. Monetary outputs are rounded to 2 decimals.
    """
    cursor = conn.cursor()
    now = datetime.utcnow()
    # --- Service charges (main revenue) ---
    cursor.execute("SELECT COALESCE(SUM(amount), 0) FROM service_charges")
    total_service_revenue = cursor.fetchone()[0]
    cursor.execute(
        "SELECT service, COUNT(*) as cnt, SUM(amount) as total FROM service_charges GROUP BY service ORDER BY total DESC"
    )
    by_service = [{'service': r[0], 'count': r[1], 'total': round(r[2], 2)} for r in cursor.fetchall()]
    # Today
    today_str = now.strftime('%Y-%m-%d')
    cursor.execute(
        "SELECT COALESCE(SUM(amount), 0) FROM service_charges WHERE created_at >= ?",
        (today_str,)
    )
    today_revenue = cursor.fetchone()[0]
    # This week (last 7 days)
    week_ago = (now - timedelta(days=7)).strftime('%Y-%m-%d')
    cursor.execute(
        "SELECT COALESCE(SUM(amount), 0) FROM service_charges WHERE created_at >= ?",
        (week_ago,)
    )
    week_revenue = cursor.fetchone()[0]
    # This month (last 30 days)
    month_ago = (now - timedelta(days=30)).strftime('%Y-%m-%d')
    cursor.execute(
        "SELECT COALESCE(SUM(amount), 0) FROM service_charges WHERE created_at >= ?",
        (month_ago,)
    )
    month_revenue = cursor.fetchone()[0]
    # --- Deposits & withdrawals ---
    cursor.execute("SELECT COALESCE(SUM(amount), 0), COUNT(*) FROM deposits")
    row = cursor.fetchone()
    total_deposited, deposit_count = row[0], row[1]
    # Deposit fees earned (2% of each deposit = platform income)
    # NOTE(review): the 2% rate is hard-coded here -- presumably it mirrors
    # the deposit-fee setting elsewhere; confirm it stays in sync.
    deposit_fee_income = round(total_deposited * 0.02, 2)
    cursor.execute("SELECT COALESCE(SUM(balance), 0) FROM users")
    total_balances = cursor.fetchone()[0]
    # Rejected withdrawals never left the platform, so exclude them.
    cursor.execute(
        "SELECT COALESCE(SUM(amount), 0), COUNT(*) FROM withdrawals WHERE status != 'rejected'"
    )
    row = cursor.fetchone()
    total_withdrawn, withdrawal_count = row[0], row[1]
    # --- Platform profit ---
    # Profit = (deposits - user_balances - withdrawals) + service_charges
    # deposits - balances - withdrawals = what users spent/fees
    platform_profit = (total_deposited - total_balances - total_withdrawn) + total_service_revenue
    # --- User metrics ---
    cursor.execute("SELECT COUNT(*) FROM users")
    total_users = cursor.fetchone()[0]
    cursor.execute("SELECT COUNT(*) FROM users WHERE balance > 0")
    users_with_balance = cursor.fetchone()[0]
    cursor.execute("SELECT COUNT(DISTINCT user_id) FROM deposits")
    paying_users = cursor.fetchone()[0]
    cursor.execute("SELECT COUNT(DISTINCT user_id) FROM service_charges")
    active_buyers = cursor.fetchone()[0]
    # Users registered this week
    cursor.execute(
        "SELECT COUNT(*) FROM users WHERE created_at >= ?",
        (week_ago,)
    )
    new_users_week = cursor.fetchone()[0]
    # --- Daily revenue (last 14 days) ---
    cursor.execute(
        """SELECT DATE(created_at) as day, SUM(amount) as total
           FROM service_charges
           WHERE created_at >= ?
           GROUP BY DATE(created_at)
           ORDER BY day""",
        ((now - timedelta(days=14)).strftime('%Y-%m-%d'),)
    )
    daily_revenue = [{'date': r[0], 'amount': round(r[1], 2)} for r in cursor.fetchall()]
    # --- Recent charges ---
    cursor.execute(
        """SELECT sc.id, u.email, sc.service, sc.amount, sc.details, sc.created_at
           FROM service_charges sc JOIN users u ON sc.user_id = u.id
           ORDER BY sc.id DESC LIMIT 20"""
    )
    recent = [{'id': r[0], 'user': r[1], 'service': r[2], 'amount': r[3],
               'details': r[4], 'date': r[5]} for r in cursor.fetchall()]
    # --- Recent deposits ---
    # Sender address is truncated to its first 8 chars for display.
    cursor.execute(
        """SELECT d.id, u.email, d.amount, d.from_address, d.confirmed_at
           FROM deposits d JOIN users u ON d.user_id = u.id
           ORDER BY d.id DESC LIMIT 10"""
    )
    recent_deposits = [{'id': r[0], 'user': r[1], 'amount': round(r[2], 2),
                        'from': r[3][:8] + '...' if r[3] and len(r[3]) > 8 else (r[3] or ''),
                        'date': r[4]} for r in cursor.fetchall()]
    return {
        # Main totals
        'total_revenue': round(total_service_revenue, 2),
        'deposit_fee_income': deposit_fee_income,
        'platform_profit': round(platform_profit, 2),
        # Time periods
        'today_revenue': round(today_revenue, 2),
        'week_revenue': round(week_revenue, 2),
        'month_revenue': round(month_revenue, 2),
        # Financial flow
        'total_deposited': round(total_deposited, 2),
        'deposit_count': deposit_count,
        'total_user_balances': round(total_balances, 2),
        'total_withdrawn': round(total_withdrawn, 2),
        'withdrawal_count': withdrawal_count,
        # Users
        'total_users': total_users,
        'users_with_balance': users_with_balance,
        'paying_users': paying_users,
        'active_buyers': active_buyers,
        'new_users_week': new_users_week,
        # Breakdowns
        'by_service': by_service,
        'daily_revenue': daily_revenue,
        'recent_charges': recent,
        'recent_deposits': recent_deposits,
    }
# Freshness window (minutes) for the port vessel cache -- mirrored from config.
PORT_CACHE_MINUTES = config.PORT_CACHE_MINUTES
def get_cached_port_vessels(port_key: str) -> list:
    """Return cached vessels for a port, or None when missing/stale/corrupt.

    The freshness window is PORT_CACHE_MINUTES (config-driven).
    """
    conn = get_connection()
    try:
        cur = conn.cursor()
        cur.execute(
            "SELECT vessel_data, cached_at FROM port_vessel_cache WHERE port_key = ? ORDER BY id DESC LIMIT 1",
            (port_key.lower(),)
        )
        row = cur.fetchone()
    finally:
        conn.close()
    if not row:
        return None
    try:
        cached_at = datetime.fromisoformat(str(row[1]))
    except (ValueError, TypeError):
        return None
    if datetime.utcnow() - cached_at > timedelta(minutes=PORT_CACHE_MINUTES):
        return None
    try:
        return json.loads(row[0])
    except (json.JSONDecodeError, TypeError):
        return None
def cache_port_vessels(port_key: str, vessels: list):
    """Store a vessels snapshot for a port, keeping only the two newest rows."""
    key = port_key.lower()
    conn = get_connection()
    try:
        cur = conn.cursor()
        cur.execute(
            "INSERT INTO port_vessel_cache (port_key, vessel_data, vessel_count) VALUES (?, ?, ?)",
            (key, json.dumps(vessels, ensure_ascii=False), len(vessels))
        )
        # Trim history: retain only the latest two snapshots per port.
        cur.execute(
            "DELETE FROM port_vessel_cache WHERE port_key = ? AND id NOT IN "
            "(SELECT id FROM port_vessel_cache WHERE port_key = ? ORDER BY id DESC LIMIT 2)",
            (key, key)
        )
        conn.commit()
    finally:
        conn.close()
def set_user_admin(user_id: int, is_admin: bool = True):
    """Set or clear the user's admin flag (stored as 1/0)."""
    flag = 1 if is_admin else 0
    conn = get_connection()
    try:
        conn.cursor().execute("UPDATE users SET is_admin = ? WHERE id = ?", (flag, user_id))
        conn.commit()
    finally:
        conn.close()
# =============================================================================
# SESSION OPERATIONS
# =============================================================================
def create_session(user_id: int, token: str, expires_hours: int = 72) -> int:
    """Insert a session token with an expiry; return the new session id."""
    expires = (datetime.utcnow() + timedelta(hours=expires_hours)).isoformat()
    conn = get_connection()
    try:
        cur = conn.cursor()
        cur.execute(
            "INSERT INTO sessions (user_id, token, expires_at) VALUES (?, ?, ?)",
            (user_id, token, expires)
        )
        conn.commit()
        return cur.lastrowid
    finally:
        conn.close()
def get_session(token: str) -> dict:
    """Return the session row for a non-expired token, else None."""
    conn = get_connection()
    try:
        cur = conn.cursor()
        cur.execute(
            "SELECT * FROM sessions WHERE token = ? AND expires_at > ?",
            (token, datetime.utcnow().isoformat())
        )
        row = cur.fetchone()
    finally:
        conn.close()
    return dict(row) if row else None
def delete_session(token: str):
    """Remove a session row (logout)."""
    conn = get_connection()
    try:
        conn.cursor().execute("DELETE FROM sessions WHERE token = ?", (token,))
        conn.commit()
    finally:
        conn.close()
# =============================================================================
# CHAT HISTORY OPERATIONS
# =============================================================================
def add_chat_message(user_id: int, role: str, message: str):
    """Append one chat message (role + text) to the user's history."""
    conn = get_connection()
    try:
        conn.cursor().execute(
            "INSERT INTO chat_history (user_id, role, message) VALUES (?, ?, ?)",
            (user_id, role, message)
        )
        conn.commit()
    finally:
        conn.close()
def get_chat_history(user_id: int, limit: int = 50) -> list:
    """Return the user's last `limit` messages in chronological order."""
    conn = get_connection()
    try:
        cur = conn.cursor()
        cur.execute("""
            SELECT role, message, created_at FROM chat_history
            WHERE user_id = ?
            ORDER BY id DESC LIMIT ?
        """, (user_id, limit))
        rows = cur.fetchall()
    finally:
        conn.close()
    # The query fetched newest-first; present oldest-first.
    return [dict(row) for row in rows[::-1]]
def clear_chat_history(user_id: int):
    """Delete all chat history rows for the user."""
    conn = get_connection()
    try:
        conn.cursor().execute("DELETE FROM chat_history WHERE user_id = ?", (user_id,))
        conn.commit()
    finally:
        conn.close()
def get_chat_message_count(user_id: int) -> int:
    """Count chat messages stored for the user."""
    conn = get_connection()
    try:
        cur = conn.cursor()
        cur.execute("SELECT COUNT(*) FROM chat_history WHERE user_id = ?", (user_id,))
        return cur.fetchone()[0]
    finally:
        conn.close()
# =============================================================================
# AGENT MEMORY (3-tier system)
# =============================================================================
def save_user_memory(user_id: int, memory_type: str, content: str, entity: str = None,
                     is_permanent: bool = False, max_memories: int = 50):
    """Save a memory fact for the user, deduplicating on exact content.

    A duplicate (same user_id + content) refreshes last_accessed and resets
    relevance_score instead of inserting a second row. Non-permanent
    memories are capped per user; the lowest-relevance, least-recently
    accessed one is evicted to make room.

    Args:
        user_id: owner of the memory
        memory_type: category tag stored alongside the fact
        content: the memory text (dedup key together with user_id)
        entity: optional related entity name
        is_permanent: permanent memories are never evicted by the cap
        max_memories: cap for non-permanent memories (was hard-coded 50;
            now parameterized, default unchanged)
    """
    conn = get_connection()
    try:
        cursor = conn.cursor()
        # Check for duplicate (same user + identical content)
        cursor.execute(
            "SELECT id FROM user_memory WHERE user_id = ? AND content = ?",
            (user_id, content)
        )
        if cursor.fetchone():
            # Refresh the existing row instead of duplicating it
            cursor.execute(
                "UPDATE user_memory SET last_accessed = CURRENT_TIMESTAMP, relevance_score = 1.0 WHERE user_id = ? AND content = ?",
                (user_id, content)
            )
            conn.commit()
            return
        # Enforce the per-user cap on non-permanent memories
        cursor.execute("SELECT COUNT(*) FROM user_memory WHERE user_id = ? AND is_permanent = FALSE", (user_id,))
        count = cursor.fetchone()[0]
        if count >= max_memories:
            # Evict the least valuable non-permanent memory
            cursor.execute("""
                DELETE FROM user_memory WHERE id = (
                    SELECT id FROM user_memory WHERE user_id = ? AND is_permanent = FALSE
                    ORDER BY relevance_score ASC, last_accessed ASC LIMIT 1
                )
            """, (user_id,))
        cursor.execute(
            "INSERT INTO user_memory (user_id, memory_type, content, entity, is_permanent) VALUES (?, ?, ?, ?, ?)",
            (user_id, memory_type, content, entity, is_permanent)
        )
        conn.commit()
    finally:
        conn.close()
def get_user_memories(user_id: int, limit: int = 10) -> list:
    """Return the user's top memories: permanent first, then by relevance/recency.

    Retrieval also bumps last_accessed on every returned row.
    """
    conn = get_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("""
            SELECT id, memory_type, content, entity, is_permanent, relevance_score, created_at
            FROM user_memory WHERE user_id = ?
            ORDER BY is_permanent DESC, relevance_score DESC, last_accessed DESC
            LIMIT ?
        """, (user_id, limit))
        memories = [dict(r) for r in cursor.fetchall()]
        if memories:
            ids = [m['id'] for m in memories]
            placeholders = ','.join(['?'] * len(ids))
            cursor.execute(f"UPDATE user_memory SET last_accessed = CURRENT_TIMESTAMP WHERE id IN ({placeholders})", ids)
            conn.commit()
        return memories
    finally:
        conn.close()
def delete_user_memory(user_id: int, memory_id: int):
    """Delete one memory row, scoped to its owning user."""
    conn = get_connection()
    try:
        conn.cursor().execute("DELETE FROM user_memory WHERE id = ? AND user_id = ?", (memory_id, user_id))
        conn.commit()
    finally:
        conn.close()
def save_conversation_summary(user_id: int, summary: str, message_count: int):
    """Store a conversation summary, pruning to the 10 newest per user."""
    conn = get_connection()
    try:
        cur = conn.cursor()
        cur.execute(
            "INSERT INTO conversation_summaries (user_id, summary, message_count) VALUES (?, ?, ?)",
            (user_id, summary, message_count)
        )
        # Keep only the last 10 summaries for this user.
        cur.execute("""
            DELETE FROM conversation_summaries WHERE id NOT IN (
                SELECT id FROM conversation_summaries WHERE user_id = ?
                ORDER BY id DESC LIMIT 10
            ) AND user_id = ?
        """, (user_id, user_id))
        conn.commit()
    finally:
        conn.close()
def get_conversation_summaries(user_id: int, limit: int = 3) -> list:
    """Return the newest `limit` conversation summaries for the user."""
    conn = get_connection()
    try:
        cur = conn.cursor()
        cur.execute("""
            SELECT summary, message_count, created_at FROM conversation_summaries
            WHERE user_id = ? ORDER BY id DESC LIMIT ?
        """, (user_id, limit))
        return [dict(r) for r in cur.fetchall()]
    finally:
        conn.close()
# =============================================================================
# USER PROFILE OPERATIONS
# =============================================================================
# Whitelisted user_profiles columns accepted by upsert_user_profile().
PROFILE_FIELDS = [
    'company_name', 'role', 'fleet_size', 'vessel_types', 'trade_routes',
    'cargo_types', 'vessels_of_interest', 'experience_years', 'phone',
    'telegram', 'notes', 'home_port', 'preferred_tonnage', 'search_radius'
]
def get_user_profile(user_id: int) -> dict:
    """Load a user's profile; JSON list fields are decoded (bad JSON -> []).

    Returns None when no profile row exists.
    """
    conn = get_connection()
    try:
        cur = conn.cursor()
        cur.execute("SELECT * FROM user_profiles WHERE user_id = ?", (user_id,))
        row = cur.fetchone()
    finally:
        conn.close()
    if not row:
        return None
    profile = dict(row)
    for field in ('vessel_types', 'trade_routes', 'cargo_types', 'vessels_of_interest'):
        try:
            profile[field] = json.loads(profile[field])
        except (json.JSONDecodeError, TypeError):
            profile[field] = []
    return profile
def upsert_user_profile(user_id: int, data: dict) -> dict:
    """Create or update a user's profile from whitelisted fields.

    List-valued fields are JSON-encoded for storage; only keys present in
    PROFILE_FIELDS are written. Returns the stored profile (re-read and
    decoded via get_user_profile).
    """
    # FIX: work on a shallow copy -- the previous version JSON-encoded list
    # fields in place, mutating the caller's dict as a side effect.
    data = dict(data)
    for field in ('vessel_types', 'trade_routes', 'cargo_types', 'vessels_of_interest'):
        if field in data and isinstance(data[field], list):
            data[field] = json.dumps(data[field])
    conn = get_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT user_id FROM user_profiles WHERE user_id = ?", (user_id,))
        exists = cursor.fetchone()
        if exists:
            # Build SET list from whitelisted keys only (no injection path).
            sets = []
            vals = []
            for k in PROFILE_FIELDS:
                if k in data:
                    sets.append(f"{k} = ?")
                    vals.append(data[k])
            if sets:
                sets.append("updated_at = ?")
                vals.append(datetime.utcnow().isoformat())
                vals.append(user_id)
                cursor.execute(f"UPDATE user_profiles SET {', '.join(sets)} WHERE user_id = ?", vals)
        else:
            cols = ['user_id']
            vals = [user_id]
            for k in PROFILE_FIELDS:
                if k in data:
                    cols.append(k)
                    vals.append(data[k])
            placeholders = ', '.join(['?'] * len(cols))
            cursor.execute(f"INSERT INTO user_profiles ({', '.join(cols)}) VALUES ({placeholders})", vals)
        conn.commit()
    finally:
        conn.close()
    return get_user_profile(user_id)
def _sanitize_profile_field(value, max_len: int = 100) -> str:
"""Strip prompt-injection patterns, HTML tags, and control chars from profile field."""
if not value or not isinstance(value, str):
return ''
val = re.sub(r'<[^>]+>', '', value) # strip HTML tags
val = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f]', '', val)
val = val.replace('```', '')
val = re.sub(
r'(?i)(ignore|disregard|forget|override).{0,20}(previous|above|all|system|instructions?|prompt)',
'[filtered]', val)
val = re.sub(
r'(?i)(you are now|act as|pretend|new instructions?|system prompt|HACKED)',
'[filtered]', val)
return val.strip()[:max_len]
def get_profile_summary(user_id: int) -> str:
    """Build a sanitized, human-readable profile summary for AI context.

    Every free-text value passes through _sanitize_profile_field() so
    HTML/prompt-injection content never reaches the model prompt.
    Missing or empty fields are simply omitted. Returns '' when the
    user has no profile.

    Improvements over the previous version: dropped the redundant
    `len(x) > 0` checks (truthiness already covers them) and collapsed
    the four copy-pasted list-field sections into one loop.
    """
    profile = get_user_profile(user_id)
    if not profile:
        return ""
    parts = []
    if profile.get('company_name'):
        parts.append(f"Company: {_sanitize_profile_field(profile['company_name'], 80)}")
    if profile.get('role'):
        parts.append(f"Role: {_sanitize_profile_field(profile['role'], 50)}")
    if profile.get('fleet_size'):
        parts.append(f"Fleet size: {profile['fleet_size']} vessels")
    # List-valued fields share one code path: sanitize each entry,
    # drop falsy entries, emit the section only if something survives.
    for field, label, max_len in (
            ('vessel_types', 'Vessel types', 50),
            ('trade_routes', 'Trade routes', 80),
            ('cargo_types', 'Cargo types', 50),
            ('vessels_of_interest', 'Vessels of interest (fleet/watchlist)', 50)):
        values = profile.get(field) or []
        safe = [_sanitize_profile_field(v, max_len) for v in values if v]
        if safe:
            parts.append(f"{label}: {', '.join(safe)}")
    if profile.get('experience_years'):
        parts.append(f"Experience: {profile['experience_years']} years")
    if profile.get('home_port'):
        parts.append(f"Home port: {_sanitize_profile_field(profile['home_port'], 80)}")
    if profile.get('preferred_tonnage'):
        parts.append(f"Preferred tonnage: {profile['preferred_tonnage']} t")
    if profile.get('search_radius'):
        parts.append(f"Search radius: {profile['search_radius']} NM")
    if profile.get('notes'):
        parts.append(f"Notes: {_sanitize_profile_field(profile['notes'], 200)}")
    return '; '.join(parts)
# =============================================================================
# WALLET OPERATIONS (USDT TRC20) — keys encrypted at rest
# =============================================================================
def create_wallet(user_id: int, address: str, private_key_hex: str) -> dict:
    """Persist a new TRC20 wallet for a user.

    The private key is encrypted (Fernet) before it touches the
    database; plaintext is never stored. Returns the stored wallet as
    read back by get_user_wallet() (key decrypted again).
    """
    cipher_text = _encrypt_private_key(private_key_hex)
    conn = get_connection()
    try:
        conn.cursor().execute(
            "INSERT INTO wallets (user_id, address, private_key_hex) VALUES (?, ?, ?)",
            (user_id, address, cipher_text),
        )
        conn.commit()
    finally:
        conn.close()
    return get_user_wallet(user_id)
def get_user_wallet(user_id: int) -> dict:
    """Look up a wallet by owner id.

    Returns the wallet row as a dict with 'private_key_hex' decrypted,
    or None when the user has no wallet.
    """
    conn = get_connection()
    try:
        cur = conn.cursor()
        cur.execute("SELECT * FROM wallets WHERE user_id = ?", (user_id,))
        record = cur.fetchone()
    finally:
        conn.close()
    if not record:
        return None
    wallet = dict(record)
    # Key is encrypted at rest — decrypt before handing it to the caller.
    wallet['private_key_hex'] = _decrypt_private_key(wallet.get('private_key_hex', ''))
    return wallet
def get_wallet_by_address(address: str) -> dict:
    """Look up a wallet by its on-chain address.

    Returns the wallet row as a dict with 'private_key_hex' decrypted,
    or None when no wallet has this address.
    """
    conn = get_connection()
    try:
        cur = conn.cursor()
        cur.execute("SELECT * FROM wallets WHERE address = ?", (address,))
        record = cur.fetchone()
    finally:
        conn.close()
    if not record:
        return None
    wallet = dict(record)
    # Key is encrypted at rest — decrypt before handing it to the caller.
    wallet['private_key_hex'] = _decrypt_private_key(wallet.get('private_key_hex', ''))
    return wallet
# =============================================================================
# DEPOSIT OPERATIONS
# =============================================================================
def add_deposit(user_id: int, tx_id: str, amount: float, from_address: str = '') -> int:
    """Record a confirmed on-chain deposit. Returns the new row id."""
    conn = get_connection()
    try:
        cur = conn.cursor()
        cur.execute(
            "INSERT INTO deposits (user_id, tx_id, amount, from_address) VALUES (?, ?, ?, ?)",
            (user_id, tx_id, amount, from_address),
        )
        conn.commit()
        return cur.lastrowid
    finally:
        conn.close()
def is_deposit_processed(tx_id: str) -> bool:
    """True if a deposit with this transaction id is already recorded."""
    conn = get_connection()
    try:
        cur = conn.cursor()
        cur.execute("SELECT id FROM deposits WHERE tx_id = ?", (tx_id,))
        return cur.fetchone() is not None
    finally:
        conn.close()
def get_user_deposits(user_id: int, limit: int = 20) -> list:
    """Most recent deposits for a user (newest first) as plain dicts."""
    conn = get_connection()
    try:
        cur = conn.cursor()
        cur.execute(
            "SELECT tx_id, amount, from_address, confirmed_at FROM deposits WHERE user_id = ? ORDER BY id DESC LIMIT ?",
            (user_id, limit),
        )
        return [dict(r) for r in cur.fetchall()]
    finally:
        conn.close()
# =============================================================================
# WITHDRAWAL OPERATIONS
# =============================================================================
def create_withdrawal(user_id: int, amount: float, to_address: str) -> dict:
    """Create a withdrawal request and return its full row as a dict."""
    conn = get_connection()
    try:
        cur = conn.cursor()
        cur.execute(
            "INSERT INTO withdrawals (user_id, amount, to_address) VALUES (?, ?, ?)",
            (user_id, amount, to_address),
        )
        conn.commit()
        new_id = cur.lastrowid
    finally:
        conn.close()
    return get_withdrawal(new_id)
def get_withdrawal(withdrawal_id: int) -> dict:
    """Fetch one withdrawal row by id, or None when it does not exist."""
    conn = get_connection()
    try:
        cur = conn.cursor()
        cur.execute("SELECT * FROM withdrawals WHERE id = ?", (withdrawal_id,))
        record = cur.fetchone()
    finally:
        conn.close()
    return dict(record) if record else None
def get_user_withdrawals(user_id: int, limit: int = 20) -> list:
    """Most recent withdrawal requests for a user (newest first)."""
    conn = get_connection()
    try:
        cur = conn.cursor()
        cur.execute(
            "SELECT id, amount, to_address, status, tx_id, created_at, processed_at FROM withdrawals WHERE user_id = ? ORDER BY id DESC LIMIT ?",
            (user_id, limit),
        )
        return [dict(r) for r in cur.fetchall()]
    finally:
        conn.close()
def update_withdrawal_status(withdrawal_id: int, status: str, tx_id: str = None):
    """Set a withdrawal's status and tx id, stamping processed_at (UTC).

    NOTE(review): passing tx_id=None overwrites any previously stored
    tx_id with NULL — callers updating only the status should re-pass
    the existing tx id; confirm this is intended.
    """
    processed_at = datetime.utcnow().isoformat()
    conn = get_connection()
    try:
        conn.cursor().execute(
            "UPDATE withdrawals SET status = ?, tx_id = ?, processed_at = ? WHERE id = ?",
            (status, tx_id, processed_at, withdrawal_id),
        )
        conn.commit()
    finally:
        conn.close()
# =============================================================================
# EQUASIS CACHE
# =============================================================================
# Equasis tuning, sourced from config so deployments can adjust them:
# per-cache_type TTLs in hours, and the daily query budget.
EQUASIS_CACHE_HOURS = config.EQUASIS_CACHE_HOURS
EQUASIS_DAILY_LIMIT = config.EQUASIS_DAILY_LIMIT
def get_equasis_cache(cache_key: str, cache_type: str):
    """Return the freshest cached Equasis payload for (key, type), or None.

    The freshness window comes from EQUASIS_CACHE_HOURS[cache_type]
    (default 24h). Keys are matched case-insensitively (lowercased).
    Unparseable cached JSON counts as a miss.
    """
    ttl_hours = EQUASIS_CACHE_HOURS.get(cache_type, 24)
    since = (datetime.utcnow() - timedelta(hours=ttl_hours)).isoformat()
    conn = get_connection()
    try:
        cur = conn.cursor()
        cur.execute(
            "SELECT data FROM equasis_cache WHERE cache_key = ? AND cache_type = ? AND created_at > ? ORDER BY id DESC LIMIT 1",
            (cache_key.lower(), cache_type, since),
        )
        record = cur.fetchone()
    finally:
        conn.close()
    if not record:
        return None
    try:
        return json.loads(record[0])
    except (json.JSONDecodeError, TypeError):
        return None
def set_equasis_cache(cache_key: str, cache_type: str, data):
    """Store an Equasis result and prune to the 3 newest rows per (key, type)."""
    key = cache_key.lower()
    payload = json.dumps(data, ensure_ascii=False)
    conn = get_connection()
    try:
        cur = conn.cursor()
        cur.execute(
            "INSERT INTO equasis_cache (cache_key, cache_type, data) VALUES (?, ?, ?)",
            (key, cache_type, payload),
        )
        # Prune: keep only the last 3 entries for this key+type pair.
        cur.execute(
            "DELETE FROM equasis_cache WHERE cache_key = ? AND cache_type = ? AND id NOT IN "
            "(SELECT id FROM equasis_cache WHERE cache_key = ? AND cache_type = ? ORDER BY id DESC LIMIT 3)",
            (key, cache_type, key, cache_type),
        )
        conn.commit()
    finally:
        conn.close()
def increment_equasis_counter() -> int:
    """Increment today's Equasis query counter and return the new count.

    Fix over the previous version: uses a relative UPDATE
    (``count = count + 1``) instead of a SELECT / compute / UPDATE
    cycle, so concurrent callers no longer lose increments. Also prunes
    counter rows older than 7 days to keep the table tiny.
    """
    today = datetime.utcnow().strftime('%Y-%m-%d')
    conn = get_connection()
    try:
        cursor = conn.cursor()
        # Atomic relative increment; rowcount tells us if today's row exists.
        cursor.execute(
            "UPDATE equasis_daily_counter SET count = count + 1 WHERE date_str = ?",
            (today,)
        )
        if cursor.rowcount == 0:
            # First query of the day — create the row.
            cursor.execute(
                "INSERT INTO equasis_daily_counter (date_str, count) VALUES (?, ?)",
                (today, 1)
            )
        cursor.execute("SELECT count FROM equasis_daily_counter WHERE date_str = ?", (today,))
        row = cursor.fetchone()
        new_count = row[0] if row else 1
        # Clean old dates (keep last 7 days)
        week_ago = (datetime.utcnow() - timedelta(days=7)).strftime('%Y-%m-%d')
        cursor.execute("DELETE FROM equasis_daily_counter WHERE date_str < ?", (week_ago,))
        conn.commit()
        return new_count
    finally:
        conn.close()
def get_equasis_remaining() -> int:
    """Remaining Equasis queries allowed today (never negative)."""
    today = datetime.utcnow().strftime('%Y-%m-%d')
    conn = get_connection()
    try:
        cur = conn.cursor()
        cur.execute("SELECT count FROM equasis_daily_counter WHERE date_str = ?", (today,))
        record = cur.fetchone()
    finally:
        conn.close()
    used_today = record[0] if record else 0
    return max(0, EQUASIS_DAILY_LIMIT - used_today)
def get_vessels_needing_enrichment(mmsis: list, ttl_days: int = 7, limit: int = 40) -> list:
    """Pick up to `limit` MMSIs worth enriching via Equasis.

    A vessel qualifies when it is unknown (no row in `vessels`) or known
    but stale (companies_json missing, or updated_at older than
    ttl_days). Unknown MMSIs are listed before stale ones.
    """
    if not mmsis:
        return []
    placeholders = ','.join(['?'] * len(mmsis))
    cutoff = (datetime.utcnow() - timedelta(days=ttl_days)).isoformat()
    conn = get_connection()
    try:
        cur = conn.cursor()
        # Known vessels whose enrichment data is missing or expired, oldest first.
        cur.execute(f"""
            SELECT mmsi FROM vessels
            WHERE mmsi IN ({placeholders})
            AND (companies_json IS NULL OR updated_at < ?)
            ORDER BY updated_at ASC
            LIMIT ?
        """, mmsis + [cutoff, limit])
        stale = [r[0] for r in cur.fetchall()]
        # MMSIs we have never seen at all.
        cur.execute(f"SELECT mmsi FROM vessels WHERE mmsi IN ({placeholders})", mmsis)
        known = {r[0] for r in cur.fetchall()}
        unknown = [m for m in mmsis if m not in known]
        return (unknown + stale)[:limit]
    finally:
        conn.close()
# =============================================================================
# SUBSCRIPTION PLANS
# =============================================================================
# Subscription tiers consumed by get_user_plan()/set_user_plan().
# Daily-quota value -1 means "unlimited"; `contact_unlock_price` is the
# per-contact unlock charge for that tier (presumably USDT, matching the
# wallet layer — TODO confirm).
SUBSCRIPTION_PLANS = {
    'free': {
        'name': 'Free',
        'price_monthly': 0,
        'vessel_lookups_day': 10,
        'route_calcs_day': 5,
        'port_scans_day': 3,
        'contact_unlock_price': 10.0,
    },
    'basic': {
        'name': 'Basic',
        'price_monthly': 29,
        'vessel_lookups_day': 100,
        'route_calcs_day': 50,
        'port_scans_day': 20,
        'contact_unlock_price': 5.0,
    },
    'pro': {
        'name': 'Professional',
        'price_monthly': 99,
        'vessel_lookups_day': -1,  # unlimited
        'route_calcs_day': -1,
        'port_scans_day': -1,
        'contact_unlock_price': 0.0,
    },
}
def get_user_plan(user_id: int) -> dict:
    """Resolve the effective subscription plan for a user.

    Admins always get the Pro feature set (renamed 'Admin (unlimited)').
    A paid plan past its plan_expires_at is downgraded to 'free' on read
    (lazy expiry). Unknown users or plan keys fall back to 'free'.
    """
    user = get_user_by_id(user_id)
    if not user:
        return SUBSCRIPTION_PLANS['free']
    if user.get('is_admin'):
        return {**SUBSCRIPTION_PLANS['pro'], 'name': 'Admin (unlimited)'}
    plan_key = user.get('plan', 'free')
    expires = user.get('plan_expires_at')
    if plan_key != 'free' and expires:
        # Unparseable expiry timestamps are ignored (plan stays active).
        try:
            expired = datetime.utcnow() > datetime.fromisoformat(str(expires))
        except (ValueError, TypeError):
            expired = False
        if expired:
            # Lazy downgrade: persist 'free' the first time we notice expiry.
            set_user_plan(user_id, 'free')
            return SUBSCRIPTION_PLANS['free']
    return SUBSCRIPTION_PLANS.get(plan_key, SUBSCRIPTION_PLANS['free'])
def set_user_plan(user_id: int, plan: str, days: int = 30):
    """Assign a subscription plan to a user.

    Paid plans expire `days` days from now (UTC, ISO-8601 string);
    setting 'free' clears the expiry instead.
    """
    if plan == 'free':
        sql = "UPDATE users SET plan = ?, plan_expires_at = NULL WHERE id = ?"
        params = (plan, user_id)
    else:
        expires = (datetime.utcnow() + timedelta(days=days)).isoformat()
        sql = "UPDATE users SET plan = ?, plan_expires_at = ? WHERE id = ?"
        params = (plan, expires, user_id)
    conn = get_connection()
    try:
        conn.cursor().execute(sql, params)
        conn.commit()
    finally:
        conn.close()
# =============================================================================
# STATS
# =============================================================================
def get_stats() -> dict:
    """Row counts for the core tables plus the active DB backend name."""
    conn = get_connection()
    try:
        cur = conn.cursor()
        stats = {}
        # Table names are a fixed internal tuple, not user input.
        for table in ('vessels', 'positions', 'contacts', 'port_calls', 'users'):
            cur.execute(f"SELECT COUNT(*) FROM {table}")
            stats[table] = cur.fetchone()[0]
        stats['db_backend'] = 'PostgreSQL' if USE_POSTGRES else 'SQLite'
        return stats
    finally:
        conn.close()
# =============================================================================
# QUERY ANALYTICS
# =============================================================================
def log_query(user_id: int = None, query_text: str = '', query_type: str = None,
              tool_used: str = None, response_time_ms: int = None,
              input_tokens: int = None, output_tokens: int = None,
              cached_tokens: int = None, cost_usd: float = None,
              iterations: int = None):
    """Best-effort analytics logging of one user query.

    Stores the first 500 chars of the query text plus optional
    token/cost metrics. Never raises: analytics failures must not break
    the request path. Fixes over the previous version: failures are now
    logged at DEBUG instead of silently discarded, and an explicit
    query_text=None no longer crashes the slice.
    """
    conn = None
    try:
        conn = get_connection()
        cursor = conn.cursor()
        cursor.execute("""
            INSERT INTO query_log (user_id, query_text, query_type, tool_used, response_time_ms,
                                   input_tokens, output_tokens, cached_tokens, cost_usd, iterations)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (user_id, (query_text or '')[:500], query_type, tool_used, response_time_ms,
              input_tokens, output_tokens, cached_tokens, cost_usd, iterations))
        conn.commit()
    except Exception as e:
        # Deliberate best-effort: record the failure but never propagate.
        logger.debug("log_query failed: %s", e)
    finally:
        if conn:
            conn.close()
def get_query_stats(days: int = 7) -> dict:
    """Aggregate query_log over the last `days` days.

    Returns totals, per-tool and per-type breakdowns, and the mean
    response time in ms (None when no timed rows exist).
    """
    since = (datetime.utcnow() - timedelta(days=days)).isoformat()
    conn = get_connection()
    try:
        cur = conn.cursor()
        cur.execute("SELECT COUNT(*) FROM query_log WHERE created_at > ?", (since,))
        total = cur.fetchone()[0]
        cur.execute("""
            SELECT tool_used, COUNT(*) as cnt FROM query_log
            WHERE created_at > ? AND tool_used IS NOT NULL
            GROUP BY tool_used ORDER BY cnt DESC
        """, (since,))
        by_tool = {r[0]: r[1] for r in cur.fetchall()}
        cur.execute("""
            SELECT query_type, COUNT(*) as cnt FROM query_log
            WHERE created_at > ? AND query_type IS NOT NULL
            GROUP BY query_type ORDER BY cnt DESC
        """, (since,))
        by_type = {r[0]: r[1] for r in cur.fetchall()}
        cur.execute("""
            SELECT AVG(response_time_ms) FROM query_log
            WHERE created_at > ? AND response_time_ms IS NOT NULL
        """, (since,))
        avg_ms = cur.fetchone()[0]
    finally:
        conn.close()
    return {
        'period_days': days,
        'total_queries': total,
        'by_tool': by_tool,
        'by_type': by_type,
        'avg_response_ms': round(avg_ms) if avg_ms else None,
    }
def get_cost_stats(days: int = 30) -> dict:
    """Get Claude API cost analytics from query_log.

    Aggregates rows where cost_usd is not NULL over the last `days`
    days: overall totals, today/7-day/30-day cost buckets, top-10 users
    by spend, a per-day trend, cache-hit rate and efficiency averages.
    All monetary values are rounded to 4 decimal places.
    """
    conn = get_connection()
    try:
        cursor = conn.cursor()
        now = datetime.utcnow()
        since = (now - timedelta(days=days)).isoformat()
        # NOTE: comparing ISO timestamps against a bare 'YYYY-MM-DD' works
        # lexically ('2024-01-02T...' >= '2024-01-02'), so today_str acts
        # as a start-of-day boundary.
        today_str = now.strftime('%Y-%m-%d')
        week_ago = (now - timedelta(days=7)).isoformat()
        month_ago = (now - timedelta(days=30)).isoformat()
        # Overall totals
        cursor.execute("""
            SELECT COUNT(*), COALESCE(SUM(cost_usd), 0),
                   COALESCE(SUM(input_tokens), 0), COALESCE(SUM(output_tokens), 0),
                   COALESCE(SUM(cached_tokens), 0), COALESCE(AVG(iterations), 0)
            FROM query_log WHERE created_at > ? AND cost_usd IS NOT NULL
        """, (since,))
        row = cursor.fetchone()
        total_queries = row[0] or 0
        total_cost = round(row[1], 4)
        total_input = row[2] or 0
        total_output = row[3] or 0
        total_cached = row[4] or 0
        avg_iterations = round(row[5], 1) if row[5] else 0
        # Cost by period
        cursor.execute("SELECT COALESCE(SUM(cost_usd), 0) FROM query_log WHERE created_at >= ? AND cost_usd IS NOT NULL", (today_str,))
        cost_today = round(cursor.fetchone()[0], 4)
        cursor.execute("SELECT COALESCE(SUM(cost_usd), 0) FROM query_log WHERE created_at >= ? AND cost_usd IS NOT NULL", (week_ago,))
        cost_week = round(cursor.fetchone()[0], 4)
        cursor.execute("SELECT COALESCE(SUM(cost_usd), 0) FROM query_log WHERE created_at >= ? AND cost_usd IS NOT NULL", (month_ago,))
        cost_month = round(cursor.fetchone()[0], 4)
        # Top users by cost (anonymous rows keep user_id NULL / 'anonymous')
        cursor.execute("""
            SELECT q.user_id, COALESCE(u.email, 'anonymous'), COUNT(*) as queries,
                   COALESCE(SUM(q.cost_usd), 0) as total_cost
            FROM query_log q LEFT JOIN users u ON q.user_id = u.id
            WHERE q.created_at > ? AND q.cost_usd IS NOT NULL
            GROUP BY q.user_id, u.email ORDER BY total_cost DESC LIMIT 10
        """, (since,))
        top_users = [{'user_id': r[0], 'email': r[1], 'queries': r[2],
                      'total_cost': round(r[3], 4)} for r in cursor.fetchall()]
        # Daily cost trend
        cursor.execute("""
            SELECT DATE(created_at) as day, COUNT(*) as queries,
                   COALESCE(SUM(cost_usd), 0) as cost,
                   COALESCE(AVG(input_tokens), 0) as avg_in,
                   COALESCE(AVG(output_tokens), 0) as avg_out
            FROM query_log
            WHERE created_at > ? AND cost_usd IS NOT NULL
            GROUP BY DATE(created_at) ORDER BY day
        """, (since,))
        daily_costs = [{'date': r[0], 'queries': r[1], 'cost': round(r[2], 4),
                        'avg_input': round(r[3]), 'avg_output': round(r[4])} for r in cursor.fetchall()]
        # Cache efficiency: share of prompt tokens served from cache
        cache_rate = round(total_cached * 100.0 / (total_input + total_cached), 1) if (total_input + total_cached) > 0 else 0
        return {
            'period_days': days,
            'total_queries': total_queries,
            'total_cost_usd': total_cost,
            'avg_cost_per_query': round(total_cost / total_queries, 4) if total_queries else 0,
            'cost_today': cost_today,
            'cost_week': cost_week,
            'cost_month': cost_month,
            'top_users': top_users,
            'daily_costs': daily_costs,
            'cache_stats': {
                'total_input_tokens': total_input,
                'total_cached_tokens': total_cached,
                'cache_hit_rate_pct': cache_rate,
            },
            'efficiency': {
                'avg_iterations': avg_iterations,
                'avg_input_per_query': round(total_input / total_queries) if total_queries else 0,
                'avg_output_per_query': round(total_output / total_queries) if total_queries else 0,
            },
        }
    finally:
        conn.close()
# =============================================================================
# MOLTBOOK BOT
# =============================================================================
def has_commented_post(post_id: str) -> bool:
    """True if a comment for this Moltbook post id was already recorded."""
    conn = get_connection()
    try:
        cur = conn.cursor()
        cur.execute("SELECT 1 FROM moltbook_comments WHERE post_id = ?", (str(post_id),))
        return cur.fetchone() is not None
    finally:
        conn.close()
def save_moltbook_comment(post_id: str, title: str, body: str):
    """Record that we commented on a Moltbook post (idempotent per post_id).

    Uses the backend-appropriate "insert if absent" form (Postgres
    ON CONFLICT vs SQLite INSERT OR IGNORE). Title/body are truncated to
    their column budgets; DB errors are logged and swallowed so the bot
    keeps running.
    """
    key = str(post_id)
    safe_title = title[:500] if title else ''
    safe_body = body[:2000] if body else ''
    if USE_POSTGRES:
        sql = "INSERT INTO moltbook_comments (post_id, post_title, comment_body) VALUES (?, ?, ?) ON CONFLICT DO NOTHING"
    else:
        sql = "INSERT OR IGNORE INTO moltbook_comments (post_id, post_title, comment_body) VALUES (?, ?, ?)"
    conn = get_connection()
    try:
        conn.cursor().execute(sql, (key, safe_title, safe_body))
        conn.commit()
    except Exception as e:
        logger.error(f"DB error saving moltbook comment: {e}")
    finally:
        conn.close()
def get_moltbook_stats() -> dict:
    """Total Moltbook comments plus how many were posted in the last 24h."""
    cutoff = (datetime.utcnow() - timedelta(hours=24)).strftime('%Y-%m-%d %H:%M:%S')
    conn = get_connection()
    try:
        cur = conn.cursor()
        cur.execute("SELECT COUNT(*) FROM moltbook_comments")
        total = cur.fetchone()[0]
        cur.execute("SELECT COUNT(*) FROM moltbook_comments WHERE commented_at > ?", (cutoff,))
        recent = cur.fetchone()[0]
    finally:
        conn.close()
    return {'total_comments': total, 'last_24h': recent}
# =============================================================================
# VESSEL WATCHES (Telegram bot: /watch command)
# =============================================================================
def add_vessel_watch(chat_id: int, mmsi: str, vessel_name: str, watch_type: str,
                     destination_port: str = None, destination_lat: float = None,
                     destination_lon: float = None, port_radius_nm: float = 15,
                     last_status: str = None) -> int:
    """Insert a vessel watch row for a Telegram chat. Returns its id."""
    row = (chat_id, mmsi, vessel_name, watch_type,
           destination_port, destination_lat, destination_lon, port_radius_nm, last_status)
    conn = get_connection()
    try:
        cur = conn.cursor()
        cur.execute(
            "INSERT INTO vessel_watches (chat_id, mmsi, vessel_name, watch_type, "
            "destination_port, destination_lat, destination_lon, port_radius_nm, last_status) "
            "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
            row
        )
        conn.commit()
        return cur.lastrowid
    finally:
        conn.close()
def get_active_watches(chat_id: int = None) -> list:
    """Active (not yet notified) watches as dicts, oldest first.

    When chat_id is given, only that chat's watches are returned.
    """
    conn = get_connection()
    try:
        cur = conn.cursor()
        if chat_id is None:
            cur.execute("SELECT * FROM vessel_watches WHERE notified = 0 ORDER BY id")
        else:
            cur.execute(
                "SELECT * FROM vessel_watches WHERE notified = 0 AND chat_id = ? ORDER BY id",
                (chat_id,)
            )
        return [dict(r) for r in cur.fetchall()]
    finally:
        conn.close()
def mark_watch_notified(watch_id: int):
    """Flag a watch as fired so get_active_watches() stops returning it."""
    conn = get_connection()
    try:
        conn.cursor().execute("UPDATE vessel_watches SET notified = 1 WHERE id = ?", (watch_id,))
        conn.commit()
    finally:
        conn.close()
def delete_watch(watch_id: int, chat_id: int) -> bool:
    """Delete a watch, but only if it belongs to chat_id (ownership check
    is in the WHERE clause). Returns True when a row was removed."""
    conn = get_connection()
    try:
        cur = conn.cursor()
        cur.execute(
            "DELETE FROM vessel_watches WHERE id = ? AND chat_id = ?",
            (watch_id, chat_id)
        )
        conn.commit()
        deleted = cur.rowcount > 0
    finally:
        conn.close()
    return deleted
def update_watch_position(watch_id: int, last_status: str = None,
                          last_lat: float = None, last_lon: float = None):
    """Store the latest observed status/position on a watch and refresh
    its last_check timestamp (server-side CURRENT_TIMESTAMP)."""
    conn = get_connection()
    try:
        conn.cursor().execute(
            "UPDATE vessel_watches SET last_status = ?, last_lat = ?, last_lon = ?, "
            "last_check = CURRENT_TIMESTAMP WHERE id = ?",
            (last_status, last_lat, last_lon, watch_id)
        )
        conn.commit()
    finally:
        conn.close()
if __name__ == "__main__":
    # Smoke test when run directly: initialize the schema, then log a few
    # health metrics. NOTE(review): get_vessels_heading_to_staging is
    # defined *below* this guard — harmless since the guard never calls
    # it, but consider moving the guard to the end of the file.
    init_db()
    logger.info("Stats: %s", get_stats())
    logger.info("Equasis remaining: %s", get_equasis_remaining())
    logger.info("Moltbook stats: %s", get_moltbook_stats())
def get_vessels_heading_to_staging(destination_patterns: list, vessel_type: str = None, limit: int = 50) -> list:
    """Find vessels in mt_bulk_staging whose AIS destination matches any
    of the given substring patterns (case-insensitive), optionally
    filtered by a coarse vessel-type category. Rows with no position or
    destination are skipped; newest scrapes come first. Errors are
    logged and yield [].
    """
    if not destination_patterns:
        return []
    # Destination match: OR of UPPER(destination) LIKE %PATTERN%; the
    # patterns themselves travel as bound parameters, never in the SQL.
    params = [f"%{pat.upper()}%" for pat in destination_patterns]
    where = " OR ".join(["UPPER(destination) LIKE ?"] * len(destination_patterns))
    type_filter = ""
    if vessel_type:
        # Coarse UI category -> acceptable type_category values.
        type_map = {
            'tanker': ('tanker',), 'bulk': ('bulk', 'general'),
            'container': ('container',), 'general': ('general', 'cargo'),
        }
        types = type_map.get(vessel_type.lower(), (vessel_type.lower(),))
        type_clauses = " OR ".join(["LOWER(type_category) = ?" for _ in types])
        type_filter = f"AND ({type_clauses})"
        params.extend(types)
    params.append(limit)
    conn = get_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(f"""
            SELECT mmsi, name, flag, dwt, type_category, gt_shiptype,
                   lat, lon, speed, course, destination, imo,
                   owner, operator, year_built, scraped_at
            FROM mt_bulk_staging
            WHERE ({where}) AND destination IS NOT NULL AND destination != ''
            AND lat IS NOT NULL AND lon IS NOT NULL
            {type_filter}
            ORDER BY scraped_at DESC NULLS LAST
            LIMIT ?
        """, params)
        return [dict(row) for row in cursor.fetchall()]
    except Exception as e:
        logger.error(f"get_vessels_heading_to_staging error: {e}")
        return []
    finally:
        conn.close()