#!/usr/bin/env python3 """ Save MT bulk carrier tile data from checkpoint to PostgreSQL. Creates mt_bulk_staging table (SHIP_ID as key, lat/lon for proximity search). Also cross-references with existing vessels table by name to fill MMSI. Usage: python mt_save_to_db.py """ import json, os, sys, re import psycopg2 if hasattr(sys.stdout, 'reconfigure'): sys.stdout.reconfigure(encoding='utf-8', errors='replace') DB_URL = 'postgresql://seafare:SF_m0ntana_2026@127.0.0.1:15432/seafare_db' CHECKPOINT = 'mt_bulk_v2_checkpoint.json' GT_SHIPTYPE_NAMES = { '6': 'bulk_carrier', '9': 'general_cargo', '11': 'container', '17': 'tanker_vlcc', '18': 'tanker_aframax', '88': 'tanker_chemical', '71': 'tanker_product', '12': 'roro', '19': 'gas_carrier', '4': 'tanker_crude', '56': 'fpso', '122':'multipurpose', '70': 'general_cargo', '139':'general_cargo', '21': 'reefer', } def gt_to_type_category(gt): name = GT_SHIPTYPE_NAMES.get(str(gt), 'cargo') if 'bulk' in name: return 'bulk' if 'tanker' in name or 'gas' in name or 'fpso' in name: return 'tanker' if 'container' in name: return 'container' if 'roro' in name: return 'roro' return 'general' def main(): print("Loading checkpoint...") with open(CHECKPOINT) as f: cp = json.load(f) vessels = cp.get('vessels', {}) print(f"Total vessels in checkpoint: {len(vessels)}") conn = psycopg2.connect(DB_URL) cur = conn.cursor() # Create mt_bulk_staging table cur.execute(""" CREATE TABLE IF NOT EXISTS mt_bulk_staging ( ship_id TEXT PRIMARY KEY, name TEXT, flag TEXT, dwt INTEGER, shiptype TEXT, gt_shiptype TEXT, type_category TEXT, lat DOUBLE PRECISION, lon DOUBLE PRECISION, speed REAL, course REAL, destination TEXT, mmsi TEXT, imo TEXT, owner TEXT, operator TEXT, scraped_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() ) """) cur.execute(""" CREATE INDEX IF NOT EXISTS idx_mbs_lat_lon ON mt_bulk_staging (lat, lon) """) cur.execute(""" CREATE INDEX IF NOT EXISTS idx_mbs_gt ON mt_bulk_staging (gt_shiptype) """) cur.execute(""" CREATE INDEX IF NOT EXISTS idx_mbs_type_cat ON mt_bulk_staging (type_category) """) cur.execute(""" CREATE INDEX IF NOT EXISTS idx_mbs_name ON mt_bulk_staging (name) """) conn.commit() print("Table mt_bulk_staging ready.") # Load existing vessels MMSI lookup by name print("Building name->MMSI lookup from existing vessels...") cur.execute("SELECT mmsi, name FROM vessels WHERE mmsi IS NOT NULL AND name IS NOT NULL") name_to_mmsi = {} for row in cur.fetchall(): mmsi, name = row if name: name_to_mmsi[name.upper().strip()] = mmsi print(f" Found {len(name_to_mmsi)} vessels with MMSI in DB") # Insert all checkpoint vessels inserted = 0 updated = 0 mmsi_found = 0 errors = 0 for ship_id, v in vessels.items(): name = v.get('name', '') or '' flag = v.get('flag', '') or '' dwt = int(v.get('dwt', 0) or 0) shiptype = str(v.get('shiptype', '') or '') gt = str(v.get('gt_shiptype', '') or '') lat = v.get('lat') lon = v.get('lon') speed = _safe_float(v.get('speed')) course = _safe_float(v.get('course')) dest = v.get('destination', '') or '' mmsi = v.get('mmsi') imo = v.get('imo') owner = v.get('owner') operator = v.get('operator') type_cat = gt_to_type_category(gt) # Try to find MMSI from existing vessels table by name if not mmsi and name: mmsi = name_to_mmsi.get(name.upper().strip()) if mmsi: mmsi_found += 1 try: cur.execute(""" INSERT INTO mt_bulk_staging (ship_id, name, flag, dwt, shiptype, gt_shiptype, type_category, lat, lon, speed, course, destination, mmsi, imo, owner, operator, scraped_at) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s, NOW()) ON CONFLICT (ship_id) DO UPDATE SET name = EXCLUDED.name, flag = EXCLUDED.flag, dwt = EXCLUDED.dwt, gt_shiptype = EXCLUDED.gt_shiptype, type_category= EXCLUDED.type_category, lat = EXCLUDED.lat, lon = EXCLUDED.lon, speed = EXCLUDED.speed, course = EXCLUDED.course, destination = EXCLUDED.destination, mmsi = COALESCE(EXCLUDED.mmsi, mt_bulk_staging.mmsi), imo = COALESCE(EXCLUDED.imo, mt_bulk_staging.imo), owner = COALESCE(EXCLUDED.owner, mt_bulk_staging.owner), operator = COALESCE(EXCLUDED.operator, mt_bulk_staging.operator), scraped_at = NOW() """, (ship_id, name, flag, dwt, shiptype, gt, type_cat, lat, lon, speed, course, dest, mmsi, imo, owner, operator)) inserted += 1 except Exception as e: errors += 1 if errors <= 3: print(f" Insert error for {ship_id} ({name}): {e}") if inserted % 500 == 0: conn.commit() print(f" {inserted}/{len(vessels)} inserted...") conn.commit() # Summary cur.execute("SELECT count(*) FROM mt_bulk_staging") total = cur.fetchone()[0] cur.execute("SELECT count(*) FROM mt_bulk_staging WHERE gt_shiptype='6'") bulk_count = cur.fetchone()[0] cur.execute("SELECT count(*) FROM mt_bulk_staging WHERE mmsi IS NOT NULL") with_mmsi = cur.fetchone()[0] cur.execute("SELECT count(*) FROM mt_bulk_staging WHERE lat IS NOT NULL") with_pos = cur.fetchone()[0] print(f"\nDone!") print(f" Total rows in mt_bulk_staging: {total}") print(f" Bulk carriers (GT=6): {bulk_count}") print(f" With MMSI: {with_mmsi}") print(f" With position: {with_pos}") print(f" MMSI found by name matching: {mmsi_found}") print(f" Insert errors: {errors}") # Also update existing vessels table where we found new data print("\nUpdating vessels table for matched vessels...") cur.execute(""" UPDATE vessels v SET lat = m.lat, lon = m.lon, deadweight = COALESCE(v.deadweight, m.dwt), updated_at = NOW() FROM mt_bulk_staging m WHERE v.mmsi = m.mmsi AND m.lat IS NOT NULL AND m.lon IS NOT NULL """) rows_updated = cur.rowcount conn.commit() print(f" Updated {rows_updated} rows in vessels table with fresh positions") conn.close() print("\nDB save complete!") def _safe_float(val): try: return float(val) except (TypeError, ValueError): return None if __name__ == '__main__': main()