#!/usr/bin/env python3
"""
MT Green Fleet Scraper — Get ALL cargo (and tanker) vessels from the MT Data page via direct API pagination.
Strategy:
1. Login to MT Pro
2. Navigate to Data page (establishes session context)
3. Capture the API URL from the first response
4. Page through ALL results via page.evaluate(fetch()) with page=N
5. Filter client-side: keep cargo and tanker types (Cargo, Bulk, Container, General, Multipurpose, Ro-Ro, plus all tanker classes)
6. Save to mt_bulk_staging with all available data (name, mmsi, imo, owner, position)
MT lists ~2.8M vessels total; cargo is roughly 30-40% of that (~0.9-1.1M vessels).
At 500 rows/page the full catalogue is ~5,750 pages; with the 2s inter-page delay a complete pass takes roughly 3-4 hours.
Usage:
python mt_green_fleet.py --probe # Test: first 5 pages, show API format
python mt_green_fleet.py # Full scrape: all cargo vessels
python mt_green_fleet.py --start 100 # Resume from page 100
python mt_green_fleet.py --max-pages 500 # Limit pages
"""
import asyncio, json, sys, os, time, struct, hmac, hashlib, base64, argparse, re
import threading
import psycopg2
# ---- WATCHDOG: kills the process if no heartbeat arrives within _WATCHDOG_TIMEOUT (180s) ----
# This handles the case where asyncio.wait_for() hangs due to Playwright
# protocol issues (corrupted WebSocket state, frozen browser).
# The outer restart loop then restarts from the checkpoint.
_heartbeat_time = time.time()
_WATCHDOG_TIMEOUT = 180 # seconds — must exceed a worst-case login (TOTP wait up to 30s + navigation/retries 60-120s ≈ 150s total)
def _update_heartbeat():
global _heartbeat_time
_heartbeat_time = time.time()
def _watchdog_thread():
while True:
time.sleep(10)
elapsed_since_heartbeat = time.time() - _heartbeat_time
if elapsed_since_heartbeat > _WATCHDOG_TIMEOUT:
print(f"\n[WATCHDOG] No heartbeat for {elapsed_since_heartbeat:.0f}s — exiting for restart!", flush=True)
sys.stdout.flush()
# Exit with code 2 — the outer bash wrapper will restart from checkpoint
os._exit(2) # code 2 = watchdog timeout, needs restart
_wd = threading.Thread(target=_watchdog_thread, daemon=True)
_wd.start()
# ---- END WATCHDOG ----
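# The comments above assume an outer bash wrapper that re-runs this script
# whenever it exits with code 2. A minimal Python equivalent of that loop,
# for illustration only — it is never called from this module, and the real
# wrapper may differ:
def _example_restart_wrapper(script='mt_green_fleet.py'):
    """Illustrative sketch: re-run the scraper until it exits with a code other than 2."""
    import subprocess
    while True:
        rc = subprocess.run([sys.executable, script]).returncode
        if rc != 2:        # 2 = watchdog timeout, restart from checkpoint
            return rc
        time.sleep(10)     # brief pause before relaunching the browser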
class BrowserRestartNeeded(Exception):
"""Raised when browser is completely dead and must be fully restarted."""
def __init__(self, resume_page):
self.resume_page = resume_page
super().__init__(f"Browser restart needed from page {resume_page}")
os.chdir(os.path.dirname(os.path.abspath(__file__)))
if hasattr(sys.stdout, 'reconfigure'):
sys.stdout.reconfigure(encoding='utf-8', errors='replace', line_buffering=True)
if hasattr(sys.stderr, 'reconfigure'):
sys.stderr.reconfigure(encoding='utf-8', errors='replace', line_buffering=True)
EMAIL = "operation@mrlogisticcorp.com"
PASSWORD = "NKh9i8Z!7fU9jfi"
TOTP_SECRET = "MNWTEPTFJZBUC32GJFEWY6LVKQ2GGYKH"
DB_URL = 'postgresql://seafare:SF_m0ntana_2026@127.0.0.1:15432/seafare_db'
PAGE_DELAY = 2.0 # seconds between pages (2s to avoid MT rate limiting / session hangs)
BATCH_SIZE = 500 # commit every N vessels
# Vessel types to KEEP — all cargo (green) and tanker (red) types.
# (Passenger, military, fishing, tugs, etc. are filtered out.)
CARGO_TYPES = {
# GREEN — all cargo vessel types
'cargo', 'bulk carrier', 'general cargo', 'container ship', 'container vessel',
'ro-ro cargo', 'reefer', 'multipurpose', 'heavy load carrier',
'wood chips carrier', 'cement carrier', 'livestock carrier', 'vehicle carrier',
'self-discharging bulk carrier', 'open hatch bulk carrier', 'bulk',
'ore carrier', 'aggregates carrier', 'obo carrier', 'barge',
'special cargo', 'other cargo', 'nuclear fuel carrier',
'inland cargo',
# RED — all tanker types
'tanker', 'oil products tanker', 'crude oil tanker', 'oil/chemical tanker',
'chemical tanker', 'lpg tanker', 'lng tanker', 'bunkering tanker',
'asphalt/bitumen tanker', 'water tanker', 'inland tanker',
'special tanker', 'other tanker', 'floating storage/production',
'oil', 'chemical', 'lpg', 'lng', 'bitumen', 'asphalt',
'fso', 'fpso', 'fsu',
}
# Columns to request — all ownership fields MT Pro provides
API_COLS = ('shipname,imo,mmsi,flag,ship_type,dwt,year_built,'
'lat_of_latest_position,lon_of_latest_position,'
'beneficial_owner,registered_owner,operator,commercial_manager')
# 500 rows per page — MT Pro default; 1000 causes memory pressure and Chromium crashes.
PER_PAGE = 500
def totp(secret):
s = secret.upper().replace(' ', '')
pad = (-len(s)) % 8
key = base64.b32decode(s + '=' * pad)
counter = int(time.time()) // 30
msg = struct.pack('>Q', counter)
h = hmac.new(key, msg, hashlib.sha1).digest()
offset = h[-1] & 0x0f
code = struct.unpack('>I', h[offset:offset+4])[0] & 0x7fffffff
return str(code % 1000000).zfill(6)
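# Optional sanity check for the HOTP core inside totp(), against the RFC 4226
# Appendix D test vector (ASCII secret "12345678901234567890", counter 0 ->
# "755224"). Illustrative only; nothing in the scraper calls it.
def _totp_self_test():
    key = b'12345678901234567890'
    h = hmac.new(key, struct.pack('>Q', 0), hashlib.sha1).digest()
    offset = h[-1] & 0x0f
    code = struct.unpack('>I', h[offset:offset+4])[0] & 0x7fffffff
    assert str(code % 1000000).zfill(6) == '755224'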
def db_connect():
return psycopg2.connect(
DB_URL, connect_timeout=15,
keepalives=1, keepalives_idle=30, keepalives_interval=10, keepalives_count=5
)
def _restart_ssh_tunnel():
import subprocess
try:
subprocess.run(['taskkill', '/F', '/IM', 'ssh.exe'], capture_output=True, timeout=5)
except Exception:
pass
time.sleep(2)
try:
subprocess.Popen(
['ssh', '-o', 'ServerAliveInterval=5', '-o', 'ServerAliveCountMax=120',
'-o', 'TCPKeepAlive=yes', '-o', 'StrictHostKeyChecking=no',
'-L', '15432:127.0.0.1:5432', '-N', 'root@89.19.208.158'],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
)
print(f" [SSH] Tunnel restarted, waiting 5s...")
time.sleep(5)
except Exception as e:
print(f" [SSH] Failed: {e}")
def db_reconnect(conn):
try:
conn.close()
except Exception:
pass
for attempt in range(5):
try:
time.sleep(3)
c = db_connect()
print(f" [DB] Reconnected (attempt {attempt+1})")
return c, c.cursor()
except Exception as e:
print(f" [DB] Attempt {attempt+1} failed: {e}")
print(f" [DB] Restarting SSH tunnel...")
_restart_ssh_tunnel()
for attempt in range(10):
try:
time.sleep(5)
c = db_connect()
print(f" [DB] Reconnected after tunnel restart (attempt {attempt+1})")
return c, c.cursor()
except Exception as e:
print(f" [DB] Post-restart attempt {attempt+1} failed: {e}")
if attempt == 4:
_restart_ssh_tunnel()
raise Exception("DB reconnect failed")
def db_safe_execute(conn, cur, query, params=None):
try:
cur.execute(query, params)
return conn, cur
except (psycopg2.InterfaceError, psycopg2.OperationalError) as e:
print(f" [DB] Lost on execute ({e}), reconnecting...")
conn, cur = db_reconnect(conn)
cur.execute(query, params)
return conn, cur
except psycopg2.Error as e:
try:
conn.rollback() # Clear aborted transaction state
except Exception:
pass
raise # Re-raise so caller can skip this record
def db_safe_commit(conn):
try:
conn.commit()
return conn
except (psycopg2.InterfaceError, psycopg2.OperationalError):
print(f" [DB] Lost on commit, reconnecting...")
conn, _ = db_reconnect(conn)
return conn
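# Typical usage of the db_safe_* helpers above (illustrative sketch, not called
# anywhere). Both helpers may transparently reconnect, so the caller must
# rebind conn (and cur) from their return values — the pattern main() follows.
def _example_db_usage():
    conn = db_connect()
    cur = conn.cursor()
    conn, cur = db_safe_execute(conn, cur, "SELECT count(*) FROM mt_bulk_staging")
    print(cur.fetchone()[0])
    conn = db_safe_commit(conn)
    conn.close()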
def is_cargo(ship_type_str):
"""Return True if vessel is cargo (green) or tanker (red) type."""
if not ship_type_str:
return False
st = ship_type_str.lower().strip()
for ct in CARGO_TYPES:
if ct in st or st in ct:
return True
return False
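# is_cargo() matches substrings in both directions, e.g.:
#   is_cargo('Bulk Carrier')                  -> True  ('bulk carrier' is a set entry)
#   is_cargo('Self-Discharging Bulk Carrier') -> True  (exact set entry)
#   is_cargo('Passenger Ship')                -> False (no entry matches either way)
# Caveat: the reversed check (st in ct) means very short strings can
# false-positive, e.g. is_cargo('o') -> True because 'o' is a substring of 'cargo'.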
def totp_wait_for_fresh(min_remaining=8):
"""Wait until TOTP window has at least min_remaining seconds left, then return code.
Avoids entering a code that will expire before the server validates it."""
while True:
remaining = 30 - (int(time.time()) % 30)
if remaining >= min_remaining:
break
wait = remaining + 1
print(f" [TOTP] Window too close to expiry ({remaining}s left), waiting {wait}s...", flush=True)
time.sleep(wait)
code = totp(TOTP_SECRET)
remaining = 30 - (int(time.time()) % 30)
print(f" [TOTP] Code={code} | {remaining}s remaining in window", flush=True)
return code
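# Worked example of the timing guard above: with min_remaining=8 and, say, 5s
# left in the current 30s TOTP window, the helper sleeps 6s so the code is
# generated at the start of the next window (~29s of validity remaining).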
async def do_login(page, max_retries=3):
for attempt in range(max_retries):
_update_heartbeat() # Reset watchdog for each login attempt (can take 60-120s)
print(f"LOGIN (attempt {attempt+1}/{max_retries})...")
# Navigate to login page — use asyncio.wait_for as outer timeout guard
try:
await asyncio.wait_for(
page.goto('https://www.marinetraffic.com/en/users/login',
wait_until='domcontentloaded', timeout=30000),
timeout=35.0
)
except asyncio.TimeoutError:
print(f" goto asyncio timeout (35s) — continuing with current page", flush=True)
except Exception as nav_err:
print(f" goto error: {nav_err}")
await asyncio.sleep(3)
_update_heartbeat() # Reset after navigation (wait_for_selector can be slow)
print(f" After goto: {page.url[:80]}")
# --- Step 1: Fill email and click Continue ---
try:
# Wait for username input to appear (the page may redirect to auth.kpler)
await page.wait_for_selector('input[name="username"]', timeout=20000)
await page.fill('input[name="username"]', EMAIL, timeout=15000)
print(f" Filled email", flush=True)
# JS click to avoid blocking on navigation
await page.evaluate("() => { const b = document.querySelector('button[type=\"submit\"]'); if(b) b.click(); }")
print(f" Clicked Continue (JS)", flush=True)
# Wait for URL to change to password page
try:
await page.wait_for_url('**/login/password**', timeout=20000)
print(f" Password page loaded", flush=True)
except Exception:
await asyncio.sleep(5)
print(f" URL after Continue: {page.url[:80]}", flush=True)
except Exception as fill_err:
print(f" email/continue step error: {fill_err}", flush=True)
await asyncio.sleep(5)
# --- Step 2: Fill password ---
try:
await page.wait_for_selector('input[name="password"]', timeout=20000)
print(f" Filling password... URL={page.url[:60]}", flush=True)
await page.fill('input[name="password"]', PASSWORD, timeout=15000)
# JS click to avoid Playwright blocking on navigation after password submit
await page.evaluate("() => { const b = document.querySelector('button[type=\"submit\"]'); if(b) b.click(); }")
print(f" Clicked password submit (JS)", flush=True)
except Exception as pass_err:
print(f" password step error: {pass_err}", flush=True)
# Wait for navigation to MFA options page
try:
await page.wait_for_url('**/mfa**', timeout=25000)
print(f" MFA page loaded: {page.url[:60]}", flush=True)
except Exception:
await asyncio.sleep(5)
url = page.url
print(f" After password: {url[:80]}")
# --- Step 3: MFA flow ---
if 'auth.kpler' in url:
# If on mfa-login-options page, click Google Authenticator button
if 'mfa-login-options' in url:
print(" Clicking Google Authenticator option...", flush=True)
try:
# JS click to avoid navigation blocking
clicked = await page.evaluate("""
() => {
const selectors = [
'button[value*="otp"]',
'button[aria-label*="Authenticator"]',
'button[aria-label*="authenticator"]',
];
for (const sel of selectors) {
const b = document.querySelector(sel);
if (b) { b.click(); return sel; }
}
// Fallback: find by text
for (const b of document.querySelectorAll('button')) {
if (b.textContent.includes('Authenticator') || b.textContent.includes('authenticator')) {
b.click(); return 'text:' + b.textContent.trim().slice(0,30);
}
}
return null;
}
""")
print(f" GA JS click: selector={clicked}", flush=True)
# Wait for navigation to OTP challenge page
await page.wait_for_url('**/mfa-otp-challenge**', timeout=20000)
print(f" GA clicked, OTP page loaded", flush=True)
except Exception as ex:
print(f" GA click error: {ex}", flush=True)
await asyncio.sleep(5)
print(f" After GA click: {page.url[:80]}", flush=True)
# Fill TOTP code (with timing check to avoid expiry during validation)
if 'mfa-otp-challenge' in page.url or 'auth.kpler' in page.url:
_update_heartbeat() # Reset watchdog — totp_wait_for_fresh can wait up to 30s
otp = totp_wait_for_fresh(min_remaining=8)
try:
await page.wait_for_selector('input[name="code"]', timeout=15000)
await page.fill('input[name="code"]', otp, timeout=15000)
print(f" OTP filled, clicking submit via JS...", flush=True)
# Use JS click to avoid Playwright navigation wait blocking
await page.evaluate("() => { const b = document.querySelector('button[type=\"submit\"]'); if(b) b.click(); }")
except Exception as ex:
print(f" 2FA fill/click error: {ex}", flush=True)
_update_heartbeat() # Reset watchdog before wait_for_url (25s wait)
# Wait for post-OTP navigation (MT home page or similar)
try:
await page.wait_for_url('**marinetraffic.com**', timeout=25000)
print(f" OTP submitted, redirected: {page.url[:60]}", flush=True)
except Exception:
await asyncio.sleep(15)
print(f" After OTP submit: {page.url[:80]}", flush=True)
_update_heartbeat() # Reset after redirect
ok = 'marinetraffic.com' in page.url and 'auth.kpler' not in page.url
if ok:
print(f" Login: OK | {page.url[:60]}")
return True
print(f" Login failed | {page.url[:80]}")
if attempt < max_retries - 1:
# Clear auth cookies and start fresh for next attempt
try:
await page.context.clear_cookies()
print(f" Cleared cookies for retry", flush=True)
except Exception:
pass
await asyncio.sleep(10)
return False
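# The login steps above repeat one pattern: fill a field, then submit via
# page.evaluate() so Playwright never blocks waiting for the navigation that
# the click triggers. A generic helper capturing that pattern (illustrative
# sketch only; do_login() above inlines it at each step):
async def _js_click_submit(page):
    """Click the first button[type=submit] via in-page JS (non-blocking)."""
    await page.evaluate(
        "() => { const b = document.querySelector('button[type=\"submit\"]'); if (b) b.click(); }"
    )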
async def fetch_page(page, api_base_url, page_num, timeout_ms=30000):
"""Fetch one page of vessel data via page.evaluate(fetch()) with session cookies."""
# Build URL: replace existing page param or append it
if 'page=' in api_base_url:
url = re.sub(r'page=\d+', f'page={page_num}', api_base_url)
else:
url = api_base_url + f'&page={page_num}'
js = f"""
async () => {{
const controller = new AbortController();
const tid = setTimeout(() => controller.abort(), {timeout_ms});
try {{
const r = await fetch({json.dumps(url)}, {{
signal: controller.signal,
credentials: 'include',
cache: 'no-store',
// 'Referer' is a forbidden header name, so browsers silently drop it when set
// via headers; the fetch() 'referrer' init option is the supported way.
referrer: 'https://www.marinetraffic.com/en/data/?asset_type=vessels',
headers: {{
'X-Requested-With': 'XMLHttpRequest',
'Accept': 'application/json',
}}
}});
clearTimeout(tid);
if (r.status !== 200) return {{error: 'HTTP ' + r.status, status: r.status}};
const text = await r.text();
return {{status: r.status, body: text}};
}} catch(e) {{
clearTimeout(tid);
if (e.name === 'AbortError') return {{error: 'JS_TIMEOUT', status: -1}};
return {{error: e.message}};
}}
}}
"""
try:
# asyncio.wait_for() with 50s timeout.
# If it fires, the browser is completely frozen (no JS response possible).
# After this timeout we MUST do a full browser restart — we cannot use this
# browser context anymore (cancelling corrupts Playwright protocol state).
# The -3 return value signals BrowserRestartNeeded in the main loop.
_update_heartbeat() # Reset watchdog timer before potentially-blocking call
result = await asyncio.wait_for(page.evaluate(js), timeout=50.0)
_update_heartbeat() # Update after evaluate returns
if result.get('error'):
err_msg = result.get('error', '')
err_status = result.get('status', 0)
if err_msg == 'JS_TIMEOUT' or err_status == -1:
print(f" fetch JS_TIMEOUT page {page_num} (aborted after {timeout_ms}ms)", flush=True)
return None, -1 # timeout signal → caller recovers with a new tab
print(f" fetch error page {page_num}: {err_msg} (HTTP {err_status})", flush=True)
return None, err_status
body = result.get('body', '')
data = json.loads(body)
total = data.get('totalCount', 0)
rows = data.get('data', [])
return rows, total
except asyncio.TimeoutError:
print(f" fetch HARD TIMEOUT page {page_num} (50s) — browser frozen, must restart", flush=True)
return None, -3 # -3 signals browser dead → must restart entire browser
except Exception as e:
err_str = str(e)
print(f" fetch error page {page_num}: {err_str[:120]}", flush=True)
if 'Target crashed' in err_str or 'target crashed' in err_str.lower():
return None, -2 # -2 signals tab crash → create new tab
if 'closed' in err_str.lower() or 'TargetClosed' in err_str:
return None, -2 # treat as tab crash
return None, 0
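# fetch_page() return contract — (rows, status):
#   rows is a list      -> success; status is the API's totalCount
#   None, status == -1  -> in-page fetch() aborted (JS timeout)  -> new tab
#   None, status == -2  -> tab crashed / target closed           -> new tab
#   None, status == -3  -> evaluate() never returned (50s)       -> full browser restart
#   None, status  >  0  -> HTTP error code (e.g. 403)            -> retry / backoff
#   None, status ==  0  -> other fetch error                     -> retry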
def parse_row(row):
"""Extract vessel data from one API row dict."""
if not isinstance(row, dict):
return None
v = {}
for k in ['SHIP_ID', 'SHIPID']:
if k in row and row[k]:
v['ship_id'] = str(row[k])
break
for k in ['SHIPNAME', 'NAME']:
if k in row and row[k]:
v['name'] = str(row[k]).strip()
break
for k in ['MMSI', 'mmsi']:
val = row.get(k)
if val and str(val) not in ('0', '', 'None'):
v['mmsi'] = str(val)
break
for k in ['IMO', 'imo']:
val = row.get(k)
if val and str(val) not in ('0', '', 'None'):
v['imo'] = str(val)
break
for k in ['FLAG', 'flag']:
if k in row and row[k]:
v['flag'] = str(row[k])
break
for k in ['TYPE_SUMMARY', 'SHIP_TYPE', 'SHIPTYPE']:
if k in row and row[k]:
v['ship_type'] = str(row[k])
break
for k in ['DWT', 'dwt']:
if k in row and row[k]:
try:
v['dwt'] = int(float(str(row[k]).replace(',', '')))
except Exception:
pass
break
for k in ['YEAR_BUILT', 'year_built']:
if k in row and row[k]:
try:
v['year_built'] = int(row[k])
except Exception:
pass
break
for k in ['LAT_OF_LATEST_POSITION', 'LAT', 'lat']:
if k in row and row[k]:
try:
v['lat'] = float(row[k])
except Exception:
pass
break
for k in ['LON_OF_LATEST_POSITION', 'LON', 'lon']:
if k in row and row[k]:
try:
v['lon'] = float(row[k])
except Exception:
pass
break
for k in ['BENEFICIAL_OWNER', 'MANAGER_OWNER']:
if k in row and row[k]:
v['owner'] = str(row[k])
break
for k in ['REGISTERED_OWNER']:
if k in row and row[k]:
v['registered_owner'] = str(row[k])
break
for k in ['OPERATOR']:
if k in row and row[k]:
v['operator'] = str(row[k])
break
for k in ['COMMERCIAL_MANAGER', 'commercial_manager']:
if k in row and row[k]:
v['commercial_manager'] = str(row[k])
break
if not v.get('ship_id') and not v.get('mmsi') and not v.get('name'):
return None
return v
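# Example of parse_row() on a hypothetical API row (field names follow the
# branches above; the values are invented for illustration):
#   parse_row({'SHIP_ID': 371681, 'SHIPNAME': 'EXAMPLE TRADER', 'MMSI': 477000000,
#              'IMO': 9000000, 'FLAG': 'HK', 'TYPE_SUMMARY': 'Bulk Carrier',
#              'DWT': '82,000', 'YEAR_BUILT': 2012})
#   -> {'ship_id': '371681', 'name': 'EXAMPLE TRADER', 'mmsi': '477000000',
#       'imo': '9000000', 'flag': 'HK', 'ship_type': 'Bulk Carrier',
#       'dwt': 82000, 'year_built': 2012}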
async def main():
parser = argparse.ArgumentParser()
parser.add_argument('--probe', action='store_true', help='Test: first 5 pages only')
parser.add_argument('--start', type=int, default=1, help='Start from page N (resume)')
parser.add_argument('--max-pages', type=int, default=0, help='Max pages (0=unlimited)')
parser.add_argument('--delay', type=float, default=PAGE_DELAY)
args = parser.parse_args()
if args.probe:
args.max_pages = 5
# --- DB ---
try:
conn = db_connect()
cur = conn.cursor()
cur.execute('SELECT count(*) FROM mt_bulk_staging')
existing_count = cur.fetchone()[0]
cur.execute('SELECT ship_id FROM mt_bulk_staging WHERE ship_id IS NOT NULL')
existing_ids = set(str(r[0]) for r in cur.fetchall())
print(f"DB: {existing_count} existing vessels | {len(existing_ids)} ship_ids")
except Exception as e:
print(f"DB ERROR: {e}")
print("Start SSH tunnel: ssh -L 15432:127.0.0.1:5432 -N root@89.19.208.158")
return
from playwright.async_api import async_playwright
async with async_playwright() as p:
browser = await p.chromium.launch(
headless=False, # Must be False: CF TLS fingerprint check + MT session cookies
args=[
'--no-sandbox',
'--disable-blink-features=AutomationControlled',
'--disable-dev-shm-usage',
]
)
ctx = await browser.new_context(
viewport={'width': 800, 'height': 600}, # Smaller viewport = less GPU memory
user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
'(KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36',
)
page = await ctx.new_page()
# NOTE: Do NOT set default_timeout here — it would break login flow.
# Crash/hang recovery is handled via JS AbortController in fetch_page()
# and the -2 crash signal → recover_page() in the main loop.
COOKIE_FILE = 'mt_session_cookies.json'
# Try loading saved cookies first (fast path: no login needed)
cookie_loaded = False
if args.start > 1 and os.path.exists(COOKIE_FILE):
try:
with open(COOKIE_FILE) as f:
saved_cookies = json.load(f)
await ctx.add_cookies(saved_cookies)
print(f"Loaded {len(saved_cookies)} cookies from {COOKIE_FILE}")
# Navigate to /data/ with 'commit' to activate session token server-side.
# Then navigate to /ais/home (lightweight, same-origin) to stay in valid context.
try:
await asyncio.wait_for(
page.goto('https://www.marinetraffic.com/en/data/?asset_type=vessels',
wait_until='commit', timeout=30000),
timeout=35.0
)
except Exception:
pass
await asyncio.sleep(2)
try:
await asyncio.wait_for(
page.goto('https://www.marinetraffic.com/en/ais/home',
wait_until='commit', timeout=20000),
timeout=25.0
)
except Exception:
pass
await asyncio.sleep(2)
# Quick API test to verify session validity from same-origin context
test_url = ('https://www.marinetraffic.com/en/reports/?asset_type=vessels'
'&columns=shipname&per_page=1&page=1')
test_rows, test_total = await fetch_page(page, test_url, 1)
if test_rows is not None:
print(f"Cookie session valid! Skipping login.")
cookie_loaded = True
else:
print(f"Cookie session invalid (status {test_total}). Will re-login.")
await ctx.clear_cookies()
except Exception as ck_err:
print(f"Cookie load failed: {ck_err}. Will login fresh.")
# --- Login (skip if cookies loaded successfully) ---
if not cookie_loaded:
if not await do_login(page):
print("LOGIN FAILED!")
await browser.close()
conn.close()
return
# Wait for OAuth auto-redirect to complete (MT redirects oauth/callback → /ais/home)
# This prevents the /data/ navigation from conflicting with the pending redirect.
try:
await page.wait_for_url('**/ais/home**', timeout=15000)
print(f" OAuth redirect complete: {page.url[:60]}", flush=True)
except Exception:
await asyncio.sleep(5) # Wait anyway even if URL match fails
print(f" After redirect wait: {page.url[:60]}", flush=True)
# Save cookies for fast restart
try:
cookies = await ctx.cookies()
with open(COOKIE_FILE, 'w') as f:
json.dump(cookies, f)
print(f"Saved {len(cookies)} session cookies to {COOKIE_FILE}")
except Exception as ck_err:
print(f"Cookie save failed (non-fatal): {ck_err}")
# After the OAuth redirect we're already on marinetraffic.com with a valid session.
# The reports API fetch() calls work from any same-origin page (/ais/home,
# robots.txt), since the in-page fetch() attaches the session cookies.
# DO NOT navigate to /en/reports/... directly: that causes HTTP 403 on subsequent
# fetch() calls (the MT server treats it as a direct API hit, not a browser XHR).
# We MUST visit /data/ before calling the /reports/ API: loading the /data/ page
# sets a required session token (e.g. 'mt-data-access'); without that visit,
# every /en/reports/ request returns 403.
# Plan: load /data/ with wait_until='commit', then park on robots.txt (static,
# same-origin, no SPA, no crash risk). CDP garbage collection every 15 pages
# prevents V8 OOM crashes.
_update_heartbeat() # Reset watchdog before /data/ navigation (can take 30s+)
print("\nNavigating to /data/ (required to unlock reports API)...")
try:
# Use asyncio.wait_for() as a fallback in case Playwright's internal
# timeout doesn't fire (known issue when browser is under load).
await asyncio.wait_for(
page.goto('https://www.marinetraffic.com/en/data/?asset_type=vessels',
wait_until='commit', timeout=30000),
timeout=35.0
)
print(f" /data/ commit done", flush=True)
_update_heartbeat() # Reset after commit
except asyncio.TimeoutError:
print(f" /data/ nav asyncio timeout (35s) — continuing anyway", flush=True)
except Exception as nav_err:
print(f" /data/ nav error (non-fatal): {nav_err}", flush=True)
# Save cookies immediately after /data/ commit (before SPA JS executes and crashes renderer)
try:
cookies = await ctx.cookies()
with open(COOKIE_FILE, 'w') as f:
json.dump(cookies, f)
print(f"Saved {len(cookies)} cookies (after /data/ visit)")
except Exception:
pass
# Navigate to MT robots.txt — a plain text file, no JavaScript, no SPA,
# no crash risk. Same origin as marinetraffic.com so fetch() works without CORS.
try:
await asyncio.wait_for(
page.goto('https://www.marinetraffic.com/robots.txt',
wait_until='domcontentloaded', timeout=15000),
timeout=18.0
)
print(f" Navigated to robots.txt (lightweight static page)", flush=True)
except Exception as nav_err:
print(f" robots.txt nav error: {nav_err} — continuing anyway", flush=True)
await asyncio.sleep(1)
try:
print(f" Page URL: {page.url[:80]}")
except Exception:
print(f" Page URL: (unavailable)")
# Hardcoded API base URL — confirmed working format
base_url = 'https://www.marinetraffic.com/en/reports/?asset_type=vessels'
api_base = f'{base_url}&columns={API_COLS}&per_page={PER_PAGE}'
print(f" API base: {api_base[:120]}")
# --- Test page 1 (with retry on 403 — session might need extra time) ---
# MT reports API sometimes requires prior visit to /data/ page to unlock.
print(f"\nTesting API (page 1)...")
rows1, total1 = None, 0
data_page_visited = False
for api_attempt in range(6):
rows1, total1 = await fetch_page(page, api_base, 1)
if rows1 is not None:
break
if total1 == 403:
if api_attempt == 2 and not data_page_visited:
# After 2 failed attempts, revisit /data/ with 'commit' to unlock session
print(f" API 403 x{api_attempt+1} — re-visiting /data/ (commit) to unlock...")
try:
await asyncio.wait_for(
page.goto('https://www.marinetraffic.com/en/data/?asset_type=vessels',
wait_until='commit', timeout=30000),
timeout=35.0
)
await asyncio.sleep(3)
await asyncio.wait_for(
page.goto('https://www.marinetraffic.com/robots.txt',
wait_until='domcontentloaded', timeout=15000),
timeout=18.0
)
print(f" /data/ commit + back to robots.txt", flush=True)
data_page_visited = True
except Exception as data_err:
print(f" /data/ nav error: {data_err}")
else:
print(f" API 403 — waiting 15s for session to establish (attempt {api_attempt+1}/6)...")
await asyncio.sleep(15)
else:
print(f" API error (status {total1}) — waiting 5s...")
await asyncio.sleep(5)
if rows1 is None:
print("ERROR: API calls failing after retries. Exiting.")
await browser.close()
conn.close()
return
total_in_mt = total1
est_pages = (total_in_mt + len(rows1) - 1) // max(len(rows1), 1) if rows1 else 0
print(f" Page 1: {len(rows1)} rows | totalCount={total_in_mt} | est_pages={est_pages}")
# Sample row keys
if rows1:
print(f" Row keys: {list(rows1[0].keys())[:15]}")
v = parse_row(rows1[0])
print(f" Sample: {v}")
# --- Main pagination loop ---
total_saved = 0
total_cargo = 0
total_skipped = 0
batch_count = 0
t0 = time.time()
start_page = args.start
max_pages = args.max_pages if args.max_pages else est_pages + 10
consecutive_errors = 0
checkpoint_file = 'mt_green_fleet_progress.json'
# Load checkpoint for resume
if args.start > 1 and os.path.exists(checkpoint_file):
with open(checkpoint_file) as f:
cp = json.load(f)
total_saved = cp.get('total_saved', 0)
total_cargo = cp.get('total_cargo', 0)
print(f" Resuming from page {start_page} | saved={total_saved}")
# If starting from page 1, the test already fetched page 1.
# Process rows1 directly and start the loop from page 2 to avoid double-fetch
# (MT rate-limits duplicate requests immediately and hangs the connection).
if args.start == 1 and rows1:
print(f" Processing test page 1 results directly (no double-fetch)...", flush=True)
for row in rows1:
v = parse_row(row)
if not v:
continue
ship_type = v.get('ship_type', '')
if not is_cargo(ship_type):
total_skipped += 1
continue
total_cargo += 1
sid = v.get('ship_id')
if not sid:
continue
is_new = sid not in existing_ids
existing_ids.add(sid)
st_lower = ship_type.lower()
if 'bulk' in st_lower: gt, cat = '6', 'bulk'
elif 'container' in st_lower: gt, cat = '11', 'container'
elif 'ro-ro' in st_lower or 'roro' in st_lower or 'vehicle' in st_lower: gt, cat = '12', 'roro'
else: gt, cat = '9', 'general'
try:
conn, cur = db_safe_execute(conn, cur, """
INSERT INTO mt_bulk_staging
(ship_id, name, flag, dwt, gt_shiptype, type_category,
lat, lon, mmsi, imo, owner, registered_owner, operator,
commercial_manager, scraped_at)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s, NOW())
ON CONFLICT (ship_id) DO UPDATE SET
name=COALESCE(EXCLUDED.name,mt_bulk_staging.name),
mmsi=COALESCE(EXCLUDED.mmsi,mt_bulk_staging.mmsi),
imo=COALESCE(EXCLUDED.imo,mt_bulk_staging.imo),
owner=COALESCE(EXCLUDED.owner,mt_bulk_staging.owner),
registered_owner=COALESCE(EXCLUDED.registered_owner,mt_bulk_staging.registered_owner),
operator=COALESCE(EXCLUDED.operator,mt_bulk_staging.operator),
commercial_manager=COALESCE(EXCLUDED.commercial_manager,mt_bulk_staging.commercial_manager),
lat=COALESCE(EXCLUDED.lat,mt_bulk_staging.lat),
lon=COALESCE(EXCLUDED.lon,mt_bulk_staging.lon),
scraped_at=NOW()
""", (sid, v.get('name'), v.get('flag'), v.get('dwt'), gt, cat,
v.get('lat'), v.get('lon'), v.get('mmsi'), v.get('imo'),
v.get('owner'), v.get('registered_owner'), v.get('operator'),
v.get('commercial_manager'),))
if is_new: total_saved += 1
batch_count += 1
except Exception as e:
print(f" DB error (test-page1): {str(e)[:80]}")
# Skip page 1 in the loop since we already processed it
start_page = 2
print(f"\n{'='*60}")
print(f"STARTING SCRAPE: pages {start_page} to {min(max_pages, est_pages)}")
print(f"Delay: {args.delay}s/page | Batch: {BATCH_SIZE}")
print(f"{'='*60}\n")
# Reload interval: DISABLED — reloading the heavy MT SPA page crashes the browser.
# Session cookies are maintained in the browser context, so no reload needed.
# Recovery via recover_page() only when a crash is actually detected.
PAGE_RELOAD_INTERVAL = 999999
async def recover_page(reason="crash"):
"""Close crashed page and open a fresh one, re-establish /data/ session context.
If the browser context itself is dead, raises BrowserRestartNeeded so the outer
loop can restart the whole browser from the checkpoint."""
nonlocal page
print(f" [Recovery] Creating new tab ({reason})...", flush=True)
try:
await page.close()
except Exception:
pass
await asyncio.sleep(5)
try:
page = await ctx.new_page()
except Exception as ctx_err:
# Browser context is dead — escalate to full browser restart
err_s = str(ctx_err)
print(f" [Recovery] BrowserContext dead ({err_s[:80]}), escalating to browser restart...", flush=True)
with open(checkpoint_file, 'w') as f:
json.dump({'page': page_num, 'total_saved': total_saved,
'total_cargo': total_cargo, 'total_skipped': total_skipped,
'elapsed': time.time() - t0}, f)
try:
await browser.close()
except Exception:
pass
raise BrowserRestartNeeded(page_num)
try:
# Navigate to robots.txt — static text, no SPA, same origin as MT.
# fetch() calls to the reports API work from this page without CORS issues.
await page.goto('https://www.marinetraffic.com/robots.txt',
wait_until='domcontentloaded', timeout=15000)
await asyncio.sleep(1)
print(f" [Recovery] New tab ready at robots.txt", flush=True)
return True
except Exception as re_err:
print(f" [Recovery] New tab goto failed: {re_err}", flush=True)
return False
# V8 heap GC every N fetches via CDP — no page navigation needed.
PAGE_REFRESH_EVERY = 15 # run CDP GC every 15 API fetches
for page_num in range(start_page, max_pages + 1):
# Periodic CDP GC to clear V8 heap and prevent Chromium OOM crash.
# Does NOT navigate away — no CORS issues, no SPA reload.
fetch_count = page_num - start_page
if fetch_count > 0 and fetch_count % PAGE_REFRESH_EVERY == 0:
print(f" [HeapFlush] Requesting V8 GC at page {page_num}...", flush=True)
try:
# Request garbage collection via Playwright CDP (DevTools Protocol).
# This frees accumulated V8 heap WITHOUT navigating away from the page.
# No tab crash, no CORS issues, no SPA reload needed.
cdp = await page.context.new_cdp_session(page)
await cdp.send('HeapProfiler.collectGarbage')
await cdp.detach()
print(f" [HeapFlush] GC done", flush=True)
except Exception as flush_err:
print(f" [HeapFlush] GC error: {flush_err} — continuing anyway", flush=True)
# Fetch page
rows, total = await fetch_page(page, api_base, page_num)
if rows is None:
consecutive_errors += 1
is_browser_dead = total == -3 # hard timeout → browser must be restarted
is_crash = total == -2 # tab crash → new tab
is_timeout = total == -1 # JS timeout → new tab
need_new_tab = is_timeout or is_crash
if is_browser_dead:
# Save checkpoint and raise so outer loop restarts browser
print(f" Page {page_num}: BROWSER DEAD — saving checkpoint and restarting browser...", flush=True)
with open(checkpoint_file, 'w') as f:
json.dump({'page': page_num, 'total_saved': total_saved,
'total_cargo': total_cargo, 'total_skipped': total_skipped,
'elapsed': time.time() - t0}, f)
try:
await browser.close()
except Exception:
pass
raise BrowserRestartNeeded(page_num)
if need_new_tab:
reason = 'CRASH' if is_crash else 'TIMEOUT'
print(f" Page {page_num}: {reason} — creating new tab...", flush=True)
ok = await recover_page("tab_crash" if is_crash else "timeout")
if not ok:
# Recovery failed immediately — context dead, escalate to browser restart
print(f" Immediate recovery failed — restarting browser from page {page_num}", flush=True)
with open(checkpoint_file, 'w') as f:
json.dump({'page': page_num, 'total_saved': total_saved,
'total_cargo': total_cargo, 'total_skipped': total_skipped,
'elapsed': time.time() - t0}, f)
try:
await browser.close()
except Exception:
pass
raise BrowserRestartNeeded(page_num)
consecutive_errors = max(consecutive_errors - 1, 0)
continue
print(f" Page {page_num}: FAILED (error #{consecutive_errors})", flush=True)
if consecutive_errors >= 3:
# Create fresh tab after 3 consecutive errors
print(f" {consecutive_errors} consecutive errors — creating fresh tab...", flush=True)
ok = await recover_page(f"errors_{consecutive_errors}")
if ok:
consecutive_errors = 0
else:
# Recovery failed — escalate to full browser restart
print(f" Recovery failed — restarting browser from page {page_num}", flush=True)
with open(checkpoint_file, 'w') as f:
json.dump({'page': page_num, 'total_saved': total_saved,
'total_cargo': total_cargo, 'total_skipped': total_skipped,
'elapsed': time.time() - t0}, f)
try:
await browser.close()
except Exception:
pass
raise BrowserRestartNeeded(page_num)
continue
await asyncio.sleep(3)
continue
consecutive_errors = 0
if not rows:
print(f" Page {page_num}: empty — reached end")
break
# Parse and filter
page_cargo = 0
for row in rows:
v = parse_row(row)
if not v:
continue
# Filter: keep only cargo types
ship_type = v.get('ship_type', '')
if not is_cargo(ship_type):
total_skipped += 1
continue
total_cargo += 1
page_cargo += 1
sid = v.get('ship_id')
if not sid:
continue
is_new = sid not in existing_ids
existing_ids.add(sid)
# Map gt_shiptype code
st_lower = ship_type.lower()
if 'bulk' in st_lower:
gt = '6'
cat = 'bulk'
elif 'container' in st_lower:
gt = '11'
cat = 'container'
elif 'ro-ro' in st_lower or 'roro' in st_lower or 'vehicle' in st_lower:
gt = '12'
cat = 'roro'
else:
gt = '9'
cat = 'general'
try:
conn, cur = db_safe_execute(conn, cur, """
INSERT INTO mt_bulk_staging
(ship_id, name, flag, dwt, gt_shiptype, type_category,
lat, lon, mmsi, imo, owner, registered_owner, operator,
commercial_manager, scraped_at)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s, NOW())
ON CONFLICT (ship_id) DO UPDATE SET
name = COALESCE(EXCLUDED.name, mt_bulk_staging.name),
mmsi = COALESCE(EXCLUDED.mmsi, mt_bulk_staging.mmsi),
imo = COALESCE(EXCLUDED.imo, mt_bulk_staging.imo),
owner = COALESCE(EXCLUDED.owner, mt_bulk_staging.owner),
registered_owner = COALESCE(EXCLUDED.registered_owner, mt_bulk_staging.registered_owner),
operator = COALESCE(EXCLUDED.operator, mt_bulk_staging.operator),
commercial_manager = COALESCE(EXCLUDED.commercial_manager, mt_bulk_staging.commercial_manager),
lat = COALESCE(EXCLUDED.lat, mt_bulk_staging.lat),
lon = COALESCE(EXCLUDED.lon, mt_bulk_staging.lon),
scraped_at = NOW()
""", (
sid, v.get('name'), v.get('flag'), v.get('dwt'), gt, cat,
v.get('lat'), v.get('lon'), v.get('mmsi'), v.get('imo'),
v.get('owner'), v.get('registered_owner'), v.get('operator'),
v.get('commercial_manager'),
))
if is_new:
total_saved += 1
batch_count += 1
except Exception as e:
print(f" DB error ({type(e).__name__}): {str(e)[:80]}")
# Commit batch
if batch_count >= BATCH_SIZE:
conn = db_safe_commit(conn)
cur = conn.cursor()
batch_count = 0
# Progress
elapsed = time.time() - t0
rate = (page_num - start_page + 1) / elapsed if elapsed > 0 else 0
remaining_pages = (est_pages - page_num)
eta_min = remaining_pages / rate / 60 if rate > 0 else 0
_update_heartbeat() # Reset watchdog on each page processed
if page_num <= 20 or page_num % 10 == 0:
print(f" Page {page_num}/{est_pages} | "
f"+{page_cargo} cargo | "
f"total_new={total_saved} | "
f"skipped={total_skipped} | "
f"{elapsed:.0f}s | ETA: {eta_min:.0f}m", flush=True)
# Save checkpoint every 5 pages so restarts (watchdog or browser crash) resume close to current position
if page_num % 5 == 0:
with open(checkpoint_file, 'w') as f:
json.dump({
'page': page_num,
'total_saved': total_saved,
'total_cargo': total_cargo,
'total_skipped': total_skipped,
'elapsed': elapsed,
}, f)
await asyncio.sleep(args.delay)
# Final commit
conn = db_safe_commit(conn)
cur = conn.cursor()
# Final stats
cur.execute("SELECT count(*), count(mmsi), count(owner) FROM mt_bulk_staging")
t, m, o = cur.fetchone()
cur.execute("SELECT count(*), count(mmsi), count(owner) FROM mt_bulk_staging WHERE gt_shiptype='6'")
tb, mb, ob = cur.fetchone()
elapsed = time.time() - t0
print(f"\n{'='*60}")
print(f"SCRAPE COMPLETE — {elapsed/60:.1f} minutes")
print(f" Pages done: {page_num - start_page + 1}")
print(f" New vessels: {total_saved}")
print(f" Cargo found: {total_cargo}")
print(f" Non-cargo skipped: {total_skipped}")
print(f"\nGLOBAL: total={t} | mmsi={m} ({m*100//max(t,1)}%) | owner={o} ({o*100//max(t,1)}%)")
print(f"BULK: total={tb} | mmsi={mb} ({mb*100//max(tb,1)}%) | owner={ob} ({ob*100//max(tb,1)}%)")
print(f"{'='*60}")
conn.close()
await browser.close()
import traceback
# Outer restart loop — handles BrowserRestartNeeded by restarting from checkpoint
restart_count = 0
max_restarts = 200 # allow up to 200 browser restarts (plenty for 5752 pages)
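# How the pieces fit together on a restart (summary of the mechanics above):
#  - main() writes mt_green_fleet_progress.json every 5 pages and before any
#    deliberate browser shutdown: {'page': N, 'total_saved': ..., 'total_cargo': ...,
#    'total_skipped': ..., 'elapsed': ...}.
#  - BrowserRestartNeeded carries the resume page; this loop patches --start
#    and calls main() again in-process.
#  - Watchdog exit code 2 is handled by the external wrapper, which re-runs the
#    whole script; counters are reloaded from the checkpoint when --start > 1.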
while True:
try:
asyncio.run(main())
break # Normal completion
except BrowserRestartNeeded as br:
restart_count += 1
print(f"\n[RESTART #{restart_count}] Browser dead at page {br.resume_page}. Restarting from checkpoint...", flush=True)
if restart_count >= max_restarts:
print(f"[RESTART] Max restarts ({max_restarts}) reached. Stopping.", flush=True)
sys.exit(1)
# Patch sys.argv to resume from the checkpoint page
if '--start' in sys.argv:
idx = sys.argv.index('--start')
sys.argv[idx+1] = str(br.resume_page)
else:
sys.argv.extend(['--start', str(br.resume_page)])
time.sleep(10) # Brief pause before restarting browser
continue
except Exception as top_err:
err_s = str(top_err)
# TargetClosedError / context closed → try browser restart from checkpoint
if 'TargetClosed' in type(top_err).__name__ or 'Target page, context or browser has been closed' in err_s or 'browser has been closed' in err_s:
restart_count += 1
print(f"\n[RESTART #{restart_count}] Browser closed unexpectedly: {err_s[:120]}", flush=True)
if restart_count >= max_restarts:
print(f"[RESTART] Max restarts ({max_restarts}) reached. Stopping.", flush=True)
sys.exit(1)
# Try to resume from checkpoint if it exists
try:
import json as _json
import glob as _glob
# Search for the progress checkpoint file used by main()
_chk_files = sorted(
_glob.glob('mt_green_fleet_progress.json') + _glob.glob('mt_green_fleet_checkpoint*.json'),
key=lambda x: -os.path.getmtime(x)
)
if _chk_files:
with open(_chk_files[0]) as _f:
_chk = _json.load(_f)
_resume = _chk.get('page', 1)
print(f"[RESTART] Resuming from checkpoint page {_resume}...", flush=True)
if '--start' in sys.argv:
idx = sys.argv.index('--start')
sys.argv[idx+1] = str(_resume)
else:
sys.argv.extend(['--start', str(_resume)])
except Exception as chk_err:
print(f"[RESTART] Could not read checkpoint: {chk_err}", flush=True)
time.sleep(15)
continue
print(f"\nFATAL ERROR: {type(top_err).__name__}: {top_err}", flush=True)
traceback.print_exc()
sys.exit(1)