#!/usr/bin/env python3
"""
MT Bulk Carrier Database Scraper — Async Playwright v1
=======================================================
Uses page.on('response') to capture tile API data (CF-safe: browser's own requests).
Phase 1 : Navigate all grid areas → collect bulk carrier SHIP_IDs + positions
Phase 2 : Visit vessel detail pages → MMSI + ownership (Beneficial Owner, etc.)
Phase 3 : Save to PostgreSQL production DB via SSH tunnel

Usage:
    python mt_bulk_scraper.py            # full run
    python mt_bulk_scraper.py --phase1   # tile collection only
    python mt_bulk_scraper.py --phase2   # detail pages only (after phase1)
    python mt_bulk_scraper.py --reset    # clear checkpoint and restart
"""
|
||
|
|
|
||
|
|
import argparse
import asyncio
import base64
import hashlib
import hmac
import json
import math
import os
import re
import struct
import sys
import time
from datetime import datetime
|
||
|
|
|
||
|
|
os.chdir(os.path.dirname(os.path.abspath(__file__)))
|
||
|
|
if hasattr(sys.stdout, 'reconfigure'):
|
||
|
|
sys.stdout.reconfigure(encoding='utf-8', errors='replace')
|
||
|
|
if hasattr(sys.stderr, 'reconfigure'):
|
||
|
|
sys.stderr.reconfigure(encoding='utf-8', errors='replace')
|
||
|
|
|
||
|
|
# ── Credentials ──────────────────────────────────────────────────────────────
|
||
|
|
MT_EMAIL = "operation@mrlogisticcorp.com"
|
||
|
|
MT_PASS = "NKh9i8Z!7fU9jfi"
|
||
|
|
MT_TOTP = "MNWTEPTFJZBUC32GJFEWY6LVKQ2GGYKH"
|
||
|
|
|
||
|
|
# ── DB (SSH tunnel must be running: ssh -L 15432:127.0.0.1:5432 root@89.19.208.158) ──
|
||
|
|
DB_URL = 'postgresql://seafare:SF_m0ntana_2026@127.0.0.1:15432/seafare_db'
|
||
|
|
|
||
|
|
# ── Files ─────────────────────────────────────────────────────────────────────
|
||
|
|
CHECKPOINT = 'mt_bulk_v2_checkpoint.json'
|
||
|
|
|
||
|
|
# ── Filtering: which tile SHIPTYPE values are bulk carriers? ─────────────────
|
||
|
|
# From tile analysis: SHIPTYPE "7" = bulk/cargo, "8" = mixed (tanker+bulk)
|
||
|
|
# Include both for first pass; DWT threshold narrows it further.
|
||
|
|
BULK_TYPES = {'4', '5', '7'} # definitely cargo/bulk
|
||
|
|
EXTRA_TYPES = {'8', '9', '6'} # mixed — require higher DWT
|
||
|
|
MIN_DWT_BULK = 5_000 # t, minimum for any bulk carrier
|
||
|
|
MIN_DWT_EXTRA = 20_000 # t, for EXTRA_TYPES
|
||
|
|
|
||
|
|
# ── Grid: 104 points ordered Caspian → Black Sea → Med → Baltic → ... ───────
|
||
|
|
ZOOM = 9
|
||
|
|
GRID = [
|
||
|
|
# 1. Caspian Sea
|
||
|
|
(40.0, 50.0), (41.5, 50.5), (43.5, 51.5), (45.0, 52.0), (47.0, 51.5),
|
||
|
|
# 2. Black Sea
|
||
|
|
(43.0, 33.0), (44.5, 33.5), (45.5, 30.0), (43.0, 40.5),
|
||
|
|
(42.5, 37.5), (41.5, 29.0), (46.3, 30.7),
|
||
|
|
# 3. Mediterranean
|
||
|
|
(36.0, 14.0), (37.5, 23.0), (35.5, 28.0), (35.5, 10.0), (37.0, 6.0),
|
||
|
|
(36.5, -5.0), (38.0, 3.0), (38.5, 16.0), (37.0, 21.0),
|
||
|
|
(36.5, 29.0), (37.5, 34.0),
|
||
|
|
# 4. Baltic
|
||
|
|
(55.5, 18.0), (57.0, 20.0), (59.5, 24.0), (60.0, 25.5),
|
||
|
|
(57.0, 18.0), (56.0, 12.0), (55.0, 14.0), (56.0, 10.0),
|
||
|
|
(57.5, 12.0), (59.0, 18.0), (60.5, 27.0),
|
||
|
|
# 5. North Sea / English Channel
|
||
|
|
(51.9, 4.5), (53.5, 3.5), (56.0, 4.0), (51.0, 2.0), (51.2, 1.0),
|
||
|
|
# 6. UK / Irish Sea
|
||
|
|
(51.5, -3.0), (53.5, -4.0), (56.0, -5.0), (58.0, -3.0),
|
||
|
|
# 7. Atlantic European coasts
|
||
|
|
(48.5, -5.0), (44.5, -3.0), (41.2, -9.0), (36.5, -8.5),
|
||
|
|
# 8. Red Sea + Suez
|
||
|
|
(27.0, 34.0), (22.0, 38.0), (15.0, 42.0), (12.5, 44.0),
|
||
|
|
(11.0, 43.5), (30.5, 32.5),
|
||
|
|
# 9. Persian Gulf
|
||
|
|
(26.0, 56.0), (24.5, 55.0), (27.5, 51.0), (29.0, 48.0), (25.0, 53.0),
|
||
|
|
# 10. Indian Ocean
|
||
|
|
(10.0, 65.0), (5.0, 73.0), (-5.0, 65.0), (-15.0, 45.0),
|
||
|
|
(20.0, 65.0), (-25.0, 43.5), (7.0, 58.0),
|
||
|
|
# 11. SE Asia
|
||
|
|
(1.5, 103.8), (10.0, 108.0), (5.5, 100.0), (-7.0, 112.0),
|
||
|
|
(15.0, 108.0), (-4.0, 115.0),
|
||
|
|
# 12. East Asia
|
||
|
|
(30.0, 122.0), (35.0, 129.0), (34.0, 119.0), (37.5, 121.5),
|
||
|
|
(32.0, 125.0), (35.5, 136.5),
|
||
|
|
# 13. Australia
|
||
|
|
(-33.9, 151.2), (-31.9, 115.8), (-23.0, 150.5), (-20.0, 118.5),
|
||
|
|
(-38.0, 145.0), (-34.5, 138.5),
|
||
|
|
# 14. US East + Gulf
|
||
|
|
(29.5, -94.5), (28.0, -90.5), (30.0, -81.5), (37.0, -76.0),
|
||
|
|
(40.7, -74.0), (43.0, -71.0),
|
||
|
|
# 15. US West
|
||
|
|
(37.5, -122.5), (33.7, -118.3), (48.5, -123.5),
|
||
|
|
# 16. South America
|
||
|
|
(-34.0, -70.5), (-23.0, -43.0), (-33.5, -71.5),
|
||
|
|
(-2.5, -44.5), (8.0, -77.0), (-3.0, -59.5),
|
||
|
|
# 17. Africa
|
||
|
|
(-34.0, 18.5), (-4.0, 39.5), (6.5, 2.5), (-26.0, 33.0),
|
||
|
|
(30.0, 32.5), (5.0, -1.0), (14.0, -17.0),
|
||
|
|
# 18. Great Lakes / Canada
|
||
|
|
(43.0, -79.0), (46.5, -84.0), (45.5, -73.5),
|
||
|
|
]
|
||
|
|
|
||
|
|
|
||
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
|
# Utilities
|
||
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
def totp(secret):
|
||
|
|
s = secret.upper().replace(' ', '')
|
||
|
|
pad = (-len(s)) % 8
|
||
|
|
key = base64.b32decode(s + '=' * pad)
|
||
|
|
counter = int(time.time()) // 30
|
||
|
|
msg = struct.pack('>Q', counter)
|
||
|
|
h = hmac.new(key, msg, hashlib.sha1).digest()
|
||
|
|
offset = h[-1] & 0x0f
|
||
|
|
code = struct.unpack('>I', h[offset:offset+4])[0] & 0x7fffffff
|
||
|
|
return str(code % 1_000_000).zfill(6)
|
||
|
|
|
||
|
|
|
||
|
|
def ts():
|
||
|
|
return datetime.now().strftime('%H:%M:%S')
|
||
|
|
|
||
|
|
|
||
|
|
def load_checkpoint():
|
||
|
|
if os.path.exists(CHECKPOINT):
|
||
|
|
with open(CHECKPOINT) as f:
|
||
|
|
return json.load(f)
|
||
|
|
return {
|
||
|
|
'phase1_done': False,
|
||
|
|
'last_grid_idx': -1,
|
||
|
|
'vessels': {}, # ship_id -> {lat, lon, name, flag, dwt, shiptype, gt_shiptype}
|
||
|
|
'phase2_done_ids': [],
|
||
|
|
'started_at': datetime.now().isoformat(),
|
||
|
|
}
|
||
|
|
|
||
|
|
|
||
|
|
def save_checkpoint(cp):
|
||
|
|
with open(CHECKPOINT, 'w') as f:
|
||
|
|
json.dump(cp, f, indent=2)
|
||
|
|
|
||
|
|
|
||
|
|
def is_bulk(row):
|
||
|
|
"""Return True if tile row is likely a bulk carrier."""
|
||
|
|
st = str(row.get('SHIPTYPE', ''))
|
||
|
|
dwt = int(row.get('DWT', 0) or 0)
|
||
|
|
if st in BULK_TYPES and dwt >= MIN_DWT_BULK:
|
||
|
|
return True
|
||
|
|
if st in EXTRA_TYPES and dwt >= MIN_DWT_EXTRA:
|
||
|
|
return True
|
||
|
|
return False
|
||
|
|
|
||
|
|
|
||
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
|
# Database helpers
|
||
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
def db_connect():
|
||
|
|
return __import__('psycopg2').connect(DB_URL)
|
||
|
|
|
||
|
|
|
||
|
|
def db_upsert_vessel(conn, data: dict):
|
||
|
|
"""Insert or update vessel row. data keys match columns."""
|
||
|
|
cols = ['mmsi', 'name', 'flag', 'type_category', 'deadweight',
|
||
|
|
'year_built', 'imo', 'owner', 'operator', 'companies_json',
|
||
|
|
'website', 'source', 'lat', 'lon']
|
||
|
|
vals = {c: data.get(c) for c in cols}
|
||
|
|
vals['source'] = 'mt_pro_scraper'
|
||
|
|
if not vals.get('mmsi'):
|
||
|
|
return
|
||
|
|
|
||
|
|
with conn.cursor() as cur:
|
||
|
|
cur.execute("""
|
||
|
|
INSERT INTO vessels (mmsi, name, flag, type_category, deadweight,
|
||
|
|
year_built, imo, owner, operator, companies_json, website, source, lat, lon,
|
||
|
|
updated_at)
|
||
|
|
VALUES (%(mmsi)s, %(name)s, %(flag)s, %(type_category)s, %(deadweight)s,
|
||
|
|
%(year_built)s, %(imo)s, %(owner)s, %(operator)s, %(companies_json)s,
|
||
|
|
%(website)s, %(source)s, %(lat)s, %(lon)s, NOW())
|
||
|
|
ON CONFLICT (mmsi) DO UPDATE SET
|
||
|
|
name = COALESCE(EXCLUDED.name, vessels.name),
|
||
|
|
flag = COALESCE(EXCLUDED.flag, vessels.flag),
|
||
|
|
type_category= COALESCE(EXCLUDED.type_category,vessels.type_category),
|
||
|
|
deadweight = COALESCE(EXCLUDED.deadweight, vessels.deadweight),
|
||
|
|
year_built = COALESCE(EXCLUDED.year_built, vessels.year_built),
|
||
|
|
imo = COALESCE(EXCLUDED.imo, vessels.imo),
|
||
|
|
owner = COALESCE(EXCLUDED.owner, vessels.owner),
|
||
|
|
operator = COALESCE(EXCLUDED.operator, vessels.operator),
|
||
|
|
companies_json = COALESCE(EXCLUDED.companies_json, vessels.companies_json),
|
||
|
|
website = COALESCE(EXCLUDED.website, vessels.website),
|
||
|
|
source = 'mt_pro_scraper',
|
||
|
|
lat = COALESCE(EXCLUDED.lat, vessels.lat),
|
||
|
|
lon = COALESCE(EXCLUDED.lon, vessels.lon),
|
||
|
|
updated_at = NOW()
|
||
|
|
""", vals)
|
||
|
|
conn.commit()
|
||
|
|
|
||
|
|
|
||
|
|
def db_upsert_position(conn, mmsi, lat, lon, speed, course, destination):
|
||
|
|
"""Insert or update position row."""
|
||
|
|
try:
|
||
|
|
with conn.cursor() as cur:
|
||
|
|
cur.execute("""
|
||
|
|
INSERT INTO positions (mmsi, lat, lon, speed, course, destination,
|
||
|
|
timestamp, source)
|
||
|
|
VALUES (%s, %s, %s, %s, %s, %s, NOW(), 'mt_tile')
|
||
|
|
ON CONFLICT (mmsi) DO UPDATE SET
|
||
|
|
lat = EXCLUDED.lat,
|
||
|
|
lon = EXCLUDED.lon,
|
||
|
|
speed = EXCLUDED.speed,
|
||
|
|
course = EXCLUDED.course,
|
||
|
|
destination = EXCLUDED.destination,
|
||
|
|
timestamp = NOW(),
|
||
|
|
source = 'mt_tile'
|
||
|
|
""", (mmsi, lat, lon, speed, course, destination))
|
||
|
|
conn.commit()
|
||
|
|
except Exception as e:
|
||
|
|
try: conn.rollback()
|
||
|
|
except: pass
|
||
|
|
|
||
|
|
|
||
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
|
# Playwright helpers
|
||
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
async def do_login(page):
|
||
|
|
print(f"[{ts()}] LOGIN: navigating to MT...")
|
||
|
|
await page.goto('https://www.marinetraffic.com/en/users/login',
|
||
|
|
wait_until='domcontentloaded', timeout=30000)
|
||
|
|
await asyncio.sleep(3)
|
||
|
|
|
||
|
|
await page.fill('input[name="username"]', MT_EMAIL)
|
||
|
|
await page.click('button[type="submit"]')
|
||
|
|
await asyncio.sleep(3)
|
||
|
|
|
||
|
|
await page.fill('input[type="password"]', MT_PASS)
|
||
|
|
await page.click('button[type="submit"]')
|
||
|
|
await asyncio.sleep(4)
|
||
|
|
|
||
|
|
if 'mfa-login-options' in page.url:
|
||
|
|
print(f"[{ts()}] LOGIN: 2FA screen...")
|
||
|
|
await page.click('button:has-text("Google Authenticator")')
|
||
|
|
await asyncio.sleep(3)
|
||
|
|
otp = totp(MT_TOTP)
|
||
|
|
print(f"[{ts()}] LOGIN: TOTP={otp}")
|
||
|
|
await page.fill('input[name="code"]', otp)
|
||
|
|
await page.click('button[type="submit"]')
|
||
|
|
await asyncio.sleep(5)
|
||
|
|
|
||
|
|
logged = 'marinetraffic.com' in page.url and 'auth.kpler' not in page.url
|
||
|
|
print(f"[{ts()}] LOGIN: {'OK' if logged else 'FAILED'} | {page.url[:80]}")
|
||
|
|
return logged
|
||
|
|
|
||
|
|
|
||
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
|
# PHASE 1: Tile collection
|
||
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
async def phase1(page, cp):
|
||
|
|
"""Navigate all grid areas and collect bulk carrier SHIP_IDs from tiles."""
|
||
|
|
print(f"\n[{ts()}] ===== PHASE 1: Grid traversal ({len(GRID)} areas) =====")
|
||
|
|
|
||
|
|
current_tiles = []
|
||
|
|
new_vessels = 0
|
||
|
|
|
||
|
|
async def on_tile(response):
|
||
|
|
if 'getData/get_data_json_4' not in response.url:
|
||
|
|
return
|
||
|
|
try:
|
||
|
|
body = await response.body()
|
||
|
|
parsed = json.loads(body)
|
||
|
|
rows = parsed.get('data', {}).get('rows', [])
|
||
|
|
current_tiles.extend(rows)
|
||
|
|
except Exception:
|
||
|
|
pass
|
||
|
|
|
||
|
|
page.on('response', on_tile)
|
||
|
|
|
||
|
|
start_idx = cp.get('last_grid_idx', -1) + 1
|
||
|
|
|
||
|
|
for i, (lat, lon) in enumerate(GRID):
|
||
|
|
if i < start_idx:
|
||
|
|
continue
|
||
|
|
|
||
|
|
current_tiles.clear()
|
||
|
|
area_url = (f'https://www.marinetraffic.com/en/ais/home'
|
||
|
|
f'/centerx:{lon}/centery:{lat}/zoom:{ZOOM}')
|
||
|
|
print(f"[{ts()}] Grid {i+1:3d}/{len(GRID)} ({lat:.1f},{lon:.1f})", end=' ', flush=True)
|
||
|
|
|
||
|
|
try:
|
||
|
|
await page.goto(area_url, wait_until='load', timeout=35000)
|
||
|
|
except Exception as e:
|
||
|
|
print(f"goto error: {e}")
|
||
|
|
await asyncio.sleep(5)
|
||
|
|
try:
|
||
|
|
await page.goto(area_url, wait_until='domcontentloaded', timeout=35000)
|
||
|
|
except Exception:
|
||
|
|
pass
|
||
|
|
|
||
|
|
# Wait for tiles to arrive
|
||
|
|
await asyncio.sleep(12)
|
||
|
|
|
||
|
|
# Process captured tile rows
|
||
|
|
area_new = 0
|
||
|
|
for row in current_tiles:
|
||
|
|
if not is_bulk(row):
|
||
|
|
continue
|
||
|
|
sid = str(row.get('SHIP_ID', ''))
|
||
|
|
if not sid:
|
||
|
|
continue
|
||
|
|
if sid not in cp['vessels']:
|
||
|
|
cp['vessels'][sid] = {
|
||
|
|
'name': row.get('SHIPNAME', ''),
|
||
|
|
'flag': row.get('FLAG', ''),
|
||
|
|
'dwt': int(row.get('DWT', 0) or 0),
|
||
|
|
'shiptype': str(row.get('SHIPTYPE', '')),
|
||
|
|
'gt_shiptype':str(row.get('GT_SHIPTYPE', '')),
|
||
|
|
'lat': row.get('LAT'),
|
||
|
|
'lon': row.get('LON'),
|
||
|
|
'speed': row.get('SPEED'),
|
||
|
|
'course': row.get('COURSE'),
|
||
|
|
'destination':row.get('DESTINATION', ''),
|
||
|
|
'mmsi': None,
|
||
|
|
'imo': None,
|
||
|
|
'owner': None,
|
||
|
|
'operator': None,
|
||
|
|
'companies': None,
|
||
|
|
'grid_idx': i,
|
||
|
|
}
|
||
|
|
area_new += 1
|
||
|
|
new_vessels += 1
|
||
|
|
|
||
|
|
total = len(cp['vessels'])
|
||
|
|
print(f"tiles={len(current_tiles):5d} bulk_new={area_new:4d} total={total:6d}")
|
||
|
|
|
||
|
|
cp['last_grid_idx'] = i
|
||
|
|
save_checkpoint(cp)
|
||
|
|
await asyncio.sleep(2)
|
||
|
|
|
||
|
|
page.remove_listener('response', on_tile)
|
||
|
|
cp['phase1_done'] = True
|
||
|
|
save_checkpoint(cp)
|
||
|
|
print(f"\n[{ts()}] PHASE 1 DONE. Total bulk carriers found: {len(cp['vessels'])}")
|
||
|
|
|
||
|
|
|
||
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
|
# PHASE 2: Vessel detail pages
|
||
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
def parse_ownership_html(html: str) -> dict:
|
||
|
|
"""
|
||
|
|
Parse MT vessel detail page HTML for ownership fields.
|
||
|
|
Returns dict with keys: mmsi, imo, year_built, owner, operator,
|
||
|
|
registered_owner, commercial_manager, companies (list)
|
||
|
|
"""
|
||
|
|
from bs4 import BeautifulSoup
|
||
|
|
soup = BeautifulSoup(html, 'html.parser')
|
||
|
|
result = {}
|
||
|
|
|
||
|
|
# MMSI — appears in page text and meta
|
||
|
|
mmsi_m = re.search(r'"MMSI"\s*:\s*"?(\d{9})"?', html)
|
||
|
|
if not mmsi_m:
|
||
|
|
mmsi_m = re.search(r'mmsi["\s:=]+(\d{9})', html, re.I)
|
||
|
|
if mmsi_m:
|
||
|
|
result['mmsi'] = mmsi_m.group(1)
|
||
|
|
|
||
|
|
# IMO
|
||
|
|
imo_m = re.search(r'"IMO"\s*:\s*"?(\d{7})"?', html)
|
||
|
|
if not imo_m:
|
||
|
|
imo_m = re.search(r'imo["\s:=]+(\d{7})', html, re.I)
|
||
|
|
if imo_m:
|
||
|
|
result['imo'] = imo_m.group(1)
|
||
|
|
|
||
|
|
# Year built
|
||
|
|
yr_m = re.search(r'"year_built"\s*:\s*"?(\d{4})"?', html, re.I)
|
||
|
|
if not yr_m:
|
||
|
|
yr_m = re.search(r'year.?built["\s:=]+(\d{4})', html, re.I)
|
||
|
|
if yr_m:
|
||
|
|
result['year_built'] = int(yr_m.group(1))
|
||
|
|
|
||
|
|
# Ownership roles
|
||
|
|
role_map = {
|
||
|
|
'beneficial owner': 'beneficial_owner',
|
||
|
|
'registered owner': 'registered_owner',
|
||
|
|
'commercial manager': 'commercial_manager',
|
||
|
|
'ship manager': 'commercial_manager',
|
||
|
|
'operator': 'operator',
|
||
|
|
'disponent owner': 'operator',
|
||
|
|
'technical manager': 'commercial_manager',
|
||
|
|
}
|
||
|
|
companies = []
|
||
|
|
|
||
|
|
# Try structured ownership table rows
|
||
|
|
for row in soup.select('tr, .company-row, [class*="ownership"]'):
|
||
|
|
text = row.get_text(' ', strip=True).lower()
|
||
|
|
for role_key, field in role_map.items():
|
||
|
|
if role_key in text:
|
||
|
|
# Company name is the link text or next cell
|
||
|
|
links = row.select('a')
|
||
|
|
if links:
|
||
|
|
name = links[0].get_text(strip=True)
|
||
|
|
if name and len(name) > 2:
|
||
|
|
result[field] = name
|
||
|
|
companies.append({'role': role_key, 'name': name,
|
||
|
|
'href': links[0].get('href', '')})
|
||
|
|
break
|
||
|
|
|
||
|
|
# Fallback: regex for common ownership JSON in page scripts
|
||
|
|
for pattern in [
|
||
|
|
r'"beneficial_owner"\s*:\s*"([^"]+)"',
|
||
|
|
r'"Beneficial Owner"\s*:\s*"([^"]+)"',
|
||
|
|
r'Beneficial Owner[^<]{0,60}<[^>]+>([^<]{3,80})<',
|
||
|
|
]:
|
||
|
|
m = re.search(pattern, html, re.I)
|
||
|
|
if m and 'beneficial_owner' not in result:
|
||
|
|
result['beneficial_owner'] = m.group(1).strip()
|
||
|
|
|
||
|
|
for pattern in [
|
||
|
|
r'"operator"\s*:\s*"([^"]+)"',
|
||
|
|
r'"Operator"\s*:\s*"([^"]+)"',
|
||
|
|
r'Operator[^<]{0,60}<[^>]+>([^<]{3,80})<',
|
||
|
|
]:
|
||
|
|
m = re.search(pattern, html, re.I)
|
||
|
|
if m and 'operator' not in result:
|
||
|
|
result['operator'] = m.group(1).strip()
|
||
|
|
|
||
|
|
# Derive 'owner' field for DB (prefer beneficial_owner, then registered_owner)
|
||
|
|
result['owner'] = (result.get('beneficial_owner')
|
||
|
|
or result.get('registered_owner')
|
||
|
|
or result.get('operator'))
|
||
|
|
result['companies'] = companies
|
||
|
|
return result
|
||
|
|
|
||
|
|
|
||
|
|
async def get_vessel_detail(page, ship_id: str, vessel_data: dict) -> dict:
|
||
|
|
"""
|
||
|
|
Navigate to MT vessel detail page, extract MMSI + ownership.
|
||
|
|
Updates vessel_data dict in place, returns it.
|
||
|
|
"""
|
||
|
|
url = f'https://www.marinetraffic.com/en/ais/details/ships/ship_id:{ship_id}'
|
||
|
|
|
||
|
|
# Capture any JSON API responses from this page
|
||
|
|
detail_jsons = []
|
||
|
|
|
||
|
|
async def on_detail_resp(response):
|
||
|
|
resp_url = response.url
|
||
|
|
# Skip static assets
|
||
|
|
if any(ext in resp_url for ext in ['.js', '.css', '.png', '.jpg', '.svg']):
|
||
|
|
return
|
||
|
|
# Capture JSON API responses
|
||
|
|
ct = response.headers.get('content-type', '')
|
||
|
|
if 'json' in ct or 'getData' in resp_url or '/en/reports' in resp_url:
|
||
|
|
try:
|
||
|
|
body = await response.body()
|
||
|
|
text = body.decode('utf-8', errors='replace')
|
||
|
|
if text.startswith('{') or text.startswith('['):
|
||
|
|
detail_jsons.append({'url': resp_url, 'body': text})
|
||
|
|
except Exception:
|
||
|
|
pass
|
||
|
|
|
||
|
|
page.on('response', on_detail_resp)
|
||
|
|
|
||
|
|
try:
|
||
|
|
await page.goto(url, wait_until='load', timeout=30000)
|
||
|
|
await asyncio.sleep(5)
|
||
|
|
|
||
|
|
# MMSI from URL redirect (MT redirects to /mmsi:{MMSI}/...)
|
||
|
|
final_url = page.url
|
||
|
|
mmsi_m = re.search(r'/mmsi[:/](\d{9})', final_url, re.I)
|
||
|
|
if mmsi_m:
|
||
|
|
vessel_data['mmsi'] = mmsi_m.group(1)
|
||
|
|
|
||
|
|
# Parse HTML
|
||
|
|
html = await page.content()
|
||
|
|
parsed = parse_ownership_html(html)
|
||
|
|
|
||
|
|
if not vessel_data.get('mmsi') and parsed.get('mmsi'):
|
||
|
|
vessel_data['mmsi'] = parsed['mmsi']
|
||
|
|
if parsed.get('imo'):
|
||
|
|
vessel_data['imo'] = parsed['imo']
|
||
|
|
if parsed.get('year_built'):
|
||
|
|
vessel_data['year_built'] = parsed['year_built']
|
||
|
|
if parsed.get('owner'):
|
||
|
|
vessel_data['owner'] = parsed['owner']
|
||
|
|
if parsed.get('operator'):
|
||
|
|
vessel_data['operator'] = parsed['operator']
|
||
|
|
if parsed.get('companies'):
|
||
|
|
vessel_data['companies'] = parsed['companies']
|
||
|
|
|
||
|
|
# Also scan captured JSON responses for ownership data
|
||
|
|
for item in detail_jsons:
|
||
|
|
try:
|
||
|
|
j = json.loads(item['body'])
|
||
|
|
if isinstance(j, dict):
|
||
|
|
# Try direct keys
|
||
|
|
for key in ('MMSI', 'mmsi'):
|
||
|
|
if j.get(key) and not vessel_data.get('mmsi'):
|
||
|
|
vessel_data['mmsi'] = str(j[key])
|
||
|
|
for key in ('IMO', 'imo'):
|
||
|
|
if j.get(key) and not vessel_data.get('imo'):
|
||
|
|
vessel_data['imo'] = str(j[key])
|
||
|
|
# Ownership from JSON
|
||
|
|
for key in ('MANAGER_OWNER', 'owner', 'beneficial_owner'):
|
||
|
|
if j.get(key) and not vessel_data.get('owner'):
|
||
|
|
vessel_data['owner'] = str(j[key])
|
||
|
|
except Exception:
|
||
|
|
pass
|
||
|
|
|
||
|
|
except Exception as e:
|
||
|
|
print(f" detail_error={e}")
|
||
|
|
finally:
|
||
|
|
page.remove_listener('response', on_detail_resp)
|
||
|
|
|
||
|
|
return vessel_data
|
||
|
|
|
||
|
|
|
||
|
|
async def phase2(page, cp, conn):
|
||
|
|
"""Visit vessel detail pages for all ships without MMSI."""
|
||
|
|
ship_ids = [
|
||
|
|
sid for sid, v in cp['vessels'].items()
|
||
|
|
if not v.get('mmsi') and sid not in cp.get('phase2_done_ids', [])
|
||
|
|
]
|
||
|
|
total = len(ship_ids)
|
||
|
|
print(f"\n[{ts()}] ===== PHASE 2: Vessel details ({total} ships to process) =====")
|
||
|
|
|
||
|
|
done_ids = set(cp.get('phase2_done_ids', []))
|
||
|
|
saved_count = 0
|
||
|
|
|
||
|
|
for idx, sid in enumerate(ship_ids):
|
||
|
|
vdata = cp['vessels'][sid]
|
||
|
|
name = vdata.get('name', sid)
|
||
|
|
|
||
|
|
print(f"[{ts()}] [{idx+1:4d}/{total}] {name[:35]:35s}", end=' ', flush=True)
|
||
|
|
|
||
|
|
await get_vessel_detail(page, sid, vdata)
|
||
|
|
|
||
|
|
mmsi = vdata.get('mmsi', '')
|
||
|
|
owner = vdata.get('owner', '')
|
||
|
|
print(f"MMSI={mmsi or '?':12s} owner={owner[:30] if owner else '?'}")
|
||
|
|
|
||
|
|
# Save to DB if we have MMSI
|
||
|
|
if mmsi:
|
||
|
|
try:
|
||
|
|
companies = vdata.get('companies', [])
|
||
|
|
db_upsert_vessel(conn, {
|
||
|
|
'mmsi': mmsi,
|
||
|
|
'name': vdata.get('name'),
|
||
|
|
'flag': vdata.get('flag'),
|
||
|
|
'type_category': 'bulk',
|
||
|
|
'deadweight': vdata.get('dwt'),
|
||
|
|
'year_built': vdata.get('year_built'),
|
||
|
|
'imo': vdata.get('imo'),
|
||
|
|
'owner': vdata.get('owner'),
|
||
|
|
'operator': vdata.get('operator'),
|
||
|
|
'companies_json': json.dumps(companies) if companies else None,
|
||
|
|
'lat': vdata.get('lat'),
|
||
|
|
'lon': vdata.get('lon'),
|
||
|
|
})
|
||
|
|
db_upsert_position(conn, mmsi,
|
||
|
|
vdata.get('lat'), vdata.get('lon'),
|
||
|
|
vdata.get('speed'), vdata.get('course'),
|
||
|
|
vdata.get('destination'))
|
||
|
|
saved_count += 1
|
||
|
|
except Exception as e:
|
||
|
|
print(f" DB error: {e}")
|
||
|
|
|
||
|
|
done_ids.add(sid)
|
||
|
|
cp['vessels'][sid] = vdata
|
||
|
|
cp['phase2_done_ids'] = list(done_ids)
|
||
|
|
|
||
|
|
# Save checkpoint every 20 vessels
|
||
|
|
if (idx + 1) % 20 == 0:
|
||
|
|
save_checkpoint(cp)
|
||
|
|
print(f"[{ts()}] Checkpoint saved. DB rows written: {saved_count}")
|
||
|
|
|
||
|
|
await asyncio.sleep(3) # Rate limit: ~20 vessels/min
|
||
|
|
|
||
|
|
save_checkpoint(cp)
|
||
|
|
print(f"\n[{ts()}] PHASE 2 DONE. DB rows written: {saved_count}")
|
||
|
|
|
||
|
|
|
||
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
|
# Main
|
||
|
|
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
async def main(run_phase1=True, run_phase2=True):
|
||
|
|
from playwright.async_api import async_playwright
|
||
|
|
|
||
|
|
cp = load_checkpoint()
|
||
|
|
print(f"[{ts()}] Checkpoint: phase1_done={cp['phase1_done']}, "
|
||
|
|
f"vessels={len(cp['vessels'])}, "
|
||
|
|
f"phase2_done={len(cp.get('phase2_done_ids',[]))}")
|
||
|
|
|
||
|
|
# Check SSH tunnel / DB connection
|
||
|
|
conn = None
|
||
|
|
if run_phase2:
|
||
|
|
try:
|
||
|
|
conn = db_connect()
|
||
|
|
conn.cursor().execute('SELECT 1')
|
||
|
|
print(f"[{ts()}] DB connection: OK")
|
||
|
|
except Exception as e:
|
||
|
|
print(f"[{ts()}] DB connection FAILED: {e}")
|
||
|
|
print(f"[{ts()}] Start SSH tunnel: ssh -L 15432:127.0.0.1:5432 -N root@89.19.208.158")
|
||
|
|
if run_phase1:
|
||
|
|
print(f"[{ts()}] Continuing with Phase 1 only (no DB writes)...")
|
||
|
|
run_phase2 = False
|
||
|
|
conn = None
|
||
|
|
else:
|
||
|
|
return
|
||
|
|
|
||
|
|
async with async_playwright() as p:
|
||
|
|
browser = await p.chromium.launch(
|
||
|
|
headless=False,
|
||
|
|
args=['--no-sandbox', '--disable-blink-features=AutomationControlled']
|
||
|
|
)
|
||
|
|
context = await browser.new_context(
|
||
|
|
viewport={'width': 1280, 'height': 900},
|
||
|
|
user_agent=('Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
|
||
|
|
'AppleWebKit/537.36 (KHTML, like Gecko) '
|
||
|
|
'Chrome/121.0.0.0 Safari/537.36'),
|
||
|
|
)
|
||
|
|
page = await context.new_page()
|
||
|
|
|
||
|
|
logged_in = await do_login(page)
|
||
|
|
if not logged_in:
|
||
|
|
print(f"[{ts()}] LOGIN FAILED — aborting")
|
||
|
|
await browser.close()
|
||
|
|
return
|
||
|
|
|
||
|
|
# Let session stabilize
|
||
|
|
await asyncio.sleep(5)
|
||
|
|
|
||
|
|
if run_phase1 and not cp.get('phase1_done'):
|
||
|
|
await phase1(page, cp)
|
||
|
|
elif run_phase1 and cp.get('phase1_done'):
|
||
|
|
print(f"[{ts()}] Phase 1 already complete ({len(cp['vessels'])} vessels). Skipping.")
|
||
|
|
|
||
|
|
if run_phase2:
|
||
|
|
await phase2(page, cp, conn)
|
||
|
|
|
||
|
|
await browser.close()
|
||
|
|
|
||
|
|
if conn:
|
||
|
|
conn.close()
|
||
|
|
print(f"\n[{ts()}] ALL DONE. Total vessels in checkpoint: {len(cp['vessels'])}")
|
||
|
|
|
||
|
|
|
||
|
|
if __name__ == '__main__':
|
||
|
|
ap = argparse.ArgumentParser()
|
||
|
|
ap.add_argument('--phase1', action='store_true', help='Phase 1 only (tile collection)')
|
||
|
|
ap.add_argument('--phase2', action='store_true', help='Phase 2 only (vessel details)')
|
||
|
|
ap.add_argument('--reset', action='store_true', help='Clear checkpoint and restart')
|
||
|
|
args = ap.parse_args()
|
||
|
|
|
||
|
|
if args.reset and os.path.exists(CHECKPOINT):
|
||
|
|
os.remove(CHECKPOINT)
|
||
|
|
print("Checkpoint cleared.")
|
||
|
|
|
||
|
|
p1 = not args.phase2 # default: run phase1 unless --phase2 only
|
||
|
|
p2 = not args.phase1 # default: run phase2 unless --phase1 only
|
||
|
|
|
||
|
|
asyncio.run(main(run_phase1=p1, run_phase2=p2))
|