# Source: montana/Русский/Логистика/mt_data_scraper.py
#!/usr/bin/env python3
"""
MT Data Page Scraper — GET MMSI + Ownership from MT Data Export
URL: https://www.marinetraffic.com/en/data/?asset_type=vessels
This page has paginated vessel data with MMSI, IMO, Flag, Type, DWT, Owner, Operator.
Uses page.on('response') to capture the data API calls.
Modes:
--probe Show what API calls are made (no DB writes)
--type N Filter by vessel type (6=bulk, default: bulk carrier page)
--limit N Stop after N vessels
--start PAGE Start from page N (resume)
Usage: python mt_data_scraper.py [--probe] [--type 6] [--limit 1000]
"""
import asyncio, json, sys, os, time, re, struct, hmac, hashlib, base64, argparse
import psycopg2

# NOTE(review): `re` is imported but not used anywhere in this file.

# Run relative to the script's own directory so the JSON output files
# (mt_data_page_responses.json / mt_data_page_vessels.json) land next to it.
os.chdir(os.path.dirname(os.path.abspath(__file__)))

# Force UTF-8 console output (vessel names / owners may be non-ASCII);
# reconfigure() exists on Python 3.7+, hence the hasattr guard.
if hasattr(sys.stdout, 'reconfigure'):
    sys.stdout.reconfigure(encoding='utf-8', errors='replace')
if hasattr(sys.stderr, 'reconfigure'):
    sys.stderr.reconfigure(encoding='utf-8', errors='replace')

# NOTE(review): live account credentials, TOTP seed, and DB password are
# hardcoded here — move them to environment variables / a secrets store
# and rotate these values; they should not live in source control.
EMAIL = "operation@mrlogisticcorp.com"
PASSWORD = "NKh9i8Z!7fU9jfi"
TOTP_SECRET = "MNWTEPTFJZBUC32GJFEWY6LVKQ2GGYKH"
DB_URL = 'postgresql://seafare:SF_m0ntana_2026@127.0.0.1:15432/seafare_db'

# Pagination
# NOTE(review): neither constant is referenced in this file — presumably
# leftovers from an earlier pagination loop; confirm before deleting.
PAGE_DELAY = 3.0  # seconds between pages
BATCH_SIZE = 200  # commit every N vessels
def totp(secret, timestamp=None):
    """Return the 6-digit RFC 6238 TOTP code for a base32-encoded secret.

    Args:
        secret: Base32 shared secret; spaces are stripped and case is
            normalized before decoding.
        timestamp: Unix time to compute the code for. Defaults to the
            current time; exposed so the algorithm can be verified against
            the RFC 6238 test vectors.

    Returns:
        str: Zero-padded 6-digit one-time code.
    """
    normalized = secret.upper().replace(' ', '')
    # b32decode requires input length to be a multiple of 8 — pad with '='.
    padding = (-len(normalized)) % 8
    key = base64.b32decode(normalized + '=' * padding)
    now = time.time() if timestamp is None else timestamp
    counter = int(now) // 30  # 30-second time step (RFC 6238 default)
    digest = hmac.new(key, struct.pack('>Q', counter), hashlib.sha1).digest()
    # Dynamic truncation (RFC 4226 §5.3): low nibble of the last byte
    # selects a 4-byte window; mask the sign bit, then take 6 decimal digits.
    offset = digest[-1] & 0x0f
    code = struct.unpack('>I', digest[offset:offset + 4])[0] & 0x7fffffff
    return str(code % 1000000).zfill(6)
async def do_login(page):
    """Drive the MT Pro login flow: email -> password -> optional TOTP 2FA.

    Args:
        page: Playwright page object used for navigation and form input.

    Returns:
        bool: True when the post-login URL looks like a logged-in MT session
            (on marinetraffic.com and not bounced to auth.kpler).

    NOTE(review): relies on fixed sleeps rather than explicit waits, so it
    is timing-sensitive on slow connections.
    """
    print("Login to MT Pro...")
    await page.goto('https://www.marinetraffic.com/en/users/login',
                    wait_until='domcontentloaded', timeout=30000)
    await asyncio.sleep(3)
    # Two-step form: submit the username first, then the password field.
    await page.fill('input[name="username"]', EMAIL)
    await page.click('button[type="submit"]')
    await asyncio.sleep(3)
    await page.fill('input[type="password"]', PASSWORD)
    await page.click('button[type="submit"]')
    await asyncio.sleep(4)
    # 2FA branch: detected purely from MFA markers in the current URL.
    if 'mfa-login-options' in page.url or 'mfa' in page.url.lower():
        print(" 2FA: Google Authenticator...")
        try:
            await page.click('button:has-text("Google Authenticator")', timeout=5000)
        except Exception:
            # Chooser screen may be skipped entirely; best-effort click.
            pass
        await asyncio.sleep(2)
        otp = totp(TOTP_SECRET)
        print(f" TOTP: {otp}")
        await page.fill('input[name="code"]', otp)
        await page.click('button[type="submit"]')
        await asyncio.sleep(5)
    logged_in = 'marinetraffic.com' in page.url and 'auth.kpler' not in page.url
    print(f" Logged in: {logged_in} URL: {page.url}")
    return logged_in
def parse_vessel_row(row):
    """Extract vessel data from one JSON row of an MT API response.

    The API is inconsistent about key naming/casing, so every field is
    looked up under several candidate keys; the first usable one wins.

    Args:
        row: Raw dict for one vessel (any non-dict / empty value -> {}).

    Returns:
        dict: Standardized keys (mmsi, imo, name, flag, shiptype, dwt,
            year_built, lat, lon, destination, speed, owner, operator,
            ship_id) — only those actually present in the row.

    Fix over the original: numeric fields previously used truthiness
    (`if row[k]`), silently dropping legitimate zero values (lat/lon 0.0
    on the equator / prime meridian, speed 0 for an anchored vessel).
    """
    if not row or not isinstance(row, dict):
        return {}

    def _first_str(keys):
        # First candidate key with a truthy value, stringified.
        for k in keys:
            if k in row and row[k]:
                return str(row[k])
        return None

    def _first_num(keys, cast):
        # First candidate key that is present and non-empty. Zero is a
        # valid value, so only None and '' are rejected. An unparsable
        # value stops the search (mirrors the original's break-on-hit).
        for k in keys:
            if k in row and row[k] is not None and row[k] != '':
                try:
                    return cast(row[k])
                except (TypeError, ValueError):
                    return None
        return None

    result = {}
    string_fields = [
        ('mmsi', ['MMSI', 'mmsi', 'VESSEL_MMSI']),
        ('imo', ['IMO', 'imo', 'VESSEL_IMO']),
        ('name', ['SHIPNAME', 'NAME', 'name', 'VESSEL_NAME']),
        ('flag', ['FLAG', 'flag', 'VESSEL_FLAG']),
        ('shiptype', ['SHIPTYPE', 'TYPE_SUMMARY', 'ship_type',
                      'GT_SHIPTYPE', 'VESSEL_TYPE']),
        ('destination', ['DESTINATION', 'destination']),
        ('ship_id', ['SHIP_ID', 'ship_id', 'id']),  # MT internal id
    ]
    for field, keys in string_fields:
        value = _first_str(keys)
        if value is not None:
            result[field] = value

    numeric_fields = [
        ('dwt', ['DWT', 'dwt', 'DEADWEIGHT'], int),
        ('year_built', ['YEAR_BUILT', 'year_built', 'BUILT'], int),
        ('lat', ['LAT', 'lat', 'LATITUDE'], float),
        ('lon', ['LON', 'lon', 'LONGITUDE', 'LNG'], float),
        ('speed', ['SPEED', 'speed'], float),
    ]
    for field, keys, cast in numeric_fields:
        value = _first_num(keys, cast)
        if value is not None:
            result[field] = value

    # Owner/operator: deliberately NO break — one row may carry both an
    # owner-ish and an operator-ish key; keep the first value per target.
    owner_key_map = {
        'MANAGER_OWNER': 'owner',
        'BENEFICIAL_OWNER': 'owner',
        'REGISTERED_OWNER': 'owner',
        'manager': 'owner',
        'OPERATOR': 'operator',
        'COMMERCIAL_MANAGER': 'operator',
    }
    for k in ['MANAGER_OWNER', 'manager', 'BENEFICIAL_OWNER', 'REGISTERED_OWNER',
              'OPERATOR', 'COMMERCIAL_MANAGER']:
        if k in row and row[k]:
            field = owner_key_map.get(k, 'owner')
            if field not in result:
                result[field] = str(row[k])
    return result
async def scrape_data_page(page, url, captured_data):
    """Navigate to a MT data page, wait for API response, return captured rows.

    Args:
        page: Playwright page; must already have a 'response' listener that
            appends captured entries into `captured_data`.
        url: Data-page URL to load.
        captured_data: Shared list written by the response listener; it is
            cleared before navigation so only this page's responses remain.

    Returns:
        list: Snapshot copy of `captured_data` after load + scroll.

    NOTE(review): this helper is never called in this file — dead code?
    """
    captured_data.clear()
    await page.goto(url, wait_until='load', timeout=40000)
    await asyncio.sleep(5)
    # Scroll down to trigger lazy loading
    await page.evaluate('window.scrollTo(0, document.body.scrollHeight)')
    await asyncio.sleep(2)
    return list(captured_data)
async def try_direct_api_pages(page, vessel_type, start_page, limit_pages):
    """
    Try to call MT data API directly via page.evaluate(fetch()).
    The data page calls something like:
        GET /getData/get_vessels_in_area/type:TYPE/page:N
    or:
        GET /en/data/?asset_type=vessels&vessel_type=TYPE&page=N (JSON via XHR)

    Args:
        page: Playwright page with an authenticated session (fetch runs
            in-page with credentials: 'include').
        vessel_type: MT vessel-type code substituted into the URL templates.
        start_page: First page number to request.
        limit_pages: How many consecutive pages to try.

    Returns:
        list[dict]: One {'url', 'page', 'body'} entry per page where some
            candidate endpoint returned HTTP 200 with a JSON-looking body
            (first matching candidate wins per page).
    """
    results = []
    # Common MT data API patterns
    api_candidates = [
        f'/getData/get_vessels_in_area/type:{vessel_type}/page:{{page}}',
        f'/getData/get_data_json_4/type:{vessel_type}/page:{{page}}',
        f'/en/data/?asset_type=vessels&vessel_type={vessel_type}&page={{page}}&format=json',
    ]
    for page_num in range(start_page, start_page + limit_pages):
        print(f"\n Page {page_num}...")
        for template in api_candidates:
            url = template.format(page=page_num)
            # In-page fetch so cookies/session apply; doubled braces are
            # literal JS braces inside the f-string.
            # NOTE(review): 'Referer' is a forbidden fetch header — the
            # browser silently ignores it here.
            js_code = f"""
                async () => {{
                    try {{
                        const resp = await fetch('{url}', {{
                            credentials: 'include',
                            headers: {{
                                'X-Requested-With': 'XMLHttpRequest',
                                'Accept': 'application/json, text/javascript, */*',
                                'Referer': window.location.href,
                            }}
                        }});
                        const text = await resp.text();
                        return {{url: '{url}', status: resp.status, body: text.substring(0, 5000)}};
                    }} catch(e) {{
                        return {{url: '{url}', status: 0, error: e.message}};
                    }}
                }}
            """
            try:
                result = await page.evaluate(js_code)
                status = result.get('status', 0)
                body = result.get('body', '')
                # Accept only 200s whose body starts like JSON.
                if status == 200 and (body.startswith('{') or body.startswith('[')):
                    print(f" HIT: {url} ({status}, {len(body)}b)")
                    print(f" Body: {body[:200]}")
                    results.append({'url': url, 'page': page_num, 'body': body})
                    break
                else:
                    print(f" Miss: {url} -> {status}")
            except Exception as e:
                print(f" Error {url}: {e}")
        await asyncio.sleep(0.5)
    return results
async def main():
    """CLI entry point: log in to MT, load the data page while sniffing
    JSON responses, optionally probe direct API endpoints, then upsert
    extracted vessels into the mt_bulk_staging table (unless --probe)."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--probe', action='store_true', help='Probe mode: just show API calls')
    parser.add_argument('--type', type=int, default=0, help='Vessel type (0=all bulk types)')
    parser.add_argument('--limit', type=int, default=0, help='Max vessels to collect')
    parser.add_argument('--pages', type=int, default=5, help='Pages to try in probe')
    parser.add_argument('--start', type=int, default=1, help='Start from page N')
    # NOTE(review): --limit is parsed but never used below.
    args = parser.parse_args()

    conn = psycopg2.connect(DB_URL)
    cur = conn.cursor()

    # Local import: playwright is only needed once we actually drive a browser.
    from playwright.async_api import async_playwright
    async with async_playwright() as p:
        # headless=False + disabled automation flag, presumably to reduce
        # bot detection — confirm whether headless works for this site.
        browser = await p.chromium.launch(
            headless=False,
            args=['--no-sandbox', '--disable-blink-features=AutomationControlled']
        )
        context = await browser.new_context(
            viewport={'width': 1440, 'height': 900},
            user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
                       '(KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36',
        )
        page = await context.new_page()

        # ---- Capture ALL JSON responses ----
        all_captured = []          # every JSON-looking response body (truncated)
        interesting_captured = []  # subset containing vessel-ish keywords

        async def capture_response(response):
            # Response listener: record every JSON-looking body; flag the
            # ones that mention vessel data.
            url = response.url
            # Skip static assets
            if any(url.endswith(x) for x in ['.js', '.css', '.png', '.jpg', '.gif', '.woff']):
                return
            # NOTE(review): ct is computed but never used.
            ct = response.headers.get('content-type', '')
            try:
                body = await response.body()
                text = body.decode('utf-8', errors='replace')
                # Capture anything that looks like vessel data
                if text.startswith('{') or text.startswith('['):
                    entry = {'url': url, 'status': response.status,
                             'size': len(body), 'body': text[:3000]}
                    all_captured.append(entry)
                    # Is this interesting? (has vessel/mmsi/ship data)
                    low = text[:500].lower()
                    if any(kw in low for kw in ['mmsi', 'shipname', 'vessel', 'rows', '"data"']):
                        interesting_captured.append(entry)
                        print(f" ** VESSEL API ** {url[:100]} ({len(body)}b)")
                        print(f" {text[:200]}")
            except Exception:
                # response.body() can fail for redirects/aborted requests;
                # capture is best-effort.
                pass

        page.on('response', capture_response)

        # ---- Login ----
        logged_in = await do_login(page)
        if not logged_in:
            print("ERROR: Login failed!")
            await browser.close()
            conn.close()
            return
        await asyncio.sleep(5)

        # ---- Navigate to Data page ----
        data_url = 'https://www.marinetraffic.com/en/data/?asset_type=vessels'
        if args.type:
            data_url += f'&vessel_type={args.type}'
        print(f"\nNavigating to MT Data page: {data_url}")
        all_captured.clear()
        interesting_captured.clear()
        await page.goto(data_url, wait_until='load', timeout=40000)
        await asyncio.sleep(8)
        # Scroll to trigger lazy loading
        for _ in range(3):
            await page.evaluate('window.scrollTo(0, document.body.scrollHeight / 2)')
            await asyncio.sleep(1)
            await page.evaluate('window.scrollTo(0, document.body.scrollHeight)')
            await asyncio.sleep(1)
        await asyncio.sleep(3)

        print(f"\n Page loaded: {page.url}")
        print(f" All JSON responses: {len(all_captured)}")
        print(f" Vessel API responses: {len(interesting_captured)}")
        if all_captured:
            # Raw capture dump, useful for reverse-engineering endpoints.
            with open('mt_data_page_responses.json', 'w', encoding='utf-8') as f:
                json.dump(all_captured, f, indent=2, ensure_ascii=False)
            print(f" Saved -> mt_data_page_responses.json")

        if args.probe:
            # Probe mode: list what was captured and exit; no DB writes.
            print("\n=== PROBE COMPLETE ===")
            print("All captured API calls:")
            for e in all_captured:
                print(f" {e['status']} {e['url'][:120]}")
            print("\nVessel-like API calls:")
            for e in interesting_captured:
                print(f" {e['status']} {e['url'][:120]}")
                print(f" Body: {e['body'][:300]}")
                print()
            await browser.close()
            conn.close()
            return

        # ---- Probe API endpoint via page.evaluate ----
        print(f"\nProbing direct API via page.evaluate...")
        vtype = args.type if args.type else 6  # 6 = bulk carriers (default)
        api_results = await try_direct_api_pages(page, vtype, args.start, args.pages)

        if not api_results and not interesting_captured:
            print("\nNo vessel data found. Let's try pagination...")
            # Try clicking pagination / "next page" on the data table
            all_captured.clear()
            interesting_captured.clear()
            # Look for "next" button or pagination
            for sel in [
                'button:has-text("Next")', 'a:has-text("Next")',
                '[aria-label="Next page"]', '.pagination .next',
                'button[class*="next"]',
            ]:
                try:
                    btn = await page.query_selector(sel)
                    if btn:
                        print(f" Found pagination: {sel}")
                        await btn.click()
                        await asyncio.sleep(5)
                        print(f" After page click: {len(interesting_captured)} vessel responses")
                        break
                except Exception:
                    pass

        # ---- Extract vessel rows ----
        all_vessels = {}  # keyed by mmsi (or name) to deduplicate

        def process_response_body(body_text):
            """Parse JSON body and extract vessel rows."""
            try:
                data = json.loads(body_text)
            except Exception:
                return []
            rows = []
            # Various response formats
            # NOTE(review): if data['data'] is a list, .get('rows') raises
            # AttributeError — confirm the expected response shapes.
            if isinstance(data, list):
                rows = data
            elif isinstance(data, dict):
                rows = (data.get('data', {}).get('rows', []) or
                        data.get('rows', []) or
                        data.get('data', []) or
                        data.get('vessels', []) or
                        data.get('results', []))
            vessels = []
            for row in rows:
                if isinstance(row, dict):
                    v = parse_vessel_row(row)
                    if v.get('mmsi') or v.get('name'):
                        vessels.append(v)
            return vessels

        # Process captured responses
        for entry in interesting_captured + api_results:
            body = entry.get('body', '')
            vessels = process_response_body(body)
            for v in vessels:
                key = v.get('mmsi') or v.get('name', '')
                if key and key not in all_vessels:
                    all_vessels[key] = v

        print(f"\n Extracted {len(all_vessels)} unique vessels from data page")
        if all_vessels:
            # Show sample
            for k, v in list(all_vessels.items())[:5]:
                print(f" Sample: {v}")
            # Save to mt_data_page_vessels.json
            with open('mt_data_page_vessels.json', 'w', encoding='utf-8') as f:
                json.dump(list(all_vessels.values()), f, indent=2, ensure_ascii=False)
            print(f" Saved -> mt_data_page_vessels.json")
            # Upsert to mt_bulk_staging
            inserted = 0
            for v in all_vessels.values():
                mmsi = v.get('mmsi')
                name = v.get('name', '')
                ship_id = v.get('ship_id')
                if not mmsi and not name:
                    continue
                try:
                    # ON CONFLICT keeps existing values unless the new row
                    # supplies non-NULL replacements (COALESCE).
                    cur.execute("""
                        INSERT INTO mt_bulk_staging
                        (ship_id, name, flag, dwt, gt_shiptype, type_category,
                         lat, lon, speed, destination, mmsi, imo, owner, operator, scraped_at)
                        VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s, NOW())
                        ON CONFLICT (ship_id) DO UPDATE SET
                            mmsi = COALESCE(EXCLUDED.mmsi, mt_bulk_staging.mmsi),
                            imo = COALESCE(EXCLUDED.imo, mt_bulk_staging.imo),
                            owner = COALESCE(EXCLUDED.owner, mt_bulk_staging.owner),
                            operator = COALESCE(EXCLUDED.operator, mt_bulk_staging.operator),
                            lat = COALESCE(EXCLUDED.lat, mt_bulk_staging.lat),
                            lon = COALESCE(EXCLUDED.lon, mt_bulk_staging.lon),
                            scraped_at = NOW()
                    """, (
                        ship_id or mmsi or name,  # fallback key when no MT ship_id
                        name,
                        v.get('flag'),
                        v.get('dwt'),
                        v.get('shiptype'),
                        'bulk' if str(v.get('shiptype', '')) == '6' else 'general',
                        v.get('lat'),
                        v.get('lon'),
                        v.get('speed'),
                        v.get('destination'),
                        mmsi,
                        v.get('imo'),
                        v.get('owner'),
                        v.get('operator'),
                    ))
                    inserted += 1
                except Exception as e:
                    # NOTE(review): after a failed statement psycopg2 aborts
                    # the whole transaction; every later execute in this loop
                    # will fail until conn.rollback() is called — consider
                    # rolling back here.
                    print(f" DB error: {e}")
            conn.commit()
            print(f" Upserted {inserted} vessels into mt_bulk_staging")

        # Summary
        cur.execute('SELECT count(*) FROM mt_bulk_staging')
        total = cur.fetchone()[0]
        cur.execute("SELECT count(*) FROM mt_bulk_staging WHERE mmsi IS NOT NULL")
        with_mmsi = cur.fetchone()[0]
        print(f"\n DB: {total} total, {with_mmsi} with MMSI")
        conn.close()
        await browser.close()
    print("\nDone!")
# Guard the entry point so importing this module (e.g. to reuse
# parse_vessel_row or totp) does not launch a browser and hit the DB.
if __name__ == '__main__':
    asyncio.run(main())