#!/usr/bin/env python3 """ MT Green Fleet Scraper — Get ALL cargo vessels from MT Data page via direct API pagination. Strategy: 1. Login to MT Pro 2. Navigate to Data page (establishes session context) 3. Capture the API URL from the first response 4. Page through ALL results via page.evaluate(fetch()) with page=N 5. Filter client-side: keep only cargo types (Cargo, Bulk, Container, General, Multipurpose, Ro-Ro) 6. Save to mt_bulk_staging with all available data (name, mmsi, imo, owner, position) MT has ~2.8M vessels total. Cargo ~30-40% = ~900k vessels. At 0.5s/page × 500/page → ~900k vessels in ~30 minutes! Usage: python mt_green_fleet.py --probe # Test: first 5 pages, show API format python mt_green_fleet.py # Full scrape: all cargo vessels python mt_green_fleet.py --start 100 # Resume from page 100 python mt_green_fleet.py --max-pages 500 # Limit pages """ import asyncio, json, sys, os, time, struct, hmac, hashlib, base64, argparse, re import threading import psycopg2 # ---- WATCHDOG: kills process if no heartbeat for 90 seconds ---- # This handles the case where asyncio.wait_for() hangs due to Playwright # protocol issues (corrupted WebSocket state, frozen browser). # The outer restart loop will restart from checkpoint. _heartbeat_time = time.time() _WATCHDOG_TIMEOUT = 180 # seconds — must be > login time (TOTP wait up to 30s + navigation 60-90s = ~150s) def _update_heartbeat(): global _heartbeat_time _heartbeat_time = time.time() def _watchdog_thread(): while True: time.sleep(10) elapsed_since_heartbeat = time.time() - _heartbeat_time if elapsed_since_heartbeat > _WATCHDOG_TIMEOUT: print(f"\n[WATCHDOG] No heartbeat for {elapsed_since_heartbeat:.0f}s — exiting for restart!", flush=True) sys.stdout.flush() # Exit with code 2 — the outer bash wrapper will restart from checkpoint os._exit(2) # code 2 = watchdog timeout, needs restart _wd = threading.Thread(target=_watchdog_thread, daemon=True) _wd.start() # ---- END WATCHDOG ---- class BrowserRestartNeeded(Exception): """Raised when browser is completely dead and must be fully restarted.""" def __init__(self, resume_page): self.resume_page = resume_page super().__init__(f"Browser restart needed from page {resume_page}") os.chdir(os.path.dirname(os.path.abspath(__file__))) if hasattr(sys.stdout, 'reconfigure'): sys.stdout.reconfigure(encoding='utf-8', errors='replace', line_buffering=True) if hasattr(sys.stderr, 'reconfigure'): sys.stderr.reconfigure(encoding='utf-8', errors='replace', line_buffering=True) EMAIL = "operation@mrlogisticcorp.com" PASSWORD = "NKh9i8Z!7fU9jfi" TOTP_SECRET = "MNWTEPTFJZBUC32GJFEWY6LVKQ2GGYKH" DB_URL = 'postgresql://seafare:SF_m0ntana_2026@127.0.0.1:15432/seafare_db' PAGE_DELAY = 2.0 # seconds between pages (2s to avoid MT rate limiting / session hangs) BATCH_SIZE = 500 # commit every N vessels # Cargo vessel types to KEEP (filter out tankers, passenger, military, etc.) CARGO_TYPES = { # GREEN — all cargo vessel types 'cargo', 'bulk carrier', 'general cargo', 'container ship', 'container vessel', 'ro-ro cargo', 'reefer', 'multipurpose', 'heavy load carrier', 'wood chips carrier', 'cement carrier', 'livestock carrier', 'vehicle carrier', 'self-discharging bulk carrier', 'open hatch bulk carrier', 'bulk', 'ore carrier', 'aggregates carrier', 'obo carrier', 'barge', 'special cargo', 'other cargo', 'nuclear fuel carrier', 'inland cargo', # RED — all tanker types 'tanker', 'oil products tanker', 'crude oil tanker', 'oil/chemical tanker', 'chemical tanker', 'lpg tanker', 'lng tanker', 'bunkering tanker', 'asphalt/bitumen tanker', 'water tanker', 'inland tanker', 'special tanker', 'other tanker', 'floating storage/production', 'oil', 'chemical', 'lpg', 'lng', 'bitumen', 'asphalt', 'fso', 'fpso', 'fsu', } # Columns to request — all ownership fields MT Pro provides API_COLS = ('shipname,imo,mmsi,flag,ship_type,dwt,year_built,' 'lat_of_latest_position,lon_of_latest_position,' 'beneficial_owner,registered_owner,operator,commercial_manager') # 500 rows per page — MT Pro default; 1000 causes memory pressure and Chromium crashes. PER_PAGE = 500 def totp(secret): s = secret.upper().replace(' ', '') pad = (-len(s)) % 8 key = base64.b32decode(s + '=' * pad) counter = int(time.time()) // 30 msg = struct.pack('>Q', counter) h = hmac.new(key, msg, hashlib.sha1).digest() offset = h[-1] & 0x0f code = struct.unpack('>I', h[offset:offset+4])[0] & 0x7fffffff return str(code % 1000000).zfill(6) def db_connect(): return psycopg2.connect( DB_URL, connect_timeout=15, keepalives=1, keepalives_idle=30, keepalives_interval=10, keepalives_count=5 ) def _restart_ssh_tunnel(): import subprocess try: subprocess.run(['taskkill', '/F', '/IM', 'ssh.exe'], capture_output=True, timeout=5) except Exception: pass time.sleep(2) try: subprocess.Popen( ['ssh', '-o', 'ServerAliveInterval=5', '-o', 'ServerAliveCountMax=120', '-o', 'TCPKeepAlive=yes', '-o', 'StrictHostKeyChecking=no', '-L', '15432:127.0.0.1:5432', '-N', 'root@89.19.208.158'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) print(f" [SSH] Tunnel restarted, waiting 5s...") time.sleep(5) except Exception as e: print(f" [SSH] Failed: {e}") def db_reconnect(conn): try: conn.close() except Exception: pass for attempt in range(5): try: time.sleep(3) c = db_connect() print(f" [DB] Reconnected (attempt {attempt+1})") return c, c.cursor() except Exception as e: print(f" [DB] Attempt {attempt+1} failed: {e}") print(f" [DB] Restarting SSH tunnel...") _restart_ssh_tunnel() for attempt in range(10): try: time.sleep(5) c = db_connect() print(f" [DB] Reconnected after tunnel restart (attempt {attempt+1})") return c, c.cursor() except Exception as e: print(f" [DB] Post-restart attempt {attempt+1} failed: {e}") if attempt == 4: _restart_ssh_tunnel() raise Exception("DB reconnect failed") def db_safe_execute(conn, cur, query, params=None): try: cur.execute(query, params) return conn, cur except (psycopg2.InterfaceError, psycopg2.OperationalError) as e: print(f" [DB] Lost on execute ({e}), reconnecting...") conn, cur = db_reconnect(conn) cur.execute(query, params) return conn, cur except psycopg2.Error as e: try: conn.rollback() # Clear aborted transaction state except Exception: pass raise # Re-raise so caller can skip this record def db_safe_commit(conn): try: conn.commit() return conn except (psycopg2.InterfaceError, psycopg2.OperationalError): print(f" [DB] Lost on commit, reconnecting...") conn, _ = db_reconnect(conn) return conn def is_cargo(ship_type_str): """Return True if vessel is cargo (green) or tanker (red) type.""" if not ship_type_str: return False st = ship_type_str.lower().strip() for ct in CARGO_TYPES: if ct in st or st in ct: return True return False def totp_wait_for_fresh(min_remaining=8): """Wait until TOTP window has at least min_remaining seconds left, then return code. Avoids entering a code that will expire before the server validates it.""" while True: remaining = 30 - (int(time.time()) % 30) if remaining >= min_remaining: break wait = remaining + 1 print(f" [TOTP] Window too close to expiry ({remaining}s left), waiting {wait}s...", flush=True) time.sleep(wait) code = totp(TOTP_SECRET) remaining = 30 - (int(time.time()) % 30) print(f" [TOTP] Code={code} | {remaining}s remaining in window", flush=True) return code async def do_login(page, max_retries=3): for attempt in range(max_retries): _update_heartbeat() # Reset watchdog for each login attempt (can take 60-120s) print(f"LOGIN (attempt {attempt+1}/{max_retries})...") # Navigate to login page — use asyncio.wait_for as outer timeout guard try: await asyncio.wait_for( page.goto('https://www.marinetraffic.com/en/users/login', wait_until='domcontentloaded', timeout=30000), timeout=35.0 ) except asyncio.TimeoutError: print(f" goto asyncio timeout (35s) — continuing with current page", flush=True) except Exception as nav_err: print(f" goto error: {nav_err}") await asyncio.sleep(3) _update_heartbeat() # Reset after navigation (wait_for_selector can be slow) print(f" After goto: {page.url[:80]}") # --- Step 1: Fill email and click Continue --- try: # Wait for username input to appear (the page may redirect to auth.kpler) await page.wait_for_selector('input[name="username"]', timeout=20000) await page.fill('input[name="username"]', EMAIL, timeout=15000) print(f" Filled email", flush=True) # JS click to avoid blocking on navigation await page.evaluate("() => { const b = document.querySelector('button[type=\"submit\"]'); if(b) b.click(); }") print(f" Clicked Continue (JS)", flush=True) # Wait for URL to change to password page try: await page.wait_for_url('**/login/password**', timeout=20000) print(f" Password page loaded", flush=True) except Exception: await asyncio.sleep(5) print(f" URL after Continue: {page.url[:80]}", flush=True) except Exception as fill_err: print(f" email/continue step error: {fill_err}", flush=True) await asyncio.sleep(5) # --- Step 2: Fill password --- try: await page.wait_for_selector('input[name="password"]', timeout=20000) print(f" Filling password... URL={page.url[:60]}", flush=True) await page.fill('input[name="password"]', PASSWORD, timeout=15000) # JS click to avoid Playwright blocking on navigation after password submit await page.evaluate("() => { const b = document.querySelector('button[type=\"submit\"]'); if(b) b.click(); }") print(f" Clicked password submit (JS)", flush=True) except Exception as pass_err: print(f" password step error: {pass_err}", flush=True) # Wait for navigation to MFA options page try: await page.wait_for_url('**/mfa**', timeout=25000) print(f" MFA page loaded: {page.url[:60]}", flush=True) except Exception: await asyncio.sleep(5) url = page.url print(f" After password: {url[:80]}") # --- Step 3: MFA flow --- if 'auth.kpler' in url: # If on mfa-login-options page, click Google Authenticator button if 'mfa-login-options' in url: print(" Clicking Google Authenticator option...", flush=True) try: # JS click to avoid navigation blocking clicked = await page.evaluate(""" () => { const selectors = [ 'button[value*="otp"]', 'button[aria-label*="Authenticator"]', 'button[aria-label*="authenticator"]', ]; for (const sel of selectors) { const b = document.querySelector(sel); if (b) { b.click(); return sel; } } // Fallback: find by text for (const b of document.querySelectorAll('button')) { if (b.textContent.includes('Authenticator') || b.textContent.includes('authenticator')) { b.click(); return 'text:' + b.textContent.trim().slice(0,30); } } return null; } """) print(f" GA JS click: selector={clicked}", flush=True) # Wait for navigation to OTP challenge page await page.wait_for_url('**/mfa-otp-challenge**', timeout=20000) print(f" GA clicked, OTP page loaded", flush=True) except Exception as ex: print(f" GA click error: {ex}", flush=True) await asyncio.sleep(5) print(f" After GA click: {page.url[:80]}", flush=True) # Fill TOTP code (with timing check to avoid expiry during validation) if 'mfa-otp-challenge' in page.url or 'auth.kpler' in page.url: _update_heartbeat() # Reset watchdog — totp_wait_for_fresh can wait up to 30s otp = totp_wait_for_fresh(min_remaining=8) try: await page.wait_for_selector('input[name="code"]', timeout=15000) await page.fill('input[name="code"]', otp, timeout=15000) print(f" OTP filled, clicking submit via JS...", flush=True) # Use JS click to avoid Playwright navigation wait blocking await page.evaluate("() => { const b = document.querySelector('button[type=\"submit\"]'); if(b) b.click(); }") except Exception as ex: print(f" 2FA fill/click error: {ex}", flush=True) _update_heartbeat() # Reset watchdog before wait_for_url (25s wait) # Wait for post-OTP navigation (MT home page or similar) try: await page.wait_for_url('**marinetraffic.com**', timeout=25000) print(f" OTP submitted, redirected: {page.url[:60]}", flush=True) except Exception: await asyncio.sleep(15) print(f" After OTP submit: {page.url[:80]}", flush=True) _update_heartbeat() # Reset after redirect ok = 'marinetraffic.com' in page.url and 'auth.kpler' not in page.url if ok: print(f" Login: OK | {page.url[:60]}") return True print(f" Login failed | {page.url[:80]}") if attempt < max_retries - 1: # Clear auth cookies and start fresh for next attempt try: await page.context.clear_cookies() print(f" Cleared cookies for retry", flush=True) except Exception: pass await asyncio.sleep(10) return False async def fetch_page(page, api_base_url, page_num, timeout_ms=30000): """Fetch one page of vessel data via page.evaluate(fetch()) with session cookies.""" # Build URL: replace existing page param or append it if 'page=' in api_base_url: url = re.sub(r'page=\d+', f'page={page_num}', api_base_url) else: url = api_base_url + f'&page={page_num}' js = f""" async () => {{ const controller = new AbortController(); const tid = setTimeout(() => controller.abort(), {timeout_ms}); try {{ const r = await fetch({json.dumps(url)}, {{ signal: controller.signal, credentials: 'include', cache: 'no-store', headers: {{ 'X-Requested-With': 'XMLHttpRequest', 'Accept': 'application/json', 'Referer': 'https://www.marinetraffic.com/en/data/?asset_type=vessels', }} }}); clearTimeout(tid); if (r.status !== 200) return {{error: 'HTTP ' + r.status, status: r.status}}; const text = await r.text(); return {{status: r.status, body: text}}; }} catch(e) {{ clearTimeout(tid); if (e.name === 'AbortError') return {{error: 'JS_TIMEOUT', status: -1}}; return {{error: e.message}}; }} }} """ try: # asyncio.wait_for() with 50s timeout. # If it fires, the browser is completely frozen (no JS response possible). # After this timeout we MUST do a full browser restart — we cannot use this # browser context anymore (cancelling corrupts Playwright protocol state). # The -3 return value signals BrowserRestartNeeded in the main loop. _update_heartbeat() # Reset watchdog timer before potentially-blocking call result = await asyncio.wait_for(page.evaluate(js), timeout=50.0) _update_heartbeat() # Update after evaluate returns if result.get('error'): err_msg = result.get('error', '') err_status = result.get('status', 0) if err_msg == 'JS_TIMEOUT' or err_status == -1: print(f" fetch JS_TIMEOUT page {page_num} (aborted after {timeout_ms}ms)", flush=True) return None, -1 # timeout signal → page reload print(f" fetch error page {page_num}: {err_msg} (HTTP {err_status})", flush=True) return None, err_status body = result.get('body', '') data = json.loads(body) total = data.get('totalCount', 0) rows = data.get('data', []) return rows, total except asyncio.TimeoutError: print(f" fetch HARD TIMEOUT page {page_num} (50s) — browser frozen, must restart", flush=True) return None, -3 # -3 signals browser dead → must restart entire browser except Exception as e: err_str = str(e) print(f" fetch error page {page_num}: {err_str[:120]}", flush=True) if 'Target crashed' in err_str or 'target crashed' in err_str.lower(): return None, -2 # -2 signals tab crash → create new tab if 'closed' in err_str.lower() or 'TargetClosed' in err_str: return None, -2 # treat as tab crash return None, 0 def parse_row(row): """Extract vessel data from one API row dict.""" if not isinstance(row, dict): return None v = {} for k in ['SHIP_ID', 'SHIPID']: if k in row and row[k]: v['ship_id'] = str(row[k]) break for k in ['SHIPNAME', 'NAME']: if k in row and row[k]: v['name'] = str(row[k]).strip() break for k in ['MMSI', 'mmsi']: val = row.get(k) if val and str(val) not in ('0', '', 'None'): v['mmsi'] = str(val) break for k in ['IMO', 'imo']: val = row.get(k) if val and str(val) not in ('0', '', 'None'): v['imo'] = str(val) break for k in ['FLAG', 'flag']: if k in row and row[k]: v['flag'] = str(row[k]) break for k in ['TYPE_SUMMARY', 'SHIP_TYPE', 'SHIPTYPE']: if k in row and row[k]: v['ship_type'] = str(row[k]) break for k in ['DWT', 'dwt']: if k in row and row[k]: try: v['dwt'] = int(float(str(row[k]).replace(',', ''))) except Exception: pass break for k in ['YEAR_BUILT', 'year_built']: if k in row and row[k]: try: v['year_built'] = int(row[k]) except Exception: pass break for k in ['LAT_OF_LATEST_POSITION', 'LAT', 'lat']: if k in row and row[k]: try: v['lat'] = float(row[k]) except Exception: pass break for k in ['LON_OF_LATEST_POSITION', 'LON', 'lon']: if k in row and row[k]: try: v['lon'] = float(row[k]) except Exception: pass break for k in ['BENEFICIAL_OWNER', 'MANAGER_OWNER']: if k in row and row[k]: v['owner'] = str(row[k]) break for k in ['REGISTERED_OWNER']: if k in row and row[k]: v['registered_owner'] = str(row[k]) break for k in ['OPERATOR']: if k in row and row[k]: v['operator'] = str(row[k]) break for k in ['COMMERCIAL_MANAGER', 'commercial_manager']: if k in row and row[k]: v['commercial_manager'] = str(row[k]) break if not v.get('ship_id') and not v.get('mmsi') and not v.get('name'): return None return v async def main(): parser = argparse.ArgumentParser() parser.add_argument('--probe', action='store_true', help='Test: first 5 pages only') parser.add_argument('--start', type=int, default=1, help='Start from page N (resume)') parser.add_argument('--max-pages', type=int, default=0, help='Max pages (0=unlimited)') parser.add_argument('--delay', type=float, default=PAGE_DELAY) args = parser.parse_args() if args.probe: args.max_pages = 5 # --- DB --- try: conn = db_connect() cur = conn.cursor() cur.execute('SELECT count(*) FROM mt_bulk_staging') existing_count = cur.fetchone()[0] cur.execute('SELECT ship_id FROM mt_bulk_staging WHERE ship_id IS NOT NULL') existing_ids = set(str(r[0]) for r in cur.fetchall()) print(f"DB: {existing_count} existing vessels | {len(existing_ids)} ship_ids") except Exception as e: print(f"DB ERROR: {e}") print("Start SSH tunnel: ssh -L 15432:127.0.0.1:5432 -N root@89.19.208.158") return from playwright.async_api import async_playwright async with async_playwright() as p: browser = await p.chromium.launch( headless=False, # Must be False: CF TLS fingerprint check + MT session cookies args=[ '--no-sandbox', '--disable-blink-features=AutomationControlled', '--disable-dev-shm-usage', ] ) ctx = await browser.new_context( viewport={'width': 800, 'height': 600}, # Smaller viewport = less GPU memory user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 ' '(KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36', ) page = await ctx.new_page() # NOTE: Do NOT set default_timeout here — it would break login flow. # Crash/hang recovery is handled via JS AbortController in fetch_page() # and the -2 crash signal → recover_page() in the main loop. COOKIE_FILE = 'mt_session_cookies.json' # Try loading saved cookies first (fast path: no login needed) cookie_loaded = False if args.start > 1 and os.path.exists(COOKIE_FILE): try: with open(COOKIE_FILE) as f: saved_cookies = json.load(f) await ctx.add_cookies(saved_cookies) print(f"Loaded {len(saved_cookies)} cookies from {COOKIE_FILE}") # Navigate to /data/ with 'commit' to activate session token server-side. # Then navigate to /ais/home (lightweight, same-origin) to stay in valid context. try: await asyncio.wait_for( page.goto('https://www.marinetraffic.com/en/data/?asset_type=vessels', wait_until='commit', timeout=30000), timeout=35.0 ) except Exception: pass await asyncio.sleep(2) try: await asyncio.wait_for( page.goto('https://www.marinetraffic.com/en/ais/home', wait_until='commit', timeout=20000), timeout=25.0 ) except Exception: pass await asyncio.sleep(2) # Quick API test to verify session validity from same-origin context test_url = ('https://www.marinetraffic.com/en/reports/?asset_type=vessels' '&columns=shipname&per_page=1&page=1') test_rows, test_total = await fetch_page(page, test_url, 1) if test_rows is not None: print(f"Cookie session valid! Skipping login.") cookie_loaded = True else: print(f"Cookie session invalid (status {test_total}). Will re-login.") await ctx.clear_cookies() except Exception as ck_err: print(f"Cookie load failed: {ck_err}. Will login fresh.") # --- Login (skip if cookies loaded successfully) --- if not cookie_loaded: if not await do_login(page): print("LOGIN FAILED!") await browser.close() conn.close() return # Wait for OAuth auto-redirect to complete (MT redirects oauth/callback → /ais/home) # This prevents the /data/ navigation from conflicting with the pending redirect. try: await page.wait_for_url('**/ais/home**', timeout=15000) print(f" OAuth redirect complete: {page.url[:60]}", flush=True) except Exception: await asyncio.sleep(5) # Wait anyway even if URL match fails print(f" After redirect wait: {page.url[:60]}", flush=True) # Save cookies for fast restart try: cookies = await ctx.cookies() with open(COOKIE_FILE, 'w') as f: json.dump(cookies, f) print(f"Saved {len(cookies)} session cookies to {COOKIE_FILE}") except Exception as ck_err: print(f"Cookie save failed (non-fatal): {ck_err}") # After OAuth redirect we're already on marinetraffic.com with valid session. # The reports API fetch() calls work when the browser is on /ais/home. # In headless=True mode, /ais/home's WebSocket map does NOT consume significant # resources (no GPU/canvas rendering), so it's safe to stay here. # DO NOT navigate to /en/reports/... directly — that causes HTTP 403 on subsequent # fetch() calls (MT server sees it as a direct API hit, not a browser XHR). # Must visit /data/ before /reports/ API — MT sets a required session token # (e.g. 'mt-data-access') when the /data/ page loads. # Must visit /data/ to set the MT session token for the reports API. # Without this visit, all /en/reports/ requests return 403. # We navigate with domcontentloaded and wait briefly, then STAY on /data/. # CDP garbage collection every 15 pages prevents V8 OOM crashes. _update_heartbeat() # Reset watchdog before /data/ navigation (can take 30s+) print("\nNavigating to /data/ (required to unlock reports API)...") try: # Use asyncio.wait_for() as a fallback in case Playwright's internal # timeout doesn't fire (known issue when browser is under load). await asyncio.wait_for( page.goto('https://www.marinetraffic.com/en/data/?asset_type=vessels', wait_until='commit', timeout=30000), timeout=35.0 ) print(f" /data/ commit done", flush=True) _update_heartbeat() # Reset after commit except asyncio.TimeoutError: print(f" /data/ nav asyncio timeout (35s) — continuing anyway", flush=True) except Exception as nav_err: print(f" /data/ nav error (non-fatal): {nav_err}", flush=True) # Save cookies immediately after /data/ commit (before SPA JS executes and crashes renderer) try: cookies = await ctx.cookies() with open(COOKIE_FILE, 'w') as f: json.dump(cookies, f) print(f"Saved {len(cookies)} cookies (after /data/ visit)") except Exception: pass # Navigate to MT robots.txt — a plain text file, no JavaScript, no SPA, # no crash risk. Same origin as marinetraffic.com so fetch() works without CORS. try: await asyncio.wait_for( page.goto('https://www.marinetraffic.com/robots.txt', wait_until='domcontentloaded', timeout=15000), timeout=18.0 ) print(f" Navigated to robots.txt (lightweight static page)", flush=True) except Exception as nav_err: print(f" robots.txt nav error: {nav_err} — continuing anyway", flush=True) await asyncio.sleep(1) try: print(f" Page URL: {page.url[:80]}") except Exception: print(f" Page URL: (unavailable)") # Hardcoded API base URL — confirmed working format base_url = 'https://www.marinetraffic.com/en/reports/?asset_type=vessels' api_base = f'{base_url}&columns={API_COLS}&per_page={PER_PAGE}' print(f" API base: {api_base[:120]}") # --- Test page 1 (with retry on 403 — session might need extra time) --- # MT reports API sometimes requires prior visit to /data/ page to unlock. print(f"\nTesting API (page 1)...") rows1, total1 = None, 0 data_page_visited = False for api_attempt in range(6): rows1, total1 = await fetch_page(page, api_base, 1) if rows1 is not None: break if total1 == 403: if api_attempt == 2 and not data_page_visited: # After 2 failed attempts, revisit /data/ with 'commit' to unlock session print(f" API 403 x{api_attempt+1} — re-visiting /data/ (commit) to unlock...") try: await asyncio.wait_for( page.goto('https://www.marinetraffic.com/en/data/?asset_type=vessels', wait_until='commit', timeout=30000), timeout=35.0 ) await asyncio.sleep(3) await asyncio.wait_for( page.goto('https://www.marinetraffic.com/robots.txt', wait_until='domcontentloaded', timeout=15000), timeout=18.0 ) print(f" /data/ commit + back to robots.txt", flush=True) data_page_visited = True except Exception as data_err: print(f" /data/ nav error: {data_err}") else: print(f" API 403 — waiting 15s for session to establish (attempt {api_attempt+1}/6)...") await asyncio.sleep(15) else: print(f" API error (status {total1}) — waiting 5s...") await asyncio.sleep(5) if rows1 is None: print("ERROR: API calls failing after retries. Exiting.") await browser.close() conn.close() return total_in_mt = total1 est_pages = (total_in_mt + len(rows1) - 1) // max(len(rows1), 1) if rows1 else 0 print(f" Page 1: {len(rows1)} rows | totalCount={total_in_mt} | est_pages={est_pages}") # Sample row keys if rows1: print(f" Row keys: {list(rows1[0].keys())[:15]}") v = parse_row(rows1[0]) print(f" Sample: {v}") # --- Main pagination loop --- total_saved = 0 total_cargo = 0 total_skipped = 0 batch_count = 0 t0 = time.time() start_page = args.start max_pages = args.max_pages if args.max_pages else est_pages + 10 consecutive_errors = 0 checkpoint_file = 'mt_green_fleet_progress.json' # Load checkpoint for resume if args.start > 1 and os.path.exists(checkpoint_file): with open(checkpoint_file) as f: cp = json.load(f) total_saved = cp.get('total_saved', 0) total_cargo = cp.get('total_cargo', 0) print(f" Resuming from page {start_page} | saved={total_saved}") # If starting from page 1, the test already fetched page 1. # Process rows1 directly and start the loop from page 2 to avoid double-fetch # (MT rate-limits duplicate requests immediately and hangs the connection). if args.start == 1 and rows1: print(f" Processing test page 1 results directly (no double-fetch)...", flush=True) for row in rows1: v = parse_row(row) if not v: continue ship_type = v.get('ship_type', '') if not is_cargo(ship_type): total_skipped += 1 continue total_cargo += 1 sid = v.get('ship_id') if not sid: continue is_new = sid not in existing_ids existing_ids.add(sid) st_lower = ship_type.lower() if 'bulk' in st_lower: gt, cat = '6', 'bulk' elif 'container' in st_lower: gt, cat = '11', 'container' elif 'ro-ro' in st_lower or 'roro' in st_lower or 'vehicle' in st_lower: gt, cat = '12', 'roro' else: gt, cat = '9', 'general' try: conn, cur = db_safe_execute(conn, cur, """ INSERT INTO mt_bulk_staging (ship_id, name, flag, dwt, gt_shiptype, type_category, lat, lon, mmsi, imo, owner, registered_owner, operator, commercial_manager, scraped_at) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s, NOW()) ON CONFLICT (ship_id) DO UPDATE SET name=COALESCE(EXCLUDED.name,mt_bulk_staging.name), mmsi=COALESCE(EXCLUDED.mmsi,mt_bulk_staging.mmsi), imo=COALESCE(EXCLUDED.imo,mt_bulk_staging.imo), owner=COALESCE(EXCLUDED.owner,mt_bulk_staging.owner), registered_owner=COALESCE(EXCLUDED.registered_owner,mt_bulk_staging.registered_owner), operator=COALESCE(EXCLUDED.operator,mt_bulk_staging.operator), commercial_manager=COALESCE(EXCLUDED.commercial_manager,mt_bulk_staging.commercial_manager), lat=COALESCE(EXCLUDED.lat,mt_bulk_staging.lat), lon=COALESCE(EXCLUDED.lon,mt_bulk_staging.lon), scraped_at=NOW() """, (sid, v.get('name'), v.get('flag'), v.get('dwt'), gt, cat, v.get('lat'), v.get('lon'), v.get('mmsi'), v.get('imo'), v.get('owner'), v.get('registered_owner'), v.get('operator'), v.get('commercial_manager'),)) if is_new: total_saved += 1 batch_count += 1 except Exception as e: print(f" DB error (test-page1): {str(e)[:80]}") # Skip page 1 in the loop since we already processed it start_page = 2 print(f"\n{'='*60}") print(f"STARTING SCRAPE: pages {start_page} to {min(max_pages, est_pages)}") print(f"Delay: {args.delay}s/page | Batch: {BATCH_SIZE}") print(f"{'='*60}\n") # Reload interval: DISABLED — reloading the heavy MT SPA page crashes the browser. # Session cookies are maintained in the browser context, so no reload needed. # Recovery via recover_page() only when a crash is actually detected. PAGE_RELOAD_INTERVAL = 999999 async def recover_page(reason="crash"): """Close crashed page and open a fresh one, re-establish /data/ session context. If the browser context itself is dead, raises BrowserRestartNeeded so the outer loop can restart the whole browser from the checkpoint.""" nonlocal page print(f" [Recovery] Creating new tab ({reason})...", flush=True) try: await page.close() except Exception: pass await asyncio.sleep(5) try: page = await ctx.new_page() except Exception as ctx_err: # Browser context is dead — escalate to full browser restart err_s = str(ctx_err) print(f" [Recovery] BrowserContext dead ({err_s[:80]}), escalating to browser restart...", flush=True) with open(checkpoint_file, 'w') as f: json.dump({'page': page_num, 'total_saved': total_saved, 'total_cargo': total_cargo, 'total_skipped': total_skipped, 'elapsed': time.time() - t0}, f) try: await browser.close() except Exception: pass raise BrowserRestartNeeded(page_num) try: # Navigate to robots.txt — static text, no SPA, same origin as MT. # fetch() calls to the reports API work from this page without CORS issues. await page.goto('https://www.marinetraffic.com/robots.txt', wait_until='domcontentloaded', timeout=15000) await asyncio.sleep(1) print(f" [Recovery] New tab ready at robots.txt", flush=True) return True except Exception as re_err: print(f" [Recovery] New tab goto failed: {re_err}", flush=True) return False # V8 heap GC every N fetches via CDP — no page navigation needed. PAGE_REFRESH_EVERY = 15 # run CDP GC every 15 API fetches for page_num in range(start_page, max_pages + 1): # Periodic CDP GC to clear V8 heap and prevent Chromium OOM crash. # Does NOT navigate away — no CORS issues, no SPA reload. fetch_count = page_num - start_page if fetch_count > 0 and fetch_count % PAGE_REFRESH_EVERY == 0: print(f" [HeapFlush] Requesting V8 GC at page {page_num}...", flush=True) try: # Request garbage collection via Playwright CDP (DevTools Protocol). # This frees accumulated V8 heap WITHOUT navigating away from the page. # No tab crash, no CORS issues, no SPA reload needed. cdp = await page.context.new_cdp_session(page) await cdp.send('HeapProfiler.collectGarbage') await cdp.detach() print(f" [HeapFlush] GC done", flush=True) except Exception as flush_err: print(f" [HeapFlush] GC error: {flush_err} — continuing anyway", flush=True) # Fetch page rows, total = await fetch_page(page, api_base, page_num) if rows is None: consecutive_errors += 1 is_browser_dead = total == -3 # hard timeout → browser must be restarted is_crash = total == -2 # tab crash → new tab is_timeout = total == -1 # JS timeout → new tab need_new_tab = is_timeout or is_crash if is_browser_dead: # Save checkpoint and raise so outer loop restarts browser print(f" Page {page_num}: BROWSER DEAD — saving checkpoint and restarting browser...", flush=True) with open(checkpoint_file, 'w') as f: json.dump({'page': page_num, 'total_saved': total_saved, 'total_cargo': total_cargo, 'total_skipped': total_skipped, 'elapsed': time.time() - t0}, f) try: await browser.close() except Exception: pass raise BrowserRestartNeeded(page_num) if need_new_tab: reason = 'CRASH' if is_crash else 'TIMEOUT' print(f" Page {page_num}: {reason} — creating new tab...", flush=True) ok = await recover_page("tab_crash" if is_crash else "timeout") if not ok: # Recovery failed immediately — context dead, escalate to browser restart print(f" Immediate recovery failed — restarting browser from page {page_num}", flush=True) with open(checkpoint_file, 'w') as f: json.dump({'page': page_num, 'total_saved': total_saved, 'total_cargo': total_cargo, 'total_skipped': total_skipped, 'elapsed': time.time() - t0}, f) try: await browser.close() except Exception: pass raise BrowserRestartNeeded(page_num) consecutive_errors = max(consecutive_errors - 1, 0) continue print(f" Page {page_num}: FAILED (error #{consecutive_errors})", flush=True) if consecutive_errors >= 3: # Create fresh tab after 3 consecutive errors print(f" {consecutive_errors} consecutive errors — creating fresh tab...", flush=True) ok = await recover_page(f"errors_{consecutive_errors}") if ok: consecutive_errors = 0 else: # Recovery failed — escalate to full browser restart print(f" Recovery failed — restarting browser from page {page_num}", flush=True) with open(checkpoint_file, 'w') as f: json.dump({'page': page_num, 'total_saved': total_saved, 'total_cargo': total_cargo, 'total_skipped': total_skipped, 'elapsed': time.time() - t0}, f) try: await browser.close() except Exception: pass raise BrowserRestartNeeded(page_num) continue await asyncio.sleep(3) continue consecutive_errors = 0 if not rows: print(f" Page {page_num}: empty — reached end") break # Parse and filter page_cargo = 0 for row in rows: v = parse_row(row) if not v: continue # Filter: keep only cargo types ship_type = v.get('ship_type', '') if not is_cargo(ship_type): total_skipped += 1 continue total_cargo += 1 page_cargo += 1 sid = v.get('ship_id') if not sid: continue is_new = sid not in existing_ids existing_ids.add(sid) # Map gt_shiptype code st_lower = ship_type.lower() if 'bulk' in st_lower: gt = '6' cat = 'bulk' elif 'container' in st_lower: gt = '11' cat = 'container' elif 'ro-ro' in st_lower or 'roro' in st_lower or 'vehicle' in st_lower: gt = '12' cat = 'roro' else: gt = '9' cat = 'general' try: conn, cur = db_safe_execute(conn, cur, """ INSERT INTO mt_bulk_staging (ship_id, name, flag, dwt, gt_shiptype, type_category, lat, lon, mmsi, imo, owner, registered_owner, operator, commercial_manager, scraped_at) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s, NOW()) ON CONFLICT (ship_id) DO UPDATE SET name = COALESCE(EXCLUDED.name, mt_bulk_staging.name), mmsi = COALESCE(EXCLUDED.mmsi, mt_bulk_staging.mmsi), imo = COALESCE(EXCLUDED.imo, mt_bulk_staging.imo), owner = COALESCE(EXCLUDED.owner, mt_bulk_staging.owner), registered_owner = COALESCE(EXCLUDED.registered_owner, mt_bulk_staging.registered_owner), operator = COALESCE(EXCLUDED.operator, mt_bulk_staging.operator), commercial_manager = COALESCE(EXCLUDED.commercial_manager, mt_bulk_staging.commercial_manager), lat = COALESCE(EXCLUDED.lat, mt_bulk_staging.lat), lon = COALESCE(EXCLUDED.lon, mt_bulk_staging.lon), scraped_at = NOW() """, ( sid, v.get('name'), v.get('flag'), v.get('dwt'), gt, cat, v.get('lat'), v.get('lon'), v.get('mmsi'), v.get('imo'), v.get('owner'), v.get('registered_owner'), v.get('operator'), v.get('commercial_manager'), )) if is_new: total_saved += 1 batch_count += 1 except Exception as e: print(f" DB error ({type(e).__name__}): {str(e)[:80]}") # Commit batch if batch_count >= BATCH_SIZE: conn = db_safe_commit(conn) cur = conn.cursor() batch_count = 0 # Progress elapsed = time.time() - t0 rate = (page_num - start_page + 1) / elapsed if elapsed > 0 else 0 remaining_pages = (est_pages - page_num) eta_min = remaining_pages / rate / 60 if rate > 0 else 0 _update_heartbeat() # Reset watchdog on each page processed if page_num <= 20 or page_num % 10 == 0: print(f" Page {page_num}/{est_pages} | " f"+{page_cargo} cargo | " f"total_new={total_saved} | " f"skipped={total_skipped} | " f"{elapsed:.0f}s | ETA: {eta_min:.0f}m", flush=True) # Save checkpoint every 5 pages so restarts (watchdog or browser crash) resume close to current position if page_num % 5 == 0: with open(checkpoint_file, 'w') as f: json.dump({ 'page': page_num, 'total_saved': total_saved, 'total_cargo': total_cargo, 'total_skipped': total_skipped, 'elapsed': elapsed, }, f) await asyncio.sleep(args.delay) # Final commit conn = db_safe_commit(conn) cur = conn.cursor() # Final stats cur.execute("SELECT count(*), count(mmsi), count(owner) FROM mt_bulk_staging") t, m, o = cur.fetchone() cur.execute("SELECT count(*), count(mmsi), count(owner) FROM mt_bulk_staging WHERE gt_shiptype='6'") tb, mb, ob = cur.fetchone() elapsed = time.time() - t0 print(f"\n{'='*60}") print(f"SCRAPE COMPLETE — {elapsed/60:.1f} minutes") print(f" Pages done: {page_num - start_page + 1}") print(f" New vessels: {total_saved}") print(f" Cargo found: {total_cargo}") print(f" Non-cargo skipped: {total_skipped}") print(f"\nGLOBAL: total={t} | mmsi={m} ({m*100//max(t,1)}%) | owner={o} ({o*100//max(t,1)}%)") print(f"BULK: total={tb} | mmsi={mb} ({mb*100//max(tb,1)}%) | owner={ob} ({ob*100//max(tb,1)}%)") print(f"{'='*60}") conn.close() await browser.close() import traceback # Outer restart loop — handles BrowserRestartNeeded by restarting from checkpoint restart_count = 0 max_restarts = 200 # allow up to 200 browser restarts (plenty for 5752 pages) while True: try: asyncio.run(main()) break # Normal completion except BrowserRestartNeeded as br: restart_count += 1 print(f"\n[RESTART #{restart_count}] Browser dead at page {br.resume_page}. Restarting from checkpoint...", flush=True) if restart_count >= max_restarts: print(f"[RESTART] Max restarts ({max_restarts}) reached. Stopping.", flush=True) sys.exit(1) # Patch sys.argv to resume from the checkpoint page if '--start' in sys.argv: idx = sys.argv.index('--start') sys.argv[idx+1] = str(br.resume_page) else: sys.argv.extend(['--start', str(br.resume_page)]) time.sleep(10) # Brief pause before restarting browser continue except Exception as top_err: err_s = str(top_err) # TargetClosedError / context closed → try browser restart from checkpoint if 'TargetClosed' in type(top_err).__name__ or 'Target page, context or browser has been closed' in err_s or 'browser has been closed' in err_s: restart_count += 1 print(f"\n[RESTART #{restart_count}] Browser closed unexpectedly: {err_s[:120]}", flush=True) if restart_count >= max_restarts: print(f"[RESTART] Max restarts ({max_restarts}) reached. Stopping.", flush=True) sys.exit(1) # Try to resume from checkpoint if it exists try: import json as _json import glob as _glob # Search for the progress checkpoint file used by main() _chk_files = sorted( _glob.glob('mt_green_fleet_progress.json') + _glob.glob('mt_green_fleet_checkpoint*.json'), key=lambda x: -os.path.getmtime(x) ) if _chk_files: with open(_chk_files[0]) as _f: _chk = _json.load(_f) _resume = _chk.get('page', 1) print(f"[RESTART] Resuming from checkpoint page {_resume}...", flush=True) if '--start' in sys.argv: idx = sys.argv.index('--start') sys.argv[idx+1] = str(_resume) else: sys.argv.extend(['--start', str(_resume)]) except Exception as chk_err: print(f"[RESTART] Could not read checkpoint: {chk_err}", flush=True) time.sleep(15) continue print(f"\nFATAL ERROR: {type(top_err).__name__}: {top_err}", flush=True) traceback.print_exc() sys.exit(1)