1121 lines
51 KiB
Python
1121 lines
51 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
MT Green Fleet Scraper — Get ALL cargo vessels from MT Data page via direct API pagination.
|
||
|
||
Strategy:
|
||
1. Login to MT Pro
|
||
2. Navigate to Data page (establishes session context)
|
||
3. Capture the API URL from the first response
|
||
4. Page through ALL results via page.evaluate(fetch()) with page=N
|
||
5. Filter client-side: keep only cargo types (Cargo, Bulk, Container, General, Multipurpose, Ro-Ro)
|
||
6. Save to mt_bulk_staging with all available data (name, mmsi, imo, owner, position)
|
||
|
||
MT has ~2.8M vessels total. Cargo ~30-40% = ~900k vessels.
|
||
At 0.5s/page × 500/page → ~900k vessels in ~30 minutes!
|
||
|
||
Usage:
|
||
python mt_green_fleet.py --probe # Test: first 5 pages, show API format
|
||
python mt_green_fleet.py # Full scrape: all cargo vessels
|
||
python mt_green_fleet.py --start 100 # Resume from page 100
|
||
python mt_green_fleet.py --max-pages 500 # Limit pages
|
||
"""
|
||
import asyncio, json, sys, os, time, struct, hmac, hashlib, base64, argparse, re
|
||
import threading
|
||
import psycopg2
|
||
|
||
|
||
# ---- WATCHDOG: kills process if no heartbeat for 90 seconds ----
|
||
# This handles the case where asyncio.wait_for() hangs due to Playwright
|
||
# protocol issues (corrupted WebSocket state, frozen browser).
|
||
# The outer restart loop will restart from checkpoint.
|
||
# Timestamp of the most recent heartbeat; refreshed via _update_heartbeat()
# throughout login/fetch so the watchdog knows the process is alive.
_heartbeat_time = time.time()
_WATCHDOG_TIMEOUT = 180  # seconds — must be > login time (TOTP wait up to 30s + navigation 60-90s = ~150s)
|
||
|
||
def _update_heartbeat():
    """Record 'now' as the last heartbeat so the watchdog thread does not fire."""
    global _heartbeat_time
    _heartbeat_time = time.time()
|
||
|
||
def _watchdog_thread():
    """Poll the heartbeat every 10s; hard-exit with code 2 if it goes stale.

    os._exit() is used (not sys.exit) so a frozen asyncio loop cannot block
    the shutdown; the outer restart wrapper resumes from the checkpoint.
    """
    while True:
        time.sleep(10)
        stale_for = time.time() - _heartbeat_time
        if stale_for <= _WATCHDOG_TIMEOUT:
            continue
        print(f"\n[WATCHDOG] No heartbeat for {stale_for:.0f}s — exiting for restart!", flush=True)
        sys.stdout.flush()
        # Exit with code 2 — the outer bash wrapper will restart from checkpoint
        os._exit(2)  # code 2 = watchdog timeout, needs restart
|
||
|
||
# Daemon thread so it never blocks interpreter exit; runs for the whole process.
_wd = threading.Thread(target=_watchdog_thread, daemon=True)
_wd.start()
|
||
# ---- END WATCHDOG ----
|
||
|
||
|
||
class BrowserRestartNeeded(Exception):
    """Raised when browser is completely dead and must be fully restarted."""

    def __init__(self, resume_page):
        # Page number the outer restart loop should resume scraping from.
        self.resume_page = resume_page
        message = f"Browser restart needed from page {resume_page}"
        super().__init__(message)
|
||
|
||
# Run from the script's own directory so relative paths (cookie file,
# checkpoint file) resolve the same way regardless of the caller's CWD.
os.chdir(os.path.dirname(os.path.abspath(__file__)))
# Force UTF-8 output with replace-on-error and line buffering so progress
# prints survive redirection and non-UTF-8 consoles; reconfigure() may be
# absent on replaced/non-standard streams, hence the hasattr guards.
if hasattr(sys.stdout, 'reconfigure'):
    sys.stdout.reconfigure(encoding='utf-8', errors='replace', line_buffering=True)
if hasattr(sys.stderr, 'reconfigure'):
    sys.stderr.reconfigure(encoding='utf-8', errors='replace', line_buffering=True)
|
||
|
||
# MT Pro account credentials and DB connection string.
# NOTE(review): credentials and DB password are hardcoded in source —
# consider moving them to environment variables or a secrets store.
EMAIL = "operation@mrlogisticcorp.com"
PASSWORD = "NKh9i8Z!7fU9jfi"
TOTP_SECRET = "MNWTEPTFJZBUC32GJFEWY6LVKQ2GGYKH"  # base32 seed for 2FA codes (see totp())
DB_URL = 'postgresql://seafare:SF_m0ntana_2026@127.0.0.1:15432/seafare_db'  # via local SSH tunnel (port 15432)

PAGE_DELAY = 2.0  # seconds between pages (2s to avoid MT rate limiting / session hangs)
BATCH_SIZE = 500  # commit every N vessels

# Cargo vessel types to KEEP (filter out tankers, passenger, military, etc.)
# NOTE(review): despite the header comment, the set ALSO contains tanker
# types (the RED section below), so is_cargo() keeps tankers too — the
# module docstring's "green fleet" naming is broader than it sounds.
CARGO_TYPES = {
    # GREEN — all cargo vessel types
    'cargo', 'bulk carrier', 'general cargo', 'container ship', 'container vessel',
    'ro-ro cargo', 'reefer', 'multipurpose', 'heavy load carrier',
    'wood chips carrier', 'cement carrier', 'livestock carrier', 'vehicle carrier',
    'self-discharging bulk carrier', 'open hatch bulk carrier', 'bulk',
    'ore carrier', 'aggregates carrier', 'obo carrier', 'barge',
    'special cargo', 'other cargo', 'nuclear fuel carrier',
    'inland cargo',
    # RED — all tanker types
    'tanker', 'oil products tanker', 'crude oil tanker', 'oil/chemical tanker',
    'chemical tanker', 'lpg tanker', 'lng tanker', 'bunkering tanker',
    'asphalt/bitumen tanker', 'water tanker', 'inland tanker',
    'special tanker', 'other tanker', 'floating storage/production',
    'oil', 'chemical', 'lpg', 'lng', 'bitumen', 'asphalt',
    'fso', 'fpso', 'fsu',
}

# Columns to request — all ownership fields MT Pro provides
API_COLS = ('shipname,imo,mmsi,flag,ship_type,dwt,year_built,'
            'lat_of_latest_position,lon_of_latest_position,'
            'beneficial_owner,registered_owner,operator,commercial_manager')
# 500 rows per page — MT Pro default; 1000 causes memory pressure and Chromium crashes.
PER_PAGE = 500
|
||
|
||
|
||
def totp(secret):
    """Compute the current 6-digit TOTP code (RFC 6238, SHA-1, 30s window).

    The secret is a base32 string; spaces and lowercase are tolerated and
    missing '=' padding is added before decoding.
    """
    normalized = secret.upper().replace(' ', '')
    padded = normalized + '=' * ((-len(normalized)) % 8)
    key = base64.b32decode(padded)
    time_step = int(time.time()) // 30
    digest = hmac.new(key, struct.pack('>Q', time_step), hashlib.sha1).digest()
    # Dynamic truncation: low nibble of the last byte picks a 4-byte slice.
    start = digest[-1] & 0x0f
    value = struct.unpack('>I', digest[start:start + 4])[0] & 0x7fffffff
    return f"{value % 1000000:06d}"
|
||
|
||
|
||
def db_connect():
    """Open a PostgreSQL connection with aggressive TCP keepalives.

    The keepalive settings are tuned for the SSH-tunnelled link so a dead
    tunnel is noticed instead of the connection hanging silently.
    """
    keepalive_options = {
        'keepalives': 1,
        'keepalives_idle': 30,
        'keepalives_interval': 10,
        'keepalives_count': 5,
    }
    return psycopg2.connect(DB_URL, connect_timeout=15, **keepalive_options)
|
||
|
||
|
||
def _restart_ssh_tunnel():
    """Kill any running ssh.exe and spawn a fresh 15432→5432 port-forward tunnel.

    Windows-specific (taskkill). Best-effort: failures are printed, not raised.
    """
    import subprocess
    # Best-effort kill of any existing tunnel process.
    try:
        subprocess.run(['taskkill', '/F', '/IM', 'ssh.exe'], capture_output=True, timeout=5)
    except Exception:
        pass
    time.sleep(2)
    tunnel_cmd = [
        'ssh',
        '-o', 'ServerAliveInterval=5',
        '-o', 'ServerAliveCountMax=120',
        '-o', 'TCPKeepAlive=yes',
        '-o', 'StrictHostKeyChecking=no',
        '-L', '15432:127.0.0.1:5432',
        '-N', 'root@89.19.208.158',
    ]
    try:
        # Detached background process; output discarded.
        subprocess.Popen(
            tunnel_cmd,
            stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
        )
        print(f" [SSH] Tunnel restarted, waiting 5s...")
        time.sleep(5)
    except Exception as e:
        print(f" [SSH] Failed: {e}")
|
||
|
||
|
||
def db_reconnect(conn):
    """Drop the dead connection and rebuild it, restarting the SSH tunnel if needed.

    Phase 1: 5 plain reconnect attempts (3s apart) — the tunnel may still be up.
    Phase 2: restart the tunnel, then 10 more attempts (5s apart), kicking the
    tunnel once more after the 5th failure.

    Returns (connection, cursor) on success; raises Exception if all attempts fail.
    """
    try:
        conn.close()
    except Exception:
        pass

    # Phase 1: reconnect without touching the tunnel.
    for i in range(5):
        time.sleep(3)
        try:
            fresh = db_connect()
        except Exception as e:
            print(f" [DB] Attempt {i+1} failed: {e}")
            continue
        print(f" [DB] Reconnected (attempt {i+1})")
        return fresh, fresh.cursor()

    print(f" [DB] Restarting SSH tunnel...")
    _restart_ssh_tunnel()

    # Phase 2: retry after the tunnel restart; kick the tunnel again halfway.
    for i in range(10):
        time.sleep(5)
        try:
            fresh = db_connect()
        except Exception as e:
            print(f" [DB] Post-restart attempt {i+1} failed: {e}")
            if i == 4:
                _restart_ssh_tunnel()
            continue
        print(f" [DB] Reconnected after tunnel restart (attempt {i+1})")
        return fresh, fresh.cursor()

    raise Exception("DB reconnect failed")
|
||
|
||
|
||
def db_safe_execute(conn, cur, query, params=None):
    """Execute *query*, transparently reconnecting on a dropped connection.

    Connection-level failures (InterfaceError/OperationalError) trigger one
    reconnect + retry. Any other psycopg2 error rolls back the aborted
    transaction and re-raises so the caller can skip the offending record.

    Returns the (possibly new) (connection, cursor) pair.
    """
    try:
        cur.execute(query, params)
    except (psycopg2.InterfaceError, psycopg2.OperationalError) as e:
        print(f" [DB] Lost on execute ({e}), reconnecting...")
        conn, cur = db_reconnect(conn)
        cur.execute(query, params)
    except psycopg2.Error:
        # Clear aborted transaction state so the connection stays usable.
        try:
            conn.rollback()
        except Exception:
            pass
        raise  # re-raise so caller can skip this record
    return conn, cur
|
||
|
||
|
||
def db_safe_commit(conn):
    """Commit the current transaction, reconnecting if the link has dropped.

    NOTE: on a dropped connection the uncommitted batch is lost — the caller's
    upsert logic is expected to make re-inserts harmless.

    Returns the (possibly new) connection.
    """
    try:
        conn.commit()
    except (psycopg2.InterfaceError, psycopg2.OperationalError):
        print(f" [DB] Lost on commit, reconnecting...")
        conn, _unused_cur = db_reconnect(conn)
    return conn
|
||
|
||
|
||
def is_cargo(ship_type_str):
    """Return True if vessel is cargo (green) or tanker (red) type.

    Matching is a bidirectional substring test against CARGO_TYPES (which,
    despite its name, also contains tanker entries).
    """
    if not ship_type_str:
        return False
    normalized = ship_type_str.lower().strip()
    return any(entry in normalized or normalized in entry for entry in CARGO_TYPES)
|
||
|
||
|
||
def totp_wait_for_fresh(min_remaining=8):
    """Wait until the TOTP window has at least *min_remaining* seconds left,
    then return the current code.

    Avoids entering a code that will expire before the server validates it.
    """
    seconds_left = 30 - (int(time.time()) % 30)
    while seconds_left < min_remaining:
        pause = seconds_left + 1  # sleep past the window boundary
        print(f" [TOTP] Window too close to expiry ({seconds_left}s left), waiting {pause}s...", flush=True)
        time.sleep(pause)
        seconds_left = 30 - (int(time.time()) % 30)
    code = totp(TOTP_SECRET)
    seconds_left = 30 - (int(time.time()) % 30)
    print(f" [TOTP] Code={code} | {seconds_left}s remaining in window", flush=True)
    return code
|
||
|
||
|
||
async def do_login(page, max_retries=3):
    """Log into MT Pro through the auth.kpler flow: email → password → TOTP 2FA.

    Args:
        page: Playwright async Page used for the whole flow.
        max_retries: number of full login attempts before giving up.

    Returns:
        True once page.url is back on marinetraffic.com and off auth.kpler;
        False after all retries fail.

    Every form submit uses a JS click (page.evaluate) instead of page.click()
    so Playwright does not block waiting for the resulting navigation.
    """
    for attempt in range(max_retries):
        _update_heartbeat()  # Reset watchdog for each login attempt (can take 60-120s)
        print(f"LOGIN (attempt {attempt+1}/{max_retries})...")
        # Navigate to login page — use asyncio.wait_for as outer timeout guard
        try:
            await asyncio.wait_for(
                page.goto('https://www.marinetraffic.com/en/users/login',
                          wait_until='domcontentloaded', timeout=30000),
                timeout=35.0
            )
        except asyncio.TimeoutError:
            print(f" goto asyncio timeout (35s) — continuing with current page", flush=True)
        except Exception as nav_err:
            print(f" goto error: {nav_err}")
            await asyncio.sleep(3)
        _update_heartbeat()  # Reset after navigation (wait_for_selector can be slow)
        print(f" After goto: {page.url[:80]}")

        # --- Step 1: Fill email and click Continue ---
        try:
            # Wait for username input to appear (the page may redirect to auth.kpler)
            await page.wait_for_selector('input[name="username"]', timeout=20000)
            await page.fill('input[name="username"]', EMAIL, timeout=15000)
            print(f" Filled email", flush=True)
            # JS click to avoid blocking on navigation
            await page.evaluate("() => { const b = document.querySelector('button[type=\"submit\"]'); if(b) b.click(); }")
            print(f" Clicked Continue (JS)", flush=True)
            # Wait for URL to change to password page
            try:
                await page.wait_for_url('**/login/password**', timeout=20000)
                print(f" Password page loaded", flush=True)
            except Exception:
                # URL pattern didn't match in time — give the page a moment and log where we are.
                await asyncio.sleep(5)
                print(f" URL after Continue: {page.url[:80]}", flush=True)
        except Exception as fill_err:
            print(f" email/continue step error: {fill_err}", flush=True)
            await asyncio.sleep(5)

        # --- Step 2: Fill password ---
        try:
            await page.wait_for_selector('input[name="password"]', timeout=20000)
            print(f" Filling password... URL={page.url[:60]}", flush=True)
            await page.fill('input[name="password"]', PASSWORD, timeout=15000)
            # JS click to avoid Playwright blocking on navigation after password submit
            await page.evaluate("() => { const b = document.querySelector('button[type=\"submit\"]'); if(b) b.click(); }")
            print(f" Clicked password submit (JS)", flush=True)
        except Exception as pass_err:
            print(f" password step error: {pass_err}", flush=True)

        # Wait for navigation to MFA options page
        try:
            await page.wait_for_url('**/mfa**', timeout=25000)
            print(f" MFA page loaded: {page.url[:60]}", flush=True)
        except Exception:
            await asyncio.sleep(5)

        url = page.url
        print(f" After password: {url[:80]}")

        # --- Step 3: MFA flow ---
        if 'auth.kpler' in url:
            # If on mfa-login-options page, click Google Authenticator button
            if 'mfa-login-options' in url:
                print(" Clicking Google Authenticator option...", flush=True)
                try:
                    # JS click to avoid navigation blocking; tries specific selectors
                    # first, then falls back to matching button text.
                    clicked = await page.evaluate("""
                        () => {
                            const selectors = [
                                'button[value*="otp"]',
                                'button[aria-label*="Authenticator"]',
                                'button[aria-label*="authenticator"]',
                            ];
                            for (const sel of selectors) {
                                const b = document.querySelector(sel);
                                if (b) { b.click(); return sel; }
                            }
                            // Fallback: find by text
                            for (const b of document.querySelectorAll('button')) {
                                if (b.textContent.includes('Authenticator') || b.textContent.includes('authenticator')) {
                                    b.click(); return 'text:' + b.textContent.trim().slice(0,30);
                                }
                            }
                            return null;
                        }
                    """)
                    print(f" GA JS click: selector={clicked}", flush=True)
                    # Wait for navigation to OTP challenge page
                    await page.wait_for_url('**/mfa-otp-challenge**', timeout=20000)
                    print(f" GA clicked, OTP page loaded", flush=True)
                except Exception as ex:
                    print(f" GA click error: {ex}", flush=True)
                    await asyncio.sleep(5)
                    print(f" After GA click: {page.url[:80]}", flush=True)

            # Fill TOTP code (with timing check to avoid expiry during validation)
            if 'mfa-otp-challenge' in page.url or 'auth.kpler' in page.url:
                _update_heartbeat()  # Reset watchdog — totp_wait_for_fresh can wait up to 30s
                otp = totp_wait_for_fresh(min_remaining=8)
                try:
                    await page.wait_for_selector('input[name="code"]', timeout=15000)
                    await page.fill('input[name="code"]', otp, timeout=15000)
                    print(f" OTP filled, clicking submit via JS...", flush=True)
                    # Use JS click to avoid Playwright navigation wait blocking
                    await page.evaluate("() => { const b = document.querySelector('button[type=\"submit\"]'); if(b) b.click(); }")
                except Exception as ex:
                    print(f" 2FA fill/click error: {ex}", flush=True)
                _update_heartbeat()  # Reset watchdog before wait_for_url (25s wait)
                # Wait for post-OTP navigation (MT home page or similar)
                try:
                    await page.wait_for_url('**marinetraffic.com**', timeout=25000)
                    print(f" OTP submitted, redirected: {page.url[:60]}", flush=True)
                except Exception:
                    await asyncio.sleep(15)
                    print(f" After OTP submit: {page.url[:80]}", flush=True)
                _update_heartbeat()  # Reset after redirect

        # Success = back on marinetraffic.com and no longer on the auth host.
        ok = 'marinetraffic.com' in page.url and 'auth.kpler' not in page.url
        if ok:
            print(f" Login: OK | {page.url[:60]}")
            return True
        print(f" Login failed | {page.url[:80]}")
        if attempt < max_retries - 1:
            # Clear auth cookies and start fresh for next attempt
            try:
                await page.context.clear_cookies()
                print(f" Cleared cookies for retry", flush=True)
            except Exception:
                pass
            await asyncio.sleep(10)
    return False
|
||
|
||
|
||
async def fetch_page(page, api_base_url, page_num, timeout_ms=30000):
    """Fetch one page of vessel data via page.evaluate(fetch()) with session cookies.

    Args:
        page: Playwright Page currently on a marinetraffic.com origin (the
            in-page fetch() relies on same-origin session cookies).
        api_base_url: reports API URL; its page= parameter is replaced/appended.
        page_num: 1-based page number to request.
        timeout_ms: in-page AbortController timeout for the fetch itself.

    Returns (rows, total) on success, otherwise (None, signal):
        signal >= 0 : HTTP status or 0 for other errors
        signal == -1: in-page JS fetch timed out (page reload may help)
        signal == -2: tab crashed/closed — caller should open a new tab
        signal == -3: browser frozen (evaluate never returned) — full restart
    """
    # Build URL: replace existing page param or append it
    if 'page=' in api_base_url:
        url = re.sub(r'page=\d+', f'page={page_num}', api_base_url)
    else:
        url = api_base_url + f'&page={page_num}'

    # In-page fetch with its own AbortController timeout so a hung request
    # resolves to a JS_TIMEOUT result instead of hanging evaluate() forever.
    # Doubled braces are f-string escapes; {timeout_ms} and the URL are
    # interpolated from Python (URL safely quoted via json.dumps).
    js = f"""
    async () => {{
        const controller = new AbortController();
        const tid = setTimeout(() => controller.abort(), {timeout_ms});
        try {{
            const r = await fetch({json.dumps(url)}, {{
                signal: controller.signal,
                credentials: 'include',
                cache: 'no-store',
                headers: {{
                    'X-Requested-With': 'XMLHttpRequest',
                    'Accept': 'application/json',
                    'Referer': 'https://www.marinetraffic.com/en/data/?asset_type=vessels',
                }}
            }});
            clearTimeout(tid);
            if (r.status !== 200) return {{error: 'HTTP ' + r.status, status: r.status}};
            const text = await r.text();
            return {{status: r.status, body: text}};
        }} catch(e) {{
            clearTimeout(tid);
            if (e.name === 'AbortError') return {{error: 'JS_TIMEOUT', status: -1}};
            return {{error: e.message}};
        }}
    }}
    """
    try:
        # asyncio.wait_for() with 50s timeout.
        # If it fires, the browser is completely frozen (no JS response possible).
        # After this timeout we MUST do a full browser restart — we cannot use this
        # browser context anymore (cancelling corrupts Playwright protocol state).
        # The -3 return value signals BrowserRestartNeeded in the main loop.
        _update_heartbeat()  # Reset watchdog timer before potentially-blocking call
        result = await asyncio.wait_for(page.evaluate(js), timeout=50.0)
        _update_heartbeat()  # Update after evaluate returns
        if result.get('error'):
            err_msg = result.get('error', '')
            err_status = result.get('status', 0)
            if err_msg == 'JS_TIMEOUT' or err_status == -1:
                print(f" fetch JS_TIMEOUT page {page_num} (aborted after {timeout_ms}ms)", flush=True)
                return None, -1  # timeout signal → page reload
            print(f" fetch error page {page_num}: {err_msg} (HTTP {err_status})", flush=True)
            return None, err_status
        # Success: parse the JSON envelope {totalCount, data: [...]}.
        body = result.get('body', '')
        data = json.loads(body)
        total = data.get('totalCount', 0)
        rows = data.get('data', [])
        return rows, total
    except asyncio.TimeoutError:
        print(f" fetch HARD TIMEOUT page {page_num} (50s) — browser frozen, must restart", flush=True)
        return None, -3  # -3 signals browser dead → must restart entire browser
    except Exception as e:
        # Classify Playwright protocol errors by message text to pick recovery.
        err_str = str(e)
        print(f" fetch error page {page_num}: {err_str[:120]}", flush=True)
        if 'Target crashed' in err_str or 'target crashed' in err_str.lower():
            return None, -2  # -2 signals tab crash → create new tab
        if 'closed' in err_str.lower() or 'TargetClosed' in err_str:
            return None, -2  # treat as tab crash
        return None, 0
|
||
|
||
|
||
def _pick_str(row, keys, strip=False):
    """Return str(value) for the first key with a truthy value, else None."""
    for k in keys:
        if k in row and row[k]:
            s = str(row[k])
            return s.strip() if strip else s
    return None


def _pick_id(row, keys):
    """Return the first MMSI/IMO-style value that is not a 0/empty/None placeholder."""
    for k in keys:
        val = row.get(k)
        if val and str(val) not in ('0', '', 'None'):
            return str(val)
    return None


def _pick_num(row, keys, conv):
    """Convert the first truthy key's value with *conv*; None if absent or unparsable.

    Mirrors the original break-on-first semantics: once a truthy key is found,
    later keys are NOT tried even if conversion fails.
    """
    for k in keys:
        if k in row and row[k]:
            try:
                return conv(row[k])
            except Exception:
                return None
    return None


def parse_row(row):
    """Extract vessel data from one API row dict.

    Accepts both the UPPERCASE and lowercase key variants MT returns. Keys
    are only present in the result when a usable value was found.

    Returns:
        dict with any of: ship_id, name, mmsi, imo, flag, ship_type, dwt,
        year_built, lat, lon, owner, registered_owner, operator,
        commercial_manager — or None if *row* is not a dict or carries no
        identifying field (ship_id/mmsi/name).
    """
    if not isinstance(row, dict):
        return None

    candidates = {
        'ship_id': _pick_str(row, ('SHIP_ID', 'SHIPID')),
        'name': _pick_str(row, ('SHIPNAME', 'NAME'), strip=True),
        'mmsi': _pick_id(row, ('MMSI', 'mmsi')),
        'imo': _pick_id(row, ('IMO', 'imo')),
        'flag': _pick_str(row, ('FLAG', 'flag')),
        'ship_type': _pick_str(row, ('TYPE_SUMMARY', 'SHIP_TYPE', 'SHIPTYPE')),
        # DWT may arrive as a comma-grouped string like '199,629'.
        'dwt': _pick_num(row, ('DWT', 'dwt'),
                         lambda x: int(float(str(x).replace(',', '')))),
        'year_built': _pick_num(row, ('YEAR_BUILT', 'year_built'), int),
        'lat': _pick_num(row, ('LAT_OF_LATEST_POSITION', 'LAT', 'lat'), float),
        'lon': _pick_num(row, ('LON_OF_LATEST_POSITION', 'LON', 'lon'), float),
        'owner': _pick_str(row, ('BENEFICIAL_OWNER', 'MANAGER_OWNER')),
        'registered_owner': _pick_str(row, ('REGISTERED_OWNER',)),
        'operator': _pick_str(row, ('OPERATOR',)),
        'commercial_manager': _pick_str(row, ('COMMERCIAL_MANAGER', 'commercial_manager')),
    }
    v = {k: val for k, val in candidates.items() if val is not None}

    # Reject rows with no usable identity at all.
    if not (v.get('ship_id') or v.get('mmsi') or v.get('name')):
        return None
    return v
|
||
|
||
|
||
async def main():
|
||
parser = argparse.ArgumentParser()
|
||
parser.add_argument('--probe', action='store_true', help='Test: first 5 pages only')
|
||
parser.add_argument('--start', type=int, default=1, help='Start from page N (resume)')
|
||
parser.add_argument('--max-pages', type=int, default=0, help='Max pages (0=unlimited)')
|
||
parser.add_argument('--delay', type=float, default=PAGE_DELAY)
|
||
args = parser.parse_args()
|
||
|
||
if args.probe:
|
||
args.max_pages = 5
|
||
|
||
# --- DB ---
|
||
try:
|
||
conn = db_connect()
|
||
cur = conn.cursor()
|
||
cur.execute('SELECT count(*) FROM mt_bulk_staging')
|
||
existing_count = cur.fetchone()[0]
|
||
cur.execute('SELECT ship_id FROM mt_bulk_staging WHERE ship_id IS NOT NULL')
|
||
existing_ids = set(str(r[0]) for r in cur.fetchall())
|
||
print(f"DB: {existing_count} existing vessels | {len(existing_ids)} ship_ids")
|
||
except Exception as e:
|
||
print(f"DB ERROR: {e}")
|
||
print("Start SSH tunnel: ssh -L 15432:127.0.0.1:5432 -N root@89.19.208.158")
|
||
return
|
||
|
||
from playwright.async_api import async_playwright
|
||
|
||
async with async_playwright() as p:
|
||
browser = await p.chromium.launch(
|
||
headless=False, # Must be False: CF TLS fingerprint check + MT session cookies
|
||
args=[
|
||
'--no-sandbox',
|
||
'--disable-blink-features=AutomationControlled',
|
||
'--disable-dev-shm-usage',
|
||
]
|
||
)
|
||
ctx = await browser.new_context(
|
||
viewport={'width': 800, 'height': 600}, # Smaller viewport = less GPU memory
|
||
user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
|
||
'(KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36',
|
||
)
|
||
page = await ctx.new_page()
|
||
# NOTE: Do NOT set default_timeout here — it would break login flow.
|
||
# Crash/hang recovery is handled via JS AbortController in fetch_page()
|
||
# and the -2 crash signal → recover_page() in the main loop.
|
||
|
||
COOKIE_FILE = 'mt_session_cookies.json'
|
||
|
||
# Try loading saved cookies first (fast path: no login needed)
|
||
cookie_loaded = False
|
||
if args.start > 1 and os.path.exists(COOKIE_FILE):
|
||
try:
|
||
with open(COOKIE_FILE) as f:
|
||
saved_cookies = json.load(f)
|
||
await ctx.add_cookies(saved_cookies)
|
||
print(f"Loaded {len(saved_cookies)} cookies from {COOKIE_FILE}")
|
||
# Navigate to /data/ with 'commit' to activate session token server-side.
|
||
# Then navigate to /ais/home (lightweight, same-origin) to stay in valid context.
|
||
try:
|
||
await asyncio.wait_for(
|
||
page.goto('https://www.marinetraffic.com/en/data/?asset_type=vessels',
|
||
wait_until='commit', timeout=30000),
|
||
timeout=35.0
|
||
)
|
||
except Exception:
|
||
pass
|
||
await asyncio.sleep(2)
|
||
try:
|
||
await asyncio.wait_for(
|
||
page.goto('https://www.marinetraffic.com/en/ais/home',
|
||
wait_until='commit', timeout=20000),
|
||
timeout=25.0
|
||
)
|
||
except Exception:
|
||
pass
|
||
await asyncio.sleep(2)
|
||
# Quick API test to verify session validity from same-origin context
|
||
test_url = ('https://www.marinetraffic.com/en/reports/?asset_type=vessels'
|
||
'&columns=shipname&per_page=1&page=1')
|
||
test_rows, test_total = await fetch_page(page, test_url, 1)
|
||
if test_rows is not None:
|
||
print(f"Cookie session valid! Skipping login.")
|
||
cookie_loaded = True
|
||
else:
|
||
print(f"Cookie session invalid (status {test_total}). Will re-login.")
|
||
await ctx.clear_cookies()
|
||
except Exception as ck_err:
|
||
print(f"Cookie load failed: {ck_err}. Will login fresh.")
|
||
|
||
# --- Login (skip if cookies loaded successfully) ---
|
||
if not cookie_loaded:
|
||
if not await do_login(page):
|
||
print("LOGIN FAILED!")
|
||
await browser.close()
|
||
conn.close()
|
||
return
|
||
# Wait for OAuth auto-redirect to complete (MT redirects oauth/callback → /ais/home)
|
||
# This prevents the /data/ navigation from conflicting with the pending redirect.
|
||
try:
|
||
await page.wait_for_url('**/ais/home**', timeout=15000)
|
||
print(f" OAuth redirect complete: {page.url[:60]}", flush=True)
|
||
except Exception:
|
||
await asyncio.sleep(5) # Wait anyway even if URL match fails
|
||
print(f" After redirect wait: {page.url[:60]}", flush=True)
|
||
# Save cookies for fast restart
|
||
try:
|
||
cookies = await ctx.cookies()
|
||
with open(COOKIE_FILE, 'w') as f:
|
||
json.dump(cookies, f)
|
||
print(f"Saved {len(cookies)} session cookies to {COOKIE_FILE}")
|
||
except Exception as ck_err:
|
||
print(f"Cookie save failed (non-fatal): {ck_err}")
|
||
|
||
# After OAuth redirect we're already on marinetraffic.com with valid session.
|
||
# The reports API fetch() calls work when the browser is on /ais/home.
|
||
# In headless=True mode, /ais/home's WebSocket map does NOT consume significant
|
||
# resources (no GPU/canvas rendering), so it's safe to stay here.
|
||
# DO NOT navigate to /en/reports/... directly — that causes HTTP 403 on subsequent
|
||
# fetch() calls (MT server sees it as a direct API hit, not a browser XHR).
|
||
# Must visit /data/ before /reports/ API — MT sets a required session token
|
||
# (e.g. 'mt-data-access') when the /data/ page loads.
|
||
# Must visit /data/ to set the MT session token for the reports API.
|
||
# Without this visit, all /en/reports/ requests return 403.
|
||
# We navigate with domcontentloaded and wait briefly, then STAY on /data/.
|
||
# CDP garbage collection every 15 pages prevents V8 OOM crashes.
|
||
_update_heartbeat() # Reset watchdog before /data/ navigation (can take 30s+)
|
||
print("\nNavigating to /data/ (required to unlock reports API)...")
|
||
try:
|
||
# Use asyncio.wait_for() as a fallback in case Playwright's internal
|
||
# timeout doesn't fire (known issue when browser is under load).
|
||
await asyncio.wait_for(
|
||
page.goto('https://www.marinetraffic.com/en/data/?asset_type=vessels',
|
||
wait_until='commit', timeout=30000),
|
||
timeout=35.0
|
||
)
|
||
print(f" /data/ commit done", flush=True)
|
||
_update_heartbeat() # Reset after commit
|
||
except asyncio.TimeoutError:
|
||
print(f" /data/ nav asyncio timeout (35s) — continuing anyway", flush=True)
|
||
except Exception as nav_err:
|
||
print(f" /data/ nav error (non-fatal): {nav_err}", flush=True)
|
||
# Save cookies immediately after /data/ commit (before SPA JS executes and crashes renderer)
|
||
try:
|
||
cookies = await ctx.cookies()
|
||
with open(COOKIE_FILE, 'w') as f:
|
||
json.dump(cookies, f)
|
||
print(f"Saved {len(cookies)} cookies (after /data/ visit)")
|
||
except Exception:
|
||
pass
|
||
# Navigate to MT robots.txt — a plain text file, no JavaScript, no SPA,
|
||
# no crash risk. Same origin as marinetraffic.com so fetch() works without CORS.
|
||
try:
|
||
await asyncio.wait_for(
|
||
page.goto('https://www.marinetraffic.com/robots.txt',
|
||
wait_until='domcontentloaded', timeout=15000),
|
||
timeout=18.0
|
||
)
|
||
print(f" Navigated to robots.txt (lightweight static page)", flush=True)
|
||
except Exception as nav_err:
|
||
print(f" robots.txt nav error: {nav_err} — continuing anyway", flush=True)
|
||
await asyncio.sleep(1)
|
||
try:
|
||
print(f" Page URL: {page.url[:80]}")
|
||
except Exception:
|
||
print(f" Page URL: (unavailable)")
|
||
|
||
# Hardcoded API base URL — confirmed working format
|
||
base_url = 'https://www.marinetraffic.com/en/reports/?asset_type=vessels'
|
||
api_base = f'{base_url}&columns={API_COLS}&per_page={PER_PAGE}'
|
||
print(f" API base: {api_base[:120]}")
|
||
|
||
# --- Test page 1 (with retry on 403 — session might need extra time) ---
|
||
# MT reports API sometimes requires prior visit to /data/ page to unlock.
|
||
print(f"\nTesting API (page 1)...")
|
||
rows1, total1 = None, 0
|
||
data_page_visited = False
|
||
for api_attempt in range(6):
|
||
rows1, total1 = await fetch_page(page, api_base, 1)
|
||
if rows1 is not None:
|
||
break
|
||
if total1 == 403:
|
||
if api_attempt == 2 and not data_page_visited:
|
||
# After 2 failed attempts, revisit /data/ with 'commit' to unlock session
|
||
print(f" API 403 x{api_attempt+1} — re-visiting /data/ (commit) to unlock...")
|
||
try:
|
||
await asyncio.wait_for(
|
||
page.goto('https://www.marinetraffic.com/en/data/?asset_type=vessels',
|
||
wait_until='commit', timeout=30000),
|
||
timeout=35.0
|
||
)
|
||
await asyncio.sleep(3)
|
||
await asyncio.wait_for(
|
||
page.goto('https://www.marinetraffic.com/robots.txt',
|
||
wait_until='domcontentloaded', timeout=15000),
|
||
timeout=18.0
|
||
)
|
||
print(f" /data/ commit + back to robots.txt", flush=True)
|
||
data_page_visited = True
|
||
except Exception as data_err:
|
||
print(f" /data/ nav error: {data_err}")
|
||
else:
|
||
print(f" API 403 — waiting 15s for session to establish (attempt {api_attempt+1}/6)...")
|
||
await asyncio.sleep(15)
|
||
else:
|
||
print(f" API error (status {total1}) — waiting 5s...")
|
||
await asyncio.sleep(5)
|
||
|
||
if rows1 is None:
|
||
print("ERROR: API calls failing after retries. Exiting.")
|
||
await browser.close()
|
||
conn.close()
|
||
return
|
||
|
||
total_in_mt = total1
|
||
est_pages = (total_in_mt + len(rows1) - 1) // max(len(rows1), 1) if rows1 else 0
|
||
print(f" Page 1: {len(rows1)} rows | totalCount={total_in_mt} | est_pages={est_pages}")
|
||
|
||
# Sample row keys
|
||
if rows1:
|
||
print(f" Row keys: {list(rows1[0].keys())[:15]}")
|
||
v = parse_row(rows1[0])
|
||
print(f" Sample: {v}")
|
||
|
||
# --- Main pagination loop ---
|
||
total_saved = 0
|
||
total_cargo = 0
|
||
total_skipped = 0
|
||
batch_count = 0
|
||
t0 = time.time()
|
||
|
||
start_page = args.start
|
||
max_pages = args.max_pages if args.max_pages else est_pages + 10
|
||
consecutive_errors = 0
|
||
checkpoint_file = 'mt_green_fleet_progress.json'
|
||
|
||
# Load checkpoint for resume
|
||
if args.start > 1 and os.path.exists(checkpoint_file):
|
||
with open(checkpoint_file) as f:
|
||
cp = json.load(f)
|
||
total_saved = cp.get('total_saved', 0)
|
||
total_cargo = cp.get('total_cargo', 0)
|
||
print(f" Resuming from page {start_page} | saved={total_saved}")
|
||
|
||
# If starting from page 1, the test already fetched page 1.
|
||
# Process rows1 directly and start the loop from page 2 to avoid double-fetch
|
||
# (MT rate-limits duplicate requests immediately and hangs the connection).
|
||
if args.start == 1 and rows1:
|
||
print(f" Processing test page 1 results directly (no double-fetch)...", flush=True)
|
||
for row in rows1:
|
||
v = parse_row(row)
|
||
if not v:
|
||
continue
|
||
ship_type = v.get('ship_type', '')
|
||
if not is_cargo(ship_type):
|
||
total_skipped += 1
|
||
continue
|
||
total_cargo += 1
|
||
sid = v.get('ship_id')
|
||
if not sid:
|
||
continue
|
||
is_new = sid not in existing_ids
|
||
existing_ids.add(sid)
|
||
st_lower = ship_type.lower()
|
||
if 'bulk' in st_lower: gt, cat = '6', 'bulk'
|
||
elif 'container' in st_lower: gt, cat = '11', 'container'
|
||
elif 'ro-ro' in st_lower or 'roro' in st_lower or 'vehicle' in st_lower: gt, cat = '12', 'roro'
|
||
else: gt, cat = '9', 'general'
|
||
try:
|
||
conn, cur = db_safe_execute(conn, cur, """
|
||
INSERT INTO mt_bulk_staging
|
||
(ship_id, name, flag, dwt, gt_shiptype, type_category,
|
||
lat, lon, mmsi, imo, owner, registered_owner, operator,
|
||
commercial_manager, scraped_at)
|
||
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s, NOW())
|
||
ON CONFLICT (ship_id) DO UPDATE SET
|
||
name=COALESCE(EXCLUDED.name,mt_bulk_staging.name),
|
||
mmsi=COALESCE(EXCLUDED.mmsi,mt_bulk_staging.mmsi),
|
||
imo=COALESCE(EXCLUDED.imo,mt_bulk_staging.imo),
|
||
owner=COALESCE(EXCLUDED.owner,mt_bulk_staging.owner),
|
||
registered_owner=COALESCE(EXCLUDED.registered_owner,mt_bulk_staging.registered_owner),
|
||
operator=COALESCE(EXCLUDED.operator,mt_bulk_staging.operator),
|
||
commercial_manager=COALESCE(EXCLUDED.commercial_manager,mt_bulk_staging.commercial_manager),
|
||
lat=COALESCE(EXCLUDED.lat,mt_bulk_staging.lat),
|
||
lon=COALESCE(EXCLUDED.lon,mt_bulk_staging.lon),
|
||
scraped_at=NOW()
|
||
""", (sid, v.get('name'), v.get('flag'), v.get('dwt'), gt, cat,
|
||
v.get('lat'), v.get('lon'), v.get('mmsi'), v.get('imo'),
|
||
v.get('owner'), v.get('registered_owner'), v.get('operator'),
|
||
v.get('commercial_manager'),))
|
||
if is_new: total_saved += 1
|
||
batch_count += 1
|
||
except Exception as e:
|
||
print(f" DB error (test-page1): {str(e)[:80]}")
|
||
# Skip page 1 in the loop since we already processed it
|
||
start_page = 2
|
||
|
||
print(f"\n{'='*60}")
|
||
print(f"STARTING SCRAPE: pages {start_page} to {min(max_pages, est_pages)}")
|
||
print(f"Delay: {args.delay}s/page | Batch: {BATCH_SIZE}")
|
||
print(f"{'='*60}\n")
|
||
|
||
# Reload interval: DISABLED — reloading the heavy MT SPA page crashes the browser.
|
||
# Session cookies are maintained in the browser context, so no reload needed.
|
||
# Recovery via recover_page() only when a crash is actually detected.
|
||
PAGE_RELOAD_INTERVAL = 999999
|
||
|
||
async def recover_page(reason="crash"):
|
||
"""Close crashed page and open a fresh one, re-establish /data/ session context.
|
||
If the browser context itself is dead, raises BrowserRestartNeeded so the outer
|
||
loop can restart the whole browser from the checkpoint."""
|
||
nonlocal page
|
||
print(f" [Recovery] Creating new tab ({reason})...", flush=True)
|
||
try:
|
||
await page.close()
|
||
except Exception:
|
||
pass
|
||
await asyncio.sleep(5)
|
||
try:
|
||
page = await ctx.new_page()
|
||
except Exception as ctx_err:
|
||
# Browser context is dead — escalate to full browser restart
|
||
err_s = str(ctx_err)
|
||
print(f" [Recovery] BrowserContext dead ({err_s[:80]}), escalating to browser restart...", flush=True)
|
||
with open(checkpoint_file, 'w') as f:
|
||
json.dump({'page': page_num, 'total_saved': total_saved,
|
||
'total_cargo': total_cargo, 'total_skipped': total_skipped,
|
||
'elapsed': time.time() - t0}, f)
|
||
try:
|
||
await browser.close()
|
||
except Exception:
|
||
pass
|
||
raise BrowserRestartNeeded(page_num)
|
||
try:
|
||
# Navigate to robots.txt — static text, no SPA, same origin as MT.
|
||
# fetch() calls to the reports API work from this page without CORS issues.
|
||
await page.goto('https://www.marinetraffic.com/robots.txt',
|
||
wait_until='domcontentloaded', timeout=15000)
|
||
await asyncio.sleep(1)
|
||
print(f" [Recovery] New tab ready at robots.txt", flush=True)
|
||
return True
|
||
except Exception as re_err:
|
||
print(f" [Recovery] New tab goto failed: {re_err}", flush=True)
|
||
return False
|
||
|
||
# V8 heap GC every N fetches via CDP — no page navigation needed.
|
||
PAGE_REFRESH_EVERY = 15 # run CDP GC every 15 API fetches
|
||
|
||
for page_num in range(start_page, max_pages + 1):
|
||
# Periodic CDP GC to clear V8 heap and prevent Chromium OOM crash.
|
||
# Does NOT navigate away — no CORS issues, no SPA reload.
|
||
fetch_count = page_num - start_page
|
||
if fetch_count > 0 and fetch_count % PAGE_REFRESH_EVERY == 0:
|
||
print(f" [HeapFlush] Requesting V8 GC at page {page_num}...", flush=True)
|
||
try:
|
||
# Request garbage collection via Playwright CDP (DevTools Protocol).
|
||
# This frees accumulated V8 heap WITHOUT navigating away from the page.
|
||
# No tab crash, no CORS issues, no SPA reload needed.
|
||
cdp = await page.context.new_cdp_session(page)
|
||
await cdp.send('HeapProfiler.collectGarbage')
|
||
await cdp.detach()
|
||
print(f" [HeapFlush] GC done", flush=True)
|
||
except Exception as flush_err:
|
||
print(f" [HeapFlush] GC error: {flush_err} — continuing anyway", flush=True)
|
||
|
||
# Fetch page
|
||
rows, total = await fetch_page(page, api_base, page_num)
|
||
|
||
if rows is None:
|
||
consecutive_errors += 1
|
||
is_browser_dead = total == -3 # hard timeout → browser must be restarted
|
||
is_crash = total == -2 # tab crash → new tab
|
||
is_timeout = total == -1 # JS timeout → new tab
|
||
need_new_tab = is_timeout or is_crash
|
||
if is_browser_dead:
|
||
# Save checkpoint and raise so outer loop restarts browser
|
||
print(f" Page {page_num}: BROWSER DEAD — saving checkpoint and restarting browser...", flush=True)
|
||
with open(checkpoint_file, 'w') as f:
|
||
json.dump({'page': page_num, 'total_saved': total_saved,
|
||
'total_cargo': total_cargo, 'total_skipped': total_skipped,
|
||
'elapsed': time.time() - t0}, f)
|
||
try:
|
||
await browser.close()
|
||
except Exception:
|
||
pass
|
||
raise BrowserRestartNeeded(page_num)
|
||
if need_new_tab:
|
||
reason = 'CRASH' if is_crash else 'TIMEOUT'
|
||
print(f" Page {page_num}: {reason} — creating new tab...", flush=True)
|
||
ok = await recover_page("tab_crash" if is_crash else "timeout")
|
||
if not ok:
|
||
# Recovery failed immediately — context dead, escalate to browser restart
|
||
print(f" Immediate recovery failed — restarting browser from page {page_num}", flush=True)
|
||
with open(checkpoint_file, 'w') as f:
|
||
json.dump({'page': page_num, 'total_saved': total_saved,
|
||
'total_cargo': total_cargo, 'total_skipped': total_skipped,
|
||
'elapsed': time.time() - t0}, f)
|
||
try:
|
||
await browser.close()
|
||
except Exception:
|
||
pass
|
||
raise BrowserRestartNeeded(page_num)
|
||
consecutive_errors = max(consecutive_errors - 1, 0)
|
||
continue
|
||
print(f" Page {page_num}: FAILED (error #{consecutive_errors})", flush=True)
|
||
if consecutive_errors >= 3:
|
||
# Create fresh tab after 3 consecutive errors
|
||
print(f" {consecutive_errors} consecutive errors — creating fresh tab...", flush=True)
|
||
ok = await recover_page(f"errors_{consecutive_errors}")
|
||
if ok:
|
||
consecutive_errors = 0
|
||
else:
|
||
# Recovery failed — escalate to full browser restart
|
||
print(f" Recovery failed — restarting browser from page {page_num}", flush=True)
|
||
with open(checkpoint_file, 'w') as f:
|
||
json.dump({'page': page_num, 'total_saved': total_saved,
|
||
'total_cargo': total_cargo, 'total_skipped': total_skipped,
|
||
'elapsed': time.time() - t0}, f)
|
||
try:
|
||
await browser.close()
|
||
except Exception:
|
||
pass
|
||
raise BrowserRestartNeeded(page_num)
|
||
continue
|
||
await asyncio.sleep(3)
|
||
continue
|
||
|
||
consecutive_errors = 0
|
||
|
||
if not rows:
|
||
print(f" Page {page_num}: empty — reached end")
|
||
break
|
||
|
||
# Parse and filter
|
||
page_cargo = 0
|
||
for row in rows:
|
||
v = parse_row(row)
|
||
if not v:
|
||
continue
|
||
|
||
# Filter: keep only cargo types
|
||
ship_type = v.get('ship_type', '')
|
||
if not is_cargo(ship_type):
|
||
total_skipped += 1
|
||
continue
|
||
|
||
total_cargo += 1
|
||
page_cargo += 1
|
||
|
||
sid = v.get('ship_id')
|
||
if not sid:
|
||
continue
|
||
|
||
is_new = sid not in existing_ids
|
||
existing_ids.add(sid)
|
||
|
||
# Map gt_shiptype code
|
||
st_lower = ship_type.lower()
|
||
if 'bulk' in st_lower:
|
||
gt = '6'
|
||
cat = 'bulk'
|
||
elif 'container' in st_lower:
|
||
gt = '11'
|
||
cat = 'container'
|
||
elif 'ro-ro' in st_lower or 'roro' in st_lower or 'vehicle' in st_lower:
|
||
gt = '12'
|
||
cat = 'roro'
|
||
else:
|
||
gt = '9'
|
||
cat = 'general'
|
||
|
||
try:
|
||
conn, cur = db_safe_execute(conn, cur, """
|
||
INSERT INTO mt_bulk_staging
|
||
(ship_id, name, flag, dwt, gt_shiptype, type_category,
|
||
lat, lon, mmsi, imo, owner, registered_owner, operator,
|
||
commercial_manager, scraped_at)
|
||
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s, NOW())
|
||
ON CONFLICT (ship_id) DO UPDATE SET
|
||
name = COALESCE(EXCLUDED.name, mt_bulk_staging.name),
|
||
mmsi = COALESCE(EXCLUDED.mmsi, mt_bulk_staging.mmsi),
|
||
imo = COALESCE(EXCLUDED.imo, mt_bulk_staging.imo),
|
||
owner = COALESCE(EXCLUDED.owner, mt_bulk_staging.owner),
|
||
registered_owner = COALESCE(EXCLUDED.registered_owner, mt_bulk_staging.registered_owner),
|
||
operator = COALESCE(EXCLUDED.operator, mt_bulk_staging.operator),
|
||
commercial_manager = COALESCE(EXCLUDED.commercial_manager, mt_bulk_staging.commercial_manager),
|
||
lat = COALESCE(EXCLUDED.lat, mt_bulk_staging.lat),
|
||
lon = COALESCE(EXCLUDED.lon, mt_bulk_staging.lon),
|
||
scraped_at = NOW()
|
||
""", (
|
||
sid, v.get('name'), v.get('flag'), v.get('dwt'), gt, cat,
|
||
v.get('lat'), v.get('lon'), v.get('mmsi'), v.get('imo'),
|
||
v.get('owner'), v.get('registered_owner'), v.get('operator'),
|
||
v.get('commercial_manager'),
|
||
))
|
||
if is_new:
|
||
total_saved += 1
|
||
batch_count += 1
|
||
except Exception as e:
|
||
print(f" DB error ({type(e).__name__}): {str(e)[:80]}")
|
||
|
||
# Commit batch
|
||
if batch_count >= BATCH_SIZE:
|
||
conn = db_safe_commit(conn)
|
||
cur = conn.cursor()
|
||
batch_count = 0
|
||
|
||
# Progress
|
||
elapsed = time.time() - t0
|
||
rate = (page_num - start_page + 1) / elapsed if elapsed > 0 else 0
|
||
remaining_pages = (est_pages - page_num)
|
||
eta_min = remaining_pages / rate / 60 if rate > 0 else 0
|
||
|
||
_update_heartbeat() # Reset watchdog on each page processed
|
||
if page_num <= 20 or page_num % 10 == 0:
|
||
print(f" Page {page_num}/{est_pages} | "
|
||
f"+{page_cargo} cargo | "
|
||
f"total_new={total_saved} | "
|
||
f"skipped={total_skipped} | "
|
||
f"{elapsed:.0f}s | ETA: {eta_min:.0f}m", flush=True)
|
||
|
||
# Save checkpoint every 5 pages so restarts (watchdog or browser crash) resume close to current position
|
||
if page_num % 5 == 0:
|
||
with open(checkpoint_file, 'w') as f:
|
||
json.dump({
|
||
'page': page_num,
|
||
'total_saved': total_saved,
|
||
'total_cargo': total_cargo,
|
||
'total_skipped': total_skipped,
|
||
'elapsed': elapsed,
|
||
}, f)
|
||
|
||
await asyncio.sleep(args.delay)
|
||
|
||
# Final commit
|
||
conn = db_safe_commit(conn)
|
||
cur = conn.cursor()
|
||
|
||
# Final stats
|
||
cur.execute("SELECT count(*), count(mmsi), count(owner) FROM mt_bulk_staging")
|
||
t, m, o = cur.fetchone()
|
||
cur.execute("SELECT count(*), count(mmsi), count(owner) FROM mt_bulk_staging WHERE gt_shiptype='6'")
|
||
tb, mb, ob = cur.fetchone()
|
||
|
||
elapsed = time.time() - t0
|
||
print(f"\n{'='*60}")
|
||
print(f"SCRAPE COMPLETE — {elapsed/60:.1f} minutes")
|
||
print(f" Pages done: {page_num - start_page + 1}")
|
||
print(f" New vessels: {total_saved}")
|
||
print(f" Cargo found: {total_cargo}")
|
||
print(f" Non-cargo skipped: {total_skipped}")
|
||
print(f"\nGLOBAL: total={t} | mmsi={m} ({m*100//max(t,1)}%) | owner={o} ({o*100//max(t,1)}%)")
|
||
print(f"BULK: total={tb} | mmsi={mb} ({mb*100//max(tb,1)}%) | owner={ob} ({ob*100//max(tb,1)}%)")
|
||
print(f"{'='*60}")
|
||
|
||
conn.close()
|
||
await browser.close()
|
||
|
||
|
||
import traceback
|
||
|
||
# Outer restart loop — handles BrowserRestartNeeded by restarting from checkpoint.
# Re-running main() in-process relies on main() re-reading sys.argv each call —
# presumably via argparse inside main(); parsing is not visible here, confirm.
restart_count = 0
max_restarts = 200  # allow up to 200 browser restarts (plenty for 5752 pages)

while True:
    try:
        asyncio.run(main())
        break  # Normal completion
    except BrowserRestartNeeded as br:
        # main() already persisted a checkpoint and closed the browser before
        # raising; br.resume_page tells us where to pick up.
        restart_count += 1
        print(f"\n[RESTART #{restart_count}] Browser dead at page {br.resume_page}. Restarting from checkpoint...", flush=True)
        if restart_count >= max_restarts:
            print(f"[RESTART] Max restarts ({max_restarts}) reached. Stopping.", flush=True)
            sys.exit(1)
        # Patch sys.argv to resume from the checkpoint page
        if '--start' in sys.argv:
            idx = sys.argv.index('--start')
            sys.argv[idx+1] = str(br.resume_page)
        else:
            sys.argv.extend(['--start', str(br.resume_page)])
        time.sleep(10)  # Brief pause before restarting browser
        continue
    except Exception as top_err:
        err_s = str(top_err)
        # TargetClosedError / context closed → try browser restart from checkpoint.
        # Matched by exception class name and Playwright's known message texts.
        if 'TargetClosed' in type(top_err).__name__ or 'Target page, context or browser has been closed' in err_s or 'browser has been closed' in err_s:
            restart_count += 1
            print(f"\n[RESTART #{restart_count}] Browser closed unexpectedly: {err_s[:120]}", flush=True)
            if restart_count >= max_restarts:
                print(f"[RESTART] Max restarts ({max_restarts}) reached. Stopping.", flush=True)
                sys.exit(1)
            # Try to resume from checkpoint if it exists; any failure here is
            # non-fatal — we just restart without a --start override.
            try:
                import json as _json
                import glob as _glob
                # Search for the progress checkpoint file used by main(),
                # newest first (negated mtime sorts descending).
                _chk_files = sorted(
                    _glob.glob('mt_green_fleet_progress.json') + _glob.glob('mt_green_fleet_checkpoint*.json'),
                    key=lambda x: -os.path.getmtime(x)
                )
                if _chk_files:
                    with open(_chk_files[0]) as _f:
                        _chk = _json.load(_f)
                    _resume = _chk.get('page', 1)
                    print(f"[RESTART] Resuming from checkpoint page {_resume}...", flush=True)
                    if '--start' in sys.argv:
                        idx = sys.argv.index('--start')
                        sys.argv[idx+1] = str(_resume)
                    else:
                        sys.argv.extend(['--start', str(_resume)])
            except Exception as chk_err:
                print(f"[RESTART] Could not read checkpoint: {chk_err}", flush=True)
            time.sleep(15)
            continue
        # Anything else is a genuine failure: report and stop.
        print(f"\nFATAL ERROR: {type(top_err).__name__}: {top_err}", flush=True)
        traceback.print_exc()
        sys.exit(1)
|