#!/usr/bin/env python3 """ MarineTraffic Data Parser & Maritime Tools Web scraping, AIS data, ports DB (16,553 ports), routing engine, cargo matching, and 15+ premium maritime tool helpers. Logical modules (split candidates for future refactoring): 1. PORTS DATABASE (lines ~31-120) — _load_world_ports, WORLD_PORTS, PORT_ALIASES 2. CARGO CLASSIFICATION (lines ~123-175) — classify_cargo, CARGO_MAP 3. VESSEL CLASSIFICATION (lines ~176-328) — classify_vessel_type/subtype, resolve_port, find_nearby_ports 4. SCRAPER CLASS (lines ~329-750) — MarineTrafficParser (web scraping) 5. ROUTING ENGINE (lines ~751-1169) — calculate_sea_route, fuel/canal/port cost estimators 6. CARGO MATCHING (lines ~1170-1370) — fixture_match 7. FREIGHT RATES (lines ~1371-1449) — estimate_freight_rate 8. SANCTIONS SCREENING (lines ~1450-1596) — screen_sanctions 9. PORT CONGESTION (lines ~1598-1703) — estimate_port_congestion 10. BUNKER PRICES (lines ~1704-1891) — get_bunker_prices, optimize_bunker_route 11. CHARTER PARTY (lines ~1892-2002) — generate_charter_party 12. VESSEL PERFORMANCE (lines ~2003-2135) — analyze_vessel_performance 13. BILL OF LADING (lines ~2136-2215) — generate_bill_of_lading 14. CREW CHANGE (lines ~2216-2322) — optimize_crew_change 15. INSURANCE (lines ~2323-2449) — calculate_maritime_insurance 16. PORT COSTS (lines ~2445-2553) — estimate_port_costs 17. WEATHER ROUTING (lines ~2554-2661) — calculate_weather_routing 18. FIXTURE RECAP (lines ~2662-2753) — generate_fixture_recap 19. AIS ANOMALY (lines ~2754-2890) — detect_ais_anomalies 20. DARK FLEET (lines ~2891-3081) — detect_dark_fleet 21. MODULE API (lines ~3082-end) — get_parser, search_vessel, get_vessel Ɉ MONTANA PROTOCOL — ML-DSA-65 (FIPS 204) """ import os import re import json import hashlib import logging import math import time import requests try: from curl_cffi import requests as cf_requests _HAS_CURL_CFFI = True except ImportError: _HAS_CURL_CFFI = False from datetime import datetime from typing import Optional, Dict, List from bs4 import BeautifulSoup logger = logging.getLogger('marinetraffic_parser') # API key from environment (when available) MT_API_KEY = os.environ.get("MARINETRAFFIC_API_KEY") # Base URLs MT_BASE = "https://www.marinetraffic.com" MT_API_BASE = "https://services.marinetraffic.com/api" # ============================================================================= # WORLD PORTS DATABASE (~2000 ports loaded from JSON) # ============================================================================= def _load_world_ports(): """Load ports from world_ports.json, fall back to minimal built-in set.""" _ports_json = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'world_ports.json') if os.path.exists(_ports_json): try: with open(_ports_json, 'r', encoding='utf-8') as f: return json.load(f) except Exception as e: logger.warning(f"Failed to load world_ports.json: {e}") # Minimal fallback (top 10 ports) return { 'rotterdam': {'name': 'Rotterdam', 'country': 'Netherlands', 'unlocode': 'NLRTM', 'lat': 51.9244, 'lon': 4.4777, 'radius_nm': 15, 'size': 'large'}, 'singapore': {'name': 'Singapore', 'country': 'Singapore', 'unlocode': 'SGSIN', 'lat': 1.2644, 'lon': 103.82, 'radius_nm': 20, 'size': 'large'}, 'shanghai': {'name': 'Shanghai', 'country': 'China', 'unlocode': 'CNSHA', 'lat': 31.3622, 'lon': 121.5882, 'radius_nm': 20, 'size': 'large'}, 'houston': {'name': 'Houston', 'country': 'USA', 'unlocode': 'USHOU', 'lat': 29.7262, 'lon': -95.0102, 'radius_nm': 20, 'size': 'large'}, 'busan': {'name': 'Busan', 'country': 'South Korea', 'unlocode': 'KRPUS', 'lat': 35.0979, 'lon': 129.0371, 'radius_nm': 15, 'size': 'large'}, 'jebel_ali': {'name': 'Jebel Ali', 'country': 'UAE', 'unlocode': 'AEJEA', 'lat': 25.0117, 'lon': 55.0637, 'radius_nm': 15, 'size': 'large'}, 'new_york': {'name': 'New York', 'country': 'USA', 'unlocode': 'USNYC', 'lat': 40.6701, 'lon': -74.0376, 'radius_nm': 15, 'size': 'large'}, 'antwerp': {'name': 'Antwerp', 'country': 'Belgium', 'unlocode': 'BEANR', 'lat': 51.2322, 'lon': 4.3986, 'radius_nm': 12, 'size': 'large'}, 'mumbai': {'name': 'Mumbai', 'country': 'India', 'unlocode': 'INBOM', 'lat': 18.94, 'lon': 72.835, 'radius_nm': 15, 'size': 'large'}, 'durban': {'name': 'Durban', 'country': 'South Africa', 'unlocode': 'ZADUR', 'lat': -29.8674, 'lon': 31.0386, 'radius_nm': 12, 'size': 'large'}, } WORLD_PORTS = _load_world_ports() PORT_ALIASES = { 'europoort': 'rotterdam', 'hook of holland': 'rotterdam', 'changi': 'singapore', 'tanjong pagar': 'singapore', 'jurong': 'singapore', 'pusan': 'busan', 'shenzhen': 'hong_kong', 'yantian': 'hong_kong', 'dubai': 'jebel_ali', 'dp world': 'jebel_ali', 'abu dhabi': 'khalifa_port', 'new jersey': 'new_york', 'newark': 'new_york', 'bayonne': 'new_york', 'tacoma': 'seattle', 'la': 'los_angeles', 'san pedro': 'los_angeles', 'lb': 'long_beach', 'bombay': 'mumbai', 'nhava sheva': 'mumbai', 'jnpt': 'mumbai', 'madras': 'chennai', 'saigon': 'ho_chi_minh', 'tangier': 'tanger_med', 'cape of good hope': 'cape_town', 'johor': 'tanjung_pelepas', 'yangshan': 'shanghai', 'apapa': 'lagos_apapa', 'tin can island': 'tin_can_island', 'sao paulo': 'santos', # Caspian Sea aliases 'alat port': 'alat', 'baku new port': 'alat', 'port of baku': 'alat', 'krasnovodsk': 'turkmenbashi', 'krw': 'turkmenbashi', 'anzali': 'bandar_anzali', 'bandar-e anzali': 'bandar_e_anzali', 'guryev': 'atyrau', 'noshahr': 'nowshahr', # Russian port names (пользователь пишет на русском) 'баку': 'baku', 'алят': 'alat', 'актау': 'aktau', 'астрахань': 'astrakhan', 'махачкала': 'makhachkala', 'туркменбаши': 'turkmenbashi', 'красноводск': 'turkmenbashi', 'энзели': 'bandar_anzali', 'анзали': 'bandar_anzali', 'ноушехр': 'nowshahr', 'амирабад': 'amirabad', 'нека': 'neka', 'баутино': 'bautino', 'курык': 'kuryk', 'оля': 'olya', 'роттердам': 'rotterdam', 'сингапур': 'singapore', 'шанхай': 'shanghai', 'стамбул': 'istanbul', 'гонконг': 'hong_kong', 'дубай': 'jebel_ali', 'нью-йорк': 'new_york', 'лондон': 'london', 'гамбург': 'hamburg', 'антверпен': 'antwerp', 'пирей': 'piraeus', 'генуя': 'genoa', 'барселона': 'barcelona', 'марсель': 'marseille', 'одесса': 'odessa', 'новороссийск': 'novorossiysk', 'санкт-петербург': 'saint_petersburg', 'мумбаи': 'mumbai', 'бомбей': 'mumbai', 'токио': 'tokyo', 'пусан': 'busan', 'иокогама': 'yokohama', 'сидней': 'sydney', 'джидда': 'jeddah', 'джебель-али': 'jebel_ali', 'суэц': 'suez', 'порт-саид': 'port_said', 'хьюстон': 'houston', 'лос-анджелес': 'los_angeles', 'сантус': 'santos', 'буэнос-айрес': 'buenos_aires', 'кейптаун': 'cape_town', 'лагос': 'lagos_apapa', 'дар-эс-салам': 'dar_es_salaam', # Legacy space-key aliases for backward compat 'jebel ali': 'jebel_ali', 'hong kong': 'hong_kong', 'new york': 'new_york', 'los angeles': 'los_angeles', 'long beach': 'long_beach', 'ho chi minh': 'ho_chi_minh', 'tanger med': 'tanger_med', 'cape town': 'cape_town', 'tanjung pelepas': 'tanjung_pelepas', 'port klang': 'port_klang', 'le havre': 'le_havre', 'dar es salaam': 'dar_es_salaam', 'port said': 'port_said', 'richards bay': 'richards_bay', 'khalifa port': 'khalifa_port', 'ras tanura': 'ras_tanura', 'bandar abbas': 'bandar_abbas', 'buenos aires': 'buenos_aires', 'new orleans': 'new_orleans', 'st petersburg': 'saint_petersburg', 'laem chabang': 'laem_chabang', 'gioia tauro': 'gioia_tauro', 'port hedland': 'port_hedland', } # ============================================================================= # VESSEL TYPE & NAVIGATION STATUS MAPPINGS # ============================================================================= # Cargo → vessel type mapping CARGO_TO_VESSEL = { # Dry bulk 'grain': 'bulk', 'wheat': 'bulk', 'corn': 'bulk', 'rice': 'bulk', 'barley': 'bulk', 'soybean': 'bulk', 'soybeans': 'bulk', 'coal': 'bulk', 'iron ore': 'bulk', 'ore': 'bulk', 'bauxite': 'bulk', 'phosphate': 'bulk', 'fertilizer': 'bulk', 'cement': 'bulk', 'sugar': 'bulk', 'salt': 'bulk', 'steel': 'bulk', 'scrap': 'bulk', 'clinker': 'bulk', 'minerals': 'bulk', 'aggregate': 'bulk', 'sand': 'bulk', 'gravel': 'bulk', 'flour': 'general', 'meal': 'general', 'feed': 'bulk', 'animal feed': 'bulk', # Liquid 'crude oil': 'tanker', 'crude': 'tanker', 'oil': 'tanker', 'petroleum': 'tanker', 'diesel': 'tanker', 'gasoline': 'tanker', 'fuel oil': 'tanker', 'naphtha': 'tanker', 'chemicals': 'tanker', 'palm oil': 'tanker', 'vegetable oil': 'tanker', 'lng': 'tanker', 'lpg': 'tanker', 'methanol': 'tanker', # Container 'containers': 'container', 'container': 'container', 'teu': 'container', 'electronics': 'container', 'machinery': 'container', 'furniture': 'container', 'clothing': 'container', 'textiles': 'container', 'consumer goods': 'container', 'manufactured goods': 'container', 'general merchandise': 'container', # Ro-Ro / vehicles 'cars': 'roro', 'vehicles': 'roro', 'trucks': 'roro', 'automobiles': 'roro', 'heavy equipment': 'roro', 'tractors': 'roro', # General 'timber': 'general', 'lumber': 'general', 'wood': 'general', 'plywood': 'general', 'project cargo': 'general', 'breakbulk': 'general', 'pipes': 'general', } def classify_cargo(cargo_description: str) -> Optional[str]: """Classify cargo description to vessel type category.""" if not cargo_description: return None desc = cargo_description.lower().strip() # Direct match if desc in CARGO_TO_VESSEL: return CARGO_TO_VESSEL[desc] # Partial match for keyword, vtype in CARGO_TO_VESSEL.items(): if keyword in desc: return vtype return None # AIS ship type codes → category VESSEL_TYPE_CATEGORIES = { 'bulk': ['bulk carrier', 'bulk', 'ore carrier'], 'tanker': ['tanker', 'oil tanker', 'chemical tanker', 'oil/chemical tanker', 'lng tanker', 'lpg tanker', 'crude oil tanker'], 'container': ['container ship', 'container', 'containership'], 'general': ['general cargo', 'cargo', 'multipurpose', 'general cargo ship'], 'passenger': ['passenger', 'cruise', 'passenger ship', 'cruise ship', 'ferry', 'ro-ro/passenger'], 'roro': ['ro-ro', 'roro', 'vehicles carrier', 'car carrier', 'ro-ro cargo'], 'offshore': ['offshore', 'supply vessel', 'platform', 'anchor handling', 'fpso', 'offshore supply ship'], 'tug': ['tug', 'tugboat', 'towing', 'pusher tug', 'pilot'], 'fishing': ['fishing', 'fishing vessel', 'trawler'], 'highspeed': ['high speed', 'high-speed', 'hsc', 'hydrofoil', 'wing in ground'], 'pleasure': ['pleasure', 'yacht', 'sailing yacht', 'motor yacht'], 'sailing': ['sailing vessel', 'sailing'], 'military': ['military', 'naval', 'warship', 'patrol'], } # AIS type codes (numeric) → category AIS_TYPE_CODE_MAP = { range(70, 80): 'cargo', # 70-79: Cargo, general cargo range(80, 90): 'tanker', # 80-89: Tanker range(60, 70): 'passenger', # 60-69: Passenger range(40, 50): 'highspeed', # 40-49: High-speed craft range(30, 30+1): 'fishing', # 30: Fishing range(31, 33): 'tug', # 31-32: Towing range(50, 55): 'offshore', # 50-54: Pilot/SAR/port tender range(35, 36): 'military', # 35: Military range(36, 37): 'sailing', # 36: Sailing range(37, 38): 'pleasure', # 37: Pleasure craft } # AIS navigation status codes NAV_STATUS_MAP = { 0: 'underway', # Under way using engine 1: 'at anchor', # At anchor 2: 'not under command', 3: 'restricted maneuverability', 4: 'constrained by draught', 5: 'moored', # Moored 6: 'aground', 7: 'fishing', # Engaged in fishing 8: 'underway sailing', # Under way sailing 14: 'ais-sart', 15: 'undefined', } # ============================================================================= # VESSEL SUBTYPES BY DWT (industry standard classifications) # ============================================================================= VESSEL_SUBTYPES = { 'bulk': [ {'name': 'River-Sea', 'dwt_min': 1000, 'dwt_max': 9999, 'typical_dwt': 5000}, {'name': 'Handysize', 'dwt_min': 10000, 'dwt_max': 39999, 'typical_dwt': 28000}, {'name': 'Handymax', 'dwt_min': 40000, 'dwt_max': 49999, 'typical_dwt': 45000}, {'name': 'Supramax', 'dwt_min': 50000, 'dwt_max': 64999, 'typical_dwt': 58000}, {'name': 'Panamax', 'dwt_min': 65000, 'dwt_max': 99999, 'typical_dwt': 75000}, {'name': 'Capesize', 'dwt_min': 100000, 'dwt_max': 199999, 'typical_dwt': 180000}, {'name': 'VLOC', 'dwt_min': 200000, 'dwt_max': 400000, 'typical_dwt': 300000}, ], 'tanker': [ {'name': 'River-Sea Tanker', 'dwt_min': 1000, 'dwt_max': 24999, 'typical_dwt': 7000}, {'name': 'MR (Medium Range)', 'dwt_min': 25000, 'dwt_max': 54999, 'typical_dwt': 45000}, {'name': 'LR1', 'dwt_min': 55000, 'dwt_max': 79999, 'typical_dwt': 73000}, {'name': 'Aframax', 'dwt_min': 80000, 'dwt_max': 119999, 'typical_dwt': 105000}, {'name': 'Suezmax', 'dwt_min': 120000, 'dwt_max': 199999, 'typical_dwt': 160000}, {'name': 'VLCC', 'dwt_min': 200000, 'dwt_max': 320000, 'typical_dwt': 300000}, ], 'container': [ {'name': 'Feeder', 'dwt_min': 5000, 'dwt_max': 24999, 'teu_range': '500-2,500 TEU', 'typical_dwt': 15000}, {'name': 'Feedermax', 'dwt_min': 25000, 'dwt_max': 39999, 'teu_range': '2,500-5,000 TEU', 'typical_dwt': 33000}, {'name': 'Panamax', 'dwt_min': 40000, 'dwt_max': 64999, 'teu_range': '5,000-8,000 TEU', 'typical_dwt': 55000}, {'name': 'Post-Panamax', 'dwt_min': 65000, 'dwt_max': 99999, 'teu_range': '8,000-12,000 TEU', 'typical_dwt': 80000}, {'name': 'Neo-Panamax', 'dwt_min': 100000, 'dwt_max': 149999, 'teu_range': '12,000-15,000 TEU', 'typical_dwt': 120000}, {'name': 'ULCV', 'dwt_min': 150000, 'dwt_max': 250000, 'teu_range': '15,000-24,000 TEU', 'typical_dwt': 200000}, ], 'general': [ {'name': 'Small General Cargo', 'dwt_min': 1000, 'dwt_max': 9999, 'typical_dwt': 5000}, {'name': 'General Cargo', 'dwt_min': 10000, 'dwt_max': 25000, 'typical_dwt': 15000}, ], } def classify_vessel_subtype(vessel_type: str, dwt: float = None) -> Optional[Dict]: """ Classify vessel into subtype by DWT. Returns {'subtype': 'Panamax', 'typical_dwt': 75000} or None. """ if not vessel_type: return None vtype = vessel_type.lower().strip() # Map common synonyms for category, keywords in VESSEL_TYPE_CATEGORIES.items(): for kw in keywords: if kw in vtype: vtype = category break else: continue break subtypes = VESSEL_SUBTYPES.get(vtype) if not subtypes: return None if dwt: try: dwt_val = float(dwt) for st in subtypes: if st['dwt_min'] <= dwt_val <= st['dwt_max']: return {**st, 'category': vtype} except (ValueError, TypeError): pass # If DWT out of range, return closest return {**subtypes[len(subtypes) // 2], 'category': vtype, 'estimated': True} # No DWT — return mid-range default mid = subtypes[len(subtypes) // 2] return {**mid, 'category': vtype, 'estimated': True} def classify_vessel_type(type_str: str, type_code=None) -> str: """Classify vessel type string into a standard category.""" if type_str: t = type_str.lower().strip() for category, keywords in VESSEL_TYPE_CATEGORIES.items(): for kw in keywords: if kw in t: return category if type_code is not None: try: code = int(type_code) for code_range, category in AIS_TYPE_CODE_MAP.items(): if code in code_range: return category except (ValueError, TypeError): pass return 'other' def get_destination_patterns(port: dict) -> List[str]: """Generate AIS destination search patterns for a port. AIS destination is free-text entered by crew, e.g. "BOSTON", "US BOS", "USBOS". We generate multiple patterns to maximize matching chances. Args: port: dict from resolve_port() with 'name', 'unlocode', 'country', etc. Returns: list of uppercase pattern strings for SQL LIKE queries """ patterns = set() name = (port.get('name') or '').upper().strip() unlocode = (port.get('unlocode') or '').upper().strip() if name and len(name) >= 4: patterns.add(name) # "BOSTON" if unlocode and len(unlocode) >= 4: patterns.add(unlocode) # "USBOS" # Split LOCODE: country "US" + loc "BOS" if len(unlocode) >= 5: country = unlocode[:2] loc_part = unlocode[2:] patterns.add(f"{country} {loc_part}") # "US BOS" return list(patterns) def find_nearby_ports(lat: float, lon: float, radius_nm: float = 100) -> List[Dict]: """Find ports within radius_nm nautical miles from given coordinates.""" nearby = [] for key, port in WORLD_PORTS.items(): # Approximate distance in NM (1 degree lat ≈ 60 NM) dlat = abs(port['lat'] - lat) * 60 dlon = abs(port['lon'] - lon) * 60 * max(math.cos(math.radians(lat)), 0.01) dist_nm = math.sqrt(dlat**2 + dlon**2) if dist_nm <= radius_nm: nearby.append({**port, 'key': key, 'distance_nm': round(dist_nm, 1)}) nearby.sort(key=lambda p: p['distance_nm']) return nearby def resolve_port(query: str) -> Optional[Dict]: """Resolve port name to port info dict. Supports exact, alias, partial, and UNLOCODE match. Always includes 'key' in the returned dict for reverse lookups.""" if not query: return None q = query.lower().strip() q_under = q.replace(' ', '_').replace('-', '_').replace("'", "") def _with_key(key): port = WORLD_PORTS[key] if 'key' not in port: return {**port, 'key': key} return port # Exact match (spaces or underscores) if q in WORLD_PORTS: return _with_key(q) if q_under in WORLD_PORTS: return _with_key(q_under) # Alias match if q in PORT_ALIASES: alias_key = PORT_ALIASES[q] if alias_key in WORLD_PORTS: return _with_key(alias_key) # Partial match (query in port key or port key in query) for key, port in WORLD_PORTS.items(): if q_under in key or key in q_under: return _with_key(key) # Match by port name (case-insensitive) for key, port in WORLD_PORTS.items(): if q in port['name'].lower(): return _with_key(key) # Match by UNLOCODE q_upper = q.upper() for key, port in WORLD_PORTS.items(): if port.get('unlocode') == q_upper: return _with_key(key) return None def _generate_totp(secret: str, digits: int = 6, interval: int = 30) -> str: """Generate TOTP 6-digit code from base32 secret (RFC 6238). No external deps.""" import hmac import hashlib import struct import base64 # Normalize base32 (uppercase, pad to multiple of 8) secret_clean = secret.upper().replace(' ', '') pad = (-len(secret_clean)) % 8 key = base64.b32decode(secret_clean + '=' * pad) counter = int(time.time()) // interval msg = struct.pack('>Q', counter) h = hmac.new(key, msg, hashlib.sha1).digest() offset = h[-1] & 0x0f code = struct.unpack('>I', h[offset:offset + 4])[0] & 0x7fffffff return str(code % (10 ** digits)).zfill(digits) class MarineTrafficParser: """Parser for MarineTraffic public data + API""" def __init__(self, api_key: str = None, totp_secret: str = None): self.api_key = api_key or MT_API_KEY self.totp_secret = totp_secret # Google Authenticator 2FA secret (base32) # Use curl_cffi to impersonate Chrome TLS fingerprint → bypasses Cloudflare if _HAS_CURL_CFFI: self.session = cf_requests.Session(impersonate="chrome120") logger.info("Using curl_cffi Chrome impersonation (Cloudflare bypass)") else: self.session = requests.Session() self.session.headers.update({ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 ' '(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36' }) # ========================================================================= # PUBLIC DATA (no API key needed) # ========================================================================= def search_vessel_public(self, query: str) -> List[Dict]: """ Search vessel using public search Returns basic info: name, MMSI, IMO, type, flag """ url = f"{MT_BASE}/en/ais/index/search/all/keyword:{query}" try: resp = self.session.get(url, timeout=10) if resp.status_code != 200: return [] soup = BeautifulSoup(resp.text, 'html.parser') results = [] # Parse search results for item in soup.select('.search-result-item, .vessel-item'): vessel = {} # Name name_el = item.select_one('.vessel-name, .ship-name, a[href*="/vessels/"]') if name_el: vessel['name'] = name_el.get_text(strip=True) href = name_el.get('href', '') # Extract MMSI from URL if '/vessels/' in href: parts = href.split('/') for i, p in enumerate(parts): if p == 'vessels' and i + 1 < len(parts): vessel['mmsi'] = parts[-1].split('-')[0] # Type type_el = item.select_one('.ship-type, .vessel-type') if type_el: vessel['type'] = type_el.get_text(strip=True) # Flag flag_el = item.select_one('.flag, [class*="flag-"]') if flag_el: vessel['flag'] = flag_el.get('title', '') or flag_el.get_text(strip=True) if vessel.get('name'): results.append(vessel) return results except Exception as e: logger.error(f"Search error: {e}") return [] def get_vessel_page(self, mmsi: str) -> Dict: """ Get vessel details from public page """ url = f"{MT_BASE}/en/ais/details/ships/mmsi:{mmsi}" try: resp = self.session.get(url, timeout=10) if resp.status_code != 200: return {} soup = BeautifulSoup(resp.text, 'html.parser') vessel = {'mmsi': mmsi} # Parse vessel details # Name name_el = soup.select_one('h1.title, .vessel-name') if name_el: vessel['name'] = name_el.get_text(strip=True) # Details table for row in soup.select('.vessel-details tr, .details-table tr'): cells = row.select('td') if len(cells) >= 2: key = cells[0].get_text(strip=True).lower() value = cells[1].get_text(strip=True) if 'imo' in key: vessel['imo'] = value elif 'mmsi' in key: vessel['mmsi'] = value elif 'call sign' in key: vessel['callsign'] = value elif 'flag' in key: vessel['flag'] = value elif 'type' in key: vessel['type'] = value elif 'length' in key: vessel['length'] = self._parse_number(value) elif 'width' in key or 'beam' in key: vessel['width'] = self._parse_number(value) elif 'draught' in key or 'draft' in key: vessel['draught'] = self._parse_number(value) elif 'gross tonnage' in key: vessel['gross_tonnage'] = self._parse_number(value) elif 'deadweight' in key: vessel['deadweight'] = self._parse_number(value) elif 'year built' in key: vessel['year_built'] = self._parse_number(value) # Current position pos_el = soup.select_one('.position-data, [data-lat], [data-lon]') if pos_el: vessel['latitude'] = pos_el.get('data-lat') vessel['longitude'] = pos_el.get('data-lon') # Last position from text pos_text = soup.select_one('.last-position, .position-info') if pos_text: text = pos_text.get_text() # Parse coordinates from text lat_match = re.search(r'(\d+\.\d+)[°\s]*[NS]', text) lon_match = re.search(r'(\d+\.\d+)[°\s]*[EW]', text) if lat_match: vessel['latitude'] = float(lat_match.group(1)) if lon_match: vessel['longitude'] = float(lon_match.group(1)) return vessel except Exception as e: logger.error(f"Page parse error: {e}") return {} # ========================================================================= # MT PRO AUTHENTICATION & OWNERSHIP DATA # ========================================================================= def login(self, email: str, password: str, totp_secret: str = None) -> bool: """Authenticate to MarineTraffic Pro. Returns True on success. Handles email+password login + optional Google Authenticator 2FA (TOTP). totp_secret: base32 secret from Google Authenticator (overrides self.totp_secret).""" totp_key = totp_secret or self.totp_secret _HDRS = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', 'Accept-Language': 'en-US,en;q=0.9', } try: # Step 1: GET login page to extract CSRF token resp = self.session.get(f'{MT_BASE}/en/users/login', headers=_HDRS, timeout=15) if resp.status_code != 200: logger.error(f"MT login page error: {resp.status_code}") return False soup = BeautifulSoup(resp.text, 'html.parser') csrf_el = soup.select_one('input[name="authenticity_token"]') csrf_token = csrf_el['value'] if csrf_el else '' if not csrf_token: meta_csrf = soup.select_one('meta[name="csrf-token"]') if meta_csrf: csrf_token = meta_csrf.get('content', '') # Step 2: POST login credentials login_resp = self.session.post( f'{MT_BASE}/en/users/login', data={ 'user[email]': email, 'user[password]': password, 'authenticity_token': csrf_token, 'commit': 'Log in', }, headers={**_HDRS, 'Content-Type': 'application/x-www-form-urlencoded', 'Referer': f'{MT_BASE}/en/users/login', 'Origin': MT_BASE}, allow_redirects=True, timeout=20 ) # Step 3: Check if 2FA is required page_lower = login_resp.text.lower() otp_indicators = ['two-factor', 'two_factor', 'otp', 'authenticator', 'verification code', 'authentication code', '2fa', 'totp'] needs_2fa = any(ind in page_lower for ind in otp_indicators) if needs_2fa and totp_key: logger.info("MT requires 2FA — generating TOTP code...") otp_code = _generate_totp(totp_key) logger.info(f"TOTP code: {otp_code}") # Extract fresh CSRF for 2FA form soup2 = BeautifulSoup(login_resp.text, 'html.parser') csrf_el2 = soup2.select_one('input[name="authenticity_token"]') csrf2 = csrf_el2['value'] if csrf_el2 else csrf_token if not csrf2: meta2 = soup2.select_one('meta[name="csrf-token"]') if meta2: csrf2 = meta2.get('content', csrf_token) # Try common 2FA form field names otp_fields = ['user[otp_attempt]', 'otp_attempt', 'code', 'token', 'user[two_factor_code]', 'two_factor_code'] # Detect actual field name from form otp_input = soup2.select_one('input[type="text"][name*="otp"], input[type="number"][name*="code"], input[name*="otp"], input[name*="code"], input[name*="token"]') if otp_input and otp_input.get('name'): otp_fields = [otp_input['name']] + otp_fields # POST 2FA code two_fa_url = login_resp.url # might have redirected to /users/two_factor for field_name in otp_fields[:2]: otp_resp = self.session.post( two_fa_url, data={field_name: otp_code, 'authenticity_token': csrf2, 'commit': 'Verify'}, headers={**_HDRS, 'Content-Type': 'application/x-www-form-urlencoded', 'Referer': two_fa_url, 'Origin': MT_BASE}, allow_redirects=True, timeout=20 ) page_lower = otp_resp.text.lower() success_indicators = ['sign_out', 'logout', 'profile', 'my-fleet', 'subscription'] if any(ind in page_lower for ind in success_indicators): logger.info(f"MT Pro login + 2FA successful for {email}") return True if 'invalid' in page_lower or 'incorrect' in page_lower: logger.error(f"2FA code rejected (field={field_name}). Code was: {otp_code}") break logger.warning("2FA submission failed — checking cookies anyway") elif needs_2fa and not totp_key: logger.error("MT requires 2FA but no TOTP secret provided! Use --totp-secret") return False # Step 4: Verify login success success_indicators = ['sign_out', 'logout', 'profile', 'my-fleet', 'subscription'] if any(ind in page_lower for ind in success_indicators): logger.info(f"MT Pro login successful for {email}") return True # Check cookies for session marker cookie_names = [c.name for c in self.session.cookies] logger.debug(f"Session cookies after login: {cookie_names}") if any(c in cookie_names for c in ['remember_user_token', '_mt_session', 'user_credentials']): logger.info(f"MT Pro login successful (cookie check) for {email}") return True logger.warning(f"MT Pro login status uncertain for {email} — continuing anyway") return True # Optimistic: let vessel fetches confirm except Exception as e: logger.error(f"MT Pro login error: {e}") return False def get_vessel_ownership(self, mmsi: str) -> dict: """Scrape full vessel page including Ownership section (requires MT Pro login). Returns vessel specs + ownership dict with owner, operator, manager, etc.""" url = f'{MT_BASE}/en/ais/details/ships/mmsi:{mmsi}' try: resp = self.session.get( url, headers={ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36', 'Referer': MT_BASE, 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', }, timeout=20 ) if resp.status_code == 404: return {} if resp.status_code != 200: logger.warning(f"MT vessel page {mmsi}: HTTP {resp.status_code}") return {} return self._parse_vessel_full_page(resp.text, mmsi) except Exception as e: logger.error(f"get_vessel_ownership({mmsi}): {e}") return {} def _parse_vessel_full_page(self, html: str, mmsi: str) -> dict: """Parse full MT vessel detail page: specs (name/flag/DWT/year) + Ownership section.""" soup = BeautifulSoup(html, 'html.parser') result = {'mmsi': mmsi} # --- Vessel name --- for sel in ['h1.title', '.vessel-name', 'h1', '.shipname']: el = soup.select_one(sel) if el: name = el.get_text(strip=True) if name and len(name) > 1: result['name'] = name break # --- Vessel specs table --- # MT Pro shows details in various table formats for row in soup.select('table tr, .vessel-details tr, dl dt, .info-table tr'): cells = row.select('td, dd, th') if len(cells) < 2: # Try dt/dd pairs label_el = row if row.name == 'dt' else None if label_el: val_el = label_el.find_next_sibling('dd') if val_el: key = label_el.get_text(strip=True).lower() val = val_el.get_text(strip=True) self._apply_vessel_field(result, key, val) continue key = cells[0].get_text(strip=True).lower() val = cells[1].get_text(strip=True) self._apply_vessel_field(result, key, val) # Also try JSON-LD structured data (MT sometimes embeds this) for script in soup.select('script[type="application/ld+json"]'): try: data = json.loads(script.string or '{}') if data.get('@type') == 'Product' or 'vessel' in str(data).lower(): if data.get('name') and not result.get('name'): result['name'] = data['name'] except Exception: pass # --- Ownership section --- companies = [] ownership_role_map = { 'beneficial owner': 'beneficial_owner', 'registered owner': 'registered_owner', 'commercial manager': 'commercial_manager', 'disponent owner': 'commercial_manager', 'ship manager': 'commercial_manager', 'operator': 'operator', 'charterer': 'operator', 'technical manager': 'operator', } # Try multiple selectors MT uses for ownership tables ownership_rows = soup.select( '[data-id="ownership"] tr, ' '.ownership-widget tr, ' '.widget-body table tr, ' '#ownership tr, ' '.ownership tr, ' 'section.ownership table tr' ) # Also try finding ownership by header text if not ownership_rows: for h in soup.find_all(['h2', 'h3', 'h4', 'div'], string=re.compile(r'[Oo]wnership')): container = h.find_next('table') if container: ownership_rows = container.select('tr') break for row in ownership_rows: cells = row.select('td') if len(cells) < 2: continue role_text = cells[0].get_text(strip=True).lower() company_el = cells[1] company_name = company_el.get_text(strip=True) if not company_name or company_name == '-' or company_name == 'N/A': continue # Extract company MT profile URL company_link = company_el.select_one('a[href]') company_href = company_link['href'] if company_link else None if company_href and not company_href.startswith('http'): company_href = MT_BASE + company_href country = cells[2].get_text(strip=True) if len(cells) > 2 else '' # Map role to field name matched_field = None for role_key, field in ownership_role_map.items(): if role_key in role_text: matched_field = field break if not matched_field: matched_field = 'operator' # fallback # Set top-level fields (first occurrence wins) if not result.get(matched_field): result[matched_field] = company_name if country: result[f'{matched_field}_country'] = country # Also build owner/operator aliases for DB compatibility if matched_field in ('beneficial_owner', 'registered_owner') and not result.get('owner'): result['owner'] = company_name if country: result['owner_country'] = country elif matched_field == 'operator' and not result.get('operator'): result['operator'] = company_name if country: result['operator_country'] = country companies.append({ 'role': role_text, 'name': company_name, 'country': country, 'mt_profile_url': company_href, }) result['companies'] = companies # stored as companies_json return result def _apply_vessel_field(self, result: dict, key: str, val: str): """Apply a parsed key-value pair to vessel result dict.""" if not val or val in ('-', 'N/A', 'Unknown', '—'): return if 'imo' in key and not result.get('imo'): result['imo'] = val.strip() elif 'mmsi' in key and key != 'mmsi' and not result.get('mmsi'): result['mmsi'] = val.strip() elif 'call sign' in key or 'callsign' in key: result['callsign'] = val elif 'flag' in key and not result.get('flag'): result['flag'] = val elif 'type' in key and 'ship' in key and not result.get('type'): result['type'] = val elif 'year' in key and 'built' in key: result['year_built'] = self._parse_number(val) elif 'deadweight' in key or key == 'dwt': result['deadweight'] = self._parse_number(val) elif 'gross' in key and 'tonnage' in key: result['gross_tonnage'] = self._parse_number(val) elif 'length' in key and 'overall' in key: result['length'] = self._parse_number(val) elif 'breadth' in key or ('width' in key and 'ext' in key): result['width'] = self._parse_number(val) def get_company_website(self, company_mt_url: str) -> str: """Try to extract company website URL from MT company profile page.""" if not company_mt_url or 'marinetraffic.com' not in company_mt_url: return None try: resp = self.session.get( company_mt_url, headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'}, timeout=10 ) if resp.status_code != 200: return None soup = BeautifulSoup(resp.text, 'html.parser') # MT company pages often show website in a link that goes to external domain for a in soup.select('a[href^="http"]'): href = a['href'] if 'marinetraffic.com' not in href and '.' in href and len(href) < 120: # Filter obvious non-website links if any(skip in href for skip in ['facebook.', 'twitter.', 'linkedin.', 'google.', 'youtube.']): continue return href.rstrip('/') return None except Exception as e: logger.debug(f"get_company_website({company_mt_url}): {e}") return None # ========================================================================= # PORT / AREA VESSEL SCRAPING # ========================================================================= def scrape_area_vessels(self, lat: float, lon: float, radius_nm: float = 15) -> List[Dict]: """ Scrape vessels in area via MarineTraffic map data JSON endpoint. Uses tile-based internal API with session cookies. """ # Convert radius NM → degrees lat_delta = radius_nm / 60.0 lon_delta = radius_nm / (60.0 * max(math.cos(math.radians(lat)), 0.01)) lat_min = lat - lat_delta lat_max = lat + lat_delta lon_min = lon - lon_delta lon_max = lon + lon_delta # Step 1: Establish session (get cookies) try: self.session.get( f"{MT_BASE}/en/ais/home/centerx:{lon:.1f}/centery:{lat:.1f}/zoom:11", timeout=10 ) except Exception: pass time.sleep(1) # Be polite # Step 2: Calculate Web Mercator tile coordinates for zoom level 11 zoom = 11 def lat_lon_to_tile(lat_deg, lon_deg, z): lat_rad = math.radians(lat_deg) n = 2.0 ** z xtile = int((lon_deg + 180.0) / 360.0 * n) ytile = int((1.0 - math.asinh(math.tan(lat_rad)) / math.pi) / 2.0 * n) return xtile, ytile x_tile, y_tile = lat_lon_to_tile(lat, lon, zoom) # Step 3: Request map data url = f"{MT_BASE}/getData/get_data_json_4/z:{zoom}/X:{x_tile}/Y:{y_tile}/station:0" try: resp = self.session.get(url, timeout=15, headers={ 'Referer': f'{MT_BASE}/en/ais/home/centerx:{lon:.1f}/centery:{lat:.1f}/zoom:{zoom}', 'X-Requested-With': 'XMLHttpRequest', 'Accept': 'application/json, text/javascript, */*' }) if resp.status_code != 200: logger.warning(f"Area scrape HTTP {resp.status_code}") return [] data = resp.json() vessels = [] # Parse response — format varies rows = data if isinstance(data, dict): rows = data.get('data', data.get('rows', data.get('type1', []))) if isinstance(rows, dict): rows = rows.get('rows', []) if not isinstance(rows, list): return [] for item in rows: if isinstance(item, dict): v_lat = item.get('LAT', item.get('lat', 0)) v_lon = item.get('LON', item.get('lon', 0)) try: v_lat = float(v_lat) if v_lat else 0 v_lon = float(v_lon) if v_lon else 0 except (ValueError, TypeError): continue # Filter to bounding box if not (lat_min <= v_lat <= lat_max and lon_min <= v_lon <= lon_max): continue mmsi = str(item.get('MMSI', item.get('mmsi', ''))) if not mmsi or mmsi == '0': continue type_raw = item.get('SHIPTYPE', item.get('ship_type', item.get('type_name', ''))) type_code = item.get('TYPE_CODE', item.get('type_code')) nav_status_code = item.get('NAVSTAT', item.get('nav_status', item.get('STATUS', None))) nav_status = None if nav_status_code is not None: try: nav_status = NAV_STATUS_MAP.get(int(nav_status_code)) except (ValueError, TypeError): pass dwt = item.get('DWT', item.get('dwt', item.get('DEADWEIGHT'))) vessels.append({ 'mmsi': mmsi, 'name': item.get('SHIPNAME', item.get('shipname', item.get('name', ''))), 'type': str(type_raw) if type_raw else '', 'type_category': classify_vessel_type(str(type_raw) if type_raw else '', type_code), 'flag': item.get('FLAG', item.get('flag', '')), 'speed': item.get('SPEED', item.get('speed')), 'course': item.get('COURSE', item.get('course')), 'nav_status': nav_status, 'dwt': dwt, 'lat': v_lat, 'lon': v_lon, 'destination': item.get('DESTINATION', item.get('destination', '')), }) elif isinstance(item, list) and len(item) >= 7: # Some endpoints return arrays: [mmsi, lat, lon, speed, course, type, name, ...] try: v_lat = float(item[1]) / 10000 if item[1] > 1000 else float(item[1]) v_lon = float(item[2]) / 10000 if abs(item[2]) > 1000 else float(item[2]) except (ValueError, TypeError, IndexError): continue if not (lat_min <= v_lat <= lat_max and lon_min <= v_lon <= lon_max): continue type_val = str(item[5]) if len(item) > 5 else '' vessels.append({ 'mmsi': str(item[0]), 'lat': v_lat, 'lon': v_lon, 'speed': item[3] if len(item) > 3 else None, 'course': item[4] if len(item) > 4 else None, 'type': type_val, 'type_category': classify_vessel_type(type_val), 'nav_status': None, 'dwt': None, 'name': str(item[6]) if len(item) > 6 else '', }) return vessels except Exception as e: logger.error(f"Area scrape error: {e}") return [] def scrape_viewport_vessels(self, lat_min: float, lat_max: float, lon_min: float, lon_max: float, max_tiles: int = 6) -> List[Dict]: """ Scrape vessels covering a map viewport via multiple MarineTraffic tiles. Adapts zoom level to viewport size and fetches multiple tiles. Returns deduplicated list of vessel dicts with lat/lon. """ lat_span = lat_max - lat_min lon_span = lon_max - lon_min # Choose zoom based on viewport size (larger viewport → lower zoom → bigger tiles) if lat_span > 20 or lon_span > 40: zoom = 4 elif lat_span > 10 or lon_span > 20: zoom = 5 elif lat_span > 5 or lon_span > 10: zoom = 6 elif lat_span > 2 or lon_span > 4: zoom = 8 else: zoom = 10 def lat_lon_to_tile(lat_deg, lon_deg, z): lat_rad = math.radians(lat_deg) n = 2.0 ** z xtile = int((lon_deg + 180.0) / 360.0 * n) ytile = int((1.0 - math.asinh(math.tan(lat_rad)) / math.pi) / 2.0 * n) return xtile, ytile # Calculate tiles covering the viewport x_min, y_min = lat_lon_to_tile(lat_max, lon_min, zoom) # NW corner x_max, y_max = lat_lon_to_tile(lat_min, lon_max, zoom) # SE corner # Collect unique tiles to fetch (limit to max_tiles) tiles = [] for x in range(x_min, x_max + 1): for y in range(y_min, y_max + 1): tiles.append((x, y)) if len(tiles) >= max_tiles: break if len(tiles) >= max_tiles: break if not tiles: center_lat = (lat_min + lat_max) / 2 center_lon = (lon_min + lon_max) / 2 cx, cy = lat_lon_to_tile(center_lat, center_lon, zoom) tiles = [(cx, cy)] # Establish session center_lat = (lat_min + lat_max) / 2 center_lon = (lon_min + lon_max) / 2 try: self.session.get( f"{MT_BASE}/en/ais/home/centerx:{center_lon:.1f}/centery:{center_lat:.1f}/zoom:{zoom}", timeout=10 ) except Exception: pass # Fetch tiles all_vessels = [] seen_mmsi = set() for x_tile, y_tile in tiles: try: time.sleep(0.5) # Rate limit between tiles url = f"{MT_BASE}/getData/get_data_json_4/z:{zoom}/X:{x_tile}/Y:{y_tile}/station:0" resp = self.session.get(url, timeout=10, headers={ 'Referer': f'{MT_BASE}/en/ais/home/centerx:{center_lon:.1f}/centery:{center_lat:.1f}/zoom:{zoom}', 'X-Requested-With': 'XMLHttpRequest', 'Accept': 'application/json, text/javascript, */*' }) if resp.status_code != 200: continue data = resp.json() rows = data if isinstance(data, dict): rows = data.get('data', data.get('rows', data.get('type1', []))) if isinstance(rows, dict): rows = rows.get('rows', []) if not isinstance(rows, list): continue for item in rows: v = self._parse_tile_item(item, lat_min, lat_max, lon_min, lon_max) if v and v.get('mmsi') and v['mmsi'] not in seen_mmsi: seen_mmsi.add(v['mmsi']) all_vessels.append(v) except Exception as e: logger.debug(f"Tile {x_tile},{y_tile} z{zoom} error: {e}") continue logger.info(f"MT viewport scrape: {len(tiles)} tiles z{zoom}, {len(all_vessels)} vessels") return all_vessels def _parse_tile_item(self, item, lat_min, lat_max, lon_min, lon_max) -> dict: """Parse a single vessel item from MT tile data.""" if isinstance(item, dict): v_lat = item.get('LAT', item.get('lat', 0)) v_lon = item.get('LON', item.get('lon', 0)) try: v_lat = float(v_lat) if v_lat else 0 v_lon = float(v_lon) if v_lon else 0 except (ValueError, TypeError): return None if not (lat_min <= v_lat <= lat_max and lon_min <= v_lon <= lon_max): return None mmsi = str(item.get('MMSI', item.get('mmsi', ''))) if not mmsi or mmsi == '0': return None type_raw = item.get('SHIPTYPE', item.get('ship_type', item.get('type_name', ''))) type_code = item.get('TYPE_CODE', item.get('type_code')) nav_status_code = item.get('NAVSTAT', item.get('nav_status', item.get('STATUS', None))) nav_status = None if nav_status_code is not None: try: nav_status = NAV_STATUS_MAP.get(int(nav_status_code)) except (ValueError, TypeError): pass dwt = item.get('DWT', item.get('dwt', item.get('DEADWEIGHT'))) return { 'mmsi': mmsi, 'name': item.get('SHIPNAME', item.get('shipname', item.get('name', ''))), 'type': str(type_raw) if type_raw else '', 'type_category': classify_vessel_type(str(type_raw) if type_raw else '', type_code), 'flag': item.get('FLAG', item.get('flag', '')), 'speed': item.get('SPEED', item.get('speed')), 'course': item.get('COURSE', item.get('course')), 'nav_status': nav_status, 'dwt': dwt, 'lat': v_lat, 'lon': v_lon, 'destination': item.get('DESTINATION', item.get('destination', '')), } elif isinstance(item, list) and len(item) >= 7: try: v_lat = float(item[1]) / 10000 if item[1] > 1000 else float(item[1]) v_lon = float(item[2]) / 10000 if abs(item[2]) > 1000 else float(item[2]) except (ValueError, TypeError, IndexError): return None if not (lat_min <= v_lat <= lat_max and lon_min <= v_lon <= lon_max): return None type_val = str(item[5]) if len(item) > 5 else '' return { 'mmsi': str(item[0]), 'lat': v_lat, 'lon': v_lon, 'speed': item[3] if len(item) > 3 else None, 'course': item[4] if len(item) > 4 else None, 'type': type_val, 'type_category': classify_vessel_type(type_val), 'name': str(item[6]) if len(item) > 6 else '', } return None def get_port_vessels(self, port_name: str) -> List[Dict]: """ Get vessels currently in/near a port. Tries: 1) map tile scraping, 2) paid API (if key), 3) empty. """ port = resolve_port(port_name) if not port: return [] lat, lon = port['lat'], port['lon'] radius = port.get('radius_nm', 15) # Strategy 1: Scrape map tile data vessels = self.scrape_area_vessels(lat, lon, radius) # Strategy 2: Paid API fallback if not vessels and self.has_api_key(): lat_d = radius / 60.0 lon_d = radius / (60.0 * max(math.cos(math.radians(lat)), 0.01)) try: api_result = self.api_vessels_in_area( lat - lat_d, lat + lat_d, lon - lon_d, lon + lon_d ) if isinstance(api_result, list): for item in api_result: mmsi = str(item.get('MMSI', '')) if mmsi and mmsi != '0': vessels.append({ 'mmsi': mmsi, 'name': item.get('SHIPNAME', ''), 'type': item.get('TYPE_NAME', item.get('SHIPTYPE', '')), 'flag': item.get('FLAG', ''), 'speed': item.get('SPEED'), 'course': item.get('COURSE'), 'lat': item.get('LAT'), 'lon': item.get('LON'), 'destination': item.get('DESTINATION', ''), }) except Exception as e: logger.error(f"API area fallback error: {e}") # Deduplicate by MMSI seen = set() unique = [] for v in vessels: if v.get('mmsi') and v['mmsi'] not in seen: seen.add(v['mmsi']) unique.append(v) return unique # ========================================================================= # API METHODS (requires API key) # ========================================================================= def _api_call(self, service: str, params: dict) -> dict: """Make API call""" if not self.api_key: raise ValueError("API key required. Set MARINETRAFFIC_API_KEY") url = f"{MT_API_BASE}/{service}/{self.api_key}" params['protocol'] = 'jsono' # JSON output try: resp = self.session.get(url, params=params, timeout=30) return resp.json() except Exception as e: logger.error(f"API error: {e}") return {} def api_vessel_info(self, mmsi: str = None, imo: str = None) -> dict: """ PS01 - Vessel Particulars Get detailed vessel info via API """ params = {} if mmsi: params['mmsi'] = mmsi elif imo: params['imo'] = imo else: raise ValueError("MMSI or IMO required") return self._api_call('vesselparticulars', params) def api_vessel_position(self, mmsi: str = None, imo: str = None) -> dict: """ PS07 - Single Vessel Position Get current vessel position via API """ params = {'timespan': 60} # Last 60 minutes if mmsi: params['mmsi'] = mmsi elif imo: params['imo'] = imo return self._api_call('exportvessel', params) def api_port_calls(self, mmsi: str = None, imo: str = None) -> dict: """ VD02 - Port Calls Get vessel port call history """ params = {} if mmsi: params['mmsi'] = mmsi elif imo: params['imo'] = imo return self._api_call('portcalls', params) def api_vessels_in_area(self, lat_min: float, lat_max: float, lon_min: float, lon_max: float) -> list: """ PS02 - Vessels in Area Get all vessels in geographic area """ params = { 'MINLAT': lat_min, 'MAXLAT': lat_max, 'MINLON': lon_min, 'MAXLON': lon_max } return self._api_call('exportvessels', params) # ========================================================================= # HELPERS # ========================================================================= def _parse_number(self, text: str) -> Optional[float]: """Extract number from text""" if not text: return None match = re.search(r'[\d,]+\.?\d*', text.replace(',', '')) if match: try: return float(match.group()) except: pass return None def has_api_key(self) -> bool: """Check if API key is configured""" return bool(self.api_key) # ============================================================================= # SEA ROUTE CALCULATION # ============================================================================= # Maritime waypoints (choke points, canals, straits) WAYPOINTS = { 'gibraltar': {'name': 'Strait of Gibraltar', 'lat': 36.13, 'lon': -5.35}, 'suez_n': {'name': 'Suez Canal (Port Said)', 'lat': 31.26, 'lon': 32.30}, 'suez_s': {'name': 'Suez Canal (Suez)', 'lat': 29.97, 'lon': 32.55}, 'bab_el_mandeb': {'name': 'Bab el-Mandeb Strait', 'lat': 12.65, 'lon': 43.30}, 'hormuz': {'name': 'Strait of Hormuz', 'lat': 26.55, 'lon': 56.25}, 'malacca': {'name': 'Strait of Malacca', 'lat': 1.27, 'lon': 103.82}, 'cape_good_hope': {'name': 'Cape of Good Hope', 'lat': -34.35, 'lon': 18.50}, 'panama_atl': {'name': 'Panama Canal (Atlantic)', 'lat': 9.36, 'lon': -79.90}, 'panama_pac': {'name': 'Panama Canal (Pacific)', 'lat': 8.95, 'lon': -79.55}, 'cape_horn': {'name': 'Cape Horn', 'lat': -55.98, 'lon': -67.28}, 'dover': {'name': 'Dover Strait', 'lat': 51.13, 'lon': 1.33}, 'bosphorus': {'name': 'Bosphorus Strait', 'lat': 41.12, 'lon': 29.05}, 'good_hope_e': {'name': 'Cape Agulhas (East)', 'lat': -34.83, 'lon': 20.02}, 'volga_don': {'name': 'Volga-Don Canal (Astrakhan)', 'lat': 46.35, 'lon': 48.03}, } # Average ECO speeds by vessel type (knots) — modern slow-steaming VESSEL_SPEEDS = { 'bulk': 12.0, # Eco steaming (was 14.5 design) 'tanker': 12.5, # Eco steaming (was 15 design) 'container': 15.0, # Eco steaming (was 22-25 design) 'general': 12.0, 'passenger': 18.0, 'roro': 15.0, 'offshore': 10.0, 'default': 12.5, } # Daily fuel consumption (metric tons/day) — ranges by DWT # Returns (low, high) MT/day for given vessel type and DWT def estimate_fuel_consumption(vessel_type: str, dwt: float = None) -> tuple: """Estimate daily fuel consumption in MT/day as (low, high) range.""" vtype = vessel_type.lower() if vessel_type else 'default' # DWT-based tables: (dwt_threshold, low_mt_day, high_mt_day) FUEL_TABLE = { 'bulk': [ (40000, 18, 25), # Handysize (65000, 25, 35), # Handymax/Supramax (100000, 30, 42), # Panamax (200000, 40, 55), # Capesize (999999, 55, 70), # VLOC ], 'tanker': [ (55000, 22, 30), # MR (80000, 28, 38), # LR1 (120000, 35, 48), # Aframax (200000, 45, 60), # Suezmax (999999, 60, 85), # VLCC ], 'container': [ (25000, 25, 40), # Feeder (40000, 40, 60), # Feedermax (65000, 55, 80), # Panamax (100000, 75, 110), # Post-Panamax (150000, 100, 150), # Neo-Panamax (999999, 140, 200), # ULCV ], 'general': [(999999, 12, 22)], 'roro': [(999999, 35, 55)], 'passenger': [(999999, 80, 150)], } table = FUEL_TABLE.get(vtype, [(999999, 25, 40)]) if dwt: try: dwt_val = float(dwt) for threshold, low, high in table: if dwt_val <= threshold: return (low, high) except (ValueError, TypeError): pass # Default: mid-range entry mid = table[len(table) // 2] return (mid[1], mid[2]) # Canal transit costs — DWT-based (USD) def estimate_suez_cost(dwt: float = None) -> tuple: """Estimate Suez Canal transit cost as (low, high) USD based on SCNT formula.""" if not dwt: return (200000, 600000) try: d = float(dwt) # Simplified SCNT-based tiers (Suez Canal Net Tonnage) # Actual formula uses SCNT, but we approximate from DWT if d <= 30000: return (80000, 150000) elif d <= 60000: return (150000, 300000) elif d <= 100000: return (250000, 450000) elif d <= 200000: return (350000, 700000) else: return (500000, 1200000) except (ValueError, TypeError): return (200000, 600000) def estimate_panama_cost(dwt: float = None) -> tuple: """Estimate Panama Canal transit cost as (low, high) USD based on PC/UMS tonnage.""" if not dwt: return (150000, 500000) try: d = float(dwt) if d <= 30000: return (50000, 120000) elif d <= 60000: return (100000, 250000) elif d <= 100000: return (200000, 400000) elif d <= 150000: return (350000, 600000) else: # Neo-Panamax locks return (500000, 900000) except (ValueError, TypeError): return (150000, 500000) CANAL_TRANSIT_HOURS = { 'suez': 12, 'panama': 10, } # Bunker fuel price (VLSFO, USD/ton) — approximate 2024-2026 range BUNKER_PRICE_USD = 600 # Port charges — DWT-based (USD per call) def estimate_port_charges(dwt: float = None) -> tuple: """Estimate port charges per call as (low, high) USD.""" if not dwt: return (15000, 50000) try: d = float(dwt) if d <= 30000: return (8000, 18000) elif d <= 60000: return (15000, 30000) elif d <= 100000: return (25000, 50000) elif d <= 200000: return (40000, 80000) else: return (60000, 150000) except (ValueError, TypeError): return (15000, 50000) # Port region assignments for routing PORT_REGIONS = {} def _assign_regions(): """Assign routing regions to ports based on coordinates.""" for key, port in WORLD_PORTS.items(): lat, lon = port['lat'], port['lon'] # Americas first (all lon < -30) if lon < -100 and lat > 25: region = 'USWC' # US West Coast elif lon < -30 and lat > 25: region = 'USEC' # US East Coast elif lon < -30 and -5 < lat <= 25: region = 'CARIB' # Caribbean / Central America elif lon < -70 and lat <= -5: region = 'SAW' # South America West elif lon < -30 and lat <= -5: region = 'SAE' # South America East # Asia-Pacific elif lon > 100 and lat < -10: region = 'AUSNZ' # Australia / NZ elif lon > 100: region = 'EASIA' # East/SE Asia elif 60 < lon <= 100 and lat > 0: region = 'SASIA' # South Asia elif 36 <= lat <= 47 and 46 <= lon <= 55: region = 'CASP' # Caspian Sea (landlocked, before GULF) elif 40 < lon <= 60 and lat > 10: region = 'GULF' # Persian Gulf # Africa elif 10 < lon < 45 and lat < -20: region = 'SAFR' # Southern Africa (must be before ERED) elif 30 < lon <= 45 and -10 < lat < 20: region = 'ERED' # East Africa / Red Sea elif -5 < lon < 15 and -5 < lat < 15: region = 'WAFR' # West Africa # Europe elif 25 < lon <= 40 and 40 < lat <= 48: region = 'BSEA' # Black Sea (Istanbul, Novorossiysk, Constanta) elif lon < 40 and lat > 48: region = 'NEUR' # North Europe + Baltic (above 48N) elif -10 <= lon <= 40 and 25 < lat <= 48: region = 'MED' # Mediterranean + Southern Europe else: region = 'OTHER' PORT_REGIONS[key] = region _assign_regions() # Route templates: from_region → to_region → waypoint sequence ROUTE_WAYPOINTS = { ('NEUR', 'EASIA'): ['dover', 'gibraltar', 'suez_n', 'suez_s', 'bab_el_mandeb', 'malacca'], ('NEUR', 'SASIA'): ['dover', 'gibraltar', 'suez_n', 'suez_s', 'bab_el_mandeb'], ('NEUR', 'GULF'): ['dover', 'gibraltar', 'suez_n', 'suez_s', 'bab_el_mandeb', 'hormuz'], ('NEUR', 'MED'): ['dover', 'gibraltar'], ('NEUR', 'USEC'): [], # Direct Atlantic ('NEUR', 'SAFR'): ['dover', 'gibraltar', 'cape_good_hope'], ('NEUR', 'WAFR'): ['dover', 'gibraltar'], ('NEUR', 'ERED'): ['dover', 'gibraltar', 'suez_n', 'suez_s', 'bab_el_mandeb'], ('NEUR', 'CARIB'): [], # Direct Atlantic ('NEUR', 'SAE'): [], ('NEUR', 'AUSNZ'): ['dover', 'gibraltar', 'suez_n', 'suez_s', 'bab_el_mandeb', 'malacca'], ('NEUR', 'BSEA'): ['dover'], ('MED', 'EASIA'): ['suez_n', 'suez_s', 'bab_el_mandeb', 'malacca'], ('MED', 'SASIA'): ['suez_n', 'suez_s', 'bab_el_mandeb'], ('MED', 'GULF'): ['suez_n', 'suez_s', 'bab_el_mandeb', 'hormuz'], ('MED', 'NEUR'): ['gibraltar', 'dover'], ('MED', 'USEC'): ['gibraltar'], ('MED', 'SAFR'): ['gibraltar', 'cape_good_hope'], ('MED', 'ERED'): ['suez_n', 'suez_s', 'bab_el_mandeb'], ('MED', 'AUSNZ'): ['suez_n', 'suez_s', 'bab_el_mandeb', 'malacca'], ('BSEA', 'MED'): ['bosphorus'], ('BSEA', 'EASIA'): ['bosphorus', 'suez_n', 'suez_s', 'bab_el_mandeb', 'malacca'], ('BSEA', 'GULF'): ['bosphorus', 'suez_n', 'suez_s', 'bab_el_mandeb', 'hormuz'], ('GULF', 'EASIA'): ['hormuz', 'malacca'], ('GULF', 'SASIA'): ['hormuz'], ('GULF', 'NEUR'): ['hormuz', 'bab_el_mandeb', 'suez_s', 'suez_n', 'gibraltar', 'dover'], ('GULF', 'MED'): ['hormuz', 'bab_el_mandeb', 'suez_s', 'suez_n'], ('GULF', 'USEC'): ['hormuz', 'bab_el_mandeb', 'suez_s', 'suez_n', 'gibraltar'], ('GULF', 'SAFR'): ['hormuz', 'bab_el_mandeb', 'cape_good_hope'], ('EASIA', 'SASIA'): ['malacca'], ('EASIA', 'GULF'): ['malacca', 'hormuz'], ('EASIA', 'NEUR'): ['malacca', 'bab_el_mandeb', 'suez_s', 'suez_n', 'gibraltar', 'dover'], ('EASIA', 'MED'): ['malacca', 'bab_el_mandeb', 'suez_s', 'suez_n'], ('EASIA', 'USEC'): ['panama_pac', 'panama_atl'], ('EASIA', 'USWC'): [], # Direct Pacific ('EASIA', 'SAFR'): ['malacca', 'cape_good_hope'], ('EASIA', 'AUSNZ'): ['malacca'], ('SASIA', 'EASIA'): ['malacca'], ('SASIA', 'NEUR'): ['bab_el_mandeb', 'suez_s', 'suez_n', 'gibraltar', 'dover'], ('SASIA', 'MED'): ['bab_el_mandeb', 'suez_s', 'suez_n'], ('SASIA', 'GULF'): ['hormuz'], ('USEC', 'EASIA'): ['panama_atl', 'panama_pac'], ('USEC', 'NEUR'): [], # Direct Atlantic ('USEC', 'MED'): ['gibraltar'], ('USEC', 'GULF'): ['gibraltar', 'suez_n', 'suez_s', 'bab_el_mandeb', 'hormuz'], ('USEC', 'SAE'): [], ('USEC', 'CARIB'): [], ('USWC', 'EASIA'): [], # Direct Pacific ('USWC', 'NEUR'): ['panama_pac', 'panama_atl'], ('USWC', 'SAW'): [], ('USWC', 'AUSNZ'): [], ('SAFR', 'NEUR'): ['cape_good_hope', 'gibraltar', 'dover'], ('SAFR', 'EASIA'): ['cape_good_hope', 'malacca'], ('SAFR', 'SASIA'): ['cape_good_hope'], ('SAE', 'NEUR'): [], ('SAE', 'USEC'): [], ('SAE', 'EASIA'): ['cape_good_hope', 'malacca'], ('SAW', 'USEC'): ['panama_pac', 'panama_atl'], ('SAW', 'USWC'): [], ('CARIB', 'NEUR'): [], ('CARIB', 'USEC'): [], ('CARIB', 'EASIA'): ['panama_atl', 'panama_pac'], ('AUSNZ', 'EASIA'): ['malacca'], ('AUSNZ', 'NEUR'): ['malacca', 'bab_el_mandeb', 'suez_s', 'suez_n', 'gibraltar', 'dover'], # Caspian Sea (landlocked — river-sea vessels only via Volga-Don Canal) ('CASP', 'CASP'): [], # Direct within Caspian ('CASP', 'BSEA'): ['volga_don'], # Via Volga-Don → Azov → Black Sea ('CASP', 'MED'): ['volga_don', 'bosphorus'], # Caspian → Black Sea → Med ('CASP', 'NEUR'): ['volga_don', 'bosphorus', 'gibraltar', 'dover'], ('CASP', 'GULF'): [], # INSTC: Caspian → Iran overland → AG ('CASP', 'SASIA'): [], # INSTC: Caspian → Iran → India ('BSEA', 'CASP'): ['volga_don'], ('MED', 'CASP'): ['bosphorus', 'volga_don'], ('NEUR', 'CASP'): ['dover', 'gibraltar', 'bosphorus', 'volga_don'], ('GULF', 'CASP'): [], # INSTC reverse ('SASIA', 'CASP'): [], # INSTC reverse } def _haversine_nm(lat1, lon1, lat2, lon2): """Calculate great circle distance in nautical miles.""" R = 3440.065 # Earth radius in NM lat1, lon1, lat2, lon2 = map(math.radians, [lat1, lon1, lat2, lon2]) dlat = lat2 - lat1 dlon = lon2 - lon1 a = math.sin(dlat/2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon/2)**2 c = 2 * math.asin(math.sqrt(min(a, 1.0))) return R * c def calculate_sea_route(from_port: str, to_port: str, vessel_type: str = 'default', dwt: float = None) -> Optional[Dict]: """ Calculate sea route between two ports. Returns distance, time, waypoints, estimated cost RANGES based on DWT. """ port_a = resolve_port(from_port) port_b = resolve_port(to_port) if not port_a or not port_b: return None # Get port keys for region lookup (resolve_port includes 'key') key_a = port_a.get('key') key_b = port_b.get('key') if not key_a or not key_b: return None region_a = PORT_REGIONS.get(key_a, 'OTHER') region_b = PORT_REGIONS.get(key_b, 'OTHER') # Find waypoint sequence wp_keys = ROUTE_WAYPOINTS.get((region_a, region_b)) if wp_keys is None: # Try reverse wp_keys_rev = ROUTE_WAYPOINTS.get((region_b, region_a)) if wp_keys_rev is not None: wp_keys = list(reversed(wp_keys_rev)) else: wp_keys = [] # Direct route # Build coordinate chain: port_a → waypoints → port_b chain = [(port_a['lat'], port_a['lon'], port_a['name'])] for wpk in wp_keys: wp = WAYPOINTS[wpk] chain.append((wp['lat'], wp['lon'], wp['name'])) chain.append((port_b['lat'], port_b['lon'], port_b['name'])) # Calculate total distance total_nm = 0.0 legs = [] for i in range(len(chain) - 1): lat1, lon1, name1 = chain[i] lat2, lon2, name2 = chain[i + 1] dist = _haversine_nm(lat1, lon1, lat2, lon2) # Sea routes are ~15-20% longer than great circle due to coastlines sea_dist = dist * 1.15 total_nm += sea_dist legs.append({ 'from': name1, 'to': name2, 'distance_nm': round(sea_dist), }) # Canals used — DWT-based costs canals_used = [] canal_cost_low = 0 canal_cost_high = 0 canal_hours = 0 if 'suez_n' in wp_keys or 'suez_s' in wp_keys: s_low, s_high = estimate_suez_cost(dwt) canals_used.append({'name': 'Suez Canal', 'cost_low': s_low, 'cost_high': s_high}) canal_cost_low += s_low canal_cost_high += s_high canal_hours += CANAL_TRANSIT_HOURS['suez'] if 'panama_atl' in wp_keys or 'panama_pac' in wp_keys: p_low, p_high = estimate_panama_cost(dwt) canals_used.append({'name': 'Panama Canal', 'cost_low': p_low, 'cost_high': p_high}) canal_cost_low += p_low canal_cost_high += p_high canal_hours += CANAL_TRANSIT_HOURS['panama'] # Speed and time vtype = vessel_type.lower() if vessel_type else 'default' speed = VESSEL_SPEEDS.get(vtype, VESSEL_SPEEDS['default']) sailing_hours = total_nm / speed total_hours = sailing_hours + canal_hours total_days = total_hours / 24 # Cost estimation — ranges fuel_low, fuel_high = estimate_fuel_consumption(vtype, dwt) fuel_cost_low = (sailing_hours / 24) * fuel_low * BUNKER_PRICE_USD fuel_cost_high = (sailing_hours / 24) * fuel_high * BUNKER_PRICE_USD port_low, port_high = estimate_port_charges(dwt) port_cost_low = port_low * 2 # Load + discharge port_cost_high = port_high * 2 total_cost_low = fuel_cost_low + canal_cost_low + port_cost_low total_cost_high = fuel_cost_high + canal_cost_high + port_cost_high # Vessel subtype info subtype = classify_vessel_subtype(vtype, dwt) # Waypoint names for display via_points = [] for wpk in wp_keys: wp = WAYPOINTS[wpk] # Skip pairs (suez_n + suez_s → "Suez Canal", panama_atl + panama_pac → "Panama Canal") if wpk in ('suez_s', 'panama_pac'): continue via_points.append(wp['name']) # Build coordinate chain for map polyline route_coords = [[lat, lon] for lat, lon, _ in chain] result = { 'from': port_a['name'], 'from_country': port_a['country'], 'from_lat': port_a['lat'], 'from_lon': port_a['lon'], 'to': port_b['name'], 'to_country': port_b['country'], 'to_lat': port_b['lat'], 'to_lon': port_b['lon'], 'route_coords': route_coords, 'distance_nm': round(total_nm), 'distance_km': round(total_nm * 1.852), 'vessel_type': vtype, 'speed_knots': speed, 'sailing_hours': round(sailing_hours), 'canal_transit_hours': canal_hours, 'total_hours': round(total_hours), 'total_days': round(total_days, 1), 'via': via_points, 'canals': [c['name'] for c in canals_used], 'legs': legs, 'cost_estimate': { 'fuel_usd': {'low': round(fuel_cost_low), 'high': round(fuel_cost_high)}, 'canal_fees_usd': {'low': canal_cost_low, 'high': canal_cost_high}, 'port_charges_usd': {'low': port_cost_low, 'high': port_cost_high}, 'total_usd': {'low': round(total_cost_low), 'high': round(total_cost_high)}, 'bunker_price_usd_ton': BUNKER_PRICE_USD, 'note': 'Estimated range based on vessel size. Actual costs depend on market rates, cargo, season, and operator.' } } if dwt: result['dwt'] = dwt if subtype: result['vessel_subtype'] = subtype.get('name') return result # ============================================================================= # FEATURE 1: FIXTURE MATCHING ENGINE # ============================================================================= def fixture_match(cargo_type: str, tonnage: float, from_port: str, to_port: str = None, vessels: list = None) -> Dict: """ Match cargo to suitable vessels with scoring. Returns scored candidates + route info. """ # Determine required vessel type vtype = classify_cargo(cargo_type) if not vtype: return {'error': f"Cannot classify cargo '{cargo_type}'", 'candidates': []} load_port = resolve_port(from_port) if not load_port: return {'error': f"Port '{from_port}' not found", 'candidates': []} # Subtype recommendation based on tonnage subtype = classify_vessel_subtype(vtype, tonnage * 1.15 if tonnage else None) # 15% margin # Score each vessel scored = [] for v in (vessels or []): score = 0 reasons = [] # Type match (40 pts) v_cat = v.get('type_category', '') if v_cat == vtype: score += 40 reasons.append('type_match') elif vtype in (v.get('type', '') or '').lower(): score += 25 reasons.append('partial_type_match') else: continue # Skip non-matching types entirely # DWT fit (30 pts) v_dwt = None try: v_dwt = float(v.get('dwt', 0) or 0) except (ValueError, TypeError): pass if v_dwt and tonnage: ratio = v_dwt / tonnage if tonnage > 0 else 0 if 1.05 <= ratio <= 1.5: score += 30 # Perfect fit (5-50% margin) reasons.append('dwt_perfect') elif 1.0 <= ratio <= 2.0: score += 20 reasons.append('dwt_acceptable') elif ratio > 2.0: score += 5 reasons.append('dwt_oversized') else: score += 0 reasons.append('dwt_too_small') elif not v_dwt: score += 10 # Unknown DWT — neutral reasons.append('dwt_unknown') # Availability — anchored/moored vessels are more likely available (20 pts) nav = v.get('nav_status', '') spd = 0 try: spd = float(v.get('speed', 0) or 0) except (ValueError, TypeError): pass if nav in ('at anchor', 'moored') or spd < 0.5: score += 20 reasons.append('likely_available') elif spd < 3: score += 15 reasons.append('slow_moving') else: score += 5 reasons.append('underway') # Proximity bonus (10 pts) — vessels closer to load port v_lat = v.get('lat') v_lon = v.get('lon') if v_lat and v_lon: try: dist = _haversine_nm(float(v_lat), float(v_lon), load_port['lat'], load_port['lon']) if dist < 50: score += 10 reasons.append('very_close') elif dist < 200: score += 7 reasons.append('nearby') elif dist < 500: score += 3 reasons.append('regional') except (ValueError, TypeError): pass v_scored = {**v, '_score': score, '_match_reasons': reasons} if v_dwt: v_scored['_subtype'] = classify_vessel_subtype(vtype, v_dwt) scored.append(v_scored) # Sort by score descending scored.sort(key=lambda x: -x['_score']) # Route calculation if destination provided route = None if to_port: route = calculate_sea_route(from_port, to_port, vtype, dwt=tonnage) return { 'cargo_type': cargo_type, 'vessel_type_required': vtype, 'recommended_subtype': subtype.get('name') if subtype else None, 'recommended_dwt_range': f"{subtype['dwt_min']:,}–{subtype['dwt_max']:,}" if subtype else None, 'loading_port': load_port['name'], 'tonnage': tonnage, 'candidates': scored[:15], 'total_matches': len(scored), 'route': route, } # ============================================================================= # FEATURE 2: FREIGHT RATE INTELLIGENCE # ============================================================================= # Freight rates $/ton by route corridor + vessel type (Q1 2026 approximations) # Based on Baltic Exchange indices: BDI (bulk), BDTI (tanker), SCFI (container) FREIGHT_RATES = { 'bulk': { # route_key: (low_$/ton, high_$/ton, benchmark) ('EASIA', 'NEUR'): (18, 28, 'C3 Cape Brazil-China equiv'), ('NEUR', 'EASIA'): (12, 20, 'Backhaul'), ('SAE', 'EASIA'): (15, 25, 'C5TC Capesize Brazil-China'), ('AUSNZ', 'EASIA'): (8, 14, 'C5 Australia-China'), ('GULF', 'EASIA'): (12, 20, 'AG-Far East'), ('USEC', 'NEUR'): (10, 18, 'USG-Continent grain'), ('USEC', 'EASIA'): (25, 40, 'USG-Far East'), ('SAE', 'NEUR'): (12, 20, 'ECSA-Continent'), ('BSEA', 'MED'): (8, 15, 'Black Sea-Med grain'), ('BSEA', 'EASIA'): (22, 35, 'Black Sea-Far East'), ('SAFR', 'EASIA'): (10, 16, 'S.Africa-China'), ('SASIA', 'EASIA'): (6, 12, 'India-China coastal'), ('MED', 'NEUR'): (6, 12, 'Short-sea Med-Continent'), ('NEUR', 'USEC'): (8, 14, 'Continent-USG backhaul'), ('CASP', 'CASP'): (8, 18, 'Caspian Sea internal'), ('CASP', 'BSEA'): (20, 40, 'Caspian-Black Sea via Volga-Don'), ('BSEA', 'CASP'): (18, 35, 'Black Sea-Caspian via Volga-Don'), ('CASP', 'GULF'): (12, 25, 'Caspian-Persian Gulf INSTC'), ('GULF', 'CASP'): (14, 28, 'Persian Gulf-Caspian INSTC'), ('CASP', 'SASIA'): (22, 45, 'Caspian-India INSTC corridor'), }, 'tanker': { ('GULF', 'EASIA'): (8, 15, 'TD3C VLCC AG-China'), ('GULF', 'NEUR'): (10, 18, 'AG-UKC'), ('GULF', 'USEC'): (12, 20, 'AG-USAC'), ('GULF', 'MED'): (9, 16, 'AG-Med'), ('GULF', 'SASIA'): (5, 10, 'AG-India'), ('WAFR', 'EASIA'): (12, 22, 'WAF-East'), ('WAFR', 'USEC'): (8, 14, 'WAF-USG'), ('USEC', 'NEUR'): (6, 12, 'USG-UKC'), ('NEUR', 'USEC'): (5, 10, 'UKC-USG backhaul'), ('MED', 'NEUR'): (4, 8, 'Med-Continent'), ('BSEA', 'MED'): (5, 10, 'CPC-Med Aframax'), ('EASIA', 'EASIA'): (3, 7, 'Intra-Asia'), ('CASP', 'CASP'): (6, 14, 'Caspian tanker coastal'), ('CASP', 'BSEA'): (18, 35, 'Caspian-Black Sea crude via VDC'), ('BSEA', 'CASP'): (16, 30, 'Black Sea-Caspian products via VDC'), ('CASP', 'GULF'): (10, 22, 'Caspian-AG crude shuttle'), ('CASP', 'MED'): (25, 50, 'Caspian-Med crude via BTC/pipeline'), }, 'container': { # $/TEU rates (not per ton) ('EASIA', 'NEUR'): (1200, 2800, 'SCFI Shanghai-Europe'), ('EASIA', 'USEC'): (2500, 5000, 'SCFI Shanghai-USEC'), ('EASIA', 'USWC'): (1800, 3500, 'SCFI Shanghai-USWC'), ('EASIA', 'MED'): (1400, 3000, 'Shanghai-Med'), ('NEUR', 'EASIA'): (400, 900, 'Europe-Asia backhaul'), ('NEUR', 'USEC'): (800, 1500, 'Europe-USEC'), ('EASIA', 'SASIA'): (300, 700, 'Intra-Asia'), ('EASIA', 'AUSNZ'): (600, 1200, 'China-Australia'), ('EASIA', 'ERED'): (800, 1500, 'Asia-East Africa'), ('EASIA', 'SAE'): (1000, 2000, 'Asia-S.America'), ('MED', 'ERED'): (600, 1200, 'Med-East Africa'), ('USEC', 'SAE'): (500, 1000, 'US-S.America'), }, } # DWT multipliers for rate adjustment (smaller vessels = higher $/ton) _RATE_DWT_MULTIPLIER = { 'bulk': [ (10000, 1.8), # River-Sea premium (Caspian/inland) (40000, 1.4), # Handysize premium (65000, 1.15), # Supramax (100000, 1.0), # Panamax (base) (200000, 0.85), # Capesize discount (999999, 0.75), # VLOC ], 'tanker': [ (25000, 1.8), # River-Sea tanker premium (Caspian/inland) (55000, 1.5), # MR premium (80000, 1.2), # LR1 (120000, 1.0), # Aframax (base) (200000, 0.8), # Suezmax (999999, 0.65), # VLCC ], } def estimate_freight_rate(from_port: str, to_port: str, vessel_type: str = 'bulk', dwt: float = None) -> Optional[Dict]: """ Estimate freight rate for a route + vessel type. Returns $/ton range (bulk/tanker) or $/TEU range (container). """ port_a = resolve_port(from_port) port_b = resolve_port(to_port) if not port_a or not port_b: return None # Get regions (resolve_port includes 'key') key_a = port_a.get('key') key_b = port_b.get('key') if not key_a or not key_b: return None region_a = PORT_REGIONS.get(key_a, 'OTHER') region_b = PORT_REGIONS.get(key_b, 'OTHER') vtype = vessel_type.lower() if vessel_type else 'bulk' rates_table = FREIGHT_RATES.get(vtype, {}) # Direct match rate_data = rates_table.get((region_a, region_b)) # Try reverse with discount (backhaul) if not rate_data: rev = rates_table.get((region_b, region_a)) if rev: rate_data = (int(rev[0] * 0.6), int(rev[1] * 0.7), f"Backhaul from {rev[2]}") # Fallback: estimate from distance if not rate_data: route = calculate_sea_route(from_port, to_port, vtype, dwt=dwt) if route: dist = route['distance_nm'] # ~$0.003-$0.006/ton/NM for bulk base_rates = {'bulk': (0.003, 0.006), 'tanker': (0.0025, 0.005), 'container': (0.3, 0.7)} r = base_rates.get(vtype, (0.003, 0.006)) rate_data = (round(dist * r[0]), round(dist * r[1]), 'Distance-based estimate') else: return None low, high, benchmark = rate_data # DWT adjustment multiplier = 1.0 if dwt and vtype in _RATE_DWT_MULTIPLIER: try: d = float(dwt) for threshold, mult in _RATE_DWT_MULTIPLIER[vtype]: if d <= threshold: multiplier = mult break except (ValueError, TypeError): pass low_adj = round(low * multiplier) high_adj = round(high * multiplier) unit = '$/TEU' if vtype == 'container' else '$/ton' subtype = classify_vessel_subtype(vtype, dwt) return { 'from': port_a['name'], 'to': port_b['name'], 'vessel_type': vtype, 'vessel_subtype': subtype.get('name') if subtype else None, 'rate_low': low_adj, 'rate_high': high_adj, 'unit': unit, 'benchmark': benchmark, 'dwt': dwt, 'dwt_multiplier': round(multiplier, 2), 'note': 'Indicative market rate range. Actual fixtures depend on market conditions, vessel age, port costs, and negotiation.', } # ============================================================================= # FEATURE 3: SANCTIONS & COMPLIANCE SCREENER # (Extracted to maritime_compliance.py — re-exported for backward compatibility) # ============================================================================= from maritime_compliance import ( SANCTIONED_FLAGS, SANCTIONED_ENTITIES, DARK_FLEET_INDICATORS, screen_sanctions, _max_risk, ) # ============================================================================= # FEATURE 4: PORT CONGESTION PREDICTOR # ============================================================================= # Average port congestion data (wait days by port, updated periodically) # Sources: public port authority reports, shipping indices PORT_CONGESTION = { # port_key: {'wait_days': (low, high), 'berth_utilization': %, 'trend': up/down/stable} 'shanghai': {'wait_days': (1, 3), 'berth_utilization': 85, 'trend': 'stable', 'peak_months': [3, 4, 9, 10, 11]}, 'singapore': {'wait_days': (0.5, 2), 'berth_utilization': 80, 'trend': 'stable', 'peak_months': [1, 2, 11, 12]}, 'rotterdam': {'wait_days': (0.5, 1.5), 'berth_utilization': 75, 'trend': 'stable', 'peak_months': [9, 10, 11]}, 'antwerp': {'wait_days': (0.5, 2), 'berth_utilization': 78, 'trend': 'stable', 'peak_months': [9, 10, 11]}, 'houston': {'wait_days': (1, 4), 'berth_utilization': 82, 'trend': 'up', 'peak_months': [1, 2, 6, 7, 8]}, 'long_beach': {'wait_days': (0.5, 3), 'berth_utilization': 72, 'trend': 'down', 'peak_months': [8, 9, 10, 11]}, 'los_angeles': {'wait_days': (0.5, 3), 'berth_utilization': 73, 'trend': 'down', 'peak_months': [8, 9, 10, 11]}, 'busan': {'wait_days': (0.5, 1.5), 'berth_utilization': 70, 'trend': 'stable', 'peak_months': [3, 4, 10, 11]}, 'ningbo': {'wait_days': (1, 3), 'berth_utilization': 88, 'trend': 'up', 'peak_months': [3, 4, 9, 10, 11]}, 'hong_kong': {'wait_days': (0.5, 1), 'berth_utilization': 60, 'trend': 'down', 'peak_months': [1, 2, 10, 11]}, 'jebel_ali': {'wait_days': (0.5, 1.5), 'berth_utilization': 75, 'trend': 'stable', 'peak_months': [1, 2, 3, 11, 12]}, 'santos': {'wait_days': (2, 6), 'berth_utilization': 90, 'trend': 'up', 'peak_months': [2, 3, 4, 5]}, 'paranagua': {'wait_days': (3, 10), 'berth_utilization': 92, 'trend': 'up', 'peak_months': [2, 3, 4, 5]}, 'port_hedland': {'wait_days': (1, 4), 'berth_utilization': 88, 'trend': 'stable', 'peak_months': [6, 7, 8, 9]}, 'richards_bay': {'wait_days': (2, 7), 'berth_utilization': 85, 'trend': 'stable', 'peak_months': [6, 7, 8]}, 'new_orleans': {'wait_days': (1, 4), 'berth_utilization': 80, 'trend': 'stable', 'peak_months': [9, 10, 11, 12]}, 'novorossiysk': {'wait_days': (2, 8), 'berth_utilization': 85, 'trend': 'up', 'peak_months': [6, 7, 8, 9]}, 'istanbul': {'wait_days': (1, 5), 'berth_utilization': 82, 'trend': 'stable', 'peak_months': [6, 7, 8, 9]}, 'piraeus': {'wait_days': (0.5, 1.5), 'berth_utilization': 78, 'trend': 'stable', 'peak_months': [6, 7, 8]}, 'laem_chabang': {'wait_days': (0.5, 2), 'berth_utilization': 80, 'trend': 'stable', 'peak_months': [11, 12, 1]}, 'mumbai': {'wait_days': (1, 4), 'berth_utilization': 85, 'trend': 'up', 'peak_months': [10, 11, 12, 1]}, 'durban': {'wait_days': (2, 5), 'berth_utilization': 88, 'trend': 'up', 'peak_months': [1, 2, 3]}, 'callao': {'wait_days': (1, 3), 'berth_utilization': 75, 'trend': 'stable', 'peak_months': [2, 3, 4]}, 'vancouver': {'wait_days': (1, 5), 'berth_utilization': 82, 'trend': 'up', 'peak_months': [8, 9, 10, 11]}, 'savannah': {'wait_days': (0.5, 2), 'berth_utilization': 78, 'trend': 'stable', 'peak_months': [8, 9, 10]}, 'new_york': {'wait_days': (0.5, 2), 'berth_utilization': 76, 'trend': 'stable', 'peak_months': [9, 10, 11]}, 'hamburg': {'wait_days': (0.5, 1.5), 'berth_utilization': 70, 'trend': 'down', 'peak_months': [9, 10, 11]}, 'felixstowe': {'wait_days': (0.5, 1.5), 'berth_utilization': 75, 'trend': 'stable', 'peak_months': [9, 10, 11]}, 'tanger_med': {'wait_days': (0.5, 1), 'berth_utilization': 72, 'trend': 'stable', 'peak_months': [6, 7, 8]}, 'port_said': {'wait_days': (0.5, 2), 'berth_utilization': 80, 'trend': 'stable', 'peak_months': [1, 2, 3]}, } def estimate_port_congestion(port_name: str, month: int = None) -> Optional[Dict]: """ Estimate port congestion level and expected wait time. Returns congestion assessment with wait days, level, and trend. """ port = resolve_port(port_name) if not port: return None # Get port key (resolve_port includes 'key') port_key = port.get('key') if not port_key: return None congestion = PORT_CONGESTION.get(port_key) if not congestion: # Default estimate based on region region = PORT_REGIONS.get(port_key, 'OTHER') defaults = { 'EASIA': {'wait_days': (1, 3), 'berth_utilization': 80, 'trend': 'stable'}, 'NEUR': {'wait_days': (0.5, 2), 'berth_utilization': 72, 'trend': 'stable'}, 'MED': {'wait_days': (0.5, 2), 'berth_utilization': 75, 'trend': 'stable'}, 'GULF': {'wait_days': (0.5, 2), 'berth_utilization': 75, 'trend': 'stable'}, 'USEC': {'wait_days': (0.5, 3), 'berth_utilization': 76, 'trend': 'stable'}, 'USWC': {'wait_days': (0.5, 2), 'berth_utilization': 70, 'trend': 'stable'}, } congestion = defaults.get(region, {'wait_days': (1, 3), 'berth_utilization': 75, 'trend': 'stable'}) wait_low, wait_high = congestion['wait_days'] utilization = congestion['berth_utilization'] trend = congestion['trend'] # Seasonal adjustment current_month = month or datetime.now().month peak_months = congestion.get('peak_months', []) is_peak = current_month in peak_months if is_peak: wait_low = round(wait_low * 1.3, 1) wait_high = round(wait_high * 1.5, 1) utilization = min(98, utilization + 8) # Congestion level if utilization >= 90 or wait_high >= 6: level = 'severe' elif utilization >= 80 or wait_high >= 3: level = 'moderate' elif utilization >= 70: level = 'normal' else: level = 'low' return { 'port': port['name'], 'country': port['country'], 'congestion_level': level, 'wait_days': {'low': wait_low, 'high': wait_high}, 'berth_utilization_pct': utilization, 'trend': trend, 'is_peak_season': is_peak, 'peak_months': peak_months, 'note': 'Estimate based on historical patterns and seasonal trends. Real-time congestion may vary.', } # ============================================================================= # FEATURE 5: BUNKER PRICE OPTIMIZER # ============================================================================= # VLSFO bunker prices by major bunkering port (USD/ton, approximate Q1 2026) BUNKER_PRICES = { # port_key: {'vlsfo': $/ton, 'hsfo': $/ton, 'mgo': $/ton, 'supply': 'good'|'limited'} 'singapore': {'vlsfo': 580, 'hsfo': 440, 'mgo': 750, 'supply': 'excellent'}, 'fujairah': {'vlsfo': 570, 'hsfo': 430, 'mgo': 740, 'supply': 'excellent'}, 'rotterdam': {'vlsfo': 590, 'hsfo': 450, 'mgo': 770, 'supply': 'excellent'}, 'houston': {'vlsfo': 575, 'hsfo': 430, 'mgo': 730, 'supply': 'excellent'}, 'busan': {'vlsfo': 600, 'hsfo': 460, 'mgo': 780, 'supply': 'good'}, 'hong_kong': {'vlsfo': 610, 'hsfo': 470, 'mgo': 790, 'supply': 'good'}, 'shanghai': {'vlsfo': 605, 'hsfo': 460, 'mgo': 780, 'supply': 'good'}, 'antwerp': {'vlsfo': 595, 'hsfo': 455, 'mgo': 775, 'supply': 'good'}, 'hamburg': {'vlsfo': 600, 'hsfo': 460, 'mgo': 780, 'supply': 'good'}, 'piraeus': {'vlsfo': 585, 'hsfo': 445, 'mgo': 760, 'supply': 'good'}, 'istanbul': {'vlsfo': 595, 'hsfo': 450, 'mgo': 770, 'supply': 'good'}, 'algeciras': {'vlsfo': 575, 'hsfo': 435, 'mgo': 745, 'supply': 'good'}, 'tanger_med': {'vlsfo': 580, 'hsfo': 440, 'mgo': 750, 'supply': 'good'}, 'port_said': {'vlsfo': 600, 'hsfo': 460, 'mgo': 790, 'supply': 'limited'}, 'durban': {'vlsfo': 620, 'hsfo': 480, 'mgo': 810, 'supply': 'limited'}, 'cape_town': {'vlsfo': 615, 'hsfo': 475, 'mgo': 800, 'supply': 'limited'}, 'santos': {'vlsfo': 610, 'hsfo': 470, 'mgo': 790, 'supply': 'limited'}, 'colon': {'vlsfo': 590, 'hsfo': 450, 'mgo': 770, 'supply': 'good'}, 'new_york': {'vlsfo': 595, 'hsfo': 450, 'mgo': 775, 'supply': 'good'}, 'los_angeles': {'vlsfo': 600, 'hsfo': 460, 'mgo': 785, 'supply': 'good'}, 'long_beach': {'vlsfo': 600, 'hsfo': 460, 'mgo': 785, 'supply': 'good'}, 'vancouver': {'vlsfo': 610, 'hsfo': 470, 'mgo': 790, 'supply': 'good'}, 'jebel_ali': {'vlsfo': 575, 'hsfo': 435, 'mgo': 745, 'supply': 'excellent'}, 'mumbai': {'vlsfo': 605, 'hsfo': 460, 'mgo': 780, 'supply': 'good'}, 'colombo': {'vlsfo': 595, 'hsfo': 455, 'mgo': 775, 'supply': 'good'}, 'laem_chabang': {'vlsfo': 600, 'hsfo': 460, 'mgo': 780, 'supply': 'limited'}, 'jeddah': {'vlsfo': 580, 'hsfo': 440, 'mgo': 755, 'supply': 'good'}, 'salalah': {'vlsfo': 575, 'hsfo': 435, 'mgo': 745, 'supply': 'limited'}, 'marsaxlokk': {'vlsfo': 590, 'hsfo': 450, 'mgo': 770, 'supply': 'limited'}, 'kaohsiung': {'vlsfo': 600, 'hsfo': 460, 'mgo': 780, 'supply': 'good'}, } def get_bunker_prices(port_name: str) -> Optional[Dict]: """Get bunker fuel prices for a port + nearby alternatives.""" port = resolve_port(port_name) if not port: return None # Get port key (resolve_port includes 'key') port_key = port.get('key') prices = BUNKER_PRICES.get(port_key) if port_key else None # Find nearby bunkering alternatives alternatives = [] nearby = find_nearby_ports(port['lat'], port['lon'], radius_nm=500) for np in nearby: np_key = np.get('key') if np_key and np_key != port_key and np_key in BUNKER_PRICES: bp = BUNKER_PRICES[np_key] alternatives.append({ 'port': np['name'], 'distance_nm': np['distance_nm'], 'vlsfo': bp['vlsfo'], 'hsfo': bp['hsfo'], 'mgo': bp['mgo'], 'supply': bp['supply'], }) alternatives.sort(key=lambda x: x['vlsfo']) # Sort by cheapest VLSFO result = { 'port': port['name'], 'country': port['country'], } if prices: result.update({ 'vlsfo_usd_ton': prices['vlsfo'], 'hsfo_usd_ton': prices['hsfo'], 'mgo_usd_ton': prices['mgo'], 'supply_level': prices['supply'], }) else: result['note'] = 'No direct bunker price data for this port.' if alternatives: result['alternatives'] = alternatives[:5] # Savings calculation if prices and alternatives: cheapest = alternatives[0] if cheapest['vlsfo'] < prices['vlsfo']: saving = prices['vlsfo'] - cheapest['vlsfo'] result['cheapest_alternative'] = { 'port': cheapest['port'], 'vlsfo': cheapest['vlsfo'], 'saving_per_ton': saving, 'distance_nm': cheapest['distance_nm'], 'note': f'Save ${saving}/ton VLSFO by bunkering at {cheapest["port"]} ({cheapest["distance_nm"]} NM away)', } return result def optimize_bunker_route(from_port: str, to_port: str, vessel_type: str = 'bulk', dwt: float = None) -> Optional[Dict]: """ Find optimal bunkering ports along a route. Returns route with bunker price comparison at key stops. """ route = calculate_sea_route(from_port, to_port, vessel_type, dwt=dwt) if not route: return None # Collect bunkering options along the route bunker_options = [] # Check departure port dep_bunker = get_bunker_prices(from_port) if dep_bunker and dep_bunker.get('vlsfo_usd_ton'): bunker_options.append({ 'port': dep_bunker['port'], 'position': 'departure', 'vlsfo': dep_bunker['vlsfo_usd_ton'], 'supply': dep_bunker.get('supply_level', 'unknown'), }) # Check waypoint ports (canals often have nearby bunkering) waypoint_bunker_ports = { 'gibraltar': 'algeciras', 'suez_n': 'port said', 'bab_el_mandeb': 'jeddah', 'hormuz': 'fujairah', 'malacca': 'singapore', 'panama_atl': 'colon', 'cape_good_hope': 'cape town', 'dover': 'rotterdam', } for wp in route.get('via', []): wp_lower = wp.lower() for waypoint_key, bunker_port_key in waypoint_bunker_ports.items(): if waypoint_key.replace('_', ' ') in wp_lower or wp_lower in waypoint_key: bp = BUNKER_PRICES.get(bunker_port_key) if bp: port_info = WORLD_PORTS.get(bunker_port_key) bunker_options.append({ 'port': port_info['name'] if port_info else bunker_port_key, 'position': 'en_route', 'vlsfo': bp['vlsfo'], 'supply': bp['supply'], }) break # Check arrival port arr_bunker = get_bunker_prices(to_port) if arr_bunker and arr_bunker.get('vlsfo_usd_ton'): bunker_options.append({ 'port': arr_bunker['port'], 'position': 'arrival', 'vlsfo': arr_bunker['vlsfo_usd_ton'], 'supply': arr_bunker.get('supply_level', 'unknown'), }) # Find cheapest if bunker_options: cheapest = min(bunker_options, key=lambda x: x['vlsfo']) cheapest['recommended'] = True # Estimate savings fuel_low, fuel_high = estimate_fuel_consumption(vessel_type, dwt) avg_fuel = (fuel_low + fuel_high) / 2 sailing_days = route['total_days'] total_fuel_tons = avg_fuel * sailing_days return { 'route': {'from': route['from'], 'to': route['to'], 'distance_nm': route['distance_nm'], 'total_days': route['total_days'], 'canals': route['canals']}, 'estimated_fuel_tons': round(total_fuel_tons), 'bunker_options': bunker_options, 'price_spread': { 'cheapest_vlsfo': min(b['vlsfo'] for b in bunker_options) if bunker_options else None, 'most_expensive_vlsfo': max(b['vlsfo'] for b in bunker_options) if bunker_options else None, 'potential_saving_usd': round((max(b['vlsfo'] for b in bunker_options) - min(b['vlsfo'] for b in bunker_options)) * total_fuel_tons) if bunker_options else 0, }, 'note': 'VLSFO prices are approximate and fluctuate daily. Contact bunker suppliers for live quotes.', } # ============================================================================= # FEATURE 6: CHARTER PARTY GENERATOR # ============================================================================= def generate_charter_party(vessel_name: str, cargo_type: str, tonnage: float, from_port: str, to_port: str, rate: float = None, laydays: str = None, demurrage_rate: float = None, charterer: str = None, owner: str = None) -> Optional[Dict]: """Generate a structured charter party agreement.""" port_load = resolve_port(from_port) port_discharge = resolve_port(to_port) if not port_load or not port_discharge: return None route = calculate_sea_route(from_port, to_port, cargo_type) distance = route['distance_nm'] if route else 0 voyage_days = route['total_days'] if route else 0 # Estimate rate if not provided cargo_lower = cargo_type.lower() if cargo_type else 'bulk' if not rate: freight = estimate_freight_rate(from_port, to_port, cargo_lower) rate = freight['rate_high'] if freight else 15.0 # Default laytime calculation (based on tonnage, ~5000t/day loading rate) load_rate = 5000 if tonnage <= 50000 else 8000 if tonnage <= 100000 else 12000 laytime_days = round(tonnage / load_rate, 1) if tonnage else 5 if not demurrage_rate: demurrage_rate = round(tonnage * 0.15 if tonnage else 15000, 0) demurrage_rate = min(max(demurrage_rate, 8000), 80000) from datetime import datetime, timedelta today = datetime.now() laycan_start = today + timedelta(days=14) laycan_end = laycan_start + timedelta(days=5) cp_number = f"CP-{today.strftime('%Y%m%d')}-{int(hashlib.md5((vessel_name or 'TBN').encode()).hexdigest()[:8], 16) % 10000:04d}" return { 'cp_number': cp_number, 'date': today.strftime('%Y-%m-%d'), 'type': 'Voyage Charter Party', 'parties': { 'owner': owner or '[Owner name to be inserted]', 'charterer': charterer or '[Charterer name to be inserted]', }, 'vessel': { 'name': vessel_name or 'TBN (To Be Nominated)', 'type': cargo_lower.title(), 'dwt': tonnage, }, 'cargo': { 'type': cargo_type, 'quantity_mt': tonnage, 'tolerance': '5% MOLOO (More or Less at Owner\'s Option)', 'stowage_factor': 'As per standard', }, 'ports': { 'loading': { 'name': port_load['name'], 'country': port_load['country'], 'unlocode': port_load.get('unlocode', ''), 'terms': 'FIOS (Free In/Out Stowed)', }, 'discharge': { 'name': port_discharge['name'], 'country': port_discharge['country'], 'unlocode': port_discharge.get('unlocode', ''), 'terms': 'FIOS (Free In/Out Stowed)', }, }, 'commercial': { 'freight_rate': f"${rate:.2f}/MT", 'total_freight': f"${rate * tonnage:,.0f}", 'payment': 'Within 5 banking days of completion of loading, less address commission 3.75%', 'laycan': laydays or f"{laycan_start.strftime('%d %b')} - {laycan_end.strftime('%d %b %Y')}", }, 'laytime': { 'loading_days': laytime_days, 'discharge_days': laytime_days, 'total_days': laytime_days * 2, 'loading_rate': f"{load_rate:,} MT/day SHINC", 'commencement': 'NOR tendered, whether in berth or not (WIBON), whether in port or not (WIPON)', }, 'demurrage': { 'rate_per_day': f"${demurrage_rate:,.0f}/day", 'despatch': f"${demurrage_rate / 2:,.0f}/day (half demurrage rate)", 'payment': 'Within 30 days of completion of discharge', }, 'voyage': { 'distance_nm': distance, 'estimated_days': voyage_days, 'canals': route.get('canals', []) if route else [], }, 'clauses': [ 'General Average to be settled according to York-Antwerp Rules 2016', 'Both-to-Blame Collision Clause as per Conwartime 2013', 'War Risk Clause — Voywar 2013', 'Ice Clause — if applicable', 'BIMCO Sanctions Clause 2020', 'BIMCO Infectious or Contagious Diseases Clause 2022', 'ISM/ISPS Clause as per BIMCO', 'US Clause Paramount — if US port involved', 'Hague-Visby Rules to apply', ], 'arbitration': 'London, English Law — LMAA Terms', 'note': 'DRAFT — This is an AI-generated charter party template. Must be reviewed by qualified maritime lawyers before execution.', } # ============================================================================= # FEATURE 7: VESSEL PERFORMANCE ANALYTICS # ============================================================================= # Typical fuel consumption by vessel subtype (tons/day at design speed) VESSEL_FUEL_BENCHMARKS = { 'capesize': {'speed_kn': 14.5, 'fuel_td': 55, 'dwt_range': (100000, 200000)}, 'panamax_bulk': {'speed_kn': 14.0, 'fuel_td': 35, 'dwt_range': (65000, 99999)}, 'supramax': {'speed_kn': 13.5, 'fuel_td': 28, 'dwt_range': (50000, 64999)}, 'handysize': {'speed_kn': 13.0, 'fuel_td': 22, 'dwt_range': (15000, 49999)}, 'vlcc': {'speed_kn': 15.0, 'fuel_td': 80, 'dwt_range': (200000, 350000)}, 'suezmax': {'speed_kn': 14.5, 'fuel_td': 55, 'dwt_range': (120000, 199999)}, 'aframax': {'speed_kn': 14.5, 'fuel_td': 45, 'dwt_range': (80000, 119999)}, 'mr_tanker': {'speed_kn': 14.0, 'fuel_td': 30, 'dwt_range': (45000, 79999)}, 'ulcv': {'speed_kn': 22.0, 'fuel_td': 250, 'dwt_range': (150000, 250000)}, 'neo_panamax': {'speed_kn': 21.0, 'fuel_td': 180, 'dwt_range': (100000, 149999)}, 'panamax_container': {'speed_kn': 20.0, 'fuel_td': 130, 'dwt_range': (50000, 99999)}, 'feeder': {'speed_kn': 18.0, 'fuel_td': 50, 'dwt_range': (5000, 49999)}, } def analyze_vessel_performance(vessel_name: str = None, imo: str = None, dwt: float = None, speed: float = None, vessel_type: str = None, year_built: int = None, fuel_consumption: float = None) -> Optional[Dict]: """Analyze vessel performance: fuel efficiency, speed, hull condition.""" if not vessel_type and not dwt: return {'error': 'Vessel type or DWT required for performance analysis'} vtype = (vessel_type or 'bulk').lower() # Find benchmark benchmark = None if dwt: for name, data in VESSEL_FUEL_BENCHMARKS.items(): low, high = data['dwt_range'] if low <= dwt <= high and (vtype in name or name in vtype or vtype == 'bulk' or vtype == 'tanker'): benchmark = {**data, 'subtype': name} break if not benchmark: # Default by type defaults = {'bulk': 'handysize', 'tanker': 'mr_tanker', 'container': 'feeder'} key = defaults.get(vtype, 'handysize') benchmark = {**VESSEL_FUEL_BENCHMARKS[key], 'subtype': key} design_speed = benchmark['speed_kn'] design_fuel = benchmark['fuel_td'] actual_speed = speed or design_speed actual_fuel = fuel_consumption or design_fuel # Vessel age impact current_year = datetime.now().year age = (current_year - year_built) if year_built else 10 age_fuel_penalty = min(age * 0.8, 20) # ~0.8% per year, max 20% expected_fuel = round(design_fuel * (1 + age_fuel_penalty / 100), 1) # Speed vs fuel (cubic relationship) speed_ratio = actual_speed / design_speed if design_speed else 1 theoretical_fuel = round(design_fuel * (speed_ratio ** 3), 1) # Efficiency score (100 = perfect) if actual_fuel > 0 and theoretical_fuel > 0: efficiency = round(min(100, (theoretical_fuel / actual_fuel) * 100), 1) else: efficiency = round(100 - age_fuel_penalty, 1) # Hull condition estimate if efficiency >= 95: hull_condition = 'Excellent — recently dry-docked' elif efficiency >= 85: hull_condition = 'Good — within normal parameters' elif efficiency >= 75: hull_condition = 'Fair — hull cleaning recommended within 3-6 months' elif efficiency >= 65: hull_condition = 'Below average — significant fouling likely, dry-dock recommended' else: hull_condition = 'Poor — urgent hull treatment needed, significant fuel waste' # CII rating estimate (Carbon Intensity Indicator) if efficiency >= 90: cii_rating = 'A' elif efficiency >= 80: cii_rating = 'B' elif efficiency >= 70: cii_rating = 'C' elif efficiency >= 60: cii_rating = 'D' else: cii_rating = 'E' recommendations = [] if actual_speed > design_speed * 0.95: recommendations.append(f'Slow steaming to {design_speed * 0.85:.1f} kn could save ~{round(design_fuel * 0.25)}t/day fuel') if efficiency < 85: recommendations.append('Hull cleaning/dry-docking would improve fuel efficiency by 10-15%') if age > 15: recommendations.append('Consider engine overhaul or retrofit for improved performance') if cii_rating in ('D', 'E'): recommendations.append(f'CII rating {cii_rating} — corrective plan required under IMO regulations') if not recommendations: recommendations.append('Vessel performing within expected parameters') return { 'vessel': vessel_name or 'Unknown', 'imo': imo or '', 'vessel_type': vtype, 'subtype': benchmark['subtype'].replace('_', ' ').title(), 'dwt': dwt, 'age_years': age, 'year_built': year_built, 'performance': { 'design_speed_kn': design_speed, 'actual_speed_kn': actual_speed, 'speed_utilization_pct': round(speed_ratio * 100, 1), 'design_fuel_td': design_fuel, 'expected_fuel_td': expected_fuel, 'actual_fuel_td': actual_fuel, 'efficiency_score': efficiency, }, 'hull_condition': hull_condition, 'cii_rating': cii_rating, 'cii_note': 'IMO Carbon Intensity Indicator (A=best, E=worst). D/E requires corrective action plan.', 'fleet_comparison': f"{'Above' if efficiency >= 80 else 'Below'} average for {benchmark['subtype'].replace('_',' ')} fleet", 'annual_fuel_cost_estimate': { 'at_current_speed': f"${round(actual_fuel * 365 * BUNKER_PRICE_USD):,}", 'at_eco_speed': f"${round(design_fuel * 0.7 * 365 * BUNKER_PRICE_USD):,}", 'potential_annual_saving': f"${round((actual_fuel - design_fuel * 0.7) * 365 * BUNKER_PRICE_USD):,}", }, 'recommendations': recommendations, 'note': 'Performance estimates based on vessel class benchmarks and reported parameters. Actual performance depends on sea/weather conditions, cargo, trim, and maintenance.', } # ============================================================================= # FEATURE 8: DIGITAL BILL OF LADING # ============================================================================= def generate_bill_of_lading(shipper: str, consignee: str, vessel_name: str, from_port: str, to_port: str, cargo_description: str, weight_mt: float = None, packages: int = None, notify_party: str = None, voyage_no: str = None, marks: str = None, freight_terms: str = 'PREPAID') -> Optional[Dict]: """Generate a structured digital Bill of Lading.""" port_load = resolve_port(from_port) port_discharge = resolve_port(to_port) if not port_load or not port_discharge: return None from datetime import datetime today = datetime.now() bl_number = f"SFBL-{today.strftime('%Y%m%d')}-{int(hashlib.md5((vessel_name + shipper).encode()).hexdigest()[:8], 16) % 100000:05d}" return { 'bl_number': bl_number, 'date_issued': today.strftime('%Y-%m-%d'), 'type': 'Negotiable OCEAN BILL OF LADING', 'original_copies': 3, 'shipper': { 'name': shipper, 'address': '[Shipper address]', }, 'consignee': { 'name': consignee, 'instruction': 'TO ORDER' if consignee.lower() in ('to order', 'order') else consignee, }, 'notify_party': { 'name': notify_party or consignee, 'address': '[Notify party address]', }, 'vessel': { 'name': vessel_name, 'voyage_no': voyage_no or f"V.{today.strftime('%y')}{int(hashlib.md5(vessel_name.encode()).hexdigest()[:4], 16) % 100:02d}", 'flag': '[Flag state]', }, 'port_of_loading': { 'name': port_load['name'], 'country': port_load['country'], 'unlocode': port_load.get('unlocode', ''), }, 'port_of_discharge': { 'name': port_discharge['name'], 'country': port_discharge['country'], 'unlocode': port_discharge.get('unlocode', ''), }, 'cargo': { 'description': cargo_description, 'marks_and_numbers': marks or 'N/M (No Marks)', 'packages': packages, 'gross_weight_mt': weight_mt, 'measurement': f"{round(weight_mt * 1.2, 1)} CBM" if weight_mt else None, }, 'freight': { 'terms': f'FREIGHT {freight_terms.upper()}', 'payable_at': port_load['name'], }, 'conditions': { 'shipped_on_board': today.strftime('%Y-%m-%d'), 'clean_on_board': True, 'said_to_contain': True, 'shipper_load_stow_count': True, }, 'clauses': [ 'Shipped on board in apparent good order and condition', 'Weight, measure, quantity, quality, contents and value unknown', 'Subject to all terms and conditions of the Charter Party dated as per C/P', 'Clause Paramount — Hague-Visby Rules apply', ], 'place_of_issue': port_load['name'], 'signatory': 'Master or Agent on behalf of the Master', 'note': 'DRAFT — This is a digital B/L template. For legally binding B/L, use eBL platforms (Bolero, essDOCS, WAVE BL) or contact your P&I club.', } # ============================================================================= # FEATURE 9: CREW CHANGE OPTIMIZER # ============================================================================= # Major crew change hubs with airport connectivity and cost factors CREW_CHANGE_HUBS = { 'singapore': {'airport': 'SIN (Changi)', 'flights': 'excellent', 'visa': 'easy', 'agent_cost': 800, 'hotel_day': 120, 'connectivity': 10}, 'rotterdam': {'airport': 'AMS (Schiphol)', 'flights': 'excellent', 'visa': 'schengen', 'agent_cost': 1200, 'hotel_day': 150, 'connectivity': 9}, 'hamburg': {'airport': 'HAM', 'flights': 'good', 'visa': 'schengen', 'agent_cost': 1100, 'hotel_day': 140, 'connectivity': 8}, 'piraeus': {'airport': 'ATH', 'flights': 'good', 'visa': 'schengen', 'agent_cost': 900, 'hotel_day': 100, 'connectivity': 8}, 'istanbul': {'airport': 'IST', 'flights': 'excellent', 'visa': 'easy', 'agent_cost': 700, 'hotel_day': 80, 'connectivity': 9}, 'dubai': {'airport': 'DXB', 'flights': 'excellent', 'visa': 'easy', 'agent_cost': 1000, 'hotel_day': 130, 'connectivity': 10}, 'jebel_ali': {'airport': 'DXB', 'flights': 'excellent', 'visa': 'easy', 'agent_cost': 1000, 'hotel_day': 130, 'connectivity': 10}, 'mumbai': {'airport': 'BOM', 'flights': 'good', 'visa': 'moderate', 'agent_cost': 500, 'hotel_day': 60, 'connectivity': 8}, 'manila': {'airport': 'MNL', 'flights': 'good', 'visa': 'easy', 'agent_cost': 400, 'hotel_day': 50, 'connectivity': 9}, 'hong_kong': {'airport': 'HKG', 'flights': 'excellent', 'visa': 'easy', 'agent_cost': 900, 'hotel_day': 140, 'connectivity': 10}, 'shanghai': {'airport': 'PVG', 'flights': 'good', 'visa': 'moderate', 'agent_cost': 800, 'hotel_day': 100, 'connectivity': 8}, 'busan': {'airport': 'PUS', 'flights': 'good', 'visa': 'moderate', 'agent_cost': 700, 'hotel_day': 90, 'connectivity': 7}, 'houston': {'airport': 'IAH', 'flights': 'good', 'visa': 'difficult', 'agent_cost': 1500, 'hotel_day': 130, 'connectivity': 7}, 'new_york': {'airport': 'JFK', 'flights': 'excellent', 'visa': 'difficult', 'agent_cost': 1600, 'hotel_day': 180, 'connectivity': 8}, 'santos': {'airport': 'GRU (São Paulo)', 'flights': 'good', 'visa': 'moderate', 'agent_cost': 900, 'hotel_day': 90, 'connectivity': 7}, 'durban': {'airport': 'DUR', 'flights': 'limited', 'visa': 'moderate', 'agent_cost': 700, 'hotel_day': 70, 'connectivity': 5}, 'cape_town': {'airport': 'CPT', 'flights': 'good', 'visa': 'moderate', 'agent_cost': 750, 'hotel_day': 80, 'connectivity': 6}, 'las_palmas': {'airport': 'LPA', 'flights': 'good', 'visa': 'schengen', 'agent_cost': 800, 'hotel_day': 90, 'connectivity': 7}, 'colombo': {'airport': 'CMB', 'flights': 'good', 'visa': 'easy', 'agent_cost': 450, 'hotel_day': 50, 'connectivity': 7}, 'port_said': {'airport': 'CAI (Cairo)', 'flights': 'good', 'visa': 'moderate', 'agent_cost': 600, 'hotel_day': 60, 'connectivity': 6}, } def optimize_crew_change(current_port: str, next_ports: list = None, crew_count: int = None, vessel_name: str = None) -> Optional[Dict]: """Find optimal crew change ports along vessel route.""" port = resolve_port(current_port) if not port: return None crew_count = crew_count or 20 candidates = [] # Score ports: current port + next ports + nearby hubs ports_to_check = [current_port] if next_ports: ports_to_check.extend(next_ports[:5]) # Also add nearby crew change hubs nearby = find_nearby_ports(port['lat'], port['lon'], radius_nm=500) for n in nearby[:10]: nkey = n.get('key', '') if nkey in CREW_CHANGE_HUBS: ports_to_check.append(n['name']) seen = set() for pname in ports_to_check: p = resolve_port(pname) if not p: continue pkey = p.get('key', '') if pkey in seen: continue seen.add(pkey) hub = CREW_CHANGE_HUBS.get(pkey) if not hub: # Generic scoring for non-hub ports hub = {'airport': 'Regional', 'flights': 'limited', 'visa': 'moderate', 'agent_cost': 1000, 'hotel_day': 100, 'connectivity': 4} # Scoring (0-100) scores = {} conn_map = {'excellent': 10, 'good': 7, 'limited': 4} scores['flight_connectivity'] = hub['connectivity'] * 10 scores['visa_ease'] = {'easy': 90, 'schengen': 80, 'moderate': 60, 'difficult': 30}.get(hub['visa'], 50) scores['cost_efficiency'] = max(0, 100 - int(hub['agent_cost'] / 20)) scores['agent_availability'] = conn_map.get(hub['flights'], 4) * 10 overall = round(sum(scores.values()) / len(scores), 1) est_cost = hub['agent_cost'] + hub['hotel_day'] * 2 + 800 * crew_count # flights estimate candidates.append({ 'port': p['name'], 'country': p['country'], 'airport': hub['airport'], 'scores': scores, 'overall_score': overall, 'estimated_cost_usd': est_cost, 'cost_breakdown': { 'agent_fees': hub['agent_cost'], 'hotel_2_nights': hub['hotel_day'] * 2, 'flights_estimate': 800 * crew_count, 'transport_launch': 500, }, 'visa_requirement': hub['visa'], 'flight_availability': hub['flights'], }) candidates.sort(key=lambda x: -x['overall_score']) return { 'vessel': vessel_name or 'Unknown', 'crew_count': crew_count, 'current_location': port['name'], 'recommended_ports': candidates[:5], 'best_option': candidates[0]['port'] if candidates else None, 'note': 'Scores based on airport connectivity, visa requirements, agent availability, and cost. Actual costs depend on crew nationality, flight routes, and local regulations.', } # ============================================================================= # FEATURE 10: MARITIME INSURANCE CALCULATOR # ============================================================================= # War risk / high risk areas WAR_RISK_ZONES = { 'gulf_of_aden': {'lat_range': (11, 15), 'lon_range': (43, 51), 'premium_pct': 0.05}, 'strait_of_hormuz': {'lat_range': (25, 27), 'lon_range': (54, 57), 'premium_pct': 0.03}, 'red_sea': {'lat_range': (12, 30), 'lon_range': (32, 44), 'premium_pct': 0.07}, 'west_africa_gog': {'lat_range': (-5, 7), 'lon_range': (-5, 10), 'premium_pct': 0.03}, 'black_sea': {'lat_range': (41, 47), 'lon_range': (27, 42), 'premium_pct': 0.10}, 'south_china_sea': {'lat_range': (5, 22), 'lon_range': (105, 120), 'premium_pct': 0.01}, } def calculate_maritime_insurance(vessel_name: str = None, imo: str = None, vessel_type: str = None, dwt: float = None, year_built: int = None, flag: str = None, from_port: str = None, to_port: str = None, cargo_value: float = None, hull_value: float = None) -> Optional[Dict]: """Calculate maritime insurance premiums (H&M, P&I, Cargo, War Risk).""" vtype = (vessel_type or 'bulk').lower() # Estimate hull value if not provided if not hull_value and dwt: age = (datetime.now().year - year_built) if year_built else 10 base_value = dwt * 120 # ~$120/dwt for new vessel depreciation = max(0.2, 1 - age * 0.04) # 4%/year, min 20% residual hull_value = round(base_value * depreciation) hull_value = hull_value or 20000000 cargo_value = cargo_value or 0 # H&M premium (0.15% - 0.45% of hull value) age = (datetime.now().year - year_built) if year_built else 10 hm_base_rate = 0.002 # 0.2% base if age > 20: hm_base_rate += 0.002 elif age > 15: hm_base_rate += 0.001 elif age > 10: hm_base_rate += 0.0005 # Flag discount/surcharge flag_lower = (flag or '').lower() if flag_lower in ('panama', 'liberia', 'marshall islands', 'bahamas', 'singapore', 'hong kong'): hm_base_rate *= 0.95 # Reputable open registries elif flag_lower in SANCTIONED_FLAGS: hm_base_rate *= 2.0 # Sanctioned flag = major surcharge hm_premium = round(hull_value * hm_base_rate) # P&I premium (based on GRT, typically $5-30/GRT) grt_estimate = round(dwt * 0.6) if dwt else 15000 pi_rate = 8 # $/GRT base if age > 20: pi_rate = 15 elif age > 15: pi_rate = 12 pi_premium = round(grt_estimate * pi_rate) # Cargo insurance (0.1% - 0.5% of cargo value) cargo_premium = 0 if cargo_value > 0: cargo_rate = 0.002 # 0.2% base cargo_premium = round(cargo_value * cargo_rate) # War risk premium war_risk = 0 war_zones_crossed = [] if from_port and to_port: p1 = resolve_port(from_port) p2 = resolve_port(to_port) if p1 and p2: for zone_name, zone in WAR_RISK_ZONES.items(): lat_min, lat_max = zone['lat_range'] lon_min, lon_max = zone['lon_range'] # Check if route might cross this zone (simplified) lats = [p1['lat'], p2['lat']] lons = [p1['lon'], p2['lon']] if (min(lats) <= lat_max and max(lats) >= lat_min and min(lons) <= lon_max and max(lons) >= lon_min): war_zones_crossed.append(zone_name.replace('_', ' ').title()) war_risk += round(hull_value * zone['premium_pct']) total = hm_premium + pi_premium + cargo_premium + war_risk risk_factors = [] if age > 20: risk_factors.append(f'Vessel age ({age} years) increases H&M and P&I premiums') if war_zones_crossed: risk_factors.append(f'Route crosses war risk zones: {", ".join(war_zones_crossed)}') if flag_lower in SANCTIONED_FLAGS: risk_factors.append(f'Flag state ({flag}) has sanctions exposure') if not risk_factors: risk_factors.append('Standard risk profile — no significant surcharges') return { 'vessel': vessel_name or 'Unknown', 'vessel_type': vtype, 'dwt': dwt, 'age_years': age, 'hull_value_usd': hull_value, 'cargo_value_usd': cargo_value, 'premiums': { 'hull_and_machinery': {'annual_usd': hm_premium, 'rate_pct': round(hm_base_rate * 100, 3)}, 'p_and_i': {'annual_usd': pi_premium, 'rate_per_grt': pi_rate, 'grt_estimate': grt_estimate}, 'cargo': {'premium_usd': cargo_premium, 'rate_pct': 0.2} if cargo_value else None, 'war_risk': {'premium_usd': war_risk, 'zones': war_zones_crossed} if war_risk else None, }, 'total_annual_premium_usd': total, 'risk_factors': risk_factors, 'recommended_coverage': [ 'H&M (Hull & Machinery) — physical damage to vessel', 'P&I (Protection & Indemnity) — third-party liability, crew, pollution', 'Loss of Hire — income protection during off-hire periods', 'FD&D (Freight, Demurrage & Defence) — legal costs', ], 'note': 'Indicative premiums based on vessel profile and standard market rates. Contact insurance broker for firm quotation. Underwriting subject to survey and claims history.', } # ============================================================================= # FEATURE 11: PORT COST ESTIMATOR # ============================================================================= # Regional port cost multipliers (base = 1.0 for average) def _assign_region(lat, lon): """Assign maritime region from coordinates.""" if lon < -100 and lat > 25: return 'USWC' elif lon < -30 and lat > 25: return 'USEC' elif lon < -30 and -5 < lat <= 25: return 'CARIB' elif lon < -70 and lat <= -5: return 'SAW' elif lon < -30 and lat <= -5: return 'SAE' elif lon > 100 and lat < -10: return 'AUSNZ' elif lon > 100: return 'EASIA' elif 60 < lon <= 100 and lat > 0: return 'SASIA' elif 40 < lon <= 60 and lat > 10: return 'GULF' elif 10 < lon < 45 and lat < -20: return 'SAFR' elif 30 < lon <= 45 and -10 < lat < 20: return 'ERED' elif -5 < lon < 15 and -5 < lat < 15: return 'WAFR' elif 25 < lon <= 40 and 40 < lat <= 48: return 'BSEA' elif lon < 40 and lat > 48: return 'NEUR' elif -10 <= lon <= 40 and 25 < lat <= 48: return 'MED' else: return 'OTHER' PORT_COST_REGIONS = { 'NEUR': 1.3, 'MED': 1.0, 'BSEA': 0.8, 'EASIA': 0.9, 'SASIA': 0.7, 'GULF': 0.85, 'ERED': 0.75, 'USWC': 1.2, 'USEC': 1.25, 'CARIB': 0.9, 'SAE': 0.8, 'SAW': 0.85, 'WAFR': 0.8, 'SAFR': 0.85, 'AUSNZ': 1.15, 'OTHER': 0.9, } def estimate_port_costs(port_name: str, vessel_type: str = 'bulk', dwt: float = None, grt: float = None, duration_days: float = 3) -> Optional[Dict]: """Estimate total port call costs (pilotage, towage, berth, agency, etc.).""" port = resolve_port(port_name) if not port: return None dwt = dwt or 50000 grt = grt or round(dwt * 0.6) vtype = (vessel_type or 'bulk').lower() port_key = port.get('key', '') region = port.get('region') or _assign_region(port.get('lat', 0), port.get('lon', 0)) multiplier = PORT_COST_REGIONS.get(region, 1.0) # Base costs (USD) for 50,000 GRT vessel size_factor = grt / 50000 pilotage_in = round(2500 * size_factor * multiplier) pilotage_out = round(2500 * size_factor * multiplier) towage = round(5000 * size_factor * multiplier) berth_per_day = round(1500 * size_factor * multiplier) mooring = round(800 * size_factor * multiplier) port_dues = round(grt * 0.08 * multiplier) # ~$0.08/GRT light_dues = round(grt * 0.03 * multiplier) # ~$0.03/GRT agency_fees = round(3000 * multiplier) waste_disposal = round(500 * multiplier) fresh_water = round(300 * duration_days) documentation = round(400 * multiplier) # Canal surcharges for specific ports canal_fee = 0 canal_name = None if port_key in ('port_said', 'suez', 'ismailia'): canal_fee = round(dwt * 0.12) canal_name = 'Suez Canal Transit' elif port_key in ('colon', 'balboa', 'cristobal'): canal_fee = round(dwt * 0.10) canal_name = 'Panama Canal Transit' berth_total = round(berth_per_day * duration_days) total = (pilotage_in + pilotage_out + towage + berth_total + mooring + port_dues + light_dues + agency_fees + waste_disposal + fresh_water + documentation + canal_fee) return { 'port': port['name'], 'country': port['country'], 'region': region, 'vessel_type': vtype, 'dwt': dwt, 'grt': grt, 'duration_days': duration_days, 'cost_breakdown': { 'pilotage_inbound': pilotage_in, 'pilotage_outbound': pilotage_out, 'towage': towage, 'berth_charges': berth_total, 'mooring_unmooring': mooring, 'port_dues': port_dues, 'light_dues': light_dues, 'agency_fees': agency_fees, 'waste_disposal': waste_disposal, 'fresh_water': fresh_water, 'documentation': documentation, 'canal_transit': {'fee': canal_fee, 'canal': canal_name} if canal_fee else None, }, 'total_estimated_usd': total, 'cost_per_day': round(total / max(duration_days, 1)), 'comparison': f"{'Above' if multiplier > 1.0 else 'Below'} global average (region factor: {multiplier}x)", 'note': 'Estimated based on port region tariffs and vessel size. Actual costs vary by terminal operator, berth availability, and local regulations. Request proforma DA from port agent for exact figures.', } # ============================================================================= # FEATURE 12: WEATHER ROUTING # ============================================================================= # Seasonal weather patterns by region/month WEATHER_PATTERNS = { 'north_atlantic_winter': {'months': [11, 12, 1, 2, 3], 'lat_range': (35, 65), 'lon_range': (-60, 0), 'risk': 'high', 'wave_m': (4, 8), 'wind_kn': (25, 45), 'speed_loss_pct': 15}, 'north_pacific_winter': {'months': [11, 12, 1, 2, 3], 'lat_range': (35, 60), 'lon_range': (140, 180), 'risk': 'high', 'wave_m': (4, 8), 'wind_kn': (25, 45), 'speed_loss_pct': 15}, 'indian_ocean_monsoon': {'months': [6, 7, 8, 9], 'lat_range': (0, 25), 'lon_range': (50, 80), 'risk': 'moderate', 'wave_m': (3, 6), 'wind_kn': (20, 35), 'speed_loss_pct': 10}, 'typhoon_season_wp': {'months': [7, 8, 9, 10, 11], 'lat_range': (10, 35), 'lon_range': (120, 160), 'risk': 'high', 'wave_m': (3, 10), 'wind_kn': (25, 65), 'speed_loss_pct': 20}, 'hurricane_season_carib': {'months': [6, 7, 8, 9, 10, 11], 'lat_range': (10, 30), 'lon_range': (-90, -50), 'risk': 'high', 'wave_m': (3, 10), 'wind_kn': (25, 65), 'speed_loss_pct': 20}, 'cape_of_good_hope': {'months': list(range(1, 13)), 'lat_range': (-40, -30), 'lon_range': (15, 30), 'risk': 'moderate', 'wave_m': (3, 7), 'wind_kn': (20, 40), 'speed_loss_pct': 12}, 'drake_passage': {'months': list(range(1, 13)), 'lat_range': (-65, -55), 'lon_range': (-70, -55), 'risk': 'high', 'wave_m': (4, 10), 'wind_kn': (30, 50), 'speed_loss_pct': 20}, } def calculate_weather_routing(from_port: str, to_port: str, vessel_type: str = 'bulk', dwt: float = None, departure_month: int = None) -> Optional[Dict]: """Calculate weather-optimized route vs direct route.""" route = calculate_sea_route(from_port, to_port, vessel_type, dwt) if not route: return None port_from = resolve_port(from_port) port_to = resolve_port(to_port) if not port_from or not port_to: return None month = departure_month or datetime.now().month # Check which weather zones the route might cross weather_risks = [] total_speed_loss = 0 for zone_name, pattern in WEATHER_PATTERNS.items(): if month not in pattern['months']: continue lat_min, lat_max = pattern['lat_range'] lon_min, lon_max = pattern['lon_range'] lats = [port_from['lat'], port_to['lat']] lons = [port_from['lon'], port_to['lon']] if (min(lats) <= lat_max and max(lats) >= lat_min and min(lons) <= lon_max and max(lons) >= lon_min): weather_risks.append({ 'zone': zone_name.replace('_', ' ').title(), 'risk_level': pattern['risk'], 'wave_height_m': f"{pattern['wave_m'][0]}-{pattern['wave_m'][1]}m", 'wind_speed_kn': f"{pattern['wind_kn'][0]}-{pattern['wind_kn'][1]} kn", 'speed_reduction': f"{pattern['speed_loss_pct']}%", }) total_speed_loss = max(total_speed_loss, pattern['speed_loss_pct']) direct_days = route['total_days'] weather_delay = round(direct_days * total_speed_loss / 100, 1) optimized_extra_nm = round(route['distance_nm'] * 0.03) if total_speed_loss > 10 else 0 optimized_days = round(direct_days + weather_delay * 0.5 + optimized_extra_nm / (route.get('speed_knots', 14) * 24), 1) # Fuel comparison speed = route.get('speed_knots', 14) fuel_day = VESSEL_FUEL_BENCHMARKS.get('handysize', {}).get('fuel_td', 30) if dwt: for _, bench in VESSEL_FUEL_BENCHMARKS.items(): if bench['dwt_range'][0] <= dwt <= bench['dwt_range'][1]: fuel_day = bench['fuel_td'] break direct_fuel = round(direct_days * fuel_day) optimized_fuel = round(optimized_days * fuel_day * 0.9) # Eco speed on longer route fuel_saving = direct_fuel - optimized_fuel return { 'from': route['from'], 'to': route['to'], 'departure_month': month, 'direct_route': { 'distance_nm': route['distance_nm'], 'estimated_days': direct_days, 'fuel_consumption_mt': direct_fuel, 'canals': route.get('canals', []), }, 'optimized_route': { 'distance_nm': route['distance_nm'] + optimized_extra_nm, 'estimated_days': optimized_days, 'fuel_consumption_mt': optimized_fuel, 'deviation_nm': optimized_extra_nm, 'strategy': 'Great circle with weather avoidance' if total_speed_loss > 10 else 'Direct route acceptable', }, 'weather_risks': weather_risks, 'savings': { 'time_saved_vs_bad_weather': f"{weather_delay - (optimized_days - direct_days):.1f} days", 'fuel_difference_mt': fuel_saving, 'fuel_cost_difference_usd': round(fuel_saving * BUNKER_PRICE_USD), }, 'recommendations': [ f"{'Route deviation recommended — significant weather risk' if total_speed_loss >= 15 else 'Direct route feasible with monitoring' if total_speed_loss >= 5 else 'Direct route recommended — low weather risk'}", f"Monitor {'tropical cyclone warnings' if any('typhoon' in r['zone'].lower() or 'hurricane' in r['zone'].lower() for r in weather_risks) else 'weather forecasts'}" if weather_risks else 'Standard weather monitoring sufficient', ], 'note': 'Weather routing based on seasonal patterns. For real-time optimization, subscribe to routing services (StormGeo, DTN, WRI).', } # ============================================================================= # FEATURE 13: FIXTURE RECAP GENERATOR # ============================================================================= def generate_fixture_recap(vessel_name: str, cargo_type: str, tonnage: float, from_port: str, to_port: str, rate: float, laycan: str = None, charterer: str = None, owner: str = None, demurrage_rate: float = None, commission: float = 3.75) -> Optional[Dict]: """Generate a fixture recap summary.""" port_load = resolve_port(from_port) port_discharge = resolve_port(to_port) if not port_load or not port_discharge: return None route = calculate_sea_route(from_port, to_port) from datetime import datetime, timedelta today = datetime.now() recap_number = f"FIX-{today.strftime('%Y%m%d')}-{int(hashlib.md5((vessel_name + cargo_type).encode()).hexdigest()[:8], 16) % 10000:04d}" if not laycan: start = today + timedelta(days=10) end = start + timedelta(days=5) laycan = f"{start.strftime('%d %b')} - {end.strftime('%d %b %Y')}" if not demurrage_rate: demurrage_rate = round(max(8000, min(80000, tonnage * 0.15))) load_rate = 5000 if (tonnage or 0) <= 50000 else 8000 if (tonnage or 0) <= 100000 else 12000 return { 'recap_number': recap_number, 'date': today.strftime('%Y-%m-%d %H:%M UTC'), 'status': 'ON SUBJECTS — Subject to charterer/owner approval within 24 hours', 'principals': { 'owner': owner or '[Owner / Disponent Owner]', 'charterer': charterer or '[Charterer]', }, 'vessel': { 'name': vessel_name, 'type': cargo_type.title() if cargo_type else 'TBN', 'dwt': tonnage, 'flag': '[TBC]', 'year_built': '[TBC]', 'class': '[TBC]', }, 'cargo': { 'description': cargo_type, 'quantity': f"{tonnage:,.0f} MT" if tonnage else 'TBC', 'tolerance': '5% MOLOO', }, 'loading': { 'port': port_load['name'], 'country': port_load['country'], 'terms': 'FIOS', 'rate': f"{load_rate:,} MT/SHINC", }, 'discharge': { 'port': port_discharge['name'], 'country': port_discharge['country'], 'terms': 'FIOS', 'rate': f"{load_rate:,} MT/SHINC", }, 'laycan': laycan, 'freight': { 'rate': f"${rate:.2f}/MT", 'total': f"${rate * tonnage:,.0f}" if tonnage else 'TBC', 'payment': '95% within 5 banking days of signing B/L, 5% on completion of discharge', }, 'demurrage_despatch': { 'demurrage': f"${demurrage_rate:,.0f}/day pro rata", 'despatch': f"${demurrage_rate / 2:,.0f}/day pro rata (half demurrage)", }, 'commission': f"{commission}% total — {commission - 1.25}% address + 1.25% brokerage", 'voyage_estimate': { 'distance_nm': route['distance_nm'] if route else 0, 'days': route['total_days'] if route else 0, 'via': route.get('canals', []) if route else [], }, 'governing_law': 'English Law, London Arbitration (LMAA)', 'cp_form': 'GENCON 2022 (or applicable standard form)', 'subjects': [ 'Subject charterer board approval — 24 hours', 'Subject owner confirmation — 24 hours', 'Subject stem confirmation', 'Subject satisfactory vessel inspection', ], 'note': 'DRAFT RECAP — This is an AI-generated fixture recap template. All terms subject to main charter party agreement.', } # ============================================================================= # FEATURE 14: AIS ANOMALY DETECTOR # FEATURE 15: DARK FLEET DETECTOR # (Extracted to maritime_compliance.py — re-exported for backward compatibility) # ============================================================================= from maritime_compliance import STS_HOTSPOTS, detect_ais_anomalies, detect_dark_fleet # ============================================================================= # TILE FETCHER — curl_cffi (Cloudflare bypass, no browser) # ============================================================================= def lat_lon_to_tile(lat, lon, zoom): """Convert lat/lon to slippy map tile X/Y at given zoom level.""" n = 2 ** zoom x = int((lon + 180.0) / 360.0 * n) y = int((1.0 - math.log(math.tan(math.radians(lat)) + 1.0 / math.cos(math.radians(lat))) / math.pi) / 2.0 * n) return max(0, min(x, n - 1)), max(0, min(y, n - 1)) def _fetch_mt_tile(z, x, y): """Fetch single MarineTraffic tile via curl_cffi (TLS fingerprint bypass). Returns list of raw vessel dicts from tile data, or empty list on failure.""" if not _HAS_CURL_CFFI: return [] url = f'https://www.marinetraffic.com/getData/get_data_json_4/z:{z}/X:{x}/Y:{y}/station:0' headers = { 'Referer': f'https://www.marinetraffic.com/en/ais/home/centerx:0/centery:0/zoom:{z}', 'X-Requested-With': 'XMLHttpRequest', } try: resp = cf_requests.get(url, headers=headers, impersonate='chrome', timeout=15) if resp.status_code == 200: data = resp.json() if isinstance(data, dict): rows = data.get('data', data) if isinstance(rows, dict): rows = rows.get('rows', []) return rows if isinstance(rows, list) else [] return [] except Exception as e: logger.debug(f"Tile z:{z}/X:{x}/Y:{y} fetch error: {e}") return [] LIVE_SCAN_ZOOM = 3 # Free MT API limit: z:2-4 only. z:3 = best per-tile density. def live_scan_area(lat, lon, radius_nm, zoom=LIVE_SCAN_ZOOM): """Live scan MarineTraffic tiles covering area around (lat, lon). Uses curl_cffi to bypass Cloudflare — no browser needed. Free MT API works at z:2-4 only (z:5+ requires Pro login). At z:3: 1-4 tiles per port query, ~1 second. Returns list of raw MT vessel row dicts (SHIP_ID, SHIPNAME, LAT, LON, etc). """ if not _HAS_CURL_CFFI: logger.warning("curl_cffi not available, live_scan_area disabled") return [] delta_lat = radius_nm / 60.0 cos_lat = max(math.cos(math.radians(lat)), 0.01) delta_lon = radius_nm / (60.0 * cos_lat) x_min, y_min = lat_lon_to_tile(lat + delta_lat, lon - delta_lon, zoom) # NW x_max, y_max = lat_lon_to_tile(lat - delta_lat, lon + delta_lon, zoom) # SE all_rows = [] tile_count = 0 for x in range(x_min, x_max + 1): for y in range(y_min, y_max + 1): rows = _fetch_mt_tile(zoom, x, y) all_rows.extend(rows) tile_count += 1 logger.info(f"live_scan_area({lat:.1f}, {lon:.1f}, {radius_nm}nm): " f"{tile_count} tiles z{zoom}, {len(all_rows)} raw vessels") return all_rows # ============================================================================= # CONVENIENCE FUNCTIONS # ============================================================================= _parser = None def get_parser() -> MarineTrafficParser: """Get singleton parser instance""" global _parser if _parser is None: _parser = MarineTrafficParser() return _parser def search_vessel(query: str) -> List[Dict]: """Quick vessel search""" return get_parser().search_vessel_public(query) def get_vessel(mmsi: str) -> Dict: """Get vessel details""" return get_parser().get_vessel_page(mmsi) if __name__ == "__main__": # Test parser = MarineTrafficParser() logger.info("API key configured: %s", parser.has_api_key()) # Test public search logger.info("Searching for 'MAERSK'...") results = parser.search_vessel_public("MAERSK") for v in results[:5]: logger.info(" - %s", v) # Test port resolution logger.info("Port resolution test:") for name in ["Rotterdam", "dubai", "SGSIN", "pusan"]: port = resolve_port(name) if port: logger.info(f" {name} -> {port['name']} ({port['country']}) [{port['lat']}, {port['lon']}]") else: logger.info(f" {name} -> NOT FOUND")