#!/usr/bin/env python3
"""
MT Position Updater — Refresh vessel positions from FREE MarineTraffic (no login).

Uses curl_cffi (TLS fingerprint bypass) to fetch tile data directly — no browser
needed. Updates lat/lon/speed/course/scraped_at in mt_bulk_staging.
Auto-discovers new cargo/tanker vessels not in our DB.

Free MT API limit: z:2-4 only (z:5+ requires Pro login).
  z:4 = 256 tiles, ~31K vessels, ~2 min — optimal for full world.
  z:3 = 64 tiles, ~15K vessels, ~26 sec — good for priority quick scan.

Usage:
  python mt_position_updater.py            # Full world scan (z:4, ~2 min)
  python mt_position_updater.py --probe    # Test 20 tiles
  python mt_position_updater.py --full     # Same as default
  python mt_position_updater.py --loop     # Continuous; interval set by --interval
"""
import json, sys, os, time, argparse, math
import psycopg2

# Run relative to the script's own directory so DB/config paths are stable
# regardless of the caller's CWD.
os.chdir(os.path.dirname(os.path.abspath(__file__)))

# Force UTF-8 + line buffering on both streams (Windows consoles default to a
# legacy codepage; 'replace' avoids crashes on vessel names with odd glyphs).
if hasattr(sys.stdout, 'reconfigure'):
    sys.stdout.reconfigure(encoding='utf-8', errors='replace', line_buffering=True)
if hasattr(sys.stderr, 'reconfigure'):
    sys.stderr.reconfigure(encoding='utf-8', errors='replace', line_buffering=True)

try:
    from curl_cffi import requests as cf_requests
except ImportError:
    print("FATAL: curl_cffi not installed. Run: pip install curl_cffi", flush=True)
    sys.exit(1)

IS_SERVER = sys.platform == 'linux'

# NOTE(security): hard-coded credential fallback below — prefer setting
# DATABASE_URL in the environment; rotate this password if the file leaks.
# On Windows (dev) we connect through an SSH tunnel on local port 15432.
DB_URL = os.environ.get('DATABASE_URL') or (
    'postgresql://seafare:SF_m0ntana_2026@127.0.0.1:5432/seafare_db' if IS_SERVER
    else 'postgresql://seafare:SF_m0ntana_2026@127.0.0.1:15432/seafare_db'
)

SCAN_ZOOM = 4              # z:4 = 256 tiles, ~31K vessels, ~2 min (max for free MT API)
UPDATE_INTERVAL_HOURS = 1  # default hours between cycles in --loop mode

# GT_SHIPTYPE codes for cargo + tanker (MT's internal type codes — values
# determined empirically from tile payloads; verify if MT changes its schema).
CARGO_GT_TYPES = {
    '6', '9', '11', '12', '21', '70', '122',   # Cargo (green)
    '4', '17', '18', '19', '56', '71', '88',   # Tanker (red)
}

# Fallback keyword match against the free-text SHIPTYPE field.
CARGO_TYPE_KEYWORDS = {
    'bulk', 'cargo', 'container', 'tanker', 'chemical', 'oil', 'lpg', 'lng',
    'reefer', 'ro-ro', 'roro', 'multipurpose', 'general', 'ore', 'cement',
    'vehicle', 'barge', 'heavy load', 'fpso', 'fso', 'bitumen', 'asphalt',
    'product',
}


# ---- TILE SYSTEM ----

def lat_lon_to_tile(lat, lon, zoom):
    """Convert lat/lon (degrees) to slippy-map tile X/Y at *zoom*.

    Standard Web-Mercator tiling formula; results are clamped to the valid
    tile range [0, 2**zoom - 1] so out-of-range coordinates never produce
    invalid tile indices.
    """
    n = 2 ** zoom
    x = int((lon + 180.0) / 360.0 * n)
    y = int((1.0 - math.log(math.tan(math.radians(lat))
                            + 1.0 / math.cos(math.radians(lat))) / math.pi) / 2.0 * n)
    return max(0, min(x, n - 1)), max(0, min(y, n - 1))


def fetch_tile(z, x, y):
    """Fetch a single MT tile via curl_cffi. Returns a list of raw vessel dicts.

    Sends Referer + XHR headers and impersonates Chrome's TLS fingerprint so
    the free endpoint accepts the request. Any non-200 response, or an
    unexpected payload shape, yields an empty list (raises on network errors —
    the caller counts those).
    """
    url = f'https://www.marinetraffic.com/getData/get_data_json_4/z:{z}/X:{x}/Y:{y}/station:0'
    headers = {
        'Referer': f'https://www.marinetraffic.com/en/ais/home/centerx:0/centery:0/zoom:{z}',
        'X-Requested-With': 'XMLHttpRequest',
    }
    resp = cf_requests.get(url, headers=headers, impersonate='chrome', timeout=15)
    if resp.status_code == 200:
        data = resp.json()
        if isinstance(data, dict):
            # Payload is either {"data": {"rows": [...]}} or {"data": [...]}.
            rows = data.get('data', data)
            if isinstance(rows, dict):
                rows = rows.get('rows', [])
            return rows if isinstance(rows, list) else []
    return []


def generate_all_tiles(zoom=SCAN_ZOOM):
    """Return all (z, x, y, label) tiles at *zoom*.

    z:4 = 256 tiles, ~31K vessels, ~2 min.
    """
    n = 2 ** zoom
    return [(zoom, x, y, "world") for x in range(n) for y in range(n)]


# ---- VESSEL CLASSIFICATION ----

def is_cargo_vessel(row):
    """True if a raw MT row looks like a cargo or tanker vessel.

    Primary signal is the numeric GT_SHIPTYPE code; falls back to keyword
    matching on the free-text SHIPTYPE field.
    """
    gt = str(row.get('GT_SHIPTYPE', '') or '')
    if gt in CARGO_GT_TYPES:
        return True
    st = str(row.get('SHIPTYPE', '') or '').lower()
    return any(kw in st for kw in CARGO_TYPE_KEYWORDS)


def classify_vessel(row):
    """Map a raw MT row to one of: 'bulk', 'container', 'roro', 'tanker', 'general'.

    GT_SHIPTYPE code wins; otherwise keyword-match SHIPTYPE; default 'general'.
    """
    gt = str(row.get('GT_SHIPTYPE', '') or '')
    if gt == '6':
        return 'bulk'
    elif gt == '11':
        return 'container'
    elif gt == '12':
        return 'roro'
    elif gt in ('4', '17', '18', '19', '56', '71', '88'):
        return 'tanker'
    elif gt in ('9', '70', '122', '21'):
        return 'general'
    st = str(row.get('SHIPTYPE', '') or '').lower()
    if 'bulk' in st:
        return 'bulk'
    elif 'container' in st:
        return 'container'
    elif any(k in st for k in ('tanker', 'oil', 'chemical', 'lpg', 'lng')):
        return 'tanker'
    elif any(k in st for k in ('ro-ro', 'roro', 'vehicle')):
        return 'roro'
    return 'general'


# ---- DB helpers ----

def db_connect():
    """Open a psycopg2 connection with TCP keepalives (tunnel drops otherwise)."""
    return psycopg2.connect(
        DB_URL, connect_timeout=15,
        keepalives=1, keepalives_idle=30, keepalives_interval=10, keepalives_count=5
    )


def _restart_ssh_tunnel():
    """Restart the local SSH tunnel to the DB host (Windows only, no-op on server)."""
    if IS_SERVER:
        return
    import subprocess
    # Kill any existing ssh.exe first; best-effort, ignore failures.
    try:
        subprocess.run(['taskkill', '/F', '/IM', 'ssh.exe'],
                       capture_output=True, timeout=5)
    except Exception:
        pass
    time.sleep(2)
    try:
        subprocess.Popen(
            ['ssh', '-o', 'ServerAliveInterval=5', '-o', 'ServerAliveCountMax=120',
             '-o', 'TCPKeepAlive=yes', '-o', 'StrictHostKeyChecking=no',
             '-L', '15432:127.0.0.1:5432', '-N', 'root@89.19.208.158'],
            stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
        )
        print(" [SSH] Tunnel restarted, waiting 5s...", flush=True)
        time.sleep(5)
    except Exception as e:
        print(f" [SSH] Failed: {e}", flush=True)


def db_reconnect(conn):
    """Close *conn* and retry connecting up to 10 times.

    On dev (non-server), attempts 5 and 9 also restart the SSH tunnel.
    Returns (connection, cursor); raises RuntimeError after 10 failures.
    """
    try:
        conn.close()
    except Exception:
        pass
    for attempt in range(10):
        try:
            time.sleep(3)
            c = db_connect()
            print(f" [DB] Reconnected (attempt {attempt+1})", flush=True)
            return c, c.cursor()
        except Exception as e:
            print(f" [DB] Attempt {attempt+1} failed: {e}", flush=True)
            if not IS_SERVER and attempt in (4, 8):
                print(" [DB] Restarting SSH tunnel...", flush=True)
                _restart_ssh_tunnel()
    raise RuntimeError("DB reconnect failed")


def db_safe_execute(conn, cur, query, params=None):
    """Execute *query*, transparently reconnecting on a dropped connection.

    Returns the (possibly new) (conn, cur) pair so callers can rebind.
    Other psycopg2 errors roll back the transaction and re-raise.
    """
    try:
        cur.execute(query, params)
        return conn, cur
    except (psycopg2.InterfaceError, psycopg2.OperationalError) as e:
        print(f" [DB] Lost on execute ({e}), reconnecting...", flush=True)
        conn, cur = db_reconnect(conn)
        cur.execute(query, params)
        return conn, cur
    except psycopg2.Error:
        try:
            conn.rollback()
        except Exception:
            pass
        raise


# ---- MAIN SCAN ----

def run_scan(tiles, conn, cur, existing_ids, mode="priority"):
    """Scan *tiles* via curl_cffi and update positions in the DB.

    For each cargo/tanker vessel found: UPDATE position if its ship_id is in
    *existing_ids*, otherwise INSERT (upsert) and add it to the set.
    Per-row DB failures are swallowed (best-effort scan) after a rollback.
    Returns (updated, discovered) totals.
    """
    total_tiles = len(tiles)
    updated = 0
    discovered = 0
    errors = 0
    t_start = time.time()
    print(f"\n[{mode.upper()}] Scanning {total_tiles} tiles...", flush=True)

    for tile_idx, (z, x, y, label) in enumerate(tiles):
        # Fetch tile; network errors are counted, first 5 are logged.
        try:
            rows = fetch_tile(z, x, y)
        except Exception as e:
            errors += 1
            if errors <= 5:
                print(f" WARN: tile z:{z}/X:{x}/Y:{y} error: {e}", flush=True)
            continue

        # Process vessels
        tile_updated = 0
        tile_discovered = 0
        for row in rows:
            if not is_cargo_vessel(row):
                continue
            ship_id = str(row.get('SHIP_ID', '') or '')
            if not ship_id:
                continue

            v_lat = row.get('LAT')
            v_lon = row.get('LON')
            v_speed_raw = row.get('SPEED')
            # MT reports SPEED in tenths of knots — presumably; TODO confirm
            # against a known vessel.
            v_speed = round(float(v_speed_raw) / 10.0, 1) if v_speed_raw is not None else None
            v_course = row.get('COURSE')
            v_name = row.get('SHIPNAME', '')
            v_gt = str(row.get('GT_SHIPTYPE', '') or '')
            v_type_cat = classify_vessel(row)

            if ship_id in existing_ids:
                # UPDATE existing vessel position
                try:
                    conn, cur = db_safe_execute(conn, cur, """
                        UPDATE mt_bulk_staging
                        SET lat = %s, lon = %s, speed = %s, course = %s, scraped_at = NOW()
                        WHERE ship_id = %s
                    """, (v_lat, v_lon, v_speed, v_course, ship_id))
                    conn.commit()
                    tile_updated += 1
                except Exception:
                    try:
                        conn.rollback()
                    except Exception:
                        pass
            else:
                # DISCOVER new vessel (upsert in case of a concurrent insert)
                try:
                    conn, cur = db_safe_execute(conn, cur, """
                        INSERT INTO mt_bulk_staging
                            (ship_id, name, lat, lon, speed, course, gt_shiptype,
                             type_category, scraped_at)
                        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, NOW())
                        ON CONFLICT (ship_id) DO UPDATE SET
                            lat = EXCLUDED.lat, lon = EXCLUDED.lon,
                            speed = EXCLUDED.speed, course = EXCLUDED.course,
                            scraped_at = NOW()
                    """, (ship_id, v_name, v_lat, v_lon, v_speed, v_course, v_gt, v_type_cat))
                    conn.commit()
                    existing_ids.add(ship_id)
                    tile_discovered += 1
                except Exception:
                    try:
                        conn.rollback()
                    except Exception:
                        pass

        updated += tile_updated
        discovered += tile_discovered

        # Progress every 50 tiles or when new vessels found
        if (tile_idx + 1) % 50 == 0 or tile_discovered > 0:
            elapsed = time.time() - t_start
            remaining = total_tiles - tile_idx - 1
            eta = (elapsed / (tile_idx + 1)) * remaining if tile_idx > 0 else 0
            print(f" [{tile_idx+1}/{total_tiles}] {label} z:{z} | "
                  f"upd={tile_updated} new={tile_discovered} | "
                  f"total: upd={updated} new={discovered} err={errors} | "
                  f"{elapsed:.0f}s | ETA: {eta:.0f}s", flush=True)

    elapsed = time.time() - t_start
    print(f"\n{'='*60}", flush=True)
    print(f"[{mode.upper()}] SCAN COMPLETE — {elapsed:.0f}s ({elapsed/60:.1f} min)", flush=True)
    print(f" Tiles: {total_tiles} | Errors: {errors}", flush=True)
    print(f" Positions updated: {updated}", flush=True)
    print(f" New vessels discovered: {discovered}", flush=True)
    print(f"{'='*60}\n", flush=True)
    return updated, discovered


def main():
    """CLI entry point: parse args, connect to DB, run probe/full/loop scan."""
    parser = argparse.ArgumentParser(description='MT Position Updater (curl_cffi)')
    parser.add_argument('--probe', action='store_true', help='Test 20 tiles')
    parser.add_argument('--full', action='store_true', help='Full world scan (z:4)')
    parser.add_argument('--loop', action='store_true',
                        help=f'Continuous: full scan every {UPDATE_INTERVAL_HOURS}h (see --interval)')
    # Help text derives from the constant so it can never drift from the default.
    parser.add_argument('--interval', type=float, default=UPDATE_INTERVAL_HOURS,
                        help=f'Hours between scans in loop mode (default {UPDATE_INTERVAL_HOURS})')
    args = parser.parse_args()

    # DB connection + preload known ship_ids so run_scan can tell UPDATE from INSERT.
    try:
        conn = db_connect()
        cur = conn.cursor()
        cur.execute('SELECT count(*) FROM mt_bulk_staging')
        existing_count = cur.fetchone()[0]
        cur.execute('SELECT ship_id FROM mt_bulk_staging WHERE ship_id IS NOT NULL')
        existing_ids = {str(r[0]) for r in cur.fetchall()}
        print(f"DB: {existing_count} existing vessels | {len(existing_ids)} ship_ids")
    except Exception as e:
        print(f"DB ERROR: {e}")
        if not IS_SERVER:
            print("Start SSH tunnel: ssh -L 15432:127.0.0.1:5432 -N root@89.19.208.158")
        return

    if args.probe:
        tiles = generate_all_tiles()[:20]
        print(f"PROBE: {len(tiles)} tiles (z:{SCAN_ZOOM})")
        run_scan(tiles, conn, cur, existing_ids, mode="probe")
    elif args.loop:
        cycle = 0
        while True:
            cycle += 1
            tiles = generate_all_tiles()
            print(f"\n=== CYCLE {cycle}: FULL WORLD SCAN ({len(tiles)} tiles, z:{SCAN_ZOOM}) ===",
                  flush=True)
            run_scan(tiles, conn, cur, existing_ids, mode="full")
            sleep_secs = args.interval * 3600
            print(f"Sleeping {args.interval}h until next cycle...", flush=True)
            time.sleep(sleep_secs)
    else:
        # Default (and --full): full world scan
        tiles = generate_all_tiles()
        print(f"FULL WORLD SCAN: {len(tiles)} tiles (z:{SCAN_ZOOM})")
        run_scan(tiles, conn, cur, existing_ids, mode="full")

    conn.close()


if __name__ == '__main__':
    main()