# Source: montana/Русский/Логистика/mt_position_updater.py
#!/usr/bin/env python3
"""
MT Position Updater — refresh vessel positions from the FREE MarineTraffic
tile API (no login required).

Uses curl_cffi (TLS-fingerprint impersonation) to fetch map-tile JSON
directly — no browser needed.  Updates lat/lon/speed/course/scraped_at in
the mt_bulk_staging table and auto-discovers cargo/tanker vessels not yet
in our DB.

Free MT API limit: zoom levels 2-4 only (z:5+ requires a Pro login).
    z:4 = 256 tiles, ~31K vessels, ~2 min  — optimal for a full-world scan.
    z:3 = 64 tiles,  ~15K vessels, ~26 sec — good for a priority quick scan.

Usage:
    python mt_position_updater.py           # Full world scan (z:4, ~2 min)
    python mt_position_updater.py --probe   # Test 20 tiles
    python mt_position_updater.py --full    # Same as default
    python mt_position_updater.py --loop    # Continuous rescan every --interval hours
"""
import json, sys, os, time, argparse, math
import psycopg2

# Resolve all relative paths against the script's own directory.
os.chdir(os.path.dirname(os.path.abspath(__file__)))

# Force UTF-8, line-buffered stdio (Windows consoles default to legacy codepages).
if hasattr(sys.stdout, 'reconfigure'):
    sys.stdout.reconfigure(encoding='utf-8', errors='replace', line_buffering=True)
if hasattr(sys.stderr, 'reconfigure'):
    sys.stderr.reconfigure(encoding='utf-8', errors='replace', line_buffering=True)

try:
    from curl_cffi import requests as cf_requests
except ImportError:
    print("FATAL: curl_cffi not installed. Run: pip install curl_cffi", flush=True)
    sys.exit(1)

IS_SERVER = sys.platform == 'linux'
# On a Windows workstation the server DB is reached through an SSH tunnel
# bound to local port 15432 (see _restart_ssh_tunnel).
DB_URL = os.environ.get('DATABASE_URL') or (
    'postgresql://seafare:SF_m0ntana_2026@127.0.0.1:5432/seafare_db' if IS_SERVER
    else 'postgresql://seafare:SF_m0ntana_2026@127.0.0.1:15432/seafare_db'
)
SCAN_ZOOM = 4  # z:4 = 256 tiles, ~31K vessels, ~2 min (max for free MT API)
# NOTE(review): the module docstring and --help text mention a 6h loop
# interval, but the default here is 1 hour — confirm which is intended.
UPDATE_INTERVAL_HOURS = 1

# GT_SHIPTYPE codes for cargo + tanker (MT map colour groups).
CARGO_GT_TYPES = {
    '6', '9', '11', '12', '21', '70', '122',  # Cargo (green)
    '4', '17', '18', '19', '56', '71', '88',  # Tanker (red)
}
# Fallback keywords matched against the free-text SHIPTYPE field.
CARGO_TYPE_KEYWORDS = {
    'bulk', 'cargo', 'container', 'tanker', 'chemical', 'oil',
    'lpg', 'lng', 'reefer', 'ro-ro', 'roro', 'multipurpose',
    'general', 'ore', 'cement', 'vehicle', 'barge', 'heavy load',
    'fpso', 'fso', 'bitumen', 'asphalt', 'product',
}
# ---- TILE SYSTEM ----
# ---- TILE SYSTEM ----
def lat_lon_to_tile(lat, lon, zoom):
    """Convert lat/lon (degrees) to slippy-map tile (x, y) at *zoom*.

    Fix: latitude is clamped to the Web-Mercator valid range (~±85.05°).
    The raw formula raises ValueError at lat = -90 because
    tan(lat) + 1/cos(lat) underflows to 0.0 and log(0) is undefined.
    Both tile indices are clamped to [0, 2**zoom - 1].
    """
    n = 2 ** zoom
    # Web-Mercator latitude limit: atan(sinh(pi)) in degrees.
    lat = max(-85.05112878, min(85.05112878, lat))
    x = int((lon + 180.0) / 360.0 * n)
    lat_rad = math.radians(lat)
    y = int((1.0 - math.log(math.tan(lat_rad) + 1.0 / math.cos(lat_rad)) / math.pi) / 2.0 * n)
    return max(0, min(x, n - 1)), max(0, min(y, n - 1))
def fetch_tile(z, x, y):
    """Fetch one MT map tile via curl_cffi; return raw vessel dicts.

    Any non-200 response or unexpected payload shape yields an empty list.
    """
    url = f'https://www.marinetraffic.com/getData/get_data_json_4/z:{z}/X:{x}/Y:{y}/station:0'
    headers = {
        'Referer': f'https://www.marinetraffic.com/en/ais/home/centerx:0/centery:0/zoom:{z}',
        'X-Requested-With': 'XMLHttpRequest',
    }
    resp = cf_requests.get(url, headers=headers, impersonate='chrome', timeout=15)
    if resp.status_code != 200:
        return []
    payload = resp.json()
    if not isinstance(payload, dict):
        return []
    rows = payload.get('data', payload)
    if isinstance(rows, dict):
        rows = rows.get('rows', [])
    return rows if isinstance(rows, list) else []
def generate_all_tiles(zoom=SCAN_ZOOM):
    """Return every (zoom, x, y, "world") tile at the given zoom.

    z:4 = 256 tiles, ~31K vessels, ~2 min.
    """
    side = 2 ** zoom
    return [(zoom, tx, ty, "world") for tx in range(side) for ty in range(side)]
# ---- VESSEL CLASSIFICATION ----
# ---- VESSEL CLASSIFICATION ----
def is_cargo_vessel(row):
    """Return True when a raw MT row looks like a cargo or tanker vessel.

    Checks the numeric GT_SHIPTYPE code first, then falls back to keyword
    matching on the free-text SHIPTYPE field.
    """
    code = str(row.get('GT_SHIPTYPE', '') or '')
    if code in CARGO_GT_TYPES:
        return True
    label = str(row.get('SHIPTYPE', '') or '').lower()
    for keyword in CARGO_TYPE_KEYWORDS:
        if keyword in label:
            return True
    return False
def classify_vessel(row):
    """Map a raw MT row to a coarse category.

    Returns one of: 'bulk', 'container', 'roro', 'tanker', 'general'.
    The numeric GT_SHIPTYPE code wins; otherwise keywords in the
    free-text SHIPTYPE field decide; anything unrecognized is 'general'.
    """
    code = str(row.get('GT_SHIPTYPE', '') or '')
    exact = {'6': 'bulk', '11': 'container', '12': 'roro'}
    if code in exact:
        return exact[code]
    if code in ('4', '17', '18', '19', '56', '71', '88'):
        return 'tanker'
    if code in ('9', '70', '122', '21'):
        return 'general'
    label = str(row.get('SHIPTYPE', '') or '').lower()
    if 'bulk' in label:
        return 'bulk'
    if 'container' in label:
        return 'container'
    if any(k in label for k in ('tanker', 'oil', 'chemical', 'lpg', 'lng')):
        return 'tanker'
    if any(k in label for k in ('ro-ro', 'roro', 'vehicle')):
        return 'roro'
    return 'general'
# ---- DB helpers ----
# ---- DB helpers ----
def db_connect():
    """Open a psycopg2 connection to DB_URL with TCP keepalives enabled."""
    keepalive_opts = dict(
        keepalives=1,
        keepalives_idle=30,
        keepalives_interval=10,
        keepalives_count=5,
    )
    return psycopg2.connect(DB_URL, connect_timeout=15, **keepalive_opts)
def _restart_ssh_tunnel():
    """Restart SSH tunnel (Windows only, no-op on server)."""
    if IS_SERVER:
        return
    import subprocess
    # Best-effort kill of any existing ssh.exe holding the tunnel port.
    try:
        subprocess.run(['taskkill', '/F', '/IM', 'ssh.exe'], capture_output=True, timeout=5)
    except Exception:
        pass
    time.sleep(2)
    tunnel_cmd = [
        'ssh', '-o', 'ServerAliveInterval=5', '-o', 'ServerAliveCountMax=120',
        '-o', 'TCPKeepAlive=yes', '-o', 'StrictHostKeyChecking=no',
        '-L', '15432:127.0.0.1:5432', '-N', 'root@89.19.208.158',
    ]
    try:
        subprocess.Popen(tunnel_cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        print(f" [SSH] Tunnel restarted, waiting 5s...", flush=True)
        time.sleep(5)
    except Exception as exc:
        print(f" [SSH] Failed: {exc}", flush=True)
def db_reconnect(conn):
    """Close *conn* and re-establish it, retrying up to 10 times.

    Returns a fresh (conn, cursor) pair on success; raises after 10
    failures.  On a Windows workstation, attempts 5 and 9 also restart
    the SSH tunnel before retrying.
    """
    try:
        conn.close()
    except Exception:
        pass
    for attempt in range(10):
        try:
            time.sleep(3)
            fresh = db_connect()
            print(f" [DB] Reconnected (attempt {attempt+1})", flush=True)
            return fresh, fresh.cursor()
        except Exception as exc:
            print(f" [DB] Attempt {attempt+1} failed: {exc}", flush=True)
            if not IS_SERVER and attempt in (4, 8):
                print(f" [DB] Restarting SSH tunnel...", flush=True)
                _restart_ssh_tunnel()
    raise Exception("DB reconnect failed")
def db_safe_execute(conn, cur, query, params=None):
    """Execute *query*, transparently reconnecting on a dropped connection.

    Returns the (possibly replaced) (conn, cur) pair.  Connection-level
    failures (InterfaceError/OperationalError) trigger one reconnect and
    retry; other psycopg2 errors roll the transaction back and re-raise.
    """
    try:
        cur.execute(query, params)
    except (psycopg2.InterfaceError, psycopg2.OperationalError) as exc:
        # Connection dropped: rebuild it and retry the statement once.
        print(f" [DB] Lost on execute ({exc}), reconnecting...", flush=True)
        conn, cur = db_reconnect(conn)
        cur.execute(query, params)
    except psycopg2.Error:
        # Query-level failure: clear the aborted transaction, then propagate.
        try:
            conn.rollback()
        except Exception:
            pass
        raise
    return conn, cur
# ---- MAIN SCAN ----
# ---- MAIN SCAN ----
def run_scan(tiles, conn, cur, existing_ids, mode="priority"):
    """Scan tiles via curl_cffi, update positions in DB. Returns (updated, discovered).

    tiles        -- list of (zoom, x, y, label) tuples to fetch
    conn, cur    -- live psycopg2 connection/cursor; may be replaced by
                    db_safe_execute on reconnect
    existing_ids -- set of known ship_id strings; mutated in place as new
                    vessels are discovered
    mode         -- label used only in log output
    """
    total_tiles = len(tiles)
    updated = 0
    discovered = 0
    errors = 0
    t_start = time.time()
    print(f"\n[{mode.upper()}] Scanning {total_tiles} tiles...", flush=True)
    for tile_idx, (z, x, y, label) in enumerate(tiles):
        # Fetch tile; a failed tile is skipped, and only the first 5
        # errors are logged to keep output readable.
        try:
            rows = fetch_tile(z, x, y)
        except Exception as e:
            errors += 1
            if errors <= 5:
                print(f" WARN: tile z:{z}/X:{x}/Y:{y} error: {e}", flush=True)
            continue
        # Process vessels
        tile_updated = 0
        tile_discovered = 0
        for row in rows:
            if not is_cargo_vessel(row):
                continue
            ship_id = str(row.get('SHIP_ID', '') or '')
            if not ship_id:
                continue
            v_lat = row.get('LAT')
            v_lon = row.get('LON')
            v_speed_raw = row.get('SPEED')
            # MT reports SPEED in tenths of a knot.
            v_speed = round(float(v_speed_raw) / 10.0, 1) if v_speed_raw is not None else None
            v_course = row.get('COURSE')
            v_name = row.get('SHIPNAME', '')
            v_gt = str(row.get('GT_SHIPTYPE', '') or '')
            v_type_cat = classify_vessel(row)
            if ship_id in existing_ids:
                # UPDATE existing vessel position.  Per-row commit so one
                # bad row cannot abort the rest of the scan.
                try:
                    conn, cur = db_safe_execute(conn, cur, """
                        UPDATE mt_bulk_staging
                        SET lat = %s, lon = %s, speed = %s, course = %s, scraped_at = NOW()
                        WHERE ship_id = %s
                    """, (v_lat, v_lon, v_speed, v_course, ship_id))
                    conn.commit()
                    tile_updated += 1
                except Exception:
                    # Best-effort: drop the failed row and keep scanning.
                    try:
                        conn.rollback()
                    except Exception:
                        pass
            else:
                # DISCOVER new vessel (upsert guards against races).
                try:
                    conn, cur = db_safe_execute(conn, cur, """
                        INSERT INTO mt_bulk_staging (ship_id, name, lat, lon, speed, course,
                                                     gt_shiptype, type_category, scraped_at)
                        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, NOW())
                        ON CONFLICT (ship_id) DO UPDATE SET
                            lat = EXCLUDED.lat, lon = EXCLUDED.lon,
                            speed = EXCLUDED.speed, course = EXCLUDED.course,
                            scraped_at = NOW()
                    """, (ship_id, v_name, v_lat, v_lon, v_speed, v_course, v_gt, v_type_cat))
                    conn.commit()
                    existing_ids.add(ship_id)
                    tile_discovered += 1
                except Exception:
                    try:
                        conn.rollback()
                    except Exception:
                        pass
        updated += tile_updated
        discovered += tile_discovered
        # Progress every 50 tiles or when new vessels found
        if (tile_idx + 1) % 50 == 0 or tile_discovered > 0:
            elapsed = time.time() - t_start
            remaining = total_tiles - tile_idx - 1
            eta = (elapsed / (tile_idx + 1)) * remaining if tile_idx > 0 else 0
            print(f" [{tile_idx+1}/{total_tiles}] {label} z:{z} | "
                  f"upd={tile_updated} new={tile_discovered} | "
                  f"total: upd={updated} new={discovered} err={errors} | "
                  f"{elapsed:.0f}s | ETA: {eta:.0f}s", flush=True)
    elapsed = time.time() - t_start
    print(f"\n{'='*60}", flush=True)
    print(f"[{mode.upper()}] SCAN COMPLETE — {elapsed:.0f}s ({elapsed/60:.1f} min)", flush=True)
    print(f" Tiles: {total_tiles} | Errors: {errors}", flush=True)
    print(f" Positions updated: {updated}", flush=True)
    print(f" New vessels discovered: {discovered}", flush=True)
    print(f"{'='*60}\n", flush=True)
    return updated, discovered
def main():
    """CLI entry point: connect to the DB, then run the selected scan mode.

    Fixes: the --interval/--loop help text claimed a 6h default while the
    actual default is UPDATE_INTERVAL_HOURS (1h) — the help now derives
    from the constant.  The connection is also closed via try/finally so
    it is released even when a scan raises.
    """
    parser = argparse.ArgumentParser(description='MT Position Updater (curl_cffi)')
    parser.add_argument('--probe', action='store_true', help='Test 20 tiles')
    parser.add_argument('--full', action='store_true', help='Full world scan (z:4)')
    parser.add_argument('--loop', action='store_true',
                        help='Continuous: full scan every --interval hours')
    parser.add_argument('--interval', type=float, default=UPDATE_INTERVAL_HOURS,
                        help=f'Hours between scans in loop mode (default {UPDATE_INTERVAL_HOURS})')
    args = parser.parse_args()
    # DB connection + snapshot of known ship_ids (splits UPDATE vs INSERT paths).
    try:
        conn = db_connect()
        cur = conn.cursor()
        cur.execute('SELECT count(*) FROM mt_bulk_staging')
        existing_count = cur.fetchone()[0]
        cur.execute('SELECT ship_id FROM mt_bulk_staging WHERE ship_id IS NOT NULL')
        existing_ids = set(str(r[0]) for r in cur.fetchall())
        print(f"DB: {existing_count} existing vessels | {len(existing_ids)} ship_ids")
    except Exception as e:
        print(f"DB ERROR: {e}")
        if not IS_SERVER:
            print("Start SSH tunnel: ssh -L 15432:127.0.0.1:5432 -N root@89.19.208.158")
        return
    try:
        if args.probe:
            tiles = generate_all_tiles()[:20]
            print(f"PROBE: {len(tiles)} tiles (z:{SCAN_ZOOM})")
            run_scan(tiles, conn, cur, existing_ids, mode="probe")
        elif args.loop:
            cycle = 0
            while True:
                cycle += 1
                tiles = generate_all_tiles()
                print(f"\n=== CYCLE {cycle}: FULL WORLD SCAN ({len(tiles)} tiles, z:{SCAN_ZOOM}) ===",
                      flush=True)
                run_scan(tiles, conn, cur, existing_ids, mode="full")
                sleep_secs = args.interval * 3600
                print(f"Sleeping {args.interval}h until next cycle...", flush=True)
                time.sleep(sleep_secs)
        else:
            # Default (and --full): full world scan
            tiles = generate_all_tiles()
            print(f"FULL WORLD SCAN: {len(tiles)} tiles (z:{SCAN_ZOOM})")
            run_scan(tiles, conn, cur, existing_ids, mode="full")
    finally:
        # Release the connection even if a scan raises (run_scan may have
        # swapped conn on reconnect, but its final value is not returned
        # here — close whatever handle main still holds).
        try:
            conn.close()
        except Exception:
            pass
# Script entry point: run the CLI when executed directly, not on import.
if __name__ == '__main__':
    main()