#!/usr/bin/env python3
"""
MT Regional Scraper — Full pipeline for specific sea regions.

For each region: tile scraping → save to DB → MMSI lookup → ownership fetch.
One browser session, one script, complete data for each region.
Regions: Caspian → Mediterranean → Baltic → everything else.

Usage:
    python mt_regional_scraper.py --region caspian                 # Caspian Sea only
    python mt_regional_scraper.py --region med                     # Mediterranean only
    python mt_regional_scraper.py --region baltic                  # Baltic only
    python mt_regional_scraper.py --region all                     # All regions sequentially
    python mt_regional_scraper.py --region caspian --phase tiles   # Only tile phase
    python mt_regional_scraper.py --region caspian --phase mmsi    # Only MMSI phase
    python mt_regional_scraper.py --region caspian --phase owner   # Only ownership phase
"""
import asyncio, json, sys, os, time, re, struct, hmac, hashlib, base64, argparse, math
import psycopg2
from datetime import datetime

# Run relative to the script's own directory so checkpoint/data paths resolve.
os.chdir(os.path.dirname(os.path.abspath(__file__)))
# Avoid UnicodeEncodeError on Windows consoles when printing vessel names.
if hasattr(sys.stdout, 'reconfigure'):
    sys.stdout.reconfigure(encoding='utf-8', errors='replace')
if hasattr(sys.stderr, 'reconfigure'):
    sys.stderr.reconfigure(encoding='utf-8', errors='replace')

# NOTE(review): credentials, TOTP secret and DB password are hardcoded here —
# they should be moved to environment variables or a secret store.
EMAIL = "operation@mrlogisticcorp.com"
PASSWORD = "NKh9i8Z!7fU9jfi"
TOTP_SECRET = "MNWTEPTFJZBUC32GJFEWY6LVKQ2GGYKH"
DB_URL = 'postgresql://seafare:SF_m0ntana_2026@127.0.0.1:15432/seafare_db'
OWNERSHIP_COLS = 'shipname,imo,flag,beneficial_owner,operator,registered_owner'
CKPT_PREFIX = 'mt_regional_'


def db_connect():
    """Connect to DB with keepalive to survive SSH tunnel drops."""
    return psycopg2.connect(
        DB_URL, connect_timeout=15,
        keepalives=1, keepalives_idle=30,
        keepalives_interval=10, keepalives_count=5
    )


def _restart_ssh_tunnel():
    """Kill and restart the SSH tunnel to prod DB."""
    import subprocess
    # Kill existing ssh (Windows taskkill; best effort).
    try:
        subprocess.run(['taskkill', '/F', '/IM', 'ssh.exe'], capture_output=True, timeout=5)
    except Exception:
        pass
    time.sleep(2)
    # Start new tunnel in background
    try:
        subprocess.Popen(
            ['ssh', '-o', 'ServerAliveInterval=5', '-o', 'ServerAliveCountMax=120',
             '-o', 'TCPKeepAlive=yes', '-o', 'StrictHostKeyChecking=no',
             '-L', '15432:127.0.0.1:5432', '-N', 'root@89.19.208.158'],
            stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
        )
        print(f" [SSH] Tunnel restarted, waiting 5s...")
        time.sleep(5)
    except Exception as e:
        print(f" [SSH] Failed to restart: {e}")


def db_reconnect(conn):
    """Close old connection and reconnect. Returns (conn, cursor). Will restart SSH tunnel if needed."""
    try:
        conn.close()
    except Exception:
        pass
    # First try: just reconnect (tunnel might still be alive)
    for attempt in range(5):
        try:
            time.sleep(3)
            new_conn = db_connect()
            print(f" [DB] Reconnected (attempt {attempt+1})")
            return new_conn, new_conn.cursor()
        except Exception as e:
            print(f" [DB] Reconnect attempt {attempt+1} failed: {e}")
    # Second try: restart SSH tunnel and reconnect
    print(f" [DB] Restarting SSH tunnel...")
    _restart_ssh_tunnel()
    for attempt in range(10):
        try:
            time.sleep(5)
            new_conn = db_connect()
            print(f" [DB] Reconnected after tunnel restart (attempt {attempt+1})")
            return new_conn, new_conn.cursor()
        except Exception as e:
            print(f" [DB] Post-restart attempt {attempt+1} failed: {e}")
            if attempt == 4:
                # try restarting tunnel again
                _restart_ssh_tunnel()
    raise Exception("DB reconnect failed after tunnel restart")


def db_safe_commit(conn):
    """Commit with reconnect if connection died (SSH tunnel drop)."""
    try:
        conn.commit()
        return conn
    except (psycopg2.InterfaceError, psycopg2.OperationalError):
        print(f" [DB] Connection lost on commit, reconnecting...")
        conn, _ = db_reconnect(conn)
        return conn


def db_safe_execute(conn, cur, query, params=None):
    """Execute query with auto-reconnect on connection drop.
    Returns (conn, cur)."""
    try:
        cur.execute(query, params)
        return conn, cur
    except (psycopg2.InterfaceError, psycopg2.OperationalError) as e:
        print(f" [DB] Connection lost on execute ({e}), reconnecting...")
        conn, cur = db_reconnect(conn)
        # Retry the query on new connection
        cur.execute(query, params)
        return conn, cur


# ─── REGIONS: dense grid points at zoom=8 for max coverage ──────────────────
# Each region has: name, zoom, [(lat, lon), ...] grid points
# At zoom=8, each view covers ~5-6 degrees, so points every ~4 degrees for overlap.
# At zoom=9, each view covers ~2.5 degrees. Using zoom=8 for wider capture.
REGIONS = {
    'caspian': {
        'name': 'Caspian Sea',
        'zoom': 10,  # max detail! At zoom=10 every vessel visible
        'grid': [
            # Dense Caspian coverage with zoom=10 (each view ~1 degree)
            # South Caspian (Iran coast: Anzali, Noshahr, Amirabad)
            (37.0, 49.0), (37.0, 50.0), (37.0, 51.0), (37.0, 52.0), (37.0, 53.0),
            (36.8, 49.5), (36.8, 50.5), (36.8, 51.5), (36.8, 52.5),
            (37.5, 49.0), (37.5, 50.0), (37.5, 51.0), (37.5, 52.0),
            # Baku area (major hub)
            (39.8, 49.5), (40.0, 49.8), (40.3, 50.0), (40.5, 50.3),
            (39.5, 49.0), (39.5, 50.0), (40.0, 50.5),
            # Turkmenistan coast (Turkmenbashi)
            (39.5, 52.5), (40.0, 53.0), (40.5, 52.5), (39.0, 52.0),
            (38.5, 53.0), (38.0, 52.5),
            # Central Caspian
            (40.0, 51.0), (41.0, 51.0), (42.0, 51.0), (40.5, 52.0), (41.5, 52.0),
            # Kazakhstan (Aktau port)
            (43.0, 51.0), (43.6, 51.2), (44.0, 51.5), (43.0, 52.0), (44.0, 52.0),
            # North Caspian
            (45.0, 51.0), (45.0, 52.0), (46.0, 51.5), (46.5, 51.0),
            (47.0, 51.5), (47.5, 52.0),
            # Astrakhan / Volga delta area
            (45.5, 48.5), (46.0, 49.0), (46.5, 49.5), (45.0, 49.0), (45.5, 49.5),
            # Dagestan coast (Makhachkala)
            (42.5, 47.5), (43.0, 47.5), (43.5, 48.0), (42.0, 48.5), (42.5, 48.0),
            # West coast (Azerbaijan south)
            (38.5, 49.0), (39.0, 49.5), (41.0, 49.0), (38.0, 49.5), (41.5, 50.0),
        ],
    },
    'med': {
        'name': 'Mediterranean Sea',
        'zoom': 10,  # max detail
        'grid': [
            # Gibraltar / Strait
            (36.0, -5.5), (36.0, -4.0), (36.5, -5.0), (36.5, -3.0),
            # Alboran Sea
            (36.0, -2.5), (36.0, -1.0), (36.5, -2.0), (37.0, -1.5),
            # Spanish coast (Cartagena, Valencia, Barcelona)
            (37.5, -1.0), (38.0, 0.0), (39.0, 0.5), (39.5, -0.5),
            (40.0, 0.5), (41.0, 1.0), (41.5, 2.5),
            # Balearics
            (39.5, 2.5), (39.0, 3.5),
            # French Riviera / Marseille
            (43.0, 5.0), (43.5, 5.5), (43.0, 7.0),
            # Gulf of Lion
            (42.5, 3.5), (42.5, 5.0),
            # Corsica/Sardinia
            (41.5, 9.0), (40.0, 9.5), (39.0, 9.0),
            # Genoa / NW Italy
            (44.0, 9.0), (43.5, 10.5),
            # Tyrrhenian Sea
            (41.0, 13.0), (40.5, 14.0), (39.5, 14.5), (38.5, 15.5),
            (40.0, 12.0), (41.5, 12.5),
            # Sicily / Malta
            (37.5, 15.0), (37.0, 14.0), (36.0, 14.5), (35.5, 14.5),
            # Tunisia/Libya coast
            (37.0, 10.0), (36.5, 11.0), (35.0, 12.0), (33.0, 13.0),
            (32.5, 15.0), (32.0, 20.0), (31.5, 25.0),
            # Adriatic (south to north)
            (40.0, 18.5), (41.0, 17.0), (42.0, 16.5), (43.0, 15.5),
            (44.0, 13.5), (44.5, 12.5), (45.0, 13.5), (41.5, 19.0), (39.5, 19.5),
            # Ionian Sea
            (38.0, 20.5), (37.5, 21.5), (36.5, 21.0),
            # Crete
            (35.5, 24.0), (35.0, 25.5), (35.5, 26.5),
            # Aegean
            (37.5, 24.0), (38.0, 25.0), (38.5, 23.5), (39.0, 26.0),
            (39.5, 25.5), (37.0, 25.5), (38.5, 26.5),
            # Dardanelles / Marmara
            (40.5, 27.0), (40.0, 26.0), (41.0, 29.0),
            # South Turkey coast
            (36.5, 28.5), (36.5, 30.0), (36.5, 32.0), (36.5, 34.0),
            (37.0, 35.5), (36.0, 33.0),
            # Cyprus
            (34.5, 33.0), (35.0, 34.0), (34.0, 32.5),
            # Lebanon/Israel
            (33.5, 35.0), (32.5, 34.5), (31.5, 34.0),
            # Egypt (Alexandria, Port Said, Suez Canal entry)
            (31.5, 30.0), (31.0, 32.0), (31.5, 32.5), (30.0, 32.5), (30.5, 33.0),
            # Libya south / offshore
            (33.5, 12.0), (33.5, 15.0), (34.0, 11.0),
            # Algeria coast
            (36.5, 1.0), (37.0, 3.0), (37.0, 6.0), (36.5, 8.0),
        ],
    },
    'baltic': {
        'name': 'Baltic Sea',
        'zoom': 10,  # max detail
        'grid': [
            # Danish Straits / Kattegat / Skagerrak
            (55.5, 8.0), (55.5, 9.0), (55.5, 10.0), (55.5, 11.0),
            (56.0, 8.5), (56.0, 10.0), (56.0, 11.5),
            (56.5, 8.5), (56.5, 10.0), (56.5, 12.0),
            (57.0, 9.0), (57.0, 10.5), (57.5, 8.5), (57.5, 10.0),
            (58.0, 10.0), (58.5, 10.5),
            # South Baltic (Kiel, Rostock, Gdansk, Kaliningrad)
            (54.0, 10.5), (54.0, 12.0), (54.0, 13.5),
            (54.5, 10.0), (54.5, 11.5), (54.5, 13.0), (54.5, 14.5),
            (54.5, 16.0), (54.5, 18.0), (54.5, 19.5),
            (55.0, 12.0), (55.0, 14.0), (55.0, 16.0), (55.0, 18.0), (55.0, 20.0),
            # Central Baltic (Sweden east coast, Latvia, Lithuania)
            (55.5, 13.0), (55.5, 15.0), (55.5, 17.0), (55.5, 19.0), (55.5, 21.0),
            (56.0, 14.0), (56.0, 16.0), (56.0, 18.0), (56.0, 20.0),
            (56.5, 14.0), (56.5, 16.0), (56.5, 18.0), (56.5, 20.0),
            (57.0, 17.0), (57.0, 19.0), (57.0, 21.0),
            (57.5, 18.0), (57.5, 20.0), (58.0, 17.0), (58.0, 19.0),
            # Gotland / Öland area
            (57.5, 16.0), (57.0, 18.5),
            # Gulf of Riga
            (57.5, 22.0), (57.5, 24.0), (58.0, 23.0),
            # Estonia / Gulf of Finland
            (59.0, 21.0), (59.0, 23.0), (59.0, 24.5),
            (59.5, 22.0), (59.5, 23.5), (59.5, 25.0), (59.5, 27.0),
            (60.0, 24.0), (60.0, 25.5), (60.0, 27.0), (60.0, 29.0),
            (60.5, 25.0), (60.5, 27.0), (60.5, 29.0),
            # Stockholm area
            (59.0, 18.0), (59.5, 18.5),
            # Åland Sea
            (60.0, 19.5), (60.0, 21.0),
            # Gulf of Bothnia
            (61.0, 18.0), (61.0, 20.0), (61.0, 22.0),
            (62.0, 18.0), (62.0, 20.0), (62.0, 22.0),
            (63.0, 18.5), (63.0, 20.5), (64.0, 20.0), (64.0, 22.0),
            (65.0, 22.0), (65.0, 24.0), (65.5, 23.5),
            # St. Petersburg approach
            (59.5, 28.5), (60.0, 28.5),
        ],
    },
    'blacksea': {
        'name': 'Black Sea',
        'zoom': 10,
        'grid': [
            # Bosphorus / Istanbul approach
            (41.0, 29.0), (41.5, 29.5), (41.0, 28.0),
            # West Black Sea (Bulgaria, Romania)
            (43.0, 28.0), (43.5, 28.5), (42.5, 28.0), (44.0, 29.0), (44.5, 29.5),
            # Constanta / Odessa area
            (44.0, 30.0), (44.5, 30.5), (45.0, 30.0), (45.5, 30.0),
            (45.5, 31.0), (46.0, 31.0), (46.5, 31.5),
            # Central Black Sea
            (43.0, 31.0), (43.0, 33.0), (43.0, 35.0),
            (44.0, 32.0), (44.0, 34.0), (44.0, 36.0),
            # Turkey north coast
            (42.0, 33.0), (42.0, 35.0), (42.0, 37.0),
            (41.5, 31.0), (41.5, 33.0), (41.5, 36.0),
            (41.0, 37.0), (41.0, 39.0), (41.0, 41.0),
            # Trabzon / Batumi area
            (41.5, 40.0), (41.5, 41.5), (42.0, 41.0),
            # Samsun area
            (41.5, 36.0), (41.0, 35.5),
            # East Black Sea / Georgia
            (42.0, 39.0), (42.0, 41.5), (42.5, 37.0), (43.0, 38.0), (43.0, 40.0),
            # Novorossiysk / Tuapse
            (44.5, 37.5), (44.0, 38.0), (44.5, 38.5),
            # Crimea / Sevastopol
            (44.5, 33.0), (44.5, 34.0), (45.0, 33.5),
            # Sea of Azov
            (46.0, 37.0), (46.5, 38.0), (47.0, 38.5), (45.5, 36.0), (46.0, 39.0),
        ],
    },
    'redsea': {
        'name': 'Red Sea + Suez',
        'zoom': 10,
        'grid': [
            # Suez Canal
            (30.5, 32.5), (30.0, 32.5), (31.0, 32.5),
            # Gulf of Suez
            (29.5, 32.5), (29.0, 33.0), (28.5, 33.5), (28.0, 33.5), (27.5, 34.0),
            # North Red Sea
            (27.0, 34.5), (26.5, 35.0), (26.0, 35.5), (25.5, 36.0), (25.0, 36.5),
            # Central Red Sea
            (24.0, 37.0), (23.0, 37.5), (22.0, 38.0), (21.0, 38.5),
            (20.0, 39.0), (19.0, 39.5),
            # South Red Sea
            (18.0, 40.0), (17.0, 40.5), (16.0, 41.5), (15.0, 42.0), (14.0, 42.5),
            # Bab el-Mandeb / Djibouti
            (13.0, 43.0), (12.5, 43.5), (12.0, 44.0), (11.5, 43.0),
            # Gulf of Aden
            (12.0, 45.0), (12.5, 46.0), (12.0, 47.0), (13.0, 48.0),
            (12.5, 49.0), (12.0, 50.0),
            # Jeddah area
            (21.5, 39.0), (22.0, 39.0),
        ],
    },
    'persian': {
        'name': 'Persian Gulf',
        'zoom': 10,
        'grid': [
            # Strait of Hormuz
            (26.0, 56.0), (26.5, 56.5), (26.0, 57.0), (25.5, 56.5), (25.0, 57.0),
            # UAE coast (Fujairah, Dubai, Abu Dhabi)
            (25.0, 55.0), (25.5, 55.5), (25.0, 54.0), (24.5, 54.5),
            (24.5, 53.5), (24.0, 53.0),
            # Qatar / Bahrain
            (26.0, 51.0), (26.0, 50.5), (25.5, 51.5), (25.5, 50.5),
            # Kuwait
            (29.0, 48.5), (29.5, 48.5), (29.0, 49.0),
            # Iraq (Basra)
            (29.5, 48.0), (30.0, 48.5),
            # Iran coast
            (27.0, 52.0), (27.5, 52.5), (27.0, 53.5), (28.0, 51.0), (28.5, 50.5),
            # Central Gulf
            (26.5, 52.0), (26.5, 54.0), (27.0, 50.0), (27.0, 51.0),
            # Saudi coast
            (26.5, 50.0), (26.0, 49.5), (28.0, 49.0),
            # Oman Sea / Muscat
            (23.5, 58.5), (24.0, 57.5), (23.0, 58.0),
        ],
    },
}

# Include ALL cargo types, not just bulk
ALL_CARGO_TYPES = {'4', '5', '6', '7', '8', '9'}  # all that appear as "green"
MIN_DWT = 1000  # minimum DWT for any cargo vessel
# GT_SHIPTYPE values for GREEN (cargo) vessels on MT map
GREEN_GT_TYPES = ('6', '9', '11', '12', '70', '122', '139')  # bulk, cargo, container, roro, multipurpose


def totp(secret):
    """Compute the current 6-digit RFC 6238 TOTP code for a base32 secret (30s step)."""
    s = secret.upper().replace(' ', '')
    pad = (-len(s)) % 8  # base32 requires length to be a multiple of 8; pad with '='
    key = base64.b32decode(s + '=' * pad)
    counter = int(time.time()) // 30
    msg = struct.pack('>Q', counter)
    h = hmac.new(key, msg, hashlib.sha1).digest()
    offset = h[-1] & 0x0f  # dynamic truncation per RFC 4226
    code = struct.unpack('>I', h[offset:offset+4])[0] & 0x7fffffff
    return str(code % 1000000).zfill(6)


def ts():
    """Current wall-clock time as HH:MM:SS for log lines."""
    return datetime.now().strftime('%H:%M:%S')


async def do_login(page, max_retries=2):
    """Log in to MarineTraffic via the given Playwright page, handling TOTP 2FA.

    Returns True on success, False after max_retries failed attempts.
    """
    for attempt in range(max_retries):
        print(f"[{ts()}] LOGIN (attempt {attempt+1}/{max_retries})...")
        await page.goto('https://www.marinetraffic.com/en/users/login',
                        wait_until='domcontentloaded', timeout=30000)
        await asyncio.sleep(3)
        await page.fill('input[name="username"]', EMAIL)
        await page.click('button[type="submit"]')
        await asyncio.sleep(3)
        await page.fill('input[type="password"]', PASSWORD)
        await page.click('button[type="submit"]')
        await asyncio.sleep(4)
        # Handle 2FA - might be method selection or direct OTP entry
        if 'mfa' in page.url.lower() or 'auth.kpler' in page.url:
            # Try clicking Google Authenticator if method selection screen
            try:
                await page.click('button:has-text("Google Authenticator")', timeout=3000)
                await asyncio.sleep(2)
            except Exception:
                pass  # Already on OTP input screen
            # Wait for OTP field, then generate fresh TOTP right before typing
            await asyncio.sleep(1)
            otp = totp(TOTP_SECRET)  # Generate as close to fill as possible
            print(f" TOTP: {otp}")
            try:
                await page.fill('input[name="code"]', otp)
                await page.click('button[type="submit"]')
            except Exception:
                # Try alternative selectors
                try:
                    await page.fill('input[type="text"]', otp)
                    await page.click('button[type="submit"]')
                except Exception as e:
                    print(f" 2FA fill error: {e}")
            await asyncio.sleep(8)  # Wait longer for redirect after 2FA
        ok = 'marinetraffic.com' in page.url and 'auth.kpler' not in page.url
        if ok:
            print(f"[{ts()}] Login: OK | {page.url[:80]}")
            return True
        else:
            print(f"[{ts()}] Login attempt {attempt+1} failed | {page.url[:80]}")
            if attempt < max_retries - 1:
                print(f" Retrying in 5s...")
                await asyncio.sleep(5)
    print(f"[{ts()}] Login: FAILED after {max_retries} attempts")
    return False


def is_cargo(row):
    """Return True if tile row is a cargo/bulk vessel worth collecting."""
    st = str(row.get('SHIPTYPE', '') or '')
    dwt = int(row.get('DWT', 0) or 0)
    # Collect all cargo types (green on map) with reasonable DWT
    if st in ALL_CARGO_TYPES and dwt >= MIN_DWT:
        return True
    # Also catch GT_SHIPTYPE codes for bulk/cargo if SHIPTYPE is empty
    gt = str(row.get('GT_SHIPTYPE', '') or '')
    if gt in ('6', '9', '11', '12', '17', '88') and dwt >= MIN_DWT:
        return True
    return False


# ═══════════════════════════════════════════════════════════════════════════════
# PHASE 1: Tile Scraping
# ═══════════════════════════════════════════════════════════════════════════════
async def phase_tiles(page, conn, region_key, region):
    """Navigate region grid and collect vessels from tile API responses.

    Listens for the map's get_data_json_4 XHRs while visiting each grid point,
    filters rows via is_cargo(), and upserts them into mt_bulk_staging.
    Returns (new_vessel_count, conn) — conn may be a new object after reconnects.
    """
    zoom = region['zoom']
    grid = region['grid']
    print(f"\n[{ts()}] === TILES: {region['name']} ({len(grid)} points, zoom={zoom}) ===")
    captured_tiles = []
    new_vessels = 0
    existing_ship_ids = set()
    # Get existing ship_ids for this region
    cur = conn.cursor()
    cur.execute("SELECT ship_id FROM mt_bulk_staging")
    for row in cur.fetchall():
        existing_ship_ids.add(str(row[0]))

    async def on_tile(response):
        # Collect vessel rows from the map's tile-data JSON responses.
        if 'getData/get_data_json_4' not in response.url:
            return
        try:
            body = await response.body()
            parsed = json.loads(body)
            rows = parsed.get('data', {}).get('rows', [])
            captured_tiles.extend(rows)
        except Exception:
            pass  # best-effort: a malformed/aborted response just yields no rows

    page.on('response', on_tile)
    for i, (lat, lon) in enumerate(grid):
        captured_tiles.clear()
        url = f'https://www.marinetraffic.com/en/ais/home/centerx:{lon}/centery:{lat}/zoom:{zoom}'
        print(f" [{i+1}/{len(grid)}] ({lat:.1f},{lon:.1f})", end=' ', flush=True)
        try:
            await page.goto(url, wait_until='load', timeout=35000)
        except Exception as e:
            print(f"ERROR: {e}")
            await asyncio.sleep(5)
            # One retry with a weaker wait condition before giving up on the point.
            try:
                await page.goto(url, wait_until='domcontentloaded', timeout=35000)
            except Exception:
                print(f" SKIP")
                continue
        await asyncio.sleep(10)  # Wait for tiles to load
        area_new = 0
        for row in captured_tiles:
            if not is_cargo(row):
                continue
            sid = str(row.get('SHIP_ID', ''))
            if not sid or sid in existing_ship_ids:
                continue
            # Insert into mt_bulk_staging
            try:
                conn, cur = db_safe_execute(conn, cur, """
                    INSERT INTO mt_bulk_staging
                        (ship_id, name, flag, dwt, shiptype, gt_shiptype,
                         lat, lon, speed, course, destination, scraped_at)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW())
                    ON CONFLICT (ship_id) DO UPDATE SET
                        lat = EXCLUDED.lat, lon = EXCLUDED.lon,
                        speed = EXCLUDED.speed, course = EXCLUDED.course,
                        scraped_at = NOW()
                """, (
                    sid, row.get('SHIPNAME', ''), row.get('FLAG', ''),
                    int(row.get('DWT', 0) or 0), str(row.get('SHIPTYPE', '')),
                    str(row.get('GT_SHIPTYPE', '')),
                    row.get('LAT'), row.get('LON'),
                    row.get('SPEED'), row.get('COURSE'),
                    row.get('DESTINATION', ''),
                ))
                existing_ship_ids.add(sid)
                area_new += 1
                new_vessels += 1
            except Exception:
                # Insert failed (e.g. constraint/encoding issue) — roll back and
                # keep going; the vessel will be retried on a later pass.
                try:
                    conn.rollback()
                except Exception:  # fixed: was a bare `except:`
                    pass
        conn = db_safe_commit(conn)
        cur = conn.cursor()
        print(f"tiles={len(captured_tiles):4d} new={area_new:3d} total_new={new_vessels}")
        await asyncio.sleep(2)
    page.remove_listener('response', on_tile)
    print(f"[{ts()}] TILES DONE: {new_vessels} new vessels added for {region['name']}")
    return new_vessels, conn


# ═══════════════════════════════════════════════════════════════════════════════
# PHASE 2: MMSI Lookup (via quicksearch_shipid)
# ═══════════════════════════════════════════════════════════════════════════════
async def phase_mmsi(page, conn, region_key, region):
    """Get MMSI for vessels without it, using reports quicksearch_shipid.

    Queries staging rows inside the region's bounding box (grid extent + 5°),
    calls the in-page reports API per ship, and stores MMSI/IMO on a name match.
    Returns (found_count, conn).
    """
    print(f"\n[{ts()}] === MMSI: {region['name']} ===")
    cur = conn.cursor()
    # Get region bounds from grid points
    lats = [p[0] for p in region['grid']]
    lons = [p[1] for p in region['grid']]
    lat_min, lat_max = min(lats) - 5, max(lats) + 5
    lon_min, lon_max = min(lons) - 5, max(lons) + 5
    gt_filter = ','.join(f"'{g}'" for g in GREEN_GT_TYPES)
    cur.execute(f"""
        SELECT ship_id, name, gt_shiptype, dwt, flag
        FROM mt_bulk_staging
        WHERE mmsi IS NULL
          AND lat BETWEEN %s AND %s AND lon BETWEEN %s AND %s
          AND gt_shiptype IN ({gt_filter})
        ORDER BY CASE WHEN gt_shiptype='6' THEN 0 ELSE 1 END, dwt DESC NULLS LAST
    """, (lat_min, lat_max, lon_min, lon_max))
    vessels = cur.fetchall()
    if not vessels:
        print(f" No vessels without MMSI in region")
        return 0, conn
    print(f" Vessels to process: {len(vessels)}")
    cols = 'flag,shipname,imo,ship_type,speed'
    found = 0
    for i, (ship_id, name, gt, dwt, flag) in enumerate(vessels):
        url = (f'https://www.marinetraffic.com/en/reports/?asset_type=vessels'
               f'&columns={cols}&quicksearch_shipid={ship_id}')
        # Run the fetch inside the page so the session cookies are used.
        js = f"""
        async () => {{
            try {{
                const r = await fetch({json.dumps(url)}, {{
                    credentials: 'include', cache: 'no-store',
                    headers: {{
                        'X-Requested-With': 'XMLHttpRequest',
                        'Accept': 'application/json',
                        'Referer': 'https://www.marinetraffic.com/en/data/?asset_type=vessels',
                    }}
                }});
                const text = await r.text();
                const d = JSON.parse(text);
                return (d.data || []).slice(0, 3);
            }} catch(e) {{ return []; }}
        }}
        """
        try:
            rows = await page.evaluate(js)
        except Exception:
            rows = []
        mmsi = None
        imo = None
        if rows:
            row = rows[0]
            api_name = (row.get('SHIPNAME') or '').upper()
            our_name = (name or '').upper()
            # Fuzzy name match: 3 = exact, 2 = 10-char prefix containment.
            match = 0
            if api_name and our_name:
                if api_name == our_name:
                    match = 3
                elif our_name[:10] in api_name or api_name[:10] in our_name:
                    match = 2
            if match >= 2 or len(rows) == 1:
                # fixed: `str(row.get('MMSI', '')) or None` turned a literal
                # None/0 into the truthy strings "None"/"0"; guard before str().
                mmsi = str(row.get('MMSI') or '') or None
                imo = str(row.get('IMO') or '') or None
        if mmsi:
            found += 1
            conn, cur = db_safe_execute(conn, cur, """
                UPDATE mt_bulk_staging
                SET mmsi = %s, imo = COALESCE(%s, imo), scraped_at = NOW()
                WHERE ship_id = %s
            """, (mmsi, imo, ship_id))
        if i < 10 or (i + 1) % 50 == 0:
            status = f"MMSI={mmsi}" if mmsi else "no match"
            print(f" [{i+1}/{len(vessels)}] {name} -> {status}")
        if (i + 1) % 200 == 0:
            conn = db_safe_commit(conn)
            cur = conn.cursor()
            print(f" --- COMMIT [{i+1}/{len(vessels)}] found={found} ---")
        await asyncio.sleep(0.8)  # throttle requests
    conn = db_safe_commit(conn)
    cur = conn.cursor()
    print(f"[{ts()}] MMSI DONE: {found}/{len(vessels)} found for {region['name']}")
    return found, conn


# ═══════════════════════════════════════════════════════════════════════════════
# PHASE 3: Ownership Fetch
# ═══════════════════════════════════════════════════════════════════════════════
async def phase_ownership(page, conn, region_key, region):
    """Get ownership data for vessels with MMSI but no owner.

    Same region-bounded selection as phase_mmsi; fetches OWNERSHIP_COLS from the
    reports API and updates owner/operator/registered_owner/imo in staging.
    Returns (found_count, conn).
    """
    print(f"\n[{ts()}] === OWNERSHIP: {region['name']} ===")
    cur = conn.cursor()
    lats = [p[0] for p in region['grid']]
    lons = [p[1] for p in region['grid']]
    lat_min, lat_max = min(lats) - 5, max(lats) + 5
    lon_min, lon_max = min(lons) - 5, max(lons) + 5
    gt_filter = ','.join(f"'{g}'" for g in GREEN_GT_TYPES)
    cur.execute(f"""
        SELECT ship_id, name, gt_shiptype, dwt, flag, mmsi, imo
        FROM mt_bulk_staging
        WHERE mmsi IS NOT NULL AND owner IS NULL
          AND lat BETWEEN %s AND %s AND lon BETWEEN %s AND %s
          AND gt_shiptype IN ({gt_filter})
        ORDER BY CASE WHEN gt_shiptype='6' THEN 0 ELSE 1 END, dwt DESC NULLS LAST
    """, (lat_min, lat_max, lon_min, lon_max))
    vessels = cur.fetchall()
    if not vessels:
        print(f" No vessels needing ownership in region")
        return 0, conn
    print(f" Vessels to process: {len(vessels)}")
    found = 0
    for i, (ship_id, name, gt, dwt, flag, mmsi, imo) in enumerate(vessels):
        url = (f'https://www.marinetraffic.com/en/reports/?asset_type=vessels'
               f'&columns={OWNERSHIP_COLS}&quicksearch_shipid={ship_id}')
        # Run the fetch inside the page so the session cookies are used.
        js = f"""
        async () => {{
            try {{
                const r = await fetch({json.dumps(url)}, {{
                    credentials: 'include', cache: 'no-store',
                    headers: {{
                        'X-Requested-With': 'XMLHttpRequest',
                        'Accept': 'application/json',
                        'Referer': 'https://www.marinetraffic.com/en/data/?asset_type=vessels',
                    }}
                }});
                const text = await r.text();
                const d = JSON.parse(text);
                return (d.data || []).length > 0 ? (d.data[0]) : null;
            }} catch(e) {{ return null; }}
        }}
        """
        try:
            row = await page.evaluate(js)
        except Exception:
            row = None
        if row:
            # Prefer beneficial owner; fall back to registered owner.
            owner = row.get('BENEFICIAL_OWNER') or row.get('REGISTERED_OWNER') or None
            reg_owner = row.get('REGISTERED_OWNER') or None
            operator = row.get('OPERATOR') or None
            row_imo = row.get('IMO') or None
            if owner or operator:
                found += 1
                conn, cur = db_safe_execute(conn, cur, """
                    UPDATE mt_bulk_staging
                    SET owner = COALESCE(%s, owner),
                        operator = COALESCE(%s, operator),
                        registered_owner = COALESCE(%s, registered_owner),
                        imo = COALESCE(%s, imo),
                        scraped_at = NOW()
                    WHERE ship_id = %s
                """, (owner, operator, reg_owner,
                      str(row_imo) if row_imo and str(row_imo) != '0' else imo,
                      ship_id))
        if i < 10 or (i + 1) % 50 == 0:
            status = f"{owner[:35] if owner else '?'}" if row else "no data"
            print(f" [{i+1}/{len(vessels)}] {name} -> {status}")
        if (i + 1) % 200 == 0:
            conn = db_safe_commit(conn)
            cur = conn.cursor()
            print(f" --- COMMIT [{i+1}/{len(vessels)}] found={found} ---")
        await asyncio.sleep(0.5)  # throttle requests
    conn = db_safe_commit(conn)
    cur = conn.cursor()
    print(f"[{ts()}] OWNERSHIP DONE: {found}/{len(vessels)} for {region['name']}")
    return found, conn
# ═══════════════════════════════════════════════════════════════════════════════
# MAIN
# ═══════════════════════════════════════════════════════════════════════════════
async def main():
    """Parse CLI args, connect to the DB, log in once, then run the selected
    phase(s) for each selected region and print per-region/global summaries."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--region', required=True,
                        choices=list(REGIONS.keys()) + ['all'],
                        help='Region to scrape')
    parser.add_argument('--phase', choices=['tiles', 'mmsi', 'owner', 'all'],
                        default='all', help='Which phase(s) to run')
    parser.add_argument('--zoom', type=int, default=0,
                        help='Override zoom level for tiles phase')
    args = parser.parse_args()

    # Determine regions to process
    if args.region == 'all':
        regions_to_do = ['caspian', 'med', 'baltic', 'blacksea', 'redsea', 'persian']
    else:
        regions_to_do = [args.region]

    # DB connection — use db_connect() (fixed: was a plain psycopg2.connect
    # without keepalives, which could silently die on SSH tunnel drops).
    try:
        conn = db_connect()
        cur = conn.cursor()
        cur.execute('SELECT count(*) FROM mt_bulk_staging')
        total = cur.fetchone()[0]
        print(f"DB connected. mt_bulk_staging: {total} vessels")
    except Exception as e:
        print(f"DB ERROR: {e}")
        print("Is SSH tunnel running?")
        return

    # Ensure registered_owner column exists
    try:
        cur.execute("ALTER TABLE mt_bulk_staging ADD COLUMN IF NOT EXISTS registered_owner TEXT")
        cur.execute("ALTER TABLE mt_bulk_staging ADD COLUMN IF NOT EXISTS year_built INTEGER")
        conn.commit()
    except Exception:
        conn.rollback()

    from playwright.async_api import async_playwright
    async with async_playwright() as p:
        browser = await p.chromium.launch(
            headless=False,
            args=['--no-sandbox', '--disable-blink-features=AutomationControlled']
        )
        context = await browser.new_context(
            viewport={'width': 1440, 'height': 900},
            user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
                       '(KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36',
        )
        page = await context.new_page()
        if not await do_login(page):
            print("Login FAILED!")
            await browser.close()
            conn.close()
            return
        await asyncio.sleep(3)

        # Warm up data page for reports API
        if args.phase in ('mmsi', 'owner', 'all'):
            await page.goto('https://www.marinetraffic.com/en/data/?asset_type=vessels',
                            wait_until='load', timeout=40000)
            await asyncio.sleep(5)

        # Process each region
        for region_key in regions_to_do:
            region = REGIONS[region_key]
            if args.zoom:
                region = {**region, 'zoom': args.zoom}  # shallow copy; keep REGIONS intact
            print(f"\n{'='*60}")
            print(f"REGION: {region['name']} ({len(region['grid'])} grid points)")
            print(f"{'='*60}")
            t0 = time.time()
            if args.phase in ('tiles', 'all'):
                new_tiles, conn = await phase_tiles(page, conn, region_key, region)
                # After tiles, warm up data page for MMSI/ownership API
                if args.phase == 'all' and new_tiles > 0:
                    await page.goto('https://www.marinetraffic.com/en/data/?asset_type=vessels',
                                    wait_until='load', timeout=40000)
                    await asyncio.sleep(5)
            if args.phase in ('mmsi', 'all'):
                _, conn = await phase_mmsi(page, conn, region_key, region)
            if args.phase in ('owner', 'all'):
                _, conn = await phase_ownership(page, conn, region_key, region)
            elapsed = time.time() - t0

            # Region summary (same bounding box the phases use: grid extent + 5°)
            cur = conn.cursor()
            lats = [p[0] for p in region['grid']]
            lons = [p[1] for p in region['grid']]
            lat_min, lat_max = min(lats) - 5, max(lats) + 5
            lon_min, lon_max = min(lons) - 5, max(lons) + 5
            cur.execute("""
                SELECT count(*), count(mmsi), count(owner)
                FROM mt_bulk_staging
                WHERE lat BETWEEN %s AND %s AND lon BETWEEN %s AND %s
            """, (lat_min, lat_max, lon_min, lon_max))
            t, m, o = cur.fetchone()
            print(f"\n>>> {region['name']} COMPLETE in {elapsed/60:.1f} min")
            print(f" Vessels: {t}, with MMSI: {m}, with owner: {o}")

        # Final global summary
        cur = conn.cursor()
        cur.execute("SELECT count(*), count(mmsi), count(owner) FROM mt_bulk_staging")
        t, m, o = cur.fetchone()
        cur.execute("SELECT count(*), count(mmsi), count(owner) FROM mt_bulk_staging WHERE gt_shiptype='6'")
        tb, mb, ob = cur.fetchone()
        print(f"\n{'='*60}")
        print(f"GLOBAL: total={t} mmsi={m} owner={o}")
        print(f"BULK: total={tb} mmsi={mb} owner={ob}")
        print(f"{'='*60}")
        conn.close()
        await browser.close()
        print("Done!")


# Guarded entry point (fixed: bare module-level asyncio.run() made the module
# unsafe to import); running as a script behaves exactly as before.
if __name__ == '__main__':
    asyncio.run(main())