montana/Русский/Логистика/mt_final_sweep.py

384 lines
14 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
"""
MT Final Sweep: get MMSI + ownership in ONE query per vessel.
For all vessels in mt_bulk_staging that still need MMSI or ownership data.
Uses quicksearch_shipid with columns: shipname,imo,mmsi,flag,beneficial_owner,operator,registered_owner,commercial_manager
This combines the MMSI lookup + ownership fetch into a single API call per vessel.
~0.5s per vessel = ~2,340 vessels in ~20 minutes.
Usage:
python mt_final_sweep.py # Process all needing data
python mt_final_sweep.py --probe # Test on 20 vessels
python mt_final_sweep.py --limit 100 # Process 100 vessels
"""
import asyncio, json, sys, os, time, struct, hmac, hashlib, base64, argparse
import psycopg2
# Work from the directory that contains this script, whatever the caller's
# current directory was.
os.chdir(os.path.dirname(os.path.abspath(__file__)))

# Switch both standard streams to UTF-8 with replacement of unencodable
# characters and line buffering; streams without reconfigure() are left alone.
for _stream in (sys.stdout, sys.stderr):
    if hasattr(_stream, 'reconfigure'):
        _stream.reconfigure(encoding='utf-8', errors='replace', line_buffering=True)
del _stream
# --- Credentials and connection settings -----------------------------------
# SECURITY: the literals below are live secrets committed to source control.
# Each one can be overridden through the named environment variable; the
# literals remain only as backward-compatible fallbacks and should be rotated
# and removed once the environment is provisioned.
EMAIL = os.environ.get('MT_EMAIL', "operation@mrlogisticcorp.com")
PASSWORD = os.environ.get('MT_PASSWORD', "NKh9i8Z!7fU9jfi")
TOTP_SECRET = os.environ.get('MT_TOTP_SECRET', "MNWTEPTFJZBUC32GJFEWY6LVKQ2GGYKH")
DB_URL = os.environ.get(
    'MT_DB_URL',
    'postgresql://seafare:SF_m0ntana_2026@127.0.0.1:15432/seafare_db',
)

# --- Tuning -----------------------------------------------------------------
DELAY = 0.5   # default pause between vessels, in seconds (see --delay)
BATCH = 200   # number of processed vessels between commits/checkpoints

# Combined columns: get MMSI + all ownership fields in one query
COMBINED_COLS = 'shipname,imo,mmsi,flag,beneficial_owner,operator,registered_owner,commercial_manager'
def totp(secret, now=None, period=30, digits=6):
    """Return a time-based one-time password (HMAC-SHA1, RFC 6238 style).

    Args:
        secret: Base32-encoded shared secret. Case is ignored, spaces are
            removed and missing ``=`` padding is added before decoding.
        now: Unix timestamp to compute the code for. ``None`` (the default)
            uses the current time.
        period: Length of one time step in seconds.
        digits: Number of decimal digits in the returned code.

    Returns:
        The code as a zero-padded string of ``digits`` characters.

    Raises:
        binascii.Error: if ``secret`` is not valid Base32.
    """
    normalized = secret.upper().replace(' ', '')
    # b32decode requires the input length to be a multiple of 8.
    key = base64.b32decode(normalized + '=' * (-len(normalized) % 8))
    timestamp = time.time() if now is None else now
    counter = int(timestamp) // period
    digest = hmac.new(key, struct.pack('>Q', counter), hashlib.sha1).digest()
    # Dynamic truncation: the low nibble of the last byte selects a 4-byte
    # window, whose top bit is masked off.
    offset = digest[-1] & 0x0F
    truncated = struct.unpack('>I', digest[offset:offset + 4])[0] & 0x7FFFFFFF
    return str(truncated % 10 ** digits).zfill(digits)
def db_connect():
    """Open and return a new psycopg2 connection to ``DB_URL``.

    The connection uses a 15-second connect timeout and has TCP keepalives
    enabled (idle 30, interval 10, count 5).
    """
    keepalive_options = {
        'keepalives': 1,
        'keepalives_idle': 30,
        'keepalives_interval': 10,
        'keepalives_count': 5,
    }
    return psycopg2.connect(DB_URL, connect_timeout=15, **keepalive_options)
def _restart_ssh_tunnel():
    """Kill running ``ssh.exe`` processes and start a fresh port-forward.

    The new ``ssh`` process forwards local port 15432 to 127.0.0.1:5432 on the
    remote host and is left running in the background. Failures are reported
    on stdout and never raised.
    """
    import subprocess

    kill_command = ['taskkill', '/F', '/IM', 'ssh.exe']
    tunnel_command = [
        'ssh',
        '-o', 'ServerAliveInterval=5',
        '-o', 'ServerAliveCountMax=120',
        '-o', 'TCPKeepAlive=yes',
        '-o', 'StrictHostKeyChecking=no',
        '-L', '15432:127.0.0.1:5432',
        '-N', 'root@89.19.208.158',
    ]

    try:
        subprocess.run(kill_command, capture_output=True, timeout=5)
    except Exception:
        # Best effort: carry on even if the kill command cannot be run.
        pass

    time.sleep(2)

    try:
        subprocess.Popen(
            tunnel_command,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
        print(" [SSH] Tunnel restarted, waiting 5s...")
        time.sleep(5)
    except Exception as exc:
        print(f" [SSH] Failed to restart: {exc}")
def db_reconnect(conn):
    """Replace a dead connection with a fresh one, restarting the tunnel if needed.

    Phase 1: up to 5 connection attempts, 3 seconds apart.
    Phase 2: restart the SSH tunnel, then up to 10 more attempts, 5 seconds
    apart, with one more tunnel restart after the fifth failure.

    Args:
        conn: The connection to discard; errors while closing it are ignored.

    Returns:
        A ``(connection, cursor)`` tuple for the new connection.

    Raises:
        Exception: ``"DB reconnect failed"`` when every attempt failed.
    """
    try:
        conn.close()
    except Exception:
        # The old connection is being discarded anyway.
        pass

    for try_no in range(1, 6):
        time.sleep(3)
        try:
            fresh = db_connect()
            print(f" [DB] Reconnected (attempt {try_no})")
            return fresh, fresh.cursor()
        except Exception as exc:
            print(f" [DB] Reconnect attempt {try_no} failed: {exc}")

    print(" [DB] Restarting SSH tunnel...")
    _restart_ssh_tunnel()

    for try_no in range(1, 11):
        time.sleep(5)
        try:
            fresh = db_connect()
            print(f" [DB] Reconnected after tunnel restart (attempt {try_no})")
            return fresh, fresh.cursor()
        except Exception as exc:
            print(f" [DB] Post-restart attempt {try_no} failed: {exc}")
            # Halfway through the second phase, bounce the tunnel once more.
            if try_no == 5:
                _restart_ssh_tunnel()

    raise Exception("DB reconnect failed")
def db_safe_execute(conn, cur, query, params=None):
    """Execute ``query`` on ``cur``, reconnecting once if the connection is lost.

    On ``psycopg2.InterfaceError`` / ``psycopg2.OperationalError`` the
    connection is re-established via ``db_reconnect`` and the statement is run
    again on the new cursor; a failure of that second run propagates.

    NOTE(review): only this statement is replayed after a reconnect. Earlier
    uncommitted statements issued on the old connection are presumably lost;
    confirm whether callers rely on them.

    Returns:
        The ``(connection, cursor)`` pair to use from now on (the originals
        when no reconnect was needed).
    """
    try:
        cur.execute(query, params)
    except (psycopg2.InterfaceError, psycopg2.OperationalError) as exc:
        print(f" [DB] Connection lost on execute ({exc}), reconnecting...")
        conn, cur = db_reconnect(conn)
        cur.execute(query, params)
    return conn, cur
def db_safe_commit(conn):
    """Commit on ``conn``; if the connection is lost, reconnect instead.

    NOTE(review): after a reconnect the commit is not retried, so whatever was
    pending on the old connection is presumably not persisted; confirm this is
    acceptable for callers.

    Returns:
        The connection to keep using: the original one on success, or the
        replacement obtained from ``db_reconnect``.
    """
    try:
        conn.commit()
    except (psycopg2.InterfaceError, psycopg2.OperationalError):
        print(" [DB] Connection lost on commit, reconnecting...")
        conn, _discarded_cursor = db_reconnect(conn)
    return conn
async def _submit_totp(page):
    """Fill in and submit a fresh TOTP code on the second-factor page."""
    try:
        await page.click('button:has-text("Google Authenticator")', timeout=3000)
        await asyncio.sleep(2)
    except Exception:
        # Presumably the method chooser is not always shown; go on to the
        # code field either way.
        pass
    await asyncio.sleep(1)
    otp = totp(TOTP_SECRET)
    print(f" TOTP: {otp}")
    try:
        await page.fill('input[name="code"]', otp)
        await page.click('button[type="submit"]')
    except Exception:
        # Fall back to a generic text input when the named field is absent.
        try:
            await page.fill('input[type="text"]', otp)
            await page.click('button[type="submit"]')
        except Exception as e:
            print(f" 2FA error: {e}")


async def _login_once(page):
    """Run one username -> password -> optional 2FA pass; return True on success."""
    await page.goto('https://www.marinetraffic.com/en/users/login',
                    wait_until='domcontentloaded', timeout=30000)
    await asyncio.sleep(3)
    await page.fill('input[name="username"]', EMAIL)
    await page.click('button[type="submit"]')
    await asyncio.sleep(3)
    await page.fill('input[type="password"]', PASSWORD)
    await page.click('button[type="submit"]')
    await asyncio.sleep(4)
    if 'mfa' in page.url.lower() or 'auth.kpler' in page.url:
        await _submit_totp(page)
    # Let the post-login redirect settle before inspecting the URL.
    await asyncio.sleep(8)
    return 'marinetraffic.com' in page.url and 'auth.kpler' not in page.url


async def do_login(page, max_retries=2):
    """Log in to MarineTraffic through ``page``, retrying on failure.

    An attempt counts as successful when the final URL is on marinetraffic.com
    and not on the auth.kpler host. An exception raised during an attempt
    (for example a navigation or selector timeout) is reported and counts as
    a failed attempt instead of aborting the remaining retries.

    Args:
        page: Playwright page used to drive the login form.
        max_retries: Maximum number of attempts.

    Returns:
        True once an attempt succeeds, False when every attempt failed.
    """
    for attempt in range(max_retries):
        print(f"LOGIN (attempt {attempt+1}/{max_retries})...")
        try:
            if await _login_once(page):
                print(" Login: OK")
                return True
            print(f" Login attempt {attempt+1} failed | {page.url[:80]}")
        except Exception as e:
            print(f" Login attempt {attempt+1} error: {e}")
        if attempt < max_retries - 1:
            await asyncio.sleep(5)
    return False
async def fetch_combined(page, ship_id):
    """Get MMSI + ownership in one API call using quicksearch_shipid.

    The request is issued with ``fetch`` inside the page so that it carries
    the page's cookies.

    Args:
        page: Playwright page to evaluate the request in.
        ship_id: Value passed as the ``quicksearch_shipid`` query parameter.

    Returns:
        The first element of the response's ``data`` array, or ``None`` when
        the request failed, returned no rows, or produced an unusable value.
        Failures other than an empty result are printed so that they can be
        told apart from vessels that simply have no data.
    """
    url = (f'https://www.marinetraffic.com/en/reports/?asset_type=vessels'
           f'&columns={COMBINED_COLS}&quicksearch_shipid={ship_id}')
    js = f"""
    async () => {{
        try {{
            const r = await fetch({json.dumps(url)}, {{
                credentials: 'include', cache: 'no-store',
                headers: {{
                    'X-Requested-With': 'XMLHttpRequest',
                    'Accept': 'application/json',
                    'Referer': 'https://www.marinetraffic.com/en/data/?asset_type=vessels',
                }}
            }});
            if (r.status !== 200) return {{error: 'HTTP ' + r.status}};
            const text = await r.text();
            const d = JSON.parse(text);
            const rows = d.data || [];
            if (rows.length === 0) return {{error: 'no rows'}};
            return rows[0];
        }} catch(e) {{ return {{error: e.message}}; }}
    }}
    """
    try:
        result = await page.evaluate(js)
    except Exception as exc:
        print(f" [API] ship_id={ship_id}: evaluate failed: {exc}")
        return None

    if not result or not isinstance(result, dict):
        return None

    error = result.get('error')
    if error:
        # 'no rows' is the ordinary empty result; anything else is a failure.
        if error != 'no rows':
            print(f" [API] ship_id={ship_id}: {error}")
        return None
    return result
def _plan_update(row, mmsi, existing_owner):
    """Turn one API row into SET clauses and parameters for ``mt_bulk_staging``.

    Args:
        row: Mapping returned by the API for one vessel.
        mmsi: MMSI currently stored for the vessel (falsy when missing).
        existing_owner: Owner currently stored for the vessel (falsy when missing).

    Returns:
        ``(clauses, params, api_mmsi, api_owner, new_mmsi, new_owner)`` where
        ``clauses``/``params`` are parallel lists and the two booleans tell
        whether this row supplies a previously missing MMSI / owner.
    """
    api_mmsi = str(row['MMSI']) if row.get('MMSI') else None
    raw_imo = row.get('IMO')
    # An IMO of 0 is treated the same as a missing IMO.
    api_imo = str(raw_imo) if raw_imo and str(raw_imo) != '0' else None
    api_owner = row.get('BENEFICIAL_OWNER') or row.get('REGISTERED_OWNER') or None
    api_reg_owner = row.get('REGISTERED_OWNER') or None
    api_operator = row.get('OPERATOR') or None
    api_commercial_manager = row.get('COMMERCIAL_MANAGER') or None

    new_mmsi = bool(api_mmsi and not mmsi)
    new_owner = bool(api_owner and not existing_owner)

    clauses = []
    params = []
    if new_mmsi:
        clauses.append("mmsi = %s")
        params.append(api_mmsi)
    # NOTE(review): in the COALESCE(%s, col) clauses below the parameter is
    # never NULL, so the stored value is always overwritten; confirm that
    # COALESCE(col, %s) was not the intent.
    if api_imo:
        clauses.append("imo = COALESCE(%s, imo)")
        params.append(api_imo)
    if new_owner:
        clauses.append("owner = %s")
        params.append(api_owner)
    if api_reg_owner:
        clauses.append("registered_owner = COALESCE(%s, registered_owner)")
        params.append(api_reg_owner)
    if api_operator:
        clauses.append("operator = COALESCE(%s, operator)")
        params.append(api_operator)
    if api_commercial_manager:
        clauses.append("commercial_manager = COALESCE(%s, commercial_manager)")
        params.append(api_commercial_manager)
    return clauses, params, api_mmsi, api_owner, new_mmsi, new_owner


async def main():
    """Sweep ``mt_bulk_staging`` and fill in missing MMSI / ownership data.

    Command-line options: ``--probe`` (process 20 vessels), ``--limit N``
    (process N vessels, takes precedence over ``--probe``) and ``--delay S``
    (pause between vessels, default ``DELAY``).

    Updates are committed every ``BATCH`` vessels and once more at the end.
    The browser and the database connection are released on every exit path,
    and a best-effort commit preserves the partial batch if the run is
    interrupted.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--probe', action='store_true')
    parser.add_argument('--limit', type=int, default=0)
    parser.add_argument('--delay', type=float, default=DELAY)
    args = parser.parse_args()
    if args.limit < 0:
        parser.error('--limit must be a non-negative integer')

    conn = None
    try:
        conn = db_connect()
        cur = conn.cursor()
        cur.execute('SELECT count(*) FROM mt_bulk_staging')
        total_db = cur.fetchone()[0]
        cur.execute('SELECT count(*) FROM mt_bulk_staging WHERE mmsi IS NULL')
        no_mmsi = cur.fetchone()[0]
        cur.execute('SELECT count(*) FROM mt_bulk_staging WHERE mmsi IS NOT NULL AND owner IS NULL')
        no_owner = cur.fetchone()[0]
        print(f"DB: {total_db} total | {no_mmsi} need MMSI | {no_owner} need owner")
    except Exception as e:
        print(f"DB ERROR: {e}")
        if conn is not None:
            conn.close()
        return

    from playwright.async_api import async_playwright

    try:
        async with async_playwright() as p:
            browser = await p.chromium.launch(
                headless=False,
                args=['--no-sandbox', '--disable-blink-features=AutomationControlled']
            )
            try:
                context = await browser.new_context(
                    viewport={'width': 1440, 'height': 900},
                    user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
                               '(KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36',
                )
                page = await context.new_page()
                if not await do_login(page):
                    return
                await asyncio.sleep(3)
                await page.goto('https://www.marinetraffic.com/en/data/?asset_type=vessels',
                                wait_until='load', timeout=40000)
                await asyncio.sleep(5)

                # Get vessels: prioritize those without MMSI/owner/commercial_manager
                query = """
                    SELECT ship_id, name, gt_shiptype, dwt, flag, mmsi, imo, owner
                    FROM mt_bulk_staging
                    WHERE mmsi IS NULL OR owner IS NULL OR commercial_manager IS NULL
                    ORDER BY
                        CASE WHEN mmsi IS NULL THEN 0 ELSE 1 END,
                        CASE WHEN gt_shiptype = '6' THEN 0 ELSE 1 END,
                        dwt DESC NULLS LAST
                """
                query_params = None
                if args.limit:
                    # Bound as a parameter rather than formatted into the SQL text.
                    query += " LIMIT %s"
                    query_params = (args.limit,)
                elif args.probe:
                    query += " LIMIT 20"
                cur.execute(query, query_params)
                vessels = cur.fetchall()
                total = len(vessels)
                print(f"\nVessels to process: {total}")
                if total == 0:
                    print("Nothing to do!")
                    return

                eta_sec = total * (args.delay + 0.3)
                print(f"Delay: {args.delay}s | ETA: ~{eta_sec/60:.0f} min")

                got_mmsi = 0
                got_owner = 0
                no_data = 0
                errors = 0
                batch_count = 0
                t0 = time.time()

                for i, (ship_id, name, gt, dwt, flag, mmsi, imo, existing_owner) in enumerate(vessels):
                    try:
                        row = await fetch_combined(page, ship_id)
                    except Exception:
                        errors += 1
                        row = None

                    api_mmsi = api_owner = None
                    if row:
                        (updates, params, api_mmsi, api_owner,
                         new_mmsi, new_owner) = _plan_update(row, mmsi, existing_owner)
                        got_mmsi += new_mmsi
                        got_owner += new_owner
                        if updates:
                            updates.append("scraped_at = NOW()")
                            params.append(ship_id)
                            conn, cur = db_safe_execute(
                                conn, cur,
                                f"UPDATE mt_bulk_staging SET {', '.join(updates)} WHERE ship_id = %s",
                                tuple(params))
                    else:
                        no_data += 1

                    # Progress: every one of the first 20, then every 50th.
                    if i < 20 or (i + 1) % 50 == 0:
                        if row:
                            m = api_mmsi or mmsi or '?'
                            o = (api_owner or existing_owner or '?')[:35]
                            print(f" [{i+1}/{total}] {name} -> MMSI={m} | {o}")
                        else:
                            print(f" [{i+1}/{total}] {name} -> no data")

                    batch_count += 1
                    if batch_count >= BATCH:
                        conn = db_safe_commit(conn)
                        cur = conn.cursor()
                        elapsed = time.time() - t0
                        rate = (i + 1) / elapsed if elapsed > 0 else 0
                        remaining = (total - i - 1) / rate if rate > 0 else 0
                        print(f"\n=== CHECKPOINT [{i+1}/{total}] {elapsed:.0f}s | "
                              f"mmsi={got_mmsi} owner={got_owner} "
                              f"none={no_data} err={errors} | "
                              f"ETA: {remaining/60:.0f}m ===\n")
                        batch_count = 0

                    await asyncio.sleep(args.delay)

                # Final commit
                conn = db_safe_commit(conn)
                cur = conn.cursor()

                # Summary
                cur.execute("SELECT count(*), count(mmsi), count(owner) FROM mt_bulk_staging")
                t, m, o = cur.fetchone()
                cur.execute("SELECT count(*), count(mmsi), count(owner) FROM mt_bulk_staging WHERE gt_shiptype='6'")
                tb, mb, ob = cur.fetchone()
                elapsed = time.time() - t0
                print(f"\n{'='*60}")
                print(f"DONE in {elapsed/60:.1f} minutes!")
                print(f" Processed: {total}")
                print(f" New MMSI: {got_mmsi}")
                print(f" New owners: {got_owner}")
                print(f" No data: {no_data}")
                print(f" Errors: {errors}")
                print(f"\nGLOBAL: total={t} mmsi={m} owner={o}")
                print(f"BULK: total={tb} mmsi={mb} owner={ob}")
                print(f"{'='*60}")
            finally:
                try:
                    await browser.close()
                except Exception as e:
                    print(f" [BROWSER] Close failed: {e}")
    finally:
        # Keep the updates of an unfinished batch if the run was interrupted,
        # then release the connection. A plain commit is used here so that
        # shutdown never enters the long reconnect loop.
        try:
            conn.commit()
        except Exception as e:
            print(f" [DB] Final commit skipped: {e}")
        try:
            conn.close()
        except Exception:
            pass
# Start the sweep only when executed as a script, so that importing this
# module does not run it.
if __name__ == '__main__':
    asyncio.run(main())