#!/usr/bin/env python3
"""
Bulk Carrier Ownership Scraper — MarineTraffic Pro Edition
==========================================================

Systematically scrapes bulk carrier fleet data from MarineTraffic:
- Vessel specs: name, flag, year built, DWT, IMO
- Ownership: Beneficial Owner, Registered Owner, Commercial Manager, Operator/Charterer
- Company websites: attempts to fetch from MT company profile pages

Data is saved to the SeaFare Montana PostgreSQL/SQLite database.

Usage:
    python bulk_carrier_scraper.py --login EMAIL --password PASS
    python bulk_carrier_scraper.py --login EMAIL --password PASS --limit 500 --delay 2.5
    python bulk_carrier_scraper.py --login EMAIL --password PASS --resume   # continue from checkpoint
    python bulk_carrier_scraper.py --login EMAIL --password PASS --test     # test 10 vessels, don't save

Environment variables (alternative to CLI args):
    MT_LOGIN=email@example.com
    MT_PASSWORD=yourpassword

Output:
    - Vessels saved to DB (vessels table: name, flag, imo, dwt, year_built, owner, operator, companies_json)
    - Owners saved to contacts table (company_name, country, website, vessels_mmsi)
    - Checkpoint file: scraper_checkpoint.json (resume support)
    - Log file: scraper.log
"""

import os
import sys
import json
import time
import logging
import argparse
import math
import traceback
from datetime import datetime, timezone
from typing import List, Dict, Optional, Set

# Setup logging first
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(levelname)s %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler('scraper.log', encoding='utf-8'),
    ]
)
logger = logging.getLogger('bulk_scraper')

# =============================================================================
# GLOBAL COVERAGE GRID — Key maritime regions
# =============================================================================
# Each point is (lat, lon) representing center of a scrape area (~150 NM radius)
# Covers all major bulk carrier routes globally
GRID_POINTS = [
    # ==========================================================================
    # === 1. CASPIAN SEA (start here) ===
    # ==========================================================================
    (41.5, 50.5),     # Baku area (Azerbaijan)
    (43.5, 51.5),     # Aktau area (Kazakhstan)
    (40.0, 53.0),     # Turkmenbashi area (Turkmenistan)
    (46.5, 48.0),     # Astrakhan area (Russia)
    (37.5, 49.5),     # Anzali / Nowshahr (Iran, south Caspian)
    # ==========================================================================
    # === 2. BLACK SEA + AZOV (bridge between Caspian and Med) ===
    # ==========================================================================
    (41.0, 29.0),     # Istanbul / Bosphorus (gateway to Med)
    (43.0, 33.0),     # Central Black Sea
    (45.5, 30.0),     # Odessa / Kherson area (Ukraine)
    (45.0, 36.5),     # Kerch Strait / Azov Sea
    (43.5, 28.0),     # Constanta area (Romania)
    (41.8, 41.7),     # Batumi / Poti (Georgia)
    (44.5, 38.0),     # Novorossiysk (Russia, key grain port)
    # ==========================================================================
    # === 3. MEDITERRANEAN SEA ===
    # ==========================================================================
    (37.5, 23.0),     # Greece / Aegean (Piraeus)
    (35.5, 28.5),     # Rhodes / Eastern Med
    (33.0, 35.5),     # Beirut / Israel coast
    (36.5, 14.0),     # Central Med (Malta)
    (37.0, 10.0),     # Tunisia / Sfax
    (36.5, 2.5),      # Algeria (Algiers / Oran)
    (43.5, 5.0),      # Marseille area (France)
    (38.0, 15.5),     # Messina Strait / Sicily
    (40.6, 14.3),     # Naples / South Italy
    (44.4, 12.5),     # Ravenna / Adriatic
    (37.9, 23.7),     # Piraeus (biggest Med bulk hub)
    # ==========================================================================
    # === 4. BALTIC SEA ===
    # ==========================================================================
    (59.3, 18.1),     # Stockholm (Sweden)
    (57.0, 24.0),     # Riga (Latvia)
    (59.5, 24.8),     # Tallinn / Helsinki (Estonia/Finland)
    (54.4, 18.6),     # Gdansk / Gdynia (Poland, key grain port)
    (59.9, 29.5),     # St. Petersburg (Russia)
    (54.1, 12.1),     # Rostock (Germany)
    (57.5, 11.5),     # Gothenburg (Sweden)
    (55.7, 12.5),     # Copenhagen / Oresund (Denmark)
    (60.5, 22.3),     # Turku (Finland)
    (56.0, 21.0),     # Klaipeda (Lithuania)
    (58.4, 24.5),     # Paldiski (Estonia)
    # ==========================================================================
    # === 5. NORTH SEA / NW EUROPE ===
    # ==========================================================================
    (51.9, 4.5),      # Rotterdam (biggest European bulk port)
    (51.2, 4.4),      # Antwerp (Belgium)
    (53.5, 0.0),      # Humber / Grimsby / Immingham (UK bulk)
    (60.0, 5.0),      # Norway / Bergen
    (65.0, 14.0),     # Norway / Bodø / north
    # ==========================================================================
    # === 6. UNITED KINGDOM ===
    # ==========================================================================
    (51.5, 0.5),      # Thames / London / Tilbury
    (53.5, -3.0),     # Liverpool / Mersey
    (57.1, -2.1),     # Aberdeen (Scotland)
    (55.9, -4.5),     # Clyde / Glasgow
    # ==========================================================================
    # === 7. ATLANTIC (Iberia + Bay of Biscay) ===
    # ==========================================================================
    (38.5, -9.0),     # Lisbon / Sines (Portugal)
    (43.3, -8.4),     # La Coruña / Ferrol (Spain)
    (47.0, -10.0),    # Bay of Biscay
    (36.0, -8.0),     # Strait of Gibraltar / Algeciras
    # ==========================================================================
    # === 8. RED SEA / GULF OF ADEN ===
    # ==========================================================================
    (23.0, 38.0),     # Jeddah area (Saudi Arabia)
    (16.5, 41.5),     # Eritrea / Massawa coast
    (14.0, 43.0),     # Yemen / Aden
    (12.0, 44.5),     # Djibouti (key transshipment)
    (27.2, 33.8),     # Suez Canal / Port Said area
    # ==========================================================================
    # === 9. PERSIAN GULF ===
    # ==========================================================================
    (25.0, 56.0),     # Gulf of Oman / Strait of Hormuz
    (26.5, 53.0),     # Central Gulf (Dubai / Abu Dhabi)
    (29.5, 49.0),     # Kuwait / Basra / Iraq
    (24.5, 51.5),     # Qatar / Doha
    # ==========================================================================
    # === 10. INDIAN OCEAN ===
    # ==========================================================================
    (10.0, 65.0),     # Arabian Sea
    (18.0, 83.0),     # East India / Vizag (key coal/ore port)
    (8.0, 77.0),      # Sri Lanka / South India
    (5.0, 80.0),      # Sri Lanka east coast
    (-5.0, 55.0),     # East Africa / Mozambique Channel
    (-20.0, 57.0),    # Mauritius area
    (10.0, 50.0),     # Gulf of Aden / Somalia
    # ==========================================================================
    # === 11. SOUTH / SOUTHEAST ASIA ===
    # ==========================================================================
    (1.5, 103.8),     # Singapore / Malacca Strait
    (3.5, 108.0),     # South China Sea
    (10.0, 108.0),    # Vietnam coast
    (16.0, 108.0),    # Da Nang / Central Vietnam
    # ==========================================================================
    # === 12. EAST ASIA ===
    # ==========================================================================
    (22.0, 114.0),    # Hong Kong area
    (30.0, 122.0),    # Shanghai area (world's busiest port)
    (35.0, 129.0),    # South Korea / Busan
    (33.5, 135.0),    # Japan / Osaka / Nagoya
    (35.5, 140.0),    # Tokyo Bay / Yokohama
    (39.0, 117.7),    # Tianjin / North China
    # ==========================================================================
    # === 13. AUSTRALIA / PACIFIC ===
    # ==========================================================================
    (-17.5, 149.0),   # Hay Point / Dalrymple Bay (Queensland coal — world's largest)
    (-27.0, 153.0),   # Brisbane
    (-33.9, 151.2),   # Sydney / Newcastle (NSW coal)
    (-31.9, 115.8),   # Fremantle / Perth (WA iron ore)
    (-20.3, 118.6),   # Port Hedland (world's largest iron ore export)
    (-34.0, 137.8),   # Port Adelaide
    # ==========================================================================
    # === 14. US EAST COAST / GULF ===
    # ==========================================================================
    (29.5, -94.5),    # Houston / Galveston (US grain)
    (30.0, -90.0),    # New Orleans / Mississippi (largest grain export)
    (38.0, -76.0),    # Baltimore / Chesapeake
    (40.7, -74.0),    # New York / New Jersey
    (25.5, -80.2),    # Miami
    (32.0, -81.0),    # Savannah (US east coast grain)
    # ==========================================================================
    # === 15. US WEST COAST ===
    # ==========================================================================
    (37.8, -122.3),   # San Francisco Bay
    (33.7, -118.3),   # Los Angeles / Long Beach
    (48.5, -122.7),   # Puget Sound / Seattle / Tacoma
    # ==========================================================================
    # === 16. SOUTH AMERICA ===
    # ==========================================================================
    (-33.9, -70.7),   # Valparaiso / San Antonio (Chile)
    (-23.0, -43.2),   # Rio de Janeiro / Itaguaí (iron ore)
    (-34.0, -51.0),   # Rio Grande do Sul (Brazil soy/grain)
    (-38.0, -57.5),   # Bahia Blanca (Argentina grain)
    (-32.0, -52.1),   # Rio Grande (Brazil)
    (10.6, -61.5),    # Trinidad / Caribbean
    # ==========================================================================
    # === 17. AFRICA ===
    # ==========================================================================
    (-34.0, 18.5),    # Cape Town (South Africa)
    (-29.9, 31.0),    # Durban (South Africa)
    (-33.0, 27.9),    # Port Elizabeth / Ngqura
    (-4.0, 39.5),     # Mombasa (Kenya)
    (6.5, 3.4),       # Lagos (Nigeria)
    (5.0, -4.0),      # Abidjan (Côte d'Ivoire)
    (0.0, 9.5),       # Libreville / Owendo (Gabon)
    # ==========================================================================
    # === 18. GREAT LAKES / CANADA ===
    # ==========================================================================
    (43.5, -79.5),    # Toronto / Hamilton (Lake Ontario)
    (45.5, -73.6),    # Montreal / St Lawrence Seaway
    (46.5, -84.5),    # Sault Ste. Marie / Lake Superior
]

CHECKPOINT_FILE = 'scraper_checkpoint.json'
SOURCE_TAG = 'mt_pro_scraper'

# AIS bulk carrier type codes (70-79 = cargo, but 70 is general cargo)
# We filter by name keywords for actual bulk carriers
BULK_KEYWORDS = ['bulk', 'ore carrier', 'self-unloader', 'cement carrier',
                 'coal carrier', 'grain carrier']
BULK_TYPE_CODES = set(range(70, 80))  # AIS cargo codes


def _utcnow() -> datetime:
    """Return the current time as a timezone-aware UTC datetime.

    ``datetime.utcnow()`` is deprecated since Python 3.12; this is the
    supported equivalent and keeps all timestamps in this module consistent.
    """
    return datetime.now(timezone.utc)


def _default_checkpoint() -> dict:
    """Return a fresh (empty) checkpoint structure."""
    return {
        'processed_mmsis': [],
        'total_processed': 0,
        'last_grid_idx': 0,
        'started_at': _utcnow().isoformat(),
        'owners_added': 0,
    }


def _load_checkpoint() -> dict:
    """Load progress checkpoint from file.

    Falls back to a fresh checkpoint when the file is missing or unreadable.
    A corrupt checkpoint is logged (previously it was silently discarded).
    """
    if os.path.exists(CHECKPOINT_FILE):
        try:
            with open(CHECKPOINT_FILE, 'r') as f:
                return json.load(f)
        except Exception as e:
            logger.warning(f"Could not read checkpoint {CHECKPOINT_FILE}: {e}")
    return _default_checkpoint()


def _save_checkpoint(cp: dict):
    """Save checkpoint to file (best-effort: failures are logged, not raised)."""
    try:
        with open(CHECKPOINT_FILE, 'w') as f:
            json.dump(cp, f, indent=2)
    except Exception as e:
        logger.error(f"Checkpoint save error: {e}")


def _is_bulk_carrier(vessel: dict) -> bool:
    """Determine if a vessel is a bulk carrier based on type string and/or type code.

    A vessel qualifies when its type string contains one of BULK_KEYWORDS, or
    its AIS type code falls in 70-79 and the type string does not clearly mark
    it as a non-bulk cargo ship (container, ro-ro, passenger, reefer).
    """
    vtype = (vessel.get('type') or vessel.get('type_name') or '').lower()
    type_code = vessel.get('type_code') or vessel.get('shiptype') or 0
    try:
        type_code = int(type_code)
    except (ValueError, TypeError):
        type_code = 0
    # Check type string keywords
    for kw in BULK_KEYWORDS:
        if kw in vtype:
            return True
    # AIS type codes 70-79 but exclude obvious non-bulkers
    # type_code 70 = general cargo (broad category)
    # 71, 72, 73, 79 are more specific cargo types
    if type_code in BULK_TYPE_CODES:
        # Exclude if clearly identified as non-bulk
        non_bulk = ['container', 'ro-ro', 'roro', 'passenger', 'reefer', 'refrigerator']
        if not any(nb in vtype for nb in non_bulk):
            return True  # Accept as potential bulk carrier
    return False


def _save_vessel_to_db(vessel_data: dict, test_mode: bool = False) -> Optional[int]:
    """Save vessel data to DB using existing upsert_vessel function.

    In test mode only logs what would be saved and returns None.
    Returns the row id from upsert_vessel, or None on error.
    """
    if test_mode:
        logger.info(f"  [TEST] Would save vessel: {vessel_data.get('name', 'Unknown')} "
                    f"IMO={vessel_data.get('imo', 'N/A')} "
                    f"Owner={vessel_data.get('owner', 'N/A')}")
        return None
    try:
        import maritime_db as mdb
        vessel_data['source'] = SOURCE_TAG
        # Serialize the companies list into a JSON column the schema expects.
        if 'companies' in vessel_data and vessel_data['companies']:
            vessel_data['companies_json'] = json.dumps(vessel_data['companies'], ensure_ascii=False)
        vessel_data.pop('companies', None)
        return mdb.upsert_vessel(vessel_data)
    except Exception as e:
        logger.error(f"DB vessel save error ({vessel_data.get('mmsi')}): {e}")
        return None


def _save_owner_to_contacts(company: dict, mmsi: str, test_mode: bool = False):
    """Save ownership data to contacts table.

    Upserts by (company_name, source): appends the MMSI to an existing
    contact's vessel list (filling in the website if it was empty), or
    inserts a new contact row. Best-effort: DB errors are logged, not raised.
    """
    if not company.get('name') or company['name'] in ('-', 'N/A', 'Unknown'):
        return
    if test_mode:
        logger.info(f"  [TEST] Would save contact: {company.get('name')} "
                    f"role={company.get('role')} website={company.get('website', 'N/A')}")
        return
    conn = None
    try:
        import maritime_db as mdb
        conn = mdb.get_connection()
        cursor = conn.cursor()
        # Check if exists
        cursor.execute(
            "SELECT id, vessels_mmsi FROM contacts WHERE company_name = ? AND source = ?",
            (company['name'], SOURCE_TAG))
        existing = cursor.fetchone()
        if existing:
            # Update vessels_mmsi list — handle both tuple rows and Row/dict rows.
            existing_mmsis_str = existing[1] if hasattr(existing, '__getitem__') else dict(existing).get('vessels_mmsi', '')
            existing_mmsis = set(json.loads(existing_mmsis_str)) if existing_mmsis_str else set()
            existing_mmsis.add(mmsi)
            new_mmsis = json.dumps(list(existing_mmsis))
            cursor.execute(
                "UPDATE contacts SET vessels_mmsi = ?, updated_at = CURRENT_TIMESTAMP WHERE company_name = ? AND source = ?",
                (new_mmsis, company['name'], SOURCE_TAG))
            if company.get('website'):
                # Only fill in a website where none was recorded yet.
                cursor.execute(
                    "UPDATE contacts SET website = ? WHERE company_name = ? AND source = ? AND (website IS NULL OR website = '')",
                    (company['website'], company['name'], SOURCE_TAG))
        else:
            cursor.execute("""
                INSERT INTO contacts (type, company_name, country, website, notes, vessels_mmsi, source, verified)
                VALUES (?, ?, ?, ?, ?, ?, ?, 0)
            """, (
                'owner',
                company['name'],
                company.get('country', ''),
                company.get('website', ''),
                f"Role: {company.get('role', 'owner')} | Scraped from MT Pro",
                json.dumps([mmsi]),
                SOURCE_TAG,
            ))
        conn.commit()
    except Exception as e:
        logger.error(f"DB contacts save error ({company.get('name')}): {e}")
    finally:
        # BUGFIX: the connection previously leaked when any statement raised
        # (close() was only reached on the success path).
        if conn is not None:
            try:
                conn.close()
            except Exception:
                pass


def _load_cookies_into_session(session, cookies_file: str) -> bool:
    """Load MT browser cookies from JSON file into requests/curl_cffi session.

    Returns True when the cookies were loaded, False on any error.
    """
    try:
        with open(cookies_file, 'r') as f:
            cookies = json.load(f)
        for c in cookies:
            session.cookies.set(c['name'], c['value'],
                                domain=c.get('domain', '.marinetraffic.com'))
        logger.info(f"Loaded {len(cookies)} cookies from {cookies_file}")
        return True
    except Exception as e:
        logger.error(f"Failed to load cookies from {cookies_file}: {e}")
        return False


def _persist_checkpoint(cp: dict, grid_idx: int, processed_mmsis: Set[str],
                        total_processed: int, owners_added: int):
    """Update the in-memory checkpoint dict with current progress and flush it.

    Only the most recent 5000 MMSIs are persisted to keep the file small.
    """
    cp['last_grid_idx'] = grid_idx
    cp['processed_mmsis'] = list(processed_mmsis)[-5000:]
    cp['total_processed'] = total_processed
    cp['owners_added'] = owners_added
    _save_checkpoint(cp)


def run_scraper(email: str, password: str, limit: int = 99999, delay: float = 2.0,
                test_mode: bool = False, resume: bool = True,
                totp_secret: str = None, cookies_file: str = None):
    """Main scraper function.

    Walks GRID_POINTS, scrapes vessels in each ~150 NM area, filters bulk
    carriers, enriches each new vessel with ownership data and company
    websites, and persists vessels + owners. Progress is checkpointed so a
    later run with resume=True continues where this one stopped.

    :param email/password: MarineTraffic Pro credentials (unused if cookies_file given).
    :param limit: max vessels to process this run.
    :param delay: pause between per-vessel requests, seconds.
    :param test_mode: log what would be saved instead of writing to the DB.
    :param resume: continue from the saved checkpoint.
    :param totp_secret: optional 2FA secret for login.
    :param cookies_file: optional JSON cookie export; bypasses form login.
    """
    logger.info("=" * 60)
    logger.info(f"Bulk Carrier Scraper started at {_utcnow().isoformat()}")
    logger.info(f"Login: {email} | Limit: {limit} | Delay: {delay}s | Test: {test_mode} | Resume: {resume}")
    logger.info(f"TOTP: {'enabled' if totp_secret else 'not set'} | Grid points: {len(GRID_POINTS)}")
    logger.info("=" * 60)

    # Load checkpoint
    cp = _load_checkpoint() if resume else _default_checkpoint()
    processed_mmsis: Set[str] = set(cp.get('processed_mmsis', []))
    total_processed = cp.get('total_processed', 0)
    owners_added = cp.get('owners_added', 0)
    start_grid_idx = cp.get('last_grid_idx', 0) if resume else 0
    logger.info(f"Resuming from grid point {start_grid_idx}, "
                f"{total_processed} vessels already processed, {len(processed_mmsis)} MMSIs tracked")

    # Initialize MT parser and login
    from marinetraffic_parser import MarineTrafficParser
    parser = MarineTrafficParser(totp_secret=totp_secret)

    if cookies_file and os.path.exists(cookies_file):
        # Load pre-extracted browser cookies (bypasses Cloudflare JS challenge)
        logger.info(f"Loading cookies from {cookies_file}...")
        _load_cookies_into_session(parser.session, cookies_file)
        logger.info("Cookies loaded — skipping form login")
    elif not test_mode:
        logger.info(f"Logging into MarineTraffic Pro as {email}...")
        logged_in = parser.login(email, password, totp_secret=totp_secret)
        if logged_in:
            logger.info("Login successful!")
        else:
            logger.warning("Login may have failed — ownership data might be limited. Continuing anyway...")
    else:
        logger.info("[TEST MODE] Skipping actual MT login")

    vessels_this_run = 0

    for grid_idx in range(start_grid_idx, len(GRID_POINTS)):
        lat, lon = GRID_POINTS[grid_idx]
        logger.info(f"\n[Grid {grid_idx+1}/{len(GRID_POINTS)}] Scanning area: lat={lat}, lon={lon} (r=150NM)")

        # Scrape vessels in this area
        try:
            area_vessels = parser.scrape_area_vessels(lat, lon, radius_nm=150)
        except Exception as e:
            logger.error(f"Area scrape error at ({lat},{lon}): {e}")
            area_vessels = []

        # Filter bulk carriers, then drop vessels we've already processed.
        bulk_vessels = [v for v in area_vessels if _is_bulk_carrier(v)]
        new_vessels = [v for v in bulk_vessels
                       if v.get('mmsi') and v['mmsi'] not in processed_mmsis]
        logger.info(f"  Found {len(area_vessels)} vessels total, {len(bulk_vessels)} bulk carriers, "
                    f"{len(new_vessels)} new (not yet processed)")

        for vessel in new_vessels:
            if vessels_this_run >= limit:
                logger.info(f"Reached limit of {limit} vessels. Stopping.")
                _persist_checkpoint(cp, grid_idx, processed_mmsis, total_processed, owners_added)
                _print_summary(total_processed, owners_added, vessels_this_run, cp['started_at'])
                return

            mmsi = vessel['mmsi']

            # Fetch full ownership page from MT Pro
            logger.info(f"  [{vessels_this_run+1}] MMSI={mmsi} {vessel.get('name', '?')} | Fetching ownership...")
            try:
                ownership_data = parser.get_vessel_ownership(mmsi)
            except Exception as e:
                logger.error(f"  Ownership fetch error for {mmsi}: {e}")
                ownership_data = {}

            # Merge position data with ownership data
            merged = {**vessel}  # Start with AIS data (name, type, flag, mmsi, lat, lon, etc)
            if ownership_data:
                # Ownership data enriches/overrides AIS basic fields
                for k, v in ownership_data.items():
                    if v and k != 'mmsi':  # Don't override MMSI
                        merged[k] = v

            # Try to fetch company websites for each owner (slower but more valuable)
            companies = merged.get('companies', [])
            for company in companies:
                mt_url = company.get('mt_profile_url')
                if mt_url and not company.get('website'):
                    try:
                        website = parser.get_company_website(mt_url)
                        if website:
                            company['website'] = website
                            # Also set on merged vessel if it's the main owner
                            if not merged.get('website') and company.get('role') in (
                                'beneficial owner', 'registered owner', 'commercial manager'
                            ):
                                merged['website'] = website
                        time.sleep(0.5)  # Brief pause between company page fetches
                    except Exception:
                        # Best-effort enrichment: a missing website must not
                        # abort processing of the vessel.
                        pass

            # Log what we found
            owner_info = merged.get('beneficial_owner') or merged.get('registered_owner') or merged.get('owner', 'N/A')
            website_info = merged.get('website', '-')
            logger.info(f"    Name={merged.get('name', '?')} | Flag={merged.get('flag', '?')} | "
                        f"DWT={merged.get('deadweight', '?')} | Year={merged.get('year_built', '?')} | "
                        f"Owner={owner_info} | Website={website_info}")

            # Save vessel to DB
            _save_vessel_to_db(merged, test_mode)

            # Save each owner/operator to contacts table
            for company in companies:
                if company.get('name'):
                    _save_owner_to_contacts(company, mmsi, test_mode)
                    owners_added += 1

            # Track progress
            processed_mmsis.add(mmsi)
            total_processed += 1
            vessels_this_run += 1

            # Save checkpoint every 100 vessels
            if total_processed % 100 == 0:
                _persist_checkpoint(cp, grid_idx, processed_mmsis, total_processed, owners_added)
                logger.info(f"  [Checkpoint saved] {total_processed} vessels total, {owners_added} owner records")

            # Rate limiting: respect MT's servers
            time.sleep(delay)

        # Brief pause between grid areas
        time.sleep(1.0)

        # Save grid-level checkpoint (next run starts at the following point)
        _persist_checkpoint(cp, grid_idx + 1, processed_mmsis, total_processed, owners_added)

    # Completed all grid points
    logger.info("\nAll grid points processed!")
    _print_summary(total_processed, owners_added, vessels_this_run, cp['started_at'])


def _print_summary(total_processed: int, owners_added: int, vessels_this_run: int, started_at: str):
    """Print final summary.

    Accepts both timezone-aware and legacy naive (assumed-UTC) ``started_at``
    ISO strings, so checkpoints written by older versions still work.
    """
    try:
        started = datetime.fromisoformat(started_at)
    except (ValueError, TypeError):
        started = _utcnow()
    if started.tzinfo is None:
        # Legacy checkpoints stored naive UTC timestamps.
        started = started.replace(tzinfo=timezone.utc)
    elapsed = (_utcnow() - started).total_seconds()
    hours = int(elapsed // 3600)
    minutes = int((elapsed % 3600) // 60)
    logger.info("=" * 60)
    logger.info("SCRAPER SUMMARY")
    logger.info(f"  Vessels this run: {vessels_this_run}")
    logger.info(f"  Total all-time:   {total_processed}")
    logger.info(f"  Owner records:    {owners_added}")
    logger.info(f"  Elapsed:          {hours}h {minutes}m")
    logger.info("=" * 60)


def main():
    """CLI entry point: parse arguments, handle --status, and run the scraper."""
    parser = argparse.ArgumentParser(
        description='Bulk Carrier Ownership Scraper — MarineTraffic Pro Edition',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__
    )
    parser.add_argument('--login', default=os.environ.get('MT_LOGIN', ''),
                        help='MarineTraffic Pro email (or set MT_LOGIN env var)')
    parser.add_argument('--password', default=os.environ.get('MT_PASSWORD', ''),
                        help='MarineTraffic Pro password (or set MT_PASSWORD env var)')
    parser.add_argument('--totp-secret', default=os.environ.get('MT_TOTP_SECRET', ''),
                        help='Google Authenticator base32 secret for 2FA (or set MT_TOTP_SECRET env var)')
    parser.add_argument('--cookies-file', default='mt_cookies.json',
                        help='Path to cookies JSON exported by mt_cookie_extractor.py (default: mt_cookies.json)')
    parser.add_argument('--limit', type=int, default=99999,
                        help='Max vessels to process in this run (default: unlimited)')
    parser.add_argument('--delay', type=float, default=2.0,
                        help='Delay between vessel page requests in seconds (default: 2.0)')
    parser.add_argument('--test', action='store_true',
                        help='Test mode: scrape but do NOT save to DB')
    parser.add_argument('--resume', action='store_true', default=True,
                        help='Resume from checkpoint (default: True)')
    parser.add_argument('--fresh', action='store_true',
                        help='Start fresh, ignoring checkpoint')
    parser.add_argument('--grid-from', type=int, default=None,
                        help='Start from specific grid point index (0-based)')
    parser.add_argument('--status', action='store_true',
                        help='Show scraper status from checkpoint and exit')
    args = parser.parse_args()

    # Status command
    if args.status:
        cp = _load_checkpoint()
        print(f"\nScraper Status:")
        print(f"  Total processed: {cp.get('total_processed', 0)}")
        print(f"  Owners added:    {cp.get('owners_added', 0)}")
        print(f"  Last grid point: {cp.get('last_grid_idx', 0)} / {len(GRID_POINTS)}")
        print(f"  MMSIs tracked:   {len(cp.get('processed_mmsis', []))}")
        print(f"  Started at:      {cp.get('started_at', 'N/A')}")
        return

    # Validate credentials
    if not args.login or not args.password:
        print("ERROR: MarineTraffic login credentials required.")
        print("  Use --login EMAIL --password PASS")
        print("  Or set MT_LOGIN and MT_PASSWORD environment variables")
        sys.exit(1)

    resume = not args.fresh

    if args.grid_from is not None:
        # Override grid start point in checkpoint
        cp = _load_checkpoint()
        cp['last_grid_idx'] = args.grid_from
        _save_checkpoint(cp)

    try:
        run_scraper(
            email=args.login,
            password=args.password,
            limit=args.limit,
            delay=args.delay,
            test_mode=args.test,
            resume=resume,
            totp_secret=args.totp_secret or None,
            cookies_file=args.cookies_file,
        )
    except KeyboardInterrupt:
        logger.info("\nScraper interrupted by user. Progress saved in checkpoint.")
        sys.exit(0)
    except Exception as e:
        logger.error(f"Fatal error: {e}")
        logger.error(traceback.format_exc())
        sys.exit(1)


if __name__ == '__main__':
    main()