3784 lines
145 KiB
Python
3784 lines
145 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Maritime Database — Schema and Operations
|
|
SeaFare_Montana Data Layer
|
|
|
|
Supports PostgreSQL (production) and SQLite (development).
|
|
Wallet private keys are encrypted at rest via Fernet (AES-128-CBC).
|
|
|
|
Ɉ MONTANA PROTOCOL — ML-DSA-65 (FIPS 204)
|
|
"""
|
|
|
|
import os
|
|
import re
|
|
import json
|
|
import base64
|
|
import hashlib
|
|
import logging
|
|
import sqlite3
|
|
import threading
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
|
|
logger = logging.getLogger('maritime_db')
|
|
|
|
import config
|
|
|
|
# =============================================================================
|
|
# DATABASE BACKEND DETECTION
|
|
# =============================================================================
|
|
|
|
DATABASE_URL = (os.environ.get(config.ENV_DATABASE_URL) or '').strip() or None
|
|
USE_POSTGRES = bool(DATABASE_URL)
|
|
PG_FALLBACK_ERROR = None # Stores the reason PG failed (for /health diagnostics)
|
|
|
|
if USE_POSTGRES:
|
|
import psycopg2
|
|
import psycopg2.extras
|
|
import psycopg2.pool
|
|
import psycopg2.extensions
|
|
from psycopg2.pool import ThreadedConnectionPool
|
|
|
|
# Make psycopg2 return strings instead of Python datetime/Decimal objects.
|
|
# This ensures all query results are JSON-serializable without custom encoders.
|
|
def _cast_to_str(value, cursor):
|
|
return str(value) if value is not None else None
|
|
|
|
def _cast_numeric(value, cursor):
|
|
return float(value) if value is not None else None
|
|
|
|
# OIDs: 1114=TIMESTAMP, 1184=TIMESTAMPTZ, 1082=DATE, 1083=TIME, 1266=TIMETZ
|
|
for _oid, _name in [(1114, 'TS'), (1184, 'TSTZ'), (1082, 'DT'), (1083, 'TM'), (1266, 'TMTZ')]:
|
|
psycopg2.extensions.register_type(
|
|
psycopg2.extensions.new_type((_oid,), f'_STR_{_name}', _cast_to_str))
|
|
# OID 1700=NUMERIC → float
|
|
psycopg2.extensions.register_type(
|
|
psycopg2.extensions.new_type((1700,), '_FLOAT_NUM', _cast_numeric))
|
|
|
|
DB_PATH = Path(__file__).parent / "data" / "maritime.db"
|
|
|
|
# PostgreSQL connection pool (lazy-initialized on first get_connection() call)
|
|
_pg_pool = None
|
|
_pg_pool_lock = threading.Lock()
|
|
|
|
|
|
# =============================================================================
|
|
# POSTGRESQL ↔ SQLITE COMPATIBILITY WRAPPERS
|
|
# =============================================================================
|
|
|
|
class _PgCursorWrapper:
|
|
"""Wraps psycopg2 cursor: ? → %s placeholders, lastrowid via RETURNING."""
|
|
|
|
def __init__(self, cursor):
|
|
self._cursor = cursor
|
|
self._last_id = None
|
|
|
|
def execute(self, sql, params=None):
|
|
sql = sql.replace('?', '%s')
|
|
stripped = sql.strip().upper()
|
|
# Auto-add RETURNING id for plain INSERT statements
|
|
if stripped.startswith('INSERT') and 'RETURNING' not in stripped:
|
|
sql = sql.rstrip().rstrip(';') + ' RETURNING id'
|
|
self._cursor.execute(sql, params or ())
|
|
try:
|
|
row = self._cursor.fetchone()
|
|
self._last_id = row[0] if row else None
|
|
except Exception:
|
|
self._last_id = None
|
|
else:
|
|
self._last_id = None
|
|
self._cursor.execute(sql, params or ())
|
|
|
|
def fetchone(self):
|
|
return self._cursor.fetchone()
|
|
|
|
def fetchall(self):
|
|
return self._cursor.fetchall()
|
|
|
|
@property
|
|
def lastrowid(self):
|
|
return self._last_id
|
|
|
|
@property
|
|
def rowcount(self):
|
|
return self._cursor.rowcount
|
|
|
|
|
|
class _PgConnectionWrapper:
|
|
"""Wraps psycopg2 connection for SQLite-compatible API.
|
|
When backed by a connection pool, close() returns the connection
|
|
to the pool instead of destroying it."""
|
|
|
|
def __init__(self, conn, pool=None):
|
|
self._conn = conn
|
|
self._pool = pool
|
|
|
|
def cursor(self):
|
|
return _PgCursorWrapper(
|
|
self._conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
|
|
)
|
|
|
|
def execute(self, sql, params=None):
|
|
cursor = self.cursor()
|
|
cursor.execute(sql, params)
|
|
return cursor
|
|
|
|
def commit(self):
|
|
self._conn.commit()
|
|
|
|
def rollback(self):
|
|
self._conn.rollback()
|
|
|
|
def close(self):
|
|
if self._pool is not None:
|
|
# Rollback any open transaction before returning to pool
|
|
# (prevents "idle in transaction" state in pool)
|
|
try:
|
|
self._conn.rollback()
|
|
except Exception:
|
|
pass
|
|
self._pool.putconn(self._conn)
|
|
else:
|
|
self._conn.close()
|
|
|
|
|
|
def _make_external_url(url):
|
|
"""Convert Render internal DB URL to external if internal DNS fails.
|
|
Internal: postgresql://user:pass@dpg-xxx-a/db
|
|
External: postgresql://user:pass@dpg-xxx-a.oregon-postgres.render.com/db?sslmode=require
|
|
"""
|
|
from urllib.parse import urlparse, urlunparse
|
|
parsed = urlparse(url)
|
|
host = parsed.hostname or ''
|
|
# Only convert Render internal hostnames (dpg-xxx-a pattern without dots)
|
|
if host.startswith('dpg-') and '.' not in host:
|
|
# Try common Render regions
|
|
for region in ['oregon-postgres.render.com', 'ohio-postgres.render.com',
|
|
'frankfurt-postgres.render.com', 'singapore-postgres.render.com']:
|
|
ext_host = f"{host}.{region}"
|
|
new_netloc = parsed.netloc.replace(host, ext_host)
|
|
ext_url = urlunparse(parsed._replace(netloc=new_netloc))
|
|
# External connections require SSL
|
|
if '?' not in ext_url:
|
|
ext_url += '?sslmode=require'
|
|
elif 'sslmode' not in ext_url:
|
|
ext_url += '&sslmode=require'
|
|
try:
|
|
import socket
|
|
socket.getaddrinfo(ext_host, parsed.port or 5432)
|
|
logger.info(f"Resolved external PostgreSQL host: {ext_host}")
|
|
return ext_url
|
|
except socket.gaierror:
|
|
continue
|
|
return None
|
|
|
|
|
|
def _fallback_to_sqlite(reason='unknown'):
|
|
"""Switch from PostgreSQL to SQLite when PG is completely unavailable.
|
|
SQLite data is ephemeral on Render (lost on redeploy) but keeps the app functional."""
|
|
global USE_POSTGRES, PG_FALLBACK_ERROR
|
|
USE_POSTGRES = False
|
|
PG_FALLBACK_ERROR = str(reason)
|
|
DB_PATH.parent.mkdir(parents=True, exist_ok=True)
|
|
logger.warning("FALLBACK: Switched to SQLite mode (PostgreSQL unavailable)")
|
|
try:
|
|
init_db()
|
|
logger.info("SQLite database initialized successfully")
|
|
except sqlite3.DatabaseError as e:
|
|
if any(x in str(e).lower() for x in ('malformed', 'corrupt', 'disk i/o')):
|
|
logger.warning(f"SQLite corrupted during fallback: {e}. Recreating...")
|
|
try:
|
|
DB_PATH.unlink(missing_ok=True)
|
|
except Exception:
|
|
pass
|
|
try:
|
|
init_db()
|
|
logger.info("SQLite recreated after corruption during fallback")
|
|
except Exception as e2:
|
|
logger.error(f"SQLite re-init also failed: {e2}")
|
|
else:
|
|
logger.error(f"SQLite init failed: {e}")
|
|
except Exception as e:
|
|
logger.error(f"SQLite init failed: {e}")
|
|
|
|
|
|
def _get_pg_pool():
|
|
"""Lazily initialize the PostgreSQL ThreadedConnectionPool (thread-safe)."""
|
|
global _pg_pool
|
|
if _pg_pool is not None:
|
|
return _pg_pool
|
|
with _pg_pool_lock:
|
|
# Double-check after acquiring lock
|
|
if _pg_pool is not None:
|
|
return _pg_pool
|
|
url = DATABASE_URL
|
|
if url.startswith('postgres://'):
|
|
url = url.replace('postgres://', 'postgresql://', 1)
|
|
# Detect pooler endpoints (Neon, Supabase) — they don't support startup options
|
|
is_pooler = '-pooler' in url or 'pgbouncer' in url
|
|
pool_opts = {} if is_pooler else {'options': '-c statement_timeout=10000'}
|
|
# Try internal URL first, fall back to external if DNS fails
|
|
try:
|
|
_pg_pool = ThreadedConnectionPool(
|
|
minconn=config.PG_POOL_MIN_CONN,
|
|
maxconn=config.PG_POOL_MAX_CONN,
|
|
dsn=url,
|
|
connect_timeout=5,
|
|
**pool_opts,
|
|
)
|
|
return _pg_pool
|
|
except psycopg2.OperationalError as e:
|
|
err_msg = str(e)
|
|
if 'unsupported startup parameter' in err_msg and not is_pooler:
|
|
# Retry without options for pooler compatibility
|
|
logger.warning("Retrying PostgreSQL without startup options (pooler detected)")
|
|
_pg_pool = ThreadedConnectionPool(
|
|
minconn=config.PG_POOL_MIN_CONN,
|
|
maxconn=config.PG_POOL_MAX_CONN,
|
|
dsn=url,
|
|
connect_timeout=5,
|
|
)
|
|
return _pg_pool
|
|
if 'could not translate host name' in err_msg or 'Name or service not known' in err_msg:
|
|
logger.warning(f"Internal PostgreSQL DNS failed, trying external URL: {e}")
|
|
ext_url = _make_external_url(url)
|
|
if ext_url:
|
|
try:
|
|
_pg_pool = ThreadedConnectionPool(
|
|
minconn=config.PG_POOL_MIN_CONN,
|
|
maxconn=config.PG_POOL_MAX_CONN,
|
|
dsn=ext_url,
|
|
connect_timeout=10,
|
|
)
|
|
logger.info("Connected to PostgreSQL via external URL")
|
|
return _pg_pool
|
|
except psycopg2.OperationalError as ext_e:
|
|
logger.error(f"External PostgreSQL also failed: {ext_e}")
|
|
raise
|
|
|
|
|
|
def close_pool():
|
|
"""Close the PostgreSQL connection pool (if it exists).
|
|
Safe to call on SQLite (no-op). Used during graceful shutdown."""
|
|
global _pg_pool
|
|
with _pg_pool_lock:
|
|
if _pg_pool is not None:
|
|
try:
|
|
_pg_pool.closeall()
|
|
logger.info("PostgreSQL connection pool closed")
|
|
except Exception as e:
|
|
logger.error(f"Error closing PostgreSQL pool: {e}")
|
|
finally:
|
|
_pg_pool = None
|
|
else:
|
|
logger.info("No connection pool to close (SQLite mode or pool not initialized)")
|
|
|
|
|
|
def _reset_pg_pool():
|
|
"""Reset the PostgreSQL connection pool (e.g., after OperationalError)."""
|
|
global _pg_pool
|
|
with _pg_pool_lock:
|
|
if _pg_pool is not None:
|
|
try:
|
|
_pg_pool.closeall()
|
|
except Exception:
|
|
pass
|
|
_pg_pool = None
|
|
logger.warning("PostgreSQL connection pool reset (will reconnect on next query)")
|
|
|
|
|
|
def get_connection():
|
|
"""Get database connection (PostgreSQL in production, SQLite locally).
|
|
PostgreSQL connections come from a ThreadedConnectionPool (min=1, max=5).
|
|
If PostgreSQL is completely unreachable, auto-falls back to SQLite.
|
|
Callers MUST call .close() to return the connection to the pool."""
|
|
if USE_POSTGRES:
|
|
for attempt in range(2):
|
|
try:
|
|
pool = _get_pg_pool()
|
|
except Exception as e:
|
|
logger.error(f"PostgreSQL pool creation failed (attempt {attempt+1}): {e}")
|
|
if attempt == 0:
|
|
_reset_pg_pool()
|
|
continue
|
|
# All PostgreSQL attempts exhausted — fall back to SQLite
|
|
logger.error("PostgreSQL completely unreachable, falling back to SQLite")
|
|
_fallback_to_sqlite(reason=f"pool creation failed: {e}")
|
|
return _get_sqlite_connection()
|
|
try:
|
|
conn = pool.getconn()
|
|
# Verify connection is alive (fast check)
|
|
conn.cursor().execute("SELECT 1")
|
|
conn.rollback() # Close the implicit transaction from health check
|
|
return _PgConnectionWrapper(conn, pool=pool)
|
|
except psycopg2.pool.PoolError as e:
|
|
if attempt == 0:
|
|
_reset_pg_pool()
|
|
continue
|
|
logger.error("PostgreSQL pool exhausted, falling back to SQLite")
|
|
_fallback_to_sqlite(reason=f"pool exhausted: {e}")
|
|
return _get_sqlite_connection()
|
|
except (psycopg2.OperationalError, psycopg2.InterfaceError) as e:
|
|
logger.warning(f"PostgreSQL connection failed (attempt {attempt+1}): {e}")
|
|
try:
|
|
pool.putconn(conn, close=True)
|
|
except Exception:
|
|
pass
|
|
if attempt == 0:
|
|
_reset_pg_pool()
|
|
continue
|
|
# All retries exhausted — fall back to SQLite
|
|
logger.error("PostgreSQL connection failed after retries, falling back to SQLite")
|
|
_fallback_to_sqlite(reason=f"connection failed: {e}")
|
|
return _get_sqlite_connection()
|
|
return _get_sqlite_connection()
|
|
|
|
|
|
def _get_sqlite_connection():
|
|
"""Get a SQLite connection (used as primary in dev or fallback in production).
|
|
Auto-heals corrupted databases by deleting and recreating them."""
|
|
DB_PATH.parent.mkdir(parents=True, exist_ok=True)
|
|
conn = sqlite3.connect(str(DB_PATH))
|
|
conn.row_factory = sqlite3.Row
|
|
# Quick integrity check — detect corruption early
|
|
try:
|
|
conn.execute("SELECT 1")
|
|
except sqlite3.DatabaseError as e:
|
|
if any(x in str(e).lower() for x in ('malformed', 'corrupt', 'disk i/o')):
|
|
logger.warning(f"SQLite DB corrupted: {e}. Deleting and recreating...")
|
|
try:
|
|
conn.close()
|
|
except Exception:
|
|
pass
|
|
try:
|
|
DB_PATH.unlink(missing_ok=True)
|
|
except Exception as del_e:
|
|
logger.error(f"Failed to delete corrupted DB: {del_e}")
|
|
conn = sqlite3.connect(str(DB_PATH))
|
|
conn.row_factory = sqlite3.Row
|
|
try:
|
|
init_db()
|
|
logger.info("SQLite database recreated after corruption")
|
|
except Exception as init_e:
|
|
logger.error(f"Failed to reinitialize DB: {init_e}")
|
|
else:
|
|
raise
|
|
return conn
|
|
|
|
|
|
def _adapt_schema(sql):
|
|
"""Adapt CREATE TABLE SQL for PostgreSQL."""
|
|
if not USE_POSTGRES:
|
|
return sql
|
|
sql = re.sub(
|
|
r'INTEGER\s+PRIMARY\s+KEY\s+AUTOINCREMENT',
|
|
'SERIAL PRIMARY KEY',
|
|
sql,
|
|
flags=re.IGNORECASE,
|
|
)
|
|
return sql
|
|
|
|
|
|
# =============================================================================
|
|
# WALLET KEY ENCRYPTION (Fernet / AES-128-CBC)
|
|
# =============================================================================
|
|
|
|
def _get_encryption_key():
|
|
"""Derive Fernet key from SECRET_KEY env var."""
|
|
secret = os.environ.get(config.ENV_SECRET_KEY) or os.environ.get(config.ENV_ANTHROPIC_API_KEY)
|
|
if not secret:
|
|
raise RuntimeError("SECRET_KEY or ANTHROPIC_API_KEY must be set for wallet encryption")
|
|
key = hashlib.sha256(secret.encode()).digest()
|
|
return base64.urlsafe_b64encode(key)
|
|
|
|
|
|
def _encrypt_private_key(plaintext: str) -> str:
|
|
"""Encrypt private key for storage. Returns 'enc:<ciphertext>'."""
|
|
from cryptography.fernet import Fernet
|
|
f = Fernet(_get_encryption_key())
|
|
return 'enc:' + f.encrypt(plaintext.encode()).decode()
|
|
|
|
|
|
def _decrypt_private_key(stored: str) -> str:
|
|
"""Decrypt private key from storage. Handles legacy unencrypted keys."""
|
|
if not stored or not stored.startswith('enc:'):
|
|
return stored # Legacy unencrypted key
|
|
from cryptography.fernet import Fernet
|
|
f = Fernet(_get_encryption_key())
|
|
return f.decrypt(stored[4:].encode()).decode()
|
|
|
|
|
|
# =============================================================================
|
|
# SCHEMA INITIALIZATION
|
|
# =============================================================================
|
|
|
|
def init_db():
|
|
"""Initialize database schema."""
|
|
conn = get_connection()
|
|
try:
|
|
if not USE_POSTGRES:
|
|
conn.execute("PRAGMA journal_mode=WAL")
|
|
conn.execute("PRAGMA busy_timeout=5000")
|
|
|
|
cursor = conn.cursor()
|
|
|
|
# Vessels table
|
|
cursor.execute(_adapt_schema("""
|
|
CREATE TABLE IF NOT EXISTS vessels (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
mmsi TEXT,
|
|
imo TEXT,
|
|
name TEXT,
|
|
type TEXT,
|
|
type_code INTEGER,
|
|
flag TEXT,
|
|
flag_code TEXT,
|
|
callsign TEXT,
|
|
length REAL,
|
|
width REAL,
|
|
draught REAL,
|
|
gross_tonnage INTEGER,
|
|
deadweight INTEGER,
|
|
year_built INTEGER,
|
|
status TEXT,
|
|
owner TEXT,
|
|
owner_country TEXT,
|
|
operator TEXT,
|
|
operator_country TEXT,
|
|
companies_json TEXT,
|
|
engine_type TEXT,
|
|
speed_max REAL,
|
|
website TEXT,
|
|
beneficial_owner TEXT,
|
|
beneficial_owner_country TEXT,
|
|
registered_owner TEXT,
|
|
registered_owner_country TEXT,
|
|
commercial_manager TEXT,
|
|
commercial_manager_country TEXT,
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
source TEXT DEFAULT 'internal'
|
|
)
|
|
"""))
|
|
|
|
# Migration: add missing columns
|
|
if USE_POSTGRES:
|
|
cursor.execute(
|
|
"SELECT column_name FROM information_schema.columns WHERE table_name = 'vessels'"
|
|
)
|
|
existing_cols = {row[0] for row in cursor.fetchall()}
|
|
else:
|
|
cursor.execute("PRAGMA table_info(vessels)")
|
|
existing_cols = {row[1] for row in cursor.fetchall()}
|
|
|
|
for col_name, col_def in [('companies_json', 'TEXT'), ('status', 'TEXT')]:
|
|
if col_name not in existing_cols:
|
|
try:
|
|
if USE_POSTGRES:
|
|
cursor.execute(f"ALTER TABLE vessels ADD COLUMN IF NOT EXISTS {col_name} {col_def}")
|
|
else:
|
|
cursor.execute(f"ALTER TABLE vessels ADD COLUMN {col_name} {col_def}")
|
|
except Exception:
|
|
pass
|
|
|
|
# Positions table
|
|
cursor.execute(_adapt_schema("""
|
|
CREATE TABLE IF NOT EXISTS positions (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
mmsi TEXT NOT NULL,
|
|
latitude REAL NOT NULL,
|
|
longitude REAL NOT NULL,
|
|
speed REAL,
|
|
course REAL,
|
|
heading REAL,
|
|
status TEXT,
|
|
destination TEXT,
|
|
eta TEXT,
|
|
timestamp TIMESTAMP NOT NULL,
|
|
received_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
|
)
|
|
"""))
|
|
|
|
# Contacts table
|
|
cursor.execute(_adapt_schema("""
|
|
CREATE TABLE IF NOT EXISTS contacts (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
type TEXT NOT NULL,
|
|
company_name TEXT NOT NULL,
|
|
country TEXT,
|
|
city TEXT,
|
|
address TEXT,
|
|
phone TEXT,
|
|
email TEXT,
|
|
website TEXT,
|
|
contact_person TEXT,
|
|
notes TEXT,
|
|
vessels_mmsi TEXT,
|
|
source TEXT,
|
|
verified INTEGER DEFAULT 0,
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
|
)
|
|
"""))
|
|
|
|
# Port calls
|
|
cursor.execute(_adapt_schema("""
|
|
CREATE TABLE IF NOT EXISTS port_calls (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
mmsi TEXT NOT NULL,
|
|
port_name TEXT NOT NULL,
|
|
port_code TEXT,
|
|
country TEXT,
|
|
arrival TIMESTAMP,
|
|
departure TIMESTAMP,
|
|
berth TEXT,
|
|
cargo_type TEXT,
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
|
)
|
|
"""))
|
|
|
|
# Demurrage records
|
|
cursor.execute(_adapt_schema("""
|
|
CREATE TABLE IF NOT EXISTS demurrage (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
mmsi TEXT NOT NULL,
|
|
port_name TEXT NOT NULL,
|
|
agreed_days REAL NOT NULL,
|
|
actual_days REAL NOT NULL,
|
|
daily_rate REAL,
|
|
currency TEXT DEFAULT 'USD',
|
|
status TEXT DEFAULT 'pending',
|
|
notes TEXT,
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
|
)
|
|
"""))
|
|
|
|
# Users table
|
|
cursor.execute(_adapt_schema("""
|
|
CREATE TABLE IF NOT EXISTS users (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
email TEXT NOT NULL UNIQUE,
|
|
password_hash TEXT NOT NULL,
|
|
name TEXT NOT NULL DEFAULT '',
|
|
lang TEXT NOT NULL DEFAULT 'en',
|
|
balance REAL NOT NULL DEFAULT 0.0,
|
|
is_admin INTEGER NOT NULL DEFAULT 0,
|
|
plan TEXT NOT NULL DEFAULT 'free',
|
|
plan_expires_at TIMESTAMP,
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
last_login TIMESTAMP
|
|
)
|
|
"""))
|
|
|
|
# Migration: add new columns to existing users table
|
|
if USE_POSTGRES:
|
|
cursor.execute(
|
|
"SELECT column_name FROM information_schema.columns WHERE table_name = 'users'"
|
|
)
|
|
user_cols = {row[0] for row in cursor.fetchall()}
|
|
else:
|
|
cursor.execute("PRAGMA table_info(users)")
|
|
user_cols = {row[1] for row in cursor.fetchall()}
|
|
|
|
for col_name, col_def in [
|
|
('is_admin', "INTEGER NOT NULL DEFAULT 0"),
|
|
('plan', "TEXT NOT NULL DEFAULT 'free'"),
|
|
('plan_expires_at', "TIMESTAMP"),
|
|
('telegram_chat_id', "INTEGER"),
|
|
]:
|
|
if col_name not in user_cols:
|
|
try:
|
|
if USE_POSTGRES:
|
|
cursor.execute(f"ALTER TABLE users ADD COLUMN IF NOT EXISTS {col_name} {col_def}")
|
|
else:
|
|
cursor.execute(f"ALTER TABLE users ADD COLUMN {col_name} {col_def}")
|
|
except Exception:
|
|
pass
|
|
|
|
# Sessions table
|
|
cursor.execute(_adapt_schema("""
|
|
CREATE TABLE IF NOT EXISTS sessions (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
user_id INTEGER NOT NULL,
|
|
token TEXT NOT NULL UNIQUE,
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
expires_at TIMESTAMP NOT NULL
|
|
)
|
|
"""))
|
|
|
|
# Chat history table
|
|
cursor.execute(_adapt_schema("""
|
|
CREATE TABLE IF NOT EXISTS chat_history (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
user_id INTEGER NOT NULL,
|
|
role TEXT NOT NULL,
|
|
message TEXT NOT NULL,
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
|
)
|
|
"""))
|
|
|
|
# Migration: user_profiles needs 'id' column for PG wrapper RETURNING id
|
|
if USE_POSTGRES:
|
|
try:
|
|
cursor.execute("""
|
|
DO $$ BEGIN
|
|
IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name='user_profiles')
|
|
AND NOT EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name='user_profiles' AND column_name='id')
|
|
THEN
|
|
ALTER TABLE user_profiles ADD COLUMN id SERIAL;
|
|
ALTER TABLE user_profiles DROP CONSTRAINT user_profiles_pkey;
|
|
ALTER TABLE user_profiles ADD PRIMARY KEY (id);
|
|
ALTER TABLE user_profiles ADD CONSTRAINT user_profiles_user_id_key UNIQUE (user_id);
|
|
END IF;
|
|
END $$;
|
|
""")
|
|
except Exception:
|
|
pass
|
|
|
|
# User profiles (maritime questionnaire)
|
|
cursor.execute(_adapt_schema("""
|
|
CREATE TABLE IF NOT EXISTS user_profiles (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
user_id INTEGER UNIQUE NOT NULL,
|
|
company_name TEXT NOT NULL DEFAULT '',
|
|
role TEXT NOT NULL DEFAULT '',
|
|
fleet_size INTEGER DEFAULT 0,
|
|
vessel_types TEXT NOT NULL DEFAULT '[]',
|
|
trade_routes TEXT NOT NULL DEFAULT '[]',
|
|
cargo_types TEXT NOT NULL DEFAULT '[]',
|
|
vessels_of_interest TEXT NOT NULL DEFAULT '[]',
|
|
experience_years INTEGER DEFAULT 0,
|
|
phone TEXT NOT NULL DEFAULT '',
|
|
telegram TEXT NOT NULL DEFAULT '',
|
|
notes TEXT NOT NULL DEFAULT '',
|
|
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
|
)
|
|
"""))
|
|
|
|
# Migration: telegram_links needs 'id' column for PG wrapper RETURNING id
|
|
# Table only holds temp codes (30min TTL), safe to recreate
|
|
if USE_POSTGRES:
|
|
try:
|
|
cursor.execute("""
|
|
DO $$ BEGIN
|
|
IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name='telegram_links')
|
|
AND NOT EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name='telegram_links' AND column_name='id')
|
|
THEN
|
|
DROP TABLE telegram_links;
|
|
END IF;
|
|
END $$;
|
|
""")
|
|
except Exception:
|
|
pass
|
|
|
|
# Telegram account linking codes
|
|
cursor.execute(_adapt_schema("""
|
|
CREATE TABLE IF NOT EXISTS telegram_links (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
code TEXT UNIQUE NOT NULL,
|
|
user_id INTEGER NOT NULL,
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
expires_at TIMESTAMP NOT NULL
|
|
)
|
|
"""))
|
|
|
|
# Unique index on telegram_chat_id
|
|
try:
|
|
cursor.execute("CREATE UNIQUE INDEX IF NOT EXISTS idx_users_telegram_chat_id ON users(telegram_chat_id)")
|
|
except Exception:
|
|
pass
|
|
|
|
# Wallets table (private keys encrypted via Fernet)
|
|
cursor.execute(_adapt_schema("""
|
|
CREATE TABLE IF NOT EXISTS wallets (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
user_id INTEGER NOT NULL UNIQUE,
|
|
address TEXT NOT NULL UNIQUE,
|
|
private_key_hex TEXT NOT NULL,
|
|
network TEXT NOT NULL DEFAULT 'trc20',
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
|
)
|
|
"""))
|
|
|
|
# Deposits table
|
|
cursor.execute(_adapt_schema("""
|
|
CREATE TABLE IF NOT EXISTS deposits (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
user_id INTEGER NOT NULL,
|
|
tx_id TEXT NOT NULL UNIQUE,
|
|
amount REAL NOT NULL,
|
|
from_address TEXT NOT NULL DEFAULT '',
|
|
confirmed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
|
)
|
|
"""))
|
|
|
|
# Withdrawals table
|
|
cursor.execute(_adapt_schema("""
|
|
CREATE TABLE IF NOT EXISTS withdrawals (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
user_id INTEGER NOT NULL,
|
|
amount REAL NOT NULL,
|
|
to_address TEXT NOT NULL,
|
|
status TEXT NOT NULL DEFAULT 'pending',
|
|
tx_id TEXT,
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
processed_at TIMESTAMP
|
|
)
|
|
"""))
|
|
|
|
# Service charges — revenue tracking
|
|
cursor.execute(_adapt_schema("""
|
|
CREATE TABLE IF NOT EXISTS service_charges (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
user_id INTEGER NOT NULL,
|
|
service TEXT NOT NULL,
|
|
amount REAL NOT NULL,
|
|
details TEXT,
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
|
)
|
|
"""))
|
|
|
|
# Purchased contacts
|
|
cursor.execute(_adapt_schema("""
|
|
CREATE TABLE IF NOT EXISTS purchased_contacts (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
user_id INTEGER NOT NULL,
|
|
contact_data TEXT NOT NULL,
|
|
query TEXT,
|
|
contact_type TEXT,
|
|
amount_paid REAL NOT NULL,
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
|
)
|
|
"""))
|
|
|
|
# Port vessel cache
|
|
cursor.execute(_adapt_schema("""
|
|
CREATE TABLE IF NOT EXISTS port_vessel_cache (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
port_key TEXT NOT NULL,
|
|
vessel_data TEXT NOT NULL,
|
|
vessel_count INTEGER DEFAULT 0,
|
|
cached_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
|
)
|
|
"""))
|
|
|
|
# Equasis cache (vessel details, search results)
|
|
cursor.execute(_adapt_schema("""
|
|
CREATE TABLE IF NOT EXISTS equasis_cache (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
cache_key TEXT NOT NULL,
|
|
cache_type TEXT NOT NULL,
|
|
data TEXT NOT NULL,
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
|
)
|
|
"""))
|
|
|
|
# Equasis daily query counter
|
|
cursor.execute(_adapt_schema("""
|
|
CREATE TABLE IF NOT EXISTS equasis_daily_counter (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
date_str TEXT NOT NULL UNIQUE,
|
|
count INTEGER NOT NULL DEFAULT 0
|
|
)
|
|
"""))
|
|
|
|
# Query analytics log
|
|
cursor.execute(_adapt_schema("""
|
|
CREATE TABLE IF NOT EXISTS query_log (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
user_id INTEGER,
|
|
query_text TEXT NOT NULL,
|
|
query_type TEXT,
|
|
tool_used TEXT,
|
|
response_time_ms INTEGER,
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
|
)
|
|
"""))
|
|
|
|
# Indexes
|
|
cursor.execute("CREATE UNIQUE INDEX IF NOT EXISTS idx_vessels_imo_unique ON vessels(imo) WHERE imo IS NOT NULL")
|
|
cursor.execute("CREATE UNIQUE INDEX IF NOT EXISTS idx_vessels_mmsi_unique ON vessels(mmsi) WHERE mmsi IS NOT NULL")
|
|
cursor.execute("CREATE INDEX IF NOT EXISTS idx_vessels_name ON vessels(name)")
|
|
cursor.execute("CREATE UNIQUE INDEX IF NOT EXISTS idx_positions_mmsi_unique ON positions(mmsi)")
|
|
cursor.execute("CREATE INDEX IF NOT EXISTS idx_positions_timestamp ON positions(timestamp)")
|
|
cursor.execute("CREATE INDEX IF NOT EXISTS idx_positions_lat_lon ON positions(latitude, longitude)")
|
|
cursor.execute("CREATE INDEX IF NOT EXISTS idx_positions_destination ON positions(destination)")
|
|
cursor.execute("CREATE INDEX IF NOT EXISTS idx_contacts_type ON contacts(type)")
|
|
cursor.execute("CREATE INDEX IF NOT EXISTS idx_port_calls_mmsi ON port_calls(mmsi)")
|
|
cursor.execute("CREATE UNIQUE INDEX IF NOT EXISTS idx_users_email ON users(email)")
|
|
cursor.execute("CREATE INDEX IF NOT EXISTS idx_sessions_token ON sessions(token)")
|
|
cursor.execute("CREATE INDEX IF NOT EXISTS idx_sessions_user ON sessions(user_id)")
|
|
cursor.execute("CREATE INDEX IF NOT EXISTS idx_chat_history_user ON chat_history(user_id)")
|
|
cursor.execute("CREATE UNIQUE INDEX IF NOT EXISTS idx_wallets_user ON wallets(user_id)")
|
|
cursor.execute("CREATE UNIQUE INDEX IF NOT EXISTS idx_wallets_address ON wallets(address)")
|
|
cursor.execute("CREATE UNIQUE INDEX IF NOT EXISTS idx_deposits_tx ON deposits(tx_id)")
|
|
cursor.execute("CREATE INDEX IF NOT EXISTS idx_deposits_user ON deposits(user_id)")
|
|
cursor.execute("CREATE INDEX IF NOT EXISTS idx_withdrawals_user ON withdrawals(user_id)")
|
|
cursor.execute("CREATE INDEX IF NOT EXISTS idx_service_charges_user ON service_charges(user_id)")
|
|
cursor.execute("CREATE INDEX IF NOT EXISTS idx_purchased_contacts_user ON purchased_contacts(user_id)")
|
|
cursor.execute("CREATE INDEX IF NOT EXISTS idx_port_vessel_cache_key ON port_vessel_cache(port_key)")
|
|
cursor.execute("CREATE INDEX IF NOT EXISTS idx_equasis_cache_key ON equasis_cache(cache_key)")
|
|
cursor.execute("CREATE INDEX IF NOT EXISTS idx_query_log_user ON query_log(user_id)")
|
|
cursor.execute("CREATE INDEX IF NOT EXISTS idx_query_log_type ON query_log(query_type)")
|
|
cursor.execute("CREATE INDEX IF NOT EXISTS idx_mt_staging_lat_lon ON mt_bulk_staging(lat, lon)")
|
|
cursor.execute("CREATE INDEX IF NOT EXISTS idx_mt_staging_mmsi ON mt_bulk_staging(mmsi)")
|
|
|
|
# v3.21.0: Token/cost tracking columns for Ouroboros-style budget awareness
|
|
for col, coltype in [('input_tokens', 'INTEGER'), ('output_tokens', 'INTEGER'),
|
|
('cached_tokens', 'INTEGER'), ('cost_usd', 'REAL'),
|
|
('iterations', 'INTEGER')]:
|
|
try:
|
|
cursor.execute(f"ALTER TABLE query_log ADD COLUMN {col} {coltype}")
|
|
conn.commit()
|
|
except Exception:
|
|
try:
|
|
conn.rollback()
|
|
except Exception:
|
|
pass
|
|
|
|
# Moltbook bot comments log
|
|
cursor.execute(_adapt_schema("""
|
|
CREATE TABLE IF NOT EXISTS moltbook_comments (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
post_id TEXT NOT NULL UNIQUE,
|
|
post_title TEXT,
|
|
comment_body TEXT,
|
|
commented_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
|
)
|
|
"""))
|
|
cursor.execute("CREATE UNIQUE INDEX IF NOT EXISTS idx_moltbook_post ON moltbook_comments(post_id)")
|
|
|
|
# Agent memory (3-tier: short-term=chat_history, working=user_memory, long-term=permanent+summaries)
|
|
cursor.execute(_adapt_schema("""
|
|
CREATE TABLE IF NOT EXISTS user_memory (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
user_id INTEGER NOT NULL,
|
|
memory_type TEXT NOT NULL,
|
|
content TEXT NOT NULL,
|
|
entity TEXT,
|
|
is_permanent BOOLEAN DEFAULT FALSE,
|
|
relevance_score REAL DEFAULT 1.0,
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
last_accessed TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
|
)
|
|
"""))
|
|
cursor.execute("CREATE INDEX IF NOT EXISTS idx_user_memory_user ON user_memory(user_id)")
|
|
|
|
cursor.execute(_adapt_schema("""
|
|
CREATE TABLE IF NOT EXISTS conversation_summaries (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
user_id INTEGER NOT NULL,
|
|
summary TEXT NOT NULL,
|
|
message_count INTEGER,
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
|
)
|
|
"""))
|
|
cursor.execute("CREATE INDEX IF NOT EXISTS idx_conv_summaries_user ON conversation_summaries(user_id)")
|
|
|
|
# Vessel watches (Telegram bot: /watch command)
|
|
cursor.execute(_adapt_schema("""
|
|
CREATE TABLE IF NOT EXISTS vessel_watches (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
chat_id BIGINT NOT NULL,
|
|
mmsi TEXT NOT NULL,
|
|
vessel_name TEXT,
|
|
watch_type TEXT NOT NULL,
|
|
destination_port TEXT,
|
|
destination_lat REAL,
|
|
destination_lon REAL,
|
|
port_radius_nm REAL DEFAULT 15,
|
|
last_status TEXT,
|
|
last_lat REAL,
|
|
last_lon REAL,
|
|
last_check TIMESTAMP,
|
|
notified INTEGER DEFAULT 0,
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
|
)
|
|
"""))
|
|
cursor.execute("CREATE INDEX IF NOT EXISTS idx_watches_active ON vessel_watches(notified, chat_id)")
|
|
|
|
|
|
# Verification codes for email confirmation (v3.32.0)
|
|
cursor.execute(_adapt_schema("""CREATE TABLE IF NOT EXISTS verification_codes (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
email TEXT NOT NULL,
|
|
code TEXT NOT NULL,
|
|
lang TEXT DEFAULT 'en',
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
expires_at TIMESTAMP NOT NULL
|
|
)"""))
|
|
cursor.execute("CREATE INDEX IF NOT EXISTS idx_vc_email ON verification_codes(email)")
|
|
conn.commit()
|
|
|
|
# --- Migrations (each in its own transaction for PostgreSQL) ---
|
|
# Add 'source' column to positions (v2.2.0)
|
|
try:
|
|
cursor.execute("ALTER TABLE positions ADD COLUMN source TEXT DEFAULT 'unknown'")
|
|
conn.commit()
|
|
except Exception:
|
|
try:
|
|
conn.rollback()
|
|
except Exception:
|
|
pass
|
|
# Add 'website' column to vessels (v3.19.0 — bulk carrier ownership scraper)
|
|
try:
|
|
cursor.execute("ALTER TABLE vessels ADD COLUMN website TEXT")
|
|
conn.commit()
|
|
except Exception:
|
|
try:
|
|
conn.rollback()
|
|
except Exception:
|
|
pass
|
|
# Add 'home_port' column to user_profiles (v3.29.0)
|
|
try:
|
|
cursor.execute("ALTER TABLE user_profiles ADD COLUMN home_port TEXT NOT NULL DEFAULT ''")
|
|
conn.commit()
|
|
except Exception:
|
|
try:
|
|
conn.rollback()
|
|
except Exception:
|
|
pass
|
|
# Add 'preferred_tonnage' column to user_profiles (v3.39.2)
|
|
try:
|
|
cursor.execute("ALTER TABLE user_profiles ADD COLUMN preferred_tonnage INTEGER DEFAULT 0")
|
|
except Exception:
|
|
pass
|
|
|
|
# Add 'search_radius' column to user_profiles (v3.40.7)
|
|
try:
|
|
cursor.execute("ALTER TABLE user_profiles ADD COLUMN search_radius INTEGER DEFAULT 0")
|
|
conn.commit()
|
|
except Exception:
|
|
try:
|
|
conn.rollback()
|
|
except Exception:
|
|
pass
|
|
finally:
|
|
conn.close()
|
|
|
|
db_name = 'PostgreSQL' if USE_POSTGRES else f'SQLite: {DB_PATH}'
|
|
logger.info(f"Database initialized: {db_name}")
|
|
|
|
# Seed Caspian intelligence data (idempotent)
|
|
try:
|
|
_seed_caspian_data()
|
|
except Exception as e:
|
|
logger.warning(f"Caspian seed data: {e}")
|
|
|
|
# Build fuzzy vessel name index (background-safe)
|
|
try:
|
|
build_vessel_name_index()
|
|
except Exception as e:
|
|
logger.warning(f"Fuzzy index build: {e}")
|
|
|
|
return True
|
|
|
|
|
|
def _seed_caspian_data():
|
|
"""Seed Caspian Sea shipping companies and known vessels (idempotent)."""
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
|
|
# --- Caspian Shipping Companies ---
|
|
_CASPIAN_COMPANIES = [
|
|
{
|
|
'type': 'operator', 'company_name': 'Azerbaijan Caspian Shipping Company (ASCO)',
|
|
'country': 'Azerbaijan', 'city': 'Baku',
|
|
'address': 'Mikayil Useynov str. 2, AZ1003, Baku',
|
|
'phone': '+994 12 404 37 00', 'email': 'contact@asco.az',
|
|
'website': 'https://www.asco.az',
|
|
'notes': 'State-owned. 256 vessels (56 merchant + 200 offshore). Dominant Caspian operator. Ferries, tankers, dry cargo, Ro-Ro. Routes: Alat-Kuryk/Aktau, Alat-Turkmenbashi.',
|
|
'source': 'caspian_intel', 'verified': 1,
|
|
},
|
|
{
|
|
'type': 'operator', 'company_name': 'KazMorTransFlot (KMTF)',
|
|
'country': 'Kazakhstan', 'city': 'Aktau',
|
|
'address': 'Building 55, 13 Micro District, Aktau',
|
|
'phone': '+7 7292 535-890', 'email': 'info@kmtf.kz',
|
|
'website': 'https://www.kmtf.kz',
|
|
'notes': '100% KazMunayGas subsidiary. 18 vessels: 8 tankers, 3 dry cargo, 3 container, 4 tugs. Routes: Aktau/Kuryk-Baku, Trans-Caspian oil shuttle.',
|
|
'source': 'caspian_intel', 'verified': 1,
|
|
},
|
|
{
|
|
'type': 'operator', 'company_name': 'Caspian Integrated Maritime Solutions (CIMS)',
|
|
'country': 'Kazakhstan', 'city': 'Aktau',
|
|
'website': 'https://www.caspianims.com',
|
|
'notes': 'JV: AD Ports Group (UAE 51%) + KMTF (49%). 2 tankers: Liwa & Taraz (8000 DWT each, Damen). Kashagan/Tengiz crude via BTC pipeline.',
|
|
'source': 'caspian_intel', 'verified': 1,
|
|
},
|
|
{
|
|
'type': 'operator', 'company_name': 'Volga Shipping Company (Volgaflot)',
|
|
'country': 'Russia', 'city': 'Nizhny Novgorod',
|
|
'address': '15A Markin Square, Nizhny Novgorod, 603001',
|
|
'website': 'https://www.volgaflot.com',
|
|
'notes': 'UCL Holding (Vladimir Lisin). 236+ river-sea vessels, DWT >1.4M tons. 15+ mln tons/year. Volga-Caspian routes: grain, metals, fertilizers, timber.',
|
|
'source': 'caspian_intel', 'verified': 1,
|
|
},
|
|
{
|
|
'type': 'operator', 'company_name': 'Khazar Sea Shipping Lines (KSSL)',
|
|
'country': 'Iran', 'city': 'Bandar Anzali',
|
|
'address': 'Shahid Khomeini St., Bandar Anzali, Gilan, Iran',
|
|
'website': 'https://khazarshipping.ir',
|
|
'notes': 'IRISL subsidiary. 16-22 general cargo vessels. Offices: Astrakhan, Aktau, Baku, Turkmenbashi. Under EU/UK sanctions. Routes: Anzali-Astrakhan, Amirabad-Aktau.',
|
|
'source': 'caspian_intel', 'verified': 1,
|
|
},
|
|
{
|
|
'type': 'operator', 'company_name': 'Turkmen Marine Merchant Fleet',
|
|
'country': 'Turkmenistan', 'city': 'Turkmenbashi',
|
|
'website': 'https://tmrl.gov.tm',
|
|
'notes': 'State-owned CJSC. 20 vessels: tankers, dry cargo, ferries, passenger. Routes: Turkmenbashi-Alat, Turkmenbashi-Aktau/Kuryk. 3 new vessels under construction (OPEC Fund + IsDB).',
|
|
'source': 'caspian_intel', 'verified': 1,
|
|
},
|
|
{
|
|
'type': 'operator', 'company_name': 'Silver Shipping Caspian',
|
|
'country': 'Kazakhstan', 'city': 'Aktau',
|
|
'website': 'https://silver-caspian.com',
|
|
'notes': 'Private logistics operator. Barges, tugs, crew boats. Oversized/project cargo and offshore support.',
|
|
'source': 'caspian_intel', 'verified': 1,
|
|
},
|
|
{
|
|
'type': 'operator', 'company_name': 'Volgotanker',
|
|
'country': 'Russia', 'city': 'Samara',
|
|
'notes': 'Oil/product tanker operator. Inland waterway + Caspian coastal tankers. Volga River system to Caspian Sea.',
|
|
'source': 'caspian_intel', 'verified': 1,
|
|
},
|
|
]
|
|
|
|
for company in _CASPIAN_COMPANIES:
|
|
# Check if already exists (by company_name)
|
|
cursor.execute(
|
|
"SELECT id FROM contacts WHERE company_name = ? AND source = ?",
|
|
(company['company_name'], 'caspian_intel')
|
|
)
|
|
if cursor.fetchone():
|
|
continue # Already seeded
|
|
cols = {k: v for k, v in company.items() if k in _CONTACT_ALLOWED_COLS}
|
|
columns = ', '.join(cols.keys())
|
|
placeholders = ', '.join(['?' for _ in cols])
|
|
cursor.execute(f"INSERT INTO contacts ({columns}) VALUES ({placeholders})",
|
|
list(cols.values()))
|
|
|
|
# --- Khazar Shipping Fleet (known IMO numbers) ---
|
|
_KHAZAR_FLEET = [
|
|
{'imo': '8203608', 'name': 'SARINA', 'flag': 'Iran', 'type': 'General Cargo', 'owner': 'Khazar Sea Shipping Lines', 'operator': 'Khazar Sea Shipping Lines'},
|
|
{'imo': '8215742', 'name': 'SABRINA', 'flag': 'Iran', 'type': 'General Cargo', 'owner': 'Khazar Sea Shipping Lines', 'operator': 'Khazar Sea Shipping Lines'},
|
|
{'imo': '8605234', 'name': 'DORITA', 'flag': 'Iran', 'type': 'General Cargo', 'owner': 'Khazar Sea Shipping Lines', 'operator': 'Khazar Sea Shipping Lines'},
|
|
{'imo': '8721351', 'name': 'KASMA', 'flag': 'Iran', 'type': 'General Cargo', 'owner': 'Khazar Sea Shipping Lines', 'operator': 'Khazar Sea Shipping Lines'},
|
|
{'imo': '9010711', 'name': 'VISTA', 'flag': 'Iran', 'type': 'General Cargo', 'owner': 'Khazar Sea Shipping Lines', 'operator': 'Khazar Sea Shipping Lines'},
|
|
{'imo': '9010723', 'name': 'VIANA', 'flag': 'Iran', 'type': 'General Cargo', 'owner': 'Khazar Sea Shipping Lines', 'operator': 'Khazar Sea Shipping Lines'},
|
|
{'imo': '9076478', 'name': 'PARIN', 'flag': 'Iran', 'type': 'General Cargo', 'owner': 'Khazar Sea Shipping Lines', 'operator': 'Khazar Sea Shipping Lines'},
|
|
{'imo': '9118551', 'name': 'PARAND', 'flag': 'Iran', 'type': 'General Cargo', 'owner': 'Khazar Sea Shipping Lines', 'operator': 'Khazar Sea Shipping Lines'},
|
|
{'imo': '9137210', 'name': 'PATRIS', 'flag': 'Iran', 'type': 'General Cargo', 'owner': 'Khazar Sea Shipping Lines', 'operator': 'Khazar Sea Shipping Lines'},
|
|
{'imo': '9137246', 'name': 'NARDIS', 'flag': 'Iran', 'type': 'General Cargo', 'owner': 'Khazar Sea Shipping Lines', 'operator': 'Khazar Sea Shipping Lines'},
|
|
{'imo': '9137258', 'name': 'KADOS', 'flag': 'Iran', 'type': 'General Cargo', 'owner': 'Khazar Sea Shipping Lines', 'operator': 'Khazar Sea Shipping Lines'},
|
|
{'imo': '9245304', 'name': 'TARADIS', 'flag': 'Iran', 'type': 'General Cargo', 'owner': 'Khazar Sea Shipping Lines', 'operator': 'Khazar Sea Shipping Lines'},
|
|
{'imo': '9245316', 'name': 'PARMIS', 'flag': 'Iran', 'type': 'General Cargo', 'owner': 'Khazar Sea Shipping Lines', 'operator': 'Khazar Sea Shipping Lines'},
|
|
{'imo': '9367982', 'name': 'GILDA', 'flag': 'Iran', 'type': 'General Cargo', 'owner': 'Khazar Sea Shipping Lines', 'operator': 'Khazar Sea Shipping Lines'},
|
|
{'imo': '9367994', 'name': 'SANIA', 'flag': 'Iran', 'type': 'General Cargo', 'owner': 'Khazar Sea Shipping Lines', 'operator': 'Khazar Sea Shipping Lines'},
|
|
{'imo': '9368003', 'name': 'SARIR', 'flag': 'Iran', 'type': 'General Cargo', 'owner': 'Khazar Sea Shipping Lines', 'operator': 'Khazar Sea Shipping Lines'},
|
|
{'imo': '9368015', 'name': 'SOMIA', 'flag': 'Iran', 'type': 'General Cargo', 'owner': 'Khazar Sea Shipping Lines', 'operator': 'Khazar Sea Shipping Lines'},
|
|
]
|
|
|
|
for vessel in _KHAZAR_FLEET:
|
|
cursor.execute("SELECT id FROM vessels WHERE imo = ?", (vessel['imo'],))
|
|
if cursor.fetchone():
|
|
continue # Already exists
|
|
vessel['source'] = 'caspian_intel'
|
|
vessel['created_at'] = datetime.utcnow().isoformat()
|
|
vessel['updated_at'] = datetime.utcnow().isoformat()
|
|
cols = {k: v for k, v in vessel.items() if k in _VESSEL_ALLOWED_COLS}
|
|
columns = ', '.join(cols.keys())
|
|
placeholders = ', '.join(['?' for _ in cols])
|
|
cursor.execute(f"INSERT INTO vessels ({columns}) VALUES ({placeholders})",
|
|
list(cols.values()))
|
|
|
|
conn.commit()
|
|
logger.info("Caspian seed data: 8 companies + 17 Khazar fleet vessels loaded")
|
|
except Exception as e:
|
|
try:
|
|
conn.rollback()
|
|
except Exception:
|
|
pass
|
|
raise
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
# =============================================================================
|
|
# VESSEL OPERATIONS
|
|
# =============================================================================
|
|
|
|
_VESSEL_ALLOWED_COLS = {
|
|
'mmsi', 'imo', 'name', 'type', 'type_code', 'flag', 'flag_code',
|
|
'callsign', 'length', 'width', 'draught', 'gross_tonnage', 'deadweight',
|
|
'year_built', 'status', 'owner', 'owner_country', 'operator',
|
|
'operator_country', 'companies_json', 'engine_type', 'speed_max',
|
|
'created_at', 'updated_at', 'source',
|
|
'website', # shipowner/operator company website (from MT Pro scraping)
|
|
'beneficial_owner', 'beneficial_owner_country',
|
|
'registered_owner', 'registered_owner_country',
|
|
'commercial_manager', 'commercial_manager_country',
|
|
}
|
|
|
|
_CONTACT_ALLOWED_COLS = {
|
|
'type', 'company_name', 'country', 'city', 'address', 'phone', 'email',
|
|
'website', 'contact_person', 'notes', 'vessels_mmsi', 'source', 'verified',
|
|
}
|
|
|
|
def upsert_vessel(data: dict) -> int:
|
|
"""Insert or update vessel by IMO or MMSI."""
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
|
|
data.pop('source', None)
|
|
data['updated_at'] = datetime.utcnow().isoformat()
|
|
|
|
if 'companies' in data:
|
|
data['companies_json'] = json.dumps(data.pop('companies'), ensure_ascii=False)
|
|
|
|
# Filter to allowed columns only (prevent SQL injection via dict keys)
|
|
data = {k: v for k, v in data.items() if k in _VESSEL_ALLOWED_COLS}
|
|
|
|
existing = None
|
|
if data.get('imo'):
|
|
cursor.execute("SELECT id FROM vessels WHERE imo = ?", (data['imo'],))
|
|
existing = cursor.fetchone()
|
|
if not existing and data.get('mmsi'):
|
|
cursor.execute("SELECT id FROM vessels WHERE mmsi = ?", (data['mmsi'],))
|
|
existing = cursor.fetchone()
|
|
|
|
if existing:
|
|
vessel_id = existing[0]
|
|
updates = ', '.join([f"{k}=?" for k in data.keys()])
|
|
cursor.execute(f"UPDATE vessels SET {updates} WHERE id = ?",
|
|
list(data.values()) + [vessel_id])
|
|
else:
|
|
if not data.get('created_at'):
|
|
data['created_at'] = datetime.utcnow().isoformat()
|
|
columns = ', '.join(data.keys())
|
|
placeholders = ', '.join(['?' for _ in data])
|
|
cursor.execute(f"INSERT INTO vessels ({columns}) VALUES ({placeholders})",
|
|
list(data.values()))
|
|
vessel_id = cursor.lastrowid
|
|
|
|
conn.commit()
|
|
return vessel_id
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def get_vessel(mmsi: str = None, imo: str = None, name: str = None) -> dict:
|
|
"""Get vessel by MMSI, IMO, or name."""
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
|
|
if imo:
|
|
cursor.execute("SELECT * FROM vessels WHERE imo = ?", (imo,))
|
|
elif mmsi:
|
|
cursor.execute("SELECT * FROM vessels WHERE mmsi = ?", (mmsi,))
|
|
elif name:
|
|
cursor.execute("SELECT * FROM vessels WHERE name LIKE ?", (f"%{name}%",))
|
|
else:
|
|
return None
|
|
|
|
row = cursor.fetchone()
|
|
|
|
if not row:
|
|
return None
|
|
|
|
vessel = dict(row)
|
|
if vessel.get('companies_json'):
|
|
vessel['companies'] = json.loads(vessel['companies_json'])
|
|
vessel.pop('companies_json', None)
|
|
vessel.pop('source', None)
|
|
return vessel
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def search_vessels(query: str, limit: int = 20) -> list:
|
|
"""Search vessels by name, MMSI, or IMO."""
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute("""
|
|
SELECT * FROM vessels
|
|
WHERE name LIKE ? OR mmsi LIKE ? OR imo LIKE ?
|
|
ORDER BY updated_at DESC
|
|
LIMIT ?
|
|
""", (f"%{query}%", f"%{query}%", f"%{query}%", limit))
|
|
|
|
rows = cursor.fetchall()
|
|
return [dict(row) for row in rows]
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
# =============================================================================
|
|
# FUZZY VESSEL SEARCH (in-memory index + trigram matching)
|
|
# =============================================================================
|
|
|
|
import difflib as _difflib
|
|
|
|
_VESSEL_NAME_INDEX = {} # {normalized_name: (original_name, mmsi, imo)}
|
|
_VESSEL_TRIGRAMS = {} # {trigram: set(normalized_names)}
|
|
_FUZZY_INDEX_BUILT = False
|
|
|
|
|
|
def _normalize_vessel_name(name: str) -> str:
|
|
"""Normalize vessel name for indexing: uppercase, strip punctuation, collapse spaces."""
|
|
if not name:
|
|
return ''
|
|
n = name.upper().strip()
|
|
n = re.sub(r'[^\w\s]', '', n)
|
|
n = re.sub(r'\s+', ' ', n).strip()
|
|
return n
|
|
|
|
|
|
def _make_trigrams(s: str) -> set:
|
|
"""Generate character trigrams from string."""
|
|
if len(s) < 3:
|
|
return {s} if s else set()
|
|
return {s[i:i+3] for i in range(len(s) - 2)}
|
|
|
|
|
|
def build_vessel_name_index():
|
|
"""Build in-memory vessel name index for fuzzy search.
|
|
Queries vessels + mt_bulk_staging tables. ~200ms, ~4MB RAM for 41K vessels."""
|
|
global _VESSEL_NAME_INDEX, _VESSEL_TRIGRAMS, _FUZZY_INDEX_BUILT
|
|
idx = {}
|
|
trigrams = {}
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
# From vessels table
|
|
try:
|
|
cursor.execute("SELECT name, mmsi, imo FROM vessels WHERE name IS NOT NULL AND name != ''")
|
|
for row in cursor.fetchall():
|
|
r = dict(row)
|
|
norm = _normalize_vessel_name(r['name'])
|
|
if norm and len(norm) >= 2:
|
|
idx[norm] = (r['name'], r.get('mmsi', ''), r.get('imo', ''))
|
|
for tri in _make_trigrams(norm):
|
|
trigrams.setdefault(tri, set()).add(norm)
|
|
except Exception as e:
|
|
logger.debug(f"Fuzzy index: vessels table error: {e}")
|
|
|
|
# From mt_bulk_staging table
|
|
try:
|
|
cursor.execute("SELECT name, mmsi, imo FROM mt_bulk_staging WHERE name IS NOT NULL AND name != ''")
|
|
for row in cursor.fetchall():
|
|
r = dict(row)
|
|
norm = _normalize_vessel_name(r['name'])
|
|
if norm and len(norm) >= 2 and norm not in idx:
|
|
idx[norm] = (r['name'], r.get('mmsi', ''), r.get('imo', ''))
|
|
for tri in _make_trigrams(norm):
|
|
trigrams.setdefault(tri, set()).add(norm)
|
|
except Exception as e:
|
|
logger.debug(f"Fuzzy index: mt_bulk_staging error: {e}")
|
|
finally:
|
|
conn.close()
|
|
|
|
_VESSEL_NAME_INDEX = idx
|
|
_VESSEL_TRIGRAMS = trigrams
|
|
_FUZZY_INDEX_BUILT = True
|
|
logger.info(f"Fuzzy vessel index built: {len(idx)} names, {len(trigrams)} trigrams")
|
|
|
|
|
|
def fuzzy_search_vessel(query: str, threshold: float = 0.65, limit: int = 5) -> list:
|
|
"""Fuzzy search vessel by name using trigram pre-filter + difflib scoring.
|
|
Returns: [(original_name, mmsi, imo, score), ...] sorted by score desc. ~10-20ms."""
|
|
if not _FUZZY_INDEX_BUILT:
|
|
try:
|
|
build_vessel_name_index()
|
|
except Exception:
|
|
return []
|
|
|
|
norm_q = _normalize_vessel_name(query)
|
|
if not norm_q or len(norm_q) < 2:
|
|
return []
|
|
|
|
# Exact match shortcut
|
|
if norm_q in _VESSEL_NAME_INDEX:
|
|
entry = _VESSEL_NAME_INDEX[norm_q]
|
|
return [(entry[0], entry[1], entry[2], 1.0)]
|
|
|
|
# Trigram pre-filter: find candidates sharing >= 2 trigrams
|
|
q_trigrams = _make_trigrams(norm_q)
|
|
candidate_counts = {}
|
|
for tri in q_trigrams:
|
|
for name in _VESSEL_TRIGRAMS.get(tri, set()):
|
|
candidate_counts[name] = candidate_counts.get(name, 0) + 1
|
|
|
|
# Need at least 2 shared trigrams (or 1 for short queries)
|
|
min_shared = 1 if len(norm_q) < 5 else 2
|
|
candidates = [n for n, c in candidate_counts.items() if c >= min_shared]
|
|
|
|
# Score candidates with SequenceMatcher
|
|
scored = []
|
|
matcher = _difflib.SequenceMatcher(None, norm_q, '')
|
|
for cand in candidates:
|
|
matcher.set_seq2(cand)
|
|
score = matcher.ratio()
|
|
if score >= threshold:
|
|
entry = _VESSEL_NAME_INDEX[cand]
|
|
scored.append((entry[0], entry[1], entry[2], round(score, 3)))
|
|
|
|
scored.sort(key=lambda x: x[3], reverse=True)
|
|
return scored[:limit]
|
|
|
|
|
|
# Country name → ISO 2-letter flag code mapping
|
|
FLAG_NAME_TO_CODE = {
|
|
'Russia': 'RU', 'Turkey': 'TR', 'Panama': 'PA', 'Liberia': 'LR',
|
|
'Marshall Islands': 'MH', 'China': 'CN', 'Greece': 'GR', 'Singapore': 'SG',
|
|
'India': 'IN', 'Japan': 'JP', 'South Korea': 'KR', 'Germany': 'DE',
|
|
'Norway': 'NO', 'Italy': 'IT', 'United Kingdom': 'GB', 'United States': 'US',
|
|
'Iran': 'IR', 'Saudi Arabia': 'SA', 'UAE': 'AE', 'Egypt': 'EG',
|
|
'Indonesia': 'ID', 'Philippines': 'PH', 'Vietnam': 'VN', 'Brazil': 'BR',
|
|
'Denmark': 'DK', 'Netherlands': 'NL', 'France': 'FR', 'Spain': 'ES',
|
|
'Malta': 'MT', 'Cyprus': 'CY', 'Azerbaijan': 'AZ', 'Kazakhstan': 'KZ',
|
|
'Turkmenistan': 'TM', 'Georgia': 'GE', 'Ukraine': 'UA', 'Romania': 'RO',
|
|
'Bulgaria': 'BG', 'Croatia': 'HR', 'Poland': 'PL', 'Finland': 'FI',
|
|
'Sweden': 'SE', 'Estonia': 'EE', 'Latvia': 'LV', 'Lithuania': 'LT',
|
|
'Hong Kong': 'HK', 'Bahamas': 'BS', 'Bermuda': 'BM', 'Portugal': 'PT',
|
|
'Cayman Islands': 'KY', 'Isle of Man': 'IM', 'Antigua and Barbuda': 'AG',
|
|
'Saint Vincent': 'VC', 'Comoros': 'KM', 'Tanzania': 'TZ', 'Togo': 'TG',
|
|
'Cameroon': 'CM', 'Sierra Leone': 'SL', 'Moldova': 'MD', 'Palau': 'PW',
|
|
'Belize': 'BZ', 'Bolivia': 'BO', 'Cook Islands': 'CK', 'Tuvalu': 'TV',
|
|
}
|
|
# Reverse: code → name (for display)
|
|
FLAG_CODE_TO_NAME = {v: k for k, v in FLAG_NAME_TO_CODE.items()}
|
|
|
|
|
|
def search_vessels_by_flag(flag_country: str, vessel_type: str = None, limit: int = 20) -> list:
|
|
"""Search vessels by flag country name, optionally filtered by vessel type."""
|
|
# Resolve country name to ISO 2-letter code
|
|
code = None
|
|
fc_upper = flag_country.upper().strip()
|
|
# Direct 2-letter code check
|
|
if len(fc_upper) == 2 and fc_upper.isalpha():
|
|
code = fc_upper
|
|
else:
|
|
# Try name → code mapping (case-insensitive)
|
|
for name, c_code in FLAG_NAME_TO_CODE.items():
|
|
if name.upper() == fc_upper:
|
|
code = c_code
|
|
break
|
|
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
results = []
|
|
# mt_bulk_staging has more vessels with owner data
|
|
try:
|
|
if code:
|
|
sql = "SELECT * FROM mt_bulk_staging WHERE UPPER(flag) = ?"
|
|
params = [code]
|
|
else:
|
|
sql = "SELECT * FROM mt_bulk_staging WHERE UPPER(flag) LIKE ?"
|
|
params = [f"%{flag_country.upper()}%"]
|
|
if vessel_type:
|
|
sql += " AND UPPER(type_category) = ?"
|
|
params.append(vessel_type.upper())
|
|
sql += " ORDER BY dwt DESC NULLS LAST LIMIT ?"
|
|
params.append(limit)
|
|
cursor.execute(sql, params)
|
|
results = [dict(row) for row in cursor.fetchall()]
|
|
except Exception as e:
|
|
logger.debug(f"Flag search staging error: {e}")
|
|
|
|
if len(results) < limit:
|
|
try:
|
|
sql2 = "SELECT * FROM vessels WHERE UPPER(flag) LIKE ?"
|
|
params2 = [f"%{flag_country.upper()}%"]
|
|
sql2 += " LIMIT ?"
|
|
params2.append(limit - len(results))
|
|
cursor.execute(sql2, params2)
|
|
existing_mmsi = {r.get('mmsi') for r in results}
|
|
for row in cursor.fetchall():
|
|
r = dict(row)
|
|
if r.get('mmsi') not in existing_mmsi:
|
|
results.append(r)
|
|
except Exception as e:
|
|
logger.debug(f"Flag search vessels error: {e}")
|
|
return results
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def search_vessels_by_owner(owner_query: str, limit: int = 20) -> list:
|
|
"""Search vessels by owner/operator name (LIKE match across owner fields)."""
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
q = f"%{owner_query.upper()}%"
|
|
try:
|
|
cursor.execute("""
|
|
SELECT * FROM mt_bulk_staging
|
|
WHERE UPPER(owner) LIKE ? OR UPPER(registered_owner) LIKE ?
|
|
OR UPPER(operator) LIKE ? OR UPPER(commercial_manager) LIKE ?
|
|
ORDER BY dwt DESC NULLS LAST LIMIT ?
|
|
""", (q, q, q, q, limit))
|
|
return [dict(row) for row in cursor.fetchall()]
|
|
except Exception as e:
|
|
logger.debug(f"Owner search error: {e}")
|
|
return []
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def search_vessels_by_type(vessel_type: str, region_bbox: tuple = None, limit: int = 20) -> list:
|
|
"""Search vessels by type category, optionally restricted to bounding box (lat_min, lat_max, lon_min, lon_max)."""
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
try:
|
|
sql = "SELECT * FROM mt_bulk_staging WHERE UPPER(type_category) = ?"
|
|
params = [vessel_type.upper()]
|
|
if region_bbox:
|
|
lat_min, lat_max, lon_min, lon_max = region_bbox
|
|
sql += " AND lat BETWEEN ? AND ? AND lon BETWEEN ? AND ?"
|
|
params.extend([lat_min, lat_max, lon_min, lon_max])
|
|
sql += " ORDER BY dwt DESC NULLS LAST LIMIT ?"
|
|
params.append(limit)
|
|
cursor.execute(sql, params)
|
|
return [dict(row) for row in cursor.fetchall()]
|
|
except Exception as e:
|
|
logger.debug(f"Type search error: {e}")
|
|
return []
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
# =============================================================================
|
|
# POSITION OPERATIONS
|
|
# =============================================================================
|
|
|
|
def _normalize_timestamp(ts) -> str:
|
|
"""Normalize timestamp to 'YYYY-MM-DD HH:MM:SS.ffffff' for consistent DB sorting.
|
|
Handles: ISO with T, AISStream '+0000 UTC' suffix, unix ms (Digitraffic)."""
|
|
if ts is None:
|
|
return datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')
|
|
if isinstance(ts, (int, float)):
|
|
# Unix timestamp (seconds or milliseconds)
|
|
if ts > 1e12:
|
|
ts = ts / 1000.0 # milliseconds → seconds
|
|
return datetime.utcfromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S.%f')
|
|
s = str(ts).strip()
|
|
# Remove AISStream timezone suffix like ' +0000 UTC'
|
|
for suffix in (' +0000 UTC', ' UTC', '+00:00', 'Z'):
|
|
if s.endswith(suffix):
|
|
s = s[:-len(suffix)]
|
|
# Replace T separator with space for consistent SQLite sorting
|
|
s = s.replace('T', ' ')
|
|
return s
|
|
|
|
|
|
def add_position(mmsi: str, lat: float, lon: float, speed: float = None,
|
|
course: float = None, heading: float = None, status: str = None,
|
|
destination: str = None, eta: str = None, timestamp: str = None,
|
|
source: str = None):
|
|
"""Upsert vessel position — keeps only the latest position per MMSI."""
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
ts = _normalize_timestamp(timestamp)
|
|
src = source or 'unknown'
|
|
|
|
if USE_POSTGRES:
|
|
cursor.execute("""
|
|
INSERT INTO positions (mmsi, latitude, longitude, speed, course,
|
|
heading, status, destination, eta, timestamp, received_at, source)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP, ?)
|
|
ON CONFLICT (mmsi) DO UPDATE SET
|
|
latitude = EXCLUDED.latitude,
|
|
longitude = EXCLUDED.longitude,
|
|
speed = EXCLUDED.speed,
|
|
course = EXCLUDED.course,
|
|
heading = EXCLUDED.heading,
|
|
status = COALESCE(EXCLUDED.status, positions.status),
|
|
destination = COALESCE(EXCLUDED.destination, positions.destination),
|
|
eta = COALESCE(EXCLUDED.eta, positions.eta),
|
|
timestamp = EXCLUDED.timestamp,
|
|
received_at = CURRENT_TIMESTAMP,
|
|
source = EXCLUDED.source
|
|
""", (mmsi, lat, lon, speed, course, heading, status, destination, eta, ts, src))
|
|
else:
|
|
cursor.execute("""
|
|
INSERT OR REPLACE INTO positions (mmsi, latitude, longitude, speed, course,
|
|
heading, status, destination, eta, timestamp, source)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
""", (mmsi, lat, lon, speed, course, heading, status, destination, eta, ts, src))
|
|
|
|
conn.commit()
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def get_last_position(mmsi: str) -> dict:
|
|
"""Get last known position for vessel."""
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute("""
|
|
SELECT * FROM positions
|
|
WHERE mmsi = ?
|
|
ORDER BY timestamp DESC
|
|
LIMIT 1
|
|
""", (mmsi,))
|
|
|
|
row = cursor.fetchone()
|
|
return dict(row) if row else None
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def update_position_destination(mmsi: str, destination: str, eta: str = None):
|
|
"""Update destination/eta on the latest position for this MMSI."""
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
if eta:
|
|
cursor.execute("""
|
|
UPDATE positions SET destination = ?, eta = ?
|
|
WHERE mmsi = ? AND id = (
|
|
SELECT id FROM positions WHERE mmsi = ? ORDER BY timestamp DESC LIMIT 1
|
|
)
|
|
""", (destination, eta, mmsi, mmsi))
|
|
else:
|
|
cursor.execute("""
|
|
UPDATE positions SET destination = ?
|
|
WHERE mmsi = ? AND id = (
|
|
SELECT id FROM positions WHERE mmsi = ? ORDER BY timestamp DESC LIMIT 1
|
|
)
|
|
""", (destination, mmsi, mmsi))
|
|
conn.commit()
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def get_position_history(mmsi: str, hours: int = 24) -> list:
|
|
"""Get position history for vessel."""
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
|
|
# Use space-separated format for consistent SQLite string comparison
|
|
since = (datetime.utcnow() - timedelta(hours=hours)).strftime('%Y-%m-%d %H:%M:%S')
|
|
cursor.execute("""
|
|
SELECT * FROM positions
|
|
WHERE mmsi = ? AND timestamp > ?
|
|
ORDER BY timestamp DESC
|
|
""", (mmsi, since))
|
|
|
|
rows = cursor.fetchall()
|
|
return [dict(row) for row in rows]
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def get_positions_in_area(lat_min: float, lat_max: float,
|
|
lon_min: float, lon_max: float,
|
|
max_age_minutes: int = 30) -> list:
|
|
"""Get most recent positions within a geographic bounding box.
|
|
Returns deduplicated list (latest position per MMSI), joined with vessel info."""
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
since = (datetime.utcnow() - timedelta(minutes=max_age_minutes)).strftime('%Y-%m-%d %H:%M:%S')
|
|
type_filter = ""
|
|
if config.FREIGHT_VESSELS_ONLY:
|
|
type_filter = f"AND (v.type_code IS NULL OR v.type_code >= {config.FREIGHT_MIN_TYPE_CODE})"
|
|
cursor.execute(f"""
|
|
SELECT p.mmsi, p.latitude, p.longitude, p.speed, p.course,
|
|
p.heading, p.status, p.destination, p.eta, p.timestamp, p.source,
|
|
v.name, v.type, v.flag, v.deadweight AS dwt, v.imo,
|
|
v.type_code, v.callsign,
|
|
v.owner, v.owner_country, v.operator, v.operator_country,
|
|
v.website, v.year_built, v.companies_json
|
|
FROM positions p
|
|
LEFT JOIN vessels v ON p.mmsi = v.mmsi
|
|
WHERE p.latitude BETWEEN ? AND ?
|
|
AND p.longitude BETWEEN ? AND ?
|
|
AND p.timestamp > ?
|
|
{type_filter}
|
|
ORDER BY p.timestamp DESC
|
|
""", (lat_min, lat_max, lon_min, lon_max, since))
|
|
|
|
rows = cursor.fetchall()
|
|
seen = set()
|
|
results = []
|
|
for row in rows:
|
|
r = dict(row)
|
|
if r['mmsi'] not in seen:
|
|
seen.add(r['mmsi'])
|
|
results.append(r)
|
|
return results
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def _normalize_mt_staging_row(row) -> dict:
|
|
"""Normalize a mt_bulk_staging DB row to standard vessel dict format."""
|
|
r = dict(row) if not isinstance(row, dict) else row
|
|
v = {
|
|
'mmsi': r.get('mmsi'),
|
|
'imo': r.get('imo'),
|
|
'name': r.get('name', ''),
|
|
'flag': r.get('flag', ''),
|
|
'dwt': r.get('dwt'),
|
|
'latitude': r.get('lat'),
|
|
'longitude': r.get('lon'),
|
|
'speed': r.get('speed'),
|
|
'course': r.get('course'),
|
|
'destination': r.get('destination', ''),
|
|
'type_category': r.get('type_category', ''),
|
|
'type': r.get('gt_shiptype', ''),
|
|
'owner': r.get('owner'),
|
|
'operator': r.get('operator'),
|
|
'registered_owner': r.get('registered_owner'),
|
|
'year_built': r.get('year_built'),
|
|
'commercial_manager': r.get('commercial_manager'),
|
|
'owner_website': r.get('owner_website'),
|
|
'owner_country': r.get('owner_country'),
|
|
'operator_country': r.get('operator_country'),
|
|
'registered_owner_country': r.get('registered_owner_country'),
|
|
'source': 'mt_staging',
|
|
'_ship_id': r.get('ship_id'),
|
|
}
|
|
age_h = r.get('position_age_hours')
|
|
if age_h is not None:
|
|
v['position_age_hours'] = round(age_h, 1)
|
|
if age_h < 1:
|
|
v['position_freshness'] = 'live'
|
|
elif age_h < 6:
|
|
v['position_freshness'] = 'recent'
|
|
elif age_h < 24:
|
|
v['position_freshness'] = 'today'
|
|
else:
|
|
v['position_freshness'] = 'stale'
|
|
return v
|
|
|
|
|
|
_MT_STAGING_SELECT = """
|
|
SELECT ship_id, name, flag, dwt, gt_shiptype, type_category,
|
|
lat, lon, speed, course, destination, mmsi, imo, owner, operator,
|
|
scraped_at, registered_owner, year_built,
|
|
commercial_manager, owner_website, owner_country,
|
|
operator_country, registered_owner_country,
|
|
EXTRACT(EPOCH FROM (NOW() - scraped_at)) / 3600 AS position_age_hours
|
|
FROM mt_bulk_staging
|
|
"""
|
|
|
|
|
|
def get_vessel_from_staging(mmsi: str = None, imo: str = None, name: str = None) -> dict:
|
|
"""Get vessel from mt_bulk_staging with owner data formatted as companies array."""
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
if imo:
|
|
cursor.execute("""SELECT ship_id, name, flag, dwt, gt_shiptype, type_category,
|
|
lat, lon, speed, course, destination, mmsi, imo, draught, loa, beam, year_built,
|
|
owner, owner_country, owner_address, owner_website, owner_phone, owner_email,
|
|
registered_owner, registered_owner_country, registered_owner_address,
|
|
registered_owner_website, registered_owner_phone, registered_owner_email,
|
|
commercial_manager, commercial_manager_country, commercial_manager_address, commercial_manager_website,
|
|
operator, operator_country, operator_address, operator_website
|
|
FROM mt_bulk_staging WHERE imo = ? LIMIT 1""", (imo,))
|
|
elif mmsi:
|
|
cursor.execute("""SELECT ship_id, name, flag, dwt, gt_shiptype, type_category,
|
|
lat, lon, speed, course, destination, mmsi, imo, draught, loa, beam, year_built,
|
|
owner, owner_country, owner_address, owner_website, owner_phone, owner_email,
|
|
registered_owner, registered_owner_country, registered_owner_address,
|
|
registered_owner_website, registered_owner_phone, registered_owner_email,
|
|
commercial_manager, commercial_manager_country, commercial_manager_address, commercial_manager_website,
|
|
operator, operator_country, operator_address, operator_website
|
|
FROM mt_bulk_staging WHERE mmsi = ? LIMIT 1""", (mmsi,))
|
|
elif name:
|
|
cursor.execute("""SELECT ship_id, name, flag, dwt, gt_shiptype, type_category,
|
|
lat, lon, speed, course, destination, mmsi, imo, draught, loa, beam, year_built,
|
|
owner, owner_country, owner_address, owner_website, owner_phone, owner_email,
|
|
registered_owner, registered_owner_country, registered_owner_address,
|
|
registered_owner_website, registered_owner_phone, registered_owner_email,
|
|
commercial_manager, commercial_manager_country, commercial_manager_address, commercial_manager_website,
|
|
operator, operator_country, operator_address, operator_website
|
|
FROM mt_bulk_staging WHERE UPPER(name) = UPPER(?) LIMIT 1""", (name,))
|
|
else:
|
|
return None
|
|
|
|
row = cursor.fetchone()
|
|
if not row:
|
|
return None
|
|
|
|
r = dict(row)
|
|
vessel = {
|
|
'name': r.get('name', ''),
|
|
'mmsi': r.get('mmsi'),
|
|
'imo': r.get('imo'),
|
|
'flag': r.get('flag', ''),
|
|
'type': r.get('gt_shiptype', ''),
|
|
'type_category': r.get('type_category', ''),
|
|
'deadweight': r.get('dwt'),
|
|
'length': r.get('loa'),
|
|
'width': r.get('beam'),
|
|
'draught': r.get('draught'),
|
|
'year_built': r.get('year_built'),
|
|
'latitude': r.get('lat'),
|
|
'longitude': r.get('lon'),
|
|
'speed': r.get('speed'),
|
|
'course': r.get('course'),
|
|
'destination': r.get('destination', ''),
|
|
}
|
|
|
|
# Build companies array from owner fields
|
|
companies = []
|
|
_pro = 'MT Pro required'
|
|
if r.get('owner') and r['owner'] != _pro:
|
|
c = {'role': 'beneficial_owner', 'name': r['owner']}
|
|
if r.get('owner_country'): c['country'] = r['owner_country']
|
|
if r.get('owner_address'): c['address'] = r['owner_address']
|
|
if r.get('owner_website'): c['website'] = r['owner_website']
|
|
if r.get('owner_phone'): c['phone'] = r['owner_phone']
|
|
if r.get('owner_email'): c['email'] = r['owner_email']
|
|
companies.append(c)
|
|
if r.get('registered_owner'):
|
|
c = {'role': 'registered_owner', 'name': r['registered_owner']}
|
|
if r.get('registered_owner_country'): c['country'] = r['registered_owner_country']
|
|
if r.get('registered_owner_address'): c['address'] = r['registered_owner_address']
|
|
if r.get('registered_owner_website'): c['website'] = r['registered_owner_website']
|
|
if r.get('registered_owner_phone'): c['phone'] = r['registered_owner_phone']
|
|
if r.get('registered_owner_email'): c['email'] = r['registered_owner_email']
|
|
companies.append(c)
|
|
if r.get('commercial_manager'):
|
|
c = {'role': 'commercial_manager', 'name': r['commercial_manager']}
|
|
if r.get('commercial_manager_country'): c['country'] = r['commercial_manager_country']
|
|
if r.get('commercial_manager_address'): c['address'] = r['commercial_manager_address']
|
|
if r.get('commercial_manager_website'): c['website'] = r['commercial_manager_website']
|
|
companies.append(c)
|
|
if r.get('operator'):
|
|
c = {'role': 'operator', 'name': r['operator']}
|
|
if r.get('operator_country'): c['country'] = r['operator_country']
|
|
if r.get('operator_address'): c['address'] = r['operator_address']
|
|
if r.get('operator_website'): c['website'] = r['operator_website']
|
|
companies.append(c)
|
|
|
|
if companies:
|
|
vessel['companies'] = companies
|
|
elif r.get('owner') == _pro:
|
|
vessel['owner_status'] = 'MT Pro subscription required for owner data'
|
|
|
|
return vessel
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def get_mt_bulk_staging_near_port(lat: float, lon: float, radius_nm: float,
|
|
vessel_type: str = None) -> list:
|
|
"""Query mt_bulk_staging table for vessels near a port.
|
|
Uses bounding box on lat/lon from MT tile scraping data.
|
|
Returns list of vessel dicts compatible with search_vessels_near_port output format.
|
|
"""
|
|
import math as _math
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
lat_delta = radius_nm / 60.0
|
|
cos_lat = max(_math.cos(_math.radians(lat)), 0.01)
|
|
lon_delta = radius_nm / (60.0 * cos_lat)
|
|
|
|
lat_min = lat - lat_delta
|
|
lat_max = lat + lat_delta
|
|
lon_min = lon - lon_delta
|
|
lon_max = lon + lon_delta
|
|
|
|
type_clause = ''
|
|
params = [lat_min, lat_max, lon_min, lon_max]
|
|
|
|
if vessel_type == 'bulk':
|
|
type_clause = "AND (type_category = 'bulk' OR gt_shiptype = '6')"
|
|
elif vessel_type == 'tanker':
|
|
type_clause = "AND type_category = 'tanker'"
|
|
elif vessel_type == 'container':
|
|
type_clause = "AND (type_category = 'container' OR gt_shiptype = '11')"
|
|
elif vessel_type == 'roro':
|
|
type_clause = "AND (type_category = 'roro' OR gt_shiptype = '12')"
|
|
elif vessel_type == 'general':
|
|
type_clause = "AND (type_category = 'general' OR gt_shiptype IN ('9','70','139'))"
|
|
|
|
cursor.execute(f"""
|
|
{_MT_STAGING_SELECT}
|
|
WHERE lat BETWEEN ? AND ?
|
|
AND lon BETWEEN ? AND ?
|
|
AND lat IS NOT NULL AND lon IS NOT NULL
|
|
{type_clause}
|
|
ORDER BY scraped_at DESC NULLS LAST, dwt DESC NULLS LAST
|
|
LIMIT 100
|
|
""", params)
|
|
|
|
return [_normalize_mt_staging_row(row) for row in cursor.fetchall()]
|
|
except Exception:
|
|
return []
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def get_mt_bulk_staging_in_area(lat_min: float, lat_max: float,
|
|
lon_min: float, lon_max: float,
|
|
limit: int = 500) -> list:
|
|
"""Get mt_bulk_staging vessels within bounding box for map display.
|
|
Returns normalized vessel dicts with latitude/longitude keys."""
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
cursor.execute(f"""
|
|
{_MT_STAGING_SELECT}
|
|
WHERE lat BETWEEN ? AND ?
|
|
AND lon BETWEEN ? AND ?
|
|
AND lat IS NOT NULL AND lon IS NOT NULL
|
|
AND mmsi IS NOT NULL
|
|
ORDER BY scraped_at DESC NULLS LAST
|
|
LIMIT ?
|
|
""", [lat_min, lat_max, lon_min, lon_max, limit])
|
|
|
|
return [_normalize_mt_staging_row(row) for row in cursor.fetchall()]
|
|
except Exception:
|
|
return []
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def get_all_vessel_positions_minimal() -> list:
|
|
"""Get all vessel positions for map clustering (minimal fields).
|
|
Unions fresh AIS positions with MT bulk staging, deduped by MMSI.
|
|
Returns list of dicts: mmsi, lat, lon, type_category, heading, speed, course, name."""
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
# Fresh AIS positions (< 60 min) + MT bulk staging, deduped by MMSI (PostgreSQL DISTINCT ON)
|
|
cursor.execute("""
|
|
SELECT DISTINCT ON (mmsi)
|
|
mmsi, lat, lon, type_category, heading, speed, course, name
|
|
FROM (
|
|
SELECT p.mmsi, p.latitude AS lat, p.longitude AS lon,
|
|
COALESCE(v.type, '') AS type_category,
|
|
p.heading, p.speed, p.course,
|
|
COALESCE(v.name, '') AS name,
|
|
1 AS priority,
|
|
p.timestamp AS ts
|
|
FROM positions p
|
|
LEFT JOIN vessels v ON p.mmsi = v.mmsi
|
|
WHERE p.timestamp > NOW() - INTERVAL '60 minutes'
|
|
AND p.latitude IS NOT NULL AND p.longitude IS NOT NULL
|
|
AND p.mmsi IS NOT NULL
|
|
UNION ALL
|
|
SELECT mmsi, lat, lon,
|
|
COALESCE(type_category, 'other') AS type_category,
|
|
NULL AS heading, speed, course,
|
|
COALESCE(name, '') AS name,
|
|
2 AS priority,
|
|
scraped_at AS ts
|
|
FROM mt_bulk_staging
|
|
WHERE lat IS NOT NULL AND lon IS NOT NULL AND mmsi IS NOT NULL
|
|
) combined
|
|
ORDER BY mmsi, priority ASC, ts DESC
|
|
""")
|
|
rows = cursor.fetchall()
|
|
results = []
|
|
for row in rows:
|
|
r = dict(row)
|
|
mmsi = str(r.get('mmsi', ''))
|
|
if not mmsi:
|
|
continue
|
|
results.append({
|
|
'mmsi': mmsi,
|
|
'lat': float(r['lat']) if r['lat'] else None,
|
|
'lon': float(r['lon']) if r['lon'] else None,
|
|
'type_category': r.get('type_category') or 'other',
|
|
'heading': r.get('heading'),
|
|
'speed': r.get('speed'),
|
|
'course': r.get('course'),
|
|
'name': r.get('name') or '',
|
|
})
|
|
return results
|
|
except Exception as e:
|
|
logger.error(f"get_all_vessel_positions_minimal error: {e}")
|
|
return []
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def update_mt_bulk_staging_position(mmsi: str, lat: float, lon: float,
|
|
speed: float = None, course: float = None):
|
|
"""Cross-update mt_bulk_staging with fresh AIS position for a known vessel.
|
|
Only updates if we have a matching MMSI in the table.
|
|
"""
|
|
if not mmsi or lat is None or lon is None:
|
|
return
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
cursor.execute("""
|
|
UPDATE mt_bulk_staging
|
|
SET lat = ?, lon = ?, speed = COALESCE(?, speed),
|
|
course = COALESCE(?, course), scraped_at = NOW()
|
|
WHERE mmsi = ?
|
|
""", [lat, lon, speed, course, mmsi])
|
|
conn.commit()
|
|
except Exception:
|
|
pass
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def insert_mt_discovery(vessel: dict, source: str = 'ais'):
|
|
"""Insert a newly discovered cargo/tanker vessel into mt_bulk_staging.
|
|
Uses ON CONFLICT DO NOTHING to avoid overwriting existing data.
|
|
Vessel dict should have: mmsi, name, lat/latitude, lon/longitude, type/type_category.
|
|
"""
|
|
mmsi = vessel.get('mmsi')
|
|
if not mmsi:
|
|
return
|
|
name = vessel.get('name', '')
|
|
lat = vessel.get('lat') or vessel.get('latitude')
|
|
lon = vessel.get('lon') or vessel.get('longitude')
|
|
speed = vessel.get('speed')
|
|
course = vessel.get('course')
|
|
flag = vessel.get('flag', '')
|
|
dwt = vessel.get('dwt')
|
|
imo = vessel.get('imo')
|
|
type_cat = vessel.get('type_category', '')
|
|
gt_shiptype = vessel.get('type', '')
|
|
ship_id = f"ais_{mmsi}"
|
|
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
cursor.execute("""
|
|
INSERT INTO mt_bulk_staging
|
|
(ship_id, name, flag, dwt, gt_shiptype, type_category,
|
|
lat, lon, speed, course, mmsi, imo, scraped_at)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NOW())
|
|
ON CONFLICT (ship_id) DO NOTHING
|
|
""", [ship_id, name, flag, dwt, gt_shiptype, type_cat,
|
|
lat, lon, speed, course, mmsi, imo])
|
|
conn.commit()
|
|
except Exception:
|
|
pass
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def get_vessels_by_destination(destination_patterns: list, max_age_minutes: int = 360) -> list:
|
|
"""Find vessels whose AIS destination matches any of the given patterns.
|
|
|
|
Args:
|
|
destination_patterns: list of uppercase strings, e.g. ["BOSTON", "USBOS", "US BOS"]
|
|
max_age_minutes: max age of position data (default 6 hours)
|
|
Returns:
|
|
list of vessel dicts with position + vessel info (latest per MMSI)
|
|
"""
|
|
if not destination_patterns:
|
|
return []
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
since = (datetime.utcnow() - timedelta(minutes=max_age_minutes)).strftime('%Y-%m-%d %H:%M:%S')
|
|
|
|
# Build dynamic WHERE: (UPPER(p.destination) LIKE ? OR UPPER(p.destination) LIKE ? ...)
|
|
like_clauses = []
|
|
params = []
|
|
for pat in destination_patterns:
|
|
like_clauses.append("UPPER(p.destination) LIKE ?")
|
|
params.append(f"%{pat.upper()}%")
|
|
|
|
where_dest = " OR ".join(like_clauses)
|
|
params.append(since)
|
|
|
|
type_filter = ""
|
|
if config.FREIGHT_VESSELS_ONLY:
|
|
type_filter = f"AND (v.type_code IS NULL OR v.type_code >= {config.FREIGHT_MIN_TYPE_CODE})"
|
|
|
|
cursor.execute(f"""
|
|
SELECT p.mmsi, p.latitude, p.longitude, p.speed, p.course,
|
|
p.heading, p.status, p.destination, p.eta, p.timestamp, p.source,
|
|
v.name, v.type, v.flag, v.deadweight AS dwt, v.imo,
|
|
v.type_code, v.callsign,
|
|
v.owner, v.owner_country, v.operator, v.operator_country,
|
|
v.website, v.year_built, v.companies_json
|
|
FROM positions p
|
|
LEFT JOIN vessels v ON p.mmsi = v.mmsi
|
|
WHERE ({where_dest})
|
|
AND p.destination IS NOT NULL AND p.destination != ''
|
|
AND p.timestamp > ?
|
|
{type_filter}
|
|
ORDER BY p.timestamp DESC
|
|
""", params)
|
|
|
|
rows = cursor.fetchall()
|
|
# Deduplicate: latest position per MMSI
|
|
seen = set()
|
|
results = []
|
|
for row in rows:
|
|
r = dict(row)
|
|
if r['mmsi'] not in seen:
|
|
seen.add(r['mmsi'])
|
|
results.append(r)
|
|
return results
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def cleanup_old_positions(keep_hours: int = 48) -> int:
|
|
"""Delete positions older than keep_hours. Returns count of deleted rows."""
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
cutoff = (datetime.utcnow() - timedelta(hours=keep_hours)).strftime('%Y-%m-%d %H:%M:%S')
|
|
cursor.execute("DELETE FROM positions WHERE timestamp < ?", (cutoff,))
|
|
deleted = cursor.rowcount
|
|
conn.commit()
|
|
return deleted
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
# =============================================================================
|
|
# CONTACT OPERATIONS
|
|
# =============================================================================
|
|
|
|
def add_contact(contact_type: str, company_name: str, **kwargs) -> int:
|
|
"""Add contact."""
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
|
|
data = {'type': contact_type, 'company_name': company_name, **kwargs}
|
|
# Filter to allowed columns only (prevent SQL injection via dict keys)
|
|
data = {k: v for k, v in data.items() if k in _CONTACT_ALLOWED_COLS}
|
|
columns = ', '.join(data.keys())
|
|
placeholders = ', '.join(['?' for _ in data])
|
|
cursor.execute(f"INSERT INTO contacts ({columns}) VALUES ({placeholders})",
|
|
list(data.values()))
|
|
|
|
conn.commit()
|
|
contact_id = cursor.lastrowid
|
|
return contact_id
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def search_contacts(query: str = None, contact_type: str = None, limit: int = 20) -> list:
|
|
"""Search contacts."""
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
|
|
if query and contact_type:
|
|
cursor.execute("""
|
|
SELECT * FROM contacts
|
|
WHERE type = ? AND (company_name LIKE ? OR contact_person LIKE ?)
|
|
ORDER BY updated_at DESC LIMIT ?
|
|
""", (contact_type, f"%{query}%", f"%{query}%", limit))
|
|
elif contact_type:
|
|
cursor.execute("""
|
|
SELECT * FROM contacts WHERE type = ?
|
|
ORDER BY updated_at DESC LIMIT ?
|
|
""", (contact_type, limit))
|
|
elif query:
|
|
cursor.execute("""
|
|
SELECT * FROM contacts
|
|
WHERE company_name LIKE ? OR contact_person LIKE ?
|
|
ORDER BY updated_at DESC LIMIT ?
|
|
""", (f"%{query}%", f"%{query}%", limit))
|
|
else:
|
|
cursor.execute("SELECT * FROM contacts ORDER BY updated_at DESC LIMIT ?", (limit,))
|
|
|
|
rows = cursor.fetchall()
|
|
return [dict(row) for row in rows]
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def _mask_email(email):
|
|
"""Mask email: john.doe@maersk.com -> j***@maersk.com"""
|
|
if not email:
|
|
return ''
|
|
parts = email.split('@')
|
|
if len(parts) != 2:
|
|
return '***@***.com'
|
|
local = parts[0]
|
|
masked_local = local[0] + '***' if local else '***'
|
|
return f"{masked_local}@{parts[1]}"
|
|
|
|
|
|
def _mask_phone(phone):
|
|
"""Mask phone: +1-555-123-4567 -> +1-***-**67"""
|
|
if not phone:
|
|
return ''
|
|
if len(phone) > 6:
|
|
return phone[:3] + '***' + phone[-2:]
|
|
return '***'
|
|
|
|
|
|
def search_contacts_in_staging(query: str, limit: int = 10, mask: bool = True) -> list:
|
|
"""Search mt_bulk_staging for vessel/owner contacts by name, MMSI, IMO, or owner/operator name.
|
|
mask=True: emails/phones masked (free preview). mask=False: full data (paid unlock)."""
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
q = f"%{query}%"
|
|
q_exact = query.strip()
|
|
# Search by name, owner, operator, registered_owner, MMSI, IMO
|
|
cursor.execute("""
|
|
SELECT name, imo, mmsi, flag, dwt, type_category,
|
|
owner, owner_email, owner_phone, owner_website,
|
|
owner_address, owner_country,
|
|
operator, operator_website, operator_country,
|
|
registered_owner, registered_owner_email, registered_owner_website,
|
|
registered_owner_country,
|
|
commercial_manager, commercial_manager_website, commercial_manager_country
|
|
FROM mt_bulk_staging
|
|
WHERE (name ILIKE ? OR owner ILIKE ? OR operator ILIKE ?
|
|
OR registered_owner ILIKE ?
|
|
OR CAST(mmsi AS TEXT) = ? OR CAST(imo AS TEXT) = ?)
|
|
AND (owner IS NOT NULL OR operator IS NOT NULL)
|
|
LIMIT ?
|
|
""", (q, q, q, q, q_exact, q_exact, limit))
|
|
rows = cursor.fetchall()
|
|
results = []
|
|
for row in rows:
|
|
r = dict(row)
|
|
vessel_info = f"{r.get('name', '')} (IMO: {r.get('imo', 'N/A')}, MMSI: {r.get('mmsi', 'N/A')})"
|
|
contacts_out = []
|
|
# Owner contact
|
|
if r.get('owner'):
|
|
email_raw = r.get('owner_email') or ''
|
|
phone_raw = r.get('owner_phone') or ''
|
|
contacts_out.append({
|
|
'company_name': r['owner'],
|
|
'vessel': vessel_info,
|
|
'role': 'Beneficial Owner',
|
|
'email': _mask_email(email_raw) if mask else email_raw,
|
|
'phone': _mask_phone(phone_raw) if mask else phone_raw,
|
|
'website': r.get('owner_website') or '',
|
|
'country': r.get('owner_country') or '',
|
|
'address': (r.get('owner_address') or '') if not mask else '',
|
|
'has_email': bool(email_raw),
|
|
'has_phone': bool(phone_raw),
|
|
})
|
|
# Registered owner (if different)
|
|
if r.get('registered_owner') and r.get('registered_owner') != r.get('owner'):
|
|
email_raw = r.get('registered_owner_email') or ''
|
|
contacts_out.append({
|
|
'company_name': r['registered_owner'],
|
|
'vessel': vessel_info,
|
|
'role': 'Registered Owner',
|
|
'email': _mask_email(email_raw) if mask else email_raw,
|
|
'phone': '',
|
|
'website': r.get('registered_owner_website') or '',
|
|
'country': r.get('registered_owner_country') or '',
|
|
'address': '',
|
|
'has_email': bool(email_raw),
|
|
'has_phone': False,
|
|
})
|
|
# Operator (if different from owner)
|
|
if r.get('operator') and r.get('operator') != r.get('owner'):
|
|
contacts_out.append({
|
|
'company_name': r['operator'],
|
|
'vessel': vessel_info,
|
|
'role': 'Operator',
|
|
'email': '',
|
|
'phone': '',
|
|
'website': r.get('operator_website') or '',
|
|
'country': r.get('operator_country') or '',
|
|
'address': '',
|
|
'has_email': False,
|
|
'has_phone': False,
|
|
})
|
|
# Commercial manager (if different)
|
|
if r.get('commercial_manager') and r.get('commercial_manager') != r.get('owner'):
|
|
contacts_out.append({
|
|
'company_name': r['commercial_manager'],
|
|
'vessel': vessel_info,
|
|
'role': 'Commercial Manager',
|
|
'email': '',
|
|
'phone': '',
|
|
'website': r.get('commercial_manager_website') or '',
|
|
'country': r.get('commercial_manager_country') or '',
|
|
'address': '',
|
|
'has_email': False,
|
|
'has_phone': False,
|
|
})
|
|
results.extend(contacts_out)
|
|
# Deduplicate by company_name + role
|
|
seen = set()
|
|
unique = []
|
|
for c in results:
|
|
key = (c['company_name'], c['role'])
|
|
if key not in seen:
|
|
seen.add(key)
|
|
unique.append(c)
|
|
return unique[:limit]
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def get_contacts_for_vessel(mmsi: str) -> list:
|
|
"""Get all contacts associated with a vessel."""
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT * FROM contacts WHERE vessels_mmsi LIKE ?", (f'%"{mmsi}"%',))
|
|
rows = cursor.fetchall()
|
|
return [dict(row) for row in rows]
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
# =============================================================================
|
|
# USER OPERATIONS
|
|
# =============================================================================
|
|
|
|
def create_user(email: str, password_hash: str, name: str = '', lang: str = 'en') -> int:
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
"INSERT INTO users (email, password_hash, name, lang) VALUES (?, ?, ?, ?)",
|
|
(email.lower().strip(), password_hash, name.strip(), lang)
|
|
)
|
|
conn.commit()
|
|
user_id = cursor.lastrowid
|
|
return user_id
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def get_user_by_telegram_chat_id(chat_id: int):
|
|
"""Find user linked to a Telegram chat_id."""
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT * FROM users WHERE telegram_chat_id = ?", (chat_id,))
|
|
row = cursor.fetchone()
|
|
return dict(row) if row else None
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def create_telegram_link(user_id: int) -> str:
|
|
"""Generate a one-time deep-link code for Telegram binding. Returns code."""
|
|
import secrets
|
|
code = secrets.token_urlsafe(16)
|
|
ttl = config.TELEGRAM_LINK_TTL_MINUTES
|
|
expires = (datetime.utcnow() + timedelta(minutes=ttl)).isoformat()
|
|
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
# Remove previous codes for this user + expired codes
|
|
cursor.execute("DELETE FROM telegram_links WHERE user_id = ?", (user_id,))
|
|
cursor.execute("DELETE FROM telegram_links WHERE expires_at < ?",
|
|
(datetime.utcnow().isoformat(),))
|
|
cursor.execute(
|
|
"INSERT INTO telegram_links (code, user_id, expires_at) VALUES (?, ?, ?)",
|
|
(code, user_id, expires))
|
|
conn.commit()
|
|
finally:
|
|
conn.close()
|
|
return code
|
|
|
|
|
|
def verify_telegram_link(code: str, chat_id: int):
|
|
"""Verify link code and bind chat_id to user. Returns user dict or None."""
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
"SELECT user_id FROM telegram_links WHERE code = ? AND expires_at > ?",
|
|
(code, datetime.utcnow().isoformat()))
|
|
row = cursor.fetchone()
|
|
if not row:
|
|
return None
|
|
|
|
user_id = row[0] if isinstance(row, (list, tuple)) else row['user_id']
|
|
|
|
# Bind chat_id to user
|
|
cursor.execute("UPDATE users SET telegram_chat_id = ? WHERE id = ?",
|
|
(chat_id, user_id))
|
|
# Consume the code
|
|
cursor.execute("DELETE FROM telegram_links WHERE code = ?", (code,))
|
|
conn.commit()
|
|
finally:
|
|
conn.close()
|
|
return get_user_by_id(user_id)
|
|
|
|
|
|
def unlink_telegram(user_id: int):
|
|
"""Remove Telegram binding from user."""
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
cursor.execute("UPDATE users SET telegram_chat_id = NULL WHERE id = ?",
|
|
(user_id,))
|
|
conn.commit()
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def get_user_by_email(email: str) -> dict:
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT * FROM users WHERE email = ?", (email.lower().strip(),))
|
|
row = cursor.fetchone()
|
|
return dict(row) if row else None
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def get_user_by_id(user_id: int) -> dict:
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT * FROM users WHERE id = ?", (user_id,))
|
|
row = cursor.fetchone()
|
|
return dict(row) if row else None
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def update_user_last_login(user_id: int):
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
cursor.execute("UPDATE users SET last_login = ? WHERE id = ?",
|
|
(datetime.utcnow().isoformat(), user_id))
|
|
conn.commit()
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def update_user_balance(user_id: int, amount: float):
|
|
amount = round(amount, 2)
|
|
if amount <= 0:
|
|
raise ValueError(f"update_user_balance: amount must be positive, got {amount}")
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
cursor.execute("UPDATE users SET balance = round(balance + ?, 2) WHERE id = ?", (amount, user_id))
|
|
conn.commit()
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def charge_user(user_id: int, amount: float) -> bool:
|
|
"""Atomically charge user balance. Returns True if sufficient funds."""
|
|
amount = round(amount, 2)
|
|
if amount <= 0:
|
|
return False
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
"UPDATE users SET balance = round(balance - ?, 2) WHERE id = ? AND balance >= ?",
|
|
(amount, user_id, amount)
|
|
)
|
|
success = cursor.rowcount > 0
|
|
conn.commit()
|
|
return success
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def add_service_charge(user_id: int, service: str, amount: float, details: str = None):
|
|
"""Log a service charge for revenue tracking."""
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
"INSERT INTO service_charges (user_id, service, amount, details) VALUES (?, ?, ?, ?)",
|
|
(user_id, service, amount, details)
|
|
)
|
|
conn.commit()
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def save_purchased_contacts(user_id: int, contacts: list, query: str, contact_type: str, amount: float):
|
|
"""Save purchased (unlocked) contacts for the user."""
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
for c in contacts:
|
|
cursor.execute(
|
|
"INSERT INTO purchased_contacts (user_id, contact_data, query, contact_type, amount_paid) VALUES (?, ?, ?, ?, ?)",
|
|
(user_id, json.dumps(c, ensure_ascii=False), query, contact_type, amount)
|
|
)
|
|
conn.commit()
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def charge_and_log(user_id: int, amount: float, service: str, details: str = None,
|
|
contacts: list = None, query: str = None, contact_type: str = None) -> bool:
|
|
"""Atomically charge user, log service charge, and optionally save purchased contacts.
|
|
All operations in a single transaction -- either all succeed or all roll back."""
|
|
amount = round(amount, 2)
|
|
if amount <= 0:
|
|
return False
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
# 1. Atomic balance deduction
|
|
cursor.execute(
|
|
"UPDATE users SET balance = round(balance - ?, 2) WHERE id = ? AND balance >= ?",
|
|
(amount, user_id, amount)
|
|
)
|
|
if cursor.rowcount == 0:
|
|
conn.rollback()
|
|
return False
|
|
# 2. Log service charge
|
|
cursor.execute(
|
|
"INSERT INTO service_charges (user_id, service, amount, details) VALUES (?, ?, ?, ?)",
|
|
(user_id, service, amount, details)
|
|
)
|
|
# 3. Optionally save purchased contacts
|
|
if contacts:
|
|
for c in contacts:
|
|
cursor.execute(
|
|
"INSERT INTO purchased_contacts (user_id, contact_data, query, contact_type, amount_paid) VALUES (?, ?, ?, ?, ?)",
|
|
(user_id, json.dumps(c, ensure_ascii=False), query or '', contact_type or '', amount)
|
|
)
|
|
conn.commit()
|
|
return True
|
|
except Exception:
|
|
conn.rollback()
|
|
raise
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def get_purchased_contacts(user_id: int) -> list:
|
|
"""Get all purchased contacts for a user."""
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
"SELECT id, contact_data, query, contact_type, amount_paid, created_at FROM purchased_contacts WHERE user_id = ? ORDER BY id DESC",
|
|
(user_id,)
|
|
)
|
|
rows = cursor.fetchall()
|
|
results = []
|
|
for r in rows:
|
|
try:
|
|
data = json.loads(r[1])
|
|
except (json.JSONDecodeError, TypeError):
|
|
data = {}
|
|
results.append({
|
|
'id': r[0],
|
|
'contact': data,
|
|
'query': r[2],
|
|
'contact_type': r[3],
|
|
'amount_paid': r[4],
|
|
'purchased_at': r[5],
|
|
})
|
|
return results
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def has_purchased_contact(user_id: int, query: str, contact_type: str = None) -> list:
|
|
"""Check if user already purchased contacts for this query."""
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
if contact_type:
|
|
cursor.execute(
|
|
"SELECT contact_data FROM purchased_contacts WHERE user_id = ? AND LOWER(query) = LOWER(?) AND contact_type = ?",
|
|
(user_id, query, contact_type)
|
|
)
|
|
else:
|
|
cursor.execute(
|
|
"SELECT contact_data FROM purchased_contacts WHERE user_id = ? AND LOWER(query) = LOWER(?)",
|
|
(user_id, query)
|
|
)
|
|
rows = cursor.fetchall()
|
|
contacts = []
|
|
for r in rows:
|
|
try:
|
|
contacts.append(json.loads(r[0]))
|
|
except (json.JSONDecodeError, TypeError):
|
|
pass
|
|
return contacts
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def get_revenue_stats() -> dict:
|
|
"""Get platform revenue stats from service charges."""
|
|
conn = get_connection()
|
|
try:
|
|
return _get_revenue_stats_inner(conn)
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def _get_revenue_stats_inner(conn) -> dict:
|
|
cursor = conn.cursor()
|
|
now = datetime.utcnow()
|
|
|
|
# --- Service charges (main revenue) ---
|
|
cursor.execute("SELECT COALESCE(SUM(amount), 0) FROM service_charges")
|
|
total_service_revenue = cursor.fetchone()[0]
|
|
|
|
cursor.execute(
|
|
"SELECT service, COUNT(*) as cnt, SUM(amount) as total FROM service_charges GROUP BY service ORDER BY total DESC"
|
|
)
|
|
by_service = [{'service': r[0], 'count': r[1], 'total': round(r[2], 2)} for r in cursor.fetchall()]
|
|
|
|
# Today
|
|
today_str = now.strftime('%Y-%m-%d')
|
|
cursor.execute(
|
|
"SELECT COALESCE(SUM(amount), 0) FROM service_charges WHERE created_at >= ?",
|
|
(today_str,)
|
|
)
|
|
today_revenue = cursor.fetchone()[0]
|
|
|
|
# This week (last 7 days)
|
|
week_ago = (now - timedelta(days=7)).strftime('%Y-%m-%d')
|
|
cursor.execute(
|
|
"SELECT COALESCE(SUM(amount), 0) FROM service_charges WHERE created_at >= ?",
|
|
(week_ago,)
|
|
)
|
|
week_revenue = cursor.fetchone()[0]
|
|
|
|
# This month (last 30 days)
|
|
month_ago = (now - timedelta(days=30)).strftime('%Y-%m-%d')
|
|
cursor.execute(
|
|
"SELECT COALESCE(SUM(amount), 0) FROM service_charges WHERE created_at >= ?",
|
|
(month_ago,)
|
|
)
|
|
month_revenue = cursor.fetchone()[0]
|
|
|
|
# --- Deposits & withdrawals ---
|
|
cursor.execute("SELECT COALESCE(SUM(amount), 0), COUNT(*) FROM deposits")
|
|
row = cursor.fetchone()
|
|
total_deposited, deposit_count = row[0], row[1]
|
|
|
|
# Deposit fees earned (2% of each deposit = platform income)
|
|
deposit_fee_income = round(total_deposited * 0.02, 2)
|
|
|
|
cursor.execute("SELECT COALESCE(SUM(balance), 0) FROM users")
|
|
total_balances = cursor.fetchone()[0]
|
|
|
|
cursor.execute(
|
|
"SELECT COALESCE(SUM(amount), 0), COUNT(*) FROM withdrawals WHERE status != 'rejected'"
|
|
)
|
|
row = cursor.fetchone()
|
|
total_withdrawn, withdrawal_count = row[0], row[1]
|
|
|
|
# --- Platform profit ---
|
|
# Profit = (deposits - user_balances - withdrawals) + service_charges
|
|
# deposits - balances - withdrawals = what users spent/fees
|
|
platform_profit = (total_deposited - total_balances - total_withdrawn) + total_service_revenue
|
|
|
|
# --- User metrics ---
|
|
cursor.execute("SELECT COUNT(*) FROM users")
|
|
total_users = cursor.fetchone()[0]
|
|
|
|
cursor.execute("SELECT COUNT(*) FROM users WHERE balance > 0")
|
|
users_with_balance = cursor.fetchone()[0]
|
|
|
|
cursor.execute("SELECT COUNT(DISTINCT user_id) FROM deposits")
|
|
paying_users = cursor.fetchone()[0]
|
|
|
|
cursor.execute("SELECT COUNT(DISTINCT user_id) FROM service_charges")
|
|
active_buyers = cursor.fetchone()[0]
|
|
|
|
# Users registered this week
|
|
cursor.execute(
|
|
"SELECT COUNT(*) FROM users WHERE created_at >= ?",
|
|
(week_ago,)
|
|
)
|
|
new_users_week = cursor.fetchone()[0]
|
|
|
|
# --- Daily revenue (last 14 days) ---
|
|
cursor.execute(
|
|
"""SELECT DATE(created_at) as day, SUM(amount) as total
|
|
FROM service_charges
|
|
WHERE created_at >= ?
|
|
GROUP BY DATE(created_at)
|
|
ORDER BY day""",
|
|
((now - timedelta(days=14)).strftime('%Y-%m-%d'),)
|
|
)
|
|
daily_revenue = [{'date': r[0], 'amount': round(r[1], 2)} for r in cursor.fetchall()]
|
|
|
|
# --- Recent charges ---
|
|
cursor.execute(
|
|
"""SELECT sc.id, u.email, sc.service, sc.amount, sc.details, sc.created_at
|
|
FROM service_charges sc JOIN users u ON sc.user_id = u.id
|
|
ORDER BY sc.id DESC LIMIT 20"""
|
|
)
|
|
recent = [{'id': r[0], 'user': r[1], 'service': r[2], 'amount': r[3],
|
|
'details': r[4], 'date': r[5]} for r in cursor.fetchall()]
|
|
|
|
# --- Recent deposits ---
|
|
cursor.execute(
|
|
"""SELECT d.id, u.email, d.amount, d.from_address, d.confirmed_at
|
|
FROM deposits d JOIN users u ON d.user_id = u.id
|
|
ORDER BY d.id DESC LIMIT 10"""
|
|
)
|
|
recent_deposits = [{'id': r[0], 'user': r[1], 'amount': round(r[2], 2),
|
|
'from': r[3][:8] + '...' if r[3] and len(r[3]) > 8 else (r[3] or ''),
|
|
'date': r[4]} for r in cursor.fetchall()]
|
|
|
|
return {
|
|
# Main totals
|
|
'total_revenue': round(total_service_revenue, 2),
|
|
'deposit_fee_income': deposit_fee_income,
|
|
'platform_profit': round(platform_profit, 2),
|
|
# Time periods
|
|
'today_revenue': round(today_revenue, 2),
|
|
'week_revenue': round(week_revenue, 2),
|
|
'month_revenue': round(month_revenue, 2),
|
|
# Financial flow
|
|
'total_deposited': round(total_deposited, 2),
|
|
'deposit_count': deposit_count,
|
|
'total_user_balances': round(total_balances, 2),
|
|
'total_withdrawn': round(total_withdrawn, 2),
|
|
'withdrawal_count': withdrawal_count,
|
|
# Users
|
|
'total_users': total_users,
|
|
'users_with_balance': users_with_balance,
|
|
'paying_users': paying_users,
|
|
'active_buyers': active_buyers,
|
|
'new_users_week': new_users_week,
|
|
# Breakdowns
|
|
'by_service': by_service,
|
|
'daily_revenue': daily_revenue,
|
|
'recent_charges': recent,
|
|
'recent_deposits': recent_deposits,
|
|
}
|
|
|
|
|
|
PORT_CACHE_MINUTES = config.PORT_CACHE_MINUTES
|
|
|
|
|
|
def get_cached_port_vessels(port_key: str) -> list:
|
|
"""Get cached vessels for a port if cache is fresh (< 15 min)."""
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
"SELECT vessel_data, cached_at FROM port_vessel_cache WHERE port_key = ? ORDER BY id DESC LIMIT 1",
|
|
(port_key.lower(),)
|
|
)
|
|
row = cursor.fetchone()
|
|
|
|
if not row:
|
|
return None
|
|
try:
|
|
cached_at = datetime.fromisoformat(str(row[1]))
|
|
except (ValueError, TypeError):
|
|
return None
|
|
if datetime.utcnow() - cached_at > timedelta(minutes=PORT_CACHE_MINUTES):
|
|
return None
|
|
try:
|
|
return json.loads(row[0])
|
|
except (json.JSONDecodeError, TypeError):
|
|
return None
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def cache_port_vessels(port_key: str, vessels: list):
|
|
"""Cache vessels list for a port."""
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
"INSERT INTO port_vessel_cache (port_key, vessel_data, vessel_count) VALUES (?, ?, ?)",
|
|
(port_key.lower(), json.dumps(vessels, ensure_ascii=False), len(vessels))
|
|
)
|
|
cursor.execute(
|
|
"DELETE FROM port_vessel_cache WHERE port_key = ? AND id NOT IN "
|
|
"(SELECT id FROM port_vessel_cache WHERE port_key = ? ORDER BY id DESC LIMIT 2)",
|
|
(port_key.lower(), port_key.lower())
|
|
)
|
|
conn.commit()
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def set_user_admin(user_id: int, is_admin: bool = True):
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
cursor.execute("UPDATE users SET is_admin = ? WHERE id = ?", (1 if is_admin else 0, user_id))
|
|
conn.commit()
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
# =============================================================================
|
|
# SESSION OPERATIONS
|
|
# =============================================================================
|
|
|
|
def create_session(user_id: int, token: str, expires_hours: int = 72) -> int:
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
expires = (datetime.utcnow() + timedelta(hours=expires_hours)).isoformat()
|
|
cursor.execute(
|
|
"INSERT INTO sessions (user_id, token, expires_at) VALUES (?, ?, ?)",
|
|
(user_id, token, expires)
|
|
)
|
|
conn.commit()
|
|
session_id = cursor.lastrowid
|
|
return session_id
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def get_session(token: str) -> dict:
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
"SELECT * FROM sessions WHERE token = ? AND expires_at > ?",
|
|
(token, datetime.utcnow().isoformat())
|
|
)
|
|
row = cursor.fetchone()
|
|
return dict(row) if row else None
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def delete_session(token: str):
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
cursor.execute("DELETE FROM sessions WHERE token = ?", (token,))
|
|
conn.commit()
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
# =============================================================================
|
|
# CHAT HISTORY OPERATIONS
|
|
# =============================================================================
|
|
|
|
def add_chat_message(user_id: int, role: str, message: str):
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
"INSERT INTO chat_history (user_id, role, message) VALUES (?, ?, ?)",
|
|
(user_id, role, message)
|
|
)
|
|
conn.commit()
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def get_chat_history(user_id: int, limit: int = 50) -> list:
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
cursor.execute("""
|
|
SELECT role, message, created_at FROM chat_history
|
|
WHERE user_id = ?
|
|
ORDER BY id DESC LIMIT ?
|
|
""", (user_id, limit))
|
|
rows = cursor.fetchall()
|
|
return [dict(row) for row in reversed(rows)]
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def clear_chat_history(user_id: int):
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
cursor.execute("DELETE FROM chat_history WHERE user_id = ?", (user_id,))
|
|
conn.commit()
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def get_chat_message_count(user_id: int) -> int:
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT COUNT(*) FROM chat_history WHERE user_id = ?", (user_id,))
|
|
return cursor.fetchone()[0]
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
# =============================================================================
|
|
# AGENT MEMORY (3-tier system)
|
|
# =============================================================================
|
|
|
|
def save_user_memory(user_id: int, memory_type: str, content: str, entity: str = None, is_permanent: bool = False):
|
|
"""Save a memory fact. Deduplicates by content similarity."""
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
# Check for duplicate (same user + similar content)
|
|
cursor.execute(
|
|
"SELECT id FROM user_memory WHERE user_id = ? AND content = ?",
|
|
(user_id, content)
|
|
)
|
|
if cursor.fetchone():
|
|
# Update last_accessed instead of duplicating
|
|
cursor.execute(
|
|
"UPDATE user_memory SET last_accessed = CURRENT_TIMESTAMP, relevance_score = 1.0 WHERE user_id = ? AND content = ?",
|
|
(user_id, content)
|
|
)
|
|
conn.commit()
|
|
return
|
|
# Enforce max memories per user
|
|
cursor.execute("SELECT COUNT(*) FROM user_memory WHERE user_id = ? AND is_permanent = FALSE", (user_id,))
|
|
count = cursor.fetchone()[0]
|
|
if count >= 50:
|
|
# Delete oldest non-permanent memory
|
|
cursor.execute("""
|
|
DELETE FROM user_memory WHERE id = (
|
|
SELECT id FROM user_memory WHERE user_id = ? AND is_permanent = FALSE
|
|
ORDER BY relevance_score ASC, last_accessed ASC LIMIT 1
|
|
)
|
|
""", (user_id,))
|
|
cursor.execute(
|
|
"INSERT INTO user_memory (user_id, memory_type, content, entity, is_permanent) VALUES (?, ?, ?, ?, ?)",
|
|
(user_id, memory_type, content, entity, is_permanent)
|
|
)
|
|
conn.commit()
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def get_user_memories(user_id: int, limit: int = 10) -> list:
|
|
"""Get top memories sorted by: permanent first, then relevance * recency."""
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
cursor.execute("""
|
|
SELECT id, memory_type, content, entity, is_permanent, relevance_score, created_at
|
|
FROM user_memory WHERE user_id = ?
|
|
ORDER BY is_permanent DESC, relevance_score DESC, last_accessed DESC
|
|
LIMIT ?
|
|
""", (user_id, limit))
|
|
rows = cursor.fetchall()
|
|
# Update last_accessed for retrieved memories
|
|
if rows:
|
|
ids = [dict(r)['id'] for r in rows]
|
|
placeholders = ','.join(['?' for _ in ids])
|
|
cursor.execute(f"UPDATE user_memory SET last_accessed = CURRENT_TIMESTAMP WHERE id IN ({placeholders})", ids)
|
|
conn.commit()
|
|
return [dict(r) for r in rows]
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def delete_user_memory(user_id: int, memory_id: int):
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
cursor.execute("DELETE FROM user_memory WHERE id = ? AND user_id = ?", (memory_id, user_id))
|
|
conn.commit()
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def save_conversation_summary(user_id: int, summary: str, message_count: int):
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
"INSERT INTO conversation_summaries (user_id, summary, message_count) VALUES (?, ?, ?)",
|
|
(user_id, summary, message_count)
|
|
)
|
|
# Keep only last 10 summaries per user
|
|
cursor.execute("""
|
|
DELETE FROM conversation_summaries WHERE id NOT IN (
|
|
SELECT id FROM conversation_summaries WHERE user_id = ?
|
|
ORDER BY id DESC LIMIT 10
|
|
) AND user_id = ?
|
|
""", (user_id, user_id))
|
|
conn.commit()
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def get_conversation_summaries(user_id: int, limit: int = 3) -> list:
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
cursor.execute("""
|
|
SELECT summary, message_count, created_at FROM conversation_summaries
|
|
WHERE user_id = ? ORDER BY id DESC LIMIT ?
|
|
""", (user_id, limit))
|
|
return [dict(r) for r in cursor.fetchall()]
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
# =============================================================================
|
|
# USER PROFILE OPERATIONS
|
|
# =============================================================================
|
|
|
|
PROFILE_FIELDS = [
|
|
'company_name', 'role', 'fleet_size', 'vessel_types', 'trade_routes',
|
|
'cargo_types', 'vessels_of_interest', 'experience_years', 'phone',
|
|
'telegram', 'notes', 'home_port', 'preferred_tonnage', 'search_radius'
|
|
]
|
|
|
|
def get_user_profile(user_id: int) -> dict:
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT * FROM user_profiles WHERE user_id = ?", (user_id,))
|
|
row = cursor.fetchone()
|
|
if row:
|
|
profile = dict(row)
|
|
for field in ('vessel_types', 'trade_routes', 'cargo_types', 'vessels_of_interest'):
|
|
try:
|
|
profile[field] = json.loads(profile[field])
|
|
except (json.JSONDecodeError, TypeError):
|
|
profile[field] = []
|
|
return profile
|
|
return None
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def upsert_user_profile(user_id: int, data: dict) -> dict:
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
|
|
for field in ('vessel_types', 'trade_routes', 'cargo_types', 'vessels_of_interest'):
|
|
if field in data and isinstance(data[field], list):
|
|
data[field] = json.dumps(data[field])
|
|
|
|
cursor.execute("SELECT user_id FROM user_profiles WHERE user_id = ?", (user_id,))
|
|
exists = cursor.fetchone()
|
|
|
|
if exists:
|
|
sets = []
|
|
vals = []
|
|
for k in PROFILE_FIELDS:
|
|
if k in data:
|
|
sets.append(f"{k} = ?")
|
|
vals.append(data[k])
|
|
if sets:
|
|
sets.append("updated_at = ?")
|
|
vals.append(datetime.utcnow().isoformat())
|
|
vals.append(user_id)
|
|
cursor.execute(f"UPDATE user_profiles SET {', '.join(sets)} WHERE user_id = ?", vals)
|
|
else:
|
|
cols = ['user_id']
|
|
vals = [user_id]
|
|
for k in PROFILE_FIELDS:
|
|
if k in data:
|
|
cols.append(k)
|
|
vals.append(data[k])
|
|
placeholders = ', '.join(['?'] * len(cols))
|
|
cursor.execute(f"INSERT INTO user_profiles ({', '.join(cols)}) VALUES ({placeholders})", vals)
|
|
|
|
conn.commit()
|
|
finally:
|
|
conn.close()
|
|
return get_user_profile(user_id)
|
|
|
|
|
|
def _sanitize_profile_field(value, max_len: int = 100) -> str:
|
|
"""Strip prompt-injection patterns, HTML tags, and control chars from profile field."""
|
|
if not value or not isinstance(value, str):
|
|
return ''
|
|
val = re.sub(r'<[^>]+>', '', value) # strip HTML tags
|
|
val = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f]', '', val)
|
|
val = val.replace('```', '')
|
|
val = re.sub(
|
|
r'(?i)(ignore|disregard|forget|override).{0,20}(previous|above|all|system|instructions?|prompt)',
|
|
'[filtered]', val)
|
|
val = re.sub(
|
|
r'(?i)(you are now|act as|pretend|new instructions?|system prompt|HACKED)',
|
|
'[filtered]', val)
|
|
return val.strip()[:max_len]
|
|
|
|
|
|
def get_profile_summary(user_id: int) -> str:
|
|
"""Get a human-readable profile summary for AI context."""
|
|
profile = get_user_profile(user_id)
|
|
if not profile:
|
|
return ""
|
|
|
|
parts = []
|
|
if profile.get('company_name'):
|
|
parts.append(f"Company: {_sanitize_profile_field(profile['company_name'], 80)}")
|
|
if profile.get('role'):
|
|
parts.append(f"Role: {_sanitize_profile_field(profile['role'], 50)}")
|
|
if profile.get('fleet_size'):
|
|
parts.append(f"Fleet size: {profile['fleet_size']} vessels")
|
|
if profile.get('vessel_types') and len(profile['vessel_types']) > 0:
|
|
safe = [_sanitize_profile_field(v, 50) for v in profile['vessel_types'] if v]
|
|
if safe:
|
|
parts.append(f"Vessel types: {', '.join(safe)}")
|
|
if profile.get('trade_routes') and len(profile['trade_routes']) > 0:
|
|
safe = [_sanitize_profile_field(v, 80) for v in profile['trade_routes'] if v]
|
|
if safe:
|
|
parts.append(f"Trade routes: {', '.join(safe)}")
|
|
if profile.get('cargo_types') and len(profile['cargo_types']) > 0:
|
|
safe = [_sanitize_profile_field(v, 50) for v in profile['cargo_types'] if v]
|
|
if safe:
|
|
parts.append(f"Cargo types: {', '.join(safe)}")
|
|
if profile.get('vessels_of_interest') and len(profile['vessels_of_interest']) > 0:
|
|
safe = [_sanitize_profile_field(v, 50) for v in profile['vessels_of_interest'] if v]
|
|
if safe:
|
|
parts.append(f"Vessels of interest (fleet/watchlist): {', '.join(safe)}")
|
|
if profile.get('experience_years'):
|
|
parts.append(f"Experience: {profile['experience_years']} years")
|
|
if profile.get('home_port'):
|
|
parts.append(f"Home port: {_sanitize_profile_field(profile['home_port'], 80)}")
|
|
if profile.get('preferred_tonnage'):
|
|
parts.append(f"Preferred tonnage: {profile['preferred_tonnage']} t")
|
|
if profile.get('search_radius'):
|
|
parts.append(f"Search radius: {profile['search_radius']} NM")
|
|
if profile.get('notes'):
|
|
parts.append(f"Notes: {_sanitize_profile_field(profile['notes'], 200)}")
|
|
|
|
return '; '.join(parts)
|
|
|
|
|
|
# =============================================================================
|
|
# WALLET OPERATIONS (USDT TRC20) — keys encrypted at rest
|
|
# =============================================================================
|
|
|
|
def create_wallet(user_id: int, address: str, private_key_hex: str) -> dict:
|
|
encrypted_key = _encrypt_private_key(private_key_hex)
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
"INSERT INTO wallets (user_id, address, private_key_hex) VALUES (?, ?, ?)",
|
|
(user_id, address, encrypted_key)
|
|
)
|
|
conn.commit()
|
|
finally:
|
|
conn.close()
|
|
return get_user_wallet(user_id)
|
|
|
|
|
|
def get_user_wallet(user_id: int) -> dict:
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT * FROM wallets WHERE user_id = ?", (user_id,))
|
|
row = cursor.fetchone()
|
|
if not row:
|
|
return None
|
|
wallet = dict(row)
|
|
wallet['private_key_hex'] = _decrypt_private_key(wallet.get('private_key_hex', ''))
|
|
return wallet
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def get_wallet_by_address(address: str) -> dict:
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT * FROM wallets WHERE address = ?", (address,))
|
|
row = cursor.fetchone()
|
|
if not row:
|
|
return None
|
|
wallet = dict(row)
|
|
wallet['private_key_hex'] = _decrypt_private_key(wallet.get('private_key_hex', ''))
|
|
return wallet
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
# =============================================================================
|
|
# DEPOSIT OPERATIONS
|
|
# =============================================================================
|
|
|
|
def add_deposit(user_id: int, tx_id: str, amount: float, from_address: str = '') -> int:
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
"INSERT INTO deposits (user_id, tx_id, amount, from_address) VALUES (?, ?, ?, ?)",
|
|
(user_id, tx_id, amount, from_address)
|
|
)
|
|
conn.commit()
|
|
deposit_id = cursor.lastrowid
|
|
return deposit_id
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def is_deposit_processed(tx_id: str) -> bool:
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT id FROM deposits WHERE tx_id = ?", (tx_id,))
|
|
row = cursor.fetchone()
|
|
return row is not None
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def get_user_deposits(user_id: int, limit: int = 20) -> list:
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
"SELECT tx_id, amount, from_address, confirmed_at FROM deposits WHERE user_id = ? ORDER BY id DESC LIMIT ?",
|
|
(user_id, limit)
|
|
)
|
|
rows = cursor.fetchall()
|
|
return [dict(row) for row in rows]
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
# =============================================================================
|
|
# WITHDRAWAL OPERATIONS
|
|
# =============================================================================
|
|
|
|
def create_withdrawal(user_id: int, amount: float, to_address: str) -> dict:
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
"INSERT INTO withdrawals (user_id, amount, to_address) VALUES (?, ?, ?)",
|
|
(user_id, amount, to_address)
|
|
)
|
|
conn.commit()
|
|
wid = cursor.lastrowid
|
|
finally:
|
|
conn.close()
|
|
return get_withdrawal(wid)
|
|
|
|
|
|
def get_withdrawal(withdrawal_id: int) -> dict:
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT * FROM withdrawals WHERE id = ?", (withdrawal_id,))
|
|
row = cursor.fetchone()
|
|
return dict(row) if row else None
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def get_user_withdrawals(user_id: int, limit: int = 20) -> list:
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
"SELECT id, amount, to_address, status, tx_id, created_at, processed_at FROM withdrawals WHERE user_id = ? ORDER BY id DESC LIMIT ?",
|
|
(user_id, limit)
|
|
)
|
|
rows = cursor.fetchall()
|
|
return [dict(row) for row in rows]
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def update_withdrawal_status(withdrawal_id: int, status: str, tx_id: str = None):
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
"UPDATE withdrawals SET status = ?, tx_id = ?, processed_at = ? WHERE id = ?",
|
|
(status, tx_id, datetime.utcnow().isoformat(), withdrawal_id)
|
|
)
|
|
conn.commit()
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
# =============================================================================
|
|
# EQUASIS CACHE
|
|
# =============================================================================
|
|
|
|
EQUASIS_CACHE_HOURS = config.EQUASIS_CACHE_HOURS
|
|
|
|
EQUASIS_DAILY_LIMIT = config.EQUASIS_DAILY_LIMIT
|
|
|
|
|
|
def get_equasis_cache(cache_key: str, cache_type: str):
|
|
"""Get cached Equasis data if fresh enough. Returns parsed data or None."""
|
|
max_hours = EQUASIS_CACHE_HOURS.get(cache_type, 24)
|
|
since = (datetime.utcnow() - timedelta(hours=max_hours)).isoformat()
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
"SELECT data FROM equasis_cache WHERE cache_key = ? AND cache_type = ? AND created_at > ? ORDER BY id DESC LIMIT 1",
|
|
(cache_key.lower(), cache_type, since)
|
|
)
|
|
row = cursor.fetchone()
|
|
if not row:
|
|
return None
|
|
try:
|
|
return json.loads(row[0])
|
|
except (json.JSONDecodeError, TypeError):
|
|
return None
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def set_equasis_cache(cache_key: str, cache_type: str, data):
|
|
"""Cache Equasis query result."""
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
"INSERT INTO equasis_cache (cache_key, cache_type, data) VALUES (?, ?, ?)",
|
|
(cache_key.lower(), cache_type, json.dumps(data, ensure_ascii=False))
|
|
)
|
|
# Clean old entries (keep last 3 per key+type)
|
|
cursor.execute(
|
|
"DELETE FROM equasis_cache WHERE cache_key = ? AND cache_type = ? AND id NOT IN "
|
|
"(SELECT id FROM equasis_cache WHERE cache_key = ? AND cache_type = ? ORDER BY id DESC LIMIT 3)",
|
|
(cache_key.lower(), cache_type, cache_key.lower(), cache_type)
|
|
)
|
|
conn.commit()
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def increment_equasis_counter() -> int:
|
|
"""Increment daily Equasis query counter. Returns current count."""
|
|
today = datetime.utcnow().strftime('%Y-%m-%d')
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute("SELECT count FROM equasis_daily_counter WHERE date_str = ?", (today,))
|
|
row = cursor.fetchone()
|
|
|
|
if row:
|
|
new_count = row[0] + 1
|
|
cursor.execute("UPDATE equasis_daily_counter SET count = ? WHERE date_str = ?", (new_count, today))
|
|
else:
|
|
new_count = 1
|
|
cursor.execute("INSERT INTO equasis_daily_counter (date_str, count) VALUES (?, ?)", (today, 1))
|
|
|
|
# Clean old dates (keep last 7 days)
|
|
week_ago = (datetime.utcnow() - timedelta(days=7)).strftime('%Y-%m-%d')
|
|
cursor.execute("DELETE FROM equasis_daily_counter WHERE date_str < ?", (week_ago,))
|
|
|
|
conn.commit()
|
|
return new_count
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def get_equasis_remaining() -> int:
|
|
"""Get remaining Equasis queries for today."""
|
|
today = datetime.utcnow().strftime('%Y-%m-%d')
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT count FROM equasis_daily_counter WHERE date_str = ?", (today,))
|
|
row = cursor.fetchone()
|
|
used = row[0] if row else 0
|
|
return max(0, EQUASIS_DAILY_LIMIT - used)
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def get_vessels_needing_enrichment(mmsis: list, ttl_days: int = 7, limit: int = 40) -> list:
|
|
"""Return MMSIs that need Equasis enrichment (no companies or stale)."""
|
|
if not mmsis:
|
|
return []
|
|
conn = get_connection()
|
|
try:
|
|
cur = conn.cursor()
|
|
placeholders = ','.join(['?'] * len(mmsis))
|
|
cutoff = (datetime.utcnow() - timedelta(days=ttl_days)).isoformat()
|
|
# Stale known vessels
|
|
cur.execute(f"""
|
|
SELECT mmsi FROM vessels
|
|
WHERE mmsi IN ({placeholders})
|
|
AND (companies_json IS NULL OR updated_at < ?)
|
|
ORDER BY updated_at ASC
|
|
LIMIT ?
|
|
""", mmsis + [cutoff, limit])
|
|
stale = [row[0] for row in cur.fetchall()]
|
|
# Unknown MMSIs (not in vessels table at all)
|
|
cur.execute(f"SELECT mmsi FROM vessels WHERE mmsi IN ({placeholders})", mmsis)
|
|
known_set = {row[0] for row in cur.fetchall()}
|
|
unknown = [m for m in mmsis if m not in known_set]
|
|
return (unknown + stale)[:limit]
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
# =============================================================================
|
|
# SUBSCRIPTION PLANS
|
|
# =============================================================================
|
|
|
|
SUBSCRIPTION_PLANS = {
|
|
'free': {
|
|
'name': 'Free',
|
|
'price_monthly': 0,
|
|
'vessel_lookups_day': 10,
|
|
'route_calcs_day': 5,
|
|
'port_scans_day': 3,
|
|
'contact_unlock_price': 10.0,
|
|
},
|
|
'basic': {
|
|
'name': 'Basic',
|
|
'price_monthly': 29,
|
|
'vessel_lookups_day': 100,
|
|
'route_calcs_day': 50,
|
|
'port_scans_day': 20,
|
|
'contact_unlock_price': 5.0,
|
|
},
|
|
'pro': {
|
|
'name': 'Professional',
|
|
'price_monthly': 99,
|
|
'vessel_lookups_day': -1, # unlimited
|
|
'route_calcs_day': -1,
|
|
'port_scans_day': -1,
|
|
'contact_unlock_price': 0.0,
|
|
},
|
|
}
|
|
|
|
|
|
def get_user_plan(user_id: int) -> dict:
|
|
"""Get user's current subscription plan details."""
|
|
user = get_user_by_id(user_id)
|
|
if not user:
|
|
return SUBSCRIPTION_PLANS['free']
|
|
|
|
if user.get('is_admin'):
|
|
return {**SUBSCRIPTION_PLANS['pro'], 'name': 'Admin (unlimited)'}
|
|
|
|
plan_key = user.get('plan', 'free')
|
|
expires = user.get('plan_expires_at')
|
|
|
|
# Check if plan expired
|
|
if plan_key != 'free' and expires:
|
|
try:
|
|
exp_dt = datetime.fromisoformat(str(expires))
|
|
if datetime.utcnow() > exp_dt:
|
|
# Plan expired — downgrade to free
|
|
set_user_plan(user_id, 'free')
|
|
return SUBSCRIPTION_PLANS['free']
|
|
except (ValueError, TypeError):
|
|
pass
|
|
|
|
return SUBSCRIPTION_PLANS.get(plan_key, SUBSCRIPTION_PLANS['free'])
|
|
|
|
|
|
def set_user_plan(user_id: int, plan: str, days: int = 30):
|
|
"""Set user's subscription plan."""
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
if plan == 'free':
|
|
cursor.execute("UPDATE users SET plan = ?, plan_expires_at = NULL WHERE id = ?",
|
|
(plan, user_id))
|
|
else:
|
|
expires = (datetime.utcnow() + timedelta(days=days)).isoformat()
|
|
cursor.execute("UPDATE users SET plan = ?, plan_expires_at = ? WHERE id = ?",
|
|
(plan, expires, user_id))
|
|
conn.commit()
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
# =============================================================================
|
|
# STATS
|
|
# =============================================================================
|
|
|
|
def get_stats() -> dict:
|
|
"""Get database statistics."""
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
|
|
stats = {}
|
|
|
|
cursor.execute("SELECT COUNT(*) FROM vessels")
|
|
stats['vessels'] = cursor.fetchone()[0]
|
|
|
|
cursor.execute("SELECT COUNT(*) FROM positions")
|
|
stats['positions'] = cursor.fetchone()[0]
|
|
|
|
cursor.execute("SELECT COUNT(*) FROM contacts")
|
|
stats['contacts'] = cursor.fetchone()[0]
|
|
|
|
cursor.execute("SELECT COUNT(*) FROM port_calls")
|
|
stats['port_calls'] = cursor.fetchone()[0]
|
|
|
|
cursor.execute("SELECT COUNT(*) FROM users")
|
|
stats['users'] = cursor.fetchone()[0]
|
|
|
|
stats['db_backend'] = 'PostgreSQL' if USE_POSTGRES else 'SQLite'
|
|
|
|
return stats
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
# =============================================================================
|
|
# QUERY ANALYTICS
|
|
# =============================================================================
|
|
|
|
def log_query(user_id: int = None, query_text: str = '', query_type: str = None,
|
|
tool_used: str = None, response_time_ms: int = None,
|
|
input_tokens: int = None, output_tokens: int = None,
|
|
cached_tokens: int = None, cost_usd: float = None,
|
|
iterations: int = None):
|
|
"""Log a user query for analytics with optional token/cost tracking."""
|
|
conn = None
|
|
try:
|
|
conn = get_connection()
|
|
cursor = conn.cursor()
|
|
cursor.execute("""
|
|
INSERT INTO query_log (user_id, query_text, query_type, tool_used, response_time_ms,
|
|
input_tokens, output_tokens, cached_tokens, cost_usd, iterations)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
""", (user_id, query_text[:500], query_type, tool_used, response_time_ms,
|
|
input_tokens, output_tokens, cached_tokens, cost_usd, iterations))
|
|
conn.commit()
|
|
except Exception:
|
|
pass # Don't fail on analytics
|
|
finally:
|
|
if conn:
|
|
conn.close()
|
|
|
|
|
|
def get_query_stats(days: int = 7) -> dict:
|
|
"""Get query analytics for the last N days."""
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
since = (datetime.utcnow() - timedelta(days=days)).isoformat()
|
|
|
|
cursor.execute("SELECT COUNT(*) FROM query_log WHERE created_at > ?", (since,))
|
|
total = cursor.fetchone()[0]
|
|
|
|
cursor.execute("""
|
|
SELECT tool_used, COUNT(*) as cnt FROM query_log
|
|
WHERE created_at > ? AND tool_used IS NOT NULL
|
|
GROUP BY tool_used ORDER BY cnt DESC
|
|
""", (since,))
|
|
by_tool = {row[0]: row[1] for row in cursor.fetchall()}
|
|
|
|
cursor.execute("""
|
|
SELECT query_type, COUNT(*) as cnt FROM query_log
|
|
WHERE created_at > ? AND query_type IS NOT NULL
|
|
GROUP BY query_type ORDER BY cnt DESC
|
|
""", (since,))
|
|
by_type = {row[0]: row[1] for row in cursor.fetchall()}
|
|
|
|
cursor.execute("""
|
|
SELECT AVG(response_time_ms) FROM query_log
|
|
WHERE created_at > ? AND response_time_ms IS NOT NULL
|
|
""", (since,))
|
|
avg_time = cursor.fetchone()[0]
|
|
|
|
return {
|
|
'period_days': days,
|
|
'total_queries': total,
|
|
'by_tool': by_tool,
|
|
'by_type': by_type,
|
|
'avg_response_ms': round(avg_time) if avg_time else None,
|
|
}
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def get_cost_stats(days: int = 30) -> dict:
|
|
"""Get Claude API cost analytics from query_log."""
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
now = datetime.utcnow()
|
|
since = (now - timedelta(days=days)).isoformat()
|
|
today_str = now.strftime('%Y-%m-%d')
|
|
week_ago = (now - timedelta(days=7)).isoformat()
|
|
month_ago = (now - timedelta(days=30)).isoformat()
|
|
|
|
# Overall totals
|
|
cursor.execute("""
|
|
SELECT COUNT(*), COALESCE(SUM(cost_usd), 0),
|
|
COALESCE(SUM(input_tokens), 0), COALESCE(SUM(output_tokens), 0),
|
|
COALESCE(SUM(cached_tokens), 0), COALESCE(AVG(iterations), 0)
|
|
FROM query_log WHERE created_at > ? AND cost_usd IS NOT NULL
|
|
""", (since,))
|
|
row = cursor.fetchone()
|
|
total_queries = row[0] or 0
|
|
total_cost = round(row[1], 4)
|
|
total_input = row[2] or 0
|
|
total_output = row[3] or 0
|
|
total_cached = row[4] or 0
|
|
avg_iterations = round(row[5], 1) if row[5] else 0
|
|
|
|
# Cost by period
|
|
cursor.execute("SELECT COALESCE(SUM(cost_usd), 0) FROM query_log WHERE created_at >= ? AND cost_usd IS NOT NULL", (today_str,))
|
|
cost_today = round(cursor.fetchone()[0], 4)
|
|
|
|
cursor.execute("SELECT COALESCE(SUM(cost_usd), 0) FROM query_log WHERE created_at >= ? AND cost_usd IS NOT NULL", (week_ago,))
|
|
cost_week = round(cursor.fetchone()[0], 4)
|
|
|
|
cursor.execute("SELECT COALESCE(SUM(cost_usd), 0) FROM query_log WHERE created_at >= ? AND cost_usd IS NOT NULL", (month_ago,))
|
|
cost_month = round(cursor.fetchone()[0], 4)
|
|
|
|
# Top users by cost
|
|
cursor.execute("""
|
|
SELECT q.user_id, COALESCE(u.email, 'anonymous'), COUNT(*) as queries,
|
|
COALESCE(SUM(q.cost_usd), 0) as total_cost
|
|
FROM query_log q LEFT JOIN users u ON q.user_id = u.id
|
|
WHERE q.created_at > ? AND q.cost_usd IS NOT NULL
|
|
GROUP BY q.user_id, u.email ORDER BY total_cost DESC LIMIT 10
|
|
""", (since,))
|
|
top_users = [{'user_id': r[0], 'email': r[1], 'queries': r[2],
|
|
'total_cost': round(r[3], 4)} for r in cursor.fetchall()]
|
|
|
|
# Daily cost trend
|
|
cursor.execute("""
|
|
SELECT DATE(created_at) as day, COUNT(*) as queries,
|
|
COALESCE(SUM(cost_usd), 0) as cost,
|
|
COALESCE(AVG(input_tokens), 0) as avg_in,
|
|
COALESCE(AVG(output_tokens), 0) as avg_out
|
|
FROM query_log
|
|
WHERE created_at > ? AND cost_usd IS NOT NULL
|
|
GROUP BY DATE(created_at) ORDER BY day
|
|
""", (since,))
|
|
daily_costs = [{'date': r[0], 'queries': r[1], 'cost': round(r[2], 4),
|
|
'avg_input': round(r[3]), 'avg_output': round(r[4])} for r in cursor.fetchall()]
|
|
|
|
# Cache efficiency
|
|
cache_rate = round(total_cached * 100.0 / (total_input + total_cached), 1) if (total_input + total_cached) > 0 else 0
|
|
|
|
return {
|
|
'period_days': days,
|
|
'total_queries': total_queries,
|
|
'total_cost_usd': total_cost,
|
|
'avg_cost_per_query': round(total_cost / total_queries, 4) if total_queries else 0,
|
|
'cost_today': cost_today,
|
|
'cost_week': cost_week,
|
|
'cost_month': cost_month,
|
|
'top_users': top_users,
|
|
'daily_costs': daily_costs,
|
|
'cache_stats': {
|
|
'total_input_tokens': total_input,
|
|
'total_cached_tokens': total_cached,
|
|
'cache_hit_rate_pct': cache_rate,
|
|
},
|
|
'efficiency': {
|
|
'avg_iterations': avg_iterations,
|
|
'avg_input_per_query': round(total_input / total_queries) if total_queries else 0,
|
|
'avg_output_per_query': round(total_output / total_queries) if total_queries else 0,
|
|
},
|
|
}
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
# =============================================================================
|
|
# MOLTBOOK BOT
|
|
# =============================================================================
|
|
|
|
def has_commented_post(post_id: str) -> bool:
|
|
"""Check if we already commented on a Moltbook post."""
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT 1 FROM moltbook_comments WHERE post_id = ?", (str(post_id),))
|
|
result = cursor.fetchone()
|
|
return result is not None
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def save_moltbook_comment(post_id: str, title: str, body: str):
|
|
"""Save record of a Moltbook comment."""
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
if USE_POSTGRES:
|
|
cursor.execute(
|
|
"INSERT INTO moltbook_comments (post_id, post_title, comment_body) VALUES (?, ?, ?) ON CONFLICT DO NOTHING",
|
|
(str(post_id), title[:500] if title else '', body[:2000] if body else '')
|
|
)
|
|
else:
|
|
cursor.execute(
|
|
"INSERT OR IGNORE INTO moltbook_comments (post_id, post_title, comment_body) VALUES (?, ?, ?)",
|
|
(str(post_id), title[:500] if title else '', body[:2000] if body else '')
|
|
)
|
|
conn.commit()
|
|
except Exception as e:
|
|
logger.error(f"DB error saving moltbook comment: {e}")
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def get_moltbook_stats() -> dict:
|
|
"""Get Moltbook bot statistics."""
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT COUNT(*) FROM moltbook_comments")
|
|
total = cursor.fetchone()[0]
|
|
cutoff = (datetime.utcnow() - timedelta(hours=24)).strftime('%Y-%m-%d %H:%M:%S')
|
|
cursor.execute("SELECT COUNT(*) FROM moltbook_comments WHERE commented_at > ?", (cutoff,))
|
|
last_24h = cursor.fetchone()[0]
|
|
return {'total_comments': total, 'last_24h': last_24h}
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
# =============================================================================
|
|
# VESSEL WATCHES (Telegram bot: /watch command)
|
|
# =============================================================================
|
|
|
|
def add_vessel_watch(chat_id: int, mmsi: str, vessel_name: str, watch_type: str,
|
|
destination_port: str = None, destination_lat: float = None,
|
|
destination_lon: float = None, port_radius_nm: float = 15,
|
|
last_status: str = None) -> int:
|
|
"""Create a vessel watch. Returns watch ID."""
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
"INSERT INTO vessel_watches (chat_id, mmsi, vessel_name, watch_type, "
|
|
"destination_port, destination_lat, destination_lon, port_radius_nm, last_status) "
|
|
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
|
|
(chat_id, mmsi, vessel_name, watch_type,
|
|
destination_port, destination_lat, destination_lon, port_radius_nm, last_status)
|
|
)
|
|
conn.commit()
|
|
return cursor.lastrowid
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def get_active_watches(chat_id: int = None) -> list:
|
|
"""Get active (not notified) watches. If chat_id given, filter by chat."""
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
if chat_id is not None:
|
|
cursor.execute(
|
|
"SELECT * FROM vessel_watches WHERE notified = 0 AND chat_id = ? ORDER BY id",
|
|
(chat_id,)
|
|
)
|
|
else:
|
|
cursor.execute("SELECT * FROM vessel_watches WHERE notified = 0 ORDER BY id")
|
|
return [dict(row) for row in cursor.fetchall()]
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def mark_watch_notified(watch_id: int):
|
|
"""Mark a watch as notified (fired)."""
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
cursor.execute("UPDATE vessel_watches SET notified = 1 WHERE id = ?", (watch_id,))
|
|
conn.commit()
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def delete_watch(watch_id: int, chat_id: int) -> bool:
|
|
"""Delete a watch (only if it belongs to chat_id). Returns True if deleted."""
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
"DELETE FROM vessel_watches WHERE id = ? AND chat_id = ?",
|
|
(watch_id, chat_id)
|
|
)
|
|
conn.commit()
|
|
return cursor.rowcount > 0
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def update_watch_position(watch_id: int, last_status: str = None,
|
|
last_lat: float = None, last_lon: float = None):
|
|
"""Update watch with latest position/status data."""
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
"UPDATE vessel_watches SET last_status = ?, last_lat = ?, last_lon = ?, "
|
|
"last_check = CURRENT_TIMESTAMP WHERE id = ?",
|
|
(last_status, last_lat, last_lon, watch_id)
|
|
)
|
|
conn.commit()
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
init_db()
|
|
logger.info("Stats: %s", get_stats())
|
|
logger.info("Equasis remaining: %s", get_equasis_remaining())
|
|
logger.info("Moltbook stats: %s", get_moltbook_stats())
|
|
|
|
|
|
def get_vessels_heading_to_staging(destination_patterns: list, vessel_type: str = None, limit: int = 50) -> list:
|
|
"""Query mt_bulk_staging for vessels whose AIS destination matches patterns."""
|
|
if not destination_patterns:
|
|
return []
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
like_clauses = []
|
|
params = []
|
|
for pat in destination_patterns:
|
|
like_clauses.append("UPPER(destination) LIKE ?")
|
|
params.append(f"%{pat.upper()}%")
|
|
where = " OR ".join(like_clauses)
|
|
|
|
type_filter = ""
|
|
if vessel_type:
|
|
type_map = {
|
|
'tanker': ('tanker',), 'bulk': ('bulk', 'general'),
|
|
'container': ('container',), 'general': ('general', 'cargo'),
|
|
}
|
|
types = type_map.get(vessel_type.lower(), (vessel_type.lower(),))
|
|
type_clauses = " OR ".join(["LOWER(type_category) = ?" for _ in types])
|
|
type_filter = f"AND ({type_clauses})"
|
|
params.extend(types)
|
|
|
|
params.append(limit)
|
|
cursor.execute(f"""
|
|
SELECT mmsi, name, flag, dwt, type_category, gt_shiptype,
|
|
lat, lon, speed, course, destination, imo,
|
|
owner, operator, year_built, scraped_at
|
|
FROM mt_bulk_staging
|
|
WHERE ({where}) AND destination IS NOT NULL AND destination != ''
|
|
AND lat IS NOT NULL AND lon IS NOT NULL
|
|
{type_filter}
|
|
ORDER BY scraped_at DESC NULLS LAST
|
|
LIMIT ?
|
|
""", params)
|
|
return [dict(row) for row in cursor.fetchall()]
|
|
except Exception as e:
|
|
logger.error(f"get_vessels_heading_to_staging error: {e}")
|
|
return []
|
|
finally:
|
|
conn.close()
|