montana/Русский/Логистика/marinetraffic_parser.py

3381 lines
145 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""
MarineTraffic Data Parser & Maritime Tools
Web scraping, AIS data, ports DB (16,553 ports), routing engine,
cargo matching, and 15+ premium maritime tool helpers.
Logical modules (split candidates for future refactoring):
1. PORTS DATABASE (lines ~31-120) _load_world_ports, WORLD_PORTS, PORT_ALIASES
2. CARGO CLASSIFICATION (lines ~123-175) classify_cargo, CARGO_TO_VESSEL
3. VESSEL CLASSIFICATION (lines ~176-328) classify_vessel_type/subtype, resolve_port, find_nearby_ports
4. SCRAPER CLASS (lines ~329-750) MarineTrafficParser (web scraping)
5. ROUTING ENGINE (lines ~751-1169) calculate_sea_route, fuel/canal/port cost estimators
6. CARGO MATCHING (lines ~1170-1370) fixture_match
7. FREIGHT RATES (lines ~1371-1449) estimate_freight_rate
8. SANCTIONS SCREENING (lines ~1450-1596) screen_sanctions
9. PORT CONGESTION (lines ~1598-1703) estimate_port_congestion
10. BUNKER PRICES (lines ~1704-1891) get_bunker_prices, optimize_bunker_route
11. CHARTER PARTY (lines ~1892-2002) generate_charter_party
12. VESSEL PERFORMANCE (lines ~2003-2135) analyze_vessel_performance
13. BILL OF LADING (lines ~2136-2215) generate_bill_of_lading
14. CREW CHANGE (lines ~2216-2322) optimize_crew_change
15. INSURANCE (lines ~2323-2449) calculate_maritime_insurance
16. PORT COSTS (lines ~2445-2553) estimate_port_costs
17. WEATHER ROUTING (lines ~2554-2661) calculate_weather_routing
18. FIXTURE RECAP (lines ~2662-2753) generate_fixture_recap
19. AIS ANOMALY (lines ~2754-2890) detect_ais_anomalies
20. DARK FLEET (lines ~2891-3081) detect_dark_fleet
21. MODULE API (lines ~3082-end) get_parser, search_vessel, get_vessel
Ɉ MONTANA PROTOCOL ML-DSA-65 (FIPS 204)
"""
import os
import re
import json
import hashlib
import logging
import math
import time
import requests
try:
from curl_cffi import requests as cf_requests
_HAS_CURL_CFFI = True
except ImportError:
_HAS_CURL_CFFI = False
from datetime import datetime
from typing import Optional, Dict, List
from bs4 import BeautifulSoup
# Module-level logger used by the functions and methods in this file.
logger = logging.getLogger('marinetraffic_parser')
# API key from environment (when available); None when the variable is unset.
MT_API_KEY = os.environ.get("MARINETRAFFIC_API_KEY")
# Base URLs
# MT_BASE is the prefix of every page URL built by the scraper methods below.
MT_BASE = "https://www.marinetraffic.com"
MT_API_BASE = "https://services.marinetraffic.com/api"
# =============================================================================
# WORLD PORTS DATABASE (loaded from world_ports.json; built-in 10-port fallback)
# =============================================================================
def _load_world_ports():
    """Load the port database from ``world_ports.json`` next to this module.

    Returns:
        dict: port key -> port-info dict. The parsed JSON document is returned
        when the file exists, can be read and holds a JSON object. In every
        other case a built-in table of 10 major ports is returned, so callers
        can always treat the result as a mapping.
    """
    _ports_json = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'world_ports.json')
    if os.path.exists(_ports_json):
        # Deliberately broad: the result is assigned at module import, so a
        # damaged data file must degrade to the fallback table instead of
        # breaking the import.
        try:
            with open(_ports_json, 'r', encoding='utf-8') as f:
                ports = json.load(f)
        except Exception as e:
            logger.warning(f"Failed to load world_ports.json: {e}")
        else:
            if isinstance(ports, dict):
                return ports
            # A list/scalar document would break every ``.items()`` / key
            # lookup performed on the database later on.
            logger.warning(
                "world_ports.json does not contain a JSON object "
                f"(got {type(ports).__name__}); using built-in ports"
            )
    # Minimal fallback (top 10 ports)
    return {
        'rotterdam': {'name': 'Rotterdam', 'country': 'Netherlands', 'unlocode': 'NLRTM', 'lat': 51.9244, 'lon': 4.4777, 'radius_nm': 15, 'size': 'large'},
        'singapore': {'name': 'Singapore', 'country': 'Singapore', 'unlocode': 'SGSIN', 'lat': 1.2644, 'lon': 103.82, 'radius_nm': 20, 'size': 'large'},
        'shanghai': {'name': 'Shanghai', 'country': 'China', 'unlocode': 'CNSHA', 'lat': 31.3622, 'lon': 121.5882, 'radius_nm': 20, 'size': 'large'},
        'houston': {'name': 'Houston', 'country': 'USA', 'unlocode': 'USHOU', 'lat': 29.7262, 'lon': -95.0102, 'radius_nm': 20, 'size': 'large'},
        'busan': {'name': 'Busan', 'country': 'South Korea', 'unlocode': 'KRPUS', 'lat': 35.0979, 'lon': 129.0371, 'radius_nm': 15, 'size': 'large'},
        'jebel_ali': {'name': 'Jebel Ali', 'country': 'UAE', 'unlocode': 'AEJEA', 'lat': 25.0117, 'lon': 55.0637, 'radius_nm': 15, 'size': 'large'},
        'new_york': {'name': 'New York', 'country': 'USA', 'unlocode': 'USNYC', 'lat': 40.6701, 'lon': -74.0376, 'radius_nm': 15, 'size': 'large'},
        'antwerp': {'name': 'Antwerp', 'country': 'Belgium', 'unlocode': 'BEANR', 'lat': 51.2322, 'lon': 4.3986, 'radius_nm': 12, 'size': 'large'},
        'mumbai': {'name': 'Mumbai', 'country': 'India', 'unlocode': 'INBOM', 'lat': 18.94, 'lon': 72.835, 'radius_nm': 15, 'size': 'large'},
        'durban': {'name': 'Durban', 'country': 'South Africa', 'unlocode': 'ZADUR', 'lat': -29.8674, 'lon': 31.0386, 'radius_nm': 12, 'size': 'large'},
    }
# Port database (port key -> info dict), loaded once when the module is imported.
WORLD_PORTS = _load_world_ports()
# Alternate spellings -> canonical WORLD_PORTS key.
# resolve_port() looks up the lower-cased, stripped query here by exact match
# and ignores an alias whose target key is not present in WORLD_PORTS.
PORT_ALIASES = {
'europoort': 'rotterdam', 'hook of holland': 'rotterdam',
'changi': 'singapore', 'tanjong pagar': 'singapore', 'jurong': 'singapore',
'pusan': 'busan',
'shenzhen': 'hong_kong', 'yantian': 'hong_kong',
'dubai': 'jebel_ali', 'dp world': 'jebel_ali',
'abu dhabi': 'khalifa_port',
'new jersey': 'new_york', 'newark': 'new_york', 'bayonne': 'new_york',
'tacoma': 'seattle',
'la': 'los_angeles', 'san pedro': 'los_angeles',
'lb': 'long_beach',
'bombay': 'mumbai', 'nhava sheva': 'mumbai', 'jnpt': 'mumbai',
'madras': 'chennai',
'saigon': 'ho_chi_minh',
'tangier': 'tanger_med',
'cape of good hope': 'cape_town',
'johor': 'tanjung_pelepas',
'yangshan': 'shanghai',
'apapa': 'lagos_apapa', 'tin can island': 'tin_can_island',
'sao paulo': 'santos',
# Caspian Sea aliases
'alat port': 'alat', 'baku new port': 'alat', 'port of baku': 'alat',
'krasnovodsk': 'turkmenbashi', 'krw': 'turkmenbashi',
# NOTE(review): 'bandar-e anzali' targets 'bandar_e_anzali' while every other
# Anzali alias in this table targets 'bandar_anzali' — confirm which key
# actually exists in world_ports.json.
'anzali': 'bandar_anzali', 'bandar-e anzali': 'bandar_e_anzali',
'guryev': 'atyrau',
'noshahr': 'nowshahr',
# Russian port names (the user writes in Russian)
'баку': 'baku', 'алят': 'alat', 'актау': 'aktau', 'астрахань': 'astrakhan',
'махачкала': 'makhachkala', 'туркменбаши': 'turkmenbashi', 'красноводск': 'turkmenbashi',
'энзели': 'bandar_anzali', 'анзали': 'bandar_anzali',
'ноушехр': 'nowshahr', 'амирабад': 'amirabad', 'нека': 'neka',
'баутино': 'bautino', 'курык': 'kuryk', 'оля': 'olya',
'роттердам': 'rotterdam', 'сингапур': 'singapore', 'шанхай': 'shanghai',
'стамбул': 'istanbul', 'гонконг': 'hong_kong', 'дубай': 'jebel_ali',
'нью-йорк': 'new_york', 'лондон': 'london', 'гамбург': 'hamburg',
'антверпен': 'antwerp', 'пирей': 'piraeus', 'генуя': 'genoa',
'барселона': 'barcelona', 'марсель': 'marseille', 'одесса': 'odessa',
'новороссийск': 'novorossiysk', 'санкт-петербург': 'saint_petersburg',
'мумбаи': 'mumbai', 'бомбей': 'mumbai', 'токио': 'tokyo',
'пусан': 'busan', 'иокогама': 'yokohama', 'сидней': 'sydney',
'джидда': 'jeddah', 'джебель-али': 'jebel_ali',
'суэц': 'suez', 'порт-саид': 'port_said',
'хьюстон': 'houston', 'лос-анджелес': 'los_angeles',
'сантус': 'santos', 'буэнос-айрес': 'buenos_aires',
'кейптаун': 'cape_town', 'лагос': 'lagos_apapa',
'дар-эс-салам': 'dar_es_salaam',
# Legacy space-key aliases for backward compat
'jebel ali': 'jebel_ali', 'hong kong': 'hong_kong', 'new york': 'new_york',
'los angeles': 'los_angeles', 'long beach': 'long_beach', 'ho chi minh': 'ho_chi_minh',
'tanger med': 'tanger_med', 'cape town': 'cape_town', 'tanjung pelepas': 'tanjung_pelepas',
'port klang': 'port_klang', 'le havre': 'le_havre', 'dar es salaam': 'dar_es_salaam',
'port said': 'port_said', 'richards bay': 'richards_bay', 'khalifa port': 'khalifa_port',
'ras tanura': 'ras_tanura', 'bandar abbas': 'bandar_abbas', 'buenos aires': 'buenos_aires',
'new orleans': 'new_orleans', 'st petersburg': 'saint_petersburg',
'laem chabang': 'laem_chabang', 'gioia tauro': 'gioia_tauro', 'port hedland': 'port_hedland',
}
# =============================================================================
# VESSEL TYPE & NAVIGATION STATUS MAPPINGS
# =============================================================================
# Cargo → vessel type mapping
# Keys are lower-case cargo keywords; values are vessel categories.
# classify_cargo() first tries the whole description as a key and then scans
# this table in insertion order for the first keyword contained in the
# description, so entry order decides which keyword wins a substring match.
CARGO_TO_VESSEL = {
# Dry bulk
'grain': 'bulk', 'wheat': 'bulk', 'corn': 'bulk', 'rice': 'bulk', 'barley': 'bulk',
'soybean': 'bulk', 'soybeans': 'bulk', 'coal': 'bulk', 'iron ore': 'bulk', 'ore': 'bulk',
'bauxite': 'bulk', 'phosphate': 'bulk', 'fertilizer': 'bulk', 'cement': 'bulk',
'sugar': 'bulk', 'salt': 'bulk', 'steel': 'bulk', 'scrap': 'bulk', 'clinker': 'bulk',
'minerals': 'bulk', 'aggregate': 'bulk', 'sand': 'bulk', 'gravel': 'bulk',
# NOTE(review): 'flour' and 'meal' are the only entries in this section mapped
# to 'general' rather than 'bulk' — presumably intentional; confirm.
'flour': 'general', 'meal': 'general', 'feed': 'bulk', 'animal feed': 'bulk',
# Liquid
'crude oil': 'tanker', 'crude': 'tanker', 'oil': 'tanker', 'petroleum': 'tanker',
'diesel': 'tanker', 'gasoline': 'tanker', 'fuel oil': 'tanker', 'naphtha': 'tanker',
'chemicals': 'tanker', 'palm oil': 'tanker', 'vegetable oil': 'tanker',
'lng': 'tanker', 'lpg': 'tanker', 'methanol': 'tanker',
# Container
'containers': 'container', 'container': 'container', 'teu': 'container',
'electronics': 'container', 'machinery': 'container', 'furniture': 'container',
'clothing': 'container', 'textiles': 'container', 'consumer goods': 'container',
'manufactured goods': 'container', 'general merchandise': 'container',
# Ro-Ro / vehicles
'cars': 'roro', 'vehicles': 'roro', 'trucks': 'roro', 'automobiles': 'roro',
'heavy equipment': 'roro', 'tractors': 'roro',
# General
'timber': 'general', 'lumber': 'general', 'wood': 'general', 'plywood': 'general',
'project cargo': 'general', 'breakbulk': 'general', 'pipes': 'general',
}
def classify_cargo(cargo_description: str) -> Optional[str]:
    """Map a free-text cargo description to a vessel category.

    The description is lower-cased and stripped. A description that equals a
    ``CARGO_TO_VESSEL`` keyword wins outright; otherwise the first keyword (in
    table order) that occurs anywhere inside the description decides.

    Returns:
        The vessel category, or None for an empty description or when no
        keyword matches.
    """
    if not cargo_description:
        return None
    normalized = cargo_description.lower().strip()
    # Whole-description hit
    try:
        return CARGO_TO_VESSEL[normalized]
    except KeyError:
        pass
    # Keyword contained somewhere in the description
    return next(
        (category for keyword, category in CARGO_TO_VESSEL.items() if keyword in normalized),
        None,
    )
# Vessel category → lower-case keywords looked for inside a free-text ship type.
# classify_vessel_type() and classify_vessel_subtype() walk this table in
# insertion order and take the first category owning a keyword that is a
# substring of the type text, so category order acts as match priority.
# NOTE(review): because 'general' (keyword 'cargo') precedes 'roro', a type
# text such as "ro-ro cargo" resolves to 'general' and the 'ro-ro cargo'
# keyword below is never the deciding one — confirm this is intended.
VESSEL_TYPE_CATEGORIES = {
'bulk': ['bulk carrier', 'bulk', 'ore carrier'],
'tanker': ['tanker', 'oil tanker', 'chemical tanker', 'oil/chemical tanker', 'lng tanker', 'lpg tanker', 'crude oil tanker'],
'container': ['container ship', 'container', 'containership'],
'general': ['general cargo', 'cargo', 'multipurpose', 'general cargo ship'],
'passenger': ['passenger', 'cruise', 'passenger ship', 'cruise ship', 'ferry', 'ro-ro/passenger'],
'roro': ['ro-ro', 'roro', 'vehicles carrier', 'car carrier', 'ro-ro cargo'],
'offshore': ['offshore', 'supply vessel', 'platform', 'anchor handling', 'fpso', 'offshore supply ship'],
'tug': ['tug', 'tugboat', 'towing', 'pusher tug', 'pilot'],
'fishing': ['fishing', 'fishing vessel', 'trawler'],
'highspeed': ['high speed', 'high-speed', 'hsc', 'hydrofoil', 'wing in ground'],
'pleasure': ['pleasure', 'yacht', 'sailing yacht', 'motor yacht'],
'sailing': ['sailing vessel', 'sailing'],
'military': ['military', 'naval', 'warship', 'patrol'],
}
# AIS type codes (numeric) → category
# Keys are range objects; classify_vessel_type() tests membership in insertion
# order, and any code not covered here ends up as 'other'.
# NOTE(review): 'cargo' is not one of the VESSEL_TYPE_CATEGORIES / VESSEL_SUBTYPES
# keys (those use 'general', 'bulk', 'container') — confirm consumers expect it.
# NOTE(review): in the ITU-R M.1371 ship-type table code 52 is "Tug", yet the
# 50-54 range maps it to 'offshore', and 'pilot' is a 'tug' keyword in
# VESSEL_TYPE_CATEGORIES while code 50 maps to 'offshore' — verify.
# NOTE(review): codes 20-29 (wing-in-ground), 33-34, 55-59 and 90-99 are not
# mapped although 'wing in ground' is a 'highspeed' keyword — confirm.
AIS_TYPE_CODE_MAP = {
range(70, 80): 'cargo', # 70-79: Cargo, general cargo
range(80, 90): 'tanker', # 80-89: Tanker
range(60, 70): 'passenger', # 60-69: Passenger
range(40, 50): 'highspeed', # 40-49: High-speed craft
range(30, 30+1): 'fishing', # 30: Fishing
range(31, 33): 'tug', # 31-32: Towing
range(50, 55): 'offshore', # 50-54: Pilot/SAR/port tender
range(35, 36): 'military', # 35: Military
range(36, 37): 'sailing', # 36: Sailing
range(37, 38): 'pleasure', # 37: Pleasure craft
}
# AIS navigation status codes
# Numeric status → short lower-case label. Codes 9-13 have no entry here, so
# lookups for them need a default.
NAV_STATUS_MAP = {
0: 'underway', # Under way using engine
1: 'at anchor', # At anchor
2: 'not under command',
3: 'restricted maneuverability',
4: 'constrained by draught',
5: 'moored', # Moored
6: 'aground',
7: 'fishing', # Engaged in fishing
8: 'underway sailing', # Under way sailing
14: 'ais-sart',
15: 'undefined',
}
# =============================================================================
# VESSEL SUBTYPES BY DWT (industry standard classifications)
# =============================================================================
# Vessel category → size bands ordered by ascending deadweight.
# Each band has 'name', inclusive 'dwt_min'/'dwt_max' bounds and a
# 'typical_dwt'; container bands additionally carry a 'teu_range' label.
# Bounds are whole numbers with a gap of 1 between neighbouring bands, so a
# fractional DWT such as 9999.5 lies inside no band.
VESSEL_SUBTYPES = {
'bulk': [
{'name': 'River-Sea', 'dwt_min': 1000, 'dwt_max': 9999, 'typical_dwt': 5000},
{'name': 'Handysize', 'dwt_min': 10000, 'dwt_max': 39999, 'typical_dwt': 28000},
{'name': 'Handymax', 'dwt_min': 40000, 'dwt_max': 49999, 'typical_dwt': 45000},
{'name': 'Supramax', 'dwt_min': 50000, 'dwt_max': 64999, 'typical_dwt': 58000},
{'name': 'Panamax', 'dwt_min': 65000, 'dwt_max': 99999, 'typical_dwt': 75000},
{'name': 'Capesize', 'dwt_min': 100000, 'dwt_max': 199999, 'typical_dwt': 180000},
{'name': 'VLOC', 'dwt_min': 200000, 'dwt_max': 400000, 'typical_dwt': 300000},
],
'tanker': [
{'name': 'River-Sea Tanker', 'dwt_min': 1000, 'dwt_max': 24999, 'typical_dwt': 7000},
{'name': 'MR (Medium Range)', 'dwt_min': 25000, 'dwt_max': 54999, 'typical_dwt': 45000},
{'name': 'LR1', 'dwt_min': 55000, 'dwt_max': 79999, 'typical_dwt': 73000},
{'name': 'Aframax', 'dwt_min': 80000, 'dwt_max': 119999, 'typical_dwt': 105000},
{'name': 'Suezmax', 'dwt_min': 120000, 'dwt_max': 199999, 'typical_dwt': 160000},
{'name': 'VLCC', 'dwt_min': 200000, 'dwt_max': 320000, 'typical_dwt': 300000},
],
'container': [
{'name': 'Feeder', 'dwt_min': 5000, 'dwt_max': 24999, 'teu_range': '500-2,500 TEU', 'typical_dwt': 15000},
{'name': 'Feedermax', 'dwt_min': 25000, 'dwt_max': 39999, 'teu_range': '2,500-5,000 TEU', 'typical_dwt': 33000},
{'name': 'Panamax', 'dwt_min': 40000, 'dwt_max': 64999, 'teu_range': '5,000-8,000 TEU', 'typical_dwt': 55000},
{'name': 'Post-Panamax', 'dwt_min': 65000, 'dwt_max': 99999, 'teu_range': '8,000-12,000 TEU', 'typical_dwt': 80000},
{'name': 'Neo-Panamax', 'dwt_min': 100000, 'dwt_max': 149999, 'teu_range': '12,000-15,000 TEU', 'typical_dwt': 120000},
{'name': 'ULCV', 'dwt_min': 150000, 'dwt_max': 250000, 'teu_range': '15,000-24,000 TEU', 'typical_dwt': 200000},
],
'general': [
{'name': 'Small General Cargo', 'dwt_min': 1000, 'dwt_max': 9999, 'typical_dwt': 5000},
{'name': 'General Cargo', 'dwt_min': 10000, 'dwt_max': 25000, 'typical_dwt': 15000},
],
}
def classify_vessel_subtype(vessel_type: str, dwt: float = None) -> Optional[Dict]:
    """Classify a vessel into a size band (subtype) by deadweight.

    Args:
        vessel_type: category name or free-text ship type. It is mapped to a
            category through the ``VESSEL_TYPE_CATEGORIES`` keywords (first
            category in table order with a keyword contained in the text).
        dwt: deadweight tonnage; anything ``float()`` accepts. Optional.

    Returns:
        None when ``vessel_type`` is empty or its category has no bands in
        ``VESSEL_SUBTYPES``. Otherwise a copy of the band dict (``'name'``,
        ``'dwt_min'``, ``'dwt_max'``, ``'typical_dwt'``, ...) plus
        ``'category'``. ``'estimated': True`` is added whenever the band was
        not an exact hit:
          * DWT outside every band -> the nearest band;
          * DWT missing, unparsable, non-finite or not positive -> the
            middle band of the category.
    """
    if not vessel_type:
        return None
    vtype = vessel_type.lower().strip()
    # Map common synonyms onto a category name
    for category, keywords in VESSEL_TYPE_CATEGORIES.items():
        if any(kw in vtype for kw in keywords):
            vtype = category
            break
    subtypes = VESSEL_SUBTYPES.get(vtype)
    if not subtypes:
        return None

    dwt_val = None
    if dwt:
        try:
            dwt_val = float(dwt)
        except (ValueError, TypeError):
            dwt_val = None

    if dwt_val is not None and math.isfinite(dwt_val) and dwt_val > 0:
        for st in subtypes:
            if st['dwt_min'] <= dwt_val <= st['dwt_max']:
                return {**st, 'category': vtype}

        # Out of range (or in the gap between two bands): pick the band whose
        # bounds are nearest instead of an arbitrary mid-range band.
        def _distance(st):
            return max(st['dwt_min'] - dwt_val, dwt_val - st['dwt_max'], 0)

        nearest = min(subtypes, key=_distance)
        return {**nearest, 'category': vtype, 'estimated': True}

    # No usable DWT — return mid-range default
    mid = subtypes[len(subtypes) // 2]
    return {**mid, 'category': vtype, 'estimated': True}
def classify_vessel_type(type_str: str, type_code=None) -> str:
    """Reduce a ship-type description and/or numeric AIS type code to a category.

    The text is tried first: the first category in ``VESSEL_TYPE_CATEGORIES``
    owning a keyword contained in the lower-cased text wins. When the text
    gives nothing, ``type_code`` is converted with ``int()`` and looked up in
    the ranges of ``AIS_TYPE_CODE_MAP``.

    Returns:
        The category name, or ``'other'`` when neither input can be classified.
    """
    if type_str:
        lowered = type_str.lower().strip()
        by_text = next(
            (
                category
                for category, keywords in VESSEL_TYPE_CATEGORIES.items()
                if any(kw in lowered for kw in keywords)
            ),
            None,
        )
        if by_text is not None:
            return by_text

    if type_code is None:
        return 'other'

    try:
        numeric_code = int(type_code)
    except (ValueError, TypeError):
        # Code that is not a number carries no information
        return 'other'

    for code_range, category in AIS_TYPE_CODE_MAP.items():
        if numeric_code in code_range:
            return category
    return 'other'
def get_destination_patterns(port: dict) -> List[str]:
    """Generate AIS destination search patterns for a port.

    AIS destination is free text entered by crew, e.g. "BOSTON", "US BOS",
    "USBOS", so several spellings are produced to maximise matching chances.

    Args:
        port: dict from resolve_port() with 'name', 'unlocode', 'country', etc.
            Missing or None 'name' / 'unlocode' values are treated as empty.

    Returns:
        De-duplicated list of upper-case pattern strings for SQL LIKE queries,
        always in the same order: port name, UN/LOCODE, then the LOCODE split
        into "CC LLL". Names and codes shorter than 4 characters are left out
        (too short to be a selective pattern).
    """
    name = (port.get('name') or '').upper().strip()
    unlocode = (port.get('unlocode') or '').upper().strip()

    candidates = []
    if len(name) >= 4:
        candidates.append(name)  # "BOSTON"
    if len(unlocode) >= 4:
        candidates.append(unlocode)  # "USBOS"
        # Split LOCODE: country "US" + loc "BOS"
        if len(unlocode) >= 5:
            candidates.append(f"{unlocode[:2]} {unlocode[2:]}")  # "US BOS"

    # dict.fromkeys removes duplicates while keeping insertion order, so the
    # result no longer depends on string hash randomisation like a set does.
    return list(dict.fromkeys(candidates))
def find_nearby_ports(lat: float, lon: float, radius_nm: float = 100) -> List[Dict]:
    """Find ports within ``radius_nm`` nautical miles of the given coordinates.

    Distance is a flat-earth approximation (1 degree of latitude = 60 NM,
    longitude scaled by the cosine of the query latitude). The longitude
    difference is wrapped at the antimeridian, so 179.5 and -179.5 are one
    degree apart rather than 359.

    Args:
        lat: query latitude in decimal degrees.
        lon: query longitude in decimal degrees.
        radius_nm: search radius in nautical miles.

    Returns:
        Copies of the matching ``WORLD_PORTS`` entries, each extended with
        ``'key'`` and ``'distance_nm'`` (rounded to 0.1 NM), sorted nearest
        first. Entries without usable 'lat'/'lon' values are skipped.
    """
    # NM per degree of longitude at the query latitude; the 0.01 floor keeps
    # the scale positive right at the poles.
    nm_per_lon_degree = 60 * max(math.cos(math.radians(lat)), 0.01)

    nearby = []
    for key, port in WORLD_PORTS.items():
        try:
            port_lat = float(port['lat'])
            port_lon = float(port['lon'])
        except (KeyError, TypeError, ValueError):
            # A port without coordinates cannot be placed on the map.
            continue
        dlat = abs(port_lat - lat) * 60
        lon_diff = abs(port_lon - lon) % 360
        if lon_diff > 180:
            lon_diff = 360 - lon_diff  # shorter way round the globe
        dist_nm = math.hypot(dlat, lon_diff * nm_per_lon_degree)
        if dist_nm <= radius_nm:
            nearby.append({**port, 'key': key, 'distance_nm': round(dist_nm, 1)})
    nearby.sort(key=lambda p: p['distance_nm'])
    return nearby
def resolve_port(query: str) -> Optional[Dict]:
    """Resolve a user-supplied port name or code to a port-info dict.

    Matching strategies, tried from most to least specific:
      1. exact ``WORLD_PORTS`` key (as typed, or with spaces/hyphens turned
         into underscores and apostrophes removed);
      2. ``PORT_ALIASES`` entry whose target exists in ``WORLD_PORTS``;
      3. exact UN/LOCODE;
      4. substring match between the normalised query and a port key
         (either direction);
      5. query contained in a port's display name.

    Exact UN/LOCODE is checked before the fuzzy steps so that a code cannot
    be captured by a short key that happens to be a substring of it.

    Args:
        query: port name, alias or UN/LOCODE; case-insensitive.

    Returns:
        The port dict, always containing ``'key'``, or None when the query is
        empty/blank or nothing matches.
    """
    if not query:
        return None
    q = query.lower().strip()
    if not q:
        # Whitespace only: an empty needle would "match" every key below.
        return None
    q_under = q.replace(' ', '_').replace('-', '_').replace("'", "")

    def _with_key(key):
        port = WORLD_PORTS[key]
        if 'key' not in port:
            return {**port, 'key': key}
        return port

    # Exact match (spaces or underscores)
    if q in WORLD_PORTS:
        return _with_key(q)
    if q_under in WORLD_PORTS:
        return _with_key(q_under)

    # Alias match
    alias_key = PORT_ALIASES.get(q)
    if alias_key in WORLD_PORTS:
        return _with_key(alias_key)

    # Match by UNLOCODE (exact)
    q_upper = q.upper()
    for key, port in WORLD_PORTS.items():
        if port.get('unlocode') == q_upper:
            return _with_key(key)

    # Partial match (query in port key or port key in query); skipped when
    # normalisation left nothing to search for (e.g. the query was "'").
    if q_under:
        for key in WORLD_PORTS:
            if q_under in key or key in q_under:
                return _with_key(key)

    # Match by port name (case-insensitive)
    for key, port in WORLD_PORTS.items():
        if q in (port.get('name') or '').lower():
            return _with_key(key)
    return None
def _generate_totp(secret: str, digits: int = 6, interval: int = 30,
                   at_time: Optional[float] = None) -> str:
    """Generate a TOTP code from a base32 secret (RFC 6238, HMAC-SHA1). No external deps.

    Args:
        secret: base32 shared secret as shown by authenticator apps; case,
            spaces, dashes and trailing '=' padding are ignored.
        digits: length of the returned code.
        interval: time-step size in seconds.
        at_time: Unix timestamp to generate the code for; defaults to the
            current time. Lets callers and tests use a fixed clock.

    Returns:
        The code as a zero-padded decimal string of ``digits`` characters.

    Raises:
        binascii.Error: when ``secret`` is not valid base32.
    """
    import base64
    import hashlib
    import hmac
    import struct

    # Normalize base32 (uppercase, no separators, pad to multiple of 8)
    secret_clean = ''.join(secret.upper().split()).replace('-', '').rstrip('=')
    pad = (-len(secret_clean)) % 8
    key = base64.b32decode(secret_clean + '=' * pad)

    now = time.time() if at_time is None else at_time
    counter = int(now) // interval
    msg = struct.pack('>Q', counter)
    h = hmac.new(key, msg, hashlib.sha1).digest()
    # Dynamic truncation (RFC 4226 section 5.3): the low nibble of the last
    # byte selects a 4-byte window; the top bit is masked off.
    offset = h[-1] & 0x0f
    code = struct.unpack('>I', h[offset:offset + 4])[0] & 0x7fffffff
    return str(code % (10 ** digits)).zfill(digits)
class MarineTrafficParser:
"""Parser for MarineTraffic public data + API"""
def __init__(self, api_key: str = None, totp_secret: str = None):
    """Create a parser with its own HTTP session.

    Args:
        api_key: MarineTraffic API key; falls back to ``MT_API_KEY``.
        totp_secret: Google Authenticator 2FA secret (base32), kept for login().
    """
    self.api_key = api_key or MT_API_KEY
    self.totp_secret = totp_secret

    if not _HAS_CURL_CFFI:
        # Plain requests session with a desktop Chrome User-Agent
        plain_session = requests.Session()
        plain_session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
                          '(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
        })
        self.session = plain_session
        return

    # Use curl_cffi to impersonate Chrome TLS fingerprint → bypasses Cloudflare
    self.session = cf_requests.Session(impersonate="chrome120")
    logger.info("Using curl_cffi Chrome impersonation (Cloudflare bypass)")
# =========================================================================
# PUBLIC DATA (no API key needed)
# =========================================================================
def search_vessel_public(self, query: str) -> List[Dict]:
    """Search vessels through the public MarineTraffic search page.

    Args:
        query: free-text keyword (vessel name, MMSI, ...). It is
            percent-encoded before being placed into the URL path so that
            spaces, '/', '?' or '#' cannot change the requested path.

    Returns:
        One dict per result that has a name, with 'name' and — when the page
        provides them — 'mmsi', 'type' and 'flag'. An empty list on a
        non-200 response or on any error (which is logged).
    """
    from urllib.parse import quote

    url = f"{MT_BASE}/en/ais/index/search/all/keyword:{quote(str(query), safe='')}"
    try:
        resp = self.session.get(url, timeout=10)
        if resp.status_code != 200:
            return []
        soup = BeautifulSoup(resp.text, 'html.parser')
        results = []
        # Parse search results
        for item in soup.select('.search-result-item, .vessel-item'):
            vessel = {}
            # Name
            name_el = item.select_one('.vessel-name, .ship-name, a[href*="/vessels/"]')
            if name_el:
                vessel['name'] = name_el.get_text(strip=True)
                href = name_el.get('href', '')
                # Extract MMSI from URL: leading dash-separated token of the
                # last path segment.
                # NOTE(review): assumes vessel links end in "<mmsi>-..." —
                # confirm against the current MarineTraffic URL format.
                if '/vessels/' in href:
                    vessel['mmsi'] = href.split('/')[-1].split('-')[0]
            # Type
            type_el = item.select_one('.ship-type, .vessel-type')
            if type_el:
                vessel['type'] = type_el.get_text(strip=True)
            # Flag
            flag_el = item.select_one('.flag, [class*="flag-"]')
            if flag_el:
                vessel['flag'] = flag_el.get('title', '') or flag_el.get_text(strip=True)
            if vessel.get('name'):
                results.append(vessel)
        return results
    except Exception as e:
        logger.error(f"Search error: {e}")
        return []
def get_vessel_page(self, mmsi: str) -> Dict:
    """Get vessel details from the public vessel page.

    Args:
        mmsi: vessel MMSI used to build the page URL.

    Returns:
        Dict with 'mmsi' plus whatever could be parsed: 'name', 'imo',
        'callsign', 'flag', 'type', numeric 'length', 'width', 'draught',
        'gross_tonnage', 'deadweight', 'year_built', and 'latitude' /
        'longitude'. Coordinates parsed from text are signed floats
        (south and west negative); coordinates taken from ``data-lat`` /
        ``data-lon`` attributes are stored exactly as found in the page.
        An empty dict on a non-200 response or on any error (logged).
    """
    url = f"{MT_BASE}/en/ais/details/ships/mmsi:{mmsi}"
    try:
        resp = self.session.get(url, timeout=10)
        if resp.status_code != 200:
            return {}
        soup = BeautifulSoup(resp.text, 'html.parser')
        vessel = {'mmsi': mmsi}
        # Parse vessel details
        # Name
        name_el = soup.select_one('h1.title, .vessel-name')
        if name_el:
            vessel['name'] = name_el.get_text(strip=True)
        # Details table
        for row in soup.select('.vessel-details tr, .details-table tr'):
            cells = row.select('td')
            if len(cells) >= 2:
                key = cells[0].get_text(strip=True).lower()
                value = cells[1].get_text(strip=True)
                if 'imo' in key:
                    vessel['imo'] = value
                elif 'mmsi' in key:
                    vessel['mmsi'] = value
                elif 'call sign' in key:
                    vessel['callsign'] = value
                elif 'flag' in key:
                    vessel['flag'] = value
                elif 'type' in key:
                    vessel['type'] = value
                elif 'length' in key:
                    vessel['length'] = self._parse_number(value)
                elif 'width' in key or 'beam' in key:
                    vessel['width'] = self._parse_number(value)
                elif 'draught' in key or 'draft' in key:
                    vessel['draught'] = self._parse_number(value)
                elif 'gross tonnage' in key:
                    vessel['gross_tonnage'] = self._parse_number(value)
                elif 'deadweight' in key:
                    vessel['deadweight'] = self._parse_number(value)
                elif 'year built' in key:
                    vessel['year_built'] = self._parse_number(value)
        # Current position
        pos_el = soup.select_one('.position-data, [data-lat], [data-lon]')
        if pos_el:
            vessel['latitude'] = pos_el.get('data-lat')
            vessel['longitude'] = pos_el.get('data-lon')
        # Last position from text
        pos_text = soup.select_one('.last-position, .position-info')
        if pos_text:
            text = pos_text.get_text()
            # Parse coordinates from text. The hemisphere letter is captured
            # so that southern latitudes / western longitudes become negative.
            lat_match = re.search(r'(\d+\.\d+)[°\s]*([NS])', text)
            lon_match = re.search(r'(\d+\.\d+)[°\s]*([EW])', text)
            if lat_match:
                lat_val = float(lat_match.group(1))
                vessel['latitude'] = -lat_val if lat_match.group(2) == 'S' else lat_val
            if lon_match:
                lon_val = float(lon_match.group(1))
                vessel['longitude'] = -lon_val if lon_match.group(2) == 'W' else lon_val
        return vessel
    except Exception as e:
        logger.error(f"Page parse error: {e}")
        return {}
# =========================================================================
# MT PRO AUTHENTICATION & OWNERSHIP DATA
# =========================================================================
def login(self, email: str, password: str, totp_secret: str = None) -> bool:
    """Authenticate to MarineTraffic Pro.

    Submits the email + password form and, when the response page looks like
    a two-factor prompt, an optional Google Authenticator (TOTP) step.
    One-time codes are never written to the log.

    Args:
        email: account e-mail.
        password: account password.
        totp_secret: base32 secret from Google Authenticator; overrides
            ``self.totp_secret`` when given.

    Returns:
        False when the login page cannot be fetched, when 2FA is required but
        no TOTP secret is available, or when an exception occurs. True when a
        logged-in marker or a session cookie is seen — and also,
        optimistically, when the outcome cannot be determined (later vessel
        fetches are expected to reveal a failed login).
    """
    totp_key = totp_secret or self.totp_secret
    _HDRS = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.9',
    }
    # Page fragments that indicate an authenticated session
    success_indicators = ['sign_out', 'logout', 'profile', 'my-fleet', 'subscription']

    def _csrf_from(soup, fallback=''):
        # Hidden form field first, <meta name="csrf-token"> second.
        field = soup.select_one('input[name="authenticity_token"]')
        token = field.get('value', '') if field else fallback
        if not token:
            meta = soup.select_one('meta[name="csrf-token"]')
            if meta:
                token = meta.get('content', fallback)
        return token

    try:
        # Step 1: GET login page to extract CSRF token
        resp = self.session.get(f'{MT_BASE}/en/users/login', headers=_HDRS, timeout=15)
        if resp.status_code != 200:
            logger.error(f"MT login page error: {resp.status_code}")
            return False
        csrf_token = _csrf_from(BeautifulSoup(resp.text, 'html.parser'))

        # Step 2: POST login credentials
        login_resp = self.session.post(
            f'{MT_BASE}/en/users/login',
            data={
                'user[email]': email,
                'user[password]': password,
                'authenticity_token': csrf_token,
                'commit': 'Log in',
            },
            headers={**_HDRS, 'Content-Type': 'application/x-www-form-urlencoded',
                     'Referer': f'{MT_BASE}/en/users/login', 'Origin': MT_BASE},
            allow_redirects=True,
            timeout=20
        )

        # Step 3: Check if 2FA is required
        page_lower = login_resp.text.lower()
        otp_indicators = ['two-factor', 'two_factor', 'otp', 'authenticator',
                          'verification code', 'authentication code', '2fa', 'totp']
        needs_2fa = any(ind in page_lower for ind in otp_indicators)
        if needs_2fa and totp_key:
            logger.info("MT requires 2FA — generating TOTP code...")
            otp_code = _generate_totp(totp_key)
            # Extract fresh CSRF for 2FA form
            soup2 = BeautifulSoup(login_resp.text, 'html.parser')
            csrf2 = _csrf_from(soup2, csrf_token)
            # Try common 2FA form field names
            otp_fields = ['user[otp_attempt]', 'otp_attempt', 'code', 'token',
                          'user[two_factor_code]', 'two_factor_code']
            # Detect actual field name from form
            otp_input = soup2.select_one('input[type="text"][name*="otp"], input[type="number"][name*="code"], input[name*="otp"], input[name*="code"], input[name*="token"]')
            if otp_input and otp_input.get('name'):
                otp_fields = [otp_input['name']] + otp_fields
            # POST 2FA code
            two_fa_url = login_resp.url  # might have redirected to /users/two_factor
            # Only the two most likely field names are tried.
            for field_name in otp_fields[:2]:
                otp_resp = self.session.post(
                    two_fa_url,
                    data={field_name: otp_code, 'authenticity_token': csrf2, 'commit': 'Verify'},
                    headers={**_HDRS, 'Content-Type': 'application/x-www-form-urlencoded',
                             'Referer': two_fa_url, 'Origin': MT_BASE},
                    allow_redirects=True,
                    timeout=20
                )
                page_lower = otp_resp.text.lower()
                if any(ind in page_lower for ind in success_indicators):
                    logger.info(f"MT Pro login + 2FA successful for {email}")
                    return True
                if 'invalid' in page_lower or 'incorrect' in page_lower:
                    logger.error(f"2FA code rejected (field={field_name})")
                    break
            logger.warning("2FA submission failed — checking cookies anyway")
        elif needs_2fa:
            logger.error("MT requires 2FA but no TOTP secret provided! Use --totp-secret")
            return False

        # Step 4: Verify login success (page_lower is the last page received)
        if any(ind in page_lower for ind in success_indicators):
            logger.info(f"MT Pro login successful for {email}")
            return True
        # Check cookies for session marker
        cookie_names = [c.name for c in self.session.cookies]
        logger.debug(f"Session cookies after login: {cookie_names}")
        if any(c in cookie_names for c in ['remember_user_token', '_mt_session', 'user_credentials']):
            logger.info(f"MT Pro login successful (cookie check) for {email}")
            return True
        logger.warning(f"MT Pro login status uncertain for {email} — continuing anyway")
        return True  # Optimistic: let vessel fetches confirm
    except Exception as e:
        logger.error(f"MT Pro login error: {e}")
        return False
def get_vessel_ownership(self, mmsi: str) -> dict:
    """Fetch the full vessel page, including the Ownership section (requires MT Pro login).

    Returns:
        The dict built by ``_parse_vessel_full_page`` (vessel specs plus
        ownership data), or an empty dict on HTTP 404, any other non-200
        status (logged as a warning) or any error (logged).
    """
    page_url = f'{MT_BASE}/en/ais/details/ships/mmsi:{mmsi}'
    request_headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36',
        'Referer': MT_BASE,
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
    }
    try:
        response = self.session.get(page_url, headers=request_headers, timeout=20)
        status = response.status_code
        if status == 200:
            return self._parse_vessel_full_page(response.text, mmsi)
        if status != 404:
            # A missing vessel (404) is expected and stays silent
            logger.warning(f"MT vessel page {mmsi}: HTTP {status}")
        return {}
    except Exception as e:
        logger.error(f"get_vessel_ownership({mmsi}): {e}")
        return {}
def _parse_vessel_full_page(self, html: str, mmsi: str) -> dict:
    """Parse a full MT vessel detail page: specs (name/flag/DWT/year) + Ownership section.

    Args:
        html: page markup.
        mmsi: MMSI the page was requested for; stored as ``result['mmsi']``.

    Returns:
        Dict with 'mmsi', the spec fields set by ``_apply_vessel_field``,
        optionally 'name', and the ownership data:
          * one entry per role field ('beneficial_owner', 'registered_owner',
            'commercial_manager', 'operator') with an optional
            '<field>_country' — the first company seen for a role wins;
          * 'owner' / 'owner_country' aliases taken from the first
            beneficial or registered owner;
          * 'companies': list of every ownership row as
            {'role', 'name', 'country', 'mt_profile_url'}.
    """
    from urllib.parse import urljoin

    soup = BeautifulSoup(html, 'html.parser')
    result = {'mmsi': mmsi}

    # --- Vessel name ---
    for sel in ['h1.title', '.vessel-name', 'h1', '.shipname']:
        el = soup.select_one(sel)
        if el:
            name = el.get_text(strip=True)
            if name and len(name) > 1:
                result['name'] = name
                break

    # --- Vessel specs table ---
    # MT Pro shows details in various table formats
    for row in soup.select('table tr, .vessel-details tr, dl dt, .info-table tr'):
        cells = row.select('td, dd, th')
        if len(cells) < 2:
            # Try dt/dd pairs
            if row.name == 'dt':
                val_el = row.find_next_sibling('dd')
                if val_el:
                    key = row.get_text(strip=True).lower()
                    val = val_el.get_text(strip=True)
                    self._apply_vessel_field(result, key, val)
            continue
        key = cells[0].get_text(strip=True).lower()
        val = cells[1].get_text(strip=True)
        self._apply_vessel_field(result, key, val)

    # Also try JSON-LD structured data (MT sometimes embeds this)
    for script in soup.select('script[type="application/ld+json"]'):
        try:
            data = json.loads(script.string or '{}')
        except ValueError:
            continue  # not valid JSON — ignore this script tag
        if not isinstance(data, dict):
            continue  # e.g. a JSON-LD array; nothing to read a name from
        if data.get('@type') == 'Product' or 'vessel' in str(data).lower():
            if data.get('name') and not result.get('name'):
                result['name'] = data['name']

    # --- Ownership section ---
    companies = []
    # Role label fragment -> result field; checked in this order.
    ownership_role_map = {
        'beneficial owner': 'beneficial_owner',
        'registered owner': 'registered_owner',
        'commercial manager': 'commercial_manager',
        'disponent owner': 'commercial_manager',
        'ship manager': 'commercial_manager',
        'operator': 'operator',
        'charterer': 'operator',
        'technical manager': 'operator',
    }
    # Try multiple selectors MT uses for ownership tables
    ownership_rows = soup.select(
        '[data-id="ownership"] tr, '
        '.ownership-widget tr, '
        '.widget-body table tr, '
        '#ownership tr, '
        '.ownership tr, '
        'section.ownership table tr'
    )
    # Also try finding ownership by header text
    if not ownership_rows:
        for h in soup.find_all(['h2', 'h3', 'h4', 'div'], string=re.compile(r'[Oo]wnership')):
            container = h.find_next('table')
            if container:
                ownership_rows = container.select('tr')
                break

    for row in ownership_rows:
        cells = row.select('td')
        if len(cells) < 2:
            continue
        role_text = cells[0].get_text(strip=True).lower()
        company_el = cells[1]
        company_name = company_el.get_text(strip=True)
        if not company_name or company_name == '-' or company_name == 'N/A':
            continue
        # Extract company MT profile URL; urljoin resolves both "/path" and
        # "path" style links and leaves absolute URLs untouched.
        company_link = company_el.select_one('a[href]')
        company_href = company_link['href'] if company_link else None
        if company_href:
            company_href = urljoin(MT_BASE, company_href)
        country = cells[2].get_text(strip=True) if len(cells) > 2 else ''
        # Map role to field name
        matched_field = next(
            (field for role_key, field in ownership_role_map.items() if role_key in role_text),
            'operator',  # fallback
        )
        # Set top-level fields (first occurrence wins)
        if not result.get(matched_field):
            result[matched_field] = company_name
            if country:
                result[f'{matched_field}_country'] = country
        # Also build the 'owner' alias for DB compatibility. ('operator' needs
        # no alias step: it is already written by the block above.)
        if matched_field in ('beneficial_owner', 'registered_owner') and not result.get('owner'):
            result['owner'] = company_name
            if country:
                result['owner_country'] = country
        companies.append({
            'role': role_text,
            'name': company_name,
            'country': country,
            'mt_profile_url': company_href,
        })
    result['companies'] = companies  # stored as companies_json
    return result
def _apply_vessel_field(self, result: dict, key: str, val: str):
    """Store one label/value pair scraped from a vessel page into ``result``.

    ``key`` is the lower-cased row label. Empty and placeholder values
    ('-', 'N/A', 'Unknown') are ignored. Rules are tried top to bottom and
    only the first applicable one is used; 'imo', 'mmsi', 'flag' and 'type'
    are written only while still unset, the remaining fields are overwritten.
    """
    if not val or val in ('-', 'N/A', 'Unknown', ''):
        return

    def unset(field):
        return not result.get(field)

    def stripped(text):
        return text.strip()

    def verbatim(text):
        return text

    def as_number(text):
        # Resolved lazily so the numeric parser is only touched when needed
        return self._parse_number(text)

    # (applies?, target field, value converter) — order is significant
    rules = (
        (lambda: 'imo' in key and unset('imo'), 'imo', stripped),
        (lambda: 'mmsi' in key and key != 'mmsi' and unset('mmsi'), 'mmsi', stripped),
        (lambda: 'call sign' in key or 'callsign' in key, 'callsign', verbatim),
        (lambda: 'flag' in key and unset('flag'), 'flag', verbatim),
        (lambda: 'type' in key and 'ship' in key and unset('type'), 'type', verbatim),
        (lambda: 'year' in key and 'built' in key, 'year_built', as_number),
        (lambda: 'deadweight' in key or key == 'dwt', 'deadweight', as_number),
        (lambda: 'gross' in key and 'tonnage' in key, 'gross_tonnage', as_number),
        (lambda: 'length' in key and 'overall' in key, 'length', as_number),
        (lambda: 'breadth' in key or ('width' in key and 'ext' in key), 'width', as_number),
    )
    for applies, field, convert in rules:
        if applies():
            result[field] = convert(val)
            return
def get_company_website(self, company_mt_url: str) -> Optional[str]:
    """Try to extract a company's website URL from its MT company profile page.

    Args:
        company_mt_url: URL of the company profile; must contain
            'marinetraffic.com', otherwise no request is made.

    Returns:
        The first absolute link on the page that does not point back to
        marinetraffic.com, is shorter than 120 characters and is not one of
        the filtered social/search hosts, with any trailing '/' removed.
        ``None`` when the URL is rejected, the response is not HTTP 200, no
        link qualifies, or any exception occurs (logged at debug level).
    """
    if not company_mt_url or 'marinetraffic.com' not in company_mt_url:
        return None

    # Links to these hosts are never the company's own website.
    skipped_hosts = ('facebook.', 'twitter.', 'linkedin.', 'google.', 'youtube.')
    try:
        resp = self.session.get(
            company_mt_url,
            headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'},
            timeout=10
        )
        if resp.status_code != 200:
            return None
        soup = BeautifulSoup(resp.text, 'html.parser')
        # The selector only yields anchors whose href starts with "http",
        # so a['href'] is always present here.
        for a in soup.select('a[href^="http"]'):
            href = a['href']
            if 'marinetraffic.com' in href or '.' not in href or len(href) >= 120:
                continue
            if any(skip in href for skip in skipped_hosts):
                continue
            return href.rstrip('/')
        return None
    except Exception as e:
        # Best-effort lookup: any network/parsing failure means "unknown".
        logger.debug(f"get_company_website({company_mt_url}): {e}")
        return None
# =========================================================================
# PORT / AREA VESSEL SCRAPING
# =========================================================================
def scrape_area_vessels(self, lat: float, lon: float, radius_nm: float = 15) -> List[Dict]:
    """
    Scrape vessels in area via MarineTraffic map data JSON endpoint.

    Requests the single zoom-11 map tile containing (lat, lon) and keeps only
    rows whose position lies inside the bounding box derived from
    ``radius_nm``. Because only one tile is fetched, vessels inside the box
    but on a neighbouring tile are not returned.

    Args:
        lat: Centre latitude in degrees.
        lon: Centre longitude in degrees.
        radius_nm: Half-size of the bounding box in nautical miles.

    Returns:
        List of vessel dicts. Dict-shaped rows yield mmsi, name, type,
        type_category, flag, speed, course, nav_status, dwt, lat, lon and
        destination; array-shaped rows yield mmsi, lat, lon, speed, course,
        type, type_category, nav_status (None), dwt (None) and name.
        An empty list is returned on a non-200 response, an unexpected
        payload shape, or any exception (logged at error level).
    """
    # Convert radius NM → degrees. The cosine is floored at 0.01 so the
    # longitude span stays finite close to the poles.
    lat_delta = radius_nm / 60.0
    lon_delta = radius_nm / (60.0 * max(math.cos(math.radians(lat)), 0.01))
    lat_min = lat - lat_delta
    lat_max = lat + lat_delta
    lon_min = lon - lon_delta
    lon_max = lon + lon_delta

    # Step 1: Establish session (get cookies). Best-effort: a failure here
    # must not prevent the data request below.
    try:
        self.session.get(
            f"{MT_BASE}/en/ais/home/centerx:{lon:.1f}/centery:{lat:.1f}/zoom:11",
            timeout=10
        )
    except Exception as e:
        logger.debug(f"Area scrape session warm-up failed: {e}")
    time.sleep(1)  # Be polite

    # Step 2: Calculate Web Mercator tile coordinates for zoom level 11
    zoom = 11

    def lat_lon_to_tile(lat_deg, lon_deg, z):
        lat_rad = math.radians(lat_deg)
        n = 2.0 ** z
        xtile = int((lon_deg + 180.0) / 360.0 * n)
        ytile = int((1.0 - math.asinh(math.tan(lat_rad)) / math.pi) / 2.0 * n)
        return xtile, ytile

    x_tile, y_tile = lat_lon_to_tile(lat, lon, zoom)

    # Step 3: Request map data
    url = f"{MT_BASE}/getData/get_data_json_4/z:{zoom}/X:{x_tile}/Y:{y_tile}/station:0"
    try:
        resp = self.session.get(url, timeout=15, headers={
            'Referer': f'{MT_BASE}/en/ais/home/centerx:{lon:.1f}/centery:{lat:.1f}/zoom:{zoom}',
            'X-Requested-With': 'XMLHttpRequest',
            'Accept': 'application/json, text/javascript, */*'
        })
        if resp.status_code != 200:
            logger.warning(f"Area scrape HTTP {resp.status_code}")
            return []
        data = resp.json()
        vessels = []

        # Parse response — format varies: a bare list, or a dict holding the
        # rows under 'data' / 'rows' / 'type1' (possibly nested one level).
        rows = data
        if isinstance(data, dict):
            rows = data.get('data', data.get('rows', data.get('type1', [])))
            if isinstance(rows, dict):
                rows = rows.get('rows', [])
        if not isinstance(rows, list):
            return []

        for item in rows:
            if isinstance(item, dict):
                v_lat = item.get('LAT', item.get('lat', 0))
                v_lon = item.get('LON', item.get('lon', 0))
                try:
                    v_lat = float(v_lat) if v_lat else 0
                    v_lon = float(v_lon) if v_lon else 0
                except (ValueError, TypeError):
                    continue
                # Filter to bounding box
                if not (lat_min <= v_lat <= lat_max and lon_min <= v_lon <= lon_max):
                    continue
                mmsi = str(item.get('MMSI', item.get('mmsi', '')))
                if not mmsi or mmsi == '0':
                    continue
                type_raw = item.get('SHIPTYPE', item.get('ship_type', item.get('type_name', '')))
                type_code = item.get('TYPE_CODE', item.get('type_code'))
                nav_status_code = item.get('NAVSTAT', item.get('nav_status', item.get('STATUS', None)))
                nav_status = None
                if nav_status_code is not None:
                    try:
                        nav_status = NAV_STATUS_MAP.get(int(nav_status_code))
                    except (ValueError, TypeError):
                        pass  # non-numeric status code: leave nav_status unknown
                dwt = item.get('DWT', item.get('dwt', item.get('DEADWEIGHT')))
                vessels.append({
                    'mmsi': mmsi,
                    'name': item.get('SHIPNAME', item.get('shipname', item.get('name', ''))),
                    'type': str(type_raw) if type_raw else '',
                    'type_category': classify_vessel_type(str(type_raw) if type_raw else '', type_code),
                    'flag': item.get('FLAG', item.get('flag', '')),
                    'speed': item.get('SPEED', item.get('speed')),
                    'course': item.get('COURSE', item.get('course')),
                    'nav_status': nav_status,
                    'dwt': dwt,
                    'lat': v_lat,
                    'lon': v_lon,
                    'destination': item.get('DESTINATION', item.get('destination', '')),
                })
            elif isinstance(item, list) and len(item) >= 7:
                # Some endpoints return arrays: [mmsi, lat, lon, speed, course, type, name, ...]
                try:
                    v_lat = float(item[1])
                    v_lon = float(item[2])
                except (ValueError, TypeError):
                    continue
                # Values beyond ±1000 are treated as degrees × 10000. The test
                # uses abs() on BOTH axes so that scaled southern-hemisphere
                # latitudes are de-scaled as well.
                if abs(v_lat) > 1000:
                    v_lat /= 10000
                if abs(v_lon) > 1000:
                    v_lon /= 10000
                if not (lat_min <= v_lat <= lat_max and lon_min <= v_lon <= lon_max):
                    continue
                type_val = str(item[5])
                vessels.append({
                    'mmsi': str(item[0]),
                    'lat': v_lat,
                    'lon': v_lon,
                    'speed': item[3],
                    'course': item[4],
                    'type': type_val,
                    'type_category': classify_vessel_type(type_val),
                    'nav_status': None,
                    'dwt': None,
                    'name': str(item[6]),
                })
        return vessels
    except Exception as e:
        logger.error(f"Area scrape error: {e}")
        return []
def scrape_viewport_vessels(self, lat_min: float, lat_max: float,
                            lon_min: float, lon_max: float,
                            max_tiles: int = 6) -> List[Dict]:
    """
    Scrape the vessels visible in a map viewport from several MarineTraffic tiles.

    The zoom level is derived from the viewport span (wider viewport → lower
    zoom), up to ``max_tiles`` tiles covering the viewport are requested, and
    every row is parsed with ``_parse_tile_item`` against the viewport bounds.

    Returns:
        Vessel dicts de-duplicated by MMSI, in first-seen order. Tiles that
        fail (non-200, bad payload, exception) are skipped.
    """
    lat_span = lat_max - lat_min
    lon_span = lon_max - lon_min

    # (lat span limit, lon span limit, zoom) — first rule exceeded wins.
    zoom = 10
    for lat_limit, lon_limit, level in ((20, 40, 4), (10, 20, 5), (5, 10, 6), (2, 4, 8)):
        if lat_span > lat_limit or lon_span > lon_limit:
            zoom = level
            break

    def tile_of(lat_deg, lon_deg, z):
        # Web Mercator tile indices for a lat/lon at zoom z.
        scale = 2.0 ** z
        col = int((lon_deg + 180.0) / 360.0 * scale)
        row = int((1.0 - math.asinh(math.tan(math.radians(lat_deg))) / math.pi) / 2.0 * scale)
        return col, row

    first_col, first_row = tile_of(lat_max, lon_min, zoom)  # NW corner
    last_col, last_row = tile_of(lat_min, lon_max, zoom)    # SE corner

    def covering_tiles():
        for col in range(first_col, last_col + 1):
            for row in range(first_row, last_row + 1):
                yield (col, row)

    # Take tiles column by column until the cap is reached.
    tiles = []
    for tile in covering_tiles():
        tiles.append(tile)
        if len(tiles) >= max_tiles:
            break

    center_lat = (lat_min + lat_max) / 2
    center_lon = (lon_min + lon_max) / 2
    if not tiles:
        # Empty range (e.g. inverted corners): fall back to the centre tile.
        tiles = [tile_of(center_lat, center_lon, zoom)]

    # Establish session (best-effort).
    try:
        self.session.get(
            f"{MT_BASE}/en/ais/home/centerx:{center_lon:.1f}/centery:{center_lat:.1f}/zoom:{zoom}",
            timeout=10
        )
    except Exception:
        pass

    request_headers = {
        'Referer': f'{MT_BASE}/en/ais/home/centerx:{center_lon:.1f}/centery:{center_lat:.1f}/zoom:{zoom}',
        'X-Requested-With': 'XMLHttpRequest',
        'Accept': 'application/json, text/javascript, */*'
    }

    all_vessels = []
    seen_mmsi = set()
    for x_tile, y_tile in tiles:
        try:
            time.sleep(0.5)  # Rate limit between tiles
            url = f"{MT_BASE}/getData/get_data_json_4/z:{zoom}/X:{x_tile}/Y:{y_tile}/station:0"
            resp = self.session.get(url, timeout=10, headers=request_headers)
            if resp.status_code != 200:
                continue
            payload = resp.json()

            rows = payload
            if isinstance(payload, dict):
                rows = payload.get('data', payload.get('rows', payload.get('type1', [])))
                if isinstance(rows, dict):
                    rows = rows.get('rows', [])
            if not isinstance(rows, list):
                continue

            for raw in rows:
                vessel = self._parse_tile_item(raw, lat_min, lat_max, lon_min, lon_max)
                if not vessel:
                    continue
                mmsi = vessel.get('mmsi')
                if not mmsi or mmsi in seen_mmsi:
                    continue
                seen_mmsi.add(mmsi)
                all_vessels.append(vessel)
        except Exception as e:
            logger.debug(f"Tile {x_tile},{y_tile} z{zoom} error: {e}")
            continue

    logger.info(f"MT viewport scrape: {len(tiles)} tiles z{zoom}, {len(all_vessels)} vessels")
    return all_vessels
def _parse_tile_item(self, item, lat_min, lat_max, lon_min, lon_max) -> dict:
"""Parse a single vessel item from MT tile data."""
if isinstance(item, dict):
v_lat = item.get('LAT', item.get('lat', 0))
v_lon = item.get('LON', item.get('lon', 0))
try:
v_lat = float(v_lat) if v_lat else 0
v_lon = float(v_lon) if v_lon else 0
except (ValueError, TypeError):
return None
if not (lat_min <= v_lat <= lat_max and lon_min <= v_lon <= lon_max):
return None
mmsi = str(item.get('MMSI', item.get('mmsi', '')))
if not mmsi or mmsi == '0':
return None
type_raw = item.get('SHIPTYPE', item.get('ship_type', item.get('type_name', '')))
type_code = item.get('TYPE_CODE', item.get('type_code'))
nav_status_code = item.get('NAVSTAT', item.get('nav_status', item.get('STATUS', None)))
nav_status = None
if nav_status_code is not None:
try:
nav_status = NAV_STATUS_MAP.get(int(nav_status_code))
except (ValueError, TypeError):
pass
dwt = item.get('DWT', item.get('dwt', item.get('DEADWEIGHT')))
return {
'mmsi': mmsi,
'name': item.get('SHIPNAME', item.get('shipname', item.get('name', ''))),
'type': str(type_raw) if type_raw else '',
'type_category': classify_vessel_type(str(type_raw) if type_raw else '', type_code),
'flag': item.get('FLAG', item.get('flag', '')),
'speed': item.get('SPEED', item.get('speed')),
'course': item.get('COURSE', item.get('course')),
'nav_status': nav_status,
'dwt': dwt,
'lat': v_lat,
'lon': v_lon,
'destination': item.get('DESTINATION', item.get('destination', '')),
}
elif isinstance(item, list) and len(item) >= 7:
try:
v_lat = float(item[1]) / 10000 if item[1] > 1000 else float(item[1])
v_lon = float(item[2]) / 10000 if abs(item[2]) > 1000 else float(item[2])
except (ValueError, TypeError, IndexError):
return None
if not (lat_min <= v_lat <= lat_max and lon_min <= v_lon <= lon_max):
return None
type_val = str(item[5]) if len(item) > 5 else ''
return {
'mmsi': str(item[0]),
'lat': v_lat,
'lon': v_lon,
'speed': item[3] if len(item) > 3 else None,
'course': item[4] if len(item) > 4 else None,
'type': type_val,
'type_category': classify_vessel_type(type_val),
'name': str(item[6]) if len(item) > 6 else '',
}
return None
def get_port_vessels(self, port_name: str) -> List[Dict]:
    """
    List the vessels currently in or near a port.

    The port is resolved with ``resolve_port``; its map tile is scraped first,
    and only when that yields nothing and an API key is configured is the
    paid area API queried. Results are de-duplicated by MMSI.

    Returns:
        Unique vessel dicts in first-seen order; an empty list when the port
        cannot be resolved or no source returns data.
    """
    port = resolve_port(port_name)
    if not port:
        return []

    lat = port['lat']
    lon = port['lon']
    radius = port.get('radius_nm', 15)

    # Strategy 1: map tile scraping
    vessels = self.scrape_area_vessels(lat, lon, radius)

    # Strategy 2: paid API, only as a fallback
    if not vessels and self.has_api_key():
        lat_d = radius / 60.0
        lon_d = radius / (60.0 * max(math.cos(math.radians(lat)), 0.01))
        try:
            api_result = self.api_vessels_in_area(
                lat - lat_d, lat + lat_d, lon - lon_d, lon + lon_d
            )
            if isinstance(api_result, list):
                for row in api_result:
                    mmsi = str(row.get('MMSI', ''))
                    if mmsi in ('', '0'):
                        continue
                    vessels.append({
                        'mmsi': mmsi,
                        'name': row.get('SHIPNAME', ''),
                        'type': row.get('TYPE_NAME', row.get('SHIPTYPE', '')),
                        'flag': row.get('FLAG', ''),
                        'speed': row.get('SPEED'),
                        'course': row.get('COURSE'),
                        'lat': row.get('LAT'),
                        'lon': row.get('LON'),
                        'destination': row.get('DESTINATION', ''),
                    })
        except Exception as e:
            logger.error(f"API area fallback error: {e}")

    # Keep the first vessel seen for every MMSI (dicts preserve insertion order).
    by_mmsi = {}
    for vessel in vessels:
        ident = vessel.get('mmsi')
        if ident and ident not in by_mmsi:
            by_mmsi[ident] = vessel
    return list(by_mmsi.values())
# =========================================================================
# API METHODS (requires API key)
# =========================================================================
def _api_call(self, service: str, params: dict) -> dict:
"""Make API call"""
if not self.api_key:
raise ValueError("API key required. Set MARINETRAFFIC_API_KEY")
url = f"{MT_API_BASE}/{service}/{self.api_key}"
params['protocol'] = 'jsono' # JSON output
try:
resp = self.session.get(url, params=params, timeout=30)
return resp.json()
except Exception as e:
logger.error(f"API error: {e}")
return {}
def api_vessel_info(self, mmsi: str = None, imo: str = None) -> dict:
    """
    PS01 - Vessel Particulars

    Fetch vessel particulars through the API, identified by MMSI when given,
    otherwise by IMO.

    Raises:
        ValueError: If neither ``mmsi`` nor ``imo`` is provided.
    """
    if mmsi:
        query = {'mmsi': mmsi}
    elif imo:
        query = {'imo': imo}
    else:
        raise ValueError("MMSI or IMO required")
    return self._api_call('vesselparticulars', query)
def api_vessel_position(self, mmsi: str = None, imo: str = None) -> dict:
    """
    PS07 - Single Vessel Position

    Fetch the current vessel position through the API, identified by MMSI
    when given, otherwise by IMO. The request always carries ``timespan=60``
    (last 60 minutes); no identifier is added when both arguments are empty.
    """
    if mmsi:
        ident = {'mmsi': mmsi}
    elif imo:
        ident = {'imo': imo}
    else:
        ident = {}
    return self._api_call('exportvessel', {'timespan': 60, **ident})
def api_port_calls(self, mmsi: str = None, imo: str = None) -> dict:
    """
    VD02 - Port Calls

    Fetch a vessel's port call history through the API, identified by MMSI
    when given, otherwise by IMO. With neither, an empty query is sent.
    """
    query = {}
    if mmsi:
        query.update(mmsi=mmsi)
    elif imo:
        query.update(imo=imo)
    return self._api_call('portcalls', query)
def api_vessels_in_area(self, lat_min: float, lat_max: float,
                        lon_min: float, lon_max: float) -> list:
    """
    PS02 - Vessels in Area

    Fetch all vessels inside the given latitude/longitude box through the
    API. The bounds are sent as MINLAT / MAXLAT / MINLON / MAXLON.
    """
    bounding_box = dict(MINLAT=lat_min, MAXLAT=lat_max, MINLON=lon_min, MAXLON=lon_max)
    return self._api_call('exportvessels', bounding_box)
# =========================================================================
# HELPERS
# =========================================================================
def _parse_number(self, text: str) -> Optional[float]:
"""Extract number from text"""
if not text:
return None
match = re.search(r'[\d,]+\.?\d*', text.replace(',', ''))
if match:
try:
return float(match.group())
except:
pass
return None
def has_api_key(self) -> bool:
    """Report whether ``self.api_key`` holds a truthy value."""
    return True if self.api_key else False
# =============================================================================
# SEA ROUTE CALCULATION
# =============================================================================
# Maritime waypoints (choke points, canals, straits).
# Maps a short waypoint key to its display name and position in decimal
# degrees (lat positive north, lon positive east). The keys are the values
# referenced by the ROUTE_WAYPOINTS templates; calculate_sea_route looks each
# one up here to build the route's coordinate chain.
# Canals are represented by a pair of entry/exit points (suez_n/suez_s,
# panama_atl/panama_pac).
WAYPOINTS = {
'gibraltar': {'name': 'Strait of Gibraltar', 'lat': 36.13, 'lon': -5.35},
'suez_n': {'name': 'Suez Canal (Port Said)', 'lat': 31.26, 'lon': 32.30},
'suez_s': {'name': 'Suez Canal (Suez)', 'lat': 29.97, 'lon': 32.55},
'bab_el_mandeb': {'name': 'Bab el-Mandeb Strait', 'lat': 12.65, 'lon': 43.30},
'hormuz': {'name': 'Strait of Hormuz', 'lat': 26.55, 'lon': 56.25},
# NOTE(review): these coordinates look like the Singapore end of the strait — confirm.
'malacca': {'name': 'Strait of Malacca', 'lat': 1.27, 'lon': 103.82},
'cape_good_hope': {'name': 'Cape of Good Hope', 'lat': -34.35, 'lon': 18.50},
'panama_atl': {'name': 'Panama Canal (Atlantic)', 'lat': 9.36, 'lon': -79.90},
'panama_pac': {'name': 'Panama Canal (Pacific)', 'lat': 8.95, 'lon': -79.55},
'cape_horn': {'name': 'Cape Horn', 'lat': -55.98, 'lon': -67.28},
'dover': {'name': 'Dover Strait', 'lat': 51.13, 'lon': 1.33},
'bosphorus': {'name': 'Bosphorus Strait', 'lat': 41.12, 'lon': 29.05},
'good_hope_e': {'name': 'Cape Agulhas (East)', 'lat': -34.83, 'lon': 20.02},
'volga_don': {'name': 'Volga-Don Canal (Astrakhan)', 'lat': 46.35, 'lon': 48.03},
}
# Average ECO speeds by vessel type (knots) — modern slow-steaming.
# Keyed by lower-case vessel type; calculate_sea_route falls back to the
# 'default' entry for any type not listed here.
VESSEL_SPEEDS = {
'bulk': 12.0, # Eco steaming (was 14.5 design)
'tanker': 12.5, # Eco steaming (was 15 design)
'container': 15.0, # Eco steaming (was 22-25 design)
'general': 12.0,
'passenger': 18.0,
'roro': 15.0,
'offshore': 10.0,
'default': 12.5,
}
# Daily fuel consumption (metric tons/day) — ranges by DWT
# Returns (low, high) MT/day for given vessel type and DWT
def estimate_fuel_consumption(vessel_type: str, dwt: float = None) -> tuple:
    """Return the estimated daily fuel burn as a ``(low, high)`` pair in MT/day.

    The vessel type (case-insensitive) selects a bracket table; the first
    bracket whose DWT ceiling is >= ``dwt`` supplies the range. When ``dwt``
    is missing, not numeric, or above every ceiling, the table's middle
    bracket is used. Unknown types use a single generic (25, 40) bracket.
    """
    vtype = vessel_type.lower() if vessel_type else 'default'

    # Per type: (DWT ceiling, low MT/day, high MT/day), ascending by ceiling.
    brackets_by_type = {
        'bulk': [
            (40000, 18, 25),     # Handysize
            (65000, 25, 35),     # Handymax/Supramax
            (100000, 30, 42),    # Panamax
            (200000, 40, 55),    # Capesize
            (999999, 55, 70),    # VLOC
        ],
        'tanker': [
            (55000, 22, 30),     # MR
            (80000, 28, 38),     # LR1
            (120000, 35, 48),    # Aframax
            (200000, 45, 60),    # Suezmax
            (999999, 60, 85),    # VLCC
        ],
        'container': [
            (25000, 25, 40),     # Feeder
            (40000, 40, 60),     # Feedermax
            (65000, 55, 80),     # Panamax
            (100000, 75, 110),   # Post-Panamax
            (150000, 100, 150),  # Neo-Panamax
            (999999, 140, 200),  # ULCV
        ],
        'general': [(999999, 12, 22)],
        'roro': [(999999, 35, 55)],
        'passenger': [(999999, 80, 150)],
    }
    brackets = brackets_by_type.get(vtype, [(999999, 25, 40)])

    if dwt:
        try:
            size = float(dwt)
        except (ValueError, TypeError):
            size = None
        if size is not None:
            hit = next(((lo, hi) for ceiling, lo, hi in brackets if size <= ceiling), None)
            if hit is not None:
                return hit

    # No usable DWT: answer with the middle bracket of the table.
    _, low, high = brackets[len(brackets) // 2]
    return (low, high)
# Canal transit costs — DWT-based (USD)
def estimate_suez_cost(dwt: float = None) -> tuple:
    """Return an estimated Suez Canal transit cost as ``(low, high)`` USD.

    The real tariff is based on Suez Canal Net Tonnage (SCNT); here it is
    approximated by DWT tiers. A missing or non-numeric ``dwt`` yields the
    generic (200000, 600000) range.
    """
    generic = (200000, 600000)
    if not dwt:
        return generic
    try:
        size = float(dwt)
    except (ValueError, TypeError):
        return generic

    # (DWT ceiling, cost range) — first ceiling >= size wins.
    tiers = (
        (30000, (80000, 150000)),
        (60000, (150000, 300000)),
        (100000, (250000, 450000)),
        (200000, (350000, 700000)),
    )
    for ceiling, cost_range in tiers:
        if size <= ceiling:
            return cost_range
    return (500000, 1200000)
def estimate_panama_cost(dwt: float = None) -> tuple:
    """Return an estimated Panama Canal transit cost as ``(low, high)`` USD.

    Approximated by DWT tiers; vessels above 150,000 DWT fall in the top
    (Neo-Panamax locks) tier. A missing or non-numeric ``dwt`` yields the
    generic (150000, 500000) range.
    """
    generic = (150000, 500000)
    if not dwt:
        return generic
    try:
        size = float(dwt)
    except (ValueError, TypeError):
        return generic

    # (DWT ceiling, cost range) — first ceiling >= size wins.
    tiers = (
        (30000, (50000, 120000)),
        (60000, (100000, 250000)),
        (100000, (200000, 400000)),
        (150000, (350000, 600000)),
    )
    for ceiling, cost_range in tiers:
        if size <= ceiling:
            return cost_range
    # Neo-Panamax locks
    return (500000, 900000)
# Hours added to a voyage for each canal transit; calculate_sea_route adds
# the matching entry once per canal that appears in the route's waypoints.
CANAL_TRANSIT_HOURS = {
'suez': 12,
'panama': 10,
}
# Bunker fuel price (VLSFO, USD/ton) — approximate 2024-2026 range.
# Single flat price used to turn fuel tonnage into fuel cost.
BUNKER_PRICE_USD = 600
# Port charges — DWT-based (USD per call)
def estimate_port_charges(dwt: float = None) -> tuple:
    """Return estimated port charges for one port call as ``(low, high)`` USD.

    Approximated by DWT tiers. A missing or non-numeric ``dwt`` yields the
    generic (15000, 50000) range.
    """
    generic = (15000, 50000)
    if not dwt:
        return generic
    try:
        size = float(dwt)
    except (ValueError, TypeError):
        return generic

    # (DWT ceiling, charge range) — first ceiling >= size wins.
    tiers = (
        (30000, (8000, 18000)),
        (60000, (15000, 30000)),
        (100000, (25000, 50000)),
        (200000, (40000, 80000)),
    )
    for ceiling, charge_range in tiers:
        if size <= ceiling:
            return charge_range
    return (60000, 150000)
# Port region assignments for routing
PORT_REGIONS = {}


def _region_for(lat, lon):
    """Return the routing-region code for a position; the first matching rule wins."""
    # Americas first (all lon < -30)
    if lon < -100 and lat > 25:
        return 'USWC'   # US West Coast
    if lon < -30 and lat > 25:
        return 'USEC'   # US East Coast
    if lon < -30 and -5 < lat <= 25:
        return 'CARIB'  # Caribbean / Central America
    if lon < -70 and lat <= -5:
        return 'SAW'    # South America West
    if lon < -30 and lat <= -5:
        return 'SAE'    # South America East
    # Asia-Pacific
    if lon > 100 and lat < -10:
        return 'AUSNZ'  # Australia / NZ
    if lon > 100:
        return 'EASIA'  # East/SE Asia
    if 60 < lon <= 100 and lat > 0:
        return 'SASIA'  # South Asia
    if 36 <= lat <= 47 and 46 <= lon <= 55:
        return 'CASP'   # Caspian Sea (landlocked, before GULF)
    if 40 < lon <= 60 and lat > 10:
        return 'GULF'   # Persian Gulf
    # Africa
    if 10 < lon < 45 and lat < -20:
        return 'SAFR'   # Southern Africa (must be before ERED)
    if 30 < lon <= 45 and -10 < lat < 20:
        return 'ERED'   # East Africa / Red Sea
    if -5 < lon < 15 and -5 < lat < 15:
        return 'WAFR'   # West Africa
    # Europe
    if 25 < lon <= 40 and 40 < lat <= 48:
        return 'BSEA'   # Black Sea (Istanbul, Novorossiysk, Constanta)
    if lon < 40 and lat > 48:
        return 'NEUR'   # North Europe + Baltic (above 48N)
    if -10 <= lon <= 40 and 25 < lat <= 48:
        return 'MED'    # Mediterranean + Southern Europe
    return 'OTHER'


def _assign_regions():
    """Fill PORT_REGIONS with a region code for every port in WORLD_PORTS."""
    for port_key, port in WORLD_PORTS.items():
        PORT_REGIONS[port_key] = _region_for(port['lat'], port['lon'])


# Populate the lookup once at import time.
_assign_regions()
# Route templates: from_region → to_region → waypoint sequence
# Keys are (from_region, to_region) pairs of PORT_REGIONS codes; values are the
# ordered WAYPOINTS keys a vessel passes between the two ports. An empty list
# means the route is drawn as a single direct leg. calculate_sea_route tries
# the reversed pair (with the list reversed) when a pair is not listed.
ROUTE_WAYPOINTS = {
    ('NEUR', 'EASIA'): ['dover', 'gibraltar', 'suez_n', 'suez_s', 'bab_el_mandeb', 'malacca'],
    ('NEUR', 'SASIA'): ['dover', 'gibraltar', 'suez_n', 'suez_s', 'bab_el_mandeb'],
    ('NEUR', 'GULF'): ['dover', 'gibraltar', 'suez_n', 'suez_s', 'bab_el_mandeb', 'hormuz'],
    ('NEUR', 'MED'): ['dover', 'gibraltar'],
    ('NEUR', 'USEC'): [],  # Direct Atlantic
    ('NEUR', 'SAFR'): ['dover', 'gibraltar', 'cape_good_hope'],
    ('NEUR', 'WAFR'): ['dover', 'gibraltar'],
    ('NEUR', 'ERED'): ['dover', 'gibraltar', 'suez_n', 'suez_s', 'bab_el_mandeb'],
    ('NEUR', 'CARIB'): [],  # Direct Atlantic
    ('NEUR', 'SAE'): [],
    ('NEUR', 'AUSNZ'): ['dover', 'gibraltar', 'suez_n', 'suez_s', 'bab_el_mandeb', 'malacca'],
    # Via Gibraltar and the Bosphorus, matching the NEUR<->CASP templates
    # below (a bare 'dover' drew the leg straight across the continent).
    ('NEUR', 'BSEA'): ['dover', 'gibraltar', 'bosphorus'],
    ('MED', 'EASIA'): ['suez_n', 'suez_s', 'bab_el_mandeb', 'malacca'],
    ('MED', 'SASIA'): ['suez_n', 'suez_s', 'bab_el_mandeb'],
    ('MED', 'GULF'): ['suez_n', 'suez_s', 'bab_el_mandeb', 'hormuz'],
    ('MED', 'NEUR'): ['gibraltar', 'dover'],
    ('MED', 'USEC'): ['gibraltar'],
    ('MED', 'SAFR'): ['gibraltar', 'cape_good_hope'],
    ('MED', 'ERED'): ['suez_n', 'suez_s', 'bab_el_mandeb'],
    ('MED', 'AUSNZ'): ['suez_n', 'suez_s', 'bab_el_mandeb', 'malacca'],
    ('BSEA', 'MED'): ['bosphorus'],
    ('BSEA', 'EASIA'): ['bosphorus', 'suez_n', 'suez_s', 'bab_el_mandeb', 'malacca'],
    ('BSEA', 'GULF'): ['bosphorus', 'suez_n', 'suez_s', 'bab_el_mandeb', 'hormuz'],
    ('GULF', 'EASIA'): ['hormuz', 'malacca'],
    ('GULF', 'SASIA'): ['hormuz'],
    ('GULF', 'NEUR'): ['hormuz', 'bab_el_mandeb', 'suez_s', 'suez_n', 'gibraltar', 'dover'],
    ('GULF', 'MED'): ['hormuz', 'bab_el_mandeb', 'suez_s', 'suez_n'],
    ('GULF', 'USEC'): ['hormuz', 'bab_el_mandeb', 'suez_s', 'suez_n', 'gibraltar'],
    # NOTE(review): routing to the Cape via Bab el-Mandeb looks like a detour — confirm.
    ('GULF', 'SAFR'): ['hormuz', 'bab_el_mandeb', 'cape_good_hope'],
    ('EASIA', 'SASIA'): ['malacca'],
    ('EASIA', 'GULF'): ['malacca', 'hormuz'],
    ('EASIA', 'NEUR'): ['malacca', 'bab_el_mandeb', 'suez_s', 'suez_n', 'gibraltar', 'dover'],
    ('EASIA', 'MED'): ['malacca', 'bab_el_mandeb', 'suez_s', 'suez_n'],
    ('EASIA', 'USEC'): ['panama_pac', 'panama_atl'],
    ('EASIA', 'USWC'): [],  # Direct Pacific
    ('EASIA', 'SAFR'): ['malacca', 'cape_good_hope'],
    ('EASIA', 'AUSNZ'): ['malacca'],
    ('SASIA', 'EASIA'): ['malacca'],
    ('SASIA', 'NEUR'): ['bab_el_mandeb', 'suez_s', 'suez_n', 'gibraltar', 'dover'],
    ('SASIA', 'MED'): ['bab_el_mandeb', 'suez_s', 'suez_n'],
    ('SASIA', 'GULF'): ['hormuz'],
    ('USEC', 'EASIA'): ['panama_atl', 'panama_pac'],
    ('USEC', 'NEUR'): [],  # Direct Atlantic
    ('USEC', 'MED'): ['gibraltar'],
    ('USEC', 'GULF'): ['gibraltar', 'suez_n', 'suez_s', 'bab_el_mandeb', 'hormuz'],
    ('USEC', 'SAE'): [],
    ('USEC', 'CARIB'): [],
    ('USWC', 'EASIA'): [],  # Direct Pacific
    ('USWC', 'NEUR'): ['panama_pac', 'panama_atl'],
    ('USWC', 'SAW'): [],
    ('USWC', 'AUSNZ'): [],
    ('SAFR', 'NEUR'): ['cape_good_hope', 'gibraltar', 'dover'],
    ('SAFR', 'EASIA'): ['cape_good_hope', 'malacca'],
    ('SAFR', 'SASIA'): ['cape_good_hope'],
    ('SAE', 'NEUR'): [],
    ('SAE', 'USEC'): [],
    ('SAE', 'EASIA'): ['cape_good_hope', 'malacca'],
    ('SAW', 'USEC'): ['panama_pac', 'panama_atl'],
    ('SAW', 'USWC'): [],
    ('CARIB', 'NEUR'): [],
    ('CARIB', 'USEC'): [],
    ('CARIB', 'EASIA'): ['panama_atl', 'panama_pac'],
    ('AUSNZ', 'EASIA'): ['malacca'],
    ('AUSNZ', 'NEUR'): ['malacca', 'bab_el_mandeb', 'suez_s', 'suez_n', 'gibraltar', 'dover'],
    # Caspian Sea (landlocked — river-sea vessels only via Volga-Don Canal)
    ('CASP', 'CASP'): [],  # Direct within Caspian
    ('CASP', 'BSEA'): ['volga_don'],  # Via Volga-Don → Azov → Black Sea
    ('CASP', 'MED'): ['volga_don', 'bosphorus'],  # Caspian → Black Sea → Med
    ('CASP', 'NEUR'): ['volga_don', 'bosphorus', 'gibraltar', 'dover'],
    ('CASP', 'GULF'): [],  # INSTC: Caspian → Iran overland → AG
    ('CASP', 'SASIA'): [],  # INSTC: Caspian → Iran → India
    ('BSEA', 'CASP'): ['volga_don'],
    ('MED', 'CASP'): ['bosphorus', 'volga_don'],
    ('NEUR', 'CASP'): ['dover', 'gibraltar', 'bosphorus', 'volga_don'],
    ('GULF', 'CASP'): [],  # INSTC reverse
    ('SASIA', 'CASP'): [],  # INSTC reverse
}
def _haversine_nm(lat1, lon1, lat2, lon2):
"""Calculate great circle distance in nautical miles."""
R = 3440.065 # Earth radius in NM
lat1, lon1, lat2, lon2 = map(math.radians, [lat1, lon1, lat2, lon2])
dlat = lat2 - lat1
dlon = lon2 - lon1
a = math.sin(dlat/2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon/2)**2
c = 2 * math.asin(math.sqrt(min(a, 1.0)))
return R * c
def calculate_sea_route(from_port: str, to_port: str, vessel_type: str = 'default',
                        dwt: float = None) -> Optional[Dict]:
    """
    Build a sea route between two ports with time and cost estimates.

    Both ports are resolved with ``resolve_port``; their regions select a
    waypoint template from ROUTE_WAYPOINTS (the reversed pair is tried when
    the pair is not listed, otherwise the route is a single direct leg).
    Each leg is the great-circle distance inflated by 15%.

    Returns:
        A dict with endpoints, polyline coordinates, distance, speed, hours
        and days, via points, canals, legs and a ``cost_estimate`` of
        low/high USD ranges; ``dwt`` and ``vessel_subtype`` are added when
        available. ``None`` when a port cannot be resolved or lacks a 'key'.
    """
    origin = resolve_port(from_port)
    destination = resolve_port(to_port)
    if not origin or not destination:
        return None

    # resolve_port includes 'key', which indexes PORT_REGIONS.
    origin_key = origin.get('key')
    destination_key = destination.get('key')
    if not (origin_key and destination_key):
        return None

    region_pair = (
        PORT_REGIONS.get(origin_key, 'OTHER'),
        PORT_REGIONS.get(destination_key, 'OTHER'),
    )

    # Waypoint template: listed pair, else reversed pair flipped, else direct.
    wp_keys = ROUTE_WAYPOINTS.get(region_pair)
    if wp_keys is None:
        mirrored = ROUTE_WAYPOINTS.get(region_pair[::-1])
        wp_keys = [] if mirrored is None else list(reversed(mirrored))

    # Coordinate chain: origin → waypoints → destination.
    chain = [(origin['lat'], origin['lon'], origin['name'])]
    for wp_key in wp_keys:
        point = WAYPOINTS[wp_key]
        chain.append((point['lat'], point['lon'], point['name']))
    chain.append((destination['lat'], destination['lon'], destination['name']))

    # Leg distances; sea routes are ~15-20% longer than great circle due to coastlines.
    legs = []
    total_nm = 0.0
    for (lat1, lon1, name1), (lat2, lon2, name2) in zip(chain, chain[1:]):
        sea_dist = _haversine_nm(lat1, lon1, lat2, lon2) * 1.15
        total_nm += sea_dist
        legs.append({
            'from': name1,
            'to': name2,
            'distance_nm': round(sea_dist),
        })

    # Canal fees and transit time — a canal counts once if either gate is on the route.
    canals_used = []
    canal_cost_low = 0
    canal_cost_high = 0
    canal_hours = 0
    canal_specs = (
        ('Suez Canal', ('suez_n', 'suez_s'), estimate_suez_cost, 'suez'),
        ('Panama Canal', ('panama_atl', 'panama_pac'), estimate_panama_cost, 'panama'),
    )
    for canal_name, gates, fee_estimator, hours_key in canal_specs:
        if not any(gate in wp_keys for gate in gates):
            continue
        fee_low, fee_high = fee_estimator(dwt)
        canals_used.append({'name': canal_name, 'cost_low': fee_low, 'cost_high': fee_high})
        canal_cost_low += fee_low
        canal_cost_high += fee_high
        canal_hours += CANAL_TRANSIT_HOURS[hours_key]

    # Speed and time
    vtype = vessel_type.lower() if vessel_type else 'default'
    speed = VESSEL_SPEEDS.get(vtype, VESSEL_SPEEDS['default'])
    sailing_hours = total_nm / speed
    total_hours = sailing_hours + canal_hours
    total_days = total_hours / 24

    # Cost ranges: fuel is charged for sailing days only, ports for two calls.
    sailing_days = sailing_hours / 24
    fuel_low, fuel_high = estimate_fuel_consumption(vtype, dwt)
    fuel_cost_low = sailing_days * fuel_low * BUNKER_PRICE_USD
    fuel_cost_high = sailing_days * fuel_high * BUNKER_PRICE_USD
    port_low, port_high = estimate_port_charges(dwt)
    port_cost_low = port_low * 2   # Load + discharge
    port_cost_high = port_high * 2
    total_cost_low = fuel_cost_low + canal_cost_low + port_cost_low
    total_cost_high = fuel_cost_high + canal_cost_high + port_cost_high

    subtype = classify_vessel_subtype(vtype, dwt)

    # Display names; the second gate of each canal pair is dropped so a canal
    # shows up once.
    via_points = [
        WAYPOINTS[wp_key]['name']
        for wp_key in wp_keys
        if wp_key not in ('suez_s', 'panama_pac')
    ]

    route = {
        'from': origin['name'],
        'from_country': origin['country'],
        'from_lat': origin['lat'],
        'from_lon': origin['lon'],
        'to': destination['name'],
        'to_country': destination['country'],
        'to_lat': destination['lat'],
        'to_lon': destination['lon'],
        'route_coords': [[lat, lon] for lat, lon, _ in chain],
        'distance_nm': round(total_nm),
        'distance_km': round(total_nm * 1.852),
        'vessel_type': vtype,
        'speed_knots': speed,
        'sailing_hours': round(sailing_hours),
        'canal_transit_hours': canal_hours,
        'total_hours': round(total_hours),
        'total_days': round(total_days, 1),
        'via': via_points,
        'canals': [canal['name'] for canal in canals_used],
        'legs': legs,
        'cost_estimate': {
            'fuel_usd': {'low': round(fuel_cost_low), 'high': round(fuel_cost_high)},
            'canal_fees_usd': {'low': canal_cost_low, 'high': canal_cost_high},
            'port_charges_usd': {'low': port_cost_low, 'high': port_cost_high},
            'total_usd': {'low': round(total_cost_low), 'high': round(total_cost_high)},
            'bunker_price_usd_ton': BUNKER_PRICE_USD,
            'note': 'Estimated range based on vessel size. Actual costs depend on market rates, cargo, season, and operator.'
        }
    }
    if dwt:
        route['dwt'] = dwt
    if subtype:
        route['vessel_subtype'] = subtype.get('name')
    return route
# =============================================================================
# FEATURE 1: FIXTURE MATCHING ENGINE
# =============================================================================
def fixture_match(cargo_type: str, tonnage: float, from_port: str,
                  to_port: str = None, vessels: list = None) -> Dict:
    """
    Match cargo to suitable vessels with scoring.

    Each vessel dict in ``vessels`` is scored out of 100:
    type match (40, or 25 for a partial match on the free-text type),
    DWT fit against ``tonnage`` (up to 30), likely availability from
    nav status / speed (up to 20) and proximity to the load port (up to 10).
    Vessels whose type does not match at all are dropped.

    Args:
        cargo_type: Cargo description, classified with ``classify_cargo``.
        tonnage: Cargo quantity in tons.
        from_port: Loading port name, resolved with ``resolve_port``.
        to_port: Optional discharge port; when given, a route is calculated.
        vessels: Candidate vessel dicts (type_category, type, dwt, nav_status,
            speed, lat, lon are read when present).

    Returns:
        A dict with the required vessel type, recommended subtype and DWT
        range, the 15 best candidates (each with ``_score``,
        ``_match_reasons`` and, when DWT is known, ``_subtype``), the total
        match count and the route. On an unclassifiable cargo or unknown
        load port: ``{'error': ..., 'candidates': []}``.
    """
    # Determine required vessel type
    vtype = classify_cargo(cargo_type)
    if not vtype:
        return {'error': f"Cannot classify cargo '{cargo_type}'", 'candidates': []}
    load_port = resolve_port(from_port)
    if not load_port:
        return {'error': f"Port '{from_port}' not found", 'candidates': []}

    # Subtype recommendation based on tonnage
    subtype = classify_vessel_subtype(vtype, tonnage * 1.15 if tonnage else None)  # 15% margin

    # Score each vessel
    scored = []
    for v in (vessels or []):
        score = 0
        reasons = []

        # Type match (40 pts)
        v_cat = v.get('type_category', '')
        if v_cat == vtype:
            score += 40
            reasons.append('type_match')
        elif vtype in (v.get('type', '') or '').lower():
            score += 25
            reasons.append('partial_type_match')
        else:
            continue  # Skip non-matching types entirely

        # DWT fit (30 pts)
        v_dwt = None
        try:
            v_dwt = float(v.get('dwt', 0) or 0)
        except (ValueError, TypeError):
            pass  # unparseable DWT is treated as unknown
        if v_dwt and tonnage:
            ratio = v_dwt / tonnage if tonnage > 0 else 0
            if 1.05 <= ratio <= 1.5:
                score += 30  # Perfect fit (5-50% margin)
                reasons.append('dwt_perfect')
            elif 1.0 <= ratio <= 2.0:
                score += 20
                reasons.append('dwt_acceptable')
            elif ratio > 2.0:
                score += 5
                reasons.append('dwt_oversized')
            else:
                reasons.append('dwt_too_small')
        elif not v_dwt:
            score += 10  # Unknown DWT — neutral
            reasons.append('dwt_unknown')

        # Availability — anchored/moored vessels are more likely available (20 pts).
        # A missing or unparseable speed counts as 0, i.e. "likely available".
        nav = v.get('nav_status', '')
        spd = 0
        try:
            spd = float(v.get('speed', 0) or 0)
        except (ValueError, TypeError):
            pass
        if nav in ('at anchor', 'moored') or spd < 0.5:
            score += 20
            reasons.append('likely_available')
        elif spd < 3:
            score += 15
            reasons.append('slow_moving')
        else:
            score += 5
            reasons.append('underway')

        # Proximity bonus (10 pts) — vessels closer to load port
        v_lat = v.get('lat')
        v_lon = v.get('lon')
        if v_lat and v_lon:
            try:
                dist = _haversine_nm(float(v_lat), float(v_lon),
                                     load_port['lat'], load_port['lon'])
                if dist < 50:
                    score += 10
                    reasons.append('very_close')
                elif dist < 200:
                    score += 7
                    reasons.append('nearby')
                elif dist < 500:
                    score += 3
                    reasons.append('regional')
            except (ValueError, TypeError):
                pass  # bad coordinates: no proximity bonus

        v_scored = {**v, '_score': score, '_match_reasons': reasons}
        if v_dwt:
            v_scored['_subtype'] = classify_vessel_subtype(vtype, v_dwt)
        scored.append(v_scored)

    # Sort by score descending (stable: ties keep input order)
    scored.sort(key=lambda x: -x['_score'])

    # Route calculation if destination provided
    route = None
    if to_port:
        route = calculate_sea_route(from_port, to_port, vtype, dwt=tonnage)

    return {
        'cargo_type': cargo_type,
        'vessel_type_required': vtype,
        'recommended_subtype': subtype.get('name') if subtype else None,
        # Min and max are separated by a hyphen; without it the two numbers
        # ran together into one unreadable figure.
        'recommended_dwt_range': f"{subtype['dwt_min']:,}-{subtype['dwt_max']:,}" if subtype else None,
        'loading_port': load_port['name'],
        'tonnage': tonnage,
        'candidates': scored[:15],
        'total_matches': len(scored),
        'route': route,
    }
# =============================================================================
# FEATURE 2: FREIGHT RATE INTELLIGENCE
# =============================================================================
# Freight rates $/ton by route corridor + vessel type (Q1 2026 approximations)
# Based on Baltic Exchange indices: BDI (bulk), BDTI (tanker), SCFI (container)
# Structure: vessel type → (from_region, to_region) → (low, high, benchmark label).
# Region codes are the PORT_REGIONS codes. Bulk and tanker figures are $/ton;
# container figures are $/TEU. estimate_freight_rate falls back to the reversed
# pair (discounted) and then to a distance-based estimate when a pair is absent.
FREIGHT_RATES = {
'bulk': {
# route_key: (low_$/ton, high_$/ton, benchmark)
# NOTE(review): the EASIA→NEUR label names a Brazil-China benchmark — confirm it is intended.
('EASIA', 'NEUR'): (18, 28, 'C3 Cape Brazil-China equiv'),
('NEUR', 'EASIA'): (12, 20, 'Backhaul'),
('SAE', 'EASIA'): (15, 25, 'C5TC Capesize Brazil-China'),
('AUSNZ', 'EASIA'): (8, 14, 'C5 Australia-China'),
('GULF', 'EASIA'): (12, 20, 'AG-Far East'),
('USEC', 'NEUR'): (10, 18, 'USG-Continent grain'),
('USEC', 'EASIA'): (25, 40, 'USG-Far East'),
('SAE', 'NEUR'): (12, 20, 'ECSA-Continent'),
('BSEA', 'MED'): (8, 15, 'Black Sea-Med grain'),
('BSEA', 'EASIA'): (22, 35, 'Black Sea-Far East'),
('SAFR', 'EASIA'): (10, 16, 'S.Africa-China'),
('SASIA', 'EASIA'): (6, 12, 'India-China coastal'),
('MED', 'NEUR'): (6, 12, 'Short-sea Med-Continent'),
('NEUR', 'USEC'): (8, 14, 'Continent-USG backhaul'),
('CASP', 'CASP'): (8, 18, 'Caspian Sea internal'),
('CASP', 'BSEA'): (20, 40, 'Caspian-Black Sea via Volga-Don'),
('BSEA', 'CASP'): (18, 35, 'Black Sea-Caspian via Volga-Don'),
('CASP', 'GULF'): (12, 25, 'Caspian-Persian Gulf INSTC'),
('GULF', 'CASP'): (14, 28, 'Persian Gulf-Caspian INSTC'),
('CASP', 'SASIA'): (22, 45, 'Caspian-India INSTC corridor'),
},
'tanker': {
('GULF', 'EASIA'): (8, 15, 'TD3C VLCC AG-China'),
('GULF', 'NEUR'): (10, 18, 'AG-UKC'),
('GULF', 'USEC'): (12, 20, 'AG-USAC'),
('GULF', 'MED'): (9, 16, 'AG-Med'),
('GULF', 'SASIA'): (5, 10, 'AG-India'),
('WAFR', 'EASIA'): (12, 22, 'WAF-East'),
('WAFR', 'USEC'): (8, 14, 'WAF-USG'),
('USEC', 'NEUR'): (6, 12, 'USG-UKC'),
('NEUR', 'USEC'): (5, 10, 'UKC-USG backhaul'),
('MED', 'NEUR'): (4, 8, 'Med-Continent'),
('BSEA', 'MED'): (5, 10, 'CPC-Med Aframax'),
('EASIA', 'EASIA'): (3, 7, 'Intra-Asia'),
('CASP', 'CASP'): (6, 14, 'Caspian tanker coastal'),
('CASP', 'BSEA'): (18, 35, 'Caspian-Black Sea crude via VDC'),
('BSEA', 'CASP'): (16, 30, 'Black Sea-Caspian products via VDC'),
('CASP', 'GULF'): (10, 22, 'Caspian-AG crude shuttle'),
('CASP', 'MED'): (25, 50, 'Caspian-Med crude via BTC/pipeline'),
},
'container': {
# $/TEU rates (not per ton)
('EASIA', 'NEUR'): (1200, 2800, 'SCFI Shanghai-Europe'),
('EASIA', 'USEC'): (2500, 5000, 'SCFI Shanghai-USEC'),
('EASIA', 'USWC'): (1800, 3500, 'SCFI Shanghai-USWC'),
('EASIA', 'MED'): (1400, 3000, 'Shanghai-Med'),
('NEUR', 'EASIA'): (400, 900, 'Europe-Asia backhaul'),
('NEUR', 'USEC'): (800, 1500, 'Europe-USEC'),
('EASIA', 'SASIA'): (300, 700, 'Intra-Asia'),
('EASIA', 'AUSNZ'): (600, 1200, 'China-Australia'),
('EASIA', 'ERED'): (800, 1500, 'Asia-East Africa'),
('EASIA', 'SAE'): (1000, 2000, 'Asia-S.America'),
('MED', 'ERED'): (600, 1200, 'Med-East Africa'),
('USEC', 'SAE'): (500, 1000, 'US-S.America'),
},
}
# DWT multipliers for rate adjustment (smaller vessels = higher $/ton)
# Per vessel type: ascending (DWT ceiling, multiplier) pairs. estimate_freight_rate
# applies the multiplier of the first ceiling >= the vessel's DWT; types without
# an entry here (e.g. container) keep a multiplier of 1.0.
_RATE_DWT_MULTIPLIER = {
'bulk': [
(10000, 1.8), # River-Sea premium (Caspian/inland)
(40000, 1.4), # Handysize premium
(65000, 1.15), # Supramax
(100000, 1.0), # Panamax (base)
(200000, 0.85), # Capesize discount
(999999, 0.75), # VLOC
],
'tanker': [
(25000, 1.8), # River-Sea tanker premium (Caspian/inland)
(55000, 1.5), # MR premium
(80000, 1.2), # LR1
(120000, 1.0), # Aframax (base)
(200000, 0.8), # Suezmax
(999999, 0.65), # VLCC
],
}
def estimate_freight_rate(from_port: str, to_port: str, vessel_type: str = 'bulk',
                          dwt: float = None) -> Optional[Dict]:
    """
    Produce an indicative freight-rate range for a port pair and vessel type.

    Lookup order:
      1. the FREIGHT_RATES lane for the two ports' regions;
      2. the reverse lane, discounted as a backhaul;
      3. a per-nautical-mile estimate based on calculate_sea_route().
    The range is then scaled by the matching _RATE_DWT_MULTIPLIER band.

    Returns a dict quoted in $/ton ($/TEU for 'container'), or None when a
    port cannot be resolved, carries no 'key', or no route can be calculated.
    """
    origin = resolve_port(from_port)
    destination = resolve_port(to_port)
    if not (origin and destination):
        return None

    # resolve_port includes 'key', which indexes PORT_REGIONS
    origin_key = origin.get('key')
    destination_key = destination.get('key')
    if not (origin_key and destination_key):
        return None

    lane = (PORT_REGIONS.get(origin_key, 'OTHER'),
            PORT_REGIONS.get(destination_key, 'OTHER'))
    vtype = vessel_type.lower() if vessel_type else 'bulk'
    lane_rates = FREIGHT_RATES.get(vtype, {})

    # 1. Direct lane
    rate_data = lane_rates.get(lane)

    # 2. Reverse lane, discounted (backhaul)
    if not rate_data:
        reverse = lane_rates.get(lane[::-1])
        if reverse:
            rev_low, rev_high, rev_label = reverse
            rate_data = (int(rev_low * 0.6), int(rev_high * 0.7),
                         f"Backhaul from {rev_label}")

    # 3. Distance-based fallback
    if not rate_data:
        route = calculate_sea_route(from_port, to_port, vtype, dwt=dwt)
        if not route:
            return None
        # ~$0.003-$0.006/ton/NM for bulk
        per_nm_table = {'bulk': (0.003, 0.006), 'tanker': (0.0025, 0.005), 'container': (0.3, 0.7)}
        per_nm_low, per_nm_high = per_nm_table.get(vtype, (0.003, 0.006))
        nautical_miles = route['distance_nm']
        rate_data = (round(nautical_miles * per_nm_low),
                     round(nautical_miles * per_nm_high),
                     'Distance-based estimate')

    low, high, benchmark = rate_data

    # DWT adjustment: first band whose ceiling covers the vessel, else neutral
    multiplier = 1.0
    if dwt and vtype in _RATE_DWT_MULTIPLIER:
        try:
            size = float(dwt)
        except (ValueError, TypeError):
            pass
        else:
            multiplier = next(
                (factor for ceiling, factor in _RATE_DWT_MULTIPLIER[vtype] if size <= ceiling),
                1.0,
            )

    subtype = classify_vessel_subtype(vtype, dwt)
    return {
        'from': origin['name'],
        'to': destination['name'],
        'vessel_type': vtype,
        'vessel_subtype': subtype.get('name') if subtype else None,
        'rate_low': round(low * multiplier),
        'rate_high': round(high * multiplier),
        'unit': '$/TEU' if vtype == 'container' else '$/ton',
        'benchmark': benchmark,
        'dwt': dwt,
        'dwt_multiplier': round(multiplier, 2),
        'note': 'Indicative market rate range. Actual fixtures depend on market conditions, vessel age, port costs, and negotiation.',
    }
# =============================================================================
# FEATURE 3: SANCTIONS & COMPLIANCE SCREENER
# (Extracted to maritime_compliance.py — re-exported for backward compatibility)
# =============================================================================
from maritime_compliance import (
SANCTIONED_FLAGS, SANCTIONED_ENTITIES, DARK_FLEET_INDICATORS,
screen_sanctions, _max_risk,
)
# =============================================================================
# FEATURE 4: PORT CONGESTION PREDICTOR
# =============================================================================
# Average port congestion data (wait days by port, updated periodically)
# Sources: public port authority reports, shipping indices
# Keys are the port 'key' values returned by resolve_port(); ports missing
# from this table fall back to regional defaults in estimate_port_congestion().
PORT_CONGESTION = {
    # port_key: {
    #     'wait_days': (low, high),            expected wait range in days
    #     'berth_utilization': percent,        used to derive the congestion level
    #     'trend': 'up' | 'down' | 'stable',
    #     'peak_months': [1-12, ...],          months that get the seasonal uplift
    # }
    'shanghai': {'wait_days': (1, 3), 'berth_utilization': 85, 'trend': 'stable', 'peak_months': [3, 4, 9, 10, 11]},
    'singapore': {'wait_days': (0.5, 2), 'berth_utilization': 80, 'trend': 'stable', 'peak_months': [1, 2, 11, 12]},
    'rotterdam': {'wait_days': (0.5, 1.5), 'berth_utilization': 75, 'trend': 'stable', 'peak_months': [9, 10, 11]},
    'antwerp': {'wait_days': (0.5, 2), 'berth_utilization': 78, 'trend': 'stable', 'peak_months': [9, 10, 11]},
    'houston': {'wait_days': (1, 4), 'berth_utilization': 82, 'trend': 'up', 'peak_months': [1, 2, 6, 7, 8]},
    'long_beach': {'wait_days': (0.5, 3), 'berth_utilization': 72, 'trend': 'down', 'peak_months': [8, 9, 10, 11]},
    'los_angeles': {'wait_days': (0.5, 3), 'berth_utilization': 73, 'trend': 'down', 'peak_months': [8, 9, 10, 11]},
    'busan': {'wait_days': (0.5, 1.5), 'berth_utilization': 70, 'trend': 'stable', 'peak_months': [3, 4, 10, 11]},
    'ningbo': {'wait_days': (1, 3), 'berth_utilization': 88, 'trend': 'up', 'peak_months': [3, 4, 9, 10, 11]},
    'hong_kong': {'wait_days': (0.5, 1), 'berth_utilization': 60, 'trend': 'down', 'peak_months': [1, 2, 10, 11]},
    'jebel_ali': {'wait_days': (0.5, 1.5), 'berth_utilization': 75, 'trend': 'stable', 'peak_months': [1, 2, 3, 11, 12]},
    'santos': {'wait_days': (2, 6), 'berth_utilization': 90, 'trend': 'up', 'peak_months': [2, 3, 4, 5]},
    'paranagua': {'wait_days': (3, 10), 'berth_utilization': 92, 'trend': 'up', 'peak_months': [2, 3, 4, 5]},
    'port_hedland': {'wait_days': (1, 4), 'berth_utilization': 88, 'trend': 'stable', 'peak_months': [6, 7, 8, 9]},
    'richards_bay': {'wait_days': (2, 7), 'berth_utilization': 85, 'trend': 'stable', 'peak_months': [6, 7, 8]},
    'new_orleans': {'wait_days': (1, 4), 'berth_utilization': 80, 'trend': 'stable', 'peak_months': [9, 10, 11, 12]},
    'novorossiysk': {'wait_days': (2, 8), 'berth_utilization': 85, 'trend': 'up', 'peak_months': [6, 7, 8, 9]},
    'istanbul': {'wait_days': (1, 5), 'berth_utilization': 82, 'trend': 'stable', 'peak_months': [6, 7, 8, 9]},
    'piraeus': {'wait_days': (0.5, 1.5), 'berth_utilization': 78, 'trend': 'stable', 'peak_months': [6, 7, 8]},
    'laem_chabang': {'wait_days': (0.5, 2), 'berth_utilization': 80, 'trend': 'stable', 'peak_months': [11, 12, 1]},
    'mumbai': {'wait_days': (1, 4), 'berth_utilization': 85, 'trend': 'up', 'peak_months': [10, 11, 12, 1]},
    'durban': {'wait_days': (2, 5), 'berth_utilization': 88, 'trend': 'up', 'peak_months': [1, 2, 3]},
    'callao': {'wait_days': (1, 3), 'berth_utilization': 75, 'trend': 'stable', 'peak_months': [2, 3, 4]},
    'vancouver': {'wait_days': (1, 5), 'berth_utilization': 82, 'trend': 'up', 'peak_months': [8, 9, 10, 11]},
    'savannah': {'wait_days': (0.5, 2), 'berth_utilization': 78, 'trend': 'stable', 'peak_months': [8, 9, 10]},
    'new_york': {'wait_days': (0.5, 2), 'berth_utilization': 76, 'trend': 'stable', 'peak_months': [9, 10, 11]},
    'hamburg': {'wait_days': (0.5, 1.5), 'berth_utilization': 70, 'trend': 'down', 'peak_months': [9, 10, 11]},
    'felixstowe': {'wait_days': (0.5, 1.5), 'berth_utilization': 75, 'trend': 'stable', 'peak_months': [9, 10, 11]},
    'tanger_med': {'wait_days': (0.5, 1), 'berth_utilization': 72, 'trend': 'stable', 'peak_months': [6, 7, 8]},
    'port_said': {'wait_days': (0.5, 2), 'berth_utilization': 80, 'trend': 'stable', 'peak_months': [1, 2, 3]},
}
def estimate_port_congestion(port_name: str, month: int = None) -> Optional[Dict]:
    """
    Assess congestion at a port: level, expected wait range and trend.

    Uses the port's PORT_CONGESTION profile, or a regional default when the
    port has none. When the month (default: the current month) is one of the
    profile's peak months, waits and berth utilization are scaled up.

    Returns None if the port cannot be resolved or carries no 'key'.
    """
    port = resolve_port(port_name)
    if not port:
        return None

    # resolve_port includes 'key'
    port_key = port.get('key')
    if not port_key:
        return None

    profile = PORT_CONGESTION.get(port_key)
    if not profile:
        # Regional fallback: (wait-day range, berth utilization %); trend is always stable
        regional = {
            'EASIA': ((1, 3), 80),
            'NEUR': ((0.5, 2), 72),
            'MED': ((0.5, 2), 75),
            'GULF': ((0.5, 2), 75),
            'USEC': ((0.5, 3), 76),
            'USWC': ((0.5, 2), 70),
        }
        wait_range, berth_pct = regional.get(PORT_REGIONS.get(port_key, 'OTHER'), ((1, 3), 75))
        profile = {'wait_days': wait_range, 'berth_utilization': berth_pct, 'trend': 'stable'}

    wait_low, wait_high = profile['wait_days']
    utilization = profile['berth_utilization']
    peak_months = profile.get('peak_months', [])

    # Seasonal adjustment
    is_peak = (month or datetime.now().month) in peak_months
    if is_peak:
        wait_low, wait_high = round(wait_low * 1.3, 1), round(wait_high * 1.5, 1)
        utilization = min(98, utilization + 8)

    # Congestion level: first rule that fires, checked from most to least severe.
    # Each rule is (label, utilization floor, wait-days floor or None).
    level = 'low'
    for label, util_floor, wait_floor in (('severe', 90, 6), ('moderate', 80, 3), ('normal', 70, None)):
        if utilization >= util_floor or (wait_floor is not None and wait_high >= wait_floor):
            level = label
            break

    return {
        'port': port['name'],
        'country': port['country'],
        'congestion_level': level,
        'wait_days': {'low': wait_low, 'high': wait_high},
        'berth_utilization_pct': utilization,
        'trend': profile['trend'],
        'is_peak_season': is_peak,
        'peak_months': peak_months,
        'note': 'Estimate based on historical patterns and seasonal trends. Real-time congestion may vary.',
    }
# =============================================================================
# FEATURE 5: BUNKER PRICE OPTIMIZER
# =============================================================================
# Bunker fuel prices (VLSFO, HSFO, MGO) by major bunkering port
# (USD/ton, approximate Q1 2026).
# Keys are the port 'key' values returned by resolve_port() (underscore form,
# e.g. 'port_said'); lookups with any other spelling will miss.
BUNKER_PRICES = {
    # port_key: {'vlsfo': $/ton, 'hsfo': $/ton, 'mgo': $/ton, 'supply': 'excellent'|'good'|'limited'}
    'singapore': {'vlsfo': 580, 'hsfo': 440, 'mgo': 750, 'supply': 'excellent'},
    'fujairah': {'vlsfo': 570, 'hsfo': 430, 'mgo': 740, 'supply': 'excellent'},
    'rotterdam': {'vlsfo': 590, 'hsfo': 450, 'mgo': 770, 'supply': 'excellent'},
    'houston': {'vlsfo': 575, 'hsfo': 430, 'mgo': 730, 'supply': 'excellent'},
    'busan': {'vlsfo': 600, 'hsfo': 460, 'mgo': 780, 'supply': 'good'},
    'hong_kong': {'vlsfo': 610, 'hsfo': 470, 'mgo': 790, 'supply': 'good'},
    'shanghai': {'vlsfo': 605, 'hsfo': 460, 'mgo': 780, 'supply': 'good'},
    'antwerp': {'vlsfo': 595, 'hsfo': 455, 'mgo': 775, 'supply': 'good'},
    'hamburg': {'vlsfo': 600, 'hsfo': 460, 'mgo': 780, 'supply': 'good'},
    'piraeus': {'vlsfo': 585, 'hsfo': 445, 'mgo': 760, 'supply': 'good'},
    'istanbul': {'vlsfo': 595, 'hsfo': 450, 'mgo': 770, 'supply': 'good'},
    'algeciras': {'vlsfo': 575, 'hsfo': 435, 'mgo': 745, 'supply': 'good'},
    'tanger_med': {'vlsfo': 580, 'hsfo': 440, 'mgo': 750, 'supply': 'good'},
    'port_said': {'vlsfo': 600, 'hsfo': 460, 'mgo': 790, 'supply': 'limited'},
    'durban': {'vlsfo': 620, 'hsfo': 480, 'mgo': 810, 'supply': 'limited'},
    'cape_town': {'vlsfo': 615, 'hsfo': 475, 'mgo': 800, 'supply': 'limited'},
    'santos': {'vlsfo': 610, 'hsfo': 470, 'mgo': 790, 'supply': 'limited'},
    'colon': {'vlsfo': 590, 'hsfo': 450, 'mgo': 770, 'supply': 'good'},
    'new_york': {'vlsfo': 595, 'hsfo': 450, 'mgo': 775, 'supply': 'good'},
    'los_angeles': {'vlsfo': 600, 'hsfo': 460, 'mgo': 785, 'supply': 'good'},
    'long_beach': {'vlsfo': 600, 'hsfo': 460, 'mgo': 785, 'supply': 'good'},
    'vancouver': {'vlsfo': 610, 'hsfo': 470, 'mgo': 790, 'supply': 'good'},
    'jebel_ali': {'vlsfo': 575, 'hsfo': 435, 'mgo': 745, 'supply': 'excellent'},
    'mumbai': {'vlsfo': 605, 'hsfo': 460, 'mgo': 780, 'supply': 'good'},
    'colombo': {'vlsfo': 595, 'hsfo': 455, 'mgo': 775, 'supply': 'good'},
    'laem_chabang': {'vlsfo': 600, 'hsfo': 460, 'mgo': 780, 'supply': 'limited'},
    'jeddah': {'vlsfo': 580, 'hsfo': 440, 'mgo': 755, 'supply': 'good'},
    'salalah': {'vlsfo': 575, 'hsfo': 435, 'mgo': 745, 'supply': 'limited'},
    'marsaxlokk': {'vlsfo': 590, 'hsfo': 450, 'mgo': 770, 'supply': 'limited'},
    'kaohsiung': {'vlsfo': 600, 'hsfo': 460, 'mgo': 780, 'supply': 'good'},
}
def get_bunker_prices(port_name: str) -> Optional[Dict]:
    """
    Report bunker fuel prices for a port together with nearby alternatives.

    Alternatives are ports within 500 NM that have a BUNKER_PRICES entry,
    ordered by VLSFO price; at most five are returned. When the port itself
    is priced and the cheapest alternative undercuts it on VLSFO, a
    'cheapest_alternative' summary is added.

    Returns None if the port cannot be resolved.
    """
    port = resolve_port(port_name)
    if not port:
        return None

    # resolve_port includes 'key'
    port_key = port.get('key')
    own_prices = BUNKER_PRICES.get(port_key) if port_key else None

    def _nearby_quotes():
        """Yield one price record per nearby bunkering port other than this one."""
        for other in find_nearby_ports(port['lat'], port['lon'], radius_nm=500):
            other_key = other.get('key')
            if not other_key or other_key == port_key or other_key not in BUNKER_PRICES:
                continue
            quote = BUNKER_PRICES[other_key]
            yield {
                'port': other['name'],
                'distance_nm': other['distance_nm'],
                'vlsfo': quote['vlsfo'],
                'hsfo': quote['hsfo'],
                'mgo': quote['mgo'],
                'supply': quote['supply'],
            }

    # Cheapest VLSFO first
    alternatives = sorted(_nearby_quotes(), key=lambda alt: alt['vlsfo'])

    result = {'port': port['name'], 'country': port['country']}
    if own_prices:
        result['vlsfo_usd_ton'] = own_prices['vlsfo']
        result['hsfo_usd_ton'] = own_prices['hsfo']
        result['mgo_usd_ton'] = own_prices['mgo']
        result['supply_level'] = own_prices['supply']
    else:
        result['note'] = 'No direct bunker price data for this port.'

    if alternatives:
        result['alternatives'] = alternatives[:5]

    # Savings calculation against the cheapest alternative
    if own_prices and alternatives and alternatives[0]['vlsfo'] < own_prices['vlsfo']:
        best = alternatives[0]
        saving = own_prices['vlsfo'] - best['vlsfo']
        result['cheapest_alternative'] = {
            'port': best['port'],
            'vlsfo': best['vlsfo'],
            'saving_per_ton': saving,
            'distance_nm': best['distance_nm'],
            'note': f'Save ${saving}/ton VLSFO by bunkering at {best["port"]} ({best["distance_nm"]} NM away)',
        }
    return result
def optimize_bunker_route(from_port: str, to_port: str, vessel_type: str = 'bulk',
                          dwt: float = None) -> Optional[Dict]:
    """
    Find optimal bunkering ports along a route.

    Compares VLSFO prices at the departure port, at bunkering hubs adjacent
    to the route's waypoints, and at the arrival port. The cheapest option is
    flagged with 'recommended': True.

    Args:
        from_port: Departure port name (resolved by calculate_sea_route).
        to_port: Arrival port name.
        vessel_type: Vessel type passed to the route and fuel estimators.
        dwt: Deadweight tonnage, if known.

    Returns:
        Dict with the route summary, estimated fuel for the voyage, the list
        of bunker options and the VLSFO price spread, or None when no route
        can be calculated.
    """
    route = calculate_sea_route(from_port, to_port, vessel_type, dwt=dwt)
    if not route:
        return None

    # Collect bunkering options along the route
    bunker_options = []

    # Departure port
    dep_bunker = get_bunker_prices(from_port)
    if dep_bunker and dep_bunker.get('vlsfo_usd_ton'):
        bunker_options.append({
            'port': dep_bunker['port'],
            'position': 'departure',
            'vlsfo': dep_bunker['vlsfo_usd_ton'],
            'supply': dep_bunker.get('supply_level', 'unknown'),
        })

    # Waypoint -> key of the bunkering hub next to it (canals often have
    # nearby bunkering). Values must be spelled exactly like the
    # BUNKER_PRICES keys (underscores, not spaces), otherwise the hub is
    # silently skipped.
    waypoint_bunker_ports = {
        'gibraltar': 'algeciras',
        'suez_n': 'port_said',
        'bab_el_mandeb': 'jeddah',
        'hormuz': 'fujairah',
        'malacca': 'singapore',
        'panama_atl': 'colon',
        'cape_good_hope': 'cape_town',
        'dover': 'rotterdam',
    }
    for wp in route.get('via', []):
        wp_lower = wp.lower()
        if not wp_lower:
            # An empty name is a substring of every key and would match all hubs.
            continue
        for waypoint_key, bunker_port_key in waypoint_bunker_ports.items():
            if waypoint_key.replace('_', ' ') in wp_lower or wp_lower in waypoint_key:
                bp = BUNKER_PRICES.get(bunker_port_key)
                if bp:
                    port_info = WORLD_PORTS.get(bunker_port_key)
                    bunker_options.append({
                        'port': (port_info['name'] if port_info
                                 else bunker_port_key.replace('_', ' ').title()),
                        'position': 'en_route',
                        'vlsfo': bp['vlsfo'],
                        'supply': bp['supply'],
                    })
                break

    # Arrival port
    arr_bunker = get_bunker_prices(to_port)
    if arr_bunker and arr_bunker.get('vlsfo_usd_ton'):
        bunker_options.append({
            'port': arr_bunker['port'],
            'position': 'arrival',
            'vlsfo': arr_bunker['vlsfo_usd_ton'],
            'supply': arr_bunker.get('supply_level', 'unknown'),
        })

    # Flag the cheapest option (first one wins on ties)
    if bunker_options:
        cheapest = min(bunker_options, key=lambda x: x['vlsfo'])
        cheapest['recommended'] = True

    # Voyage fuel estimate: mean daily consumption times total voyage days
    fuel_low, fuel_high = estimate_fuel_consumption(vessel_type, dwt)
    avg_fuel = (fuel_low + fuel_high) / 2
    total_fuel_tons = avg_fuel * route['total_days']

    vlsfo_prices = [option['vlsfo'] for option in bunker_options]
    lowest = min(vlsfo_prices) if vlsfo_prices else None
    highest = max(vlsfo_prices) if vlsfo_prices else None

    return {
        'route': {'from': route['from'], 'to': route['to'], 'distance_nm': route['distance_nm'],
                  'total_days': route['total_days'], 'canals': route['canals']},
        'estimated_fuel_tons': round(total_fuel_tons),
        'bunker_options': bunker_options,
        'price_spread': {
            'cheapest_vlsfo': lowest,
            'most_expensive_vlsfo': highest,
            'potential_saving_usd': round((highest - lowest) * total_fuel_tons) if vlsfo_prices else 0,
        },
        'note': 'VLSFO prices are approximate and fluctuate daily. Contact bunker suppliers for live quotes.',
    }
# =============================================================================
# FEATURE 6: CHARTER PARTY GENERATOR
# =============================================================================
def generate_charter_party(vessel_name: str, cargo_type: str, tonnage: float,
                           from_port: str, to_port: str, rate: float = None,
                           laydays: str = None, demurrage_rate: float = None,
                           charterer: str = None, owner: str = None) -> Optional[Dict]:
    """
    Generate a structured draft voyage charter party.

    Args:
        vessel_name: Vessel name; 'TBN' is used when empty.
        cargo_type: Cargo / vessel category; defaults to 'bulk' when empty.
        tonnage: Cargo quantity in metric tons. A missing or zero value is
            tolerated: laytime falls back to 5 days, demurrage to the
            default estimate, and total freight to 0.
        from_port: Loading port name.
        to_port: Discharge port name.
        rate: Freight rate in $/MT; estimated via estimate_freight_rate()
            (upper bound) when omitted, or 15.0 if no estimate is available.
        laydays: Laycan text; defaults to a 5-day window starting 14 days
            from today.
        demurrage_rate: $/day; when omitted it is estimated from tonnage and
            clamped to 8,000-80,000.
        charterer: Charterer name (placeholder text when omitted).
        owner: Owner name (placeholder text when omitted).

    Returns:
        Dict describing the charter party, or None if either port cannot be
        resolved.
    """
    # Local import keeps the block self-contained for timedelta.
    from datetime import datetime, timedelta

    port_load = resolve_port(from_port)
    port_discharge = resolve_port(to_port)
    if not port_load or not port_discharge:
        return None

    cargo_lower = cargo_type.lower() if cargo_type else 'bulk'
    # Numeric stand-in for arithmetic only; the reported fields keep the raw
    # `tonnage` value so callers see exactly what they passed in.
    quantity = tonnage or 0

    # Pass the normalized type so a missing cargo_type never reaches the router as None.
    route = calculate_sea_route(from_port, to_port, cargo_lower)
    distance = route['distance_nm'] if route else 0
    voyage_days = route['total_days'] if route else 0

    # Estimate rate if not provided
    if not rate:
        freight = estimate_freight_rate(from_port, to_port, cargo_lower)
        rate = freight['rate_high'] if freight else 15.0

    # Default laytime: the daily loading rate steps up with cargo size
    load_rate = 5000 if quantity <= 50000 else 8000 if quantity <= 100000 else 12000
    laytime_days = round(quantity / load_rate, 1) if quantity else 5

    if not demurrage_rate:
        demurrage_rate = round(quantity * 0.15 if quantity else 15000, 0)
        demurrage_rate = min(max(demurrage_rate, 8000), 80000)

    today = datetime.now()
    laycan_start = today + timedelta(days=14)
    laycan_end = laycan_start + timedelta(days=5)
    # md5 is used only to derive a short, stable, non-secret suffix from the vessel name.
    name_digest = hashlib.md5((vessel_name or 'TBN').encode()).hexdigest()
    cp_number = f"CP-{today.strftime('%Y%m%d')}-{int(name_digest[:8], 16) % 10000:04d}"

    return {
        'cp_number': cp_number,
        'date': today.strftime('%Y-%m-%d'),
        'type': 'Voyage Charter Party',
        'parties': {
            'owner': owner or '[Owner name to be inserted]',
            'charterer': charterer or '[Charterer name to be inserted]',
        },
        'vessel': {
            'name': vessel_name or 'TBN (To Be Nominated)',
            'type': cargo_lower.title(),
            'dwt': tonnage,
        },
        'cargo': {
            'type': cargo_type,
            'quantity_mt': tonnage,
            'tolerance': '5% MOLOO (More or Less at Owner\'s Option)',
            'stowage_factor': 'As per standard',
        },
        'ports': {
            'loading': {
                'name': port_load['name'],
                'country': port_load['country'],
                'unlocode': port_load.get('unlocode', ''),
                'terms': 'FIOS (Free In/Out Stowed)',
            },
            'discharge': {
                'name': port_discharge['name'],
                'country': port_discharge['country'],
                'unlocode': port_discharge.get('unlocode', ''),
                'terms': 'FIOS (Free In/Out Stowed)',
            },
        },
        'commercial': {
            'freight_rate': f"${rate:.2f}/MT",
            'total_freight': f"${rate * quantity:,.0f}",
            'payment': 'Within 5 banking days of completion of loading, less address commission 3.75%',
            'laycan': laydays or f"{laycan_start.strftime('%d %b')} - {laycan_end.strftime('%d %b %Y')}",
        },
        'laytime': {
            'loading_days': laytime_days,
            'discharge_days': laytime_days,
            'total_days': laytime_days * 2,
            'loading_rate': f"{load_rate:,} MT/day SHINC",
            'commencement': 'NOR tendered, whether in berth or not (WIBON), whether in port or not (WIPON)',
        },
        'demurrage': {
            'rate_per_day': f"${demurrage_rate:,.0f}/day",
            'despatch': f"${demurrage_rate / 2:,.0f}/day (half demurrage rate)",
            'payment': 'Within 30 days of completion of discharge',
        },
        'voyage': {
            'distance_nm': distance,
            'estimated_days': voyage_days,
            'canals': route.get('canals', []) if route else [],
        },
        'clauses': [
            'General Average to be settled according to York-Antwerp Rules 2016',
            'Both-to-Blame Collision Clause as per Conwartime 2013',
            'War Risk Clause — Voywar 2013',
            'Ice Clause — if applicable',
            'BIMCO Sanctions Clause 2020',
            'BIMCO Infectious or Contagious Diseases Clause 2022',
            'ISM/ISPS Clause as per BIMCO',
            'US Clause Paramount — if US port involved',
            'Hague-Visby Rules to apply',
        ],
        'arbitration': 'London, English Law — LMAA Terms',
        'note': 'DRAFT — This is an AI-generated charter party template. Must be reviewed by qualified maritime lawyers before execution.',
    }
# =============================================================================
# FEATURE 7: VESSEL PERFORMANCE ANALYTICS
# =============================================================================
# Class benchmarks per vessel subtype: design speed (knots), fuel burn at that
# speed (tons/day) and the inclusive DWT range (low, high) the class covers.
# Entry order is kept deliberately: lookups iterate this dict in order.
VESSEL_FUEL_BENCHMARKS = {
    # --- dry bulk ---
    'capesize':          {'speed_kn': 14.5, 'fuel_td': 55,  'dwt_range': (100_000, 200_000)},
    'panamax_bulk':      {'speed_kn': 14.0, 'fuel_td': 35,  'dwt_range': (65_000, 99_999)},
    'supramax':          {'speed_kn': 13.5, 'fuel_td': 28,  'dwt_range': (50_000, 64_999)},
    'handysize':         {'speed_kn': 13.0, 'fuel_td': 22,  'dwt_range': (15_000, 49_999)},
    # --- tankers ---
    'vlcc':              {'speed_kn': 15.0, 'fuel_td': 80,  'dwt_range': (200_000, 350_000)},
    'suezmax':           {'speed_kn': 14.5, 'fuel_td': 55,  'dwt_range': (120_000, 199_999)},
    'aframax':           {'speed_kn': 14.5, 'fuel_td': 45,  'dwt_range': (80_000, 119_999)},
    'mr_tanker':         {'speed_kn': 14.0, 'fuel_td': 30,  'dwt_range': (45_000, 79_999)},
    # --- container ---
    'ulcv':              {'speed_kn': 22.0, 'fuel_td': 250, 'dwt_range': (150_000, 250_000)},
    'neo_panamax':       {'speed_kn': 21.0, 'fuel_td': 180, 'dwt_range': (100_000, 149_999)},
    'panamax_container': {'speed_kn': 20.0, 'fuel_td': 130, 'dwt_range': (50_000, 99_999)},
    'feeder':            {'speed_kn': 18.0, 'fuel_td': 50,  'dwt_range': (5_000, 49_999)},
}
# Benchmark subtypes of VESSEL_FUEL_BENCHMARKS grouped by generic vessel type,
# so a tanker is never benchmarked against a bulk-carrier or container class.
_BENCHMARK_FAMILIES = {
    'bulk': ('capesize', 'panamax_bulk', 'supramax', 'handysize'),
    'tanker': ('vlcc', 'suezmax', 'aframax', 'mr_tanker'),
    'container': ('ulcv', 'neo_panamax', 'panamax_container', 'feeder'),
}


def _select_fuel_benchmark(vtype: str, dwt: float) -> Optional[str]:
    """Return the VESSEL_FUEL_BENCHMARKS key that best fits vtype and dwt, or None."""
    family = _BENCHMARK_FAMILIES.get(vtype)
    if family is not None:
        candidates = [name for name in family if name in VESSEL_FUEL_BENCHMARKS]
    else:
        # A specific subtype (e.g. 'capesize', 'panamax') was passed instead
        # of a generic type: keep the substring match on benchmark names.
        candidates = [name for name in VESSEL_FUEL_BENCHMARKS
                      if vtype in name or name in vtype]
    if not candidates:
        return None

    def _gap(name):
        """Distance from dwt to the class's DWT range (0 when inside it)."""
        low, high = VESSEL_FUEL_BENCHMARKS[name]['dwt_range']
        if dwt < low:
            return low - dwt
        if dwt > high:
            return dwt - high
        return 0

    # A class whose range contains the DWT wins; otherwise the nearest class
    # is used so that out-of-range sizes do not drop to an unrelated default.
    return min(candidates, key=_gap)


def analyze_vessel_performance(vessel_name: str = None, imo: str = None,
                               dwt: float = None, speed: float = None,
                               vessel_type: str = None, year_built: int = None,
                               fuel_consumption: float = None) -> Optional[Dict]:
    """
    Analyze vessel performance: fuel efficiency, speed, hull condition.

    The vessel is compared against a class benchmark from
    VESSEL_FUEL_BENCHMARKS, picked within the vessel's own type family by
    DWT. When no DWT is given, or the type matches no benchmark, a per-type
    default class is used.

    Args:
        vessel_name: Vessel name, echoed in the result.
        imo: IMO number, echoed in the result.
        dwt: Deadweight tonnage used to pick the benchmark class.
        speed: Actual speed in knots; defaults to the benchmark design speed.
        vessel_type: Generic type ('bulk', 'tanker', 'container') or a
            subtype name; defaults to 'bulk'.
        year_built: Build year; an age of 10 years is assumed when omitted.
        fuel_consumption: Actual consumption in tons/day; defaults to the
            benchmark design consumption.

    Returns:
        Dict with performance figures, hull-condition and CII estimates and
        recommendations, or {'error': ...} when neither vessel_type nor dwt
        is supplied.
    """
    if not vessel_type and not dwt:
        return {'error': 'Vessel type or DWT required for performance analysis'}

    vtype = (vessel_type or 'bulk').lower()

    # Find benchmark
    subtype = _select_fuel_benchmark(vtype, dwt) if dwt else None
    if subtype is None:
        # Default by type
        defaults = {'bulk': 'handysize', 'tanker': 'mr_tanker', 'container': 'feeder'}
        subtype = defaults.get(vtype, 'handysize')
    benchmark = {**VESSEL_FUEL_BENCHMARKS[subtype], 'subtype': subtype}

    design_speed = benchmark['speed_kn']
    design_fuel = benchmark['fuel_td']
    actual_speed = speed or design_speed
    actual_fuel = fuel_consumption or design_fuel

    # Vessel age impact; a build year in the future counts as age 0 so the
    # fuel penalty can never go negative.
    current_year = datetime.now().year
    age = max(0, current_year - year_built) if year_built else 10
    age_fuel_penalty = min(age * 0.8, 20)  # ~0.8% per year, max 20%
    expected_fuel = round(design_fuel * (1 + age_fuel_penalty / 100), 1)

    # Speed vs fuel (cubic relationship)
    speed_ratio = actual_speed / design_speed if design_speed else 1
    theoretical_fuel = round(design_fuel * (speed_ratio ** 3), 1)

    # Efficiency score (100 = perfect)
    if actual_fuel > 0 and theoretical_fuel > 0:
        efficiency = round(min(100, (theoretical_fuel / actual_fuel) * 100), 1)
    else:
        efficiency = round(100 - age_fuel_penalty, 1)

    # Hull condition estimate
    if efficiency >= 95:
        hull_condition = 'Excellent — recently dry-docked'
    elif efficiency >= 85:
        hull_condition = 'Good — within normal parameters'
    elif efficiency >= 75:
        hull_condition = 'Fair — hull cleaning recommended within 3-6 months'
    elif efficiency >= 65:
        hull_condition = 'Below average — significant fouling likely, dry-dock recommended'
    else:
        hull_condition = 'Poor — urgent hull treatment needed, significant fuel waste'

    # CII rating estimate (Carbon Intensity Indicator), derived from the efficiency score
    if efficiency >= 90:
        cii_rating = 'A'
    elif efficiency >= 80:
        cii_rating = 'B'
    elif efficiency >= 70:
        cii_rating = 'C'
    elif efficiency >= 60:
        cii_rating = 'D'
    else:
        cii_rating = 'E'

    recommendations = []
    if actual_speed > design_speed * 0.95:
        recommendations.append(f'Slow steaming to {design_speed * 0.85:.1f} kn could save ~{round(design_fuel * 0.25)}t/day fuel')
    if efficiency < 85:
        recommendations.append('Hull cleaning/dry-docking would improve fuel efficiency by 10-15%')
    if age > 15:
        recommendations.append('Consider engine overhaul or retrofit for improved performance')
    if cii_rating in ('D', 'E'):
        recommendations.append(f'CII rating {cii_rating} — corrective plan required under IMO regulations')
    if not recommendations:
        recommendations.append('Vessel performing within expected parameters')

    subtype_label = benchmark['subtype'].replace('_', ' ')
    return {
        'vessel': vessel_name or 'Unknown',
        'imo': imo or '',
        'vessel_type': vtype,
        'subtype': subtype_label.title(),
        'dwt': dwt,
        'age_years': age,
        'year_built': year_built,
        'performance': {
            'design_speed_kn': design_speed,
            'actual_speed_kn': actual_speed,
            'speed_utilization_pct': round(speed_ratio * 100, 1),
            'design_fuel_td': design_fuel,
            'expected_fuel_td': expected_fuel,
            'actual_fuel_td': actual_fuel,
            'efficiency_score': efficiency,
        },
        'hull_condition': hull_condition,
        'cii_rating': cii_rating,
        'cii_note': 'IMO Carbon Intensity Indicator (A=best, E=worst). D/E requires corrective action plan.',
        'fleet_comparison': f"{'Above' if efficiency >= 80 else 'Below'} average for {subtype_label} fleet",
        'annual_fuel_cost_estimate': {
            'at_current_speed': f"${round(actual_fuel * 365 * BUNKER_PRICE_USD):,}",
            'at_eco_speed': f"${round(design_fuel * 0.7 * 365 * BUNKER_PRICE_USD):,}",
            'potential_annual_saving': f"${round((actual_fuel - design_fuel * 0.7) * 365 * BUNKER_PRICE_USD):,}",
        },
        'recommendations': recommendations,
        'note': 'Performance estimates based on vessel class benchmarks and reported parameters. Actual performance depends on sea/weather conditions, cargo, trim, and maintenance.',
    }
# =============================================================================
# FEATURE 8: DIGITAL BILL OF LADING
# =============================================================================
def generate_bill_of_lading(shipper: str, consignee: str, vessel_name: str,
                            from_port: str, to_port: str, cargo_description: str,
                            weight_mt: float = None, packages: int = None,
                            notify_party: str = None, voyage_no: str = None,
                            marks: str = None, freight_terms: str = 'PREPAID') -> Optional[Dict]:
    """
    Generate a structured digital Bill of Lading (draft template).

    Args:
        shipper: Shipper name.
        consignee: Consignee name; 'to order' / 'order' (any case, surrounding
            whitespace ignored) yields a 'TO ORDER' instruction.
        vessel_name: Carrying vessel name.
        from_port: Port of loading.
        to_port: Port of discharge.
        cargo_description: Free-text cargo description.
        weight_mt: Gross weight in metric tons; also drives the CBM estimate.
        packages: Number of packages.
        notify_party: Notify party; defaults to the consignee.
        voyage_no: Voyage number; derived from the vessel name when omitted.
        marks: Marks and numbers; defaults to 'N/M (No Marks)'.
        freight_terms: Freight terms, upper-cased in the output; None is
            treated as 'PREPAID'.

    Returns:
        Dict describing the B/L, or None if either port cannot be resolved.
        None values for shipper, consignee or vessel_name are tolerated and
        treated as empty strings when deriving identifiers.
    """
    port_load = resolve_port(from_port)
    port_discharge = resolve_port(to_port)
    if not port_load or not port_discharge:
        return None

    today = datetime.now()

    # Empty-string stand-ins so a missing name cannot crash the hashing or
    # the case-insensitive comparison; reported fields keep the raw values.
    vessel_id = vessel_name or ''
    shipper_id = shipper or ''
    is_to_order = (consignee or '').strip().lower() in ('to order', 'order')

    # md5 is used only to derive short, stable, non-secret suffixes.
    bl_suffix = int(hashlib.md5((vessel_id + shipper_id).encode()).hexdigest()[:8], 16) % 100000
    bl_number = f"SFBL-{today.strftime('%Y%m%d')}-{bl_suffix:05d}"
    voyage_suffix = int(hashlib.md5(vessel_id.encode()).hexdigest()[:4], 16) % 100
    default_voyage_no = f"V.{today.strftime('%y')}{voyage_suffix:02d}"

    return {
        'bl_number': bl_number,
        'date_issued': today.strftime('%Y-%m-%d'),
        'type': 'Negotiable OCEAN BILL OF LADING',
        'original_copies': 3,
        'shipper': {
            'name': shipper,
            'address': '[Shipper address]',
        },
        'consignee': {
            'name': consignee,
            'instruction': 'TO ORDER' if is_to_order else consignee,
        },
        'notify_party': {
            'name': notify_party or consignee,
            'address': '[Notify party address]',
        },
        'vessel': {
            'name': vessel_name,
            'voyage_no': voyage_no or default_voyage_no,
            'flag': '[Flag state]',
        },
        'port_of_loading': {
            'name': port_load['name'],
            'country': port_load['country'],
            'unlocode': port_load.get('unlocode', ''),
        },
        'port_of_discharge': {
            'name': port_discharge['name'],
            'country': port_discharge['country'],
            'unlocode': port_discharge.get('unlocode', ''),
        },
        'cargo': {
            'description': cargo_description,
            'marks_and_numbers': marks or 'N/M (No Marks)',
            'packages': packages,
            'gross_weight_mt': weight_mt,
            # Volume is a fixed 1.2 CBM per metric ton placeholder.
            'measurement': f"{round(weight_mt * 1.2, 1)} CBM" if weight_mt else None,
        },
        'freight': {
            'terms': f"FREIGHT {(freight_terms or 'PREPAID').upper()}",
            'payable_at': port_load['name'],
        },
        'conditions': {
            'shipped_on_board': today.strftime('%Y-%m-%d'),
            'clean_on_board': True,
            'said_to_contain': True,
            'shipper_load_stow_count': True,
        },
        'clauses': [
            'Shipped on board in apparent good order and condition',
            'Weight, measure, quantity, quality, contents and value unknown',
            'Subject to all terms and conditions of the Charter Party dated as per C/P',
            'Clause Paramount — Hague-Visby Rules apply',
        ],
        'place_of_issue': port_load['name'],
        'signatory': 'Master or Agent on behalf of the Master',
        'note': 'DRAFT — This is a digital B/L template. For legally binding B/L, use eBL platforms (Bolero, essDOCS, WAVE BL) or contact your P&I club.',
    }
# =============================================================================
# FEATURE 9: CREW CHANGE OPTIMIZER
# =============================================================================
# Major crew change hubs with airport connectivity and cost factors.
# Keys are the port 'key' values returned by resolve_port(). Per-hub fields,
# as consumed by optimize_crew_change():
#   airport      - serving airport, echoed in the result
#   flights      - 'excellent' | 'good' | 'limited' (scored as agent availability)
#   visa         - 'easy' | 'schengen' | 'moderate' | 'difficult' (scored as visa ease)
#   agent_cost   - agent fee; added into 'estimated_cost_usd'
#   hotel_day    - hotel cost per night; two nights are added into 'estimated_cost_usd'
#   connectivity - multiplied by 10 to give the flight-connectivity score
CREW_CHANGE_HUBS = {
    'singapore': {'airport': 'SIN (Changi)', 'flights': 'excellent', 'visa': 'easy', 'agent_cost': 800, 'hotel_day': 120, 'connectivity': 10},
    'rotterdam': {'airport': 'AMS (Schiphol)', 'flights': 'excellent', 'visa': 'schengen', 'agent_cost': 1200, 'hotel_day': 150, 'connectivity': 9},
    'hamburg': {'airport': 'HAM', 'flights': 'good', 'visa': 'schengen', 'agent_cost': 1100, 'hotel_day': 140, 'connectivity': 8},
    'piraeus': {'airport': 'ATH', 'flights': 'good', 'visa': 'schengen', 'agent_cost': 900, 'hotel_day': 100, 'connectivity': 8},
    'istanbul': {'airport': 'IST', 'flights': 'excellent', 'visa': 'easy', 'agent_cost': 700, 'hotel_day': 80, 'connectivity': 9},
    'dubai': {'airport': 'DXB', 'flights': 'excellent', 'visa': 'easy', 'agent_cost': 1000, 'hotel_day': 130, 'connectivity': 10},
    'jebel_ali': {'airport': 'DXB', 'flights': 'excellent', 'visa': 'easy', 'agent_cost': 1000, 'hotel_day': 130, 'connectivity': 10},
    'mumbai': {'airport': 'BOM', 'flights': 'good', 'visa': 'moderate', 'agent_cost': 500, 'hotel_day': 60, 'connectivity': 8},
    'manila': {'airport': 'MNL', 'flights': 'good', 'visa': 'easy', 'agent_cost': 400, 'hotel_day': 50, 'connectivity': 9},
    'hong_kong': {'airport': 'HKG', 'flights': 'excellent', 'visa': 'easy', 'agent_cost': 900, 'hotel_day': 140, 'connectivity': 10},
    'shanghai': {'airport': 'PVG', 'flights': 'good', 'visa': 'moderate', 'agent_cost': 800, 'hotel_day': 100, 'connectivity': 8},
    'busan': {'airport': 'PUS', 'flights': 'good', 'visa': 'moderate', 'agent_cost': 700, 'hotel_day': 90, 'connectivity': 7},
    'houston': {'airport': 'IAH', 'flights': 'good', 'visa': 'difficult', 'agent_cost': 1500, 'hotel_day': 130, 'connectivity': 7},
    'new_york': {'airport': 'JFK', 'flights': 'excellent', 'visa': 'difficult', 'agent_cost': 1600, 'hotel_day': 180, 'connectivity': 8},
    'santos': {'airport': 'GRU (São Paulo)', 'flights': 'good', 'visa': 'moderate', 'agent_cost': 900, 'hotel_day': 90, 'connectivity': 7},
    'durban': {'airport': 'DUR', 'flights': 'limited', 'visa': 'moderate', 'agent_cost': 700, 'hotel_day': 70, 'connectivity': 5},
    'cape_town': {'airport': 'CPT', 'flights': 'good', 'visa': 'moderate', 'agent_cost': 750, 'hotel_day': 80, 'connectivity': 6},
    'las_palmas': {'airport': 'LPA', 'flights': 'good', 'visa': 'schengen', 'agent_cost': 800, 'hotel_day': 90, 'connectivity': 7},
    'colombo': {'airport': 'CMB', 'flights': 'good', 'visa': 'easy', 'agent_cost': 450, 'hotel_day': 50, 'connectivity': 7},
    'port_said': {'airport': 'CAI (Cairo)', 'flights': 'good', 'visa': 'moderate', 'agent_cost': 600, 'hotel_day': 60, 'connectivity': 6},
}
def optimize_crew_change(current_port: str, next_ports: list = None,
                         crew_count: int = None, vessel_name: str = None) -> Optional[Dict]:
    """
    Find optimal crew change ports along vessel route.

    Candidates are the current port, up to five of the next ports, and any
    crew-change hubs among the first ten ports found within 500 NM. Each is
    scored 0-100 on flight connectivity, visa ease, cost efficiency and agent
    availability; the overall score is the mean of the four.

    Args:
        current_port: Name of the vessel's current port.
        next_ports: Upcoming port names (only the first five are considered).
        crew_count: Number of crew changing; defaults to 20.
        vessel_name: Vessel name, echoed in the result.

    Returns:
        Dict with up to five recommended ports (best first) and the best
        option, or None if the current port cannot be resolved. Each
        candidate's 'estimated_cost_usd' equals the sum of its
        'cost_breakdown' items.
    """
    port = resolve_port(current_port)
    if not port:
        return None

    crew_count = crew_count or 20

    # Ports to score: current port + next ports + nearby hubs
    ports_to_check = [current_port]
    if next_ports:
        ports_to_check.extend(next_ports[:5])

    # NOTE(review): only the first 10 nearby ports are inspected for hubs; if
    # find_nearby_ports returns many non-hub ports first, hubs may be missed.
    nearby = find_nearby_ports(port['lat'], port['lon'], radius_nm=500)
    for n in nearby[:10]:
        if n.get('key', '') in CREW_CHANGE_HUBS:
            ports_to_check.append(n['name'])

    # Lookup tables hoisted out of the scoring loop (read-only).
    generic_hub = {'airport': 'Regional', 'flights': 'limited', 'visa': 'moderate',
                   'agent_cost': 1000, 'hotel_day': 100, 'connectivity': 4}
    flight_scores = {'excellent': 10, 'good': 7, 'limited': 4}
    visa_scores = {'easy': 90, 'schengen': 80, 'moderate': 60, 'difficult': 30}

    candidates = []
    seen = set()
    for pname in ports_to_check:
        p = resolve_port(pname)
        if not p:
            continue
        pkey = p.get('key', '')
        if pkey in seen:
            continue
        seen.add(pkey)

        # Non-hub ports get a generic, conservative profile
        hub = CREW_CHANGE_HUBS.get(pkey) or generic_hub

        # Scoring (0-100)
        scores = {
            'flight_connectivity': hub['connectivity'] * 10,
            'visa_ease': visa_scores.get(hub['visa'], 50),
            'cost_efficiency': max(0, 100 - int(hub['agent_cost'] / 20)),
            'agent_availability': flight_scores.get(hub['flights'], 4) * 10,
        }
        overall = round(sum(scores.values()) / len(scores), 1)

        cost_breakdown = {
            'agent_fees': hub['agent_cost'],
            'hotel_2_nights': hub['hotel_day'] * 2,
            'flights_estimate': 800 * crew_count,
            'transport_launch': 500,
        }
        # The total is derived from the breakdown so the two can never
        # disagree (the launch transport item used to be left out).
        est_cost = sum(cost_breakdown.values())

        candidates.append({
            'port': p['name'],
            'country': p['country'],
            'airport': hub['airport'],
            'scores': scores,
            'overall_score': overall,
            'estimated_cost_usd': est_cost,
            'cost_breakdown': cost_breakdown,
            'visa_requirement': hub['visa'],
            'flight_availability': hub['flights'],
        })

    candidates.sort(key=lambda x: -x['overall_score'])
    return {
        'vessel': vessel_name or 'Unknown',
        'crew_count': crew_count,
        'current_location': port['name'],
        'recommended_ports': candidates[:5],
        'best_option': candidates[0]['port'] if candidates else None,
        'note': 'Scores based on airport connectivity, visa requirements, agent availability, and cost. Actual costs depend on crew nationality, flight routes, and local regulations.',
    }
# =============================================================================
# FEATURE 10: MARITIME INSURANCE CALCULATOR
# =============================================================================
# War risk / high risk areas, each described as a latitude/longitude box.
# 'lat_range' and 'lon_range' are (min, max) pairs in degrees.
# NOTE(review): confirm whether 'premium_pct' is a percentage or a fraction
# of insured value against the insurance calculator that consumes it.
WAR_RISK_ZONES = {
    'gulf_of_aden': {
        'lat_range': (11, 15),
        'lon_range': (43, 51),
        'premium_pct': 0.05,
    },
    'strait_of_hormuz': {
        'lat_range': (25, 27),
        'lon_range': (54, 57),
        'premium_pct': 0.03,
    },
    'red_sea': {
        'lat_range': (12, 30),
        'lon_range': (32, 44),
        'premium_pct': 0.07,
    },
    'west_africa_gog': {
        'lat_range': (-5, 7),
        'lon_range': (-5, 10),
        'premium_pct': 0.03,
    },
    'black_sea': {
        'lat_range': (41, 47),
        'lon_range': (27, 42),
        'premium_pct': 0.10,
    },
    'south_china_sea': {
        'lat_range': (5, 22),
        'lon_range': (105, 120),
        'premium_pct': 0.01,
    },
}
def calculate_maritime_insurance(vessel_name: str = None, imo: str = None,
                                 vessel_type: str = None, dwt: float = None,
                                 year_built: int = None, flag: str = None,
                                 from_port: str = None, to_port: str = None,
                                 cargo_value: float = None, hull_value: float = None) -> Optional[Dict]:
    """Calculate indicative maritime insurance premiums (H&M, P&I, Cargo, War Risk).

    Args:
        vessel_name: Display name echoed in the result ('Unknown' if missing).
        imo: Accepted for interface compatibility; not used in the calculation.
        vessel_type: Echoed lower-cased in the result (defaults to 'bulk').
        dwt: Deadweight tonnage; drives the hull-value and GRT estimates.
        year_built: Build year; age defaults to 10 years when missing.
        flag: Flag state name; adjusts the H&M rate.
        from_port, to_port: Port names; both must resolve for the war-risk check.
        cargo_value: Cargo value in USD (0 when missing).
        hull_value: Hull value in USD; estimated from dwt/age when missing,
            otherwise falls back to 20,000,000.

    Returns:
        Dict with the vessel profile, per-cover premiums, their total,
        risk factors and recommended coverage. This function always
        returns a dict.
    """
    vtype = (vessel_type or 'bulk').lower()

    # Clamp at zero: a build year in the future would otherwise yield a
    # negative age and a hull value above the new-build estimate.
    age = max(0, datetime.now().year - year_built) if year_built else 10

    # Estimate hull value if not provided
    if not hull_value and dwt:
        base_value = dwt * 120  # ~$120/dwt for new vessel
        depreciation = max(0.2, 1 - age * 0.04)  # 4%/year, min 20% residual
        hull_value = round(base_value * depreciation)
    hull_value = hull_value or 20000000
    cargo_value = cargo_value or 0

    # H&M premium: 0.2% base rate with an age loading
    hm_base_rate = 0.002
    if age > 20:
        hm_base_rate += 0.002
    elif age > 15:
        hm_base_rate += 0.001
    elif age > 10:
        hm_base_rate += 0.0005

    # Flag discount/surcharge
    flag_lower = (flag or '').lower()
    flag_sanctioned = flag_lower in SANCTIONED_FLAGS
    if flag_lower in ('panama', 'liberia', 'marshall islands', 'bahamas', 'singapore', 'hong kong'):
        hm_base_rate *= 0.95  # Reputable open registries
    elif flag_sanctioned:
        hm_base_rate *= 2.0  # Sanctioned flag = major surcharge
    hm_premium = round(hull_value * hm_base_rate)

    # P&I premium: $/GRT, with GRT approximated as 60% of dwt
    grt_estimate = round(dwt * 0.6) if dwt else 15000
    pi_rate = 8  # $/GRT base
    if age > 20:
        pi_rate = 15
    elif age > 15:
        pi_rate = 12
    pi_premium = round(grt_estimate * pi_rate)

    # Cargo insurance: flat 0.2% of cargo value
    cargo_rate = 0.002
    cargo_premium = round(cargo_value * cargo_rate) if cargo_value > 0 else 0

    # War risk premium: simplified check — a zone counts as crossed when it
    # overlaps the bounding box spanned by the two ports.
    war_risk = 0
    war_zones_crossed = []
    if from_port and to_port:
        p1 = resolve_port(from_port)
        p2 = resolve_port(to_port)
        if p1 and p2:
            lat_lo, lat_hi = min(p1['lat'], p2['lat']), max(p1['lat'], p2['lat'])
            lon_lo, lon_hi = min(p1['lon'], p2['lon']), max(p1['lon'], p2['lon'])
            for zone_name, zone in WAR_RISK_ZONES.items():
                lat_min, lat_max = zone['lat_range']
                lon_min, lon_max = zone['lon_range']
                if (lat_lo <= lat_max and lat_hi >= lat_min and
                        lon_lo <= lon_max and lon_hi >= lon_min):
                    war_zones_crossed.append(zone_name.replace('_', ' ').title())
                    war_risk += round(hull_value * zone['premium_pct'])

    total = hm_premium + pi_premium + cargo_premium + war_risk

    risk_factors = []
    if age > 20:
        risk_factors.append(f'Vessel age ({age} years) increases H&M and P&I premiums')
    if war_zones_crossed:
        risk_factors.append(f'Route crosses war risk zones: {", ".join(war_zones_crossed)}')
    if flag_sanctioned:
        risk_factors.append(f'Flag state ({flag}) has sanctions exposure')
    if not risk_factors:
        risk_factors.append('Standard risk profile — no significant surcharges')

    return {
        'vessel': vessel_name or 'Unknown',
        'vessel_type': vtype,
        'dwt': dwt,
        'age_years': age,
        'hull_value_usd': hull_value,
        'cargo_value_usd': cargo_value,
        'premiums': {
            'hull_and_machinery': {'annual_usd': hm_premium, 'rate_pct': round(hm_base_rate * 100, 3)},
            'p_and_i': {'annual_usd': pi_premium, 'rate_per_grt': pi_rate, 'grt_estimate': grt_estimate},
            # Report the rate actually applied instead of a detached literal.
            'cargo': {'premium_usd': cargo_premium, 'rate_pct': round(cargo_rate * 100, 3)} if cargo_value else None,
            'war_risk': {'premium_usd': war_risk, 'zones': war_zones_crossed} if war_risk else None,
        },
        'total_annual_premium_usd': total,
        'risk_factors': risk_factors,
        'recommended_coverage': [
            'H&M (Hull & Machinery) — physical damage to vessel',
            'P&I (Protection & Indemnity) — third-party liability, crew, pollution',
            'Loss of Hire — income protection during off-hire periods',
            'FD&D (Freight, Demurrage & Defence) — legal costs',
        ],
        'note': 'Indicative premiums based on vessel profile and standard market rates. Contact insurance broker for firm quotation. Underwriting subject to survey and claims history.',
    }
# =============================================================================
# FEATURE 11: PORT COST ESTIMATOR
# =============================================================================
# Region assignment and regional port cost multipliers (PORT_COST_REGIONS; base = 1.0 for average)
def _assign_region(lat, lon):
"""Assign maritime region from coordinates."""
if lon < -100 and lat > 25: return 'USWC'
elif lon < -30 and lat > 25: return 'USEC'
elif lon < -30 and -5 < lat <= 25: return 'CARIB'
elif lon < -70 and lat <= -5: return 'SAW'
elif lon < -30 and lat <= -5: return 'SAE'
elif lon > 100 and lat < -10: return 'AUSNZ'
elif lon > 100: return 'EASIA'
elif 60 < lon <= 100 and lat > 0: return 'SASIA'
elif 40 < lon <= 60 and lat > 10: return 'GULF'
elif 10 < lon < 45 and lat < -20: return 'SAFR'
elif 30 < lon <= 45 and -10 < lat < 20: return 'ERED'
elif -5 < lon < 15 and -5 < lat < 15: return 'WAFR'
elif 25 < lon <= 40 and 40 < lat <= 48: return 'BSEA'
elif lon < 40 and lat > 48: return 'NEUR'
elif -10 <= lon <= 40 and 25 < lat <= 48: return 'MED'
else: return 'OTHER'
# Cost multiplier per region code produced by _assign_region
# (1.0 = average; estimate_port_costs falls back to 1.0 for unknown codes).
PORT_COST_REGIONS = {
    # Europe / Black Sea
    'NEUR': 1.3,
    'MED': 1.0,
    'BSEA': 0.8,
    # Asia / Middle East
    'EASIA': 0.9,
    'SASIA': 0.7,
    'GULF': 0.85,
    'ERED': 0.75,
    # Americas
    'USWC': 1.2,
    'USEC': 1.25,
    'CARIB': 0.9,
    'SAE': 0.8,
    'SAW': 0.85,
    # Africa / Oceania
    'WAFR': 0.8,
    'SAFR': 0.85,
    'AUSNZ': 1.15,
    # Fallback
    'OTHER': 0.9,
}
def estimate_port_costs(port_name: str, vessel_type: str = 'bulk',
                        dwt: float = None, grt: float = None,
                        duration_days: float = 3) -> Optional[Dict]:
    """Estimate total port call costs (pilotage, towage, berth, agency, etc.).

    Args:
        port_name: Port name passed to resolve_port.
        vessel_type: Echoed lower-cased in the result (defaults to 'bulk').
        dwt: Deadweight tonnage; defaults to 50,000.
        grt: Gross tonnage; defaults to 60% of dwt.
        duration_days: Length of the port stay in days.

    Returns:
        Dict with the itemised cost breakdown, the total and a per-day
        figure, or None when the port cannot be resolved.
    """
    port = resolve_port(port_name)
    if not port:
        return None

    dwt = dwt or 50000
    grt = grt or round(dwt * 0.6)
    vtype = (vessel_type or 'bulk').lower()
    port_key = port.get('key', '')
    # Prefer the region stored on the port record; otherwise derive it from coordinates.
    region = port.get('region') or _assign_region(port.get('lat', 0), port.get('lon', 0))
    multiplier = PORT_COST_REGIONS.get(region, 1.0)

    # Base costs (USD) for 50,000 GRT vessel
    size_factor = grt / 50000
    pilotage_in = round(2500 * size_factor * multiplier)
    pilotage_out = round(2500 * size_factor * multiplier)
    towage = round(5000 * size_factor * multiplier)
    berth_per_day = round(1500 * size_factor * multiplier)
    mooring = round(800 * size_factor * multiplier)
    port_dues = round(grt * 0.08 * multiplier)  # ~$0.08/GRT
    light_dues = round(grt * 0.03 * multiplier)  # ~$0.03/GRT
    agency_fees = round(3000 * multiplier)
    waste_disposal = round(500 * multiplier)
    fresh_water = round(300 * duration_days)
    documentation = round(400 * multiplier)

    # Canal surcharges for specific ports
    canal_fee = 0
    canal_name = None
    if port_key in ('port_said', 'suez', 'ismailia'):
        canal_fee = round(dwt * 0.12)
        canal_name = 'Suez Canal Transit'
    elif port_key in ('colon', 'balboa', 'cristobal'):
        canal_fee = round(dwt * 0.10)
        canal_name = 'Panama Canal Transit'

    berth_total = round(berth_per_day * duration_days)
    total = (pilotage_in + pilotage_out + towage + berth_total + mooring +
             port_dues + light_dues + agency_fees + waste_disposal +
             fresh_water + documentation + canal_fee)

    # A factor of exactly 1.0 is the average itself, not "below" it.
    if multiplier > 1.0:
        standing = 'Above'
    elif multiplier < 1.0:
        standing = 'Below'
    else:
        standing = 'At'

    return {
        'port': port['name'],
        'country': port['country'],
        'region': region,
        'vessel_type': vtype,
        'dwt': dwt,
        'grt': grt,
        'duration_days': duration_days,
        'cost_breakdown': {
            'pilotage_inbound': pilotage_in,
            'pilotage_outbound': pilotage_out,
            'towage': towage,
            'berth_charges': berth_total,
            'mooring_unmooring': mooring,
            'port_dues': port_dues,
            'light_dues': light_dues,
            'agency_fees': agency_fees,
            'waste_disposal': waste_disposal,
            'fresh_water': fresh_water,
            'documentation': documentation,
            'canal_transit': {'fee': canal_fee, 'canal': canal_name} if canal_fee else None,
        },
        'total_estimated_usd': total,
        # Stays shorter than one day are costed as a full day.
        'cost_per_day': round(total / max(duration_days, 1)),
        'comparison': f"{standing} global average (region factor: {multiplier}x)",
        'note': 'Estimated based on port region tariffs and vessel size. Actual costs vary by terminal operator, berth availability, and local regulations. Request proforma DA from port agent for exact figures.',
    }
# =============================================================================
# FEATURE 12: WEATHER ROUTING
# =============================================================================
# Seasonal weather patterns by region/month
# Each pattern: the months (1-12) in which it is active, its bounding box in
# degrees, a risk label, wave-height and wind-speed ranges, and the speed
# loss (percent) used by calculate_weather_routing.
WEATHER_PATTERNS = {
    'north_atlantic_winter': dict(
        months=[11, 12, 1, 2, 3],
        lat_range=(35, 65), lon_range=(-60, 0),
        risk='high', wave_m=(4, 8), wind_kn=(25, 45), speed_loss_pct=15,
    ),
    'north_pacific_winter': dict(
        months=[11, 12, 1, 2, 3],
        lat_range=(35, 60), lon_range=(140, 180),
        risk='high', wave_m=(4, 8), wind_kn=(25, 45), speed_loss_pct=15,
    ),
    'indian_ocean_monsoon': dict(
        months=[6, 7, 8, 9],
        lat_range=(0, 25), lon_range=(50, 80),
        risk='moderate', wave_m=(3, 6), wind_kn=(20, 35), speed_loss_pct=10,
    ),
    'typhoon_season_wp': dict(
        months=[7, 8, 9, 10, 11],
        lat_range=(10, 35), lon_range=(120, 160),
        risk='high', wave_m=(3, 10), wind_kn=(25, 65), speed_loss_pct=20,
    ),
    'hurricane_season_carib': dict(
        months=[6, 7, 8, 9, 10, 11],
        lat_range=(10, 30), lon_range=(-90, -50),
        risk='high', wave_m=(3, 10), wind_kn=(25, 65), speed_loss_pct=20,
    ),
    'cape_of_good_hope': dict(
        months=list(range(1, 13)),  # active all year
        lat_range=(-40, -30), lon_range=(15, 30),
        risk='moderate', wave_m=(3, 7), wind_kn=(20, 40), speed_loss_pct=12,
    ),
    'drake_passage': dict(
        months=list(range(1, 13)),  # active all year
        lat_range=(-65, -55), lon_range=(-70, -55),
        risk='high', wave_m=(4, 10), wind_kn=(30, 50), speed_loss_pct=20,
    ),
}
def calculate_weather_routing(from_port: str, to_port: str, vessel_type: str = 'bulk',
                              dwt: float = None, departure_month: int = None) -> Optional[Dict]:
    """Compare a direct sea route with a seasonally weather-adjusted one.

    Args:
        from_port, to_port: Port names passed to calculate_sea_route / resolve_port.
        vessel_type: Forwarded to calculate_sea_route.
        dwt: Deadweight tonnage; selects the fuel benchmark when given.
        departure_month: Month number (1-12); defaults to the current month.

    Returns:
        Dict describing the direct route, the adjusted route, the seasonal
        weather zones matched and indicative savings, or None when the
        route or either port cannot be resolved.
    """
    route = calculate_sea_route(from_port, to_port, vessel_type, dwt)
    if not route:
        return None
    port_from = resolve_port(from_port)
    port_to = resolve_port(to_port)
    if not port_from or not port_to:
        return None

    month = departure_month or datetime.now().month

    # Simplified crossing test: a zone counts when it overlaps the bounding
    # box spanned by the two ports. The box is the same for every zone.
    lat_lo, lat_hi = min(port_from['lat'], port_to['lat']), max(port_from['lat'], port_to['lat'])
    lon_lo, lon_hi = min(port_from['lon'], port_to['lon']), max(port_from['lon'], port_to['lon'])

    weather_risks = []
    total_speed_loss = 0  # worst single-zone speed loss (percent), not a sum
    for zone_name, pattern in WEATHER_PATTERNS.items():
        if month not in pattern['months']:
            continue
        lat_min, lat_max = pattern['lat_range']
        lon_min, lon_max = pattern['lon_range']
        if (lat_lo <= lat_max and lat_hi >= lat_min and
                lon_lo <= lon_max and lon_hi >= lon_min):
            weather_risks.append({
                'zone': zone_name.replace('_', ' ').title(),
                'risk_level': pattern['risk'],
                'wave_height_m': f"{pattern['wave_m'][0]}-{pattern['wave_m'][1]}m",
                'wind_speed_kn': f"{pattern['wind_kn'][0]}-{pattern['wind_kn'][1]} kn",
                'speed_reduction': f"{pattern['speed_loss_pct']}%",
            })
            total_speed_loss = max(total_speed_loss, pattern['speed_loss_pct'])

    # Fall back to 14 kn when the route has no usable speed (missing, None
    # or 0), which would otherwise break the division below.
    speed = route.get('speed_knots') or 14

    direct_days = route['total_days']
    weather_delay = round(direct_days * total_speed_loss / 100, 1)
    optimized_extra_nm = round(route['distance_nm'] * 0.03) if total_speed_loss > 10 else 0
    optimized_days = round(direct_days + weather_delay * 0.5 + optimized_extra_nm / (speed * 24), 1)

    # Fuel comparison
    fuel_day = VESSEL_FUEL_BENCHMARKS.get('handysize', {}).get('fuel_td', 30)
    if dwt:
        for bench in VESSEL_FUEL_BENCHMARKS.values():
            if bench['dwt_range'][0] <= dwt <= bench['dwt_range'][1]:
                fuel_day = bench['fuel_td']
                break
    direct_fuel = round(direct_days * fuel_day)
    # NOTE(review): the 0.9 eco-speed factor is applied even when no deviation
    # is planned, so a 10% fuel saving is reported for an unchanged route — confirm intent.
    optimized_fuel = round(optimized_days * fuel_day * 0.9)  # Eco speed on longer route
    fuel_saving = direct_fuel - optimized_fuel

    if total_speed_loss >= 15:
        route_advice = 'Route deviation recommended — significant weather risk'
    elif total_speed_loss >= 5:
        route_advice = 'Direct route feasible with monitoring'
    else:
        route_advice = 'Direct route recommended — low weather risk'

    if weather_risks:
        cyclone_zone = any('typhoon' in r['zone'].lower() or 'hurricane' in r['zone'].lower()
                           for r in weather_risks)
        monitor_advice = f"Monitor {'tropical cyclone warnings' if cyclone_zone else 'weather forecasts'}"
    else:
        monitor_advice = 'Standard weather monitoring sufficient'

    return {
        'from': route['from'],
        'to': route['to'],
        'departure_month': month,
        'direct_route': {
            'distance_nm': route['distance_nm'],
            'estimated_days': direct_days,
            'fuel_consumption_mt': direct_fuel,
            'canals': route.get('canals', []),
        },
        'optimized_route': {
            'distance_nm': route['distance_nm'] + optimized_extra_nm,
            'estimated_days': optimized_days,
            'fuel_consumption_mt': optimized_fuel,
            'deviation_nm': optimized_extra_nm,
            'strategy': 'Great circle with weather avoidance' if total_speed_loss > 10 else 'Direct route acceptable',
        },
        'weather_risks': weather_risks,
        'savings': {
            'time_saved_vs_bad_weather': f"{weather_delay - (optimized_days - direct_days):.1f} days",
            'fuel_difference_mt': fuel_saving,
            'fuel_cost_difference_usd': round(fuel_saving * BUNKER_PRICE_USD),
        },
        'recommendations': [route_advice, monitor_advice],
        'note': 'Weather routing based on seasonal patterns. For real-time optimization, subscribe to routing services (StormGeo, DTN, WRI).',
    }
# =============================================================================
# FEATURE 13: FIXTURE RECAP GENERATOR
# =============================================================================
def generate_fixture_recap(vessel_name: str, cargo_type: str, tonnage: float,
                           from_port: str, to_port: str, rate: float,
                           laycan: str = None, charterer: str = None,
                           owner: str = None, demurrage_rate: float = None,
                           commission: float = 3.75) -> Optional[Dict]:
    """Generate a draft fixture recap summary.

    Args:
        vessel_name: Vessel name shown in the recap.
        cargo_type: Cargo description; may be None ('TBN' is shown as type).
        tonnage: Cargo quantity in MT; may be None ('TBC' is shown).
        from_port, to_port: Load / discharge port names passed to resolve_port.
        rate: Freight rate in USD per MT.
        laycan: Laycan text; defaults to a 5-day window starting in 10 days.
        charterer, owner: Principal names; placeholders are used when missing.
        demurrage_rate: USD/day; defaults to 15% of tonnage clamped to 8,000-80,000.
        commission: Total commission percent, of which 1.25 is brokerage.

    Returns:
        Dict with the recap terms, or None when either port cannot be resolved.
    """
    port_load = resolve_port(from_port)
    port_discharge = resolve_port(to_port)
    if not port_load or not port_discharge:
        return None
    route = calculate_sea_route(from_port, to_port)

    from datetime import datetime, timedelta, timezone
    # The recap date is labelled "UTC", so take the timestamp in UTC rather
    # than local time.
    today = datetime.now(timezone.utc)

    # Pseudo-identifier derived from name + cargo (not a security use of MD5).
    # None values are treated as empty, matching the None handling below.
    id_seed = ((vessel_name or '') + (cargo_type or '')).encode()
    recap_number = f"FIX-{today.strftime('%Y%m%d')}-{int(hashlib.md5(id_seed).hexdigest()[:8], 16) % 10000:04d}"

    if not laycan:
        start = today + timedelta(days=10)
        end = start + timedelta(days=5)
        laycan = f"{start.strftime('%d %b')} - {end.strftime('%d %b %Y')}"

    qty = tonnage or 0
    if not demurrage_rate:
        demurrage_rate = round(max(8000, min(80000, qty * 0.15)))
    load_rate = 5000 if qty <= 50000 else 8000 if qty <= 100000 else 12000

    # Round to avoid float artefacts such as 1.3500000000000001 in the text.
    address_commission = round(commission - 1.25, 2)

    return {
        'recap_number': recap_number,
        'date': today.strftime('%Y-%m-%d %H:%M UTC'),
        'status': 'ON SUBJECTS — Subject to charterer/owner approval within 24 hours',
        'principals': {
            'owner': owner or '[Owner / Disponent Owner]',
            'charterer': charterer or '[Charterer]',
        },
        'vessel': {
            'name': vessel_name,
            'type': cargo_type.title() if cargo_type else 'TBN',
            'dwt': tonnage,
            'flag': '[TBC]',
            'year_built': '[TBC]',
            'class': '[TBC]',
        },
        'cargo': {
            'description': cargo_type,
            'quantity': f"{tonnage:,.0f} MT" if tonnage else 'TBC',
            'tolerance': '5% MOLOO',
        },
        'loading': {
            'port': port_load['name'],
            'country': port_load['country'],
            'terms': 'FIOS',
            'rate': f"{load_rate:,} MT/SHINC",
        },
        'discharge': {
            'port': port_discharge['name'],
            'country': port_discharge['country'],
            'terms': 'FIOS',
            'rate': f"{load_rate:,} MT/SHINC",
        },
        'laycan': laycan,
        'freight': {
            'rate': f"${rate:.2f}/MT",
            'total': f"${rate * tonnage:,.0f}" if tonnage else 'TBC',
            'payment': '95% within 5 banking days of signing B/L, 5% on completion of discharge',
        },
        'demurrage_despatch': {
            'demurrage': f"${demurrage_rate:,.0f}/day pro rata",
            'despatch': f"${demurrage_rate / 2:,.0f}/day pro rata (half demurrage)",
        },
        'commission': f"{commission}% total — {address_commission}% address + 1.25% brokerage",
        'voyage_estimate': {
            'distance_nm': route['distance_nm'] if route else 0,
            'days': route['total_days'] if route else 0,
            'via': route.get('canals', []) if route else [],
        },
        'governing_law': 'English Law, London Arbitration (LMAA)',
        'cp_form': 'GENCON 2022 (or applicable standard form)',
        'subjects': [
            'Subject charterer board approval — 24 hours',
            'Subject owner confirmation — 24 hours',
            'Subject stem confirmation',
            'Subject satisfactory vessel inspection',
        ],
        'note': 'DRAFT RECAP — This is an AI-generated fixture recap template. All terms subject to main charter party agreement.',
    }
# =============================================================================
# FEATURE 14: AIS ANOMALY DETECTOR
# FEATURE 15: DARK FLEET DETECTOR
# (Extracted to maritime_compliance.py — re-exported for backward compatibility)
# =============================================================================
from maritime_compliance import STS_HOTSPOTS, detect_ais_anomalies, detect_dark_fleet
# =============================================================================
# TILE FETCHER — curl_cffi (Cloudflare bypass, no browser)
# =============================================================================
def lat_lon_to_tile(lat, lon, zoom):
    """Convert lat/lon to slippy map tile X/Y at given zoom level.

    Args:
        lat: Latitude in degrees. Values beyond the Web Mercator limit
            (about +/-85.0511) are clamped to it.
        lon: Longitude in degrees.
        zoom: Tile zoom level; the grid is 2**zoom tiles per side.

    Returns:
        (x, y) tile indices, each clamped to 0 .. 2**zoom - 1.
    """
    n = 2 ** zoom
    # The Mercator projection is undefined at the poles and math.log raises
    # for latitudes past +/-90 (reachable when a caller adds a search radius
    # to a high latitude), so clamp to the projection limit first.
    max_lat = 85.0511287798066
    lat_rad = math.radians(max(-max_lat, min(max_lat, lat)))
    x = int((lon + 180.0) / 360.0 * n)
    y = int((1.0 - math.log(math.tan(lat_rad) + 1.0 / math.cos(lat_rad)) / math.pi) / 2.0 * n)
    return max(0, min(x, n - 1)), max(0, min(y, n - 1))
def _fetch_mt_tile(z, x, y):
    """Download one MarineTraffic tile through curl_cffi with Chrome impersonation.

    Best-effort: returns the list of raw vessel rows found in the tile JSON,
    or an empty list when curl_cffi is missing, the response is not HTTP 200,
    the payload has an unexpected shape, or any exception occurs.
    """
    if not _HAS_CURL_CFFI:
        return []

    tile_url = f'https://www.marinetraffic.com/getData/get_data_json_4/z:{z}/X:{x}/Y:{y}/station:0'
    request_headers = {
        'Referer': f'https://www.marinetraffic.com/en/ais/home/centerx:0/centery:0/zoom:{z}',
        'X-Requested-With': 'XMLHttpRequest',
    }
    try:
        response = cf_requests.get(tile_url, headers=request_headers, impersonate='chrome', timeout=15)
        if response.status_code != 200:
            return []
        payload = response.json()
        if not isinstance(payload, dict):
            return []
        # Rows may sit under 'data' directly or under 'data' -> 'rows'.
        rows = payload.get('data', payload)
        if isinstance(rows, dict):
            rows = rows.get('rows', [])
        if isinstance(rows, list):
            return rows
        return []
    except Exception as e:
        logger.debug(f"Tile z:{z}/X:{x}/Y:{y} fetch error: {e}")
        return []
LIVE_SCAN_ZOOM = 3  # Free MT API limit: z:2-4 only. z:3 = best per-tile density.


def live_scan_area(lat, lon, radius_nm, zoom=LIVE_SCAN_ZOOM):
    """Fetch every MarineTraffic tile covering a box around (lat, lon).

    The box extends radius_nm nautical miles from the centre in each
    direction (1 degree of latitude = 60 nm; longitude is widened by
    1/cos(lat)). Tiles are fetched one by one via _fetch_mt_tile, which uses
    curl_cffi — no browser needed.
    Free MT API works at z:2-4 only (z:5+ requires Pro login).

    Returns the concatenated list of raw MT vessel row dicts
    (SHIP_ID, SHIPNAME, LAT, LON, etc.), or an empty list when curl_cffi
    is unavailable.
    """
    if not _HAS_CURL_CFFI:
        logger.warning("curl_cffi not available, live_scan_area disabled")
        return []

    delta_lat = radius_nm / 60.0
    # Floor the cosine so the longitude span stays finite near the poles.
    cos_lat = max(math.cos(math.radians(lat)), 0.01)
    delta_lon = radius_nm / (60.0 * cos_lat)

    # Tile Y grows southwards, so the NW corner yields the minimum indices.
    x_min, y_min = lat_lon_to_tile(lat + delta_lat, lon - delta_lon, zoom)  # NW
    x_max, y_max = lat_lon_to_tile(lat - delta_lat, lon + delta_lon, zoom)  # SE

    tiles = [(tx, ty)
             for tx in range(x_min, x_max + 1)
             for ty in range(y_min, y_max + 1)]
    all_rows = []
    for tx, ty in tiles:
        all_rows.extend(_fetch_mt_tile(zoom, tx, ty))
    tile_count = len(tiles)

    logger.info(f"live_scan_area({lat:.1f}, {lon:.1f}, {radius_nm}nm): "
                f"{tile_count} tiles z{zoom}, {len(all_rows)} raw vessels")
    return all_rows
# =============================================================================
# CONVENIENCE FUNCTIONS
# =============================================================================
# Lazily created module-wide parser instance; see get_parser().
_parser = None


def get_parser() -> MarineTrafficParser:
    """Return the shared MarineTrafficParser, creating it on first use."""
    global _parser
    if _parser is not None:
        return _parser
    _parser = MarineTrafficParser()
    return _parser
def search_vessel(query: str) -> List[Dict]:
    """Search vessels by query using the shared parser's public search."""
    shared_parser = get_parser()
    return shared_parser.search_vessel_public(query)
def get_vessel(mmsi: str) -> Dict:
    """Fetch the vessel page for the given MMSI using the shared parser."""
    shared_parser = get_parser()
    return shared_parser.get_vessel_page(mmsi)
if __name__ == "__main__":
    # Manual smoke test: API key status, one public search, port resolution.
    parser = MarineTrafficParser()
    logger.info("API key configured: %s", parser.has_api_key())

    # Public search — log at most the first five hits
    logger.info("Searching for 'MAERSK'...")
    for v in parser.search_vessel_public("MAERSK")[:5]:
        logger.info(" - %s", v)

    # Port resolution by name, lower-case name, code and alias
    logger.info("Port resolution test:")
    for name in ("Rotterdam", "dubai", "SGSIN", "pusan"):
        port = resolve_port(name)
        if not port:
            logger.info(f" {name} -> NOT FOUND")
            continue
        logger.info(f" {name} -> {port['name']} ({port['country']}) [{port['lat']}, {port['lon']}]")