#!/usr/bin/env python3
"""
MT Detail Sweep — Collect vessel specs + owner details (website, address) per vessel.

For each vessel in mt_bulk_staging:
1. Reports API (quicksearch_shipid) → year_built, draught, loa, beam
2. Ownership API (/en/vessels/{id}/ownership) → owner_website, owner_address, companies_json

Usage:
    python mt_detail_sweep.py              # All vessels needing detail data
    python mt_detail_sweep.py --probe      # Test 10 vessels
    python mt_detail_sweep.py --limit 100  # Process 100 vessels
    python mt_detail_sweep.py --type bulk  # Only gt_shiptype=6 (bulk carriers)
    python mt_detail_sweep.py --delay 0.5  # Custom delay
"""
import asyncio, json, sys, os, time, struct, hmac, hashlib, base64, argparse, threading
import psycopg2

# Run relative to the script's own directory so SKIP_FILE / relative paths resolve.
os.chdir(os.path.dirname(os.path.abspath(__file__)))

# Force UTF-8 + line-buffered output (Windows consoles default to cp1252).
if hasattr(sys.stdout, 'reconfigure'):
    sys.stdout.reconfigure(encoding='utf-8', errors='replace', line_buffering=True)
if hasattr(sys.stderr, 'reconfigure'):
    sys.stderr.reconfigure(encoding='utf-8', errors='replace', line_buffering=True)

# NOTE(security): credentials, TOTP seed and DB password are hardcoded here.
# They should be moved to environment variables / a secrets store; left in
# place to avoid changing deployment behavior in this revision.
EMAIL = "operation@mrlogisticcorp.com"
PASSWORD = "NKh9i8Z!7fU9jfi"
TOTP_SECRET = "MNWTEPTFJZBUC32GJFEWY6LVKQ2GGYKH"
DB_URL = 'postgresql://seafare:SF_m0ntana_2026@127.0.0.1:15432/seafare_db'
DELAY = 0.8   # default per-vessel delay (seconds), overridable via --delay
BATCH = 5    # vessels per DB commit checkpoint

# Reports API columns — includes year_built + physical specs
REPORTS_COLS = 'shipname,imo,mmsi,flag,ship_type,dwt,year_built,draught,length,breadth,gross_tonnage'


def totp(secret):
    """Return the current 6-digit RFC 6238 TOTP code for a base32 *secret*.

    Uses the standard 30-second window and HMAC-SHA1 dynamic truncation.
    """
    s = secret.upper().replace(' ', '')
    pad = (-len(s)) % 8  # base32 requires length % 8 == 0; restore '=' padding
    key = base64.b32decode(s + '=' * pad)
    counter = int(time.time()) // 30
    msg = struct.pack('>Q', counter)
    h = hmac.new(key, msg, hashlib.sha1).digest()
    offset = h[-1] & 0x0f
    code = struct.unpack('>I', h[offset:offset + 4])[0] & 0x7fffffff
    return str(code % 1000000).zfill(6)


def db_connect():
    """Open a psycopg2 connection through the local SSH tunnel with TCP keepalives."""
    return psycopg2.connect(
        DB_URL, connect_timeout=15,
        keepalives=1, keepalives_idle=30, keepalives_interval=10, keepalives_count=5
    )


def _restart_ssh_tunnel():
    """Kill any running ssh.exe and re-open the 15432→5432 port-forward tunnel.

    Windows-specific (taskkill). Best-effort: failures are printed, not raised.
    """
    import subprocess
    try:
        subprocess.run(['taskkill', '/F', '/IM', 'ssh.exe'],
                       capture_output=True, timeout=5)
    except Exception:
        pass  # taskkill may fail if no ssh.exe is running — that's fine
    time.sleep(2)
    try:
        subprocess.Popen(
            ['ssh', '-o', 'ServerAliveInterval=5', '-o', 'ServerAliveCountMax=120',
             '-o', 'TCPKeepAlive=yes', '-o', 'StrictHostKeyChecking=no',
             '-L', '15432:127.0.0.1:5432', '-N', 'root@89.19.208.158'],
            stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
        )
        print(f" [SSH] Tunnel restarted, waiting 5s...")
        time.sleep(5)
    except Exception as e:
        print(f" [SSH] Failed to restart: {e}")


def db_reconnect(conn):
    """Close *conn* and reconnect, restarting the SSH tunnel if plain retries fail.

    Returns (new_conn, new_cursor). Raises Exception after all attempts fail.
    """
    try:
        conn.close()
    except Exception:
        pass
    # Phase 1: plain reconnect attempts (tunnel may still be alive).
    for attempt in range(5):
        try:
            time.sleep(3)
            new_conn = db_connect()
            print(f" [DB] Reconnected (attempt {attempt+1})")
            return new_conn, new_conn.cursor()
        except Exception as e:
            print(f" [DB] Attempt {attempt+1} failed: {e}")
    # Phase 2: bounce the tunnel, then retry longer; kick it once more mid-way.
    print(f" [DB] Restarting SSH tunnel...")
    _restart_ssh_tunnel()
    for attempt in range(10):
        try:
            time.sleep(5)
            new_conn = db_connect()
            print(f" [DB] Reconnected after tunnel restart (attempt {attempt+1})")
            return new_conn, new_conn.cursor()
        except Exception as e:
            print(f" [DB] Post-restart attempt {attempt+1} failed: {e}")
            if attempt == 4:
                _restart_ssh_tunnel()
    raise Exception("DB reconnect failed")


def db_safe_execute(conn, cur, query, params=None):
    """Execute *query*, transparently reconnecting on a dropped connection.

    Returns the (possibly new) (conn, cur) pair. Non-connection psycopg2
    errors are rolled back and re-raised for the caller to handle.
    """
    try:
        cur.execute(query, params)
        return conn, cur
    except (psycopg2.InterfaceError, psycopg2.OperationalError) as e:
        print(f" [DB] Lost on execute ({e}), reconnecting...")
        conn, cur = db_reconnect(conn)
        cur.execute(query, params)
        return conn, cur
    except psycopg2.Error:
        try:
            conn.rollback()
        except Exception:
            pass
        raise


def db_safe_commit(conn):
    """Commit, reconnecting on a dropped connection.

    NOTE: if the connection died, the uncommitted batch is lost — callers
    tolerate this because every UPDATE uses COALESCE and re-runs are idempotent.
    """
    try:
        conn.commit()
        return conn
    except (psycopg2.InterfaceError, psycopg2.OperationalError):
        conn, _ = db_reconnect(conn)
        return conn


async def do_login(page, max_retries=3):
    """Log into MarineTraffic (username → password → optional TOTP 2FA).

    Returns True on success, False after *max_retries* failed attempts.
    """
    for attempt in range(max_retries):
        print(f"LOGIN (attempt {attempt+1}/{max_retries})...")
        await page.goto('https://www.marinetraffic.com/en/users/login',
                        wait_until='domcontentloaded', timeout=30000)
        await asyncio.sleep(3)
        # Two-step form: username first, then password on the next screen.
        await page.fill('input[name="username"]', EMAIL)
        await page.click('button[type="submit"]')
        await asyncio.sleep(3)
        await page.fill('input[type="password"]', PASSWORD)
        await page.click('button[type="submit"]')
        await asyncio.sleep(4)
        if 'mfa' in page.url.lower() or 'auth.kpler' in page.url:
            # Prefer the Google Authenticator method if a chooser is shown.
            try:
                await page.click('button:has-text("Google Authenticator")', timeout=3000)
                await asyncio.sleep(2)
            except Exception:
                pass
            await asyncio.sleep(1)
            # Wait for fresh TOTP window if we're in the last 3 seconds of the current one
            remaining_in_window = 30 - (int(time.time()) % 30)
            if remaining_in_window <= 3:
                print(f" TOTP: waiting {remaining_in_window+1}s for fresh window...", flush=True)
                await asyncio.sleep(remaining_in_window + 1)
            otp = totp(TOTP_SECRET)
            print(f" TOTP: {otp} (window expires in {30 - (int(time.time()) % 30)}s)")
            # The OTP input selector varies between MT frontend versions.
            filled = False
            for selector in ['input[name="code"]', 'input[type="text"]',
                             'input[autocomplete="one-time-code"]',
                             'input[inputmode="numeric"]']:
                try:
                    await page.fill(selector, otp, timeout=3000)
                    filled = True
                    print(f" Filled OTP via {selector}")
                    break
                except Exception:
                    continue
            if not filled:
                print(f" 2FA: could not fill OTP input", flush=True)
            else:
                print(f" 2FA: submitting via JS click...", flush=True)
                try:
                    # Use JS click to avoid Playwright's navigation-wait blocking event loop
                    await page.evaluate('() => { const btn = document.querySelector("button[type=submit]"); if(btn) btn.click(); }')
                    print(f" 2FA: JS click done", flush=True)
                except Exception as e:
                    print(f" 2FA JS click error: {e}", flush=True)
                    # Fallback to Playwright click
                    try:
                        await page.click('button[type="submit"]', timeout=8000)
                        print(f" 2FA: fallback click done", flush=True)
                    except Exception as e2:
                        print(f" 2FA fallback: {e2}", flush=True)
                print(f" Waiting 12s for redirect...", flush=True)
                # Use threading timer instead of one long asyncio sleep to avoid
                # event loop issues during the auth redirect.
                done = threading.Event()
                threading.Timer(12, done.set).start()
                while not done.is_set():
                    await asyncio.sleep(0.5)
                print(f" Current URL: {page.url[:80]}", flush=True)
        ok = 'marinetraffic.com' in page.url and 'auth.kpler' not in page.url
        if ok:
            print(f" Login: OK")
            return True
        print(f" Login attempt {attempt+1} failed | {page.url[:80]}")
        if attempt < max_retries - 1:
            await asyncio.sleep(5)
    return False


_page_interrupted = False   # global flag set by watchdog (currently unused; kept for compatibility)
_vessel_start_time = 0.0    # updated before each vessel; watchdog monitors
_current_ship_id = 0        # current vessel being processed
SKIP_FILE = '/tmp/mt_detail_skip.txt'  # ship_ids to skip (one per line)


def load_skip_ids():
    """Load set of ship_ids that caused hangs."""
    try:
        with open(SKIP_FILE) as f:
            ids = set(int(line.strip()) for line in f if line.strip().isdigit())
        if ids:
            print(f" [SKIP] Loaded {len(ids)} skip IDs: {sorted(ids)[:10]}", flush=True)
        return ids
    except FileNotFoundError:
        return set()
    except Exception as e:
        print(f" [SKIP] Error loading skip file: {e}", flush=True)
        return set()


def _start_process_watchdog(timeout_sec=45):
    """Background thread: if a vessel takes > timeout_sec, write ship_id to skip file and force-exit."""
    def _watcher():
        while True:
            time.sleep(5)
            elapsed = time.time() - _vessel_start_time
            if _vessel_start_time > 0 and elapsed > timeout_sec:
                sid = _current_ship_id
                print(f"\n [WATCHDOG] Vessel ship_id={sid} hung for {elapsed:.0f}s — adding to skip list and exiting", flush=True)
                # Write to skip file BEFORE exit so the next run avoids this vessel.
                try:
                    with open(SKIP_FILE, 'a') as f:
                        f.write(f"{sid}\n")
                except Exception:
                    pass
                sys.stdout.flush()
                sys.stderr.flush()
                os._exit(2)  # Force terminate immediately — bypasses asyncio teardown

    t = threading.Thread(target=_watcher, daemon=True)
    t.start()


async def safe_evaluate(page, js, timeout_sec=22):
    """Run page.evaluate(js) with a hard *timeout_sec* cap — None on error/timeout.

    The timeout guards against Playwright hanging on a dead page; the JS
    snippets also carry their own (shorter) AbortController timeouts.
    """
    try:
        return await asyncio.wait_for(page.evaluate(js), timeout=timeout_sec)
    except Exception:
        return None


async def fetch_reports(page, ship_id):
    """Get vessel specs via Reports API (year_built, draught, loa, beam).

    Returns the first data row as a dict, or None on HTTP/parse/empty errors.
    """
    url = (f'https://www.marinetraffic.com/en/reports/?asset_type=vessels'
           f'&columns={REPORTS_COLS}&quicksearch_shipid={ship_id}')
    js = f"""
    async () => {{
        const controller = new AbortController();
        const tid = setTimeout(() => controller.abort(), 20000);
        try {{
            const r = await fetch({json.dumps(url)}, {{
                signal: controller.signal, credentials: 'include', cache: 'no-store',
                headers: {{
                    'X-Requested-With': 'XMLHttpRequest',
                    'Accept': 'application/json',
                    'Referer': 'https://www.marinetraffic.com/en/data/?asset_type=vessels',
                }}
            }});
            clearTimeout(tid);
            if (r.status !== 200) return {{error: 'HTTP ' + r.status}};
            const d = JSON.parse(await r.text());
            const rows = d.data || [];
            if (rows.length === 0) return {{error: 'no rows'}};
            return rows[0];
        }} catch(e) {{ clearTimeout(tid); return {{error: e.message}}; }}
    }}
    """
    result = await safe_evaluate(page, js, timeout_sec=22)
    if result and isinstance(result, dict) and not result.get('error'):
        return result
    return None


async def fetch_ownership_detail(page, ship_id):
    """Get full ownership with address + website from /en/vessels/{id}/ownership JSON API.

    Returns the parsed JSON dict, or None on HTTP/parse errors.
    """
    url = f'https://www.marinetraffic.com/en/vessels/{ship_id}/ownership'
    js = f"""
    async () => {{
        const controller = new AbortController();
        const tid = setTimeout(() => controller.abort(), 18000);
        try {{
            const r = await fetch({json.dumps(url)}, {{
                signal: controller.signal, credentials: 'include', cache: 'no-store',
                headers: {{
                    'X-Requested-With': 'XMLHttpRequest',
                    'Accept': 'application/json',
                    'Referer': 'https://www.marinetraffic.com/en/ais/details/ships/shipid:{ship_id}',
                }}
            }});
            clearTimeout(tid);
            if (r.status !== 200) return {{error: 'HTTP ' + r.status}};
            return JSON.parse(await r.text());
        }} catch(e) {{ clearTimeout(tid); return {{error: e.message}}; }}
    }}
    """
    result = await safe_evaluate(page, js, timeout_sec=22)
    if result and isinstance(result, dict) and not result.get('error'):
        return result
    return None


def parse_ownership(data):
    """Parse /en/vessels/{id}/ownership JSON → extract ALL company details per role.

    Stores EVERYTHING including companies with empty Contact (-).
    Address and country are valuable even without website.

    Returns {} for falsy input; otherwise a dict with a 'companies' list plus
    quick-access fields per role (beneficial/registered owner, commercial
    manager, operator). First-seen value wins for each quick-access field.
    """
    if not data:
        return {}
    current = data.get('current', [])
    result = {
        'companies': [],  # Full list for companies_json — includes ALL raw fields
        # Beneficial owner quick-access fields
        'owner_website': None,
        'owner_address': None,
        'owner_country': None,
        'owner_phone': None,
        'owner_email': None,
        # Registered owner
        'registered_owner_address': None,
        'registered_owner_country': None,
        'registered_owner_website': None,
        'registered_owner_phone': None,
        'registered_owner_email': None,
        # Commercial manager
        'commercial_manager_address': None,
        'commercial_manager_country': None,
        'commercial_manager_website': None,
        # Operator
        'operator_address': None,
        'operator_country': None,
        'operator_website': None,
    }
    ROLE_MAP = {
        'beneficialOwner': 'beneficial_owner',
        'registeredOwner': 'registered_owner',
        'commercialManager': 'commercial_manager',
        'disponentOwner': 'commercial_manager',
        'operator': 'operator',
        'technicalManager': 'technical_manager',
        'ismManager': 'ism_manager',
    }
    for entry in current:
        name = entry.get('name', '') or ''
        types = entry.get('types', [])
        address = entry.get('address') or None
        country = entry.get('country') or None
        contact = entry.get('contact') or None  # website URL
        email = entry.get('email') or None
        phone = entry.get('phone') or entry.get('telephone') or None
        since = entry.get('since') or entry.get('dateFrom') or None
        # Store one record per role (expand multi-role entries)
        # Include FULL raw entry so nothing from the API is lost
        for t in types:
            role = ROLE_MAP.get(t, t)  # unknown roles pass through verbatim
            company_entry = {
                'role': role,
                'name': name,
                'address': address,
                'country': country,
                'website': contact,
                'email': email,
                'phone': phone,
                'since': since,
            }
            # Add any extra fields MT might return that we don't explicitly handle
            for k, v in entry.items():
                if k not in ('name', 'types', 'address', 'country', 'contact',
                             'email', 'phone', 'telephone', 'since', 'dateFrom',
                             'shipId', 'id'):
                    company_entry[f'raw_{k}'] = v
            result['companies'].append(company_entry)
            # Populate per-role quick-access fields (keep first non-empty value)
            if role == 'beneficial_owner':
                result['owner_address'] = result['owner_address'] or address
                result['owner_country'] = result['owner_country'] or country
                result['owner_website'] = result['owner_website'] or contact
                result['owner_phone'] = result['owner_phone'] or phone
                result['owner_email'] = result['owner_email'] or email
            elif role == 'registered_owner':
                result['registered_owner_address'] = result['registered_owner_address'] or address
                result['registered_owner_country'] = result['registered_owner_country'] or country
                result['registered_owner_website'] = result['registered_owner_website'] or contact
                result['registered_owner_phone'] = result['registered_owner_phone'] or phone
                result['registered_owner_email'] = result['registered_owner_email'] or email
            elif role == 'commercial_manager':
                result['commercial_manager_address'] = result['commercial_manager_address'] or address
                result['commercial_manager_country'] = result['commercial_manager_country'] or country
                result['commercial_manager_website'] = result['commercial_manager_website'] or contact
            elif role == 'operator':
                result['operator_address'] = result['operator_address'] or address
                result['operator_country'] = result['operator_country'] or country
                result['operator_website'] = result['operator_website'] or contact
    return result


def safe_float(v):
    """Coerce *v* to float; return None for non-numeric or non-positive values."""
    try:
        f = float(v)
        return f if f > 0 else None
    except Exception:
        return None


def safe_int(v):
    """Coerce *v* to int (tolerating thousands separators); None if invalid or <= 0."""
    try:
        i = int(str(v).replace(',', '').strip())
        return i if i > 0 else None
    except Exception:
        return None


async def main():
    """Entry point: query vessels needing detail data, sweep both APIs, update DB."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--probe', action='store_true')
    parser.add_argument('--limit', type=int, default=0)
    parser.add_argument('--type', choices=['bulk', 'all'], default='all')
    parser.add_argument('--delay', type=float, default=DELAY)
    args = parser.parse_args()

    # Both globals are written inside the vessel loop; declare once up front.
    global _vessel_start_time, _current_ship_id

    try:
        conn = db_connect()
        cur = conn.cursor()
        # Show current coverage
        cur.execute('SELECT count(*) FROM mt_bulk_staging')
        total_all = cur.fetchone()[0]
        cur.execute("SELECT count(*) FROM mt_bulk_staging WHERE gt_shiptype='6'")
        total_bulk = cur.fetchone()[0]
        cur.execute('SELECT count(*) FROM mt_bulk_staging WHERE year_built IS NULL')
        no_year = cur.fetchone()[0]
        cur.execute('SELECT count(*) FROM mt_bulk_staging WHERE owner_website IS NULL AND owner IS NOT NULL')
        no_web = cur.fetchone()[0]
        cur.execute('SELECT count(*) FROM mt_bulk_staging WHERE draught IS NULL')
        no_draught = cur.fetchone()[0]
        print(f"DB: {total_all} total | {total_bulk} bulk")
        print(f" Missing year_built: {no_year}")
        print(f" Missing owner_website: {no_web}")
        print(f" Missing draught: {no_draught}")
    except Exception as e:
        print(f"DB ERROR: {e}")
        return

    from playwright.async_api import async_playwright
    async with async_playwright() as p:
        browser = await p.chromium.launch(
            headless=False,
            args=['--no-sandbox', '--disable-blink-features=AutomationControlled']
        )
        context = await browser.new_context(
            viewport={'width': 1440, 'height': 900},
            user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
                       '(KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36',
        )
        page = await context.new_page()
        if not await do_login(page):
            await browser.close()
            conn.close()
            return
        await asyncio.sleep(3)
        # Navigate to data page to establish full session context
        try:
            await page.goto('https://www.marinetraffic.com/en/data/?asset_type=vessels',
                            wait_until='domcontentloaded', timeout=40000)
        except Exception as e:
            print(f" [WARN] data page nav failed ({e}) — continuing anyway")
        await asyncio.sleep(5)

        # Build query — only vessels not yet fully processed
        # NOTE: year_built excluded from condition — Reports API never returns it,
        # so it's always NULL and would cause every vessel to be re-processed on restart.
        skip_ids = load_skip_ids()
        # ship_id column is TEXT in DB — use string literals in NOT IN clause
        # (skip_ids are ints parsed from our own file, so no injection risk here)
        skip_clause = f"AND ship_id NOT IN ({','.join(repr(str(s)) for s in skip_ids)})" if skip_ids else ""
        type_filter = "AND gt_shiptype='6'" if args.type == 'bulk' else ""
        query = f"""
            SELECT ship_id, name, gt_shiptype, dwt, flag, mmsi, imo,
                   year_built, owner, owner_website
            FROM mt_bulk_staging
            WHERE (draught IS NULL OR companies_json IS NULL)
            {type_filter} {skip_clause}
            ORDER BY CASE WHEN gt_shiptype = '6' THEN 0 ELSE 1 END, dwt DESC NULLS LAST
        """
        if args.limit:
            query += f" LIMIT {args.limit}"
        elif args.probe:
            query += " LIMIT 10"
        cur.execute(query)
        vessels = cur.fetchall()
        total = len(vessels)
        print(f"\nVessels to process: {total}")
        if total == 0:
            print("Nothing to do!")
            await browser.close()
            conn.close()
            return
        eta_sec = total * (args.delay * 2 + 0.8)
        print(f"Delay: {args.delay}s | ETA: ~{eta_sec/60:.0f} min")
        print(f"{'='*60}")

        got_year = 0
        got_draught = 0
        got_loa = 0
        got_website = 0
        no_reports = 0
        no_ownership = 0
        errors = 0
        batch_count = 0
        t0 = time.time()

        # Start process-level watchdog (kills process if vessel hangs > 45s)
        _start_process_watchdog(timeout_sec=45)
        _debug_printed = False

        for i, (ship_id, name, gt, dwt, flag, mmsi, imo,
                existing_year, existing_owner, existing_website) in enumerate(vessels):
            _vessel_start_time = time.time()  # watchdog monitors this
            _current_ship_id = ship_id        # watchdog writes this to skip file on hang
            try:
                if i < 100:
                    print(f" [START {i+1}/{total}] {name} (ship_id={ship_id}) +{time.time()-t0:.0f}s", flush=True)
                # 1. Reports API — specs
                reports = await fetch_reports(page, ship_id)
                if reports and not _debug_printed:
                    print(f" [DEBUG] Reports API keys: {list(reports.keys())[:20]}")
                    _debug_printed = True
                await asyncio.sleep(args.delay * 0.5)
                # 2. Ownership API — website + address
                ownership = await fetch_ownership_detail(page, ship_id)
                ownership_parsed = parse_ownership(ownership)

                updates = []
                params = []
                if reports:
                    # Key casing differs between MT API versions — try both.
                    yr = safe_int(reports.get('YEAR_BUILT') or reports.get('year_built'))
                    dr = safe_float(reports.get('DRAUGHT') or reports.get('draught'))
                    lo = safe_float(reports.get('LENGTH') or reports.get('length') or reports.get('LOA'))
                    bm = safe_float(reports.get('BREADTH') or reports.get('breadth') or reports.get('BEAM'))
                    if yr and not existing_year:
                        updates.append("year_built = %s"); params.append(yr); got_year += 1
                    if dr:
                        updates.append("draught = COALESCE(draught, %s)"); params.append(dr); got_draught += 1
                    if lo:
                        updates.append("loa = COALESCE(loa, %s)"); params.append(lo); got_loa += 1
                    if bm:
                        updates.append("beam = COALESCE(beam, %s)"); params.append(bm)
                else:
                    no_reports += 1

                if ownership_parsed.get('companies'):
                    companies_json = json.dumps(ownership_parsed['companies'], ensure_ascii=False)
                    updates.append("companies_json = %s"); params.append(companies_json)
                    web = ownership_parsed.get('owner_website')
                    addr = ownership_parsed.get('owner_address')
                    country_v = ownership_parsed.get('owner_country')
                    if web and not existing_website:
                        updates.append("owner_website = %s"); params.append(web); got_website += 1
                    if addr:
                        updates.append("owner_address = COALESCE(owner_address, %s)"); params.append(addr)
                    if country_v:
                        updates.append("owner_country = COALESCE(owner_country, %s)"); params.append(country_v)
                    # Beneficial owner phone/email
                    if ownership_parsed.get('owner_phone'):
                        updates.append("owner_phone = COALESCE(owner_phone, %s)")
                        params.append(ownership_parsed['owner_phone'])
                    if ownership_parsed.get('owner_email'):
                        updates.append("owner_email = COALESCE(owner_email, %s)")
                        params.append(ownership_parsed['owner_email'])
                    # Per-role: address, country, website, phone, email
                    for field, key in [
                        ('registered_owner_address', 'registered_owner_address'),
                        ('registered_owner_country', 'registered_owner_country'),
                        ('registered_owner_website', 'registered_owner_website'),
                        ('registered_owner_phone', 'registered_owner_phone'),
                        ('registered_owner_email', 'registered_owner_email'),
                        ('commercial_manager_address', 'commercial_manager_address'),
                        ('commercial_manager_country', 'commercial_manager_country'),
                        ('commercial_manager_website', 'commercial_manager_website'),
                        ('operator_address', 'operator_address'),
                        ('operator_country', 'operator_country'),
                        ('operator_website', 'operator_website'),
                    ]:
                        val = ownership_parsed.get(key)
                        if val:
                            updates.append(f"{field} = COALESCE({field}, %s)")
                            params.append(val)
                else:
                    no_ownership += 1

                # Mark vessel as "processed" even if APIs returned no data,
                # to avoid endless retries. Sentinel values:
                #   draught = 0.0 (means "checked, no draught data available")
                #   companies_json = '[]' (means "checked, no company data available")
                # Apply sentinels only for the fields still NULL after processing:
                has_draught_update = any('draught' in u for u in updates)
                has_companies_update = any('companies_json' in u for u in updates)
                if not has_draught_update:
                    updates.append("draught = COALESCE(draught, 0.0)")
                if not has_companies_update:
                    updates.append("companies_json = COALESCE(companies_json, '[]')")

                if updates:
                    updates.append("scraped_at = NOW()")
                    params.append(ship_id)
                    conn, cur = db_safe_execute(
                        conn, cur,
                        f"UPDATE mt_bulk_staging SET {', '.join(updates)} WHERE ship_id = %s",
                        tuple(params))

                # Progress
                show = i < 100 or (i + 1) % 50 == 0
                if show:
                    yr_val = reports.get('YEAR_BUILT') if reports else '?'
                    web_val = (ownership_parsed.get('owner_website') or '—')[:40] if ownership_parsed else '—'
                    dr_val = reports.get('DRAUGHT') if reports else '?'
                    elapsed_v = time.time() - t0
                    print(f" [{i+1}/{total}] {name} | yr={yr_val} dr={dr_val} | {web_val} | +{elapsed_v:.0f}s")
            except Exception as e:
                errors += 1
                print(f" [{i+1}/{total}] {name} ERROR: {e}")

            _vessel_start_time = 0.0  # vessel done — reset watchdog timer
            batch_count += 1
            if batch_count >= BATCH:
                conn = db_safe_commit(conn)
                cur = conn.cursor()  # db_safe_commit may have reconnected — fresh cursor
                elapsed = time.time() - t0
                rate = (i + 1) / elapsed if elapsed > 0 else 0
                remaining = (total - i - 1) / rate if rate > 0 else 0
                print(f"\n=== CHECKPOINT [{i+1}/{total}] {elapsed:.0f}s | "
                      f"year={got_year} draught={got_draught} loa={got_loa} web={got_website} "
                      f"no_rep={no_reports} no_own={no_ownership} err={errors} | "
                      f"ETA: {remaining/60:.0f}m ===\n")
                batch_count = 0
            await asyncio.sleep(args.delay)

        # Final commit
        conn = db_safe_commit(conn)
        cur = conn.cursor()

        # Summary
        cur.execute("SELECT count(*), count(year_built), count(draught), count(owner_website) FROM mt_bulk_staging WHERE gt_shiptype='6'")
        t, yr, dr, web = cur.fetchone()
        elapsed = time.time() - t0
        print(f"\n{'='*60}")
        print(f"DONE in {elapsed/60:.1f} minutes!")
        print(f" Processed: {total}")
        print(f" New year_built: {got_year}")
        print(f" New draught: {got_draught}")
        print(f" New loa: {got_loa}")
        print(f" New websites: {got_website}")
        print(f" No reports data: {no_reports}")
        print(f" No ownership data: {no_ownership}")
        print(f" Errors: {errors}")
        print(f"\nBULK CARRIERS: total={t} year_built={yr} draught={dr} website={web}")
        print(f"{'='*60}")
        conn.close()
        await browser.close()


if __name__ == '__main__':
    asyncio.run(main())