#!/usr/bin/env python3
"""
MT Position Updater — Refresh vessel positions from FREE MarineTraffic (no login).

Uses curl_cffi (TLS fingerprint bypass) to fetch tile data directly — no browser needed.
Updates lat/lon/speed/course/scraped_at in mt_bulk_staging.
Auto-discovers new cargo/tanker vessels not in our DB.

Free MT API limit: z:2-4 only (z:5+ requires Pro login).
z:4 = 256 tiles, ~31K vessels, ~2 min — optimal for full world.
z:3 = 64 tiles, ~15K vessels, ~26 sec — good for priority quick scan.

Usage:
    python mt_position_updater.py          # Full world scan (z:4, ~2 min)
    python mt_position_updater.py --probe  # Test 20 tiles
    python mt_position_updater.py --full   # Same as default
    python mt_position_updater.py --loop   # Continuous: every 6h
"""
import json, sys, os, time, argparse, math
import psycopg2

os.chdir(os.path.dirname(os.path.abspath(__file__)))
if hasattr(sys.stdout, 'reconfigure'):
    sys.stdout.reconfigure(encoding='utf-8', errors='replace', line_buffering=True)
if hasattr(sys.stderr, 'reconfigure'):
    sys.stderr.reconfigure(encoding='utf-8', errors='replace', line_buffering=True)
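# Note: the reconfigure() calls keep console output line-buffered and UTF-8 safe
# (errors='replace'), so progress prints flush promptly and do not crash on
# characters a Windows console code page cannot encode.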

try:
    from curl_cffi import requests as cf_requests
except ImportError:
    print("FATAL: curl_cffi not installed. Run: pip install curl_cffi", flush=True)
    sys.exit(1)

IS_SERVER = sys.platform == 'linux'
DB_URL = os.environ.get('DATABASE_URL') or (
    'postgresql://seafare:SF_m0ntana_2026@127.0.0.1:5432/seafare_db' if IS_SERVER
    else 'postgresql://seafare:SF_m0ntana_2026@127.0.0.1:15432/seafare_db'
)

SCAN_ZOOM = 4  # z:4 = 256 tiles, ~31K vessels, ~2 min (max for free MT API)
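# Tile count at zoom z is (2**z)**2: z:3 -> 64 tiles, z:4 -> 256 tiles, which is
# where the docstring's 64/256-tile figures come from.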
UPDATE_INTERVAL_HOURS = 6  # default hours between --loop scans (per docstring and --interval help)

# GT_SHIPTYPE codes for cargo + tanker
CARGO_GT_TYPES = {
    '6', '9', '11', '12', '21', '70', '122',  # Cargo (green)
    '4', '17', '18', '19', '56', '71', '88',  # Tanker (red)
}

CARGO_TYPE_KEYWORDS = {
    'bulk', 'cargo', 'container', 'tanker', 'chemical', 'oil',
    'lpg', 'lng', 'reefer', 'ro-ro', 'roro', 'multipurpose',
    'general', 'ore', 'cement', 'vehicle', 'barge', 'heavy load',
    'fpso', 'fso', 'bitumen', 'asphalt', 'product',
}


# ---- TILE SYSTEM ----

def lat_lon_to_tile(lat, lon, zoom):
    """Convert lat/lon to slippy map tile X/Y."""
    n = 2 ** zoom
    x = int((lon + 180.0) / 360.0 * n)
    y = int((1.0 - math.log(math.tan(math.radians(lat)) + 1.0 / math.cos(math.radians(lat))) / math.pi) / 2.0 * n)
    return max(0, min(x, n - 1)), max(0, min(y, n - 1))
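
# lat_lon_to_tile is the standard Web-Mercator (slippy-map) tile formula: at zoom 4
# the world is a 16x16 grid, e.g. (lat 51.5, lon 0.0) -> tile (8, 5). The full scan
# below simply enumerates every tile, but the helper is useful for targeted lookups.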


def fetch_tile(z, x, y):
    """Fetch single MT tile via curl_cffi. Returns list of raw vessel dicts."""
    url = f'https://www.marinetraffic.com/getData/get_data_json_4/z:{z}/X:{x}/Y:{y}/station:0'
    headers = {
        'Referer': f'https://www.marinetraffic.com/en/ais/home/centerx:0/centery:0/zoom:{z}',
        'X-Requested-With': 'XMLHttpRequest',
    }
    resp = cf_requests.get(url, headers=headers, impersonate='chrome', timeout=15)
    if resp.status_code == 200:
        data = resp.json()
        if isinstance(data, dict):
            rows = data.get('data', data)
            if isinstance(rows, dict):
                rows = rows.get('rows', [])
            return rows if isinstance(rows, list) else []
    return []
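
# Fields consumed from each raw row downstream: SHIP_ID, SHIPNAME, LAT, LON, SPEED,
# COURSE, SHIPTYPE and GT_SHIPTYPE; any other keys MT returns are ignored.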


def generate_all_tiles(zoom=SCAN_ZOOM):
    """All tiles at given zoom. z:4 = 256 tiles, ~31K vessels, ~2 min."""
    tiles = []
    n = 2 ** zoom
    for x in range(n):
        for y in range(n):
            tiles.append((zoom, x, y, "world"))
    return tiles
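
# A regional scan could reuse lat_lon_to_tile (a sketch, not wired into the CLI):
# convert a bounding box to tile ranges and enumerate only those tiles, e.g.
#   x0, y0 = lat_lon_to_tile(62.0, -4.0, SCAN_ZOOM)  # NW corner of the box
#   x1, y1 = lat_lon_to_tile(51.0, 9.0, SCAN_ZOOM)   # SE corner of the box
#   tiles = [(SCAN_ZOOM, x, y, "region") for x in range(x0, x1 + 1)
#            for y in range(y0, y1 + 1)]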


# ---- VESSEL CLASSIFICATION ----

def is_cargo_vessel(row):
    gt = str(row.get('GT_SHIPTYPE', '') or '')
    if gt in CARGO_GT_TYPES:
        return True
    st = str(row.get('SHIPTYPE', '') or '').lower()
    return any(kw in st for kw in CARGO_TYPE_KEYWORDS)


def classify_vessel(row):
    gt = str(row.get('GT_SHIPTYPE', '') or '')
    if gt == '6':
        return 'bulk'
    elif gt == '11':
        return 'container'
    elif gt == '12':
        return 'roro'
    elif gt in ('4', '17', '18', '19', '56', '71', '88'):
        return 'tanker'
    elif gt in ('9', '70', '122', '21'):
        return 'general'
    st = str(row.get('SHIPTYPE', '') or '').lower()
    if 'bulk' in st:
        return 'bulk'
    elif 'container' in st:
        return 'container'
    elif any(k in st for k in ('tanker', 'oil', 'chemical', 'lpg', 'lng')):
        return 'tanker'
    elif any(k in st for k in ('ro-ro', 'roro', 'vehicle')):
        return 'roro'
    return 'general'


# ---- DB helpers ----

def db_connect():
    return psycopg2.connect(
        DB_URL, connect_timeout=15,
        keepalives=1, keepalives_idle=30, keepalives_interval=10, keepalives_count=5
    )


def _restart_ssh_tunnel():
    """Restart SSH tunnel (Windows only, no-op on server)."""
    if IS_SERVER:
        return
    import subprocess
    try:
        subprocess.run(['taskkill', '/F', '/IM', 'ssh.exe'], capture_output=True, timeout=5)
    except Exception:
        pass
    time.sleep(2)
    try:
        subprocess.Popen(
            ['ssh', '-o', 'ServerAliveInterval=5', '-o', 'ServerAliveCountMax=120',
             '-o', 'TCPKeepAlive=yes', '-o', 'StrictHostKeyChecking=no',
             '-L', '15432:127.0.0.1:5432', '-N', 'root@89.19.208.158'],
            stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
        )
        print(" [SSH] Tunnel restarted, waiting 5s...", flush=True)
        time.sleep(5)
    except Exception as e:
        print(f" [SSH] Failed: {e}", flush=True)


def db_reconnect(conn):
    try:
        conn.close()
    except Exception:
        pass
    for attempt in range(10):
        try:
            time.sleep(3)
            c = db_connect()
            print(f" [DB] Reconnected (attempt {attempt+1})", flush=True)
            return c, c.cursor()
        except Exception as e:
            print(f" [DB] Attempt {attempt+1} failed: {e}", flush=True)
            if not IS_SERVER and attempt in (4, 8):
                print(" [DB] Restarting SSH tunnel...", flush=True)
                _restart_ssh_tunnel()
    raise Exception("DB reconnect failed")


def db_safe_execute(conn, cur, query, params=None):
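    # Connection drops (InterfaceError/OperationalError) get one reconnect + retry;
    # any other psycopg2 error rolls the transaction back and re-raises so the
    # caller's per-row try/except can skip just that row.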
    try:
        cur.execute(query, params)
        return conn, cur
    except (psycopg2.InterfaceError, psycopg2.OperationalError) as e:
        print(f" [DB] Lost on execute ({e}), reconnecting...", flush=True)
        conn, cur = db_reconnect(conn)
        cur.execute(query, params)
        return conn, cur
    except psycopg2.Error:
        try:
            conn.rollback()
        except Exception:
            pass
        raise


# ---- MAIN SCAN ----

def run_scan(tiles, conn, cur, existing_ids, mode="priority"):
    """Scan tiles via curl_cffi, update positions in DB. Returns (updated, discovered)."""
    total_tiles = len(tiles)
    updated = 0
    discovered = 0
    errors = 0
    t_start = time.time()

    print(f"\n[{mode.upper()}] Scanning {total_tiles} tiles...", flush=True)

    for tile_idx, (z, x, y, label) in enumerate(tiles):
        # Fetch tile
        try:
            rows = fetch_tile(z, x, y)
        except Exception as e:
            errors += 1
            if errors <= 5:
                print(f" WARN: tile z:{z}/X:{x}/Y:{y} error: {e}", flush=True)
            continue

        # Process vessels
        tile_updated = 0
        tile_discovered = 0

        for row in rows:
            if not is_cargo_vessel(row):
                continue

            ship_id = str(row.get('SHIP_ID', '') or '')
            if not ship_id:
                continue

            v_lat = row.get('LAT')
            v_lon = row.get('LON')
            v_speed_raw = row.get('SPEED')
            v_speed = round(float(v_speed_raw) / 10.0, 1) if v_speed_raw is not None else None
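            # SPEED in the tile payload appears to be tenths of a knot (hence the
            # /10 above); this is inferred from the data, not documented by MT.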
            v_course = row.get('COURSE')
            v_name = row.get('SHIPNAME', '')
            v_gt = str(row.get('GT_SHIPTYPE', '') or '')
            v_type_cat = classify_vessel(row)

            if ship_id in existing_ids:
                # UPDATE existing vessel position
                try:
                    conn, cur = db_safe_execute(conn, cur, """
                        UPDATE mt_bulk_staging
                        SET lat = %s, lon = %s, speed = %s, course = %s, scraped_at = NOW()
                        WHERE ship_id = %s
                    """, (v_lat, v_lon, v_speed, v_course, ship_id))
                    conn.commit()
                    tile_updated += 1
                except Exception:
                    try:
                        conn.rollback()
                    except Exception:
                        pass
            else:
                # DISCOVER new vessel
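                # ON CONFLICT (ship_id) assumes a unique constraint or index on
                # mt_bulk_staging.ship_id; without one the INSERT raises instead.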
                try:
                    conn, cur = db_safe_execute(conn, cur, """
                        INSERT INTO mt_bulk_staging (ship_id, name, lat, lon, speed, course,
                                                     gt_shiptype, type_category, scraped_at)
                        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, NOW())
                        ON CONFLICT (ship_id) DO UPDATE SET
                            lat = EXCLUDED.lat, lon = EXCLUDED.lon,
                            speed = EXCLUDED.speed, course = EXCLUDED.course,
                            scraped_at = NOW()
                    """, (ship_id, v_name, v_lat, v_lon, v_speed, v_course, v_gt, v_type_cat))
                    conn.commit()
                    existing_ids.add(ship_id)
                    tile_discovered += 1
                except Exception:
                    try:
                        conn.rollback()
                    except Exception:
                        pass

        updated += tile_updated
        discovered += tile_discovered

        # Progress every 50 tiles or when new vessels found
        if (tile_idx + 1) % 50 == 0 or tile_discovered > 0:
            elapsed = time.time() - t_start
            remaining = total_tiles - tile_idx - 1
            eta = (elapsed / (tile_idx + 1)) * remaining if tile_idx > 0 else 0
            print(f" [{tile_idx+1}/{total_tiles}] {label} z:{z} | "
                  f"upd={tile_updated} new={tile_discovered} | "
                  f"total: upd={updated} new={discovered} err={errors} | "
                  f"{elapsed:.0f}s | ETA: {eta:.0f}s", flush=True)

    elapsed = time.time() - t_start
    print(f"\n{'='*60}", flush=True)
    print(f"[{mode.upper()}] SCAN COMPLETE — {elapsed:.0f}s ({elapsed/60:.1f} min)", flush=True)
    print(f" Tiles: {total_tiles} | Errors: {errors}", flush=True)
    print(f" Positions updated: {updated}", flush=True)
    print(f" New vessels discovered: {discovered}", flush=True)
    print(f"{'='*60}\n", flush=True)
    return updated, discovered


def main():
    parser = argparse.ArgumentParser(description='MT Position Updater (curl_cffi)')
    parser.add_argument('--probe', action='store_true', help='Test 20 tiles')
    parser.add_argument('--full', action='store_true', help='Full world scan (z:4)')
    parser.add_argument('--loop', action='store_true', help='Continuous: full scan every 6h')
    parser.add_argument('--interval', type=float, default=UPDATE_INTERVAL_HOURS,
                        help='Hours between scans in loop mode (default 6)')
    args = parser.parse_args()

    # DB connection
    try:
        conn = db_connect()
        cur = conn.cursor()
        cur.execute('SELECT count(*) FROM mt_bulk_staging')
        existing_count = cur.fetchone()[0]
        cur.execute('SELECT ship_id FROM mt_bulk_staging WHERE ship_id IS NOT NULL')
        existing_ids = set(str(r[0]) for r in cur.fetchall())
        print(f"DB: {existing_count} existing vessels | {len(existing_ids)} ship_ids")
    except Exception as e:
        print(f"DB ERROR: {e}")
        if not IS_SERVER:
            print("Start SSH tunnel: ssh -L 15432:127.0.0.1:5432 -N root@89.19.208.158")
        return

    if args.probe:
        tiles = generate_all_tiles()[:20]
        print(f"PROBE: {len(tiles)} tiles (z:{SCAN_ZOOM})")
        run_scan(tiles, conn, cur, existing_ids, mode="probe")

    elif args.loop:
        cycle = 0
        while True:
            cycle += 1
            tiles = generate_all_tiles()
            print(f"\n=== CYCLE {cycle}: FULL WORLD SCAN ({len(tiles)} tiles, z:{SCAN_ZOOM}) ===",
                  flush=True)
            run_scan(tiles, conn, cur, existing_ids, mode="full")

            sleep_secs = args.interval * 3600
            print(f"Sleeping {args.interval}h until next cycle...", flush=True)
            time.sleep(sleep_secs)

    else:
        # Default (and --full): full world scan
        tiles = generate_all_tiles()
        print(f"FULL WORLD SCAN: {len(tiles)} tiles (z:{SCAN_ZOOM})")
        run_scan(tiles, conn, cur, existing_ids, mode="full")

    conn.close()


if __name__ == '__main__':
    main()