349 lines
13 KiB
Python
349 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
MT Ownership Scraper — Get Beneficial Owner, Registered Owner, Operator
|
|
for vessels in mt_bulk_staging that already have MMSI.
|
|
|
|
Uses reports endpoint with quicksearch_shipid + ownership columns.
|
|
Fast: ~0.5s per vessel (no page navigation, just fetch() API call).
|
|
|
|
Usage:
|
|
python mt_ownership_scraper.py --probe # Test on 10 vessels
|
|
python mt_ownership_scraper.py --gt6only # Only bulk carriers (GT=6)
|
|
python mt_ownership_scraper.py --limit N # Process N vessels
|
|
python mt_ownership_scraper.py # Process all with MMSI, no owner yet
|
|
"""
|
|
import asyncio, json, sys, os, time, struct, hmac, hashlib, base64, argparse
|
|
import psycopg2
|
|
|
|
# Work from the directory that contains this script, so relative paths such
# as the checkpoint file resolve the same way whatever the caller's cwd is.
_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
os.chdir(_SCRIPT_DIR)

# Force UTF-8 output with replacement on both standard streams (when the
# stream object supports reconfiguration) so printing never raises on
# characters the console encoding cannot represent.
for _stream in (sys.stdout, sys.stderr):
    if hasattr(_stream, 'reconfigure'):
        _stream.reconfigure(encoding='utf-8', errors='replace')
|
|
|
|
# --- Credentials / connection settings -------------------------------------
# Secrets are read from the environment instead of being written as literals
# in this file. Any credential that was ever committed to source control
# should be treated as compromised and rotated.
#
#   MT_EMAIL        MarineTraffic account e-mail
#   MT_PASSWORD     MarineTraffic account password
#   MT_TOTP_SECRET  base32 seed of the account's authenticator (2FA)
#   MT_DB_URL       PostgreSQL connection URL,
#                   e.g. postgresql://user:password@127.0.0.1:15432/dbname
#
# A missing variable yields an empty string (never None) so the names keep
# their str type for every consumer in this module.
EMAIL = os.environ.get('MT_EMAIL', '')
PASSWORD = os.environ.get('MT_PASSWORD', '')
TOTP_SECRET = os.environ.get('MT_TOTP_SECRET', '')
DB_URL = os.environ.get('MT_DB_URL', '')

# --- Tunables ----------------------------------------------------------------
DELAY = 0.5  # seconds between API calls (fast, no page navigation)
CKPT_FILE = 'mt_ownership_checkpoint.json'  # relative to the working directory
BATCH = 200  # commit every N vessels

# Reports columns that return ownership data
OWNERSHIP_COLS = 'shipname,imo,flag,beneficial_owner,operator,registered_owner'
|
|
|
|
|
|
def totp(secret, *, timestamp=None, digits=6, period=30):
    """Return the time-based one-time password (RFC 6238, HMAC-SHA1).

    Args:
        secret: Base32-encoded shared secret. Case-insensitive; embedded
            spaces are ignored and missing ``=`` padding is added.
        timestamp: Unix time in seconds to compute the code for. ``None``
            (the default) uses the current time.
        digits: Number of digits in the returned code (default 6).
        period: Length of one time step in seconds (default 30).

    Returns:
        The code as a zero-padded decimal string of length ``digits``.

    Raises:
        binascii.Error: If ``secret`` is not valid base32.
    """
    normalized = secret.upper().replace(' ', '')
    # base32 input must be a multiple of 8 characters; restore the padding.
    padding = '=' * ((-len(normalized)) % 8)
    key = base64.b32decode(normalized + padding)

    now = time.time() if timestamp is None else timestamp
    counter = int(now) // period

    digest = hmac.new(key, struct.pack('>Q', counter), hashlib.sha1).digest()
    # Dynamic truncation: the low nibble of the last byte selects a 4-byte
    # window, whose top bit is masked off to get a non-negative 31-bit int.
    offset = digest[-1] & 0x0F
    truncated = struct.unpack('>I', digest[offset:offset + 4])[0] & 0x7FFFFFFF
    return str(truncated % (10 ** digits)).zfill(digits)
|
|
|
|
|
|
async def do_login(page):
    """Log in to MarineTraffic through the browser page, handling 2FA.

    Fills the username and password forms in two steps. If the resulting URL
    indicates an MFA step (contains ``mfa`` or ``auth.kpler``), it submits a
    TOTP code generated from ``TOTP_SECRET``.

    Args:
        page: Playwright page used to drive the login flow.

    Returns:
        True when the final URL is on marinetraffic.com and is not an
        ``auth.kpler`` URL, otherwise False.
    """
    print("Logging in to MT Pro...")
    await page.goto('https://www.marinetraffic.com/en/users/login',
                    wait_until='domcontentloaded', timeout=30000)
    await asyncio.sleep(3)

    # Two-step form: username first, then password on the following screen.
    await page.fill('input[name="username"]', EMAIL)
    await page.click('button[type="submit"]')
    await asyncio.sleep(3)
    await page.fill('input[type="password"]', PASSWORD)
    await page.click('button[type="submit"]')
    await asyncio.sleep(4)

    if 'mfa' in page.url.lower() or 'auth.kpler' in page.url:
        print(" 2FA required...")
        try:
            await page.click('button:has-text("Google Authenticator")', timeout=5000)
        except Exception as exc:
            # Best effort: the method chooser is not always shown, so a
            # failed click is not fatal - but say so instead of hiding it.
            print(f" Authenticator option not clicked ({type(exc).__name__}); continuing")
        await asyncio.sleep(2)

        # Generate the code as late as possible to stay inside its time step.
        otp = totp(TOTP_SECRET)
        # The one-time code itself is deliberately not printed.
        print(" Submitting TOTP code...")
        await page.fill('input[name="code"]', otp)
        await page.click('button[type="submit"]')
        await asyncio.sleep(5)

    logged_in = 'marinetraffic.com' in page.url and 'auth.kpler' not in page.url
    print(f" Login OK: {logged_in} URL: {page.url}")
    return logged_in
|
|
|
|
|
|
async def fetch_ownership(page, ship_id):
    """Fetch one vessel's ownership row from the reports endpoint.

    Runs a single ``fetch()`` inside the browser page (so the logged-in
    session cookies are sent) against the reports endpoint, filtered by
    ``quicksearch_shipid`` and requesting the ``OWNERSHIP_COLS`` columns.

    Args:
        page: Playwright page that already holds an authenticated session.
        ship_id: Ship ID to look up.

    Returns:
        The first row of the response's ``data`` array as a dict, or None
        when there are no rows, the request fails, or the response cannot be
        parsed. Failures other than "no rows" are printed so that an expired
        session or a blocked request is visible in the log.
    """
    url = (f'https://www.marinetraffic.com/en/reports/?asset_type=vessels'
           f'&columns={OWNERSHIP_COLS}&quicksearch_shipid={ship_id}')

    # The URL is handed over as an argument rather than spliced into the
    # script text, so the JavaScript source is a constant.
    js = """
    async (url) => {
        try {
            const r = await fetch(url, {
                credentials: 'include',
                cache: 'no-store',
                headers: {
                    'X-Requested-With': 'XMLHttpRequest',
                    'Accept': 'application/json',
                    'Referer': 'https://www.marinetraffic.com/en/data/?asset_type=vessels',
                }
            });
            if (r.status !== 200) return {error: 'HTTP ' + r.status};
            const text = await r.text();
            const d = JSON.parse(text);
            const rows = d.data || [];
            if (rows.length === 0) return {error: 'no rows'};
            return rows[0];
        } catch(e) {
            return {error: e.message};
        }
    }
    """

    try:
        result = await page.evaluate(js, url)
    except Exception as e:
        print(f" evaluate error: {e}")
        return None

    if not result:
        return None
    if not isinstance(result, dict):
        print(f" unexpected row type for SHIP_ID={ship_id}: {type(result).__name__}")
        return None

    error = result.get('error')
    if error:
        # 'no rows' is the normal "nothing known" answer; anything else is a
        # real failure (HTTP status, JSON parse error, network error).
        if error != 'no rows':
            print(f" fetch error for SHIP_ID={ship_id}: {error}")
        return None
    return result
|
|
|
|
|
|
def _clean_field(value):
    """Return ``value`` as a stripped string, or None if it is empty/falsy."""
    if not value:
        return None
    text = str(value).strip()
    return text or None


def extract_ownership(row):
    """Extract owner/operator fields from a reports API row.

    Values are stripped *before* any emptiness check, so whitespace-only
    fields count as missing and never produce an empty-string result.

    Args:
        row: Dict with upper-case keys such as ``BENEFICIAL_OWNER``,
            ``REGISTERED_OWNER``, ``MANAGER``, ``OPERATOR`` and ``IMO``.

    Returns:
        None if ``row`` is empty, not a dict, or has neither an owner nor an
        operator. Otherwise a dict containing only the keys that have data:

        * ``owner`` - beneficial owner, falling back to registered owner,
          then manager.
        * ``registered_owner`` - only when it differs from ``owner``.
        * ``operator``
        * ``imo`` - omitted when the value is ``0``.
    """
    if not row or not isinstance(row, dict):
        return None

    registered = _clean_field(row.get('REGISTERED_OWNER'))
    owner = (_clean_field(row.get('BENEFICIAL_OWNER'))
             or registered
             or _clean_field(row.get('MANAGER')))
    operator = _clean_field(row.get('OPERATOR'))
    imo = _clean_field(row.get('IMO'))

    # Skip if no meaningful data
    if not owner and not operator:
        return None

    result = {}
    if owner:
        result['owner'] = owner
    if registered and registered != owner:
        result['registered_owner'] = registered
    if operator:
        result['operator'] = operator
    if imo and imo != '0':
        result['imo'] = imo
    return result
|
|
|
|
|
|
def _load_checkpoint(path):
    """Return the set of ship IDs (as strings) stored under 'processed' in ``path``."""
    with open(path) as f:
        cp = json.load(f)
    return {str(x) for x in cp.get('processed', [])}


def _save_checkpoint(path, processed_ids, done=False):
    """Write the checkpoint through a temp file so an interrupted run cannot truncate it."""
    payload = {'processed': list(processed_ids)}
    if done:
        payload['done'] = True
    tmp_path = f'{path}.tmp'
    with open(tmp_path, 'w') as f:
        json.dump(payload, f)
    os.replace(tmp_path, path)


async def _run_probe(page, cur):
    """Fetch and print ownership for the 10 largest GT=6 vessels; writes nothing to the DB."""
    print("\n=== PROBE: Testing ownership fetch for 10 bulk carriers ===\n")
    cur.execute("""
        SELECT ship_id, name, gt_shiptype, dwt, flag, mmsi, imo
        FROM mt_bulk_staging
        WHERE mmsi IS NOT NULL AND gt_shiptype = '6'
        ORDER BY dwt DESC NULLS LAST
        LIMIT 10
    """)
    vessels = cur.fetchall()
    found = 0
    for ship_id, name, gt, dwt, flag, _mmsi, _imo in vessels:
        print(f"[GT={gt}] {name} ({flag}, DWT={dwt}, SHIP_ID={ship_id})")
        row = await fetch_ownership(page, ship_id)
        if row:
            parsed = extract_ownership(row)
            if parsed:
                found += 1
                print(f" owner={parsed.get('owner')}")
                print(f" reg_owner={parsed.get('registered_owner')}")
                print(f" operator={parsed.get('operator')}")
            else:
                print(f" No ownership fields (row has keys: {list(row.keys())[:8]})")
        else:
            print(f" No data returned")
        await asyncio.sleep(0.3)

    print(f"\n=== Probe done: {found}/{len(vessels)} have ownership data ===")


async def _run_scrape(page, conn, cur, args):
    """Fetch ownership for every pending vessel and store it, checkpointing every BATCH vessels."""
    processed_ids = set()
    if os.path.exists(CKPT_FILE):
        processed_ids = _load_checkpoint(CKPT_FILE)
        print(f"Checkpoint: {len(processed_ids)} already processed")

    # Only fixed, code-defined fragments are interpolated into the SQL below.
    conditions = ["mmsi IS NOT NULL", "owner IS NULL"]
    if args.gt6only:
        conditions.append("gt_shiptype = '6'")

    query = f"""
        SELECT ship_id, name, gt_shiptype, dwt, flag, mmsi, imo
        FROM mt_bulk_staging
        WHERE {' AND '.join(conditions)}
        ORDER BY
            CASE WHEN gt_shiptype = '6' THEN 0 ELSE 1 END,
            dwt DESC NULLS LAST
    """
    cur.execute(query)
    all_vessels = [(str(r[0]), r[1], r[2], r[3], r[4], r[5], r[6])
                   for r in cur.fetchall()
                   if str(r[0]) not in processed_ids]
    # Apply --limit AFTER dropping checkpointed vessels; limiting in SQL would
    # spend the quota on rows that are then filtered out, so a rerun would
    # process fewer than N vessels.
    if args.limit > 0:
        all_vessels = all_vessels[:args.limit]

    total = len(all_vessels)
    eta_sec = total * (args.delay + 0.3)
    print(f"Vessels to process: {total}")
    print(f"Delay: {args.delay}s | ETA: ~{eta_sec/60:.0f} min ({eta_sec/3600:.1f}h)")

    found_owner = 0
    found_operator = 0
    not_found = 0
    errors = 0
    batch_count = 0
    t0 = time.time()

    for i, (ship_id, name, _gt, _dwt, _flag, _mmsi, imo) in enumerate(all_vessels):
        failed = False
        try:
            # NOTE: fetch_ownership returns None both for "no rows" and for
            # request failures, so those cases cannot be told apart here.
            row = await fetch_ownership(page, ship_id)
            parsed = extract_ownership(row) if row else None
        except Exception as e:
            failed = True
            errors += 1
            print(f" [{i+1}] ERROR {name}: {e}")
            parsed = None

        if parsed:
            if parsed.get('owner'):
                found_owner += 1
            if parsed.get('operator'):
                found_operator += 1

            # COALESCE keeps the stored value wherever nothing new was scraped.
            cur.execute("""
                UPDATE mt_bulk_staging
                SET owner = COALESCE(%s, owner),
                    operator = COALESCE(%s, operator),
                    registered_owner = COALESCE(%s, registered_owner),
                    imo = COALESCE(%s, imo),
                    scraped_at = NOW()
                WHERE ship_id = %s
            """, (
                parsed.get('owner'),
                parsed.get('operator'),
                parsed.get('registered_owner'),
                parsed.get('imo') or imo,
                ship_id,
            ))

            # Log the first 20 vessels, then every 50th.
            if i < 20 or (i + 1) % 50 == 0:
                print(f" [{i+1}/{total}] {name} -> {parsed.get('owner','?')[:40]} | {parsed.get('operator','?')[:30]}")
        elif not failed:
            not_found += 1
            # Log the first 20 vessels, then every 100th.
            if i < 20 or (i + 1) % 100 == 0:
                print(f" [{i+1}/{total}] {name} -> no ownership")

        # A vessel that raised is counted once (as an error) and is left out
        # of the checkpoint so the next run retries it.
        if not failed:
            processed_ids.add(ship_id)
        batch_count += 1

        if batch_count >= BATCH:
            # Commit first, then checkpoint: the checkpoint must never claim
            # work that the database has not durably stored.
            conn.commit()
            _save_checkpoint(CKPT_FILE, processed_ids)
            elapsed = time.time() - t0
            rate = (i + 1) / elapsed if elapsed > 0 else 0
            remaining = (total - i - 1) / rate if rate > 0 else 0
            print(f"\n=== CHECKPOINT [{i+1}/{total}] {elapsed:.0f}s | "
                  f"owners={found_owner} ops={found_operator} "
                  f"none={not_found} err={errors} | "
                  f"ETA: {remaining/60:.0f}m ===\n")
            batch_count = 0

        await asyncio.sleep(args.delay)

    # Final commit
    conn.commit()
    _save_checkpoint(CKPT_FILE, processed_ids, done=True)

    # Summary
    cur.execute("SELECT count(owner) FROM mt_bulk_staging WHERE owner IS NOT NULL")
    total_with_owner = cur.fetchone()[0]
    cur.execute("SELECT count(owner) FROM mt_bulk_staging WHERE gt_shiptype='6' AND owner IS NOT NULL")
    bulk_with_owner = cur.fetchone()[0]

    elapsed = time.time() - t0
    print(f"\n{'='*60}")
    print(f"DONE in {elapsed/60:.1f} minutes!")
    print(f" Processed: {total}")
    print(f" Owner found: {found_owner} ({found_owner*100//max(total,1)}%)")
    print(f" Operator found: {found_operator}")
    print(f" No data: {not_found}")
    print(f" Errors: {errors}")
    print(f" Total with owner in DB: {total_with_owner}")
    print(f" Bulk carriers with owner: {bulk_with_owner}")


async def main():
    """Command-line entry point: connect to the DB, log in, then probe or scrape.

    Flags: ``--probe`` (read-only test on 10 vessels), ``--gt6only``
    (restrict to gt_shiptype '6'), ``--limit N`` (process at most N pending
    vessels; 0 means no limit) and ``--delay S`` (pause between API calls).

    The database connection and the browser are always closed, including
    when the run is aborted by an exception; uncommitted updates from the
    current batch are then discarded together with their checkpoint entries.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--probe', action='store_true', help='Test on 10 vessels')
    parser.add_argument('--gt6only', action='store_true', help='Only bulk carriers (GT=6)')
    parser.add_argument('--limit', type=int, default=0, help='Max vessels')
    parser.add_argument('--delay', type=float, default=DELAY, help=f'Delay (default {DELAY}s)')
    args = parser.parse_args()

    # Fail early with a clear message instead of a confusing login/DB error.
    if not all((EMAIL, PASSWORD, TOTP_SECRET, DB_URL)):
        print("CONFIG ERROR: EMAIL, PASSWORD, TOTP_SECRET and DB_URL must all be non-empty")
        return

    # Test DB connection
    conn = None
    try:
        conn = psycopg2.connect(DB_URL, connect_timeout=10)
        cur = conn.cursor()
        cur.execute('SELECT 1')
        # Print only what follows the LAST '@' so no part of the password
        # (which may itself contain '@') reaches the log.
        target = DB_URL.rsplit('@', 1)[1] if '@' in DB_URL else '(target hidden)'
        print(f"DB connected: {target}")
    except Exception as e:
        print(f"DB ERROR: {e}")
        print("Is SSH tunnel running? ssh -L 15432:127.0.0.1:5432 -N root@89.19.208.158")
        if conn is not None:
            conn.close()
        return

    try:
        from playwright.async_api import async_playwright

        async with async_playwright() as p:
            browser = await p.chromium.launch(
                headless=False,
                args=['--no-sandbox', '--disable-blink-features=AutomationControlled']
            )
            try:
                context = await browser.new_context(
                    viewport={'width': 1440, 'height': 900},
                    user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
                               '(KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36',
                )
                page = await context.new_page()

                if not await do_login(page):
                    print("ERROR: Login failed!")
                    return

                await asyncio.sleep(3)

                # Warm up: navigate to data page (establishes session for reports API)
                print("Warming up session on data page...")
                await page.goto('https://www.marinetraffic.com/en/data/?asset_type=vessels',
                                wait_until='load', timeout=40000)
                await asyncio.sleep(5)
                print(f"Session ready: {page.url}")

                if args.probe:
                    await _run_probe(page, cur)
                else:
                    await _run_scrape(page, conn, cur, args)
            finally:
                await browser.close()
    finally:
        conn.close()
|
|
|
|
|
|
# Run the scraper only when executed as a script, so importing this module
# (e.g. to reuse totp() or extract_ownership()) does not start a browser run.
if __name__ == '__main__':
    asyncio.run(main())
|