montana/Русский/Логистика/mt_ownership_scraper.py

349 lines
13 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
"""
MT Ownership Scraper Get Beneficial Owner, Registered Owner, Operator
for vessels in mt_bulk_staging that already have MMSI.
Uses reports endpoint with quicksearch_shipid + ownership columns.
Fast: ~0.5s per vessel (no page navigation, just fetch() API call).
Usage:
python mt_ownership_scraper.py --probe # Test on 10 vessels
python mt_ownership_scraper.py --gt6only # Only bulk carriers (GT=6)
python mt_ownership_scraper.py --limit N # Process N vessels
python mt_ownership_scraper.py # Process all with MMSI, no owner yet
"""
import asyncio, json, sys, os, time, struct, hmac, hashlib, base64, argparse
import psycopg2
os.chdir(os.path.dirname(os.path.abspath(__file__)))
if hasattr(sys.stdout, 'reconfigure'):
sys.stdout.reconfigure(encoding='utf-8', errors='replace')
if hasattr(sys.stderr, 'reconfigure'):
sys.stderr.reconfigure(encoding='utf-8', errors='replace')
EMAIL = "operation@mrlogisticcorp.com"
PASSWORD = "NKh9i8Z!7fU9jfi"
TOTP_SECRET = "MNWTEPTFJZBUC32GJFEWY6LVKQ2GGYKH"
DB_URL = 'postgresql://seafare:SF_m0ntana_2026@127.0.0.1:15432/seafare_db'
DELAY = 0.5 # seconds between API calls (fast, no page navigation)
CKPT_FILE = 'mt_ownership_checkpoint.json'
BATCH = 200 # commit every N vessels
# Reports columns that return ownership data
OWNERSHIP_COLS = 'shipname,imo,flag,beneficial_owner,operator,registered_owner'
def totp(secret):
s = secret.upper().replace(' ', '')
pad = (-len(s)) % 8
key = base64.b32decode(s + '=' * pad)
counter = int(time.time()) // 30
msg = struct.pack('>Q', counter)
h = hmac.new(key, msg, hashlib.sha1).digest()
offset = h[-1] & 0x0f
code = struct.unpack('>I', h[offset:offset + 4])[0] & 0x7fffffff
return str(code % 1000000).zfill(6)
async def do_login(page):
print("Logging in to MT Pro...")
await page.goto('https://www.marinetraffic.com/en/users/login',
wait_until='domcontentloaded', timeout=30000)
await asyncio.sleep(3)
await page.fill('input[name="username"]', EMAIL)
await page.click('button[type="submit"]')
await asyncio.sleep(3)
await page.fill('input[type="password"]', PASSWORD)
await page.click('button[type="submit"]')
await asyncio.sleep(4)
if 'mfa' in page.url.lower() or 'auth.kpler' in page.url:
print(" 2FA required...")
try:
await page.click('button:has-text("Google Authenticator")', timeout=5000)
except Exception:
pass
await asyncio.sleep(2)
otp = totp(TOTP_SECRET)
print(f" TOTP: {otp}")
await page.fill('input[name="code"]', otp)
await page.click('button[type="submit"]')
await asyncio.sleep(5)
logged_in = 'marinetraffic.com' in page.url and 'auth.kpler' not in page.url
print(f" Login OK: {logged_in} URL: {page.url}")
return logged_in
async def fetch_ownership(page, ship_id):
"""
Single API call via page.evaluate(fetch()) to get ownership data.
Uses quicksearch_shipid on reports endpoint with ownership columns.
Returns dict with BENEFICIAL_OWNER, REGISTERED_OWNER, OPERATOR, etc. or None.
"""
url = (f'https://www.marinetraffic.com/en/reports/?asset_type=vessels'
f'&columns={OWNERSHIP_COLS}&quicksearch_shipid={ship_id}')
js = f"""
async () => {{
try {{
const r = await fetch({json.dumps(url)}, {{
credentials: 'include',
cache: 'no-store',
headers: {{
'X-Requested-With': 'XMLHttpRequest',
'Accept': 'application/json',
'Referer': 'https://www.marinetraffic.com/en/data/?asset_type=vessels',
}}
}});
if (r.status !== 200) return {{error: 'HTTP ' + r.status}};
const text = await r.text();
const d = JSON.parse(text);
const rows = d.data || [];
if (rows.length === 0) return {{error: 'no rows'}};
return rows[0];
}} catch(e) {{
return {{error: e.message}};
}}
}}
"""
try:
result = await page.evaluate(js)
if result and not result.get('error'):
return result
return None
except Exception as e:
print(f" evaluate error: {e}")
return None
def extract_ownership(row):
"""Extract owner/operator from reports API row."""
if not row or not isinstance(row, dict):
return None
owner = (row.get('BENEFICIAL_OWNER') or row.get('REGISTERED_OWNER') or
row.get('MANAGER') or None)
reg_owner = row.get('REGISTERED_OWNER') or None
operator = row.get('OPERATOR') or None
imo = row.get('IMO') or None
# Skip if no meaningful data
if not owner and not operator:
return None
result = {}
if owner:
result['owner'] = str(owner).strip()
if reg_owner and reg_owner != owner:
result['registered_owner'] = str(reg_owner).strip()
if operator:
result['operator'] = str(operator).strip()
if imo and str(imo) != '0':
result['imo'] = str(imo).strip()
return result
async def main():
parser = argparse.ArgumentParser()
parser.add_argument('--probe', action='store_true', help='Test on 10 vessels')
parser.add_argument('--gt6only', action='store_true', help='Only bulk carriers (GT=6)')
parser.add_argument('--limit', type=int, default=0, help='Max vessels')
parser.add_argument('--delay', type=float, default=DELAY, help=f'Delay (default {DELAY}s)')
args = parser.parse_args()
# Test DB connection
try:
conn = psycopg2.connect(DB_URL, connect_timeout=10)
cur = conn.cursor()
cur.execute('SELECT 1')
print(f"DB connected: {DB_URL.split('@')[1]}")
except Exception as e:
print(f"DB ERROR: {e}")
print("Is SSH tunnel running? ssh -L 15432:127.0.0.1:5432 -N root@89.19.208.158")
return
from playwright.async_api import async_playwright
async with async_playwright() as p:
browser = await p.chromium.launch(
headless=False,
args=['--no-sandbox', '--disable-blink-features=AutomationControlled']
)
context = await browser.new_context(
viewport={'width': 1440, 'height': 900},
user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
'(KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36',
)
page = await context.new_page()
if not await do_login(page):
print("ERROR: Login failed!")
await browser.close()
conn.close()
return
await asyncio.sleep(3)
# Warm up: navigate to data page (establishes session for reports API)
print("Warming up session on data page...")
await page.goto('https://www.marinetraffic.com/en/data/?asset_type=vessels',
wait_until='load', timeout=40000)
await asyncio.sleep(5)
print(f"Session ready: {page.url}")
# ── PROBE ──────────────────────────────────────────────────────────
if args.probe:
print("\n=== PROBE: Testing ownership fetch for 10 bulk carriers ===\n")
cur.execute("""
SELECT ship_id, name, gt_shiptype, dwt, flag, mmsi, imo
FROM mt_bulk_staging
WHERE mmsi IS NOT NULL AND gt_shiptype = '6'
ORDER BY dwt DESC NULLS LAST
LIMIT 10
""")
vessels = cur.fetchall()
found = 0
for ship_id, name, gt, dwt, flag, mmsi, imo in vessels:
print(f"[GT={gt}] {name} ({flag}, DWT={dwt}, SHIP_ID={ship_id})")
row = await fetch_ownership(page, ship_id)
if row:
parsed = extract_ownership(row)
if parsed:
found += 1
print(f" owner={parsed.get('owner')}")
print(f" reg_owner={parsed.get('registered_owner')}")
print(f" operator={parsed.get('operator')}")
else:
print(f" No ownership fields (row has keys: {list(row.keys())[:8]})")
else:
print(f" No data returned")
await asyncio.sleep(0.3)
print(f"\n=== Probe done: {found}/{len(vessels)} have ownership data ===")
await browser.close()
conn.close()
return
# ── MAIN SCRAPE ─────────────────────────────────────────────────────
processed_ids = set()
if os.path.exists(CKPT_FILE):
with open(CKPT_FILE) as f:
cp = json.load(f)
processed_ids = set(str(x) for x in cp.get('processed', []))
print(f"Checkpoint: {len(processed_ids)} already processed")
conditions = ["mmsi IS NOT NULL", "owner IS NULL"]
if args.gt6only:
conditions.append("gt_shiptype = '6'")
query = f"""
SELECT ship_id, name, gt_shiptype, dwt, flag, mmsi, imo
FROM mt_bulk_staging
WHERE {' AND '.join(conditions)}
ORDER BY
CASE WHEN gt_shiptype = '6' THEN 0 ELSE 1 END,
dwt DESC NULLS LAST
"""
if args.limit:
query += f" LIMIT {args.limit}"
cur.execute(query)
all_vessels = [(str(r[0]), r[1], r[2], r[3], r[4], r[5], r[6])
for r in cur.fetchall()
if str(r[0]) not in processed_ids]
total = len(all_vessels)
eta_sec = total * (args.delay + 0.3)
print(f"Vessels to process: {total}")
print(f"Delay: {args.delay}s | ETA: ~{eta_sec/60:.0f} min ({eta_sec/3600:.1f}h)")
found_owner = 0
found_operator = 0
not_found = 0
errors = 0
batch_count = 0
t0 = time.time()
for i, (ship_id, name, gt, dwt, flag, mmsi, imo) in enumerate(all_vessels):
try:
row = await fetch_ownership(page, ship_id)
parsed = extract_ownership(row) if row else None
except Exception as e:
errors += 1
print(f" [{i+1}] ERROR {name}: {e}")
parsed = None
if parsed:
if parsed.get('owner'):
found_owner += 1
if parsed.get('operator'):
found_operator += 1
cur.execute("""
UPDATE mt_bulk_staging
SET owner = COALESCE(%s, owner),
operator = COALESCE(%s, operator),
registered_owner = COALESCE(%s, registered_owner),
imo = COALESCE(%s, imo),
scraped_at = NOW()
WHERE ship_id = %s
""", (
parsed.get('owner'),
parsed.get('operator'),
parsed.get('registered_owner'),
parsed.get('imo') or imo,
ship_id,
))
# Show every 10th success or first 20
if i < 20 or (i + 1) % 50 == 0:
print(f" [{i+1}/{total}] {name} -> {parsed.get('owner','?')[:40]} | {parsed.get('operator','?')[:30]}")
else:
not_found += 1
if i < 20 or (i + 1) % 100 == 0:
print(f" [{i+1}/{total}] {name} -> no ownership")
processed_ids.add(ship_id)
batch_count += 1
if batch_count >= BATCH:
conn.commit()
with open(CKPT_FILE, 'w') as f:
json.dump({'processed': list(processed_ids)}, f)
elapsed = time.time() - t0
rate = (i + 1) / elapsed
remaining = (total - i - 1) / rate if rate > 0 else 0
print(f"\n=== CHECKPOINT [{i+1}/{total}] {elapsed:.0f}s | "
f"owners={found_owner} ops={found_operator} "
f"none={not_found} err={errors} | "
f"ETA: {remaining/60:.0f}m ===\n")
batch_count = 0
await asyncio.sleep(args.delay)
# Final commit
conn.commit()
with open(CKPT_FILE, 'w') as f:
json.dump({'processed': list(processed_ids), 'done': True}, f)
# Summary
cur.execute("SELECT count(owner) FROM mt_bulk_staging WHERE owner IS NOT NULL")
total_with_owner = cur.fetchone()[0]
cur.execute("SELECT count(owner) FROM mt_bulk_staging WHERE gt_shiptype='6' AND owner IS NOT NULL")
bulk_with_owner = cur.fetchone()[0]
elapsed = time.time() - t0
print(f"\n{'='*60}")
print(f"DONE in {elapsed/60:.1f} minutes!")
print(f" Processed: {total}")
print(f" Owner found: {found_owner} ({found_owner*100//max(total,1)}%)")
print(f" Operator found: {found_operator}")
print(f" No data: {not_found}")
print(f" Errors: {errors}")
print(f" Total with owner in DB: {total_with_owner}")
print(f" Bulk carriers with owner: {bulk_with_owner}")
conn.close()
await browser.close()
asyncio.run(main())