#!/usr/bin/env python3
"""
MT Green Fleet Scraper: fetch ALL cargo vessels from the MT Data page via direct API pagination.
Strategy:
1. Login to MT Pro
2. Navigate to the Data page (establishes session context)
3. Build the reports API URL (hardcoded base + columns + per_page)
4. Page through ALL results via page.evaluate(fetch()) with page=N
5. Filter client-side: keep only cargo and tanker types (see CARGO_TYPES)
6. Save to mt_bulk_staging with all available data (name, mmsi, imo, owner, position)
MT lists ~2.8M vessels total; cargo is roughly 30-40% of that (~900k vessels).
At 500 rows per page that is ~5,600 pages to walk; with the default 2s page delay a full pass takes a few hours.
Usage:
python mt_green_fleet.py --probe # Test: first 5 pages, show API format
python mt_green_fleet.py # Full scrape: all cargo vessels
python mt_green_fleet.py --start 100 # Resume from page 100
python mt_green_fleet.py --max-pages 500 # Limit pages
"""
import asyncio, json, sys, os, time, struct, hmac, hashlib, base64, argparse, re
import threading
import psycopg2
# ---- WATCHDOG: kills the process if there is no heartbeat for _WATCHDOG_TIMEOUT (180) seconds ----
# This handles the case where asyncio.wait_for() hangs due to Playwright
# protocol issues (corrupted WebSocket state, frozen browser).
# The outer restart loop will restart from checkpoint.
_heartbeat_time = time.time()
_WATCHDOG_TIMEOUT = 180 # seconds; must exceed worst-case login time (TOTP wait up to 30s + navigation 60-90s, plus margin)
def _update_heartbeat():
global _heartbeat_time
_heartbeat_time = time.time()
def _watchdog_thread():
while True:
time.sleep(10)
elapsed_since_heartbeat = time.time() - _heartbeat_time
if elapsed_since_heartbeat > _WATCHDOG_TIMEOUT:
print(f"\n[WATCHDOG] No heartbeat for {elapsed_since_heartbeat:.0f}s — exiting for restart!", flush=True)
sys.stdout.flush()
# Exit with code 2 — the outer bash wrapper will restart from checkpoint
os._exit(2) # code 2 = watchdog timeout, needs restart
_wd = threading.Thread(target=_watchdog_thread, daemon=True)
_wd.start()
# ---- END WATCHDOG ----
class BrowserRestartNeeded(Exception):
"""Raised when browser is completely dead and must be fully restarted."""
def __init__(self, resume_page):
self.resume_page = resume_page
super().__init__(f"Browser restart needed from page {resume_page}")
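# Raised from inside main() when the browser context or the whole browser is dead,
# and caught by the restart loop at the bottom of this file, which re-runs main()
# with --start pointed at the checkpointed page.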
os.chdir(os.path.dirname(os.path.abspath(__file__)))
if hasattr(sys.stdout, 'reconfigure'):
sys.stdout.reconfigure(encoding='utf-8', errors='replace', line_buffering=True)
if hasattr(sys.stderr, 'reconfigure'):
sys.stderr.reconfigure(encoding='utf-8', errors='replace', line_buffering=True)
EMAIL = "operation@mrlogisticcorp.com"
PASSWORD = "NKh9i8Z!7fU9jfi"
TOTP_SECRET = "MNWTEPTFJZBUC32GJFEWY6LVKQ2GGYKH"
DB_URL = 'postgresql://seafare:SF_m0ntana_2026@127.0.0.1:15432/seafare_db'
PAGE_DELAY = 2.0 # seconds between pages (2s to avoid MT rate limiting / session hangs)
BATCH_SIZE = 500 # commit every N vessels
# Vessel types to KEEP: cargo (green) and tanker (red). Everything else (passenger, fishing, military, tugs, etc.) is filtered out.
CARGO_TYPES = {
# GREEN — all cargo vessel types
'cargo', 'bulk carrier', 'general cargo', 'container ship', 'container vessel',
'ro-ro cargo', 'reefer', 'multipurpose', 'heavy load carrier',
'wood chips carrier', 'cement carrier', 'livestock carrier', 'vehicle carrier',
'self-discharging bulk carrier', 'open hatch bulk carrier', 'bulk',
'ore carrier', 'aggregates carrier', 'obo carrier', 'barge',
'special cargo', 'other cargo', 'nuclear fuel carrier',
'inland cargo',
# RED — all tanker types
'tanker', 'oil products tanker', 'crude oil tanker', 'oil/chemical tanker',
'chemical tanker', 'lpg tanker', 'lng tanker', 'bunkering tanker',
'asphalt/bitumen tanker', 'water tanker', 'inland tanker',
'special tanker', 'other tanker', 'floating storage/production',
'oil', 'chemical', 'lpg', 'lng', 'bitumen', 'asphalt',
'fso', 'fpso', 'fsu',
}
# Columns to request — all ownership fields MT Pro provides
API_COLS = ('shipname,imo,mmsi,flag,ship_type,dwt,year_built,'
'lat_of_latest_position,lon_of_latest_position,'
'beneficial_owner,registered_owner,operator,commercial_manager')
# 500 rows per page — MT Pro default; 1000 causes memory pressure and Chromium crashes.
PER_PAGE = 500
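# For reference, the reports API URL assembled in main() from the values above looks
# like this (illustrative, abbreviated):
#   https://www.marinetraffic.com/en/reports/?asset_type=vessels
#       &columns=shipname,imo,mmsi,...&per_page=500&page=<N>
# fetch_page() below substitutes or appends the page=<N> parameter for each request.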
def totp(secret):
s = secret.upper().replace(' ', '')
pad = (-len(s)) % 8
key = base64.b32decode(s + '=' * pad)
counter = int(time.time()) // 30
msg = struct.pack('>Q', counter)
h = hmac.new(key, msg, hashlib.sha1).digest()
offset = h[-1] & 0x0f
code = struct.unpack('>I', h[offset:offset+4])[0] & 0x7fffffff
return str(code % 1000000).zfill(6)
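# Optional sanity check of the hand-rolled TOTP above against the third-party
# `pyotp` package (not a dependency of this script; may disagree if run exactly
# on a 30-second window boundary):
#   import pyotp
#   assert totp(TOTP_SECRET) == pyotp.TOTP(TOTP_SECRET).now()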
def db_connect():
return psycopg2.connect(
DB_URL, connect_timeout=15,
keepalives=1, keepalives_idle=30, keepalives_interval=10, keepalives_count=5
)
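# The keepalives* arguments are standard libpq connection parameters; they keep the
# connection through the long-lived SSH tunnel from silently dying between batch commits.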
def _restart_ssh_tunnel():
import subprocess
try:
subprocess.run(['taskkill', '/F', '/IM', 'ssh.exe'], capture_output=True, timeout=5)
except Exception:
pass
time.sleep(2)
try:
subprocess.Popen(
['ssh', '-o', 'ServerAliveInterval=5', '-o', 'ServerAliveCountMax=120',
'-o', 'TCPKeepAlive=yes', '-o', 'StrictHostKeyChecking=no',
'-L', '15432:127.0.0.1:5432', '-N', 'root@89.19.208.158'],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
)
print(f" [SSH] Tunnel restarted, waiting 5s...")
time.sleep(5)
except Exception as e:
print(f" [SSH] Failed: {e}")
def db_reconnect(conn):
try:
conn.close()
except Exception:
pass
for attempt in range(5):
try:
time.sleep(3)
c = db_connect()
print(f" [DB] Reconnected (attempt {attempt+1})")
return c, c.cursor()
except Exception as e:
print(f" [DB] Attempt {attempt+1} failed: {e}")
print(f" [DB] Restarting SSH tunnel...")
_restart_ssh_tunnel()
for attempt in range(10):
try:
time.sleep(5)
c = db_connect()
print(f" [DB] Reconnected after tunnel restart (attempt {attempt+1})")
return c, c.cursor()
except Exception as e:
print(f" [DB] Post-restart attempt {attempt+1} failed: {e}")
if attempt == 4:
_restart_ssh_tunnel()
raise Exception("DB reconnect failed")
def db_safe_execute(conn, cur, query, params=None):
try:
cur.execute(query, params)
return conn, cur
except (psycopg2.InterfaceError, psycopg2.OperationalError) as e:
print(f" [DB] Lost on execute ({e}), reconnecting...")
conn, cur = db_reconnect(conn)
cur.execute(query, params)
return conn, cur
except psycopg2.Error as e:
try:
conn.rollback() # Clear aborted transaction state
except Exception:
pass
raise # Re-raise so caller can skip this record
def db_safe_commit(conn):
try:
conn.commit()
return conn
except (psycopg2.InterfaceError, psycopg2.OperationalError):
print(f" [DB] Lost on commit, reconnecting...")
conn, _ = db_reconnect(conn)
return conn
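# The db_safe_* helpers above transparently reconnect (restarting the SSH tunnel if
# needed) when the connection drops; any other psycopg2 error is rolled back and
# re-raised so the caller can skip just that record.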
def is_cargo(ship_type_str):
"""Return True if vessel is cargo (green) or tanker (red) type."""
if not ship_type_str:
return False
st = ship_type_str.lower().strip()
for ct in CARGO_TYPES:
if ct in st or st in ct:
return True
return False
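# Illustrative behaviour (ship-type strings are made-up examples):
#   is_cargo('Bulk Carrier')     -> True   (cargo, kept)
#   is_cargo('Crude Oil Tanker') -> True   (tanker, also kept)
#   is_cargo('Passenger Ship')   -> False  (filtered out)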
def totp_wait_for_fresh(min_remaining=8):
"""Wait until TOTP window has at least min_remaining seconds left, then return code.
Avoids entering a code that will expire before the server validates it."""
while True:
remaining = 30 - (int(time.time()) % 30)
if remaining >= min_remaining:
break
wait = remaining + 1
print(f" [TOTP] Window too close to expiry ({remaining}s left), waiting {wait}s...", flush=True)
time.sleep(wait)
code = totp(TOTP_SECRET)
remaining = 30 - (int(time.time()) % 30)
print(f" [TOTP] Code={code} | {remaining}s remaining in window", flush=True)
return code
async def do_login(page, max_retries=3):
for attempt in range(max_retries):
_update_heartbeat() # Reset watchdog for each login attempt (can take 60-120s)
print(f"LOGIN (attempt {attempt+1}/{max_retries})...")
# Navigate to login page — use asyncio.wait_for as outer timeout guard
try:
await asyncio.wait_for(
page.goto('https://www.marinetraffic.com/en/users/login',
wait_until='domcontentloaded', timeout=30000),
timeout=35.0
)
except asyncio.TimeoutError:
print(f" goto asyncio timeout (35s) — continuing with current page", flush=True)
except Exception as nav_err:
print(f" goto error: {nav_err}")
await asyncio.sleep(3)
_update_heartbeat() # Reset after navigation (wait_for_selector can be slow)
print(f" After goto: {page.url[:80]}")
# --- Step 1: Fill email and click Continue ---
try:
# Wait for username input to appear (the page may redirect to auth.kpler)
await page.wait_for_selector('input[name="username"]', timeout=20000)
await page.fill('input[name="username"]', EMAIL, timeout=15000)
print(f" Filled email", flush=True)
# JS click to avoid blocking on navigation
await page.evaluate("() => { const b = document.querySelector('button[type=\"submit\"]'); if(b) b.click(); }")
print(f" Clicked Continue (JS)", flush=True)
# Wait for URL to change to password page
try:
await page.wait_for_url('**/login/password**', timeout=20000)
print(f" Password page loaded", flush=True)
except Exception:
await asyncio.sleep(5)
print(f" URL after Continue: {page.url[:80]}", flush=True)
except Exception as fill_err:
print(f" email/continue step error: {fill_err}", flush=True)
await asyncio.sleep(5)
# --- Step 2: Fill password ---
try:
await page.wait_for_selector('input[name="password"]', timeout=20000)
print(f" Filling password... URL={page.url[:60]}", flush=True)
await page.fill('input[name="password"]', PASSWORD, timeout=15000)
# JS click to avoid Playwright blocking on navigation after password submit
await page.evaluate("() => { const b = document.querySelector('button[type=\"submit\"]'); if(b) b.click(); }")
print(f" Clicked password submit (JS)", flush=True)
except Exception as pass_err:
print(f" password step error: {pass_err}", flush=True)
# Wait for navigation to MFA options page
try:
await page.wait_for_url('**/mfa**', timeout=25000)
print(f" MFA page loaded: {page.url[:60]}", flush=True)
except Exception:
await asyncio.sleep(5)
url = page.url
print(f" After password: {url[:80]}")
# --- Step 3: MFA flow ---
if 'auth.kpler' in url:
# If on mfa-login-options page, click Google Authenticator button
if 'mfa-login-options' in url:
print(" Clicking Google Authenticator option...", flush=True)
try:
# JS click to avoid navigation blocking
clicked = await page.evaluate("""
() => {
const selectors = [
'button[value*="otp"]',
'button[aria-label*="Authenticator"]',
'button[aria-label*="authenticator"]',
];
for (const sel of selectors) {
const b = document.querySelector(sel);
if (b) { b.click(); return sel; }
}
// Fallback: find by text
for (const b of document.querySelectorAll('button')) {
if (b.textContent.includes('Authenticator') || b.textContent.includes('authenticator')) {
b.click(); return 'text:' + b.textContent.trim().slice(0,30);
}
}
return null;
}
""")
print(f" GA JS click: selector={clicked}", flush=True)
# Wait for navigation to OTP challenge page
await page.wait_for_url('**/mfa-otp-challenge**', timeout=20000)
print(f" GA clicked, OTP page loaded", flush=True)
except Exception as ex:
print(f" GA click error: {ex}", flush=True)
await asyncio.sleep(5)
print(f" After GA click: {page.url[:80]}", flush=True)
# Fill TOTP code (with timing check to avoid expiry during validation)
if 'mfa-otp-challenge' in page.url or 'auth.kpler' in page.url:
_update_heartbeat() # Reset watchdog — totp_wait_for_fresh can wait up to 30s
otp = totp_wait_for_fresh(min_remaining=8)
try:
await page.wait_for_selector('input[name="code"]', timeout=15000)
await page.fill('input[name="code"]', otp, timeout=15000)
print(f" OTP filled, clicking submit via JS...", flush=True)
# Use JS click to avoid Playwright navigation wait blocking
await page.evaluate("() => { const b = document.querySelector('button[type=\"submit\"]'); if(b) b.click(); }")
except Exception as ex:
print(f" 2FA fill/click error: {ex}", flush=True)
_update_heartbeat() # Reset watchdog before wait_for_url (25s wait)
# Wait for post-OTP navigation (MT home page or similar)
try:
await page.wait_for_url('**marinetraffic.com**', timeout=25000)
print(f" OTP submitted, redirected: {page.url[:60]}", flush=True)
except Exception:
await asyncio.sleep(15)
print(f" After OTP submit: {page.url[:80]}", flush=True)
_update_heartbeat() # Reset after redirect
ok = 'marinetraffic.com' in page.url and 'auth.kpler' not in page.url
if ok:
print(f" Login: OK | {page.url[:60]}")
return True
print(f" Login failed | {page.url[:80]}")
if attempt < max_retries - 1:
# Clear auth cookies and start fresh for next attempt
try:
await page.context.clear_cookies()
print(f" Cleared cookies for retry", flush=True)
except Exception:
pass
await asyncio.sleep(10)
return False
async def fetch_page(page, api_base_url, page_num, timeout_ms=30000):
"""Fetch one page of vessel data via page.evaluate(fetch()) with session cookies."""
# Build URL: replace existing page param or append it
if 'page=' in api_base_url:
url = re.sub(r'page=\d+', f'page={page_num}', api_base_url)
else:
url = api_base_url + f'&page={page_num}'
js = f"""
async () => {{
const controller = new AbortController();
const tid = setTimeout(() => controller.abort(), {timeout_ms});
try {{
const r = await fetch({json.dumps(url)}, {{
signal: controller.signal,
credentials: 'include',
cache: 'no-store',
headers: {{
'X-Requested-With': 'XMLHttpRequest',
'Accept': 'application/json',
'Referer': 'https://www.marinetraffic.com/en/data/?asset_type=vessels',
}}
}});
clearTimeout(tid);
if (r.status !== 200) return {{error: 'HTTP ' + r.status, status: r.status}};
const text = await r.text();
return {{status: r.status, body: text}};
}} catch(e) {{
clearTimeout(tid);
if (e.name === 'AbortError') return {{error: 'JS_TIMEOUT', status: -1}};
return {{error: e.message}};
}}
}}
"""
try:
# asyncio.wait_for() with 50s timeout.
# If it fires, the browser is completely frozen (no JS response possible).
# After this timeout we MUST do a full browser restart — we cannot use this
# browser context anymore (cancelling corrupts Playwright protocol state).
# The -3 return value signals BrowserRestartNeeded in the main loop.
_update_heartbeat() # Reset watchdog timer before potentially-blocking call
result = await asyncio.wait_for(page.evaluate(js), timeout=50.0)
_update_heartbeat() # Update after evaluate returns
if result.get('error'):
err_msg = result.get('error', '')
err_status = result.get('status', 0)
if err_msg == 'JS_TIMEOUT' or err_status == -1:
print(f" fetch JS_TIMEOUT page {page_num} (aborted after {timeout_ms}ms)", flush=True)
return None, -1 # timeout signal → page reload
print(f" fetch error page {page_num}: {err_msg} (HTTP {err_status})", flush=True)
return None, err_status
body = result.get('body', '')
data = json.loads(body)
total = data.get('totalCount', 0)
rows = data.get('data', [])
return rows, total
except asyncio.TimeoutError:
print(f" fetch HARD TIMEOUT page {page_num} (50s) — browser frozen, must restart", flush=True)
return None, -3 # -3 signals browser dead → must restart entire browser
except Exception as e:
err_str = str(e)
print(f" fetch error page {page_num}: {err_str[:120]}", flush=True)
if 'Target crashed' in err_str or 'target crashed' in err_str.lower():
return None, -2 # -2 signals tab crash → create new tab
if 'closed' in err_str.lower() or 'TargetClosed' in err_str:
return None, -2 # treat as tab crash
return None, 0
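# fetch_page() return convention, as consumed by the main loop:
#   (rows, totalCount)  on success
#   (None, http_status) on a non-200 response or fetch error
#   (None, -1)          JS fetch timed out            -> open a fresh tab
#   (None, -2)          tab crashed / target closed   -> open a fresh tab
#   (None, -3)          browser frozen (hard timeout) -> raise BrowserRestartNeeded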
def parse_row(row):
"""Extract vessel data from one API row dict."""
if not isinstance(row, dict):
return None
v = {}
for k in ['SHIP_ID', 'SHIPID']:
if k in row and row[k]:
v['ship_id'] = str(row[k])
break
for k in ['SHIPNAME', 'NAME']:
if k in row and row[k]:
v['name'] = str(row[k]).strip()
break
for k in ['MMSI', 'mmsi']:
val = row.get(k)
if val and str(val) not in ('0', '', 'None'):
v['mmsi'] = str(val)
break
for k in ['IMO', 'imo']:
val = row.get(k)
if val and str(val) not in ('0', '', 'None'):
v['imo'] = str(val)
break
for k in ['FLAG', 'flag']:
if k in row and row[k]:
v['flag'] = str(row[k])
break
for k in ['TYPE_SUMMARY', 'SHIP_TYPE', 'SHIPTYPE']:
if k in row and row[k]:
v['ship_type'] = str(row[k])
break
for k in ['DWT', 'dwt']:
if k in row and row[k]:
try:
v['dwt'] = int(float(str(row[k]).replace(',', '')))
except Exception:
pass
break
for k in ['YEAR_BUILT', 'year_built']:
if k in row and row[k]:
try:
v['year_built'] = int(row[k])
except Exception:
pass
break
for k in ['LAT_OF_LATEST_POSITION', 'LAT', 'lat']:
if k in row and row[k]:
try:
v['lat'] = float(row[k])
except Exception:
pass
break
for k in ['LON_OF_LATEST_POSITION', 'LON', 'lon']:
if k in row and row[k]:
try:
v['lon'] = float(row[k])
except Exception:
pass
break
for k in ['BENEFICIAL_OWNER', 'MANAGER_OWNER']:
if k in row and row[k]:
v['owner'] = str(row[k])
break
for k in ['REGISTERED_OWNER']:
if k in row and row[k]:
v['registered_owner'] = str(row[k])
break
for k in ['OPERATOR']:
if k in row and row[k]:
v['operator'] = str(row[k])
break
for k in ['COMMERCIAL_MANAGER', 'commercial_manager']:
if k in row and row[k]:
v['commercial_manager'] = str(row[k])
break
if not v.get('ship_id') and not v.get('mmsi') and not v.get('name'):
return None
return v
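# Illustrative input/output (column names as consumed above; the values are made up):
#   parse_row({'SHIP_ID': 123, 'SHIPNAME': 'EXAMPLE', 'MMSI': '215000000',
#              'TYPE_SUMMARY': 'Bulk Carrier', 'DWT': '81,000'})
#   -> {'ship_id': '123', 'name': 'EXAMPLE', 'mmsi': '215000000',
#       'ship_type': 'Bulk Carrier', 'dwt': 81000}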
async def main():
parser = argparse.ArgumentParser()
parser.add_argument('--probe', action='store_true', help='Test: first 5 pages only')
parser.add_argument('--start', type=int, default=1, help='Start from page N (resume)')
parser.add_argument('--max-pages', type=int, default=0, help='Max pages (0=unlimited)')
parser.add_argument('--delay', type=float, default=PAGE_DELAY)
args = parser.parse_args()
if args.probe:
args.max_pages = 5
# --- DB ---
try:
conn = db_connect()
cur = conn.cursor()
cur.execute('SELECT count(*) FROM mt_bulk_staging')
existing_count = cur.fetchone()[0]
cur.execute('SELECT ship_id FROM mt_bulk_staging WHERE ship_id IS NOT NULL')
existing_ids = set(str(r[0]) for r in cur.fetchall())
print(f"DB: {existing_count} existing vessels | {len(existing_ids)} ship_ids")
except Exception as e:
print(f"DB ERROR: {e}")
print("Start SSH tunnel: ssh -L 15432:127.0.0.1:5432 -N root@89.19.208.158")
return
from playwright.async_api import async_playwright
async with async_playwright() as p:
browser = await p.chromium.launch(
headless=False, # Must be False: CF TLS fingerprint check + MT session cookies
args=[
'--no-sandbox',
'--disable-blink-features=AutomationControlled',
'--disable-dev-shm-usage',
]
)
ctx = await browser.new_context(
viewport={'width': 800, 'height': 600}, # Smaller viewport = less GPU memory
user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
'(KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36',
)
page = await ctx.new_page()
# NOTE: Do NOT set default_timeout here — it would break login flow.
# Crash/hang recovery is handled via JS AbortController in fetch_page()
# and the -2 crash signal → recover_page() in the main loop.
COOKIE_FILE = 'mt_session_cookies.json'
# Try loading saved cookies first (fast path: no login needed)
cookie_loaded = False
if args.start > 1 and os.path.exists(COOKIE_FILE):
try:
with open(COOKIE_FILE) as f:
saved_cookies = json.load(f)
await ctx.add_cookies(saved_cookies)
print(f"Loaded {len(saved_cookies)} cookies from {COOKIE_FILE}")
# Navigate to /data/ with 'commit' to activate session token server-side.
# Then navigate to /ais/home (lightweight, same-origin) to stay in valid context.
try:
await asyncio.wait_for(
page.goto('https://www.marinetraffic.com/en/data/?asset_type=vessels',
wait_until='commit', timeout=30000),
timeout=35.0
)
except Exception:
pass
await asyncio.sleep(2)
try:
await asyncio.wait_for(
page.goto('https://www.marinetraffic.com/en/ais/home',
wait_until='commit', timeout=20000),
timeout=25.0
)
except Exception:
pass
await asyncio.sleep(2)
# Quick API test to verify session validity from same-origin context
test_url = ('https://www.marinetraffic.com/en/reports/?asset_type=vessels'
'&columns=shipname&per_page=1&page=1')
test_rows, test_total = await fetch_page(page, test_url, 1)
if test_rows is not None:
print(f"Cookie session valid! Skipping login.")
cookie_loaded = True
else:
print(f"Cookie session invalid (status {test_total}). Will re-login.")
await ctx.clear_cookies()
except Exception as ck_err:
print(f"Cookie load failed: {ck_err}. Will login fresh.")
# --- Login (skip if cookies loaded successfully) ---
if not cookie_loaded:
if not await do_login(page):
print("LOGIN FAILED!")
await browser.close()
conn.close()
return
# Wait for OAuth auto-redirect to complete (MT redirects oauth/callback → /ais/home)
# This prevents the /data/ navigation from conflicting with the pending redirect.
try:
await page.wait_for_url('**/ais/home**', timeout=15000)
print(f" OAuth redirect complete: {page.url[:60]}", flush=True)
except Exception:
await asyncio.sleep(5) # Wait anyway even if URL match fails
print(f" After redirect wait: {page.url[:60]}", flush=True)
# Save cookies for fast restart
try:
cookies = await ctx.cookies()
with open(COOKIE_FILE, 'w') as f:
json.dump(cookies, f)
print(f"Saved {len(cookies)} session cookies to {COOKIE_FILE}")
except Exception as ck_err:
print(f"Cookie save failed (non-fatal): {ck_err}")
# After the OAuth redirect we are already on marinetraffic.com with a valid session.
# DO NOT navigate to /en/reports/... directly: that causes HTTP 403 on subsequent
# fetch() calls (the MT server treats it as a direct API hit, not a browser XHR).
# We MUST visit /data/ first: MT sets a session token required by the reports API
# when the /data/ page loads; without it every /en/reports/ request returns 403.
# We navigate to /data/ with wait_until='commit' only (the full SPA is heavy and can
# crash the renderer), then park the tab on a lightweight same-origin page
# (robots.txt) and issue all API fetch() calls from there. Periodic CDP garbage
# collection (every PAGE_REFRESH_EVERY fetches) keeps the V8 heap from growing to OOM.
_update_heartbeat() # Reset watchdog before /data/ navigation (can take 30s+)
print("\nNavigating to /data/ (required to unlock reports API)...")
try:
# Use asyncio.wait_for() as a fallback in case Playwright's internal
# timeout doesn't fire (known issue when browser is under load).
await asyncio.wait_for(
page.goto('https://www.marinetraffic.com/en/data/?asset_type=vessels',
wait_until='commit', timeout=30000),
timeout=35.0
)
print(f" /data/ commit done", flush=True)
_update_heartbeat() # Reset after commit
except asyncio.TimeoutError:
print(f" /data/ nav asyncio timeout (35s) — continuing anyway", flush=True)
except Exception as nav_err:
print(f" /data/ nav error (non-fatal): {nav_err}", flush=True)
# Save cookies immediately after /data/ commit (before SPA JS executes and crashes renderer)
try:
cookies = await ctx.cookies()
with open(COOKIE_FILE, 'w') as f:
json.dump(cookies, f)
print(f"Saved {len(cookies)} cookies (after /data/ visit)")
except Exception:
pass
# Navigate to MT robots.txt — a plain text file, no JavaScript, no SPA,
# no crash risk. Same origin as marinetraffic.com so fetch() works without CORS.
try:
await asyncio.wait_for(
page.goto('https://www.marinetraffic.com/robots.txt',
wait_until='domcontentloaded', timeout=15000),
timeout=18.0
)
print(f" Navigated to robots.txt (lightweight static page)", flush=True)
except Exception as nav_err:
print(f" robots.txt nav error: {nav_err} — continuing anyway", flush=True)
await asyncio.sleep(1)
try:
print(f" Page URL: {page.url[:80]}")
except Exception:
print(f" Page URL: (unavailable)")
# Hardcoded API base URL — confirmed working format
base_url = 'https://www.marinetraffic.com/en/reports/?asset_type=vessels'
api_base = f'{base_url}&columns={API_COLS}&per_page={PER_PAGE}'
print(f" API base: {api_base[:120]}")
# --- Test page 1 (with retry on 403 — session might need extra time) ---
# MT reports API sometimes requires prior visit to /data/ page to unlock.
print(f"\nTesting API (page 1)...")
rows1, total1 = None, 0
data_page_visited = False
for api_attempt in range(6):
rows1, total1 = await fetch_page(page, api_base, 1)
if rows1 is not None:
break
if total1 == 403:
if api_attempt == 2 and not data_page_visited:
# After 2 failed attempts, revisit /data/ with 'commit' to unlock session
print(f" API 403 x{api_attempt+1} — re-visiting /data/ (commit) to unlock...")
try:
await asyncio.wait_for(
page.goto('https://www.marinetraffic.com/en/data/?asset_type=vessels',
wait_until='commit', timeout=30000),
timeout=35.0
)
await asyncio.sleep(3)
await asyncio.wait_for(
page.goto('https://www.marinetraffic.com/robots.txt',
wait_until='domcontentloaded', timeout=15000),
timeout=18.0
)
print(f" /data/ commit + back to robots.txt", flush=True)
data_page_visited = True
except Exception as data_err:
print(f" /data/ nav error: {data_err}")
else:
print(f" API 403 — waiting 15s for session to establish (attempt {api_attempt+1}/6)...")
await asyncio.sleep(15)
else:
print(f" API error (status {total1}) — waiting 5s...")
await asyncio.sleep(5)
if rows1 is None:
print("ERROR: API calls failing after retries. Exiting.")
await browser.close()
conn.close()
return
total_in_mt = total1
est_pages = (total_in_mt + len(rows1) - 1) // max(len(rows1), 1) if rows1 else 0
print(f" Page 1: {len(rows1)} rows | totalCount={total_in_mt} | est_pages={est_pages}")
# Sample row keys
if rows1:
print(f" Row keys: {list(rows1[0].keys())[:15]}")
v = parse_row(rows1[0])
print(f" Sample: {v}")
# --- Main pagination loop ---
total_saved = 0
total_cargo = 0
total_skipped = 0
batch_count = 0
t0 = time.time()
start_page = args.start
max_pages = args.max_pages if args.max_pages else est_pages + 10
consecutive_errors = 0
checkpoint_file = 'mt_green_fleet_progress.json'
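# The checkpoint is a small JSON file, e.g. (illustrative values):
#   {"page": 1200, "total_saved": 250000, "total_cargo": 260000,
#    "total_skipped": 340000, "elapsed": 4200.0}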
# Load checkpoint for resume
if args.start > 1 and os.path.exists(checkpoint_file):
with open(checkpoint_file) as f:
cp = json.load(f)
total_saved = cp.get('total_saved', 0)
total_cargo = cp.get('total_cargo', 0)
print(f" Resuming from page {start_page} | saved={total_saved}")
# If starting from page 1, the test already fetched page 1.
# Process rows1 directly and start the loop from page 2 to avoid double-fetch
# (MT rate-limits duplicate requests immediately and hangs the connection).
if args.start == 1 and rows1:
print(f" Processing test page 1 results directly (no double-fetch)...", flush=True)
for row in rows1:
v = parse_row(row)
if not v:
continue
ship_type = v.get('ship_type', '')
if not is_cargo(ship_type):
total_skipped += 1
continue
total_cargo += 1
sid = v.get('ship_id')
if not sid:
continue
is_new = sid not in existing_ids
existing_ids.add(sid)
st_lower = ship_type.lower()
if 'bulk' in st_lower: gt, cat = '6', 'bulk'
elif 'container' in st_lower: gt, cat = '11', 'container'
elif 'ro-ro' in st_lower or 'roro' in st_lower or 'vehicle' in st_lower: gt, cat = '12', 'roro'
else: gt, cat = '9', 'general'
try:
conn, cur = db_safe_execute(conn, cur, """
INSERT INTO mt_bulk_staging
(ship_id, name, flag, dwt, gt_shiptype, type_category,
lat, lon, mmsi, imo, owner, registered_owner, operator,
commercial_manager, scraped_at)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s, NOW())
ON CONFLICT (ship_id) DO UPDATE SET
name=COALESCE(EXCLUDED.name,mt_bulk_staging.name),
mmsi=COALESCE(EXCLUDED.mmsi,mt_bulk_staging.mmsi),
imo=COALESCE(EXCLUDED.imo,mt_bulk_staging.imo),
owner=COALESCE(EXCLUDED.owner,mt_bulk_staging.owner),
registered_owner=COALESCE(EXCLUDED.registered_owner,mt_bulk_staging.registered_owner),
operator=COALESCE(EXCLUDED.operator,mt_bulk_staging.operator),
commercial_manager=COALESCE(EXCLUDED.commercial_manager,mt_bulk_staging.commercial_manager),
lat=COALESCE(EXCLUDED.lat,mt_bulk_staging.lat),
lon=COALESCE(EXCLUDED.lon,mt_bulk_staging.lon),
scraped_at=NOW()
""", (sid, v.get('name'), v.get('flag'), v.get('dwt'), gt, cat,
v.get('lat'), v.get('lon'), v.get('mmsi'), v.get('imo'),
v.get('owner'), v.get('registered_owner'), v.get('operator'),
v.get('commercial_manager'),))
if is_new: total_saved += 1
batch_count += 1
except Exception as e:
print(f" DB error (test-page1): {str(e)[:80]}")
# Skip page 1 in the loop since we already processed it
start_page = 2
print(f"\n{'='*60}")
print(f"STARTING SCRAPE: pages {start_page} to {min(max_pages, est_pages)}")
print(f"Delay: {args.delay}s/page | Batch: {BATCH_SIZE}")
print(f"{'='*60}\n")
# Reload interval: DISABLED — reloading the heavy MT SPA page crashes the browser.
# Session cookies are maintained in the browser context, so no reload needed.
# Recovery via recover_page() only when a crash is actually detected.
PAGE_RELOAD_INTERVAL = 999999
async def recover_page(reason="crash"):
"""Close crashed page and open a fresh one, re-establish /data/ session context.
If the browser context itself is dead, raises BrowserRestartNeeded so the outer
loop can restart the whole browser from the checkpoint."""
nonlocal page
print(f" [Recovery] Creating new tab ({reason})...", flush=True)
try:
await page.close()
except Exception:
pass
await asyncio.sleep(5)
try:
page = await ctx.new_page()
except Exception as ctx_err:
# Browser context is dead — escalate to full browser restart
err_s = str(ctx_err)
print(f" [Recovery] BrowserContext dead ({err_s[:80]}), escalating to browser restart...", flush=True)
with open(checkpoint_file, 'w') as f:
json.dump({'page': page_num, 'total_saved': total_saved,
'total_cargo': total_cargo, 'total_skipped': total_skipped,
'elapsed': time.time() - t0}, f)
try:
await browser.close()
except Exception:
pass
raise BrowserRestartNeeded(page_num)
try:
# Navigate to robots.txt — static text, no SPA, same origin as MT.
# fetch() calls to the reports API work from this page without CORS issues.
await page.goto('https://www.marinetraffic.com/robots.txt',
wait_until='domcontentloaded', timeout=15000)
await asyncio.sleep(1)
print(f" [Recovery] New tab ready at robots.txt", flush=True)
return True
except Exception as re_err:
print(f" [Recovery] New tab goto failed: {re_err}", flush=True)
return False
# V8 heap GC every N fetches via CDP — no page navigation needed.
PAGE_REFRESH_EVERY = 15 # run CDP GC every 15 API fetches
for page_num in range(start_page, max_pages + 1):
# Periodic CDP GC to clear V8 heap and prevent Chromium OOM crash.
# Does NOT navigate away — no CORS issues, no SPA reload.
fetch_count = page_num - start_page
if fetch_count > 0 and fetch_count % PAGE_REFRESH_EVERY == 0:
print(f" [HeapFlush] Requesting V8 GC at page {page_num}...", flush=True)
try:
# Request garbage collection via Playwright CDP (DevTools Protocol).
# This frees accumulated V8 heap WITHOUT navigating away from the page.
# No tab crash, no CORS issues, no SPA reload needed.
cdp = await page.context.new_cdp_session(page)
await cdp.send('HeapProfiler.collectGarbage')
await cdp.detach()
print(f" [HeapFlush] GC done", flush=True)
except Exception as flush_err:
print(f" [HeapFlush] GC error: {flush_err} — continuing anyway", flush=True)
# Fetch page
rows, total = await fetch_page(page, api_base, page_num)
if rows is None:
consecutive_errors += 1
is_browser_dead = total == -3 # hard timeout → browser must be restarted
is_crash = total == -2 # tab crash → new tab
is_timeout = total == -1 # JS timeout → new tab
need_new_tab = is_timeout or is_crash
if is_browser_dead:
# Save checkpoint and raise so outer loop restarts browser
print(f" Page {page_num}: BROWSER DEAD — saving checkpoint and restarting browser...", flush=True)
with open(checkpoint_file, 'w') as f:
json.dump({'page': page_num, 'total_saved': total_saved,
'total_cargo': total_cargo, 'total_skipped': total_skipped,
'elapsed': time.time() - t0}, f)
try:
await browser.close()
except Exception:
pass
raise BrowserRestartNeeded(page_num)
if need_new_tab:
reason = 'CRASH' if is_crash else 'TIMEOUT'
print(f" Page {page_num}: {reason} — creating new tab...", flush=True)
ok = await recover_page("tab_crash" if is_crash else "timeout")
if not ok:
# Recovery failed immediately — context dead, escalate to browser restart
print(f" Immediate recovery failed — restarting browser from page {page_num}", flush=True)
with open(checkpoint_file, 'w') as f:
json.dump({'page': page_num, 'total_saved': total_saved,
'total_cargo': total_cargo, 'total_skipped': total_skipped,
'elapsed': time.time() - t0}, f)
try:
await browser.close()
except Exception:
pass
raise BrowserRestartNeeded(page_num)
consecutive_errors = max(consecutive_errors - 1, 0)
continue
print(f" Page {page_num}: FAILED (error #{consecutive_errors})", flush=True)
if consecutive_errors >= 3:
# Create fresh tab after 3 consecutive errors
print(f" {consecutive_errors} consecutive errors — creating fresh tab...", flush=True)
ok = await recover_page(f"errors_{consecutive_errors}")
if ok:
consecutive_errors = 0
else:
# Recovery failed — escalate to full browser restart
print(f" Recovery failed — restarting browser from page {page_num}", flush=True)
with open(checkpoint_file, 'w') as f:
json.dump({'page': page_num, 'total_saved': total_saved,
'total_cargo': total_cargo, 'total_skipped': total_skipped,
'elapsed': time.time() - t0}, f)
try:
await browser.close()
except Exception:
pass
raise BrowserRestartNeeded(page_num)
continue
await asyncio.sleep(3)
continue
consecutive_errors = 0
if not rows:
print(f" Page {page_num}: empty — reached end")
break
# Parse and filter
page_cargo = 0
for row in rows:
v = parse_row(row)
if not v:
continue
# Filter: keep only cargo types
ship_type = v.get('ship_type', '')
if not is_cargo(ship_type):
total_skipped += 1
continue
total_cargo += 1
page_cargo += 1
sid = v.get('ship_id')
if not sid:
continue
is_new = sid not in existing_ids
existing_ids.add(sid)
# Map gt_shiptype code
st_lower = ship_type.lower()
if 'bulk' in st_lower:
gt = '6'
cat = 'bulk'
elif 'container' in st_lower:
gt = '11'
cat = 'container'
elif 'ro-ro' in st_lower or 'roro' in st_lower or 'vehicle' in st_lower:
gt = '12'
cat = 'roro'
else:
gt = '9'
cat = 'general'
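# Upsert semantics: an incoming non-NULL value overwrites the stored one, while
# NULLs in the incoming row keep whatever mt_bulk_staging already has (COALESCE below).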
try:
conn, cur = db_safe_execute(conn, cur, """
INSERT INTO mt_bulk_staging
(ship_id, name, flag, dwt, gt_shiptype, type_category,
lat, lon, mmsi, imo, owner, registered_owner, operator,
commercial_manager, scraped_at)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s, NOW())
ON CONFLICT (ship_id) DO UPDATE SET
name = COALESCE(EXCLUDED.name, mt_bulk_staging.name),
mmsi = COALESCE(EXCLUDED.mmsi, mt_bulk_staging.mmsi),
imo = COALESCE(EXCLUDED.imo, mt_bulk_staging.imo),
owner = COALESCE(EXCLUDED.owner, mt_bulk_staging.owner),
registered_owner = COALESCE(EXCLUDED.registered_owner, mt_bulk_staging.registered_owner),
operator = COALESCE(EXCLUDED.operator, mt_bulk_staging.operator),
commercial_manager = COALESCE(EXCLUDED.commercial_manager, mt_bulk_staging.commercial_manager),
lat = COALESCE(EXCLUDED.lat, mt_bulk_staging.lat),
lon = COALESCE(EXCLUDED.lon, mt_bulk_staging.lon),
scraped_at = NOW()
""", (
sid, v.get('name'), v.get('flag'), v.get('dwt'), gt, cat,
v.get('lat'), v.get('lon'), v.get('mmsi'), v.get('imo'),
v.get('owner'), v.get('registered_owner'), v.get('operator'),
v.get('commercial_manager'),
))
if is_new:
total_saved += 1
batch_count += 1
except Exception as e:
print(f" DB error ({type(e).__name__}): {str(e)[:80]}")
# Commit batch
if batch_count >= BATCH_SIZE:
conn = db_safe_commit(conn)
cur = conn.cursor()
batch_count = 0
# Progress
elapsed = time.time() - t0
rate = (page_num - start_page + 1) / elapsed if elapsed > 0 else 0
remaining_pages = (est_pages - page_num)
eta_min = remaining_pages / rate / 60 if rate > 0 else 0
_update_heartbeat() # Reset watchdog on each page processed
if page_num <= 20 or page_num % 10 == 0:
print(f" Page {page_num}/{est_pages} | "
f"+{page_cargo} cargo | "
f"total_new={total_saved} | "
f"skipped={total_skipped} | "
f"{elapsed:.0f}s | ETA: {eta_min:.0f}m", flush=True)
# Save checkpoint every 5 pages so restarts (watchdog or browser crash) resume close to current position
if page_num % 5 == 0:
with open(checkpoint_file, 'w') as f:
json.dump({
'page': page_num,
'total_saved': total_saved,
'total_cargo': total_cargo,
'total_skipped': total_skipped,
'elapsed': elapsed,
}, f)
await asyncio.sleep(args.delay)
# Final commit
conn = db_safe_commit(conn)
cur = conn.cursor()
# Final stats
cur.execute("SELECT count(*), count(mmsi), count(owner) FROM mt_bulk_staging")
t, m, o = cur.fetchone()
cur.execute("SELECT count(*), count(mmsi), count(owner) FROM mt_bulk_staging WHERE gt_shiptype='6'")
tb, mb, ob = cur.fetchone()
elapsed = time.time() - t0
print(f"\n{'='*60}")
print(f"SCRAPE COMPLETE — {elapsed/60:.1f} minutes")
print(f" Pages done: {page_num - start_page + 1}")
print(f" New vessels: {total_saved}")
print(f" Cargo found: {total_cargo}")
print(f" Non-cargo skipped: {total_skipped}")
print(f"\nGLOBAL: total={t} | mmsi={m} ({m*100//max(t,1)}%) | owner={o} ({o*100//max(t,1)}%)")
print(f"BULK: total={tb} | mmsi={mb} ({mb*100//max(tb,1)}%) | owner={ob} ({ob*100//max(tb,1)}%)")
print(f"{'='*60}")
conn.close()
await browser.close()
import traceback
# Outer restart loop — handles BrowserRestartNeeded by restarting from checkpoint
restart_count = 0
max_restarts = 200 # allow up to 200 browser restarts (plenty for 5752 pages)
while True:
try:
asyncio.run(main())
break # Normal completion
except BrowserRestartNeeded as br:
restart_count += 1
print(f"\n[RESTART #{restart_count}] Browser dead at page {br.resume_page}. Restarting from checkpoint...", flush=True)
if restart_count >= max_restarts:
print(f"[RESTART] Max restarts ({max_restarts}) reached. Stopping.", flush=True)
sys.exit(1)
# Patch sys.argv to resume from the checkpoint page
if '--start' in sys.argv:
idx = sys.argv.index('--start')
sys.argv[idx+1] = str(br.resume_page)
else:
sys.argv.extend(['--start', str(br.resume_page)])
time.sleep(10) # Brief pause before restarting browser
continue
except Exception as top_err:
err_s = str(top_err)
# TargetClosedError / context closed → try browser restart from checkpoint
if 'TargetClosed' in type(top_err).__name__ or 'Target page, context or browser has been closed' in err_s or 'browser has been closed' in err_s:
restart_count += 1
print(f"\n[RESTART #{restart_count}] Browser closed unexpectedly: {err_s[:120]}", flush=True)
if restart_count >= max_restarts:
print(f"[RESTART] Max restarts ({max_restarts}) reached. Stopping.", flush=True)
sys.exit(1)
# Try to resume from checkpoint if it exists
try:
import json as _json
import glob as _glob
# Search for the progress checkpoint file used by main()
_chk_files = sorted(
_glob.glob('mt_green_fleet_progress.json') + _glob.glob('mt_green_fleet_checkpoint*.json'),
key=lambda x: -os.path.getmtime(x)
)
if _chk_files:
with open(_chk_files[0]) as _f:
_chk = _json.load(_f)
_resume = _chk.get('page', 1)
print(f"[RESTART] Resuming from checkpoint page {_resume}...", flush=True)
if '--start' in sys.argv:
idx = sys.argv.index('--start')
sys.argv[idx+1] = str(_resume)
else:
sys.argv.extend(['--start', str(_resume)])
except Exception as chk_err:
print(f"[RESTART] Could not read checkpoint: {chk_err}", flush=True)
time.sleep(15)
continue
print(f"\nFATAL ERROR: {type(top_err).__name__}: {top_err}", flush=True)
traceback.print_exc()
sys.exit(1)
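# The "outer bash wrapper" referenced in the watchdog comments is not part of this
# file. A minimal supervisor with the same behaviour, sketched here in Python
# (assumes the checkpoint filename used above; exit code 2 = watchdog timeout):
#
#   import json, subprocess, sys
#   while True:
#       args = [sys.executable, 'mt_green_fleet.py']
#       try:
#           with open('mt_green_fleet_progress.json') as f:
#               args += ['--start', str(json.load(f).get('page', 1))]
#       except OSError:
#           pass
#       rc = subprocess.call(args)
#       if rc != 2:
#           sys.exit(rc)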