#!/usr/bin/env python3 """ Maritime Compliance Module — Sanctions, AIS Anomaly, Dark Fleet Detection Extracted from marinetraffic_parser.py for modularity. All functions are self-contained with no dependencies on routing/ports/scraping. """ import logging from datetime import datetime from typing import Dict logger = logging.getLogger('maritime_compliance') # ============================================================================= # SANCTIONED FLAGS & ENTITIES # ============================================================================= # Sanctioned flag states (OFAC, EU, UN — as of 2025-2026) SANCTIONED_FLAGS = { 'north korea': {'level': 'critical', 'lists': ['OFAC SDN', 'UN', 'EU'], 'code': 'KP'}, 'dprk': {'level': 'critical', 'lists': ['OFAC SDN', 'UN', 'EU'], 'code': 'KP'}, 'iran': {'level': 'critical', 'lists': ['OFAC SDN', 'EU'], 'code': 'IR'}, 'syria': {'level': 'critical', 'lists': ['OFAC SDN', 'EU'], 'code': 'SY'}, 'cuba': {'level': 'high', 'lists': ['OFAC SDN'], 'code': 'CU'}, 'crimea': {'level': 'high', 'lists': ['OFAC', 'EU'], 'code': '-'}, 'russia': {'level': 'elevated', 'lists': ['OFAC sectoral', 'EU partial'], 'code': 'RU'}, 'venezuela': {'level': 'elevated', 'lists': ['OFAC sectoral'], 'code': 'VE'}, 'myanmar': {'level': 'elevated', 'lists': ['OFAC targeted', 'EU targeted'], 'code': 'MM'}, 'belarus': {'level': 'elevated', 'lists': ['OFAC', 'EU'], 'code': 'BY'}, } # High-risk company keywords (sanctioned entities) SANCTIONED_ENTITIES = [ 'irisl', 'national iranian', 'cosco dalian', 'cosco shipping (dalian)', 'ocean mighty', 'glory wealth', 'dark fleet', 'patriot', 'korea ocean shipping', 'pyongyang', 'korea kumgang', 'syria petroleum', 'sytrol', 'rosneft trading', 'sovcomflot', # Partial sanctions 'pdvsa', 'petroleos de venezuela', ] # Dark fleet indicators DARK_FLEET_INDICATORS = [ 'ais_gaps', # Frequent AIS switch-off 'flag_changes', # Multiple flag changes in short period 'sts_transfers', # Ship-to-ship transfers in suspicious areas 'age_25plus', # Vessel age > 25 years (tankers) 'unknown_owner', # Opaque ownership structure ] # ============================================================================= # STS TRANSFER HOTSPOTS # ============================================================================= # High-risk STS transfer zones STS_HOTSPOTS = { 'south_laconia_greece': {'lat_range': (36, 37), 'lon_range': (22, 24)}, 'off_ceuta': {'lat_range': (35, 36), 'lon_range': (-6, -5)}, 'malacca_strait': {'lat_range': (1, 4), 'lon_range': (101, 104)}, 'off_lomé_togo': {'lat_range': (5, 7), 'lon_range': (0, 2)}, 'east_malaysia': {'lat_range': (4, 7), 'lon_range': (110, 115)}, } # ============================================================================= # HELPER FUNCTIONS # ============================================================================= def _max_risk(current: str, new: str) -> str: """Return the higher of two risk levels.""" levels = ['clear', 'low', 'elevated', 'high', 'critical'] return new if levels.index(new) > levels.index(current) else current # ============================================================================= # SANCTIONS & COMPLIANCE SCREENER # ============================================================================= def screen_sanctions(vessel_name: str = None, imo: str = None, flag: str = None, companies: list = None, vessel_type: str = None, year_built: int = None) -> Dict: """ Screen vessel/company against sanctions lists. Returns risk assessment with level, flags, and recommendations. """ risks = [] risk_level = 'clear' # clear → low → elevated → high → critical # 1. Flag screening if flag: flag_lower = flag.lower().strip() for sanctioned, info in SANCTIONED_FLAGS.items(): if sanctioned in flag_lower or info['code'].lower() == flag_lower: risks.append({ 'type': 'sanctioned_flag', 'severity': info['level'], 'detail': f"Flag state '{flag}' is on {', '.join(info['lists'])} sanctions lists", 'lists': info['lists'], }) risk_level = _max_risk(risk_level, info['level']) # 2. Company screening if companies: for company in companies: c_name = '' if isinstance(company, dict): c_name = (company.get('name', '') or '').lower() elif isinstance(company, str): c_name = company.lower() for entity in SANCTIONED_ENTITIES: if entity in c_name: risks.append({ 'type': 'sanctioned_entity', 'severity': 'high', 'detail': f"Company '{c_name}' matches sanctioned entity pattern '{entity}'", }) risk_level = _max_risk(risk_level, 'high') break # 3. Dark fleet indicators dark_fleet_flags = [] if vessel_type and vessel_type.lower() in ('tanker', 'oil tanker', 'crude oil tanker'): if year_built: try: age = 2026 - int(year_built) if age >= 25: dark_fleet_flags.append('age_25plus') risks.append({ 'type': 'dark_fleet_indicator', 'severity': 'elevated', 'detail': f"Vessel is {age} years old — common in dark/shadow fleet operations", }) risk_level = _max_risk(risk_level, 'elevated') elif age >= 20: dark_fleet_flags.append('aging_vessel') risks.append({ 'type': 'dark_fleet_indicator', 'severity': 'low', 'detail': f"Vessel is {age} years old — monitor for fleet age concerns", }) except (ValueError, TypeError): pass # 4. Name-based heuristics (common in sanctioned fleets) if vessel_name: vn = vessel_name.lower() # Frequent name changes (vessels with very generic names) generic_names = ['glory', 'success', 'fortune', 'lucky', 'star', 'ocean', 'sea', 'diamond', 'golden'] generic_count = sum(1 for g in generic_names if g in vn) if generic_count >= 2: risks.append({ 'type': 'name_pattern', 'severity': 'low', 'detail': 'Vessel name contains multiple generic keywords — common in dark fleet renaming patterns', }) # Build result result = { 'vessel_name': vessel_name, 'imo': imo, 'flag': flag, 'risk_level': risk_level, 'risk_count': len(risks), 'risks': risks, 'screened_against': ['OFAC SDN', 'OFAC Sectoral', 'EU Consolidated', 'UN Security Council'], } if risk_level == 'clear': result['recommendation'] = 'No sanctions flags detected. Standard due diligence applies.' elif risk_level == 'low': result['recommendation'] = 'Minor flags noted. Proceed with enhanced due diligence.' elif risk_level == 'elevated': result['recommendation'] = 'Elevated risk. Obtain legal counsel before proceeding. Check latest OFAC/EU advisories.' elif risk_level == 'high': result['recommendation'] = 'HIGH RISK — sanctioned entity match. Do NOT proceed without legal clearance. Report to compliance.' elif risk_level == 'critical': result['recommendation'] = 'CRITICAL — vessel/entity appears on primary sanctions lists. DO NOT ENGAGE. Report to compliance officer immediately.' return result # ============================================================================= # AIS ANOMALY DETECTOR # ============================================================================= def detect_ais_anomalies(vessel_name: str = None, imo: str = None, mmsi: str = None, flag: str = None, vessel_type: str = None, year_built: int = None, last_known_lat: float = None, last_known_lon: float = None, ais_gap_hours: float = None) -> Dict: """Detect AIS behavioral anomalies for compliance screening.""" anomalies = [] risk_score = 0 # 0-100 # 1. AIS gap detection if ais_gap_hours and ais_gap_hours > 12: severity = 'critical' if ais_gap_hours > 72 else 'high' if ais_gap_hours > 48 else 'elevated' if ais_gap_hours > 24 else 'moderate' risk_score += min(30, int(ais_gap_hours / 2)) anomalies.append({ 'type': 'ais_gap', 'severity': severity, 'detail': f"AIS signal gap of {ais_gap_hours:.0f} hours detected", 'implication': 'Possible intentional AIS switch-off — common indicator of sanctions evasion or illicit STS transfer', }) # 2. Location in STS hotspot if last_known_lat is not None and last_known_lon is not None: for zone_name, zone in STS_HOTSPOTS.items(): lat_min, lat_max = zone['lat_range'] lon_min, lon_max = zone['lon_range'] if lat_min <= last_known_lat <= lat_max and lon_min <= last_known_lon <= lon_max: risk_score += 20 anomalies.append({ 'type': 'sts_hotspot', 'severity': 'high', 'detail': f"Vessel located in known STS transfer zone: {zone_name.replace('_', ' ').title()}", 'implication': 'Ship-to-ship transfers in this area are flagged for potential sanctions evasion', }) # 3. Flag risk if flag: flag_lower = flag.lower() for sanctioned, info in SANCTIONED_FLAGS.items(): if sanctioned in flag_lower: risk_score += 15 anomalies.append({ 'type': 'flag_risk', 'severity': info['level'], 'detail': f"Flag state '{flag}' on {', '.join(info['lists'])} lists", 'implication': 'Enhanced due diligence required for all commercial engagements', }) break # 4. Vessel age risk (for tankers especially) if year_built and vessel_type: age = datetime.now().year - year_built vtype = vessel_type.lower() if 'tanker' in vtype and age > 20: risk_score += 10 anomalies.append({ 'type': 'aged_tanker', 'severity': 'elevated', 'detail': f"Tanker vessel aged {age} years (built {year_built})", 'implication': 'Aged tankers are commonly used in dark fleet operations. Check P&I coverage and ownership history.', }) # 5. Spoofing indicators (if position seems wrong for vessel type) if last_known_lat is not None and last_known_lon is not None: if abs(last_known_lat) > 80: risk_score += 15 anomalies.append({ 'type': 'position_anomaly', 'severity': 'high', 'detail': f"Position reported in extreme latitude ({last_known_lat:.1f}\u00b0)", 'implication': 'Possible AIS spoofing or GPS manipulation', }) # Determine overall risk level if risk_score >= 60: risk_level = 'critical' elif risk_score >= 40: risk_level = 'high' elif risk_score >= 20: risk_level = 'elevated' elif risk_score > 0: risk_level = 'low' else: risk_level = 'clear' recommendations = [] if risk_level in ('critical', 'high'): recommendations.extend([ 'Do NOT engage commercially without enhanced due diligence', 'Verify vessel ownership chain and beneficial owner', 'Check P&I club status and insurance coverage', 'Request recent port state control inspection reports', 'Monitor for further AIS anomalies over next 7 days', ]) elif risk_level == 'elevated': recommendations.extend([ 'Enhanced monitoring recommended', 'Verify P&I and H&M insurance status', 'Request vessel particulars and ownership documentation', ]) else: recommendations.append('No significant anomalies detected — standard monitoring sufficient') return { 'vessel': vessel_name or 'Unknown', 'imo': imo or '', 'mmsi': mmsi or '', 'flag': flag or '', 'risk_score': risk_score, 'risk_level': risk_level, 'anomalies_found': len(anomalies), 'anomalies': anomalies, 'recommendations': recommendations, 'screened_against': [ 'AIS gap detection (>12h threshold)', 'STS transfer hotspot zones (5 regions)', 'Sanctions flag screening (OFAC/EU/UN)', 'Dark fleet age profiling', 'Position anomaly / spoofing detection', ], 'note': 'AIS anomaly assessment based on available data points. For real-time monitoring, integrate with live AIS feed (Spire, Kpler, MarineTraffic).', } # ============================================================================= # DARK FLEET DETECTOR # ============================================================================= def detect_dark_fleet(vessel_name: str = None, imo: str = None, flag: str = None, year_built: int = None, owner: str = None, vessel_type: str = None, dwt: float = None, previous_flags: list = None, p_and_i_club: str = None, ais_gaps_30d: int = None) -> Dict: """Screen vessel for dark/shadow fleet indicators.""" indicators = [] score = 0 # 0-100 vtype = (vessel_type or '').lower() age = (datetime.now().year - year_built) if year_built else 0 # 1. Vessel age (critical for tankers) if age > 25 and 'tanker' in vtype: score += 20 indicators.append({ 'indicator': 'Aged tanker (>25 years)', 'severity': 'high', 'detail': f"Built {year_built}, age {age} years. Tankers >20 years are predominant in dark fleet.", 'weight': 20, }) elif age > 20 and 'tanker' in vtype: score += 12 indicators.append({ 'indicator': 'Aging tanker (>20 years)', 'severity': 'elevated', 'detail': f"Built {year_built}, age {age} years. Approaching dark fleet age profile.", 'weight': 12, }) elif age > 15: score += 5 indicators.append({ 'indicator': 'Older vessel', 'severity': 'low', 'detail': f"Built {year_built}, age {age} years.", 'weight': 5, }) # 2. Flag state risk if flag: flag_lower = flag.lower() for sanctioned, info in SANCTIONED_FLAGS.items(): if sanctioned in flag_lower: weight = {'critical': 25, 'high': 20, 'elevated': 10}.get(info['level'], 5) score += weight indicators.append({ 'indicator': f"Sanctioned flag state ({flag})", 'severity': info['level'], 'detail': f"On {', '.join(info['lists'])} lists. Dark fleet commonly uses sanctioned or FOC flags.", 'weight': weight, }) break else: # Flags of Convenience (FOC) commonly used by dark fleet foc_flags = ['cameroon', 'palau', 'gabon', 'togo', 'tanzania', 'comoros', 'equatorial guinea', 'sao tome'] if flag_lower in foc_flags: score += 15 indicators.append({ 'indicator': f"High-risk Flag of Convenience ({flag})", 'severity': 'elevated', 'detail': 'Flag commonly associated with dark fleet operations and limited regulatory oversight.', 'weight': 15, }) # 3. Flag changes if previous_flags and len(previous_flags) >= 3: score += 15 indicators.append({ 'indicator': f"Multiple flag changes ({len(previous_flags)} flags)", 'severity': 'high', 'detail': f"Previous flags: {', '.join(previous_flags)}. Frequent flag-hopping is a dark fleet indicator.", 'weight': 15, }) # 4. P&I club status if p_and_i_club: reputable_clubs = ['gard', 'britannia', 'standard', 'steamship mutual', 'north', 'skuld', 'swedish', 'west of england', 'japan', 'london', 'american', 'shipowners', 'uk p&i'] if not any(club in p_and_i_club.lower() for club in reputable_clubs): score += 15 indicators.append({ 'indicator': f"Non-IG P&I club ({p_and_i_club})", 'severity': 'elevated', 'detail': 'Not a member of International Group of P&I Clubs. Dark fleet vessels often lack reputable P&I coverage.', 'weight': 15, }) else: score += 10 indicators.append({ 'indicator': 'P&I club status unknown', 'severity': 'moderate', 'detail': 'Unable to verify P&I coverage. Vessels without proper insurance are high risk.', 'weight': 10, }) # 5. Ownership opacity if owner: owner_lower = owner.lower() suspicious_keywords = ['single purpose', 'one ship', 'special purpose', 'unknown', 'nominee'] if any(kw in owner_lower for kw in suspicious_keywords): score += 15 indicators.append({ 'indicator': 'Opaque ownership structure', 'severity': 'high', 'detail': f"Owner '{owner}' suggests single-purpose entity — common dark fleet structure to limit liability.", 'weight': 15, }) # Check sanctioned entities for entity in SANCTIONED_ENTITIES: if entity in owner_lower: score += 25 indicators.append({ 'indicator': f"Owner matches sanctioned entity", 'severity': 'critical', 'detail': f"Owner '{owner}' matches known sanctioned entity pattern.", 'weight': 25, }) break # 6. AIS behavior if ais_gaps_30d and ais_gaps_30d > 5: score += 15 indicators.append({ 'indicator': f"Frequent AIS gaps ({ais_gaps_30d} in 30 days)", 'severity': 'high', 'detail': 'Frequent AIS switch-offs are primary indicator of sanctions-evading dark fleet operations.', 'weight': 15, }) # Overall assessment score = min(100, score) if score >= 70: risk_level = 'critical' classification = 'HIGH PROBABILITY dark fleet vessel' elif score >= 50: risk_level = 'high' classification = 'LIKELY dark fleet / shadow fleet vessel' elif score >= 30: risk_level = 'elevated' classification = 'POSSIBLE dark fleet — enhanced due diligence required' elif score >= 10: risk_level = 'low' classification = 'LOW risk — minor indicators present' else: risk_level = 'clear' classification = 'No dark fleet indicators detected' compliance = { 'can_engage': risk_level in ('clear', 'low'), 'requires_due_diligence': risk_level in ('elevated',), 'do_not_engage': risk_level in ('high', 'critical'), } return { 'vessel': vessel_name or 'Unknown', 'imo': imo or '', 'vessel_type': vtype or 'unknown', 'dwt': dwt, 'flag': flag or 'Unknown', 'year_built': year_built, 'age_years': age, 'dark_fleet_score': score, 'risk_level': risk_level, 'classification': classification, 'indicators_found': len(indicators), 'indicators': indicators, 'compliance_status': compliance, 'screened_against': [ 'Vessel age profiling (>20yr tankers)', 'OFAC/EU/UN sanctions flag screening', 'Flag-hopping detection', 'P&I club verification (IG membership)', 'Beneficial ownership opacity check', 'Sanctioned entity matching', 'AIS behavior analysis', ], 'recommendation': ( 'DO NOT ENGAGE — vessel shows multiple dark fleet indicators. Report to compliance team.' if risk_level in ('critical', 'high') else 'ENHANCED DUE DILIGENCE required before engagement. Verify ownership, insurance, and recent trading history.' if risk_level == 'elevated' else 'Standard due diligence sufficient. Monitor for changes.' ), 'note': 'Dark fleet screening based on publicly available indicators. For comprehensive screening, cross-reference with Windward, Kpler, or IHS Markit databases.', }