montana/Русский/Логистика/marinetraffic_parser.py

3381 lines
145 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
MarineTraffic Data Parser & Maritime Tools
Web scraping, AIS data, ports DB (16,553 ports), routing engine,
cargo matching, and 15+ premium maritime tool helpers.
Logical modules (split candidates for future refactoring):
1. PORTS DATABASE (lines ~31-120) — _load_world_ports, WORLD_PORTS, PORT_ALIASES
2. CARGO CLASSIFICATION (lines ~123-175) — classify_cargo, CARGO_MAP
3. VESSEL CLASSIFICATION (lines ~176-328) — classify_vessel_type/subtype, resolve_port, find_nearby_ports
4. SCRAPER CLASS (lines ~329-750) — MarineTrafficParser (web scraping)
5. ROUTING ENGINE (lines ~751-1169) — calculate_sea_route, fuel/canal/port cost estimators
6. CARGO MATCHING (lines ~1170-1370) — fixture_match
7. FREIGHT RATES (lines ~1371-1449) — estimate_freight_rate
8. SANCTIONS SCREENING (lines ~1450-1596) — screen_sanctions
9. PORT CONGESTION (lines ~1598-1703) — estimate_port_congestion
10. BUNKER PRICES (lines ~1704-1891) — get_bunker_prices, optimize_bunker_route
11. CHARTER PARTY (lines ~1892-2002) — generate_charter_party
12. VESSEL PERFORMANCE (lines ~2003-2135) — analyze_vessel_performance
13. BILL OF LADING (lines ~2136-2215) — generate_bill_of_lading
14. CREW CHANGE (lines ~2216-2322) — optimize_crew_change
15. INSURANCE (lines ~2323-2449) — calculate_maritime_insurance
16. PORT COSTS (lines ~2445-2553) — estimate_port_costs
17. WEATHER ROUTING (lines ~2554-2661) — calculate_weather_routing
18. FIXTURE RECAP (lines ~2662-2753) — generate_fixture_recap
19. AIS ANOMALY (lines ~2754-2890) — detect_ais_anomalies
20. DARK FLEET (lines ~2891-3081) — detect_dark_fleet
21. MODULE API (lines ~3082-end) — get_parser, search_vessel, get_vessel
Ɉ MONTANA PROTOCOL — ML-DSA-65 (FIPS 204)
"""
import os
import re
import json
import hashlib
import logging
import math
import time
import requests
try:
from curl_cffi import requests as cf_requests
_HAS_CURL_CFFI = True
except ImportError:
_HAS_CURL_CFFI = False
from datetime import datetime
from typing import Optional, Dict, List
from bs4 import BeautifulSoup
logger = logging.getLogger('marinetraffic_parser')
# API key from environment (when available)
MT_API_KEY = os.environ.get("MARINETRAFFIC_API_KEY")
# Base URLs
MT_BASE = "https://www.marinetraffic.com"
MT_API_BASE = "https://services.marinetraffic.com/api"
# =============================================================================
# WORLD PORTS DATABASE (~2000 ports loaded from JSON)
# =============================================================================
def _load_world_ports():
"""Load ports from world_ports.json, fall back to minimal built-in set."""
_ports_json = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'world_ports.json')
if os.path.exists(_ports_json):
try:
with open(_ports_json, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
logger.warning(f"Failed to load world_ports.json: {e}")
# Minimal fallback (top 10 ports)
return {
'rotterdam': {'name': 'Rotterdam', 'country': 'Netherlands', 'unlocode': 'NLRTM', 'lat': 51.9244, 'lon': 4.4777, 'radius_nm': 15, 'size': 'large'},
'singapore': {'name': 'Singapore', 'country': 'Singapore', 'unlocode': 'SGSIN', 'lat': 1.2644, 'lon': 103.82, 'radius_nm': 20, 'size': 'large'},
'shanghai': {'name': 'Shanghai', 'country': 'China', 'unlocode': 'CNSHA', 'lat': 31.3622, 'lon': 121.5882, 'radius_nm': 20, 'size': 'large'},
'houston': {'name': 'Houston', 'country': 'USA', 'unlocode': 'USHOU', 'lat': 29.7262, 'lon': -95.0102, 'radius_nm': 20, 'size': 'large'},
'busan': {'name': 'Busan', 'country': 'South Korea', 'unlocode': 'KRPUS', 'lat': 35.0979, 'lon': 129.0371, 'radius_nm': 15, 'size': 'large'},
'jebel_ali': {'name': 'Jebel Ali', 'country': 'UAE', 'unlocode': 'AEJEA', 'lat': 25.0117, 'lon': 55.0637, 'radius_nm': 15, 'size': 'large'},
'new_york': {'name': 'New York', 'country': 'USA', 'unlocode': 'USNYC', 'lat': 40.6701, 'lon': -74.0376, 'radius_nm': 15, 'size': 'large'},
'antwerp': {'name': 'Antwerp', 'country': 'Belgium', 'unlocode': 'BEANR', 'lat': 51.2322, 'lon': 4.3986, 'radius_nm': 12, 'size': 'large'},
'mumbai': {'name': 'Mumbai', 'country': 'India', 'unlocode': 'INBOM', 'lat': 18.94, 'lon': 72.835, 'radius_nm': 15, 'size': 'large'},
'durban': {'name': 'Durban', 'country': 'South Africa', 'unlocode': 'ZADUR', 'lat': -29.8674, 'lon': 31.0386, 'radius_nm': 12, 'size': 'large'},
}
WORLD_PORTS = _load_world_ports()
PORT_ALIASES = {
'europoort': 'rotterdam', 'hook of holland': 'rotterdam',
'changi': 'singapore', 'tanjong pagar': 'singapore', 'jurong': 'singapore',
'pusan': 'busan',
'shenzhen': 'hong_kong', 'yantian': 'hong_kong',
'dubai': 'jebel_ali', 'dp world': 'jebel_ali',
'abu dhabi': 'khalifa_port',
'new jersey': 'new_york', 'newark': 'new_york', 'bayonne': 'new_york',
'tacoma': 'seattle',
'la': 'los_angeles', 'san pedro': 'los_angeles',
'lb': 'long_beach',
'bombay': 'mumbai', 'nhava sheva': 'mumbai', 'jnpt': 'mumbai',
'madras': 'chennai',
'saigon': 'ho_chi_minh',
'tangier': 'tanger_med',
'cape of good hope': 'cape_town',
'johor': 'tanjung_pelepas',
'yangshan': 'shanghai',
'apapa': 'lagos_apapa', 'tin can island': 'tin_can_island',
'sao paulo': 'santos',
# Caspian Sea aliases
'alat port': 'alat', 'baku new port': 'alat', 'port of baku': 'alat',
'krasnovodsk': 'turkmenbashi', 'krw': 'turkmenbashi',
'anzali': 'bandar_anzali', 'bandar-e anzali': 'bandar_e_anzali',
'guryev': 'atyrau',
'noshahr': 'nowshahr',
# Russian port names (пользователь пишет на русском)
'баку': 'baku', 'алят': 'alat', 'актау': 'aktau', 'астрахань': 'astrakhan',
'махачкала': 'makhachkala', 'туркменбаши': 'turkmenbashi', 'красноводск': 'turkmenbashi',
'энзели': 'bandar_anzali', 'анзали': 'bandar_anzali',
'ноушехр': 'nowshahr', 'амирабад': 'amirabad', 'нека': 'neka',
'баутино': 'bautino', 'курык': 'kuryk', 'оля': 'olya',
'роттердам': 'rotterdam', 'сингапур': 'singapore', 'шанхай': 'shanghai',
'стамбул': 'istanbul', 'гонконг': 'hong_kong', 'дубай': 'jebel_ali',
'нью-йорк': 'new_york', 'лондон': 'london', 'гамбург': 'hamburg',
'антверпен': 'antwerp', 'пирей': 'piraeus', 'генуя': 'genoa',
'барселона': 'barcelona', 'марсель': 'marseille', 'одесса': 'odessa',
'новороссийск': 'novorossiysk', 'санкт-петербург': 'saint_petersburg',
'мумбаи': 'mumbai', 'бомбей': 'mumbai', 'токио': 'tokyo',
'пусан': 'busan', 'иокогама': 'yokohama', 'сидней': 'sydney',
'джидда': 'jeddah', 'джебель-али': 'jebel_ali',
'суэц': 'suez', 'порт-саид': 'port_said',
'хьюстон': 'houston', 'лос-анджелес': 'los_angeles',
'сантус': 'santos', 'буэнос-айрес': 'buenos_aires',
'кейптаун': 'cape_town', 'лагос': 'lagos_apapa',
'дар-эс-салам': 'dar_es_salaam',
# Legacy space-key aliases for backward compat
'jebel ali': 'jebel_ali', 'hong kong': 'hong_kong', 'new york': 'new_york',
'los angeles': 'los_angeles', 'long beach': 'long_beach', 'ho chi minh': 'ho_chi_minh',
'tanger med': 'tanger_med', 'cape town': 'cape_town', 'tanjung pelepas': 'tanjung_pelepas',
'port klang': 'port_klang', 'le havre': 'le_havre', 'dar es salaam': 'dar_es_salaam',
'port said': 'port_said', 'richards bay': 'richards_bay', 'khalifa port': 'khalifa_port',
'ras tanura': 'ras_tanura', 'bandar abbas': 'bandar_abbas', 'buenos aires': 'buenos_aires',
'new orleans': 'new_orleans', 'st petersburg': 'saint_petersburg',
'laem chabang': 'laem_chabang', 'gioia tauro': 'gioia_tauro', 'port hedland': 'port_hedland',
}
# =============================================================================
# VESSEL TYPE & NAVIGATION STATUS MAPPINGS
# =============================================================================
# Cargo → vessel type mapping
CARGO_TO_VESSEL = {
# Dry bulk
'grain': 'bulk', 'wheat': 'bulk', 'corn': 'bulk', 'rice': 'bulk', 'barley': 'bulk',
'soybean': 'bulk', 'soybeans': 'bulk', 'coal': 'bulk', 'iron ore': 'bulk', 'ore': 'bulk',
'bauxite': 'bulk', 'phosphate': 'bulk', 'fertilizer': 'bulk', 'cement': 'bulk',
'sugar': 'bulk', 'salt': 'bulk', 'steel': 'bulk', 'scrap': 'bulk', 'clinker': 'bulk',
'minerals': 'bulk', 'aggregate': 'bulk', 'sand': 'bulk', 'gravel': 'bulk',
'flour': 'general', 'meal': 'general', 'feed': 'bulk', 'animal feed': 'bulk',
# Liquid
'crude oil': 'tanker', 'crude': 'tanker', 'oil': 'tanker', 'petroleum': 'tanker',
'diesel': 'tanker', 'gasoline': 'tanker', 'fuel oil': 'tanker', 'naphtha': 'tanker',
'chemicals': 'tanker', 'palm oil': 'tanker', 'vegetable oil': 'tanker',
'lng': 'tanker', 'lpg': 'tanker', 'methanol': 'tanker',
# Container
'containers': 'container', 'container': 'container', 'teu': 'container',
'electronics': 'container', 'machinery': 'container', 'furniture': 'container',
'clothing': 'container', 'textiles': 'container', 'consumer goods': 'container',
'manufactured goods': 'container', 'general merchandise': 'container',
# Ro-Ro / vehicles
'cars': 'roro', 'vehicles': 'roro', 'trucks': 'roro', 'automobiles': 'roro',
'heavy equipment': 'roro', 'tractors': 'roro',
# General
'timber': 'general', 'lumber': 'general', 'wood': 'general', 'plywood': 'general',
'project cargo': 'general', 'breakbulk': 'general', 'pipes': 'general',
}
def classify_cargo(cargo_description: str) -> Optional[str]:
"""Classify cargo description to vessel type category."""
if not cargo_description:
return None
desc = cargo_description.lower().strip()
# Direct match
if desc in CARGO_TO_VESSEL:
return CARGO_TO_VESSEL[desc]
# Partial match
for keyword, vtype in CARGO_TO_VESSEL.items():
if keyword in desc:
return vtype
return None
# AIS ship type codes → category
VESSEL_TYPE_CATEGORIES = {
'bulk': ['bulk carrier', 'bulk', 'ore carrier'],
'tanker': ['tanker', 'oil tanker', 'chemical tanker', 'oil/chemical tanker', 'lng tanker', 'lpg tanker', 'crude oil tanker'],
'container': ['container ship', 'container', 'containership'],
'general': ['general cargo', 'cargo', 'multipurpose', 'general cargo ship'],
'passenger': ['passenger', 'cruise', 'passenger ship', 'cruise ship', 'ferry', 'ro-ro/passenger'],
'roro': ['ro-ro', 'roro', 'vehicles carrier', 'car carrier', 'ro-ro cargo'],
'offshore': ['offshore', 'supply vessel', 'platform', 'anchor handling', 'fpso', 'offshore supply ship'],
'tug': ['tug', 'tugboat', 'towing', 'pusher tug', 'pilot'],
'fishing': ['fishing', 'fishing vessel', 'trawler'],
'highspeed': ['high speed', 'high-speed', 'hsc', 'hydrofoil', 'wing in ground'],
'pleasure': ['pleasure', 'yacht', 'sailing yacht', 'motor yacht'],
'sailing': ['sailing vessel', 'sailing'],
'military': ['military', 'naval', 'warship', 'patrol'],
}
# AIS type codes (numeric) → category
AIS_TYPE_CODE_MAP = {
range(70, 80): 'cargo', # 70-79: Cargo, general cargo
range(80, 90): 'tanker', # 80-89: Tanker
range(60, 70): 'passenger', # 60-69: Passenger
range(40, 50): 'highspeed', # 40-49: High-speed craft
range(30, 30+1): 'fishing', # 30: Fishing
range(31, 33): 'tug', # 31-32: Towing
range(50, 55): 'offshore', # 50-54: Pilot/SAR/port tender
range(35, 36): 'military', # 35: Military
range(36, 37): 'sailing', # 36: Sailing
range(37, 38): 'pleasure', # 37: Pleasure craft
}
# AIS navigation status codes
NAV_STATUS_MAP = {
0: 'underway', # Under way using engine
1: 'at anchor', # At anchor
2: 'not under command',
3: 'restricted maneuverability',
4: 'constrained by draught',
5: 'moored', # Moored
6: 'aground',
7: 'fishing', # Engaged in fishing
8: 'underway sailing', # Under way sailing
14: 'ais-sart',
15: 'undefined',
}
# =============================================================================
# VESSEL SUBTYPES BY DWT (industry standard classifications)
# =============================================================================
VESSEL_SUBTYPES = {
'bulk': [
{'name': 'River-Sea', 'dwt_min': 1000, 'dwt_max': 9999, 'typical_dwt': 5000},
{'name': 'Handysize', 'dwt_min': 10000, 'dwt_max': 39999, 'typical_dwt': 28000},
{'name': 'Handymax', 'dwt_min': 40000, 'dwt_max': 49999, 'typical_dwt': 45000},
{'name': 'Supramax', 'dwt_min': 50000, 'dwt_max': 64999, 'typical_dwt': 58000},
{'name': 'Panamax', 'dwt_min': 65000, 'dwt_max': 99999, 'typical_dwt': 75000},
{'name': 'Capesize', 'dwt_min': 100000, 'dwt_max': 199999, 'typical_dwt': 180000},
{'name': 'VLOC', 'dwt_min': 200000, 'dwt_max': 400000, 'typical_dwt': 300000},
],
'tanker': [
{'name': 'River-Sea Tanker', 'dwt_min': 1000, 'dwt_max': 24999, 'typical_dwt': 7000},
{'name': 'MR (Medium Range)', 'dwt_min': 25000, 'dwt_max': 54999, 'typical_dwt': 45000},
{'name': 'LR1', 'dwt_min': 55000, 'dwt_max': 79999, 'typical_dwt': 73000},
{'name': 'Aframax', 'dwt_min': 80000, 'dwt_max': 119999, 'typical_dwt': 105000},
{'name': 'Suezmax', 'dwt_min': 120000, 'dwt_max': 199999, 'typical_dwt': 160000},
{'name': 'VLCC', 'dwt_min': 200000, 'dwt_max': 320000, 'typical_dwt': 300000},
],
'container': [
{'name': 'Feeder', 'dwt_min': 5000, 'dwt_max': 24999, 'teu_range': '500-2,500 TEU', 'typical_dwt': 15000},
{'name': 'Feedermax', 'dwt_min': 25000, 'dwt_max': 39999, 'teu_range': '2,500-5,000 TEU', 'typical_dwt': 33000},
{'name': 'Panamax', 'dwt_min': 40000, 'dwt_max': 64999, 'teu_range': '5,000-8,000 TEU', 'typical_dwt': 55000},
{'name': 'Post-Panamax', 'dwt_min': 65000, 'dwt_max': 99999, 'teu_range': '8,000-12,000 TEU', 'typical_dwt': 80000},
{'name': 'Neo-Panamax', 'dwt_min': 100000, 'dwt_max': 149999, 'teu_range': '12,000-15,000 TEU', 'typical_dwt': 120000},
{'name': 'ULCV', 'dwt_min': 150000, 'dwt_max': 250000, 'teu_range': '15,000-24,000 TEU', 'typical_dwt': 200000},
],
'general': [
{'name': 'Small General Cargo', 'dwt_min': 1000, 'dwt_max': 9999, 'typical_dwt': 5000},
{'name': 'General Cargo', 'dwt_min': 10000, 'dwt_max': 25000, 'typical_dwt': 15000},
],
}
def classify_vessel_subtype(vessel_type: str, dwt: float = None) -> Optional[Dict]:
"""
Classify vessel into subtype by DWT.
Returns {'subtype': 'Panamax', 'typical_dwt': 75000} or None.
"""
if not vessel_type:
return None
vtype = vessel_type.lower().strip()
# Map common synonyms
for category, keywords in VESSEL_TYPE_CATEGORIES.items():
for kw in keywords:
if kw in vtype:
vtype = category
break
else:
continue
break
subtypes = VESSEL_SUBTYPES.get(vtype)
if not subtypes:
return None
if dwt:
try:
dwt_val = float(dwt)
for st in subtypes:
if st['dwt_min'] <= dwt_val <= st['dwt_max']:
return {**st, 'category': vtype}
except (ValueError, TypeError):
pass
# If DWT out of range, return closest
return {**subtypes[len(subtypes) // 2], 'category': vtype, 'estimated': True}
# No DWT — return mid-range default
mid = subtypes[len(subtypes) // 2]
return {**mid, 'category': vtype, 'estimated': True}
def classify_vessel_type(type_str: str, type_code=None) -> str:
"""Classify vessel type string into a standard category."""
if type_str:
t = type_str.lower().strip()
for category, keywords in VESSEL_TYPE_CATEGORIES.items():
for kw in keywords:
if kw in t:
return category
if type_code is not None:
try:
code = int(type_code)
for code_range, category in AIS_TYPE_CODE_MAP.items():
if code in code_range:
return category
except (ValueError, TypeError):
pass
return 'other'
def get_destination_patterns(port: dict) -> List[str]:
"""Generate AIS destination search patterns for a port.
AIS destination is free-text entered by crew, e.g. "BOSTON", "US BOS", "USBOS".
We generate multiple patterns to maximize matching chances.
Args:
port: dict from resolve_port() with 'name', 'unlocode', 'country', etc.
Returns:
list of uppercase pattern strings for SQL LIKE queries
"""
patterns = set()
name = (port.get('name') or '').upper().strip()
unlocode = (port.get('unlocode') or '').upper().strip()
if name and len(name) >= 4:
patterns.add(name) # "BOSTON"
if unlocode and len(unlocode) >= 4:
patterns.add(unlocode) # "USBOS"
# Split LOCODE: country "US" + loc "BOS"
if len(unlocode) >= 5:
country = unlocode[:2]
loc_part = unlocode[2:]
patterns.add(f"{country} {loc_part}") # "US BOS"
return list(patterns)
def find_nearby_ports(lat: float, lon: float, radius_nm: float = 100) -> List[Dict]:
"""Find ports within radius_nm nautical miles from given coordinates."""
nearby = []
for key, port in WORLD_PORTS.items():
# Approximate distance in NM (1 degree lat ≈ 60 NM)
dlat = abs(port['lat'] - lat) * 60
dlon = abs(port['lon'] - lon) * 60 * max(math.cos(math.radians(lat)), 0.01)
dist_nm = math.sqrt(dlat**2 + dlon**2)
if dist_nm <= radius_nm:
nearby.append({**port, 'key': key, 'distance_nm': round(dist_nm, 1)})
nearby.sort(key=lambda p: p['distance_nm'])
return nearby
def resolve_port(query: str) -> Optional[Dict]:
"""Resolve port name to port info dict. Supports exact, alias, partial, and UNLOCODE match.
Always includes 'key' in the returned dict for reverse lookups."""
if not query:
return None
q = query.lower().strip()
q_under = q.replace(' ', '_').replace('-', '_').replace("'", "")
def _with_key(key):
port = WORLD_PORTS[key]
if 'key' not in port:
return {**port, 'key': key}
return port
# Exact match (spaces or underscores)
if q in WORLD_PORTS:
return _with_key(q)
if q_under in WORLD_PORTS:
return _with_key(q_under)
# Alias match
if q in PORT_ALIASES:
alias_key = PORT_ALIASES[q]
if alias_key in WORLD_PORTS:
return _with_key(alias_key)
# Partial match (query in port key or port key in query)
for key, port in WORLD_PORTS.items():
if q_under in key or key in q_under:
return _with_key(key)
# Match by port name (case-insensitive)
for key, port in WORLD_PORTS.items():
if q in port['name'].lower():
return _with_key(key)
# Match by UNLOCODE
q_upper = q.upper()
for key, port in WORLD_PORTS.items():
if port.get('unlocode') == q_upper:
return _with_key(key)
return None
def _generate_totp(secret: str, digits: int = 6, interval: int = 30) -> str:
"""Generate TOTP 6-digit code from base32 secret (RFC 6238). No external deps."""
import hmac
import hashlib
import struct
import base64
# Normalize base32 (uppercase, pad to multiple of 8)
secret_clean = secret.upper().replace(' ', '')
pad = (-len(secret_clean)) % 8
key = base64.b32decode(secret_clean + '=' * pad)
counter = int(time.time()) // interval
msg = struct.pack('>Q', counter)
h = hmac.new(key, msg, hashlib.sha1).digest()
offset = h[-1] & 0x0f
code = struct.unpack('>I', h[offset:offset + 4])[0] & 0x7fffffff
return str(code % (10 ** digits)).zfill(digits)
class MarineTrafficParser:
"""Parser for MarineTraffic public data + API"""
def __init__(self, api_key: str = None, totp_secret: str = None):
self.api_key = api_key or MT_API_KEY
self.totp_secret = totp_secret # Google Authenticator 2FA secret (base32)
# Use curl_cffi to impersonate Chrome TLS fingerprint → bypasses Cloudflare
if _HAS_CURL_CFFI:
self.session = cf_requests.Session(impersonate="chrome120")
logger.info("Using curl_cffi Chrome impersonation (Cloudflare bypass)")
else:
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
'(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
})
# =========================================================================
# PUBLIC DATA (no API key needed)
# =========================================================================
def search_vessel_public(self, query: str) -> List[Dict]:
"""
Search vessel using public search
Returns basic info: name, MMSI, IMO, type, flag
"""
url = f"{MT_BASE}/en/ais/index/search/all/keyword:{query}"
try:
resp = self.session.get(url, timeout=10)
if resp.status_code != 200:
return []
soup = BeautifulSoup(resp.text, 'html.parser')
results = []
# Parse search results
for item in soup.select('.search-result-item, .vessel-item'):
vessel = {}
# Name
name_el = item.select_one('.vessel-name, .ship-name, a[href*="/vessels/"]')
if name_el:
vessel['name'] = name_el.get_text(strip=True)
href = name_el.get('href', '')
# Extract MMSI from URL
if '/vessels/' in href:
parts = href.split('/')
for i, p in enumerate(parts):
if p == 'vessels' and i + 1 < len(parts):
vessel['mmsi'] = parts[-1].split('-')[0]
# Type
type_el = item.select_one('.ship-type, .vessel-type')
if type_el:
vessel['type'] = type_el.get_text(strip=True)
# Flag
flag_el = item.select_one('.flag, [class*="flag-"]')
if flag_el:
vessel['flag'] = flag_el.get('title', '') or flag_el.get_text(strip=True)
if vessel.get('name'):
results.append(vessel)
return results
except Exception as e:
logger.error(f"Search error: {e}")
return []
def get_vessel_page(self, mmsi: str) -> Dict:
"""
Get vessel details from public page
"""
url = f"{MT_BASE}/en/ais/details/ships/mmsi:{mmsi}"
try:
resp = self.session.get(url, timeout=10)
if resp.status_code != 200:
return {}
soup = BeautifulSoup(resp.text, 'html.parser')
vessel = {'mmsi': mmsi}
# Parse vessel details
# Name
name_el = soup.select_one('h1.title, .vessel-name')
if name_el:
vessel['name'] = name_el.get_text(strip=True)
# Details table
for row in soup.select('.vessel-details tr, .details-table tr'):
cells = row.select('td')
if len(cells) >= 2:
key = cells[0].get_text(strip=True).lower()
value = cells[1].get_text(strip=True)
if 'imo' in key:
vessel['imo'] = value
elif 'mmsi' in key:
vessel['mmsi'] = value
elif 'call sign' in key:
vessel['callsign'] = value
elif 'flag' in key:
vessel['flag'] = value
elif 'type' in key:
vessel['type'] = value
elif 'length' in key:
vessel['length'] = self._parse_number(value)
elif 'width' in key or 'beam' in key:
vessel['width'] = self._parse_number(value)
elif 'draught' in key or 'draft' in key:
vessel['draught'] = self._parse_number(value)
elif 'gross tonnage' in key:
vessel['gross_tonnage'] = self._parse_number(value)
elif 'deadweight' in key:
vessel['deadweight'] = self._parse_number(value)
elif 'year built' in key:
vessel['year_built'] = self._parse_number(value)
# Current position
pos_el = soup.select_one('.position-data, [data-lat], [data-lon]')
if pos_el:
vessel['latitude'] = pos_el.get('data-lat')
vessel['longitude'] = pos_el.get('data-lon')
# Last position from text
pos_text = soup.select_one('.last-position, .position-info')
if pos_text:
text = pos_text.get_text()
# Parse coordinates from text
lat_match = re.search(r'(\d+\.\d+)[°\s]*[NS]', text)
lon_match = re.search(r'(\d+\.\d+)[°\s]*[EW]', text)
if lat_match:
vessel['latitude'] = float(lat_match.group(1))
if lon_match:
vessel['longitude'] = float(lon_match.group(1))
return vessel
except Exception as e:
logger.error(f"Page parse error: {e}")
return {}
# =========================================================================
# MT PRO AUTHENTICATION & OWNERSHIP DATA
# =========================================================================
def login(self, email: str, password: str, totp_secret: str = None) -> bool:
"""Authenticate to MarineTraffic Pro. Returns True on success.
Handles email+password login + optional Google Authenticator 2FA (TOTP).
totp_secret: base32 secret from Google Authenticator (overrides self.totp_secret)."""
totp_key = totp_secret or self.totp_secret
_HDRS = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.9',
}
try:
# Step 1: GET login page to extract CSRF token
resp = self.session.get(f'{MT_BASE}/en/users/login', headers=_HDRS, timeout=15)
if resp.status_code != 200:
logger.error(f"MT login page error: {resp.status_code}")
return False
soup = BeautifulSoup(resp.text, 'html.parser')
csrf_el = soup.select_one('input[name="authenticity_token"]')
csrf_token = csrf_el['value'] if csrf_el else ''
if not csrf_token:
meta_csrf = soup.select_one('meta[name="csrf-token"]')
if meta_csrf:
csrf_token = meta_csrf.get('content', '')
# Step 2: POST login credentials
login_resp = self.session.post(
f'{MT_BASE}/en/users/login',
data={
'user[email]': email,
'user[password]': password,
'authenticity_token': csrf_token,
'commit': 'Log in',
},
headers={**_HDRS, 'Content-Type': 'application/x-www-form-urlencoded',
'Referer': f'{MT_BASE}/en/users/login', 'Origin': MT_BASE},
allow_redirects=True,
timeout=20
)
# Step 3: Check if 2FA is required
page_lower = login_resp.text.lower()
otp_indicators = ['two-factor', 'two_factor', 'otp', 'authenticator',
'verification code', 'authentication code', '2fa', 'totp']
needs_2fa = any(ind in page_lower for ind in otp_indicators)
if needs_2fa and totp_key:
logger.info("MT requires 2FA — generating TOTP code...")
otp_code = _generate_totp(totp_key)
logger.info(f"TOTP code: {otp_code}")
# Extract fresh CSRF for 2FA form
soup2 = BeautifulSoup(login_resp.text, 'html.parser')
csrf_el2 = soup2.select_one('input[name="authenticity_token"]')
csrf2 = csrf_el2['value'] if csrf_el2 else csrf_token
if not csrf2:
meta2 = soup2.select_one('meta[name="csrf-token"]')
if meta2:
csrf2 = meta2.get('content', csrf_token)
# Try common 2FA form field names
otp_fields = ['user[otp_attempt]', 'otp_attempt', 'code', 'token',
'user[two_factor_code]', 'two_factor_code']
# Detect actual field name from form
otp_input = soup2.select_one('input[type="text"][name*="otp"], input[type="number"][name*="code"], input[name*="otp"], input[name*="code"], input[name*="token"]')
if otp_input and otp_input.get('name'):
otp_fields = [otp_input['name']] + otp_fields
# POST 2FA code
two_fa_url = login_resp.url # might have redirected to /users/two_factor
for field_name in otp_fields[:2]:
otp_resp = self.session.post(
two_fa_url,
data={field_name: otp_code, 'authenticity_token': csrf2, 'commit': 'Verify'},
headers={**_HDRS, 'Content-Type': 'application/x-www-form-urlencoded',
'Referer': two_fa_url, 'Origin': MT_BASE},
allow_redirects=True,
timeout=20
)
page_lower = otp_resp.text.lower()
success_indicators = ['sign_out', 'logout', 'profile', 'my-fleet', 'subscription']
if any(ind in page_lower for ind in success_indicators):
logger.info(f"MT Pro login + 2FA successful for {email}")
return True
if 'invalid' in page_lower or 'incorrect' in page_lower:
logger.error(f"2FA code rejected (field={field_name}). Code was: {otp_code}")
break
logger.warning("2FA submission failed — checking cookies anyway")
elif needs_2fa and not totp_key:
logger.error("MT requires 2FA but no TOTP secret provided! Use --totp-secret")
return False
# Step 4: Verify login success
success_indicators = ['sign_out', 'logout', 'profile', 'my-fleet', 'subscription']
if any(ind in page_lower for ind in success_indicators):
logger.info(f"MT Pro login successful for {email}")
return True
# Check cookies for session marker
cookie_names = [c.name for c in self.session.cookies]
logger.debug(f"Session cookies after login: {cookie_names}")
if any(c in cookie_names for c in ['remember_user_token', '_mt_session', 'user_credentials']):
logger.info(f"MT Pro login successful (cookie check) for {email}")
return True
logger.warning(f"MT Pro login status uncertain for {email} — continuing anyway")
return True # Optimistic: let vessel fetches confirm
except Exception as e:
logger.error(f"MT Pro login error: {e}")
return False
def get_vessel_ownership(self, mmsi: str) -> dict:
"""Scrape full vessel page including Ownership section (requires MT Pro login).
Returns vessel specs + ownership dict with owner, operator, manager, etc."""
url = f'{MT_BASE}/en/ais/details/ships/mmsi:{mmsi}'
try:
resp = self.session.get(
url,
headers={
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36',
'Referer': MT_BASE,
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
},
timeout=20
)
if resp.status_code == 404:
return {}
if resp.status_code != 200:
logger.warning(f"MT vessel page {mmsi}: HTTP {resp.status_code}")
return {}
return self._parse_vessel_full_page(resp.text, mmsi)
except Exception as e:
logger.error(f"get_vessel_ownership({mmsi}): {e}")
return {}
def _parse_vessel_full_page(self, html: str, mmsi: str) -> dict:
"""Parse full MT vessel detail page: specs (name/flag/DWT/year) + Ownership section."""
soup = BeautifulSoup(html, 'html.parser')
result = {'mmsi': mmsi}
# --- Vessel name ---
for sel in ['h1.title', '.vessel-name', 'h1', '.shipname']:
el = soup.select_one(sel)
if el:
name = el.get_text(strip=True)
if name and len(name) > 1:
result['name'] = name
break
# --- Vessel specs table ---
# MT Pro shows details in various table formats
for row in soup.select('table tr, .vessel-details tr, dl dt, .info-table tr'):
cells = row.select('td, dd, th')
if len(cells) < 2:
# Try dt/dd pairs
label_el = row if row.name == 'dt' else None
if label_el:
val_el = label_el.find_next_sibling('dd')
if val_el:
key = label_el.get_text(strip=True).lower()
val = val_el.get_text(strip=True)
self._apply_vessel_field(result, key, val)
continue
key = cells[0].get_text(strip=True).lower()
val = cells[1].get_text(strip=True)
self._apply_vessel_field(result, key, val)
# Also try JSON-LD structured data (MT sometimes embeds this)
for script in soup.select('script[type="application/ld+json"]'):
try:
data = json.loads(script.string or '{}')
if data.get('@type') == 'Product' or 'vessel' in str(data).lower():
if data.get('name') and not result.get('name'):
result['name'] = data['name']
except Exception:
pass
# --- Ownership section ---
companies = []
ownership_role_map = {
'beneficial owner': 'beneficial_owner',
'registered owner': 'registered_owner',
'commercial manager': 'commercial_manager',
'disponent owner': 'commercial_manager',
'ship manager': 'commercial_manager',
'operator': 'operator',
'charterer': 'operator',
'technical manager': 'operator',
}
# Try multiple selectors MT uses for ownership tables
ownership_rows = soup.select(
'[data-id="ownership"] tr, '
'.ownership-widget tr, '
'.widget-body table tr, '
'#ownership tr, '
'.ownership tr, '
'section.ownership table tr'
)
# Also try finding ownership by header text
if not ownership_rows:
for h in soup.find_all(['h2', 'h3', 'h4', 'div'], string=re.compile(r'[Oo]wnership')):
container = h.find_next('table')
if container:
ownership_rows = container.select('tr')
break
for row in ownership_rows:
cells = row.select('td')
if len(cells) < 2:
continue
role_text = cells[0].get_text(strip=True).lower()
company_el = cells[1]
company_name = company_el.get_text(strip=True)
if not company_name or company_name == '-' or company_name == 'N/A':
continue
# Extract company MT profile URL
company_link = company_el.select_one('a[href]')
company_href = company_link['href'] if company_link else None
if company_href and not company_href.startswith('http'):
company_href = MT_BASE + company_href
country = cells[2].get_text(strip=True) if len(cells) > 2 else ''
# Map role to field name
matched_field = None
for role_key, field in ownership_role_map.items():
if role_key in role_text:
matched_field = field
break
if not matched_field:
matched_field = 'operator' # fallback
# Set top-level fields (first occurrence wins)
if not result.get(matched_field):
result[matched_field] = company_name
if country:
result[f'{matched_field}_country'] = country
# Also build owner/operator aliases for DB compatibility
if matched_field in ('beneficial_owner', 'registered_owner') and not result.get('owner'):
result['owner'] = company_name
if country:
result['owner_country'] = country
elif matched_field == 'operator' and not result.get('operator'):
result['operator'] = company_name
if country:
result['operator_country'] = country
companies.append({
'role': role_text,
'name': company_name,
'country': country,
'mt_profile_url': company_href,
})
result['companies'] = companies # stored as companies_json
return result
def _apply_vessel_field(self, result: dict, key: str, val: str):
"""Apply a parsed key-value pair to vessel result dict."""
if not val or val in ('-', 'N/A', 'Unknown', ''):
return
if 'imo' in key and not result.get('imo'):
result['imo'] = val.strip()
elif 'mmsi' in key and key != 'mmsi' and not result.get('mmsi'):
result['mmsi'] = val.strip()
elif 'call sign' in key or 'callsign' in key:
result['callsign'] = val
elif 'flag' in key and not result.get('flag'):
result['flag'] = val
elif 'type' in key and 'ship' in key and not result.get('type'):
result['type'] = val
elif 'year' in key and 'built' in key:
result['year_built'] = self._parse_number(val)
elif 'deadweight' in key or key == 'dwt':
result['deadweight'] = self._parse_number(val)
elif 'gross' in key and 'tonnage' in key:
result['gross_tonnage'] = self._parse_number(val)
elif 'length' in key and 'overall' in key:
result['length'] = self._parse_number(val)
elif 'breadth' in key or ('width' in key and 'ext' in key):
result['width'] = self._parse_number(val)
def get_company_website(self, company_mt_url: str) -> str:
"""Try to extract company website URL from MT company profile page."""
if not company_mt_url or 'marinetraffic.com' not in company_mt_url:
return None
try:
resp = self.session.get(
company_mt_url,
headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'},
timeout=10
)
if resp.status_code != 200:
return None
soup = BeautifulSoup(resp.text, 'html.parser')
# MT company pages often show website in a link that goes to external domain
for a in soup.select('a[href^="http"]'):
href = a['href']
if 'marinetraffic.com' not in href and '.' in href and len(href) < 120:
# Filter obvious non-website links
if any(skip in href for skip in ['facebook.', 'twitter.', 'linkedin.', 'google.', 'youtube.']):
continue
return href.rstrip('/')
return None
except Exception as e:
logger.debug(f"get_company_website({company_mt_url}): {e}")
return None
# =========================================================================
# PORT / AREA VESSEL SCRAPING
# =========================================================================
def scrape_area_vessels(self, lat: float, lon: float, radius_nm: float = 15) -> List[Dict]:
"""
Scrape vessels in area via MarineTraffic map data JSON endpoint.
Uses tile-based internal API with session cookies.
"""
# Convert radius NM → degrees
lat_delta = radius_nm / 60.0
lon_delta = radius_nm / (60.0 * max(math.cos(math.radians(lat)), 0.01))
lat_min = lat - lat_delta
lat_max = lat + lat_delta
lon_min = lon - lon_delta
lon_max = lon + lon_delta
# Step 1: Establish session (get cookies)
try:
self.session.get(
f"{MT_BASE}/en/ais/home/centerx:{lon:.1f}/centery:{lat:.1f}/zoom:11",
timeout=10
)
except Exception:
pass
time.sleep(1) # Be polite
# Step 2: Calculate Web Mercator tile coordinates for zoom level 11
zoom = 11
def lat_lon_to_tile(lat_deg, lon_deg, z):
lat_rad = math.radians(lat_deg)
n = 2.0 ** z
xtile = int((lon_deg + 180.0) / 360.0 * n)
ytile = int((1.0 - math.asinh(math.tan(lat_rad)) / math.pi) / 2.0 * n)
return xtile, ytile
x_tile, y_tile = lat_lon_to_tile(lat, lon, zoom)
# Step 3: Request map data
url = f"{MT_BASE}/getData/get_data_json_4/z:{zoom}/X:{x_tile}/Y:{y_tile}/station:0"
try:
resp = self.session.get(url, timeout=15, headers={
'Referer': f'{MT_BASE}/en/ais/home/centerx:{lon:.1f}/centery:{lat:.1f}/zoom:{zoom}',
'X-Requested-With': 'XMLHttpRequest',
'Accept': 'application/json, text/javascript, */*'
})
if resp.status_code != 200:
logger.warning(f"Area scrape HTTP {resp.status_code}")
return []
data = resp.json()
vessels = []
# Parse response — format varies
rows = data
if isinstance(data, dict):
rows = data.get('data', data.get('rows', data.get('type1', [])))
if isinstance(rows, dict):
rows = rows.get('rows', [])
if not isinstance(rows, list):
return []
for item in rows:
if isinstance(item, dict):
v_lat = item.get('LAT', item.get('lat', 0))
v_lon = item.get('LON', item.get('lon', 0))
try:
v_lat = float(v_lat) if v_lat else 0
v_lon = float(v_lon) if v_lon else 0
except (ValueError, TypeError):
continue
# Filter to bounding box
if not (lat_min <= v_lat <= lat_max and lon_min <= v_lon <= lon_max):
continue
mmsi = str(item.get('MMSI', item.get('mmsi', '')))
if not mmsi or mmsi == '0':
continue
type_raw = item.get('SHIPTYPE', item.get('ship_type', item.get('type_name', '')))
type_code = item.get('TYPE_CODE', item.get('type_code'))
nav_status_code = item.get('NAVSTAT', item.get('nav_status', item.get('STATUS', None)))
nav_status = None
if nav_status_code is not None:
try:
nav_status = NAV_STATUS_MAP.get(int(nav_status_code))
except (ValueError, TypeError):
pass
dwt = item.get('DWT', item.get('dwt', item.get('DEADWEIGHT')))
vessels.append({
'mmsi': mmsi,
'name': item.get('SHIPNAME', item.get('shipname', item.get('name', ''))),
'type': str(type_raw) if type_raw else '',
'type_category': classify_vessel_type(str(type_raw) if type_raw else '', type_code),
'flag': item.get('FLAG', item.get('flag', '')),
'speed': item.get('SPEED', item.get('speed')),
'course': item.get('COURSE', item.get('course')),
'nav_status': nav_status,
'dwt': dwt,
'lat': v_lat,
'lon': v_lon,
'destination': item.get('DESTINATION', item.get('destination', '')),
})
elif isinstance(item, list) and len(item) >= 7:
# Some endpoints return arrays: [mmsi, lat, lon, speed, course, type, name, ...]
try:
v_lat = float(item[1]) / 10000 if item[1] > 1000 else float(item[1])
v_lon = float(item[2]) / 10000 if abs(item[2]) > 1000 else float(item[2])
except (ValueError, TypeError, IndexError):
continue
if not (lat_min <= v_lat <= lat_max and lon_min <= v_lon <= lon_max):
continue
type_val = str(item[5]) if len(item) > 5 else ''
vessels.append({
'mmsi': str(item[0]),
'lat': v_lat,
'lon': v_lon,
'speed': item[3] if len(item) > 3 else None,
'course': item[4] if len(item) > 4 else None,
'type': type_val,
'type_category': classify_vessel_type(type_val),
'nav_status': None,
'dwt': None,
'name': str(item[6]) if len(item) > 6 else '',
})
return vessels
except Exception as e:
logger.error(f"Area scrape error: {e}")
return []
def scrape_viewport_vessels(self, lat_min: float, lat_max: float,
lon_min: float, lon_max: float,
max_tiles: int = 6) -> List[Dict]:
"""
Scrape vessels covering a map viewport via multiple MarineTraffic tiles.
Adapts zoom level to viewport size and fetches multiple tiles.
Returns deduplicated list of vessel dicts with lat/lon.
"""
lat_span = lat_max - lat_min
lon_span = lon_max - lon_min
# Choose zoom based on viewport size (larger viewport → lower zoom → bigger tiles)
if lat_span > 20 or lon_span > 40:
zoom = 4
elif lat_span > 10 or lon_span > 20:
zoom = 5
elif lat_span > 5 or lon_span > 10:
zoom = 6
elif lat_span > 2 or lon_span > 4:
zoom = 8
else:
zoom = 10
def lat_lon_to_tile(lat_deg, lon_deg, z):
lat_rad = math.radians(lat_deg)
n = 2.0 ** z
xtile = int((lon_deg + 180.0) / 360.0 * n)
ytile = int((1.0 - math.asinh(math.tan(lat_rad)) / math.pi) / 2.0 * n)
return xtile, ytile
# Calculate tiles covering the viewport
x_min, y_min = lat_lon_to_tile(lat_max, lon_min, zoom) # NW corner
x_max, y_max = lat_lon_to_tile(lat_min, lon_max, zoom) # SE corner
# Collect unique tiles to fetch (limit to max_tiles)
tiles = []
for x in range(x_min, x_max + 1):
for y in range(y_min, y_max + 1):
tiles.append((x, y))
if len(tiles) >= max_tiles:
break
if len(tiles) >= max_tiles:
break
if not tiles:
center_lat = (lat_min + lat_max) / 2
center_lon = (lon_min + lon_max) / 2
cx, cy = lat_lon_to_tile(center_lat, center_lon, zoom)
tiles = [(cx, cy)]
# Establish session
center_lat = (lat_min + lat_max) / 2
center_lon = (lon_min + lon_max) / 2
try:
self.session.get(
f"{MT_BASE}/en/ais/home/centerx:{center_lon:.1f}/centery:{center_lat:.1f}/zoom:{zoom}",
timeout=10
)
except Exception:
pass
# Fetch tiles
all_vessels = []
seen_mmsi = set()
for x_tile, y_tile in tiles:
try:
time.sleep(0.5) # Rate limit between tiles
url = f"{MT_BASE}/getData/get_data_json_4/z:{zoom}/X:{x_tile}/Y:{y_tile}/station:0"
resp = self.session.get(url, timeout=10, headers={
'Referer': f'{MT_BASE}/en/ais/home/centerx:{center_lon:.1f}/centery:{center_lat:.1f}/zoom:{zoom}',
'X-Requested-With': 'XMLHttpRequest',
'Accept': 'application/json, text/javascript, */*'
})
if resp.status_code != 200:
continue
data = resp.json()
rows = data
if isinstance(data, dict):
rows = data.get('data', data.get('rows', data.get('type1', [])))
if isinstance(rows, dict):
rows = rows.get('rows', [])
if not isinstance(rows, list):
continue
for item in rows:
v = self._parse_tile_item(item, lat_min, lat_max, lon_min, lon_max)
if v and v.get('mmsi') and v['mmsi'] not in seen_mmsi:
seen_mmsi.add(v['mmsi'])
all_vessels.append(v)
except Exception as e:
logger.debug(f"Tile {x_tile},{y_tile} z{zoom} error: {e}")
continue
logger.info(f"MT viewport scrape: {len(tiles)} tiles z{zoom}, {len(all_vessels)} vessels")
return all_vessels
def _parse_tile_item(self, item, lat_min, lat_max, lon_min, lon_max) -> dict:
"""Parse a single vessel item from MT tile data."""
if isinstance(item, dict):
v_lat = item.get('LAT', item.get('lat', 0))
v_lon = item.get('LON', item.get('lon', 0))
try:
v_lat = float(v_lat) if v_lat else 0
v_lon = float(v_lon) if v_lon else 0
except (ValueError, TypeError):
return None
if not (lat_min <= v_lat <= lat_max and lon_min <= v_lon <= lon_max):
return None
mmsi = str(item.get('MMSI', item.get('mmsi', '')))
if not mmsi or mmsi == '0':
return None
type_raw = item.get('SHIPTYPE', item.get('ship_type', item.get('type_name', '')))
type_code = item.get('TYPE_CODE', item.get('type_code'))
nav_status_code = item.get('NAVSTAT', item.get('nav_status', item.get('STATUS', None)))
nav_status = None
if nav_status_code is not None:
try:
nav_status = NAV_STATUS_MAP.get(int(nav_status_code))
except (ValueError, TypeError):
pass
dwt = item.get('DWT', item.get('dwt', item.get('DEADWEIGHT')))
return {
'mmsi': mmsi,
'name': item.get('SHIPNAME', item.get('shipname', item.get('name', ''))),
'type': str(type_raw) if type_raw else '',
'type_category': classify_vessel_type(str(type_raw) if type_raw else '', type_code),
'flag': item.get('FLAG', item.get('flag', '')),
'speed': item.get('SPEED', item.get('speed')),
'course': item.get('COURSE', item.get('course')),
'nav_status': nav_status,
'dwt': dwt,
'lat': v_lat,
'lon': v_lon,
'destination': item.get('DESTINATION', item.get('destination', '')),
}
elif isinstance(item, list) and len(item) >= 7:
try:
v_lat = float(item[1]) / 10000 if item[1] > 1000 else float(item[1])
v_lon = float(item[2]) / 10000 if abs(item[2]) > 1000 else float(item[2])
except (ValueError, TypeError, IndexError):
return None
if not (lat_min <= v_lat <= lat_max and lon_min <= v_lon <= lon_max):
return None
type_val = str(item[5]) if len(item) > 5 else ''
return {
'mmsi': str(item[0]),
'lat': v_lat,
'lon': v_lon,
'speed': item[3] if len(item) > 3 else None,
'course': item[4] if len(item) > 4 else None,
'type': type_val,
'type_category': classify_vessel_type(type_val),
'name': str(item[6]) if len(item) > 6 else '',
}
return None
def get_port_vessels(self, port_name: str) -> List[Dict]:
"""
Get vessels currently in/near a port.
Tries: 1) map tile scraping, 2) paid API (if key), 3) empty.
"""
port = resolve_port(port_name)
if not port:
return []
lat, lon = port['lat'], port['lon']
radius = port.get('radius_nm', 15)
# Strategy 1: Scrape map tile data
vessels = self.scrape_area_vessels(lat, lon, radius)
# Strategy 2: Paid API fallback
if not vessels and self.has_api_key():
lat_d = radius / 60.0
lon_d = radius / (60.0 * max(math.cos(math.radians(lat)), 0.01))
try:
api_result = self.api_vessels_in_area(
lat - lat_d, lat + lat_d, lon - lon_d, lon + lon_d
)
if isinstance(api_result, list):
for item in api_result:
mmsi = str(item.get('MMSI', ''))
if mmsi and mmsi != '0':
vessels.append({
'mmsi': mmsi,
'name': item.get('SHIPNAME', ''),
'type': item.get('TYPE_NAME', item.get('SHIPTYPE', '')),
'flag': item.get('FLAG', ''),
'speed': item.get('SPEED'),
'course': item.get('COURSE'),
'lat': item.get('LAT'),
'lon': item.get('LON'),
'destination': item.get('DESTINATION', ''),
})
except Exception as e:
logger.error(f"API area fallback error: {e}")
# Deduplicate by MMSI
seen = set()
unique = []
for v in vessels:
if v.get('mmsi') and v['mmsi'] not in seen:
seen.add(v['mmsi'])
unique.append(v)
return unique
# =========================================================================
# API METHODS (requires API key)
# =========================================================================
def _api_call(self, service: str, params: dict) -> dict:
"""Make API call"""
if not self.api_key:
raise ValueError("API key required. Set MARINETRAFFIC_API_KEY")
url = f"{MT_API_BASE}/{service}/{self.api_key}"
params['protocol'] = 'jsono' # JSON output
try:
resp = self.session.get(url, params=params, timeout=30)
return resp.json()
except Exception as e:
logger.error(f"API error: {e}")
return {}
def api_vessel_info(self, mmsi: str = None, imo: str = None) -> dict:
"""
PS01 - Vessel Particulars
Get detailed vessel info via API
"""
params = {}
if mmsi:
params['mmsi'] = mmsi
elif imo:
params['imo'] = imo
else:
raise ValueError("MMSI or IMO required")
return self._api_call('vesselparticulars', params)
def api_vessel_position(self, mmsi: str = None, imo: str = None) -> dict:
"""
PS07 - Single Vessel Position
Get current vessel position via API
"""
params = {'timespan': 60} # Last 60 minutes
if mmsi:
params['mmsi'] = mmsi
elif imo:
params['imo'] = imo
return self._api_call('exportvessel', params)
def api_port_calls(self, mmsi: str = None, imo: str = None) -> dict:
"""
VD02 - Port Calls
Get vessel port call history
"""
params = {}
if mmsi:
params['mmsi'] = mmsi
elif imo:
params['imo'] = imo
return self._api_call('portcalls', params)
def api_vessels_in_area(self, lat_min: float, lat_max: float,
lon_min: float, lon_max: float) -> list:
"""
PS02 - Vessels in Area
Get all vessels in geographic area
"""
params = {
'MINLAT': lat_min,
'MAXLAT': lat_max,
'MINLON': lon_min,
'MAXLON': lon_max
}
return self._api_call('exportvessels', params)
# =========================================================================
# HELPERS
# =========================================================================
def _parse_number(self, text: str) -> Optional[float]:
"""Extract number from text"""
if not text:
return None
match = re.search(r'[\d,]+\.?\d*', text.replace(',', ''))
if match:
try:
return float(match.group())
except:
pass
return None
def has_api_key(self) -> bool:
"""Check if API key is configured"""
return bool(self.api_key)
# =============================================================================
# SEA ROUTE CALCULATION
# =============================================================================
# Maritime waypoints (choke points, canals, straits)
WAYPOINTS = {
'gibraltar': {'name': 'Strait of Gibraltar', 'lat': 36.13, 'lon': -5.35},
'suez_n': {'name': 'Suez Canal (Port Said)', 'lat': 31.26, 'lon': 32.30},
'suez_s': {'name': 'Suez Canal (Suez)', 'lat': 29.97, 'lon': 32.55},
'bab_el_mandeb': {'name': 'Bab el-Mandeb Strait', 'lat': 12.65, 'lon': 43.30},
'hormuz': {'name': 'Strait of Hormuz', 'lat': 26.55, 'lon': 56.25},
'malacca': {'name': 'Strait of Malacca', 'lat': 1.27, 'lon': 103.82},
'cape_good_hope': {'name': 'Cape of Good Hope', 'lat': -34.35, 'lon': 18.50},
'panama_atl': {'name': 'Panama Canal (Atlantic)', 'lat': 9.36, 'lon': -79.90},
'panama_pac': {'name': 'Panama Canal (Pacific)', 'lat': 8.95, 'lon': -79.55},
'cape_horn': {'name': 'Cape Horn', 'lat': -55.98, 'lon': -67.28},
'dover': {'name': 'Dover Strait', 'lat': 51.13, 'lon': 1.33},
'bosphorus': {'name': 'Bosphorus Strait', 'lat': 41.12, 'lon': 29.05},
'good_hope_e': {'name': 'Cape Agulhas (East)', 'lat': -34.83, 'lon': 20.02},
'volga_don': {'name': 'Volga-Don Canal (Astrakhan)', 'lat': 46.35, 'lon': 48.03},
}
# Average ECO speeds by vessel type (knots) — modern slow-steaming
VESSEL_SPEEDS = {
'bulk': 12.0, # Eco steaming (was 14.5 design)
'tanker': 12.5, # Eco steaming (was 15 design)
'container': 15.0, # Eco steaming (was 22-25 design)
'general': 12.0,
'passenger': 18.0,
'roro': 15.0,
'offshore': 10.0,
'default': 12.5,
}
# Daily fuel consumption (metric tons/day) — ranges by DWT
# Returns (low, high) MT/day for given vessel type and DWT
def estimate_fuel_consumption(vessel_type: str, dwt: float = None) -> tuple:
"""Estimate daily fuel consumption in MT/day as (low, high) range."""
vtype = vessel_type.lower() if vessel_type else 'default'
# DWT-based tables: (dwt_threshold, low_mt_day, high_mt_day)
FUEL_TABLE = {
'bulk': [
(40000, 18, 25), # Handysize
(65000, 25, 35), # Handymax/Supramax
(100000, 30, 42), # Panamax
(200000, 40, 55), # Capesize
(999999, 55, 70), # VLOC
],
'tanker': [
(55000, 22, 30), # MR
(80000, 28, 38), # LR1
(120000, 35, 48), # Aframax
(200000, 45, 60), # Suezmax
(999999, 60, 85), # VLCC
],
'container': [
(25000, 25, 40), # Feeder
(40000, 40, 60), # Feedermax
(65000, 55, 80), # Panamax
(100000, 75, 110), # Post-Panamax
(150000, 100, 150), # Neo-Panamax
(999999, 140, 200), # ULCV
],
'general': [(999999, 12, 22)],
'roro': [(999999, 35, 55)],
'passenger': [(999999, 80, 150)],
}
table = FUEL_TABLE.get(vtype, [(999999, 25, 40)])
if dwt:
try:
dwt_val = float(dwt)
for threshold, low, high in table:
if dwt_val <= threshold:
return (low, high)
except (ValueError, TypeError):
pass
# Default: mid-range entry
mid = table[len(table) // 2]
return (mid[1], mid[2])
# Canal transit costs — DWT-based (USD)
def estimate_suez_cost(dwt: float = None) -> tuple:
"""Estimate Suez Canal transit cost as (low, high) USD based on SCNT formula."""
if not dwt:
return (200000, 600000)
try:
d = float(dwt)
# Simplified SCNT-based tiers (Suez Canal Net Tonnage)
# Actual formula uses SCNT, but we approximate from DWT
if d <= 30000:
return (80000, 150000)
elif d <= 60000:
return (150000, 300000)
elif d <= 100000:
return (250000, 450000)
elif d <= 200000:
return (350000, 700000)
else:
return (500000, 1200000)
except (ValueError, TypeError):
return (200000, 600000)
def estimate_panama_cost(dwt: float = None) -> tuple:
"""Estimate Panama Canal transit cost as (low, high) USD based on PC/UMS tonnage."""
if not dwt:
return (150000, 500000)
try:
d = float(dwt)
if d <= 30000:
return (50000, 120000)
elif d <= 60000:
return (100000, 250000)
elif d <= 100000:
return (200000, 400000)
elif d <= 150000:
return (350000, 600000)
else:
# Neo-Panamax locks
return (500000, 900000)
except (ValueError, TypeError):
return (150000, 500000)
CANAL_TRANSIT_HOURS = {
'suez': 12,
'panama': 10,
}
# Bunker fuel price (VLSFO, USD/ton) — approximate 2024-2026 range
BUNKER_PRICE_USD = 600
# Port charges — DWT-based (USD per call)
def estimate_port_charges(dwt: float = None) -> tuple:
"""Estimate port charges per call as (low, high) USD."""
if not dwt:
return (15000, 50000)
try:
d = float(dwt)
if d <= 30000:
return (8000, 18000)
elif d <= 60000:
return (15000, 30000)
elif d <= 100000:
return (25000, 50000)
elif d <= 200000:
return (40000, 80000)
else:
return (60000, 150000)
except (ValueError, TypeError):
return (15000, 50000)
# Port region assignments for routing
PORT_REGIONS = {}
def _assign_regions():
"""Assign routing regions to ports based on coordinates."""
for key, port in WORLD_PORTS.items():
lat, lon = port['lat'], port['lon']
# Americas first (all lon < -30)
if lon < -100 and lat > 25:
region = 'USWC' # US West Coast
elif lon < -30 and lat > 25:
region = 'USEC' # US East Coast
elif lon < -30 and -5 < lat <= 25:
region = 'CARIB' # Caribbean / Central America
elif lon < -70 and lat <= -5:
region = 'SAW' # South America West
elif lon < -30 and lat <= -5:
region = 'SAE' # South America East
# Asia-Pacific
elif lon > 100 and lat < -10:
region = 'AUSNZ' # Australia / NZ
elif lon > 100:
region = 'EASIA' # East/SE Asia
elif 60 < lon <= 100 and lat > 0:
region = 'SASIA' # South Asia
elif 36 <= lat <= 47 and 46 <= lon <= 55:
region = 'CASP' # Caspian Sea (landlocked, before GULF)
elif 40 < lon <= 60 and lat > 10:
region = 'GULF' # Persian Gulf
# Africa
elif 10 < lon < 45 and lat < -20:
region = 'SAFR' # Southern Africa (must be before ERED)
elif 30 < lon <= 45 and -10 < lat < 20:
region = 'ERED' # East Africa / Red Sea
elif -5 < lon < 15 and -5 < lat < 15:
region = 'WAFR' # West Africa
# Europe
elif 25 < lon <= 40 and 40 < lat <= 48:
region = 'BSEA' # Black Sea (Istanbul, Novorossiysk, Constanta)
elif lon < 40 and lat > 48:
region = 'NEUR' # North Europe + Baltic (above 48N)
elif -10 <= lon <= 40 and 25 < lat <= 48:
region = 'MED' # Mediterranean + Southern Europe
else:
region = 'OTHER'
PORT_REGIONS[key] = region
_assign_regions()
# Route templates: from_region → to_region → waypoint sequence
ROUTE_WAYPOINTS = {
('NEUR', 'EASIA'): ['dover', 'gibraltar', 'suez_n', 'suez_s', 'bab_el_mandeb', 'malacca'],
('NEUR', 'SASIA'): ['dover', 'gibraltar', 'suez_n', 'suez_s', 'bab_el_mandeb'],
('NEUR', 'GULF'): ['dover', 'gibraltar', 'suez_n', 'suez_s', 'bab_el_mandeb', 'hormuz'],
('NEUR', 'MED'): ['dover', 'gibraltar'],
('NEUR', 'USEC'): [], # Direct Atlantic
('NEUR', 'SAFR'): ['dover', 'gibraltar', 'cape_good_hope'],
('NEUR', 'WAFR'): ['dover', 'gibraltar'],
('NEUR', 'ERED'): ['dover', 'gibraltar', 'suez_n', 'suez_s', 'bab_el_mandeb'],
('NEUR', 'CARIB'): [], # Direct Atlantic
('NEUR', 'SAE'): [],
('NEUR', 'AUSNZ'): ['dover', 'gibraltar', 'suez_n', 'suez_s', 'bab_el_mandeb', 'malacca'],
('NEUR', 'BSEA'): ['dover'],
('MED', 'EASIA'): ['suez_n', 'suez_s', 'bab_el_mandeb', 'malacca'],
('MED', 'SASIA'): ['suez_n', 'suez_s', 'bab_el_mandeb'],
('MED', 'GULF'): ['suez_n', 'suez_s', 'bab_el_mandeb', 'hormuz'],
('MED', 'NEUR'): ['gibraltar', 'dover'],
('MED', 'USEC'): ['gibraltar'],
('MED', 'SAFR'): ['gibraltar', 'cape_good_hope'],
('MED', 'ERED'): ['suez_n', 'suez_s', 'bab_el_mandeb'],
('MED', 'AUSNZ'): ['suez_n', 'suez_s', 'bab_el_mandeb', 'malacca'],
('BSEA', 'MED'): ['bosphorus'],
('BSEA', 'EASIA'): ['bosphorus', 'suez_n', 'suez_s', 'bab_el_mandeb', 'malacca'],
('BSEA', 'GULF'): ['bosphorus', 'suez_n', 'suez_s', 'bab_el_mandeb', 'hormuz'],
('GULF', 'EASIA'): ['hormuz', 'malacca'],
('GULF', 'SASIA'): ['hormuz'],
('GULF', 'NEUR'): ['hormuz', 'bab_el_mandeb', 'suez_s', 'suez_n', 'gibraltar', 'dover'],
('GULF', 'MED'): ['hormuz', 'bab_el_mandeb', 'suez_s', 'suez_n'],
('GULF', 'USEC'): ['hormuz', 'bab_el_mandeb', 'suez_s', 'suez_n', 'gibraltar'],
('GULF', 'SAFR'): ['hormuz', 'bab_el_mandeb', 'cape_good_hope'],
('EASIA', 'SASIA'): ['malacca'],
('EASIA', 'GULF'): ['malacca', 'hormuz'],
('EASIA', 'NEUR'): ['malacca', 'bab_el_mandeb', 'suez_s', 'suez_n', 'gibraltar', 'dover'],
('EASIA', 'MED'): ['malacca', 'bab_el_mandeb', 'suez_s', 'suez_n'],
('EASIA', 'USEC'): ['panama_pac', 'panama_atl'],
('EASIA', 'USWC'): [], # Direct Pacific
('EASIA', 'SAFR'): ['malacca', 'cape_good_hope'],
('EASIA', 'AUSNZ'): ['malacca'],
('SASIA', 'EASIA'): ['malacca'],
('SASIA', 'NEUR'): ['bab_el_mandeb', 'suez_s', 'suez_n', 'gibraltar', 'dover'],
('SASIA', 'MED'): ['bab_el_mandeb', 'suez_s', 'suez_n'],
('SASIA', 'GULF'): ['hormuz'],
('USEC', 'EASIA'): ['panama_atl', 'panama_pac'],
('USEC', 'NEUR'): [], # Direct Atlantic
('USEC', 'MED'): ['gibraltar'],
('USEC', 'GULF'): ['gibraltar', 'suez_n', 'suez_s', 'bab_el_mandeb', 'hormuz'],
('USEC', 'SAE'): [],
('USEC', 'CARIB'): [],
('USWC', 'EASIA'): [], # Direct Pacific
('USWC', 'NEUR'): ['panama_pac', 'panama_atl'],
('USWC', 'SAW'): [],
('USWC', 'AUSNZ'): [],
('SAFR', 'NEUR'): ['cape_good_hope', 'gibraltar', 'dover'],
('SAFR', 'EASIA'): ['cape_good_hope', 'malacca'],
('SAFR', 'SASIA'): ['cape_good_hope'],
('SAE', 'NEUR'): [],
('SAE', 'USEC'): [],
('SAE', 'EASIA'): ['cape_good_hope', 'malacca'],
('SAW', 'USEC'): ['panama_pac', 'panama_atl'],
('SAW', 'USWC'): [],
('CARIB', 'NEUR'): [],
('CARIB', 'USEC'): [],
('CARIB', 'EASIA'): ['panama_atl', 'panama_pac'],
('AUSNZ', 'EASIA'): ['malacca'],
('AUSNZ', 'NEUR'): ['malacca', 'bab_el_mandeb', 'suez_s', 'suez_n', 'gibraltar', 'dover'],
# Caspian Sea (landlocked — river-sea vessels only via Volga-Don Canal)
('CASP', 'CASP'): [], # Direct within Caspian
('CASP', 'BSEA'): ['volga_don'], # Via Volga-Don → Azov → Black Sea
('CASP', 'MED'): ['volga_don', 'bosphorus'], # Caspian → Black Sea → Med
('CASP', 'NEUR'): ['volga_don', 'bosphorus', 'gibraltar', 'dover'],
('CASP', 'GULF'): [], # INSTC: Caspian → Iran overland → AG
('CASP', 'SASIA'): [], # INSTC: Caspian → Iran → India
('BSEA', 'CASP'): ['volga_don'],
('MED', 'CASP'): ['bosphorus', 'volga_don'],
('NEUR', 'CASP'): ['dover', 'gibraltar', 'bosphorus', 'volga_don'],
('GULF', 'CASP'): [], # INSTC reverse
('SASIA', 'CASP'): [], # INSTC reverse
}
def _haversine_nm(lat1, lon1, lat2, lon2):
"""Calculate great circle distance in nautical miles."""
R = 3440.065 # Earth radius in NM
lat1, lon1, lat2, lon2 = map(math.radians, [lat1, lon1, lat2, lon2])
dlat = lat2 - lat1
dlon = lon2 - lon1
a = math.sin(dlat/2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon/2)**2
c = 2 * math.asin(math.sqrt(min(a, 1.0)))
return R * c
def calculate_sea_route(from_port: str, to_port: str, vessel_type: str = 'default',
dwt: float = None) -> Optional[Dict]:
"""
Calculate sea route between two ports.
Returns distance, time, waypoints, estimated cost RANGES based on DWT.
"""
port_a = resolve_port(from_port)
port_b = resolve_port(to_port)
if not port_a or not port_b:
return None
# Get port keys for region lookup (resolve_port includes 'key')
key_a = port_a.get('key')
key_b = port_b.get('key')
if not key_a or not key_b:
return None
region_a = PORT_REGIONS.get(key_a, 'OTHER')
region_b = PORT_REGIONS.get(key_b, 'OTHER')
# Find waypoint sequence
wp_keys = ROUTE_WAYPOINTS.get((region_a, region_b))
if wp_keys is None:
# Try reverse
wp_keys_rev = ROUTE_WAYPOINTS.get((region_b, region_a))
if wp_keys_rev is not None:
wp_keys = list(reversed(wp_keys_rev))
else:
wp_keys = [] # Direct route
# Build coordinate chain: port_a → waypoints → port_b
chain = [(port_a['lat'], port_a['lon'], port_a['name'])]
for wpk in wp_keys:
wp = WAYPOINTS[wpk]
chain.append((wp['lat'], wp['lon'], wp['name']))
chain.append((port_b['lat'], port_b['lon'], port_b['name']))
# Calculate total distance
total_nm = 0.0
legs = []
for i in range(len(chain) - 1):
lat1, lon1, name1 = chain[i]
lat2, lon2, name2 = chain[i + 1]
dist = _haversine_nm(lat1, lon1, lat2, lon2)
# Sea routes are ~15-20% longer than great circle due to coastlines
sea_dist = dist * 1.15
total_nm += sea_dist
legs.append({
'from': name1,
'to': name2,
'distance_nm': round(sea_dist),
})
# Canals used — DWT-based costs
canals_used = []
canal_cost_low = 0
canal_cost_high = 0
canal_hours = 0
if 'suez_n' in wp_keys or 'suez_s' in wp_keys:
s_low, s_high = estimate_suez_cost(dwt)
canals_used.append({'name': 'Suez Canal', 'cost_low': s_low, 'cost_high': s_high})
canal_cost_low += s_low
canal_cost_high += s_high
canal_hours += CANAL_TRANSIT_HOURS['suez']
if 'panama_atl' in wp_keys or 'panama_pac' in wp_keys:
p_low, p_high = estimate_panama_cost(dwt)
canals_used.append({'name': 'Panama Canal', 'cost_low': p_low, 'cost_high': p_high})
canal_cost_low += p_low
canal_cost_high += p_high
canal_hours += CANAL_TRANSIT_HOURS['panama']
# Speed and time
vtype = vessel_type.lower() if vessel_type else 'default'
speed = VESSEL_SPEEDS.get(vtype, VESSEL_SPEEDS['default'])
sailing_hours = total_nm / speed
total_hours = sailing_hours + canal_hours
total_days = total_hours / 24
# Cost estimation — ranges
fuel_low, fuel_high = estimate_fuel_consumption(vtype, dwt)
fuel_cost_low = (sailing_hours / 24) * fuel_low * BUNKER_PRICE_USD
fuel_cost_high = (sailing_hours / 24) * fuel_high * BUNKER_PRICE_USD
port_low, port_high = estimate_port_charges(dwt)
port_cost_low = port_low * 2 # Load + discharge
port_cost_high = port_high * 2
total_cost_low = fuel_cost_low + canal_cost_low + port_cost_low
total_cost_high = fuel_cost_high + canal_cost_high + port_cost_high
# Vessel subtype info
subtype = classify_vessel_subtype(vtype, dwt)
# Waypoint names for display
via_points = []
for wpk in wp_keys:
wp = WAYPOINTS[wpk]
# Skip pairs (suez_n + suez_s → "Suez Canal", panama_atl + panama_pac → "Panama Canal")
if wpk in ('suez_s', 'panama_pac'):
continue
via_points.append(wp['name'])
# Build coordinate chain for map polyline
route_coords = [[lat, lon] for lat, lon, _ in chain]
result = {
'from': port_a['name'],
'from_country': port_a['country'],
'from_lat': port_a['lat'],
'from_lon': port_a['lon'],
'to': port_b['name'],
'to_country': port_b['country'],
'to_lat': port_b['lat'],
'to_lon': port_b['lon'],
'route_coords': route_coords,
'distance_nm': round(total_nm),
'distance_km': round(total_nm * 1.852),
'vessel_type': vtype,
'speed_knots': speed,
'sailing_hours': round(sailing_hours),
'canal_transit_hours': canal_hours,
'total_hours': round(total_hours),
'total_days': round(total_days, 1),
'via': via_points,
'canals': [c['name'] for c in canals_used],
'legs': legs,
'cost_estimate': {
'fuel_usd': {'low': round(fuel_cost_low), 'high': round(fuel_cost_high)},
'canal_fees_usd': {'low': canal_cost_low, 'high': canal_cost_high},
'port_charges_usd': {'low': port_cost_low, 'high': port_cost_high},
'total_usd': {'low': round(total_cost_low), 'high': round(total_cost_high)},
'bunker_price_usd_ton': BUNKER_PRICE_USD,
'note': 'Estimated range based on vessel size. Actual costs depend on market rates, cargo, season, and operator.'
}
}
if dwt:
result['dwt'] = dwt
if subtype:
result['vessel_subtype'] = subtype.get('name')
return result
# =============================================================================
# FEATURE 1: FIXTURE MATCHING ENGINE
# =============================================================================
def fixture_match(cargo_type: str, tonnage: float, from_port: str,
to_port: str = None, vessels: list = None) -> Dict:
"""
Match cargo to suitable vessels with scoring.
Returns scored candidates + route info.
"""
# Determine required vessel type
vtype = classify_cargo(cargo_type)
if not vtype:
return {'error': f"Cannot classify cargo '{cargo_type}'", 'candidates': []}
load_port = resolve_port(from_port)
if not load_port:
return {'error': f"Port '{from_port}' not found", 'candidates': []}
# Subtype recommendation based on tonnage
subtype = classify_vessel_subtype(vtype, tonnage * 1.15 if tonnage else None) # 15% margin
# Score each vessel
scored = []
for v in (vessels or []):
score = 0
reasons = []
# Type match (40 pts)
v_cat = v.get('type_category', '')
if v_cat == vtype:
score += 40
reasons.append('type_match')
elif vtype in (v.get('type', '') or '').lower():
score += 25
reasons.append('partial_type_match')
else:
continue # Skip non-matching types entirely
# DWT fit (30 pts)
v_dwt = None
try:
v_dwt = float(v.get('dwt', 0) or 0)
except (ValueError, TypeError):
pass
if v_dwt and tonnage:
ratio = v_dwt / tonnage if tonnage > 0 else 0
if 1.05 <= ratio <= 1.5:
score += 30 # Perfect fit (5-50% margin)
reasons.append('dwt_perfect')
elif 1.0 <= ratio <= 2.0:
score += 20
reasons.append('dwt_acceptable')
elif ratio > 2.0:
score += 5
reasons.append('dwt_oversized')
else:
score += 0
reasons.append('dwt_too_small')
elif not v_dwt:
score += 10 # Unknown DWT — neutral
reasons.append('dwt_unknown')
# Availability — anchored/moored vessels are more likely available (20 pts)
nav = v.get('nav_status', '')
spd = 0
try:
spd = float(v.get('speed', 0) or 0)
except (ValueError, TypeError):
pass
if nav in ('at anchor', 'moored') or spd < 0.5:
score += 20
reasons.append('likely_available')
elif spd < 3:
score += 15
reasons.append('slow_moving')
else:
score += 5
reasons.append('underway')
# Proximity bonus (10 pts) — vessels closer to load port
v_lat = v.get('lat')
v_lon = v.get('lon')
if v_lat and v_lon:
try:
dist = _haversine_nm(float(v_lat), float(v_lon),
load_port['lat'], load_port['lon'])
if dist < 50:
score += 10
reasons.append('very_close')
elif dist < 200:
score += 7
reasons.append('nearby')
elif dist < 500:
score += 3
reasons.append('regional')
except (ValueError, TypeError):
pass
v_scored = {**v, '_score': score, '_match_reasons': reasons}
if v_dwt:
v_scored['_subtype'] = classify_vessel_subtype(vtype, v_dwt)
scored.append(v_scored)
# Sort by score descending
scored.sort(key=lambda x: -x['_score'])
# Route calculation if destination provided
route = None
if to_port:
route = calculate_sea_route(from_port, to_port, vtype, dwt=tonnage)
return {
'cargo_type': cargo_type,
'vessel_type_required': vtype,
'recommended_subtype': subtype.get('name') if subtype else None,
'recommended_dwt_range': f"{subtype['dwt_min']:,}{subtype['dwt_max']:,}" if subtype else None,
'loading_port': load_port['name'],
'tonnage': tonnage,
'candidates': scored[:15],
'total_matches': len(scored),
'route': route,
}
# =============================================================================
# FEATURE 2: FREIGHT RATE INTELLIGENCE
# =============================================================================
# Freight rates $/ton by route corridor + vessel type (Q1 2026 approximations)
# Based on Baltic Exchange indices: BDI (bulk), BDTI (tanker), SCFI (container)
FREIGHT_RATES = {
'bulk': {
# route_key: (low_$/ton, high_$/ton, benchmark)
('EASIA', 'NEUR'): (18, 28, 'C3 Cape Brazil-China equiv'),
('NEUR', 'EASIA'): (12, 20, 'Backhaul'),
('SAE', 'EASIA'): (15, 25, 'C5TC Capesize Brazil-China'),
('AUSNZ', 'EASIA'): (8, 14, 'C5 Australia-China'),
('GULF', 'EASIA'): (12, 20, 'AG-Far East'),
('USEC', 'NEUR'): (10, 18, 'USG-Continent grain'),
('USEC', 'EASIA'): (25, 40, 'USG-Far East'),
('SAE', 'NEUR'): (12, 20, 'ECSA-Continent'),
('BSEA', 'MED'): (8, 15, 'Black Sea-Med grain'),
('BSEA', 'EASIA'): (22, 35, 'Black Sea-Far East'),
('SAFR', 'EASIA'): (10, 16, 'S.Africa-China'),
('SASIA', 'EASIA'): (6, 12, 'India-China coastal'),
('MED', 'NEUR'): (6, 12, 'Short-sea Med-Continent'),
('NEUR', 'USEC'): (8, 14, 'Continent-USG backhaul'),
('CASP', 'CASP'): (8, 18, 'Caspian Sea internal'),
('CASP', 'BSEA'): (20, 40, 'Caspian-Black Sea via Volga-Don'),
('BSEA', 'CASP'): (18, 35, 'Black Sea-Caspian via Volga-Don'),
('CASP', 'GULF'): (12, 25, 'Caspian-Persian Gulf INSTC'),
('GULF', 'CASP'): (14, 28, 'Persian Gulf-Caspian INSTC'),
('CASP', 'SASIA'): (22, 45, 'Caspian-India INSTC corridor'),
},
'tanker': {
('GULF', 'EASIA'): (8, 15, 'TD3C VLCC AG-China'),
('GULF', 'NEUR'): (10, 18, 'AG-UKC'),
('GULF', 'USEC'): (12, 20, 'AG-USAC'),
('GULF', 'MED'): (9, 16, 'AG-Med'),
('GULF', 'SASIA'): (5, 10, 'AG-India'),
('WAFR', 'EASIA'): (12, 22, 'WAF-East'),
('WAFR', 'USEC'): (8, 14, 'WAF-USG'),
('USEC', 'NEUR'): (6, 12, 'USG-UKC'),
('NEUR', 'USEC'): (5, 10, 'UKC-USG backhaul'),
('MED', 'NEUR'): (4, 8, 'Med-Continent'),
('BSEA', 'MED'): (5, 10, 'CPC-Med Aframax'),
('EASIA', 'EASIA'): (3, 7, 'Intra-Asia'),
('CASP', 'CASP'): (6, 14, 'Caspian tanker coastal'),
('CASP', 'BSEA'): (18, 35, 'Caspian-Black Sea crude via VDC'),
('BSEA', 'CASP'): (16, 30, 'Black Sea-Caspian products via VDC'),
('CASP', 'GULF'): (10, 22, 'Caspian-AG crude shuttle'),
('CASP', 'MED'): (25, 50, 'Caspian-Med crude via BTC/pipeline'),
},
'container': {
# $/TEU rates (not per ton)
('EASIA', 'NEUR'): (1200, 2800, 'SCFI Shanghai-Europe'),
('EASIA', 'USEC'): (2500, 5000, 'SCFI Shanghai-USEC'),
('EASIA', 'USWC'): (1800, 3500, 'SCFI Shanghai-USWC'),
('EASIA', 'MED'): (1400, 3000, 'Shanghai-Med'),
('NEUR', 'EASIA'): (400, 900, 'Europe-Asia backhaul'),
('NEUR', 'USEC'): (800, 1500, 'Europe-USEC'),
('EASIA', 'SASIA'): (300, 700, 'Intra-Asia'),
('EASIA', 'AUSNZ'): (600, 1200, 'China-Australia'),
('EASIA', 'ERED'): (800, 1500, 'Asia-East Africa'),
('EASIA', 'SAE'): (1000, 2000, 'Asia-S.America'),
('MED', 'ERED'): (600, 1200, 'Med-East Africa'),
('USEC', 'SAE'): (500, 1000, 'US-S.America'),
},
}
# DWT multipliers for rate adjustment (smaller vessels = higher $/ton)
_RATE_DWT_MULTIPLIER = {
'bulk': [
(10000, 1.8), # River-Sea premium (Caspian/inland)
(40000, 1.4), # Handysize premium
(65000, 1.15), # Supramax
(100000, 1.0), # Panamax (base)
(200000, 0.85), # Capesize discount
(999999, 0.75), # VLOC
],
'tanker': [
(25000, 1.8), # River-Sea tanker premium (Caspian/inland)
(55000, 1.5), # MR premium
(80000, 1.2), # LR1
(120000, 1.0), # Aframax (base)
(200000, 0.8), # Suezmax
(999999, 0.65), # VLCC
],
}
def estimate_freight_rate(from_port: str, to_port: str, vessel_type: str = 'bulk',
dwt: float = None) -> Optional[Dict]:
"""
Estimate freight rate for a route + vessel type.
Returns $/ton range (bulk/tanker) or $/TEU range (container).
"""
port_a = resolve_port(from_port)
port_b = resolve_port(to_port)
if not port_a or not port_b:
return None
# Get regions (resolve_port includes 'key')
key_a = port_a.get('key')
key_b = port_b.get('key')
if not key_a or not key_b:
return None
region_a = PORT_REGIONS.get(key_a, 'OTHER')
region_b = PORT_REGIONS.get(key_b, 'OTHER')
vtype = vessel_type.lower() if vessel_type else 'bulk'
rates_table = FREIGHT_RATES.get(vtype, {})
# Direct match
rate_data = rates_table.get((region_a, region_b))
# Try reverse with discount (backhaul)
if not rate_data:
rev = rates_table.get((region_b, region_a))
if rev:
rate_data = (int(rev[0] * 0.6), int(rev[1] * 0.7), f"Backhaul from {rev[2]}")
# Fallback: estimate from distance
if not rate_data:
route = calculate_sea_route(from_port, to_port, vtype, dwt=dwt)
if route:
dist = route['distance_nm']
# ~$0.003-$0.006/ton/NM for bulk
base_rates = {'bulk': (0.003, 0.006), 'tanker': (0.0025, 0.005), 'container': (0.3, 0.7)}
r = base_rates.get(vtype, (0.003, 0.006))
rate_data = (round(dist * r[0]), round(dist * r[1]), 'Distance-based estimate')
else:
return None
low, high, benchmark = rate_data
# DWT adjustment
multiplier = 1.0
if dwt and vtype in _RATE_DWT_MULTIPLIER:
try:
d = float(dwt)
for threshold, mult in _RATE_DWT_MULTIPLIER[vtype]:
if d <= threshold:
multiplier = mult
break
except (ValueError, TypeError):
pass
low_adj = round(low * multiplier)
high_adj = round(high * multiplier)
unit = '$/TEU' if vtype == 'container' else '$/ton'
subtype = classify_vessel_subtype(vtype, dwt)
return {
'from': port_a['name'],
'to': port_b['name'],
'vessel_type': vtype,
'vessel_subtype': subtype.get('name') if subtype else None,
'rate_low': low_adj,
'rate_high': high_adj,
'unit': unit,
'benchmark': benchmark,
'dwt': dwt,
'dwt_multiplier': round(multiplier, 2),
'note': 'Indicative market rate range. Actual fixtures depend on market conditions, vessel age, port costs, and negotiation.',
}
# =============================================================================
# FEATURE 3: SANCTIONS & COMPLIANCE SCREENER
# (Extracted to maritime_compliance.py — re-exported for backward compatibility)
# =============================================================================
from maritime_compliance import (
SANCTIONED_FLAGS, SANCTIONED_ENTITIES, DARK_FLEET_INDICATORS,
screen_sanctions, _max_risk,
)
# =============================================================================
# FEATURE 4: PORT CONGESTION PREDICTOR
# =============================================================================
# Average port congestion data (wait days by port, updated periodically)
# Sources: public port authority reports, shipping indices
PORT_CONGESTION = {
# port_key: {'wait_days': (low, high), 'berth_utilization': %, 'trend': up/down/stable}
'shanghai': {'wait_days': (1, 3), 'berth_utilization': 85, 'trend': 'stable', 'peak_months': [3, 4, 9, 10, 11]},
'singapore': {'wait_days': (0.5, 2), 'berth_utilization': 80, 'trend': 'stable', 'peak_months': [1, 2, 11, 12]},
'rotterdam': {'wait_days': (0.5, 1.5), 'berth_utilization': 75, 'trend': 'stable', 'peak_months': [9, 10, 11]},
'antwerp': {'wait_days': (0.5, 2), 'berth_utilization': 78, 'trend': 'stable', 'peak_months': [9, 10, 11]},
'houston': {'wait_days': (1, 4), 'berth_utilization': 82, 'trend': 'up', 'peak_months': [1, 2, 6, 7, 8]},
'long_beach': {'wait_days': (0.5, 3), 'berth_utilization': 72, 'trend': 'down', 'peak_months': [8, 9, 10, 11]},
'los_angeles': {'wait_days': (0.5, 3), 'berth_utilization': 73, 'trend': 'down', 'peak_months': [8, 9, 10, 11]},
'busan': {'wait_days': (0.5, 1.5), 'berth_utilization': 70, 'trend': 'stable', 'peak_months': [3, 4, 10, 11]},
'ningbo': {'wait_days': (1, 3), 'berth_utilization': 88, 'trend': 'up', 'peak_months': [3, 4, 9, 10, 11]},
'hong_kong': {'wait_days': (0.5, 1), 'berth_utilization': 60, 'trend': 'down', 'peak_months': [1, 2, 10, 11]},
'jebel_ali': {'wait_days': (0.5, 1.5), 'berth_utilization': 75, 'trend': 'stable', 'peak_months': [1, 2, 3, 11, 12]},
'santos': {'wait_days': (2, 6), 'berth_utilization': 90, 'trend': 'up', 'peak_months': [2, 3, 4, 5]},
'paranagua': {'wait_days': (3, 10), 'berth_utilization': 92, 'trend': 'up', 'peak_months': [2, 3, 4, 5]},
'port_hedland': {'wait_days': (1, 4), 'berth_utilization': 88, 'trend': 'stable', 'peak_months': [6, 7, 8, 9]},
'richards_bay': {'wait_days': (2, 7), 'berth_utilization': 85, 'trend': 'stable', 'peak_months': [6, 7, 8]},
'new_orleans': {'wait_days': (1, 4), 'berth_utilization': 80, 'trend': 'stable', 'peak_months': [9, 10, 11, 12]},
'novorossiysk': {'wait_days': (2, 8), 'berth_utilization': 85, 'trend': 'up', 'peak_months': [6, 7, 8, 9]},
'istanbul': {'wait_days': (1, 5), 'berth_utilization': 82, 'trend': 'stable', 'peak_months': [6, 7, 8, 9]},
'piraeus': {'wait_days': (0.5, 1.5), 'berth_utilization': 78, 'trend': 'stable', 'peak_months': [6, 7, 8]},
'laem_chabang': {'wait_days': (0.5, 2), 'berth_utilization': 80, 'trend': 'stable', 'peak_months': [11, 12, 1]},
'mumbai': {'wait_days': (1, 4), 'berth_utilization': 85, 'trend': 'up', 'peak_months': [10, 11, 12, 1]},
'durban': {'wait_days': (2, 5), 'berth_utilization': 88, 'trend': 'up', 'peak_months': [1, 2, 3]},
'callao': {'wait_days': (1, 3), 'berth_utilization': 75, 'trend': 'stable', 'peak_months': [2, 3, 4]},
'vancouver': {'wait_days': (1, 5), 'berth_utilization': 82, 'trend': 'up', 'peak_months': [8, 9, 10, 11]},
'savannah': {'wait_days': (0.5, 2), 'berth_utilization': 78, 'trend': 'stable', 'peak_months': [8, 9, 10]},
'new_york': {'wait_days': (0.5, 2), 'berth_utilization': 76, 'trend': 'stable', 'peak_months': [9, 10, 11]},
'hamburg': {'wait_days': (0.5, 1.5), 'berth_utilization': 70, 'trend': 'down', 'peak_months': [9, 10, 11]},
'felixstowe': {'wait_days': (0.5, 1.5), 'berth_utilization': 75, 'trend': 'stable', 'peak_months': [9, 10, 11]},
'tanger_med': {'wait_days': (0.5, 1), 'berth_utilization': 72, 'trend': 'stable', 'peak_months': [6, 7, 8]},
'port_said': {'wait_days': (0.5, 2), 'berth_utilization': 80, 'trend': 'stable', 'peak_months': [1, 2, 3]},
}
def estimate_port_congestion(port_name: str, month: int = None) -> Optional[Dict]:
"""
Estimate port congestion level and expected wait time.
Returns congestion assessment with wait days, level, and trend.
"""
port = resolve_port(port_name)
if not port:
return None
# Get port key (resolve_port includes 'key')
port_key = port.get('key')
if not port_key:
return None
congestion = PORT_CONGESTION.get(port_key)
if not congestion:
# Default estimate based on region
region = PORT_REGIONS.get(port_key, 'OTHER')
defaults = {
'EASIA': {'wait_days': (1, 3), 'berth_utilization': 80, 'trend': 'stable'},
'NEUR': {'wait_days': (0.5, 2), 'berth_utilization': 72, 'trend': 'stable'},
'MED': {'wait_days': (0.5, 2), 'berth_utilization': 75, 'trend': 'stable'},
'GULF': {'wait_days': (0.5, 2), 'berth_utilization': 75, 'trend': 'stable'},
'USEC': {'wait_days': (0.5, 3), 'berth_utilization': 76, 'trend': 'stable'},
'USWC': {'wait_days': (0.5, 2), 'berth_utilization': 70, 'trend': 'stable'},
}
congestion = defaults.get(region, {'wait_days': (1, 3), 'berth_utilization': 75, 'trend': 'stable'})
wait_low, wait_high = congestion['wait_days']
utilization = congestion['berth_utilization']
trend = congestion['trend']
# Seasonal adjustment
current_month = month or datetime.now().month
peak_months = congestion.get('peak_months', [])
is_peak = current_month in peak_months
if is_peak:
wait_low = round(wait_low * 1.3, 1)
wait_high = round(wait_high * 1.5, 1)
utilization = min(98, utilization + 8)
# Congestion level
if utilization >= 90 or wait_high >= 6:
level = 'severe'
elif utilization >= 80 or wait_high >= 3:
level = 'moderate'
elif utilization >= 70:
level = 'normal'
else:
level = 'low'
return {
'port': port['name'],
'country': port['country'],
'congestion_level': level,
'wait_days': {'low': wait_low, 'high': wait_high},
'berth_utilization_pct': utilization,
'trend': trend,
'is_peak_season': is_peak,
'peak_months': peak_months,
'note': 'Estimate based on historical patterns and seasonal trends. Real-time congestion may vary.',
}
# =============================================================================
# FEATURE 5: BUNKER PRICE OPTIMIZER
# =============================================================================
# VLSFO bunker prices by major bunkering port (USD/ton, approximate Q1 2026)
BUNKER_PRICES = {
# port_key: {'vlsfo': $/ton, 'hsfo': $/ton, 'mgo': $/ton, 'supply': 'good'|'limited'}
'singapore': {'vlsfo': 580, 'hsfo': 440, 'mgo': 750, 'supply': 'excellent'},
'fujairah': {'vlsfo': 570, 'hsfo': 430, 'mgo': 740, 'supply': 'excellent'},
'rotterdam': {'vlsfo': 590, 'hsfo': 450, 'mgo': 770, 'supply': 'excellent'},
'houston': {'vlsfo': 575, 'hsfo': 430, 'mgo': 730, 'supply': 'excellent'},
'busan': {'vlsfo': 600, 'hsfo': 460, 'mgo': 780, 'supply': 'good'},
'hong_kong': {'vlsfo': 610, 'hsfo': 470, 'mgo': 790, 'supply': 'good'},
'shanghai': {'vlsfo': 605, 'hsfo': 460, 'mgo': 780, 'supply': 'good'},
'antwerp': {'vlsfo': 595, 'hsfo': 455, 'mgo': 775, 'supply': 'good'},
'hamburg': {'vlsfo': 600, 'hsfo': 460, 'mgo': 780, 'supply': 'good'},
'piraeus': {'vlsfo': 585, 'hsfo': 445, 'mgo': 760, 'supply': 'good'},
'istanbul': {'vlsfo': 595, 'hsfo': 450, 'mgo': 770, 'supply': 'good'},
'algeciras': {'vlsfo': 575, 'hsfo': 435, 'mgo': 745, 'supply': 'good'},
'tanger_med': {'vlsfo': 580, 'hsfo': 440, 'mgo': 750, 'supply': 'good'},
'port_said': {'vlsfo': 600, 'hsfo': 460, 'mgo': 790, 'supply': 'limited'},
'durban': {'vlsfo': 620, 'hsfo': 480, 'mgo': 810, 'supply': 'limited'},
'cape_town': {'vlsfo': 615, 'hsfo': 475, 'mgo': 800, 'supply': 'limited'},
'santos': {'vlsfo': 610, 'hsfo': 470, 'mgo': 790, 'supply': 'limited'},
'colon': {'vlsfo': 590, 'hsfo': 450, 'mgo': 770, 'supply': 'good'},
'new_york': {'vlsfo': 595, 'hsfo': 450, 'mgo': 775, 'supply': 'good'},
'los_angeles': {'vlsfo': 600, 'hsfo': 460, 'mgo': 785, 'supply': 'good'},
'long_beach': {'vlsfo': 600, 'hsfo': 460, 'mgo': 785, 'supply': 'good'},
'vancouver': {'vlsfo': 610, 'hsfo': 470, 'mgo': 790, 'supply': 'good'},
'jebel_ali': {'vlsfo': 575, 'hsfo': 435, 'mgo': 745, 'supply': 'excellent'},
'mumbai': {'vlsfo': 605, 'hsfo': 460, 'mgo': 780, 'supply': 'good'},
'colombo': {'vlsfo': 595, 'hsfo': 455, 'mgo': 775, 'supply': 'good'},
'laem_chabang': {'vlsfo': 600, 'hsfo': 460, 'mgo': 780, 'supply': 'limited'},
'jeddah': {'vlsfo': 580, 'hsfo': 440, 'mgo': 755, 'supply': 'good'},
'salalah': {'vlsfo': 575, 'hsfo': 435, 'mgo': 745, 'supply': 'limited'},
'marsaxlokk': {'vlsfo': 590, 'hsfo': 450, 'mgo': 770, 'supply': 'limited'},
'kaohsiung': {'vlsfo': 600, 'hsfo': 460, 'mgo': 780, 'supply': 'good'},
}
def get_bunker_prices(port_name: str) -> Optional[Dict]:
"""Get bunker fuel prices for a port + nearby alternatives."""
port = resolve_port(port_name)
if not port:
return None
# Get port key (resolve_port includes 'key')
port_key = port.get('key')
prices = BUNKER_PRICES.get(port_key) if port_key else None
# Find nearby bunkering alternatives
alternatives = []
nearby = find_nearby_ports(port['lat'], port['lon'], radius_nm=500)
for np in nearby:
np_key = np.get('key')
if np_key and np_key != port_key and np_key in BUNKER_PRICES:
bp = BUNKER_PRICES[np_key]
alternatives.append({
'port': np['name'],
'distance_nm': np['distance_nm'],
'vlsfo': bp['vlsfo'],
'hsfo': bp['hsfo'],
'mgo': bp['mgo'],
'supply': bp['supply'],
})
alternatives.sort(key=lambda x: x['vlsfo']) # Sort by cheapest VLSFO
result = {
'port': port['name'],
'country': port['country'],
}
if prices:
result.update({
'vlsfo_usd_ton': prices['vlsfo'],
'hsfo_usd_ton': prices['hsfo'],
'mgo_usd_ton': prices['mgo'],
'supply_level': prices['supply'],
})
else:
result['note'] = 'No direct bunker price data for this port.'
if alternatives:
result['alternatives'] = alternatives[:5]
# Savings calculation
if prices and alternatives:
cheapest = alternatives[0]
if cheapest['vlsfo'] < prices['vlsfo']:
saving = prices['vlsfo'] - cheapest['vlsfo']
result['cheapest_alternative'] = {
'port': cheapest['port'],
'vlsfo': cheapest['vlsfo'],
'saving_per_ton': saving,
'distance_nm': cheapest['distance_nm'],
'note': f'Save ${saving}/ton VLSFO by bunkering at {cheapest["port"]} ({cheapest["distance_nm"]} NM away)',
}
return result
def optimize_bunker_route(from_port: str, to_port: str, vessel_type: str = 'bulk',
dwt: float = None) -> Optional[Dict]:
"""
Find optimal bunkering ports along a route.
Returns route with bunker price comparison at key stops.
"""
route = calculate_sea_route(from_port, to_port, vessel_type, dwt=dwt)
if not route:
return None
# Collect bunkering options along the route
bunker_options = []
# Check departure port
dep_bunker = get_bunker_prices(from_port)
if dep_bunker and dep_bunker.get('vlsfo_usd_ton'):
bunker_options.append({
'port': dep_bunker['port'],
'position': 'departure',
'vlsfo': dep_bunker['vlsfo_usd_ton'],
'supply': dep_bunker.get('supply_level', 'unknown'),
})
# Check waypoint ports (canals often have nearby bunkering)
waypoint_bunker_ports = {
'gibraltar': 'algeciras',
'suez_n': 'port said',
'bab_el_mandeb': 'jeddah',
'hormuz': 'fujairah',
'malacca': 'singapore',
'panama_atl': 'colon',
'cape_good_hope': 'cape town',
'dover': 'rotterdam',
}
for wp in route.get('via', []):
wp_lower = wp.lower()
for waypoint_key, bunker_port_key in waypoint_bunker_ports.items():
if waypoint_key.replace('_', ' ') in wp_lower or wp_lower in waypoint_key:
bp = BUNKER_PRICES.get(bunker_port_key)
if bp:
port_info = WORLD_PORTS.get(bunker_port_key)
bunker_options.append({
'port': port_info['name'] if port_info else bunker_port_key,
'position': 'en_route',
'vlsfo': bp['vlsfo'],
'supply': bp['supply'],
})
break
# Check arrival port
arr_bunker = get_bunker_prices(to_port)
if arr_bunker and arr_bunker.get('vlsfo_usd_ton'):
bunker_options.append({
'port': arr_bunker['port'],
'position': 'arrival',
'vlsfo': arr_bunker['vlsfo_usd_ton'],
'supply': arr_bunker.get('supply_level', 'unknown'),
})
# Find cheapest
if bunker_options:
cheapest = min(bunker_options, key=lambda x: x['vlsfo'])
cheapest['recommended'] = True
# Estimate savings
fuel_low, fuel_high = estimate_fuel_consumption(vessel_type, dwt)
avg_fuel = (fuel_low + fuel_high) / 2
sailing_days = route['total_days']
total_fuel_tons = avg_fuel * sailing_days
return {
'route': {'from': route['from'], 'to': route['to'], 'distance_nm': route['distance_nm'],
'total_days': route['total_days'], 'canals': route['canals']},
'estimated_fuel_tons': round(total_fuel_tons),
'bunker_options': bunker_options,
'price_spread': {
'cheapest_vlsfo': min(b['vlsfo'] for b in bunker_options) if bunker_options else None,
'most_expensive_vlsfo': max(b['vlsfo'] for b in bunker_options) if bunker_options else None,
'potential_saving_usd': round((max(b['vlsfo'] for b in bunker_options) - min(b['vlsfo'] for b in bunker_options)) * total_fuel_tons) if bunker_options else 0,
},
'note': 'VLSFO prices are approximate and fluctuate daily. Contact bunker suppliers for live quotes.',
}
# =============================================================================
# FEATURE 6: CHARTER PARTY GENERATOR
# =============================================================================
def generate_charter_party(vessel_name: str, cargo_type: str, tonnage: float,
from_port: str, to_port: str, rate: float = None,
laydays: str = None, demurrage_rate: float = None,
charterer: str = None, owner: str = None) -> Optional[Dict]:
"""Generate a structured charter party agreement."""
port_load = resolve_port(from_port)
port_discharge = resolve_port(to_port)
if not port_load or not port_discharge:
return None
route = calculate_sea_route(from_port, to_port, cargo_type)
distance = route['distance_nm'] if route else 0
voyage_days = route['total_days'] if route else 0
# Estimate rate if not provided
cargo_lower = cargo_type.lower() if cargo_type else 'bulk'
if not rate:
freight = estimate_freight_rate(from_port, to_port, cargo_lower)
rate = freight['rate_high'] if freight else 15.0
# Default laytime calculation (based on tonnage, ~5000t/day loading rate)
load_rate = 5000 if tonnage <= 50000 else 8000 if tonnage <= 100000 else 12000
laytime_days = round(tonnage / load_rate, 1) if tonnage else 5
if not demurrage_rate:
demurrage_rate = round(tonnage * 0.15 if tonnage else 15000, 0)
demurrage_rate = min(max(demurrage_rate, 8000), 80000)
from datetime import datetime, timedelta
today = datetime.now()
laycan_start = today + timedelta(days=14)
laycan_end = laycan_start + timedelta(days=5)
cp_number = f"CP-{today.strftime('%Y%m%d')}-{int(hashlib.md5((vessel_name or 'TBN').encode()).hexdigest()[:8], 16) % 10000:04d}"
return {
'cp_number': cp_number,
'date': today.strftime('%Y-%m-%d'),
'type': 'Voyage Charter Party',
'parties': {
'owner': owner or '[Owner name to be inserted]',
'charterer': charterer or '[Charterer name to be inserted]',
},
'vessel': {
'name': vessel_name or 'TBN (To Be Nominated)',
'type': cargo_lower.title(),
'dwt': tonnage,
},
'cargo': {
'type': cargo_type,
'quantity_mt': tonnage,
'tolerance': '5% MOLOO (More or Less at Owner\'s Option)',
'stowage_factor': 'As per standard',
},
'ports': {
'loading': {
'name': port_load['name'],
'country': port_load['country'],
'unlocode': port_load.get('unlocode', ''),
'terms': 'FIOS (Free In/Out Stowed)',
},
'discharge': {
'name': port_discharge['name'],
'country': port_discharge['country'],
'unlocode': port_discharge.get('unlocode', ''),
'terms': 'FIOS (Free In/Out Stowed)',
},
},
'commercial': {
'freight_rate': f"${rate:.2f}/MT",
'total_freight': f"${rate * tonnage:,.0f}",
'payment': 'Within 5 banking days of completion of loading, less address commission 3.75%',
'laycan': laydays or f"{laycan_start.strftime('%d %b')} - {laycan_end.strftime('%d %b %Y')}",
},
'laytime': {
'loading_days': laytime_days,
'discharge_days': laytime_days,
'total_days': laytime_days * 2,
'loading_rate': f"{load_rate:,} MT/day SHINC",
'commencement': 'NOR tendered, whether in berth or not (WIBON), whether in port or not (WIPON)',
},
'demurrage': {
'rate_per_day': f"${demurrage_rate:,.0f}/day",
'despatch': f"${demurrage_rate / 2:,.0f}/day (half demurrage rate)",
'payment': 'Within 30 days of completion of discharge',
},
'voyage': {
'distance_nm': distance,
'estimated_days': voyage_days,
'canals': route.get('canals', []) if route else [],
},
'clauses': [
'General Average to be settled according to York-Antwerp Rules 2016',
'Both-to-Blame Collision Clause as per Conwartime 2013',
'War Risk Clause — Voywar 2013',
'Ice Clause — if applicable',
'BIMCO Sanctions Clause 2020',
'BIMCO Infectious or Contagious Diseases Clause 2022',
'ISM/ISPS Clause as per BIMCO',
'US Clause Paramount — if US port involved',
'Hague-Visby Rules to apply',
],
'arbitration': 'London, English Law — LMAA Terms',
'note': 'DRAFT — This is an AI-generated charter party template. Must be reviewed by qualified maritime lawyers before execution.',
}
# =============================================================================
# FEATURE 7: VESSEL PERFORMANCE ANALYTICS
# =============================================================================
# Typical fuel consumption by vessel subtype (tons/day at design speed)
VESSEL_FUEL_BENCHMARKS = {
'capesize': {'speed_kn': 14.5, 'fuel_td': 55, 'dwt_range': (100000, 200000)},
'panamax_bulk': {'speed_kn': 14.0, 'fuel_td': 35, 'dwt_range': (65000, 99999)},
'supramax': {'speed_kn': 13.5, 'fuel_td': 28, 'dwt_range': (50000, 64999)},
'handysize': {'speed_kn': 13.0, 'fuel_td': 22, 'dwt_range': (15000, 49999)},
'vlcc': {'speed_kn': 15.0, 'fuel_td': 80, 'dwt_range': (200000, 350000)},
'suezmax': {'speed_kn': 14.5, 'fuel_td': 55, 'dwt_range': (120000, 199999)},
'aframax': {'speed_kn': 14.5, 'fuel_td': 45, 'dwt_range': (80000, 119999)},
'mr_tanker': {'speed_kn': 14.0, 'fuel_td': 30, 'dwt_range': (45000, 79999)},
'ulcv': {'speed_kn': 22.0, 'fuel_td': 250, 'dwt_range': (150000, 250000)},
'neo_panamax': {'speed_kn': 21.0, 'fuel_td': 180, 'dwt_range': (100000, 149999)},
'panamax_container': {'speed_kn': 20.0, 'fuel_td': 130, 'dwt_range': (50000, 99999)},
'feeder': {'speed_kn': 18.0, 'fuel_td': 50, 'dwt_range': (5000, 49999)},
}
def analyze_vessel_performance(vessel_name: str = None, imo: str = None,
dwt: float = None, speed: float = None,
vessel_type: str = None, year_built: int = None,
fuel_consumption: float = None) -> Optional[Dict]:
"""Analyze vessel performance: fuel efficiency, speed, hull condition."""
if not vessel_type and not dwt:
return {'error': 'Vessel type or DWT required for performance analysis'}
vtype = (vessel_type or 'bulk').lower()
# Find benchmark
benchmark = None
if dwt:
for name, data in VESSEL_FUEL_BENCHMARKS.items():
low, high = data['dwt_range']
if low <= dwt <= high and (vtype in name or name in vtype or vtype == 'bulk' or vtype == 'tanker'):
benchmark = {**data, 'subtype': name}
break
if not benchmark:
# Default by type
defaults = {'bulk': 'handysize', 'tanker': 'mr_tanker', 'container': 'feeder'}
key = defaults.get(vtype, 'handysize')
benchmark = {**VESSEL_FUEL_BENCHMARKS[key], 'subtype': key}
design_speed = benchmark['speed_kn']
design_fuel = benchmark['fuel_td']
actual_speed = speed or design_speed
actual_fuel = fuel_consumption or design_fuel
# Vessel age impact
current_year = datetime.now().year
age = (current_year - year_built) if year_built else 10
age_fuel_penalty = min(age * 0.8, 20) # ~0.8% per year, max 20%
expected_fuel = round(design_fuel * (1 + age_fuel_penalty / 100), 1)
# Speed vs fuel (cubic relationship)
speed_ratio = actual_speed / design_speed if design_speed else 1
theoretical_fuel = round(design_fuel * (speed_ratio ** 3), 1)
# Efficiency score (100 = perfect)
if actual_fuel > 0 and theoretical_fuel > 0:
efficiency = round(min(100, (theoretical_fuel / actual_fuel) * 100), 1)
else:
efficiency = round(100 - age_fuel_penalty, 1)
# Hull condition estimate
if efficiency >= 95:
hull_condition = 'Excellent — recently dry-docked'
elif efficiency >= 85:
hull_condition = 'Good — within normal parameters'
elif efficiency >= 75:
hull_condition = 'Fair — hull cleaning recommended within 3-6 months'
elif efficiency >= 65:
hull_condition = 'Below average — significant fouling likely, dry-dock recommended'
else:
hull_condition = 'Poor — urgent hull treatment needed, significant fuel waste'
# CII rating estimate (Carbon Intensity Indicator)
if efficiency >= 90:
cii_rating = 'A'
elif efficiency >= 80:
cii_rating = 'B'
elif efficiency >= 70:
cii_rating = 'C'
elif efficiency >= 60:
cii_rating = 'D'
else:
cii_rating = 'E'
recommendations = []
if actual_speed > design_speed * 0.95:
recommendations.append(f'Slow steaming to {design_speed * 0.85:.1f} kn could save ~{round(design_fuel * 0.25)}t/day fuel')
if efficiency < 85:
recommendations.append('Hull cleaning/dry-docking would improve fuel efficiency by 10-15%')
if age > 15:
recommendations.append('Consider engine overhaul or retrofit for improved performance')
if cii_rating in ('D', 'E'):
recommendations.append(f'CII rating {cii_rating} — corrective plan required under IMO regulations')
if not recommendations:
recommendations.append('Vessel performing within expected parameters')
return {
'vessel': vessel_name or 'Unknown',
'imo': imo or '',
'vessel_type': vtype,
'subtype': benchmark['subtype'].replace('_', ' ').title(),
'dwt': dwt,
'age_years': age,
'year_built': year_built,
'performance': {
'design_speed_kn': design_speed,
'actual_speed_kn': actual_speed,
'speed_utilization_pct': round(speed_ratio * 100, 1),
'design_fuel_td': design_fuel,
'expected_fuel_td': expected_fuel,
'actual_fuel_td': actual_fuel,
'efficiency_score': efficiency,
},
'hull_condition': hull_condition,
'cii_rating': cii_rating,
'cii_note': 'IMO Carbon Intensity Indicator (A=best, E=worst). D/E requires corrective action plan.',
'fleet_comparison': f"{'Above' if efficiency >= 80 else 'Below'} average for {benchmark['subtype'].replace('_',' ')} fleet",
'annual_fuel_cost_estimate': {
'at_current_speed': f"${round(actual_fuel * 365 * BUNKER_PRICE_USD):,}",
'at_eco_speed': f"${round(design_fuel * 0.7 * 365 * BUNKER_PRICE_USD):,}",
'potential_annual_saving': f"${round((actual_fuel - design_fuel * 0.7) * 365 * BUNKER_PRICE_USD):,}",
},
'recommendations': recommendations,
'note': 'Performance estimates based on vessel class benchmarks and reported parameters. Actual performance depends on sea/weather conditions, cargo, trim, and maintenance.',
}
# =============================================================================
# FEATURE 8: DIGITAL BILL OF LADING
# =============================================================================
def generate_bill_of_lading(shipper: str, consignee: str, vessel_name: str,
from_port: str, to_port: str, cargo_description: str,
weight_mt: float = None, packages: int = None,
notify_party: str = None, voyage_no: str = None,
marks: str = None, freight_terms: str = 'PREPAID') -> Optional[Dict]:
"""Generate a structured digital Bill of Lading."""
port_load = resolve_port(from_port)
port_discharge = resolve_port(to_port)
if not port_load or not port_discharge:
return None
from datetime import datetime
today = datetime.now()
bl_number = f"SFBL-{today.strftime('%Y%m%d')}-{int(hashlib.md5((vessel_name + shipper).encode()).hexdigest()[:8], 16) % 100000:05d}"
return {
'bl_number': bl_number,
'date_issued': today.strftime('%Y-%m-%d'),
'type': 'Negotiable OCEAN BILL OF LADING',
'original_copies': 3,
'shipper': {
'name': shipper,
'address': '[Shipper address]',
},
'consignee': {
'name': consignee,
'instruction': 'TO ORDER' if consignee.lower() in ('to order', 'order') else consignee,
},
'notify_party': {
'name': notify_party or consignee,
'address': '[Notify party address]',
},
'vessel': {
'name': vessel_name,
'voyage_no': voyage_no or f"V.{today.strftime('%y')}{int(hashlib.md5(vessel_name.encode()).hexdigest()[:4], 16) % 100:02d}",
'flag': '[Flag state]',
},
'port_of_loading': {
'name': port_load['name'],
'country': port_load['country'],
'unlocode': port_load.get('unlocode', ''),
},
'port_of_discharge': {
'name': port_discharge['name'],
'country': port_discharge['country'],
'unlocode': port_discharge.get('unlocode', ''),
},
'cargo': {
'description': cargo_description,
'marks_and_numbers': marks or 'N/M (No Marks)',
'packages': packages,
'gross_weight_mt': weight_mt,
'measurement': f"{round(weight_mt * 1.2, 1)} CBM" if weight_mt else None,
},
'freight': {
'terms': f'FREIGHT {freight_terms.upper()}',
'payable_at': port_load['name'],
},
'conditions': {
'shipped_on_board': today.strftime('%Y-%m-%d'),
'clean_on_board': True,
'said_to_contain': True,
'shipper_load_stow_count': True,
},
'clauses': [
'Shipped on board in apparent good order and condition',
'Weight, measure, quantity, quality, contents and value unknown',
'Subject to all terms and conditions of the Charter Party dated as per C/P',
'Clause Paramount — Hague-Visby Rules apply',
],
'place_of_issue': port_load['name'],
'signatory': 'Master or Agent on behalf of the Master',
'note': 'DRAFT — This is a digital B/L template. For legally binding B/L, use eBL platforms (Bolero, essDOCS, WAVE BL) or contact your P&I club.',
}
# =============================================================================
# FEATURE 9: CREW CHANGE OPTIMIZER
# =============================================================================
# Major crew change hubs with airport connectivity and cost factors
CREW_CHANGE_HUBS = {
'singapore': {'airport': 'SIN (Changi)', 'flights': 'excellent', 'visa': 'easy', 'agent_cost': 800, 'hotel_day': 120, 'connectivity': 10},
'rotterdam': {'airport': 'AMS (Schiphol)', 'flights': 'excellent', 'visa': 'schengen', 'agent_cost': 1200, 'hotel_day': 150, 'connectivity': 9},
'hamburg': {'airport': 'HAM', 'flights': 'good', 'visa': 'schengen', 'agent_cost': 1100, 'hotel_day': 140, 'connectivity': 8},
'piraeus': {'airport': 'ATH', 'flights': 'good', 'visa': 'schengen', 'agent_cost': 900, 'hotel_day': 100, 'connectivity': 8},
'istanbul': {'airport': 'IST', 'flights': 'excellent', 'visa': 'easy', 'agent_cost': 700, 'hotel_day': 80, 'connectivity': 9},
'dubai': {'airport': 'DXB', 'flights': 'excellent', 'visa': 'easy', 'agent_cost': 1000, 'hotel_day': 130, 'connectivity': 10},
'jebel_ali': {'airport': 'DXB', 'flights': 'excellent', 'visa': 'easy', 'agent_cost': 1000, 'hotel_day': 130, 'connectivity': 10},
'mumbai': {'airport': 'BOM', 'flights': 'good', 'visa': 'moderate', 'agent_cost': 500, 'hotel_day': 60, 'connectivity': 8},
'manila': {'airport': 'MNL', 'flights': 'good', 'visa': 'easy', 'agent_cost': 400, 'hotel_day': 50, 'connectivity': 9},
'hong_kong': {'airport': 'HKG', 'flights': 'excellent', 'visa': 'easy', 'agent_cost': 900, 'hotel_day': 140, 'connectivity': 10},
'shanghai': {'airport': 'PVG', 'flights': 'good', 'visa': 'moderate', 'agent_cost': 800, 'hotel_day': 100, 'connectivity': 8},
'busan': {'airport': 'PUS', 'flights': 'good', 'visa': 'moderate', 'agent_cost': 700, 'hotel_day': 90, 'connectivity': 7},
'houston': {'airport': 'IAH', 'flights': 'good', 'visa': 'difficult', 'agent_cost': 1500, 'hotel_day': 130, 'connectivity': 7},
'new_york': {'airport': 'JFK', 'flights': 'excellent', 'visa': 'difficult', 'agent_cost': 1600, 'hotel_day': 180, 'connectivity': 8},
'santos': {'airport': 'GRU (São Paulo)', 'flights': 'good', 'visa': 'moderate', 'agent_cost': 900, 'hotel_day': 90, 'connectivity': 7},
'durban': {'airport': 'DUR', 'flights': 'limited', 'visa': 'moderate', 'agent_cost': 700, 'hotel_day': 70, 'connectivity': 5},
'cape_town': {'airport': 'CPT', 'flights': 'good', 'visa': 'moderate', 'agent_cost': 750, 'hotel_day': 80, 'connectivity': 6},
'las_palmas': {'airport': 'LPA', 'flights': 'good', 'visa': 'schengen', 'agent_cost': 800, 'hotel_day': 90, 'connectivity': 7},
'colombo': {'airport': 'CMB', 'flights': 'good', 'visa': 'easy', 'agent_cost': 450, 'hotel_day': 50, 'connectivity': 7},
'port_said': {'airport': 'CAI (Cairo)', 'flights': 'good', 'visa': 'moderate', 'agent_cost': 600, 'hotel_day': 60, 'connectivity': 6},
}
def optimize_crew_change(current_port: str, next_ports: list = None,
crew_count: int = None, vessel_name: str = None) -> Optional[Dict]:
"""Find optimal crew change ports along vessel route."""
port = resolve_port(current_port)
if not port:
return None
crew_count = crew_count or 20
candidates = []
# Score ports: current port + next ports + nearby hubs
ports_to_check = [current_port]
if next_ports:
ports_to_check.extend(next_ports[:5])
# Also add nearby crew change hubs
nearby = find_nearby_ports(port['lat'], port['lon'], radius_nm=500)
for n in nearby[:10]:
nkey = n.get('key', '')
if nkey in CREW_CHANGE_HUBS:
ports_to_check.append(n['name'])
seen = set()
for pname in ports_to_check:
p = resolve_port(pname)
if not p:
continue
pkey = p.get('key', '')
if pkey in seen:
continue
seen.add(pkey)
hub = CREW_CHANGE_HUBS.get(pkey)
if not hub:
# Generic scoring for non-hub ports
hub = {'airport': 'Regional', 'flights': 'limited', 'visa': 'moderate',
'agent_cost': 1000, 'hotel_day': 100, 'connectivity': 4}
# Scoring (0-100)
scores = {}
conn_map = {'excellent': 10, 'good': 7, 'limited': 4}
scores['flight_connectivity'] = hub['connectivity'] * 10
scores['visa_ease'] = {'easy': 90, 'schengen': 80, 'moderate': 60, 'difficult': 30}.get(hub['visa'], 50)
scores['cost_efficiency'] = max(0, 100 - int(hub['agent_cost'] / 20))
scores['agent_availability'] = conn_map.get(hub['flights'], 4) * 10
overall = round(sum(scores.values()) / len(scores), 1)
est_cost = hub['agent_cost'] + hub['hotel_day'] * 2 + 800 * crew_count # flights estimate
candidates.append({
'port': p['name'],
'country': p['country'],
'airport': hub['airport'],
'scores': scores,
'overall_score': overall,
'estimated_cost_usd': est_cost,
'cost_breakdown': {
'agent_fees': hub['agent_cost'],
'hotel_2_nights': hub['hotel_day'] * 2,
'flights_estimate': 800 * crew_count,
'transport_launch': 500,
},
'visa_requirement': hub['visa'],
'flight_availability': hub['flights'],
})
candidates.sort(key=lambda x: -x['overall_score'])
return {
'vessel': vessel_name or 'Unknown',
'crew_count': crew_count,
'current_location': port['name'],
'recommended_ports': candidates[:5],
'best_option': candidates[0]['port'] if candidates else None,
'note': 'Scores based on airport connectivity, visa requirements, agent availability, and cost. Actual costs depend on crew nationality, flight routes, and local regulations.',
}
# =============================================================================
# FEATURE 10: MARITIME INSURANCE CALCULATOR
# =============================================================================
# War risk / high risk areas
WAR_RISK_ZONES = {
'gulf_of_aden': {'lat_range': (11, 15), 'lon_range': (43, 51), 'premium_pct': 0.05},
'strait_of_hormuz': {'lat_range': (25, 27), 'lon_range': (54, 57), 'premium_pct': 0.03},
'red_sea': {'lat_range': (12, 30), 'lon_range': (32, 44), 'premium_pct': 0.07},
'west_africa_gog': {'lat_range': (-5, 7), 'lon_range': (-5, 10), 'premium_pct': 0.03},
'black_sea': {'lat_range': (41, 47), 'lon_range': (27, 42), 'premium_pct': 0.10},
'south_china_sea': {'lat_range': (5, 22), 'lon_range': (105, 120), 'premium_pct': 0.01},
}
def calculate_maritime_insurance(vessel_name: str = None, imo: str = None,
vessel_type: str = None, dwt: float = None,
year_built: int = None, flag: str = None,
from_port: str = None, to_port: str = None,
cargo_value: float = None, hull_value: float = None) -> Optional[Dict]:
"""Calculate maritime insurance premiums (H&M, P&I, Cargo, War Risk)."""
vtype = (vessel_type or 'bulk').lower()
# Estimate hull value if not provided
if not hull_value and dwt:
age = (datetime.now().year - year_built) if year_built else 10
base_value = dwt * 120 # ~$120/dwt for new vessel
depreciation = max(0.2, 1 - age * 0.04) # 4%/year, min 20% residual
hull_value = round(base_value * depreciation)
hull_value = hull_value or 20000000
cargo_value = cargo_value or 0
# H&M premium (0.15% - 0.45% of hull value)
age = (datetime.now().year - year_built) if year_built else 10
hm_base_rate = 0.002 # 0.2% base
if age > 20:
hm_base_rate += 0.002
elif age > 15:
hm_base_rate += 0.001
elif age > 10:
hm_base_rate += 0.0005
# Flag discount/surcharge
flag_lower = (flag or '').lower()
if flag_lower in ('panama', 'liberia', 'marshall islands', 'bahamas', 'singapore', 'hong kong'):
hm_base_rate *= 0.95 # Reputable open registries
elif flag_lower in SANCTIONED_FLAGS:
hm_base_rate *= 2.0 # Sanctioned flag = major surcharge
hm_premium = round(hull_value * hm_base_rate)
# P&I premium (based on GRT, typically $5-30/GRT)
grt_estimate = round(dwt * 0.6) if dwt else 15000
pi_rate = 8 # $/GRT base
if age > 20:
pi_rate = 15
elif age > 15:
pi_rate = 12
pi_premium = round(grt_estimate * pi_rate)
# Cargo insurance (0.1% - 0.5% of cargo value)
cargo_premium = 0
if cargo_value > 0:
cargo_rate = 0.002 # 0.2% base
cargo_premium = round(cargo_value * cargo_rate)
# War risk premium
war_risk = 0
war_zones_crossed = []
if from_port and to_port:
p1 = resolve_port(from_port)
p2 = resolve_port(to_port)
if p1 and p2:
for zone_name, zone in WAR_RISK_ZONES.items():
lat_min, lat_max = zone['lat_range']
lon_min, lon_max = zone['lon_range']
# Check if route might cross this zone (simplified)
lats = [p1['lat'], p2['lat']]
lons = [p1['lon'], p2['lon']]
if (min(lats) <= lat_max and max(lats) >= lat_min and
min(lons) <= lon_max and max(lons) >= lon_min):
war_zones_crossed.append(zone_name.replace('_', ' ').title())
war_risk += round(hull_value * zone['premium_pct'])
total = hm_premium + pi_premium + cargo_premium + war_risk
risk_factors = []
if age > 20:
risk_factors.append(f'Vessel age ({age} years) increases H&M and P&I premiums')
if war_zones_crossed:
risk_factors.append(f'Route crosses war risk zones: {", ".join(war_zones_crossed)}')
if flag_lower in SANCTIONED_FLAGS:
risk_factors.append(f'Flag state ({flag}) has sanctions exposure')
if not risk_factors:
risk_factors.append('Standard risk profile — no significant surcharges')
return {
'vessel': vessel_name or 'Unknown',
'vessel_type': vtype,
'dwt': dwt,
'age_years': age,
'hull_value_usd': hull_value,
'cargo_value_usd': cargo_value,
'premiums': {
'hull_and_machinery': {'annual_usd': hm_premium, 'rate_pct': round(hm_base_rate * 100, 3)},
'p_and_i': {'annual_usd': pi_premium, 'rate_per_grt': pi_rate, 'grt_estimate': grt_estimate},
'cargo': {'premium_usd': cargo_premium, 'rate_pct': 0.2} if cargo_value else None,
'war_risk': {'premium_usd': war_risk, 'zones': war_zones_crossed} if war_risk else None,
},
'total_annual_premium_usd': total,
'risk_factors': risk_factors,
'recommended_coverage': [
'H&M (Hull & Machinery) — physical damage to vessel',
'P&I (Protection & Indemnity) — third-party liability, crew, pollution',
'Loss of Hire — income protection during off-hire periods',
'FD&D (Freight, Demurrage & Defence) — legal costs',
],
'note': 'Indicative premiums based on vessel profile and standard market rates. Contact insurance broker for firm quotation. Underwriting subject to survey and claims history.',
}
# =============================================================================
# FEATURE 11: PORT COST ESTIMATOR
# =============================================================================
# Regional port cost multipliers (base = 1.0 for average)
def _assign_region(lat, lon):
"""Assign maritime region from coordinates."""
if lon < -100 and lat > 25: return 'USWC'
elif lon < -30 and lat > 25: return 'USEC'
elif lon < -30 and -5 < lat <= 25: return 'CARIB'
elif lon < -70 and lat <= -5: return 'SAW'
elif lon < -30 and lat <= -5: return 'SAE'
elif lon > 100 and lat < -10: return 'AUSNZ'
elif lon > 100: return 'EASIA'
elif 60 < lon <= 100 and lat > 0: return 'SASIA'
elif 40 < lon <= 60 and lat > 10: return 'GULF'
elif 10 < lon < 45 and lat < -20: return 'SAFR'
elif 30 < lon <= 45 and -10 < lat < 20: return 'ERED'
elif -5 < lon < 15 and -5 < lat < 15: return 'WAFR'
elif 25 < lon <= 40 and 40 < lat <= 48: return 'BSEA'
elif lon < 40 and lat > 48: return 'NEUR'
elif -10 <= lon <= 40 and 25 < lat <= 48: return 'MED'
else: return 'OTHER'
PORT_COST_REGIONS = {
'NEUR': 1.3, 'MED': 1.0, 'BSEA': 0.8, 'EASIA': 0.9, 'SASIA': 0.7,
'GULF': 0.85, 'ERED': 0.75, 'USWC': 1.2, 'USEC': 1.25, 'CARIB': 0.9,
'SAE': 0.8, 'SAW': 0.85, 'WAFR': 0.8, 'SAFR': 0.85, 'AUSNZ': 1.15,
'OTHER': 0.9,
}
def estimate_port_costs(port_name: str, vessel_type: str = 'bulk',
dwt: float = None, grt: float = None,
duration_days: float = 3) -> Optional[Dict]:
"""Estimate total port call costs (pilotage, towage, berth, agency, etc.)."""
port = resolve_port(port_name)
if not port:
return None
dwt = dwt or 50000
grt = grt or round(dwt * 0.6)
vtype = (vessel_type or 'bulk').lower()
port_key = port.get('key', '')
region = port.get('region') or _assign_region(port.get('lat', 0), port.get('lon', 0))
multiplier = PORT_COST_REGIONS.get(region, 1.0)
# Base costs (USD) for 50,000 GRT vessel
size_factor = grt / 50000
pilotage_in = round(2500 * size_factor * multiplier)
pilotage_out = round(2500 * size_factor * multiplier)
towage = round(5000 * size_factor * multiplier)
berth_per_day = round(1500 * size_factor * multiplier)
mooring = round(800 * size_factor * multiplier)
port_dues = round(grt * 0.08 * multiplier) # ~$0.08/GRT
light_dues = round(grt * 0.03 * multiplier) # ~$0.03/GRT
agency_fees = round(3000 * multiplier)
waste_disposal = round(500 * multiplier)
fresh_water = round(300 * duration_days)
documentation = round(400 * multiplier)
# Canal surcharges for specific ports
canal_fee = 0
canal_name = None
if port_key in ('port_said', 'suez', 'ismailia'):
canal_fee = round(dwt * 0.12)
canal_name = 'Suez Canal Transit'
elif port_key in ('colon', 'balboa', 'cristobal'):
canal_fee = round(dwt * 0.10)
canal_name = 'Panama Canal Transit'
berth_total = round(berth_per_day * duration_days)
total = (pilotage_in + pilotage_out + towage + berth_total + mooring +
port_dues + light_dues + agency_fees + waste_disposal +
fresh_water + documentation + canal_fee)
return {
'port': port['name'],
'country': port['country'],
'region': region,
'vessel_type': vtype,
'dwt': dwt,
'grt': grt,
'duration_days': duration_days,
'cost_breakdown': {
'pilotage_inbound': pilotage_in,
'pilotage_outbound': pilotage_out,
'towage': towage,
'berth_charges': berth_total,
'mooring_unmooring': mooring,
'port_dues': port_dues,
'light_dues': light_dues,
'agency_fees': agency_fees,
'waste_disposal': waste_disposal,
'fresh_water': fresh_water,
'documentation': documentation,
'canal_transit': {'fee': canal_fee, 'canal': canal_name} if canal_fee else None,
},
'total_estimated_usd': total,
'cost_per_day': round(total / max(duration_days, 1)),
'comparison': f"{'Above' if multiplier > 1.0 else 'Below'} global average (region factor: {multiplier}x)",
'note': 'Estimated based on port region tariffs and vessel size. Actual costs vary by terminal operator, berth availability, and local regulations. Request proforma DA from port agent for exact figures.',
}
# =============================================================================
# FEATURE 12: WEATHER ROUTING
# =============================================================================
# Seasonal weather patterns by region/month
WEATHER_PATTERNS = {
'north_atlantic_winter': {'months': [11, 12, 1, 2, 3], 'lat_range': (35, 65), 'lon_range': (-60, 0),
'risk': 'high', 'wave_m': (4, 8), 'wind_kn': (25, 45), 'speed_loss_pct': 15},
'north_pacific_winter': {'months': [11, 12, 1, 2, 3], 'lat_range': (35, 60), 'lon_range': (140, 180),
'risk': 'high', 'wave_m': (4, 8), 'wind_kn': (25, 45), 'speed_loss_pct': 15},
'indian_ocean_monsoon': {'months': [6, 7, 8, 9], 'lat_range': (0, 25), 'lon_range': (50, 80),
'risk': 'moderate', 'wave_m': (3, 6), 'wind_kn': (20, 35), 'speed_loss_pct': 10},
'typhoon_season_wp': {'months': [7, 8, 9, 10, 11], 'lat_range': (10, 35), 'lon_range': (120, 160),
'risk': 'high', 'wave_m': (3, 10), 'wind_kn': (25, 65), 'speed_loss_pct': 20},
'hurricane_season_carib': {'months': [6, 7, 8, 9, 10, 11], 'lat_range': (10, 30), 'lon_range': (-90, -50),
'risk': 'high', 'wave_m': (3, 10), 'wind_kn': (25, 65), 'speed_loss_pct': 20},
'cape_of_good_hope': {'months': list(range(1, 13)), 'lat_range': (-40, -30), 'lon_range': (15, 30),
'risk': 'moderate', 'wave_m': (3, 7), 'wind_kn': (20, 40), 'speed_loss_pct': 12},
'drake_passage': {'months': list(range(1, 13)), 'lat_range': (-65, -55), 'lon_range': (-70, -55),
'risk': 'high', 'wave_m': (4, 10), 'wind_kn': (30, 50), 'speed_loss_pct': 20},
}
def calculate_weather_routing(from_port: str, to_port: str, vessel_type: str = 'bulk',
dwt: float = None, departure_month: int = None) -> Optional[Dict]:
"""Calculate weather-optimized route vs direct route."""
route = calculate_sea_route(from_port, to_port, vessel_type, dwt)
if not route:
return None
port_from = resolve_port(from_port)
port_to = resolve_port(to_port)
if not port_from or not port_to:
return None
month = departure_month or datetime.now().month
# Check which weather zones the route might cross
weather_risks = []
total_speed_loss = 0
for zone_name, pattern in WEATHER_PATTERNS.items():
if month not in pattern['months']:
continue
lat_min, lat_max = pattern['lat_range']
lon_min, lon_max = pattern['lon_range']
lats = [port_from['lat'], port_to['lat']]
lons = [port_from['lon'], port_to['lon']]
if (min(lats) <= lat_max and max(lats) >= lat_min and
min(lons) <= lon_max and max(lons) >= lon_min):
weather_risks.append({
'zone': zone_name.replace('_', ' ').title(),
'risk_level': pattern['risk'],
'wave_height_m': f"{pattern['wave_m'][0]}-{pattern['wave_m'][1]}m",
'wind_speed_kn': f"{pattern['wind_kn'][0]}-{pattern['wind_kn'][1]} kn",
'speed_reduction': f"{pattern['speed_loss_pct']}%",
})
total_speed_loss = max(total_speed_loss, pattern['speed_loss_pct'])
direct_days = route['total_days']
weather_delay = round(direct_days * total_speed_loss / 100, 1)
optimized_extra_nm = round(route['distance_nm'] * 0.03) if total_speed_loss > 10 else 0
optimized_days = round(direct_days + weather_delay * 0.5 + optimized_extra_nm / (route.get('speed_knots', 14) * 24), 1)
# Fuel comparison
speed = route.get('speed_knots', 14)
fuel_day = VESSEL_FUEL_BENCHMARKS.get('handysize', {}).get('fuel_td', 30)
if dwt:
for _, bench in VESSEL_FUEL_BENCHMARKS.items():
if bench['dwt_range'][0] <= dwt <= bench['dwt_range'][1]:
fuel_day = bench['fuel_td']
break
direct_fuel = round(direct_days * fuel_day)
optimized_fuel = round(optimized_days * fuel_day * 0.9) # Eco speed on longer route
fuel_saving = direct_fuel - optimized_fuel
return {
'from': route['from'],
'to': route['to'],
'departure_month': month,
'direct_route': {
'distance_nm': route['distance_nm'],
'estimated_days': direct_days,
'fuel_consumption_mt': direct_fuel,
'canals': route.get('canals', []),
},
'optimized_route': {
'distance_nm': route['distance_nm'] + optimized_extra_nm,
'estimated_days': optimized_days,
'fuel_consumption_mt': optimized_fuel,
'deviation_nm': optimized_extra_nm,
'strategy': 'Great circle with weather avoidance' if total_speed_loss > 10 else 'Direct route acceptable',
},
'weather_risks': weather_risks,
'savings': {
'time_saved_vs_bad_weather': f"{weather_delay - (optimized_days - direct_days):.1f} days",
'fuel_difference_mt': fuel_saving,
'fuel_cost_difference_usd': round(fuel_saving * BUNKER_PRICE_USD),
},
'recommendations': [
f"{'Route deviation recommended — significant weather risk' if total_speed_loss >= 15 else 'Direct route feasible with monitoring' if total_speed_loss >= 5 else 'Direct route recommended — low weather risk'}",
f"Monitor {'tropical cyclone warnings' if any('typhoon' in r['zone'].lower() or 'hurricane' in r['zone'].lower() for r in weather_risks) else 'weather forecasts'}" if weather_risks else 'Standard weather monitoring sufficient',
],
'note': 'Weather routing based on seasonal patterns. For real-time optimization, subscribe to routing services (StormGeo, DTN, WRI).',
}
# =============================================================================
# FEATURE 13: FIXTURE RECAP GENERATOR
# =============================================================================
def generate_fixture_recap(vessel_name: str, cargo_type: str, tonnage: float,
from_port: str, to_port: str, rate: float,
laycan: str = None, charterer: str = None,
owner: str = None, demurrage_rate: float = None,
commission: float = 3.75) -> Optional[Dict]:
"""Generate a fixture recap summary."""
port_load = resolve_port(from_port)
port_discharge = resolve_port(to_port)
if not port_load or not port_discharge:
return None
route = calculate_sea_route(from_port, to_port)
from datetime import datetime, timedelta
today = datetime.now()
recap_number = f"FIX-{today.strftime('%Y%m%d')}-{int(hashlib.md5((vessel_name + cargo_type).encode()).hexdigest()[:8], 16) % 10000:04d}"
if not laycan:
start = today + timedelta(days=10)
end = start + timedelta(days=5)
laycan = f"{start.strftime('%d %b')} - {end.strftime('%d %b %Y')}"
if not demurrage_rate:
demurrage_rate = round(max(8000, min(80000, tonnage * 0.15)))
load_rate = 5000 if (tonnage or 0) <= 50000 else 8000 if (tonnage or 0) <= 100000 else 12000
return {
'recap_number': recap_number,
'date': today.strftime('%Y-%m-%d %H:%M UTC'),
'status': 'ON SUBJECTS — Subject to charterer/owner approval within 24 hours',
'principals': {
'owner': owner or '[Owner / Disponent Owner]',
'charterer': charterer or '[Charterer]',
},
'vessel': {
'name': vessel_name,
'type': cargo_type.title() if cargo_type else 'TBN',
'dwt': tonnage,
'flag': '[TBC]',
'year_built': '[TBC]',
'class': '[TBC]',
},
'cargo': {
'description': cargo_type,
'quantity': f"{tonnage:,.0f} MT" if tonnage else 'TBC',
'tolerance': '5% MOLOO',
},
'loading': {
'port': port_load['name'],
'country': port_load['country'],
'terms': 'FIOS',
'rate': f"{load_rate:,} MT/SHINC",
},
'discharge': {
'port': port_discharge['name'],
'country': port_discharge['country'],
'terms': 'FIOS',
'rate': f"{load_rate:,} MT/SHINC",
},
'laycan': laycan,
'freight': {
'rate': f"${rate:.2f}/MT",
'total': f"${rate * tonnage:,.0f}" if tonnage else 'TBC',
'payment': '95% within 5 banking days of signing B/L, 5% on completion of discharge',
},
'demurrage_despatch': {
'demurrage': f"${demurrage_rate:,.0f}/day pro rata",
'despatch': f"${demurrage_rate / 2:,.0f}/day pro rata (half demurrage)",
},
'commission': f"{commission}% total — {commission - 1.25}% address + 1.25% brokerage",
'voyage_estimate': {
'distance_nm': route['distance_nm'] if route else 0,
'days': route['total_days'] if route else 0,
'via': route.get('canals', []) if route else [],
},
'governing_law': 'English Law, London Arbitration (LMAA)',
'cp_form': 'GENCON 2022 (or applicable standard form)',
'subjects': [
'Subject charterer board approval — 24 hours',
'Subject owner confirmation — 24 hours',
'Subject stem confirmation',
'Subject satisfactory vessel inspection',
],
'note': 'DRAFT RECAP — This is an AI-generated fixture recap template. All terms subject to main charter party agreement.',
}
# =============================================================================
# FEATURE 14: AIS ANOMALY DETECTOR
# FEATURE 15: DARK FLEET DETECTOR
# (Extracted to maritime_compliance.py — re-exported for backward compatibility)
# =============================================================================
from maritime_compliance import STS_HOTSPOTS, detect_ais_anomalies, detect_dark_fleet
# =============================================================================
# TILE FETCHER — curl_cffi (Cloudflare bypass, no browser)
# =============================================================================
def lat_lon_to_tile(lat, lon, zoom):
"""Convert lat/lon to slippy map tile X/Y at given zoom level."""
n = 2 ** zoom
x = int((lon + 180.0) / 360.0 * n)
y = int((1.0 - math.log(math.tan(math.radians(lat)) + 1.0 / math.cos(math.radians(lat))) / math.pi) / 2.0 * n)
return max(0, min(x, n - 1)), max(0, min(y, n - 1))
def _fetch_mt_tile(z, x, y):
"""Fetch single MarineTraffic tile via curl_cffi (TLS fingerprint bypass).
Returns list of raw vessel dicts from tile data, or empty list on failure."""
if not _HAS_CURL_CFFI:
return []
url = f'https://www.marinetraffic.com/getData/get_data_json_4/z:{z}/X:{x}/Y:{y}/station:0'
headers = {
'Referer': f'https://www.marinetraffic.com/en/ais/home/centerx:0/centery:0/zoom:{z}',
'X-Requested-With': 'XMLHttpRequest',
}
try:
resp = cf_requests.get(url, headers=headers, impersonate='chrome', timeout=15)
if resp.status_code == 200:
data = resp.json()
if isinstance(data, dict):
rows = data.get('data', data)
if isinstance(rows, dict):
rows = rows.get('rows', [])
return rows if isinstance(rows, list) else []
return []
except Exception as e:
logger.debug(f"Tile z:{z}/X:{x}/Y:{y} fetch error: {e}")
return []
LIVE_SCAN_ZOOM = 3 # Free MT API limit: z:2-4 only. z:3 = best per-tile density.
def live_scan_area(lat, lon, radius_nm, zoom=LIVE_SCAN_ZOOM):
"""Live scan MarineTraffic tiles covering area around (lat, lon).
Uses curl_cffi to bypass Cloudflare — no browser needed.
Free MT API works at z:2-4 only (z:5+ requires Pro login).
At z:3: 1-4 tiles per port query, ~1 second.
Returns list of raw MT vessel row dicts (SHIP_ID, SHIPNAME, LAT, LON, etc).
"""
if not _HAS_CURL_CFFI:
logger.warning("curl_cffi not available, live_scan_area disabled")
return []
delta_lat = radius_nm / 60.0
cos_lat = max(math.cos(math.radians(lat)), 0.01)
delta_lon = radius_nm / (60.0 * cos_lat)
x_min, y_min = lat_lon_to_tile(lat + delta_lat, lon - delta_lon, zoom) # NW
x_max, y_max = lat_lon_to_tile(lat - delta_lat, lon + delta_lon, zoom) # SE
all_rows = []
tile_count = 0
for x in range(x_min, x_max + 1):
for y in range(y_min, y_max + 1):
rows = _fetch_mt_tile(zoom, x, y)
all_rows.extend(rows)
tile_count += 1
logger.info(f"live_scan_area({lat:.1f}, {lon:.1f}, {radius_nm}nm): "
f"{tile_count} tiles z{zoom}, {len(all_rows)} raw vessels")
return all_rows
# =============================================================================
# CONVENIENCE FUNCTIONS
# =============================================================================
_parser = None
def get_parser() -> MarineTrafficParser:
"""Get singleton parser instance"""
global _parser
if _parser is None:
_parser = MarineTrafficParser()
return _parser
def search_vessel(query: str) -> List[Dict]:
"""Quick vessel search"""
return get_parser().search_vessel_public(query)
def get_vessel(mmsi: str) -> Dict:
"""Get vessel details"""
return get_parser().get_vessel_page(mmsi)
if __name__ == "__main__":
# Test
parser = MarineTrafficParser()
logger.info("API key configured: %s", parser.has_api_key())
# Test public search
logger.info("Searching for 'MAERSK'...")
results = parser.search_vessel_public("MAERSK")
for v in results[:5]:
logger.info(" - %s", v)
# Test port resolution
logger.info("Port resolution test:")
for name in ["Rotterdam", "dubai", "SGSIN", "pusan"]:
port = resolve_port(name)
if port:
logger.info(f" {name} -> {port['name']} ({port['country']}) [{port['lat']}, {port['lon']}]")
else:
logger.info(f" {name} -> NOT FOUND")