#!/usr/bin/env python3
"""
MT Bulk Carrier Database Scraper — Async Playwright v1
=======================================================
Uses page.on('response') to capture tile API data (CF-safe: the data comes
from the browser's own requests, which pass the Cloudflare checks).

Phase 1 : Navigate all 108 grid areas → collect bulk carrier SHIP_IDs + positions
Phase 2 : Visit vessel detail pages → MMSI + ownership (Beneficial Owner, etc.)
          and save each vessel to the PostgreSQL production DB via SSH tunnel

Usage:
    python mt_bulk_scraper.py            # full run
    python mt_bulk_scraper.py --phase1   # tile collection only
    python mt_bulk_scraper.py --phase2   # detail pages only (after phase1)
    python mt_bulk_scraper.py --reset    # clear checkpoint and restart
"""
import asyncio, json, time, base64, struct, hmac, hashlib
import os, sys, re, argparse
from datetime import datetime

os.chdir(os.path.dirname(os.path.abspath(__file__)))
if hasattr(sys.stdout, 'reconfigure'):
    sys.stdout.reconfigure(encoding='utf-8', errors='replace')
if hasattr(sys.stderr, 'reconfigure'):
    sys.stderr.reconfigure(encoding='utf-8', errors='replace')

# ── Credentials ──────────────────────────────────────────────────────────────
MT_EMAIL = "operation@mrlogisticcorp.com"
MT_PASS  = "NKh9i8Z!7fU9jfi"
MT_TOTP  = "MNWTEPTFJZBUC32GJFEWY6LVKQ2GGYKH"

# ── DB (SSH tunnel must be running: ssh -L 15432:127.0.0.1:5432 root@89.19.208.158) ──
DB_URL = 'postgresql://seafare:SF_m0ntana_2026@127.0.0.1:15432/seafare_db'

# ── Files ─────────────────────────────────────────────────────────────────────
CHECKPOINT = 'mt_bulk_v2_checkpoint.json'

# ── Filtering: which tile SHIPTYPE values are bulk carriers? ─────────────────
# From tile analysis: SHIPTYPE "7" = bulk/cargo, "8" = mixed (tanker+bulk).
# BULK_TYPES pass at the base DWT threshold; EXTRA_TYPES are mixed categories
# and must clear the higher threshold to be kept.
BULK_TYPES    = {'4', '5', '7'}   # definitely cargo/bulk
EXTRA_TYPES   = {'8', '9', '6'}   # mixed — require higher DWT
MIN_DWT_BULK  = 5_000             # t, minimum for any bulk carrier
MIN_DWT_EXTRA = 20_000            # t, for EXTRA_TYPES

# ── Grid: 108 points ordered Caspian → Black Sea → Med → Baltic → ... ────────
ZOOM = 9
GRID = [
    # 1. Caspian Sea
    (40.0, 50.0), (41.5, 50.5), (43.5, 51.5), (45.0, 52.0), (47.0, 51.5),
    # 2. Black Sea
    (43.0, 33.0), (44.5, 33.5), (45.5, 30.0), (43.0, 40.5), (42.5, 37.5),
    (41.5, 29.0), (46.3, 30.7),
    # 3. Mediterranean
    (36.0, 14.0), (37.5, 23.0), (35.5, 28.0), (35.5, 10.0), (37.0, 6.0),
    (36.5, -5.0), (38.0, 3.0), (38.5, 16.0), (37.0, 21.0), (36.5, 29.0),
    (37.5, 34.0),
    # 4. Baltic
    (55.5, 18.0), (57.0, 20.0), (59.5, 24.0), (60.0, 25.5), (57.0, 18.0),
    (56.0, 12.0), (55.0, 14.0), (56.0, 10.0), (57.5, 12.0), (59.0, 18.0),
    (60.5, 27.0),
    # 5. North Sea / English Channel
    (51.9, 4.5), (53.5, 3.5), (56.0, 4.0), (51.0, 2.0), (51.2, 1.0),
    # 6. UK / Irish Sea
    (51.5, -3.0), (53.5, -4.0), (56.0, -5.0), (58.0, -3.0),
    # 7. Atlantic European coasts
    (48.5, -5.0), (44.5, -3.0), (41.2, -9.0), (36.5, -8.5),
    # 8. Red Sea + Suez
    (27.0, 34.0), (22.0, 38.0), (15.0, 42.0), (12.5, 44.0), (11.0, 43.5),
    (30.5, 32.5),
    # 9. Persian Gulf
    (26.0, 56.0), (24.5, 55.0), (27.5, 51.0), (29.0, 48.0), (25.0, 53.0),
    # 10. Indian Ocean
    (10.0, 65.0), (5.0, 73.0), (-5.0, 65.0), (-15.0, 45.0), (20.0, 65.0),
    (-25.0, 43.5), (7.0, 58.0),
    # 11. SE Asia
    (1.5, 103.8), (10.0, 108.0), (5.5, 100.0), (-7.0, 112.0), (15.0, 108.0),
    (-4.0, 115.0),
    # 12. East Asia
    (30.0, 122.0), (35.0, 129.0), (34.0, 119.0), (37.5, 121.5), (32.0, 125.0),
    (35.5, 136.5),
    # 13. Australia
    (-33.9, 151.2), (-31.9, 115.8), (-23.0, 150.5), (-20.0, 118.5),
    (-38.0, 145.0), (-34.5, 138.5),
    # 14. US East + Gulf
    (29.5, -94.5), (28.0, -90.5), (30.0, -81.5), (37.0, -76.0), (40.7, -74.0),
    (43.0, -71.0),
    # 15. US West
    (37.5, -122.5), (33.7, -118.3), (48.5, -123.5),
    # 16. South America
    (-34.0, -70.5), (-23.0, -43.0), (-33.5, -71.5), (-2.5, -44.5), (8.0, -77.0),
    (-3.0, -59.5),
    # 17. Africa
    (-34.0, 18.5), (-4.0, 39.5), (6.5, 2.5), (-26.0, 33.0), (30.0, 32.5),
    (5.0, -1.0), (14.0, -17.0),
    # 18. Great Lakes / Canada
    (43.0, -79.0), (46.5, -84.0), (45.5, -73.5),
]

# ─────────────────────────────────────────────────────────────────────────────
# Utilities
# ─────────────────────────────────────────────────────────────────────────────
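
# Illustrative helper (not called anywhere): mirrors the live-map URL scheme
# that phase1() below builds inline, so the grid-point → URL mapping is
# documented in one place.
def grid_url(lat: float, lon: float, zoom: int = ZOOM) -> str:
    return (f'https://www.marinetraffic.com/en/ais/home'
            f'/centerx:{lon}/centery:{lat}/zoom:{zoom}')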
def totp(secret):
    """RFC 6238 TOTP (SHA-1, 6 digits, 30 s step) for the MT 2FA login."""
    s = secret.upper().replace(' ', '')
    pad = (-len(s)) % 8
    key = base64.b32decode(s + '=' * pad)
    counter = int(time.time()) // 30
    msg = struct.pack('>Q', counter)
    h = hmac.new(key, msg, hashlib.sha1).digest()
    offset = h[-1] & 0x0f
    code = struct.unpack('>I', h[offset:offset+4])[0] & 0x7fffffff
    return str(code % 1_000_000).zfill(6)

def ts():
    return datetime.now().strftime('%H:%M:%S')
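
# Hedged sanity check for the TOTP math above, using the public RFC 4226/6238
# SHA-1 test vector (secret = ASCII "12345678901234567890", i.e. base32
# "GEZDGNBVGY3TQOJQGEZDGNBVGY3TQOJQ"; counter 1 corresponds to unix time 30-59 s).
# Not called by the scraper; run manually if totp() ever looks suspect.
def _totp_selftest():
    key = b'12345678901234567890'
    msg = struct.pack('>Q', 1)                     # fixed counter instead of time()
    h = hmac.new(key, msg, hashlib.sha1).digest()
    offset = h[-1] & 0x0f                          # dynamic truncation, as in totp()
    code = struct.unpack('>I', h[offset:offset+4])[0] & 0x7fffffff
    assert str(code % 1_000_000).zfill(6) == '287082'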
def load_checkpoint():
    if os.path.exists(CHECKPOINT):
        with open(CHECKPOINT) as f:
            return json.load(f)
    return {
        'phase1_done': False,
        'last_grid_idx': -1,
        'vessels': {},   # ship_id -> {lat, lon, name, flag, dwt, shiptype, gt_shiptype}
        'phase2_done_ids': [],
        'started_at': datetime.now().isoformat(),
    }

def save_checkpoint(cp):
    with open(CHECKPOINT, 'w') as f:
        json.dump(cp, f, indent=2)

def is_bulk(row):
    """Return True if tile row is likely a bulk carrier."""
    st = str(row.get('SHIPTYPE', ''))
    dwt = int(row.get('DWT', 0) or 0)
    if st in BULK_TYPES and dwt >= MIN_DWT_BULK:
        return True
    if st in EXTRA_TYPES and dwt >= MIN_DWT_EXTRA:
        return True
    return False

# ─────────────────────────────────────────────────────────────────────────────
# Database helpers
# ─────────────────────────────────────────────────────────────────────────────
def db_connect():
    import psycopg2  # lazy import: only needed when Phase 2 writes to the DB
    return psycopg2.connect(DB_URL)

def db_upsert_vessel(conn, data: dict):
    """Insert or update vessel row. data keys match columns."""
    cols = ['mmsi', 'name', 'flag', 'type_category', 'deadweight', 'year_built',
            'imo', 'owner', 'operator', 'companies_json', 'website', 'source',
            'lat', 'lon']
    vals = {c: data.get(c) for c in cols}
    vals['source'] = 'mt_pro_scraper'
    if not vals.get('mmsi'):
        return
    with conn.cursor() as cur:
        cur.execute("""
            INSERT INTO vessels (mmsi, name, flag, type_category, deadweight,
                                 year_built, imo, owner, operator, companies_json,
                                 website, source, lat, lon, updated_at)
            VALUES (%(mmsi)s, %(name)s, %(flag)s, %(type_category)s, %(deadweight)s,
                    %(year_built)s, %(imo)s, %(owner)s, %(operator)s, %(companies_json)s,
                    %(website)s, %(source)s, %(lat)s, %(lon)s, NOW())
            ON CONFLICT (mmsi) DO UPDATE SET
                name           = COALESCE(EXCLUDED.name, vessels.name),
                flag           = COALESCE(EXCLUDED.flag, vessels.flag),
                type_category  = COALESCE(EXCLUDED.type_category, vessels.type_category),
                deadweight     = COALESCE(EXCLUDED.deadweight, vessels.deadweight),
                year_built     = COALESCE(EXCLUDED.year_built, vessels.year_built),
                imo            = COALESCE(EXCLUDED.imo, vessels.imo),
                owner          = COALESCE(EXCLUDED.owner, vessels.owner),
                operator       = COALESCE(EXCLUDED.operator, vessels.operator),
                companies_json = COALESCE(EXCLUDED.companies_json, vessels.companies_json),
                website        = COALESCE(EXCLUDED.website, vessels.website),
                source         = 'mt_pro_scraper',
                lat            = COALESCE(EXCLUDED.lat, vessels.lat),
                lon            = COALESCE(EXCLUDED.lon, vessels.lon),
                updated_at     = NOW()
        """, vals)
    conn.commit()

def db_upsert_position(conn, mmsi, lat, lon, speed, course, destination):
    """Insert or update position row."""
    try:
        with conn.cursor() as cur:
            cur.execute("""
                INSERT INTO positions (mmsi, lat, lon, speed, course, destination,
                                       timestamp, source)
                VALUES (%s, %s, %s, %s, %s, %s, NOW(), 'mt_tile')
                ON CONFLICT (mmsi) DO UPDATE SET
                    lat = EXCLUDED.lat, lon = EXCLUDED.lon,
                    speed = EXCLUDED.speed, course = EXCLUDED.course,
                    destination = EXCLUDED.destination,
                    timestamp = NOW(), source = 'mt_tile'
            """, (mmsi, lat, lon, speed, course, destination))
        conn.commit()
    except Exception as e:
        print(f"[{ts()}] position upsert failed for {mmsi}: {e}")
        try:
            conn.rollback()
        except Exception:
            pass
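
# Note on the two upserts above: PostgreSQL's ON CONFLICT (mmsi) requires a
# unique constraint or unique index on that column in both tables. The real
# schema lives in the production DB; a hypothetical sketch of the assumed DDL:
#   ALTER TABLE vessels   ADD CONSTRAINT vessels_mmsi_key   UNIQUE (mmsi);
#   ALTER TABLE positions ADD CONSTRAINT positions_mmsi_key UNIQUE (mmsi);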
# ─────────────────────────────────────────────────────────────────────────────
# Playwright helpers
# ─────────────────────────────────────────────────────────────────────────────
async def do_login(page):
    print(f"[{ts()}] LOGIN: navigating to MT...")
    await page.goto('https://www.marinetraffic.com/en/users/login',
                    wait_until='domcontentloaded', timeout=30000)
    await asyncio.sleep(3)
    await page.fill('input[name="username"]', MT_EMAIL)
    await page.click('button[type="submit"]')
    await asyncio.sleep(3)
    await page.fill('input[type="password"]', MT_PASS)
    await page.click('button[type="submit"]')
    await asyncio.sleep(4)
    if 'mfa-login-options' in page.url:
        print(f"[{ts()}] LOGIN: 2FA screen...")
        await page.click('button:has-text("Google Authenticator")')
        await asyncio.sleep(3)
        otp = totp(MT_TOTP)
        print(f"[{ts()}] LOGIN: TOTP={otp}")
        await page.fill('input[name="code"]', otp)
        await page.click('button[type="submit"]')
        await asyncio.sleep(5)
    logged = 'marinetraffic.com' in page.url and 'auth.kpler' not in page.url
    print(f"[{ts()}] LOGIN: {'OK' if logged else 'FAILED'} | {page.url[:80]}")
    return logged

# ─────────────────────────────────────────────────────────────────────────────
# PHASE 1: Tile collection
# ─────────────────────────────────────────────────────────────────────────────
async def phase1(page, cp):
    """Navigate all grid areas and collect bulk carrier SHIP_IDs from tiles."""
    print(f"\n[{ts()}] ===== PHASE 1: Grid traversal ({len(GRID)} areas) =====")
    current_tiles = []
    new_vessels = 0

    async def on_tile(response):
        if 'getData/get_data_json_4' not in response.url:
            return
        try:
            body = await response.body()
            parsed = json.loads(body)
            rows = parsed.get('data', {}).get('rows', [])
            current_tiles.extend(rows)
        except Exception:
            pass

    page.on('response', on_tile)

    start_idx = cp.get('last_grid_idx', -1) + 1
    for i, (lat, lon) in enumerate(GRID):
        if i < start_idx:
            continue
        current_tiles.clear()
        area_url = (f'https://www.marinetraffic.com/en/ais/home'
                    f'/centerx:{lon}/centery:{lat}/zoom:{ZOOM}')
        print(f"[{ts()}] Grid {i+1:3d}/{len(GRID)} ({lat:.1f},{lon:.1f})",
              end=' ', flush=True)
        try:
            await page.goto(area_url, wait_until='load', timeout=35000)
        except Exception as e:
            print(f"goto error: {e}")
            await asyncio.sleep(5)
            try:
                await page.goto(area_url, wait_until='domcontentloaded', timeout=35000)
            except Exception:
                pass

        # Wait for tiles to arrive
        await asyncio.sleep(12)

        # Process captured tile rows
        area_new = 0
        for row in current_tiles:
            if not is_bulk(row):
                continue
            sid = str(row.get('SHIP_ID', ''))
            if not sid:
                continue
            if sid not in cp['vessels']:
                cp['vessels'][sid] = {
                    'name':        row.get('SHIPNAME', ''),
                    'flag':        row.get('FLAG', ''),
                    'dwt':         int(row.get('DWT', 0) or 0),
                    'shiptype':    str(row.get('SHIPTYPE', '')),
                    'gt_shiptype': str(row.get('GT_SHIPTYPE', '')),
                    'lat':         row.get('LAT'),
                    'lon':         row.get('LON'),
                    'speed':       row.get('SPEED'),
                    'course':      row.get('COURSE'),
                    'destination': row.get('DESTINATION', ''),
                    'mmsi': None, 'imo': None,
                    'owner': None, 'operator': None, 'companies': None,
                    'grid_idx': i,
                }
                area_new += 1
                new_vessels += 1

        total = len(cp['vessels'])
        print(f"tiles={len(current_tiles):5d} bulk_new={area_new:4d} total={total:6d}")
        cp['last_grid_idx'] = i
        save_checkpoint(cp)
        await asyncio.sleep(2)

    page.remove_listener('response', on_tile)
    cp['phase1_done'] = True
    save_checkpoint(cp)
    print(f"\n[{ts()}] PHASE 1 DONE. Total bulk carriers found: {len(cp['vessels'])}")
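
# For reference, a captured tile row looks roughly like this. The key names are
# the ones consumed by is_bulk() and phase1() above; the values are made up:
#   {"SHIP_ID": "371582", "SHIPNAME": "EXAMPLE TRADER", "FLAG": "MT",
#    "SHIPTYPE": "7", "GT_SHIPTYPE": "37", "DWT": "82000",
#    "LAT": "35.5", "LON": "28.0", "SPEED": "112", "COURSE": "214",
#    "DESTINATION": "PIRAEUS"}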
# ─────────────────────────────────────────────────────────────────────────────
# PHASE 2: Vessel detail pages
# ─────────────────────────────────────────────────────────────────────────────
def parse_ownership_html(html: str) -> dict:
    """
    Parse MT vessel detail page HTML for ownership fields.
    Returns dict with keys: mmsi, imo, year_built, owner, operator,
    registered_owner, commercial_manager, companies (list)
    """
    from bs4 import BeautifulSoup
    soup = BeautifulSoup(html, 'html.parser')
    result = {}

    # MMSI — appears in page text and meta
    mmsi_m = re.search(r'"MMSI"\s*:\s*"?(\d{9})"?', html)
    if not mmsi_m:
        mmsi_m = re.search(r'mmsi["\s:=]+(\d{9})', html, re.I)
    if mmsi_m:
        result['mmsi'] = mmsi_m.group(1)

    # IMO
    imo_m = re.search(r'"IMO"\s*:\s*"?(\d{7})"?', html)
    if not imo_m:
        imo_m = re.search(r'imo["\s:=]+(\d{7})', html, re.I)
    if imo_m:
        result['imo'] = imo_m.group(1)

    # Year built
    yr_m = re.search(r'"year_built"\s*:\s*"?(\d{4})"?', html, re.I)
    if not yr_m:
        yr_m = re.search(r'year.?built["\s:=]+(\d{4})', html, re.I)
    if yr_m:
        result['year_built'] = int(yr_m.group(1))

    # Ownership roles
    role_map = {
        'beneficial owner':   'beneficial_owner',
        'registered owner':   'registered_owner',
        'commercial manager': 'commercial_manager',
        'ship manager':       'commercial_manager',
        'operator':           'operator',
        'disponent owner':    'operator',
        'technical manager':  'commercial_manager',
    }
    companies = []

    # Try structured ownership table rows
    for row in soup.select('tr, .company-row, [class*="ownership"]'):
        text = row.get_text(' ', strip=True).lower()
        for role_key, field in role_map.items():
            if role_key in text:
                # Company name is the link text or next cell
                links = row.select('a')
                if links:
                    name = links[0].get_text(strip=True)
                    if name and len(name) > 2:
                        result[field] = name
                        companies.append({'role': role_key, 'name': name,
                                          'href': links[0].get('href', '')})
                break

    # Fallback: regex for common ownership JSON in page scripts
    for pattern in [
        r'"beneficial_owner"\s*:\s*"([^"]+)"',
        r'"Beneficial Owner"\s*:\s*"([^"]+)"',
        r'Beneficial Owner[^<]{0,60}<[^>]+>([^<]{3,80})<',
    ]:
        m = re.search(pattern, html, re.I)
        if m and 'beneficial_owner' not in result:
            result['beneficial_owner'] = m.group(1).strip()
    for pattern in [
        r'"operator"\s*:\s*"([^"]+)"',
        r'"Operator"\s*:\s*"([^"]+)"',
        r'Operator[^<]{0,60}<[^>]+>([^<]{3,80})<',
    ]:
        m = re.search(pattern, html, re.I)
        if m and 'operator' not in result:
            result['operator'] = m.group(1).strip()

    # Derive 'owner' field for DB (prefer beneficial_owner, then registered_owner)
    result['owner'] = (result.get('beneficial_owner')
                       or result.get('registered_owner')
                       or result.get('operator'))
    result['companies'] = companies
    return result
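
# Hedged mini-test for parse_ownership_html() on a synthetic snippet (the HTML
# below is made up; real MT detail pages are far larger). Run manually.
def _parse_ownership_selftest():
    html = ('<script>var d = {"MMSI":"215123456","IMO":"9456789",'
            '"year_built":"2012"};</script>'
            '<table><tr><td>Beneficial Owner</td>'
            '<td><a href="/en/ais/c/1">Example Shipping Ltd</a></td></tr></table>')
    r = parse_ownership_html(html)
    assert r['mmsi'] == '215123456' and r['imo'] == '9456789'
    assert r['year_built'] == 2012
    assert r['owner'] == 'Example Shipping Ltd'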
async def get_vessel_detail(page, ship_id: str, vessel_data: dict) -> dict:
    """
    Navigate to MT vessel detail page, extract MMSI + ownership.
    Updates vessel_data dict in place, returns it.
    """
    url = f'https://www.marinetraffic.com/en/ais/details/ships/ship_id:{ship_id}'

    # Capture any JSON API responses from this page
    detail_jsons = []

    async def on_detail_resp(response):
        resp_url = response.url
        # Skip static assets
        if any(ext in resp_url for ext in ['.js', '.css', '.png', '.jpg', '.svg']):
            return
        # Capture JSON API responses
        ct = response.headers.get('content-type', '')
        if 'json' in ct or 'getData' in resp_url or '/en/reports' in resp_url:
            try:
                body = await response.body()
                text = body.decode('utf-8', errors='replace')
                if text.startswith('{') or text.startswith('['):
                    detail_jsons.append({'url': resp_url, 'body': text})
            except Exception:
                pass

    page.on('response', on_detail_resp)
    try:
        await page.goto(url, wait_until='load', timeout=30000)
        await asyncio.sleep(5)

        # MMSI from URL redirect (MT redirects to /mmsi:{MMSI}/...)
        final_url = page.url
        mmsi_m = re.search(r'/mmsi[:/](\d{9})', final_url, re.I)
        if mmsi_m:
            vessel_data['mmsi'] = mmsi_m.group(1)

        # Parse HTML
        html = await page.content()
        parsed = parse_ownership_html(html)
        if not vessel_data.get('mmsi') and parsed.get('mmsi'):
            vessel_data['mmsi'] = parsed['mmsi']
        if parsed.get('imo'):
            vessel_data['imo'] = parsed['imo']
        if parsed.get('year_built'):
            vessel_data['year_built'] = parsed['year_built']
        if parsed.get('owner'):
            vessel_data['owner'] = parsed['owner']
        if parsed.get('operator'):
            vessel_data['operator'] = parsed['operator']
        if parsed.get('companies'):
            vessel_data['companies'] = parsed['companies']

        # Also scan captured JSON responses for ownership data
        for item in detail_jsons:
            try:
                j = json.loads(item['body'])
                if isinstance(j, dict):
                    # Try direct keys
                    for key in ('MMSI', 'mmsi'):
                        if j.get(key) and not vessel_data.get('mmsi'):
                            vessel_data['mmsi'] = str(j[key])
                    for key in ('IMO', 'imo'):
                        if j.get(key) and not vessel_data.get('imo'):
                            vessel_data['imo'] = str(j[key])
                    # Ownership from JSON
                    for key in ('MANAGER_OWNER', 'owner', 'beneficial_owner'):
                        if j.get(key) and not vessel_data.get('owner'):
                            vessel_data['owner'] = str(j[key])
            except Exception:
                pass
    except Exception as e:
        print(f"  detail_error={e}")
    finally:
        page.remove_listener('response', on_detail_resp)
    return vessel_data

async def phase2(page, cp, conn):
    """Visit vessel detail pages for all ships without MMSI."""
    done_ids = set(cp.get('phase2_done_ids', []))   # set: O(1) membership checks
    ship_ids = [
        sid for sid, v in cp['vessels'].items()
        if not v.get('mmsi') and sid not in done_ids
    ]
    total = len(ship_ids)
    print(f"\n[{ts()}] ===== PHASE 2: Vessel details ({total} ships to process) =====")

    saved_count = 0
    for idx, sid in enumerate(ship_ids):
        vdata = cp['vessels'][sid]
        name = vdata.get('name', sid)
        print(f"[{ts()}] [{idx+1:4d}/{total}] {name[:35]:35s}", end=' ', flush=True)

        await get_vessel_detail(page, sid, vdata)
        mmsi = vdata.get('mmsi', '')
        owner = vdata.get('owner', '')
        print(f"MMSI={mmsi or '?':12s} owner={owner[:30] if owner else '?'}")

        # Save to DB if we have MMSI
        if mmsi:
            try:
                companies = vdata.get('companies', [])
                db_upsert_vessel(conn, {
                    'mmsi':           mmsi,
                    'name':           vdata.get('name'),
                    'flag':           vdata.get('flag'),
                    'type_category':  'bulk',
                    'deadweight':     vdata.get('dwt'),
                    'year_built':     vdata.get('year_built'),
                    'imo':            vdata.get('imo'),
                    'owner':          vdata.get('owner'),
                    'operator':       vdata.get('operator'),
                    'companies_json': json.dumps(companies) if companies else None,
                    'lat':            vdata.get('lat'),
                    'lon':            vdata.get('lon'),
                })
                db_upsert_position(conn, mmsi, vdata.get('lat'), vdata.get('lon'),
                                   vdata.get('speed'), vdata.get('course'),
                                   vdata.get('destination'))
                saved_count += 1
            except Exception as e:
                print(f"  DB error: {e}")

        done_ids.add(sid)
        cp['vessels'][sid] = vdata
        cp['phase2_done_ids'] = list(done_ids)

        # Save checkpoint every 20 vessels
        if (idx + 1) % 20 == 0:
            save_checkpoint(cp)
            print(f"[{ts()}] Checkpoint saved. DB rows written: {saved_count}")

        await asyncio.sleep(3)  # pacing; with the ~5 s in-page wait this is roughly 7 vessels/min

    save_checkpoint(cp)
    print(f"\n[{ts()}] PHASE 2 DONE. DB rows written: {saved_count}")
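
# Hedged sanity check for the redirect-URL MMSI extraction in get_vessel_detail()
# above. The /mmsi:{MMSI}/ path segment comes from this script's own comment;
# the full URL below is illustrative only. Run manually.
def _mmsi_from_url_selftest():
    url = ('https://www.marinetraffic.com/en/ais/details/ships/shipid:371582'
           '/mmsi:215123456/imo:9456789/vessel:EXAMPLE_TRADER')
    m = re.search(r'/mmsi[:/](\d{9})', url, re.I)
    assert m and m.group(1) == '215123456'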
# ─────────────────────────────────────────────────────────────────────────────
# Main
# ─────────────────────────────────────────────────────────────────────────────
async def main(run_phase1=True, run_phase2=True):
    from playwright.async_api import async_playwright

    cp = load_checkpoint()
    print(f"[{ts()}] Checkpoint: phase1_done={cp['phase1_done']}, "
          f"vessels={len(cp['vessels'])}, "
          f"phase2_done={len(cp.get('phase2_done_ids', []))}")

    # Check SSH tunnel / DB connection
    conn = None
    if run_phase2:
        try:
            conn = db_connect()
            with conn.cursor() as cur:
                cur.execute('SELECT 1')
            print(f"[{ts()}] DB connection: OK")
        except Exception as e:
            print(f"[{ts()}] DB connection FAILED: {e}")
            print(f"[{ts()}] Start SSH tunnel: ssh -L 15432:127.0.0.1:5432 -N root@89.19.208.158")
            if run_phase1:
                print(f"[{ts()}] Continuing with Phase 1 only (no DB writes)...")
                run_phase2 = False
                conn = None
            else:
                return

    async with async_playwright() as p:
        browser = await p.chromium.launch(
            headless=False,
            args=['--no-sandbox', '--disable-blink-features=AutomationControlled']
        )
        context = await browser.new_context(
            viewport={'width': 1280, 'height': 900},
            user_agent=('Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                        'AppleWebKit/537.36 (KHTML, like Gecko) '
                        'Chrome/121.0.0.0 Safari/537.36'),
        )
        page = await context.new_page()

        logged_in = await do_login(page)
        if not logged_in:
            print(f"[{ts()}] LOGIN FAILED — aborting")
            await browser.close()
            return

        # Let session stabilize
        await asyncio.sleep(5)

        if run_phase1 and not cp.get('phase1_done'):
            await phase1(page, cp)
        elif run_phase1 and cp.get('phase1_done'):
            print(f"[{ts()}] Phase 1 already complete ({len(cp['vessels'])} vessels). Skipping.")

        if run_phase2:
            await phase2(page, cp, conn)

        await browser.close()

    if conn:
        conn.close()
    print(f"\n[{ts()}] ALL DONE. Total vessels in checkpoint: {len(cp['vessels'])}")

if __name__ == '__main__':
    ap = argparse.ArgumentParser()
    ap.add_argument('--phase1', action='store_true', help='Phase 1 only (tile collection)')
    ap.add_argument('--phase2', action='store_true', help='Phase 2 only (vessel details)')
    ap.add_argument('--reset',  action='store_true', help='Clear checkpoint and restart')
    args = ap.parse_args()

    if args.reset and os.path.exists(CHECKPOINT):
        os.remove(CHECKPOINT)
        print("Checkpoint cleared.")

    # Flag logic: no flags runs both phases; one flag runs that phase only;
    # both flags run both (the old `not` logic ran nothing when both were set).
    p1 = args.phase1 or not args.phase2
    p2 = args.phase2 or not args.phase1
    asyncio.run(main(run_phase1=p1, run_phase2=p2))