# Repository path: montana/Русский/Логистика/mt_phase2_mmsi.py
# (Web-view header captured with the file: 543 lines, 20 KiB, Python.)

#!/usr/bin/env python3
"""
MT Phase 2: Get MMSI + Ownership data for vessels in mt_bulk_staging.
KEY INSIGHT: page.request.get() = 403 (CF blocks programmatic requests)
page.evaluate(fetch()) = 200 (browser's own TLS/cookies pass CF)
Strategy:
1. Login to MT Pro
2. Navigate to MT map (establishes session)
3. For each vessel without MMSI:
a. Try page.evaluate(fetch) to discover vessel info endpoint
b. Alternatively use MT search box (type name, capture autocomplete)
4. Update mt_bulk_staging with MMSI + ownership
5. Checkpoint progress
Usage: python mt_phase2_mmsi.py [--limit N] [--gt6only] [--probe]
"""
import asyncio, json, sys, os, time, re, struct, hmac, hashlib, base64, argparse
import psycopg2
# Work from the script's own directory so relative output paths (checkpoint,
# discovered-endpoint dump) resolve next to this file.
_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
os.chdir(_SCRIPT_DIR)

# Force UTF-8 with replacement on both standard streams where the stream
# object supports reconfiguration, so non-ASCII text cannot abort a print.
for _stream in (sys.stdout, sys.stderr):
    if hasattr(_stream, 'reconfigure'):
        _stream.reconfigure(encoding='utf-8', errors='replace')
# Secrets are taken from the environment instead of being committed to the
# repository. Set MT_EMAIL, MT_PASSWORD, MT_TOTP_SECRET and MT_DB_URL before
# running; an unset variable yields an empty string.
# NOTE(review): the values previously hard-coded here were exposed in version
# control and should be rotated.
EMAIL = os.environ.get('MT_EMAIL', '')
PASSWORD = os.environ.get('MT_PASSWORD', '')
TOTP_SECRET = os.environ.get('MT_TOTP_SECRET', '')  # base32 TOTP seed
DB_URL = os.environ.get('MT_DB_URL', '')  # DSN passed to psycopg2.connect
DELAY = 1.5  # seconds between vessel requests
BATCH = 100  # commit to DB every N vessels
CHECKPOINT_FILE = 'mt_phase2_checkpoint.json'
def totp(secret, at=None, step=30, digits=6):
    """Return the RFC 6238 time-based one-time password (HMAC-SHA1).

    Args:
        secret: Base32-encoded shared secret. Case and spaces are ignored and
            missing ``=`` padding is restored before decoding.
        at: Unix timestamp to compute the code for. Defaults to the current
            time, which is what existing single-argument callers get.
        step: Length of one time window in seconds.
        digits: Number of decimal digits in the returned code.

    Returns:
        The code as a zero-padded decimal string of length ``digits``.

    Raises:
        binascii.Error: If ``secret`` is not valid base32.
    """
    normalized = secret.upper().replace(' ', '')
    normalized += '=' * (-len(normalized) % 8)
    key = base64.b32decode(normalized)

    moment = time.time() if at is None else at
    counter = int(moment) // step
    digest = hmac.new(key, struct.pack('>Q', counter), hashlib.sha1).digest()

    # Dynamic truncation: the low nibble of the last byte selects a 4-byte
    # window; the top bit is masked off to get a non-negative 31-bit value.
    offset = digest[-1] & 0x0f
    truncated = struct.unpack('>I', digest[offset:offset + 4])[0] & 0x7fffffff
    return str(truncated % 10 ** digits).zfill(digits)
async def do_login(page):
    """Log in to MarineTraffic using the module-level credentials.

    Drives the two-step login form (username, then password). If the browser
    then sits on an MFA page, a TOTP code derived from ``TOTP_SECRET`` is
    submitted. The fixed sleeps give each page time to settle.

    Args:
        page: Playwright page used to drive the login flow.

    Returns:
        True when the final page URL is on marinetraffic.com and not on the
        auth.kpler host; False otherwise, or when a required credential is
        empty.
    """
    if not EMAIL or not PASSWORD:
        print(" Login skipped: EMAIL/PASSWORD are not configured")
        return False

    print("Login to MT Pro...")
    await page.goto('https://www.marinetraffic.com/en/users/login',
                    wait_until='domcontentloaded', timeout=30000)
    await asyncio.sleep(3)
    await page.fill('input[name="username"]', EMAIL)
    await page.click('button[type="submit"]')
    await asyncio.sleep(3)
    await page.fill('input[type="password"]', PASSWORD)
    await page.click('button[type="submit"]')
    await asyncio.sleep(4)

    # 'mfa-login-options' is covered by the case-insensitive 'mfa' test.
    if 'mfa' in page.url.lower():
        print(" 2FA: Google Authenticator...")
        if not TOTP_SECRET:
            print(" 2FA required but TOTP_SECRET is not configured")
            return False
        try:
            await page.click('button:has-text("Google Authenticator")', timeout=5000)
        except Exception as exc:
            # Best effort: the method chooser is not always shown, so a
            # failed click is reported and the flow continues.
            print(f" 2FA method button not clicked ({type(exc).__name__}); continuing")
        await asyncio.sleep(2)
        # The one-time code is deliberately not printed.
        otp = totp(TOTP_SECRET)
        print(" TOTP: generated")
        await page.fill('input[name="code"]', otp)
        await page.click('button[type="submit"]')
        await asyncio.sleep(5)

    logged_in = 'marinetraffic.com' in page.url and 'auth.kpler' not in page.url
    print(f" Logged in: {logged_in} URL: {page.url}")
    return logged_in
async def discover_vessel_endpoint(page):
    """Load the map, click around it, and record the JSON responses seen.

    A temporary ``response`` listener collects every response that looks like
    an API call (JSON/JavaScript content type, or ``getData``/``vessel`` in
    the URL) and whose body starts with ``{`` or ``[``. Anything collected is
    also written to ``mt_discovered_endpoints.json``.

    Args:
        page: Logged-in Playwright page.

    Returns:
        List of dicts with keys ``url``, ``size`` (body length in bytes) and
        ``body`` (first 500 characters of the decoded body).
    """
    # Function-scope import: needs no change to the file's import block.
    from urllib.parse import urlsplit

    static_suffixes = ('.js', '.css', '.png', '.jpg', '.gif',
                       '.woff', '.woff2', '.ico')

    print("\nDiscovering vessel info endpoint...")
    discovered = []

    async def capture_all(response):
        url = response.url
        # Compare the path extension only: a plain substring test for '.js'
        # would also throw away every URL that contains '.json'.
        if urlsplit(url).path.lower().endswith(static_suffixes):
            return
        ct = response.headers.get('content-type', '')
        looks_like_api = ('json' in ct or 'javascript' in ct
                          or 'getData' in url or 'vessel' in url.lower())
        if not looks_like_api:
            return
        try:
            body = await response.body()
        except Exception:
            # Best effort: if reading the body fails, skip this response
            # rather than abort the probe.
            return
        text = body.decode('utf-8', errors='replace')
        if text.startswith(('{', '[')):
            discovered.append({'url': url, 'size': len(body), 'body': text[:500]})
            print(f" API: {url[:100]} ({len(body)}b)")

    page.on('response', capture_all)
    try:
        # Navigate to map with some vessels
        print(" Going to Rotterdam map...")
        await page.goto(
            'https://www.marinetraffic.com/en/ais/home/centerx:4.5/centery:51.9/zoom:12',
            wait_until='load', timeout=35000)
        await asyncio.sleep(10)

        # Click the map centre and two nearby points, hoping to hit a vessel.
        print(" Attempting map click...")
        try:
            map_el = await page.query_selector(
                'canvas, .mapboxgl-canvas, #mapContainer, .map-container')
            box = await map_el.bounding_box() if map_el else None
            if box:
                cx = box['x'] + box['width'] / 2
                cy = box['y'] + box['height'] / 2
                await page.mouse.click(cx, cy)
                await asyncio.sleep(3)
                await page.mouse.click(cx - 50, cy)
                await asyncio.sleep(2)
                await page.mouse.click(cx + 50, cy - 30)
                await asyncio.sleep(2)
        except Exception as e:
            print(f" Click error: {e}")
    finally:
        # Always detach, so a failed navigation cannot leave the listener
        # attached to the page.
        page.remove_listener('response', capture_all)

    if discovered:
        with open('mt_discovered_endpoints.json', 'w', encoding='utf-8') as f:
            json.dump(discovered, f, indent=2)
        print(f" Saved {len(discovered)} API calls -> mt_discovered_endpoints.json")
    return discovered
async def fetch_vessel_info_browser(page, ship_id, name=''):
    """Probe the candidate vessel-info endpoints from inside the browser.

    The requests are issued with ``fetch()`` via ``page.evaluate`` so they
    carry the browser session (see the module docstring for why).

    Args:
        page: Playwright page whose context runs the script.
        ship_id: Vessel identifier; passed to the script as a string.
        name: Vessel name, used only in the error message.

    Returns:
        Whatever the script returns: a dict keyed by candidate URL whose
        values are ``{status, body}`` (body cut to 2000 characters) or
        ``{status: 0, error}``. An empty dict if ``page.evaluate`` raises.
    """
    # The script text is assembled line by line; the resulting string is
    # exactly the multi-line script, including leading and trailing newline.
    probe_script = (
        "\n"
        "async (shipId) => {\n"
        "const candidates = [\n"
        "'/getData/get_vessel_info/ship_id:' + shipId,\n"
        "'/en/getData/get_vessel_info/ship_id:' + shipId,\n"
        "'/getData/get_data_json_4/ship_id:' + shipId,\n"
        "'/getData/get_vessel_photo/ship_id:' + shipId,\n"
        "];\n"
        "const results = {};\n"
        "for (const url of candidates) {\n"
        "try {\n"
        "const resp = await fetch(url, {\n"
        "credentials: 'include',\n"
        "headers: {\n"
        "'X-Requested-With': 'XMLHttpRequest',\n"
        "'Accept': 'application/json, text/javascript, */*; q=0.01',\n"
        "'Referer': window.location.href,\n"
        "}\n"
        "});\n"
        "const text = await resp.text();\n"
        "results[url] = {status: resp.status, body: text.substring(0, 2000)};\n"
        "} catch(e) {\n"
        "results[url] = {status: 0, error: e.message};\n"
        "}\n"
        "}\n"
        "return results;\n"
        "}\n"
    )
    try:
        return await page.evaluate(probe_script, str(ship_id))
    except Exception as exc:
        print(f" evaluate error for {ship_id} ({name}): {exc}")
    return {}
async def search_vessel_by_name_browser(page, name):
    """Query candidate name-search endpoints from inside the browser.

    Args:
        page: Playwright page whose context runs the script.
        name: Vessel name; URL-encoded by the script before use.

    Returns:
        Whatever the script returns: a dict keyed by candidate URL whose
        values are ``{status, body}`` (body cut to 3000 characters) or
        ``{status: 0, error}``. An empty dict if ``page.evaluate`` raises.
    """
    # Assembled line by line; the value equals the multi-line script text,
    # including the leading and trailing newline.
    search_script = (
        "\n"
        "async (vesselName) => {\n"
        "// Try common MT search endpoints\n"
        "const candidates = [\n"
        "'/getData/get_vessels_in_area/search_by:name/name:' + encodeURIComponent(vesselName),\n"
        "'/getData/get_vessel_info/search:' + encodeURIComponent(vesselName),\n"
        "'/en/getData/get_vessels_in_area/search:' + encodeURIComponent(vesselName),\n"
        "];\n"
        "const results = {};\n"
        "for (const url of candidates) {\n"
        "try {\n"
        "const resp = await fetch(url, {\n"
        "credentials: 'include',\n"
        "headers: {\n"
        "'X-Requested-With': 'XMLHttpRequest',\n"
        "'Accept': 'application/json, */*',\n"
        "'Referer': window.location.href,\n"
        "}\n"
        "});\n"
        "const text = await resp.text();\n"
        "results[url] = {status: resp.status, body: text.substring(0, 3000)};\n"
        "} catch(e) {\n"
        "results[url] = {status: 0, error: e.message};\n"
        "}\n"
        "}\n"
        "return results;\n"
        "}\n"
    )
    try:
        outcome = await page.evaluate(search_script, name)
    except Exception as exc:
        print(f" search error for '{name}': {exc}")
        outcome = {}
    return outcome
async def search_via_searchbox(page, name, captured_results):
    """Type a vessel name into the site's search box and return the captures.

    Args:
        page: Playwright page showing the site.
        name: Vessel name; only its first 20 characters are typed.
        captured_results: Iterable that the caller fills from a response
            listener while this coroutine waits.

    Returns:
        A list copy of ``captured_results`` taken after the typing delay, or
        an empty list when no search input is found or typing fails.
    """
    candidate_selectors = (
        'input[placeholder*="Search"]',
        'input[placeholder*="search"]',
        'input[type="search"]',
        '.search-input input',
        '[data-testid*="search"] input',
        '#search input',
        '.navbar input',
    )

    # Take the first selector that resolves to an element; a selector that
    # raises is skipped.
    box = None
    for css in candidate_selectors:
        try:
            handle = await page.query_selector(css)
        except Exception:
            continue
        if handle:
            box = handle
            break
    if not box:
        return []

    try:
        await box.fill('')
        await asyncio.sleep(0.3)
        await box.type(name[:20], delay=80)
        # Wait for the autocomplete responses to reach the caller's listener.
        await asyncio.sleep(2.0)
    except Exception as exc:
        print(f" searchbox error: {exc}")
        return []
    return list(captured_results)
def _as_mapping(value):
    """Return *value* when it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _field_text(value):
    """Return *value* as a string, or None when it is null or empty."""
    if value is None or value == '':
        return None
    return str(value)


def _pick_row(rows, ship_id):
    """Return the entry of *rows* to read vessel fields from.

    When ``ship_id`` is given and at least one row carries a ``SHIP_ID``
    field, only a row whose ``SHIP_ID`` matches is accepted, so a response
    listing several vessels is not attributed to the wrong one.
    Otherwise the first row is used. Non-dict input yields an empty dict.
    """
    if not isinstance(rows, list) or not rows:
        return {}
    dict_rows = [row for row in rows if isinstance(row, dict)]
    # NOTE(review): assumes rows identify the vessel under the key 'SHIP_ID'
    # (same casing as the MMSI/IMO/SHIPNAME keys) - confirm against real
    # responses. Without that key the first row is used.
    if ship_id is not None and any('SHIP_ID' in row for row in dict_rows):
        wanted = str(ship_id)
        for row in dict_rows:
            if str(row.get('SHIP_ID')) == wanted:
                return row
        return {}
    return _as_mapping(rows[0])


def parse_vessel_from_response(text, ship_id=None):
    """Extract MMSI, IMO, name and owner from a JSON API response body.

    Fields are looked up in three places, later ones overriding earlier ones:
    the top-level object (``MMSI``/``mmsi``, ``IMO``/``imo``), the selected
    entry of ``data.rows`` (``MMSI``, ``IMO``, ``SHIPNAME``), and
    ``data.vessel`` or top-level ``vessel`` (``mmsi``, ``imo``). The owner is
    read from top-level ``BENEFICIAL_OWNER``, ``beneficial_owner`` or
    ``owner``. Null and empty values are treated as absent.

    NOTE(review): no ``operator`` key is ever produced here.

    Args:
        text: Response body. Anything that is empty or does not start with
            ``{`` is rejected without parsing.
        ship_id: Optional vessel id used to select the matching row when the
            rows carry a ``SHIP_ID`` field.

    Returns:
        Dict with any of the keys ``mmsi``, ``imo``, ``owner`` (strings) and
        ``name`` (as found); empty when nothing usable is present or the text
        is not valid JSON.
    """
    if not text or not text.startswith('{'):
        return {}
    try:
        data = json.loads(text)
    except (ValueError, RecursionError):
        return {}
    if not isinstance(data, dict):
        return {}

    result = {}

    def take(source, key, field):
        value = _field_text(source.get(key))
        if value is not None:
            result[field] = value

    # Direct fields
    for key in ('MMSI', 'mmsi'):
        take(data, key, 'mmsi')
    for key in ('IMO', 'imo'):
        take(data, key, 'imo')

    # Nested: data.rows
    payload = _as_mapping(data.get('data'))
    row = _pick_row(payload.get('rows'), ship_id)
    take(row, 'MMSI', 'mmsi')
    take(row, 'IMO', 'imo')
    if row.get('SHIPNAME') is not None:
        result['name'] = row['SHIPNAME']

    # Nested: data.vessel, falling back to a top-level vessel object
    vessel = _as_mapping(payload.get('vessel')) or _as_mapping(data.get('vessel'))
    take(vessel, 'mmsi', 'mmsi')
    take(vessel, 'imo', 'imo')

    # Ownership
    for key in ('BENEFICIAL_OWNER', 'beneficial_owner', 'owner'):
        take(data, key, 'owner')
    return result
def load_checkpoint(path=None):
    """Load the progress checkpoint, or return a fresh one if none exists.

    Args:
        path: Checkpoint file to read. Defaults to ``CHECKPOINT_FILE``.

    Returns:
        The decoded JSON content of the file, or a new dict with an empty
        ``processed_ship_ids`` list and zeroed ``found_mmsi`` / ``not_found``
        counters when the file does not exist.

    Raises:
        json.JSONDecodeError: If the file exists but is not valid JSON. This
            is not masked, so a damaged checkpoint is never silently
            replaced by an empty one.
    """
    target = CHECKPOINT_FILE if path is None else path
    # EAFP: open directly instead of testing for existence first.
    try:
        with open(target, encoding='utf-8') as f:
            return json.load(f)
    except FileNotFoundError:
        return {'processed_ship_ids': [], 'found_mmsi': 0, 'not_found': 0}
def save_checkpoint(cp, path=None):
    """Write the checkpoint as JSON without ever leaving a partial file.

    The data is written to ``<path>.tmp`` and then moved over the target with
    ``os.replace``, so an interruption mid-write cannot truncate an existing
    checkpoint.

    Args:
        cp: JSON-serializable checkpoint dict.
        path: Destination file. Defaults to ``CHECKPOINT_FILE``.

    Raises:
        TypeError: If ``cp`` contains values that cannot be serialized; the
            previous checkpoint is left untouched.
        OSError: If the file cannot be written or replaced.
    """
    target = CHECKPOINT_FILE if path is None else path
    tmp_path = f'{target}.tmp'
    try:
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(cp, f)
        os.replace(tmp_path, target)
    except BaseException:
        # Do not leave a half-written temp file behind.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise
# Placeholders are written directly in psycopg2's %s style.
_UPDATE_VESSEL_SQL = (
    "\n"
    "UPDATE mt_bulk_staging\n"
    "SET mmsi=%s, imo=COALESCE(%s, imo), owner=COALESCE(%s, owner),\n"
    "operator=COALESCE(%s, operator), scraped_at=NOW()\n"
    "WHERE ship_id=%s\n"
)


def _is_static_asset_url(url):
    """Return True when the path of *url* ends in a script/style/image extension."""
    # Function-scope import: needs no change to the file's import block.
    from urllib.parse import urlsplit
    # Path-suffix match: a substring test for '.js' would also reject every
    # URL that contains '.json'.
    return urlsplit(url).path.lower().endswith(('.js', '.css', '.png', '.jpg'))


def _load_pending_vessels(cur, gt6only, limit, processed_ids):
    """Fetch vessels without MMSI and drop those already in the checkpoint."""
    query = (
        "\n"
        "SELECT ship_id, name, lat, lon, gt_shiptype, dwt, flag\n"
        "FROM mt_bulk_staging\n"
        "WHERE mmsi IS NULL\n"
    )
    if gt6only:
        query += " AND gt_shiptype = '6'"
    query += " ORDER BY gt_shiptype, dwt DESC NULLS LAST"
    cur.execute(query)
    vessels = cur.fetchall()
    print(f"Vessels to process: {len(vessels)} (MMSI=NULL in DB)")

    vessels = [row for row in vessels if row[0] not in processed_ids]
    # The limit is applied after the checkpoint filter. Applied in SQL, a
    # rerun would fetch the same first N not-found rows, filter them all out
    # and never advance.
    if limit > 0:
        vessels = vessels[:limit]
    print(f"After checkpoint filter: {len(vessels)}")
    return vessels


def _first_capture_with_mmsi(captures, ship_id):
    """Return the first parsed capture that contains an MMSI, else {}."""
    for cap in captures:
        parsed = parse_vessel_from_response(cap['body'], ship_id)
        if parsed.get('mmsi'):
            return parsed
    return {}


async def _lookup_vessel(page, ship_id, name, lat, lon, search_captures):
    """Try direct fetch, search box, then map click; return parsed fields or {}."""
    search_captures.clear()

    # ---- Approach 1: page.evaluate fetch on vessel info endpoint ----
    fetch_results = await fetch_vessel_info_browser(page, ship_id, name)
    for url, resp_data in fetch_results.items():
        status = resp_data.get('status', 0)
        body = resp_data.get('body', '')
        endpoint = url.split('/')[-1]
        if status == 200 and body.startswith('{'):
            parsed = parse_vessel_from_response(body, ship_id)
            if parsed.get('mmsi'):
                print(f" Endpoint {endpoint}: MMSI={parsed['mmsi']}")
                return parsed
        # Every candidate that did not yield an MMSI is logged.
        short = body[:80].replace('\n', ' ')
        print(f" {endpoint}: {status} -> {short}")

    # ---- Approach 2: search box ----
    if name:
        print(f" Trying search box for '{name}'...")
        search_captures.clear()
        await search_via_searchbox(page, name, search_captures)
        await asyncio.sleep(1.5)
        parsed = _first_capture_with_mmsi(search_captures, ship_id)
        if parsed:
            print(f" Search box: MMSI={parsed['mmsi']}")
            return parsed

    # ---- Approach 3: navigate to vessel lat/lon, listen for click API ----
    # Compare against None: 0.0 is a valid latitude/longitude.
    if lat is not None and lon is not None:
        tile_url = (f'https://www.marinetraffic.com/en/ais/home'
                    f'/centerx:{lon}/centery:{lat}/zoom:14')
        try:
            search_captures.clear()
            await page.goto(tile_url, wait_until='load', timeout=25000)
            await asyncio.sleep(8)
            # Click the map centre and nearby positions.
            canvas = await page.query_selector('canvas, .mapboxgl-canvas')
            box = await canvas.bounding_box() if canvas else None
            if box:
                cx = box['x'] + box['width'] / 2
                cy = box['y'] + box['height'] / 2
                for dx, dy in [(0, 0), (20, 0), (-20, 0), (0, 20), (0, -20),
                               (30, 30), (-30, -30), (15, -15), (-15, 15)]:
                    await page.mouse.click(cx + dx, cy + dy)
                    await asyncio.sleep(0.5)
                await asyncio.sleep(2)
            parsed = _first_capture_with_mmsi(search_captures, ship_id)
            if parsed:
                print(f" Map click: MMSI={parsed['mmsi']}")
                return parsed
        except Exception as e:
            print(f" Map nav error: {e}")
    return {}


def _save_progress(conn, cp, processed_ids, found_mmsi, not_found):
    """Commit pending DB updates and write the matching checkpoint."""
    conn.commit()
    cp['processed_ship_ids'] = list(processed_ids)
    cp['found_mmsi'] = found_mmsi
    cp['not_found'] = not_found
    save_checkpoint(cp)


async def _process_vessels(page, conn, cur, vessels, cp, processed_ids, search_captures):
    """Look up and store every vessel; return (found_mmsi, not_found, updated).

    Progress is committed and checkpointed every ``BATCH`` vessels and again
    when the loop ends for any reason, so an interruption loses at most the
    vessel in flight.
    """
    found_mmsi = cp.get('found_mmsi', 0)
    not_found = cp.get('not_found', 0)
    updated = 0
    pending = []  # (ship_id, had_mmsi) for vessels not yet committed
    total = len(vessels)
    try:
        for i, (ship_id, name, lat, lon, gt, dwt, flag) in enumerate(vessels):
            print(f"\n[{i+1}/{total}] {name} (SHIP_ID={ship_id}, GT={gt}, DWT={dwt}, {flag})")
            result = await _lookup_vessel(page, ship_id, name, lat, lon, search_captures)

            # ---- Update DB ----
            mmsi = result.get('mmsi')
            if mmsi:
                imo = result.get('imo')
                owner = result.get('owner')
                operator = result.get('operator')
                cur.execute(_UPDATE_VESSEL_SQL, (mmsi, imo, owner, operator, ship_id))
                found_mmsi += 1
                updated += 1
                print(f" -> UPDATED: MMSI={mmsi} IMO={imo} owner={owner}")
            else:
                not_found += 1
                print(f" -> No MMSI found")

            # Track processed
            processed_ids.add(ship_id)
            pending.append((ship_id, bool(mmsi)))

            # Commit every BATCH vessels
            if len(pending) >= BATCH:
                _save_progress(conn, cp, processed_ids, found_mmsi, not_found)
                print(f"\n === CHECKPOINT: {i+1}/{total} processed, "
                      f"{found_mmsi} MMSI found, {not_found} not found ===")
                pending.clear()
            await asyncio.sleep(DELAY)
    except psycopg2.Error:
        # The open transaction is lost: undo the bookkeeping for vessels whose
        # updates were never committed so a rerun processes them again.
        conn.rollback()
        for sid, had_mmsi in pending:
            processed_ids.discard(sid)
            if had_mmsi:
                found_mmsi -= 1
                updated -= 1
            else:
                not_found -= 1
        pending.clear()
        raise
    finally:
        # Final commit; also runs when the loop is interrupted.
        _save_progress(conn, cp, processed_ids, found_mmsi, not_found)
    return found_mmsi, not_found, updated


async def main():
    """Run phase 2: fill in MMSI/ownership for vessels in mt_bulk_staging.

    Command-line flags:
        --limit N   process at most N not-yet-processed vessels (0 = all)
        --gt6only   only vessels with gt_shiptype = '6'
        --probe     log in, record the API calls seen on the map, and exit

    The database connection and the browser are always closed, including on
    login failure, in probe mode and when an exception propagates.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--limit', type=int, default=0, help='Max vessels to process (0=all)')
    parser.add_argument('--gt6only', action='store_true', help='Only process GT_SHIPTYPE=6 (bulk)')
    parser.add_argument('--probe', action='store_true', help='Just probe endpoints and exit')
    args = parser.parse_args()

    cp = load_checkpoint()
    processed_ids = set(cp.get('processed_ship_ids', []))
    print(f"Checkpoint: {len(processed_ids)} already processed, "
          f"{cp.get('found_mmsi', 0)} MMSI found")

    # Load vessels from DB
    conn = psycopg2.connect(DB_URL)
    try:
        cur = conn.cursor()
        vessels = _load_pending_vessels(cur, args.gt6only, args.limit, processed_ids)

        from playwright.async_api import async_playwright
        async with async_playwright() as p:
            browser = await p.chromium.launch(
                headless=False,
                args=['--no-sandbox', '--disable-blink-features=AutomationControlled']
            )
            try:
                context = await browser.new_context(
                    viewport={'width': 1280, 'height': 900},
                    user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
                               '(KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36',
                )
                page = await context.new_page()

                # Login
                if not await do_login(page):
                    print("ERROR: Login failed!")
                    return

                # Stabilize session
                await asyncio.sleep(5)
                print("\nSession stabilized.")

                # ---- PROBE MODE: just discover endpoints ----
                if args.probe:
                    await discover_vessel_endpoint(page)
                    return

                # ---- Navigate to map (establishes AIS session) ----
                print("Loading map for session warmup...")
                await page.goto(
                    'https://www.marinetraffic.com/en/ais/home/centerx:4.5/centery:51.9/zoom:10',
                    wait_until='load', timeout=35000)
                await asyncio.sleep(8)
                print(f" Map loaded: {page.url}")

                # ---- Set up search response capture ----
                search_captures = []

                async def capture_search_responses(response):
                    url = response.url
                    if not any(x in url for x in ['getData', 'vessel', 'search', 'query']):
                        return
                    if _is_static_asset_url(url):
                        return
                    try:
                        body = await response.body()
                    except Exception:
                        # Best effort: if reading the body fails, skip it.
                        return
                    text = body.decode('utf-8', errors='replace')
                    if text.startswith(('{', '[')):
                        search_captures.append({'url': url, 'body': text})

                page.on('response', capture_search_responses)

                # ---- Process vessels ----
                found_mmsi, not_found, updated = await _process_vessels(
                    page, conn, cur, vessels, cp, processed_ids, search_captures)

                print(f"\n=== DONE ===")
                print(f" Processed: {len(vessels)}")
                print(f" MMSI found: {found_mmsi}")
                print(f" Not found: {not_found}")
                print(f" Updated in DB: {updated}")

                # Summary
                cur.execute("SELECT count(*) FROM mt_bulk_staging WHERE mmsi IS NOT NULL")
                total_with_mmsi = cur.fetchone()[0]
                print(f" Total with MMSI in DB: {total_with_mmsi}")
            finally:
                await browser.close()
    finally:
        conn.close()
    print("\nPhase 2 complete!")
# Run only when executed as a script, so importing this module (for reuse or
# testing of its helpers) does not launch a browser session.
if __name__ == '__main__':
    asyncio.run(main())