#!/usr/bin/env python3
"""
MT Reports Scraper — Paginate /en/reports/ to get ALL vessels with MMSI + Ownership

Endpoint discovered: https://www.marinetraffic.com/en/reports/?asset_type=vessels&columns=...
Returns JSON with: SHIP_ID, MMSI, IMO, SHIPNAME, FLAG, LAT, LON, SPEED, COURSE, TYPE_SUMMARY
+ ownership columns (manager, operator, beneficial_owner, etc.) if available in Pro account

Uses page.evaluate(fetch()) from browser context to bypass Cloudflare.

Usage:
    python mt_reports_scraper.py              # all vessels, auto-paginate
    python mt_reports_scraper.py --probe      # just discover fields + pagination
    python mt_reports_scraper.py --limit 500  # stop after 500 vessels
    python mt_reports_scraper.py --type 6     # bulk carriers only
"""
import asyncio, json, sys, os, time, re, struct, hmac, hashlib, base64, argparse

import psycopg2

# Run relative to this script's directory so checkpoint/output files land here.
os.chdir(os.path.dirname(os.path.abspath(__file__)))
# Avoid UnicodeEncodeError on narrow-encoding consoles when printing vessel names.
if hasattr(sys.stdout, 'reconfigure'):
    sys.stdout.reconfigure(encoding='utf-8', errors='replace')
if hasattr(sys.stderr, 'reconfigure'):
    sys.stderr.reconfigure(encoding='utf-8', errors='replace')

# Credentials/DSN are overridable via environment variables so secrets need
# not live in source control. Hard-coded defaults kept for backward
# compatibility. NOTE(review): these secrets are committed — rotate them and
# drop the defaults once the environment is configured.
EMAIL = os.environ.get('MT_EMAIL', "operation@mrlogisticcorp.com")
PASSWORD = os.environ.get('MT_PASSWORD', "NKh9i8Z!7fU9jfi")
TOTP_SECRET = os.environ.get('MT_TOTP_SECRET', "MNWTEPTFJZBUC32GJFEWY6LVKQ2GGYKH")
DB_URL = os.environ.get(
    'MT_DB_URL',
    'postgresql://seafare:SF_m0ntana_2026@127.0.0.1:15432/seafare_db')

# The columns we want from MT reports.
# Ownership columns (manager, operator) require the MT Pro Ownership service.
BASE_COLUMNS = (
    'flag,shipname,imo,ship_type,time_of_latest_position,'
    'lat_of_latest_position,lon_of_latest_position,'
    'speed,course,reported_destination'
)
OWNERSHIP_COLUMNS = 'manager,operator'  # MT Pro ownership fields
ALL_COLUMNS = BASE_COLUMNS + ',' + OWNERSHIP_COLUMNS

BATCH_SIZE = 500   # upsert/commit every N collected vessels
PAGE_DELAY = 2.0   # seconds between page fetches (politeness / rate limiting)

# Checkpoint file name (reserved; not currently read back by this script)
CKPT_FILE = 'mt_reports_checkpoint.json'
|
|
def totp(secret):
    """Return the current 6-digit RFC 6238 TOTP code for a base32 secret."""
    normalized = secret.upper().replace(' ', '')
    # b32decode requires input length to be a multiple of 8; restore padding.
    normalized += '=' * ((-len(normalized)) % 8)
    key = base64.b32decode(normalized)
    step = int(time.time()) // 30  # standard 30-second time step
    digest = hmac.new(key, struct.pack('>Q', step), hashlib.sha1).digest()
    # RFC 4226 dynamic truncation: 31-bit big-endian int at a digest-derived offset.
    start = digest[-1] & 0x0F
    truncated = int.from_bytes(digest[start:start + 4], 'big') & 0x7FFFFFFF
    return format(truncated % 1000000, '06d')
|
async def do_login(page):
    """Drive the MT Pro login flow: username, then password, then optional TOTP 2FA.

    Returns True when the browser ends up back on marinetraffic.com rather
    than the auth.kpler SSO domain.
    """
    print("Login to MT Pro...")
    await page.goto('https://www.marinetraffic.com/en/users/login',
                    wait_until='domcontentloaded', timeout=30000)
    await asyncio.sleep(3)
    # Two-step form: the username screen submits before the password appears.
    await page.fill('input[name="username"]', EMAIL)
    await page.click('button[type="submit"]')
    await asyncio.sleep(3)
    await page.fill('input[type="password"]', PASSWORD)
    await page.click('button[type="submit"]')
    await asyncio.sleep(4)
    # The MFA screen only shows up when 2FA is enabled on the account.
    if 'mfa-login-options' in page.url or 'mfa' in page.url.lower():
        print("  2FA: Google Authenticator...")
        try:
            await page.click('button:has-text("Google Authenticator")', timeout=5000)
        except Exception:
            # Method picker may not exist when TOTP is the only option.
            pass
        await asyncio.sleep(2)
        otp = totp(TOTP_SECRET)
        print(f"  TOTP: {otp}")
        await page.fill('input[name="code"]', otp)
        await page.click('button[type="submit"]')
        await asyncio.sleep(5)
    ok = 'marinetraffic.com' in page.url and 'auth.kpler' not in page.url
    print(f"  Logged in: {ok} URL: {page.url}")
    return ok
|
async def fetch_reports_page(page, columns, vessel_type=None, page_num=1,
                             page_size=100, extra_filters=''):
    """
    Fetch one page of vessel data from /en/reports/ via page.evaluate(fetch()).

    Running fetch() inside the browser context reuses the authenticated
    session cookies and passes Cloudflare, which a plain HTTP client would not.

    Args:
        page: Playwright page with a logged-in MT session.
        columns: comma-separated MT report column names.
        vessel_type: optional MT type filter (e.g. 6 = bulk carriers).
        page_num: 1-based page number.
        page_size: rows per page.
        extra_filters: raw query-string fragment appended verbatim.

    Returns:
        dict {status, url, body} on success, {status: 0, url, error} when the
        in-page fetch throws, or None if page.evaluate itself fails.
    """
    url = (f'https://www.marinetraffic.com/en/reports/?asset_type=vessels'
           f'&columns={columns}')
    if vessel_type:
        url += f'&typefilter={vessel_type}'
    url += f'&page={page_num}&pageSize={page_size}'
    if extra_filters:
        url += f'&{extra_filters}'

    # json.dumps() embeds the URL as a properly-escaped JS string literal.
    # FIX: the original headers object declared 'X-Requested-With' twice
    # (duplicate key in a JS object literal — last one wins); one removed.
    js_code = f"""
    async () => {{
        try {{
            const resp = await fetch({json.dumps(url)}, {{
                credentials: 'include',
                headers: {{
                    'X-Requested-With': 'XMLHttpRequest',
                    'Accept': 'application/json, text/javascript, */*; q=0.01',
                    'Referer': 'https://www.marinetraffic.com/en/data/?asset_type=vessels',
                }}
            }});
            const text = await resp.text();
            return {{status: resp.status, url: {json.dumps(url)}, body: text}};
        }} catch(e) {{
            return {{status: 0, url: {json.dumps(url)}, error: e.message}};
        }}
    }}
    """
    try:
        return await page.evaluate(js_code)
    except Exception as e:
        print(f"    evaluate error page {page_num}: {e}")
        return None
|
|
|
async def probe_pagination(page):
    """Try different pagination approaches to find what works.

    Fires the same /en/reports/ query with several pagination parameter
    styles (page/pageSize and DataTables-style draw/start/length) and prints
    status, row count, reported total and first ship name for each, so the
    operator can see which scheme the endpoint honors. Diagnostic only:
    prints results, returns nothing.
    """
    print("\n=== Probing pagination ===")

    # Try different URL param formats for pagination
    # NOTE: no typefilter to get any vessel data
    # DataTables.js format: draw=N&start=N&length=N is very common
    param_tests = [
        'page=1&pageSize=100',
        'page=2&pageSize=100',
        'draw=1&start=0&length=100',
        'draw=2&start=100&length=100',
        'draw=3&start=200&length=100',
    ]

    for params in param_tests:
        url = (f'https://www.marinetraffic.com/en/reports/?asset_type=vessels'
               f'&columns={BASE_COLUMNS}&{params}')
        # Fetch from browser context so session cookies + Cloudflare clearance apply.
        js = f"""
        async () => {{
            const resp = await fetch({json.dumps(url)}, {{
                credentials: 'include',
                headers: {{
                    'X-Requested-With': 'XMLHttpRequest',
                    'Accept': 'application/json',
                    'Referer': 'https://www.marinetraffic.com/en/data/?asset_type=vessels',
                }}
            }});
            const text = await resp.text();
            // Return full body so we can see total count and all rows
            try {{
                const parsed = JSON.parse(text);
                return {{
                    status: resp.status,
                    total: parsed.total || parsed.totalCount || parsed.recordsTotal || '?',
                    rows: (parsed.data || []).length,
                    firstShip: (parsed.data || [])[0] ? (parsed.data[0].SHIPNAME || '') : '',
                    keys: Object.keys((parsed.data || [])[0] || {{}}),
                    raw: text.substring(0, 300),
                }};
            }} catch(e) {{
                return {{status: resp.status, error: e.message, raw: text.substring(0, 300)}};
            }}
        }}
        """
        try:
            r = await page.evaluate(js)
            status = r.get('status', 0)
            print(f"  {params}: status={status} rows={r.get('rows','?')} "
                  f"total={r.get('total','?')} first={r.get('firstShip','?')}")
            if r.get('keys'):
                print(f"    Keys: {r['keys']}")
            if r.get('error'):
                print(f"    Error: {r['error']}")
                print(f"    Raw: {r.get('raw','')[:200]}")
        except Exception as e:
            print(f"  {params}: error {e}")
        # Small delay between probes to avoid hammering the endpoint.
        await asyncio.sleep(0.5)
|
|
|
async def probe_ownership_columns(page):
    """Probe which ownership-related report columns the account can access.

    Requests BASE_COLUMNS plus each candidate ownership column set and prints
    which OWNER/OPERATOR/MANAGER/CHARTER keys actually come back, with a
    sample of their values. Diagnostic only: prints results, returns nothing.
    """
    print("\n=== Probing ownership columns ===")

    ownership_variants = [
        'manager',
        'operator',
        'beneficial_owner',
        'registered_owner',
        'commercial_manager',
        'charterer',
        'manager,operator',
        'manager,operator,beneficial_owner',
    ]

    for cols in ownership_variants:
        # typefilter=6 (bulk carriers) keeps the probe result set small.
        url = (f'https://www.marinetraffic.com/en/reports/?asset_type=vessels'
               f'&columns={BASE_COLUMNS},{cols}&typefilter=6&page=1&pageSize=10')
        js = f"""
        async () => {{
            const resp = await fetch({json.dumps(url)}, {{
                credentials: 'include',
                headers: {{
                    'X-Requested-With': 'XMLHttpRequest',
                    'Accept': 'application/json',
                    'Referer': 'https://www.marinetraffic.com/en/data/?asset_type=vessels',
                }}
            }});
            const text = await resp.text();
            return {{status: resp.status, body: text.substring(0, 2000)}};
        }}
        """
        try:
            r = await page.evaluate(js)
            status = r.get('status', 0)
            body = r.get('body', '')
            if status == 200 and body.startswith('{'):
                # r.get('body', body) was redundant — 'body' is always present here.
                parsed = json.loads(body)
                data = parsed.get('data', [])
                if data:
                    keys = list(data[0].keys())
                    # Check if any ownership field is present
                    own_keys = [k for k in keys if any(x in k.upper()
                                for x in ['OWNER', 'OPERATOR', 'MANAGER', 'CHARTER'])]
                    print(f"  cols={cols}: {len(data)} rows, own_keys={own_keys}")
                    if own_keys:
                        # FIX: the original f-string referenced the comprehension
                        # variable `k` after the comprehension ended (NameError in
                        # Python 3, silently swallowed by the except below), so the
                        # sample was never printed. Build the dict explicitly.
                        sample = {k: data[0].get(k) for k in own_keys}
                        print(f"    Sample: {sample}")
                else:
                    print(f"  cols={cols}: {status} no data rows")
            else:
                print(f"  cols={cols}: {status} -> {body[:100]}")
        except Exception as e:
            print(f"  cols={cols}: error {e}")
        await asyncio.sleep(0.5)
|
|
|
def parse_vessel_row(row):
    """Extract standardized vessel data from an MT reports row.

    Maps the MT column names — which vary between UPPERCASE API keys (LAT,
    SPEED) and lowercase report-column names (lat_of_latest_position) — onto
    a flat dict with our own field names. Returns {} for anything that is
    not a dict.

    FIX vs. original: numeric fields were gated on truthiness, which silently
    dropped valid zero values (lat/lon 0.0 on the equator/prime meridian,
    SPEED 0 for a stopped vessel). Presence is now tested explicitly, and
    every fallback column name is actually consulted in order.
    """
    if not isinstance(row, dict):
        return {}

    def _first(keys):
        # First present, non-empty value among the candidate column names.
        for k in keys:
            val = row.get(k)
            if val is not None and val != '':
                return val
        return None

    r = {}
    # Identity
    r['mmsi'] = str(row['MMSI']) if row.get('MMSI') else None
    r['imo'] = str(row['IMO']) if row.get('IMO') else None
    r['ship_id'] = str(row['SHIP_ID']) if row.get('SHIP_ID') else None
    r['name'] = row.get('SHIPNAME') or row.get('NAME') or ''
    # Flag
    r['flag'] = row.get('CODE2') or row.get('FLAG') or row.get('COUNTRY') or ''
    # Type
    r['gt_shiptype'] = str(row.get('TYPE_COLOR') or row.get('TYPE_ID') or '')
    r['shiptype'] = row.get('TYPE_SUMMARY') or ''
    # DWT
    dwt = _first(['DWT', 'DEADWEIGHT'])
    if dwt is not None:
        try:
            r['dwt'] = int(dwt)
        except (TypeError, ValueError):
            pass  # non-numeric junk in the column — leave unset
    # Position / kinematics — 0 is a legitimate value for all of these.
    for field, keys in (('lat', ['LAT', 'lat_of_latest_position']),
                        ('lon', ['LON', 'lon_of_latest_position']),
                        ('speed', ['SPEED']),
                        ('course', ['COURSE'])):
        val = _first(keys)
        if val is not None:
            try:
                r[field] = float(val)
            except (TypeError, ValueError):
                pass
    r['destination'] = row.get('DESTINATION') or row.get('reported_destination') or ''
    # Ownership — try various column name formats
    owner = _first(['MANAGER', 'manager', 'COMMERCIAL_MANAGER', 'BENEFICIAL_OWNER'])
    if owner is not None:
        r['owner'] = str(owner)
    operator = _first(['OPERATOR', 'operator', 'CHARTERER'])
    if operator is not None:
        r['operator'] = str(operator)
    return r
|
|
|
|
async def main():
    """CLI entry point: log in, then either probe the endpoint (--probe) or
    paginate through /en/reports/ and upsert every vessel into mt_bulk_staging.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--probe', action='store_true', help='Probe pagination + ownership columns')
    parser.add_argument('--type', type=int, default=0, help='Vessel type filter (e.g. 6=bulk)')
    parser.add_argument('--limit', type=int, default=0, help='Max vessels to collect (0=all)')
    parser.add_argument('--page_size', type=int, default=100, help='Rows per page (default 100)')
    parser.add_argument('--max_pages', type=int, default=0, help='Max pages (0=all)')
    parser.add_argument('--columns', type=str, default=ALL_COLUMNS, help='Columns to fetch')
    args = parser.parse_args()

    # DB connection is only needed in scrape mode; probe mode just prints.
    conn = None
    cur = None
    if not args.probe:
        conn = psycopg2.connect(DB_URL)
        cur = conn.cursor()

    from playwright.async_api import async_playwright

    async with async_playwright() as p:
        browser = await p.chromium.launch(
            headless=False,  # headful: helps pass Cloudflare's bot checks
            args=['--no-sandbox', '--disable-blink-features=AutomationControlled']
        )
        context = await browser.new_context(
            viewport={'width': 1440, 'height': 900},
            user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
                       '(KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36',
        )
        page = await context.new_page()

        # ---- Login ----
        logged_in = await do_login(page)
        if not logged_in:
            print("ERROR: Login failed!")
            await browser.close()
            if conn:  # FIX: conn is None in --probe mode; unguarded close crashed
                conn.close()
            return

        await asyncio.sleep(5)

        # ---- Load data page to establish session ----
        data_url = 'https://www.marinetraffic.com/en/data/?asset_type=vessels'
        print(f"\nLoading data page: {data_url}")
        await page.goto(data_url, wait_until='load', timeout=40000)
        await asyncio.sleep(5)
        print(f"  Data page loaded: {page.url}")

        # ---- PROBE MODE ----
        if args.probe:
            await probe_pagination(page)
            await probe_ownership_columns(page)
            await browser.close()
            if conn:  # FIX: original called conn.close() on None here (AttributeError)
                conn.close()
            return

        # ---- SCRAPE MODE ----
        vessel_type = args.type if args.type else None
        page_size = args.page_size
        max_pages = args.max_pages
        limit = args.limit
        columns = args.columns

        total_collected = 0
        total_pages = 0
        current_page = 1
        all_vessels = {}  # pending batch, keyed by mmsi/ship_id/name to dedupe

        print(f"\nStarting scrape: type={vessel_type}, pageSize={page_size}, "
              f"max_pages={max_pages}, limit={limit}")

        while True:
            result = await fetch_reports_page(
                page, columns, vessel_type, current_page, page_size)

            if not result:
                print(f"  Page {current_page}: no result, stopping")
                break

            status = result.get('status', 0)
            body = result.get('body', '')

            if status != 200:
                print(f"  Page {current_page}: status={status}, stopping")
                if body:
                    print(f"  Body: {body[:200]}")
                break

            # A non-JSON body usually means a Cloudflare/login interstitial.
            if not body.startswith('{'):
                print(f"  Page {current_page}: non-JSON response, stopping")
                print(f"  Body: {body[:200]}")
                break

            try:
                parsed = json.loads(body)
            except Exception as e:
                print(f"  Page {current_page}: parse error {e}")
                break

            rows = parsed.get('data', [])
            total_count = (parsed.get('total') or parsed.get('totalCount') or
                           parsed.get('count') or 0)

            if not rows:
                print(f"  Page {current_page}: empty data, stopping")
                break

            # Process rows, deduping against anything already in this batch.
            new_this_page = 0
            for row in rows:
                v = parse_vessel_row(row)
                key = v.get('mmsi') or v.get('ship_id') or v.get('name')
                if key and key not in all_vessels:
                    all_vessels[key] = v
                    new_this_page += 1
                    total_collected += 1

            print(f"  Page {current_page}: {len(rows)} rows, {new_this_page} new, "
                  f"total={total_count}, collected={total_collected}")

            # Sample the first page so the operator sees the actual schema.
            if current_page == 1 and rows:
                print(f"  Columns: {list(rows[0].keys())}")
                print(f"  Sample: {json.dumps(rows[0])[:300]}")

            # Commit batch to DB. NOTE(review): the modulo test can skip a
            # boundary when a page crosses it; pending rows are still flushed
            # by the final commit below, so nothing is lost.
            if total_collected % BATCH_SIZE == 0 and total_collected > 0:
                _upsert_vessels(cur, list(all_vessels.values()))
                conn.commit()
                all_vessels.clear()
                print(f"  Committed batch, total in DB now...")

            total_pages += 1

            # Stop conditions
            if limit and total_collected >= limit:
                print(f"  Reached limit {limit}, stopping")
                break
            if max_pages and total_pages >= max_pages:
                print(f"  Reached max_pages {max_pages}, stopping")
                break
            if total_count and total_collected >= total_count:
                print(f"  Collected all {total_count} vessels, stopping")
                break
            if len(rows) < page_size:
                print(f"  Last page (fewer than pageSize rows), stopping")
                break

            current_page += 1
            await asyncio.sleep(PAGE_DELAY)

        # Final commit of any rows not yet flushed by a batch boundary.
        if all_vessels:
            _upsert_vessels(cur, list(all_vessels.values()))
            conn.commit()

        # Final stats
        cur.execute('SELECT count(*) FROM mt_bulk_staging')
        total_in_db = cur.fetchone()[0]
        cur.execute("SELECT count(*) FROM mt_bulk_staging WHERE mmsi IS NOT NULL")
        with_mmsi = cur.fetchone()[0]
        cur.execute("SELECT count(*) FROM mt_bulk_staging WHERE owner IS NOT NULL")
        with_owner = cur.fetchone()[0]

        print(f"\n=== DONE ===")
        print(f"  Collected this run: {total_collected}")
        print(f"  Pages processed: {total_pages}")
        print(f"  mt_bulk_staging total: {total_in_db}")
        print(f"  With MMSI: {with_mmsi}")
        print(f"  With owner: {with_owner}")

        conn.close()
        await browser.close()
    print("\nReports scraper complete!")
|
|
|
|
def _upsert_vessels(cur, vessels):
|
|
"""Upsert list of vessel dicts into mt_bulk_staging."""
|
|
inserted = 0
|
|
for v in vessels:
|
|
ship_id = v.get('ship_id') or v.get('mmsi') or v.get('name', '')[:20]
|
|
if not ship_id:
|
|
continue
|
|
try:
|
|
cur.execute("""
|
|
INSERT INTO mt_bulk_staging
|
|
(ship_id, name, flag, dwt, gt_shiptype, type_category,
|
|
lat, lon, speed, course, destination, mmsi, imo, owner, operator,
|
|
scraped_at)
|
|
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s, NOW())
|
|
ON CONFLICT (ship_id) DO UPDATE SET
|
|
mmsi = COALESCE(EXCLUDED.mmsi, mt_bulk_staging.mmsi),
|
|
imo = COALESCE(EXCLUDED.imo, mt_bulk_staging.imo),
|
|
owner = COALESCE(EXCLUDED.owner, mt_bulk_staging.owner),
|
|
operator = COALESCE(EXCLUDED.operator, mt_bulk_staging.operator),
|
|
lat = COALESCE(EXCLUDED.lat, mt_bulk_staging.lat),
|
|
lon = COALESCE(EXCLUDED.lon, mt_bulk_staging.lon),
|
|
flag = COALESCE(EXCLUDED.flag, mt_bulk_staging.flag),
|
|
name = COALESCE(EXCLUDED.name, mt_bulk_staging.name),
|
|
scraped_at = NOW()
|
|
""", (
|
|
ship_id,
|
|
v.get('name'),
|
|
v.get('flag'),
|
|
v.get('dwt'),
|
|
v.get('gt_shiptype'),
|
|
'bulk' if v.get('gt_shiptype') == '6' else 'general',
|
|
v.get('lat'),
|
|
v.get('lon'),
|
|
v.get('speed'),
|
|
v.get('course'),
|
|
v.get('destination'),
|
|
v.get('mmsi'),
|
|
v.get('imo'),
|
|
v.get('owner'),
|
|
v.get('operator'),
|
|
))
|
|
inserted += 1
|
|
except Exception as e:
|
|
print(f" DB error for {ship_id}: {e}")
|
|
return inserted
|
|
|
|
|
|
# Guard the entry point so importing this module (e.g. to reuse totp() or
# parse_vessel_row()) does not launch a browser and start scraping.
if __name__ == '__main__':
    asyncio.run(main())