#!/usr/bin/env python3
|
||
|
|
"""
|
||
|
|
Save MT bulk carrier tile data from checkpoint to PostgreSQL.

Creates the mt_bulk_staging table (SHIP_ID as key, lat/lon for proximity search).
Also cross-references the existing vessels table by name to fill in MMSI.

Usage: python mt_save_to_db.py
|
||
|
|
"""
|
||
|
|
import json, os, sys, re
|
||
|
|
import psycopg2
|
||
|
|
|
||
|
|
# Switch stdout to UTF-8 when the stream supports reconfigure(); characters that
# cannot be encoded are replaced instead of raising UnicodeEncodeError in print().
if hasattr(sys.stdout, 'reconfigure'):
    sys.stdout.reconfigure(encoding='utf-8', errors='replace')
|
||
|
|
|
||
|
|
# PostgreSQL connection string. Set the MT_DB_URL environment variable to
# override it; the literal below is only a backward-compatible fallback.
# NOTE(review): the fallback embeds a password in source — rotate that credential
# and drop the default once every deployment sets MT_DB_URL.
DB_URL = os.environ.get(
    'MT_DB_URL',
    'postgresql://seafare:SF_m0ntana_2026@127.0.0.1:15432/seafare_db',
)
# Checkpoint JSON to load; the relative path is resolved against the current
# working directory.
CHECKPOINT = 'mt_bulk_v2_checkpoint.json'
|
||
|
|
|
||
|
|
# Maps a gt_shiptype code (stored as a string) to a descriptive ship-type name.
# gt_to_type_category() reduces these names to coarse categories by substring,
# so keep keywords such as "bulk", "tanker", "gas", "fpso", "container" and
# "roro" in any name that should land in the matching category.
GT_SHIPTYPE_NAMES = {
    '6': 'bulk_carrier',
    '9': 'general_cargo',
    '11': 'container',
    '17': 'tanker_vlcc',
    '18': 'tanker_aframax',
    '88': 'tanker_chemical',
    '71': 'tanker_product',
    '12': 'roro',
    '19': 'gas_carrier',
    '4': 'tanker_crude',
    '56': 'fpso',
    '122': 'multipurpose',
    '70': 'general_cargo',
    '139': 'general_cargo',
    '21': 'reefer',
}
|
||
|
|
|
||
|
|
def gt_to_type_category(gt) -> str:
    """Collapse a gt_shiptype code into a coarse vessel category.

    Args:
        gt: Ship-type code. It is converted with ``str()`` before the lookup
            in ``GT_SHIPTYPE_NAMES``, so ``6`` and ``'6'`` are equivalent.

    Returns:
        One of ``'bulk'``, ``'tanker'``, ``'container'``, ``'roro'`` or
        ``'general'``. Codes missing from ``GT_SHIPTYPE_NAMES`` resolve to
        ``'general'``.
    """
    # Unknown codes fall back to 'cargo', which matches none of the keywords
    # below and therefore ends up in the 'general' bucket.
    name = GT_SHIPTYPE_NAMES.get(str(gt), 'cargo')
    if 'bulk' in name:
        return 'bulk'
    # Gas carriers and FPSOs are grouped together with tankers.
    if 'tanker' in name or 'gas' in name or 'fpso' in name:
        return 'tanker'
    if 'container' in name:
        return 'container'
    if 'roro' in name:
        return 'roro'
    return 'general'
|
||
|
|
|
||
|
|
|
||
|
|
# Number of successful upserts between intermediate commits.
_COMMIT_EVERY = 500

# Idempotent schema setup: the staging table followed by its lookup indexes.
_STAGING_SCHEMA_SQL = (
    """
    CREATE TABLE IF NOT EXISTS mt_bulk_staging (
        ship_id       TEXT PRIMARY KEY,
        name          TEXT,
        flag          TEXT,
        dwt           INTEGER,
        shiptype      TEXT,
        gt_shiptype   TEXT,
        type_category TEXT,
        lat           DOUBLE PRECISION,
        lon           DOUBLE PRECISION,
        speed         REAL,
        course        REAL,
        destination   TEXT,
        mmsi          TEXT,
        imo           TEXT,
        owner         TEXT,
        operator      TEXT,
        scraped_at    TIMESTAMP WITH TIME ZONE DEFAULT NOW()
    )
    """,
    """
    CREATE INDEX IF NOT EXISTS idx_mbs_lat_lon
    ON mt_bulk_staging (lat, lon)
    """,
    """
    CREATE INDEX IF NOT EXISTS idx_mbs_gt
    ON mt_bulk_staging (gt_shiptype)
    """,
    """
    CREATE INDEX IF NOT EXISTS idx_mbs_type_cat
    ON mt_bulk_staging (type_category)
    """,
    """
    CREATE INDEX IF NOT EXISTS idx_mbs_name
    ON mt_bulk_staging (name)
    """,
)

# Upsert keyed on ship_id. Position and descriptive columns are always
# refreshed; mmsi/imo/owner/operator keep the stored value when the new one is
# NULL. The parameter order must match the tuple built by _vessel_params().
_UPSERT_SQL = """
    INSERT INTO mt_bulk_staging
        (ship_id, name, flag, dwt, shiptype, gt_shiptype, type_category,
         lat, lon, speed, course, destination, mmsi, imo, owner, operator,
         scraped_at)
    VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s, NOW())
    ON CONFLICT (ship_id) DO UPDATE SET
        name          = EXCLUDED.name,
        flag          = EXCLUDED.flag,
        dwt           = EXCLUDED.dwt,
        shiptype      = EXCLUDED.shiptype,
        gt_shiptype   = EXCLUDED.gt_shiptype,
        type_category = EXCLUDED.type_category,
        lat           = EXCLUDED.lat,
        lon           = EXCLUDED.lon,
        speed         = EXCLUDED.speed,
        course        = EXCLUDED.course,
        destination   = EXCLUDED.destination,
        mmsi          = COALESCE(EXCLUDED.mmsi, mt_bulk_staging.mmsi),
        imo           = COALESCE(EXCLUDED.imo, mt_bulk_staging.imo),
        owner         = COALESCE(EXCLUDED.owner, mt_bulk_staging.owner),
        operator      = COALESCE(EXCLUDED.operator, mt_bulk_staging.operator),
        scraped_at    = NOW()
"""

# Copies staged positions (and deadweight, only where vessels has none) onto
# vessels rows that share an MMSI with a positioned staging row.
# NOTE(review): if several staging rows carry the same MMSI, which one wins is
# up to PostgreSQL — confirm MMSI is unique in practice.
_SYNC_VESSELS_SQL = """
    UPDATE vessels v
    SET lat = m.lat,
        lon = m.lon,
        deadweight = COALESCE(v.deadweight, m.dwt),
        updated_at = NOW()
    FROM mt_bulk_staging m
    WHERE v.mmsi = m.mmsi
      AND m.lat IS NOT NULL
      AND m.lon IS NOT NULL
"""


def _safe_int(val):
    """Return *val* as an int, or 0 when it is missing or not numeric."""
    if not val:
        return 0
    try:
        return int(val)
    except (TypeError, ValueError, OverflowError):
        pass
    try:
        # Accept decimal text such as '52000.5' by truncating via float.
        return int(float(val))
    except (TypeError, ValueError, OverflowError):
        return 0


def _scalar(cur, sql):
    """Run *sql* and return the first column of its first row."""
    cur.execute(sql)
    return cur.fetchone()[0]


def _ensure_staging_table(cur):
    """Create mt_bulk_staging and its indexes when they do not exist yet."""
    for statement in _STAGING_SCHEMA_SQL:
        cur.execute(statement)


def _load_name_to_mmsi(cur):
    """Build an upper-cased, stripped vessel name -> MMSI map from ``vessels``.

    When several rows share a name, the last row returned wins.
    """
    cur.execute("SELECT mmsi, name FROM vessels WHERE mmsi IS NOT NULL AND name IS NOT NULL")
    return {name.upper().strip(): mmsi for mmsi, name in cur.fetchall() if name}


def _vessel_params(ship_id, v, name_to_mmsi):
    """Turn one checkpoint record into the parameter tuple for _UPSERT_SQL.

    Returns:
        ``(params, matched_by_name)`` where *matched_by_name* is True when the
        MMSI was absent from the record and was filled from *name_to_mmsi*.
    """
    name = v.get('name', '') or ''
    gt = str(v.get('gt_shiptype', '') or '')

    # Empty strings are normalised to None so that COALESCE in the upsert keeps
    # a previously stored value instead of overwriting it with ''.
    mmsi = v.get('mmsi') or None
    matched_by_name = False
    if not mmsi and name:
        mmsi = name_to_mmsi.get(name.upper().strip())
        matched_by_name = bool(mmsi)

    params = (
        ship_id,
        name,
        v.get('flag', '') or '',
        _safe_int(v.get('dwt')),
        str(v.get('shiptype', '') or ''),
        gt,
        gt_to_type_category(gt),
        _safe_float(v.get('lat')),
        _safe_float(v.get('lon')),
        _safe_float(v.get('speed')),
        _safe_float(v.get('course')),
        v.get('destination', '') or '',
        mmsi,
        v.get('imo') or None,
        v.get('owner') or None,
        v.get('operator') or None,
    )
    return params, matched_by_name


def _upsert_vessels(conn, cur, vessels, name_to_mmsi):
    """Upsert every checkpoint vessel into mt_bulk_staging.

    Each row runs inside its own savepoint so that a failing row is rolled
    back on its own; without that, PostgreSQL aborts the whole transaction and
    rejects every later statement until a rollback.

    Returns:
        ``(upserted, mmsi_found, errors)`` counters.
    """
    upserted = mmsi_found = errors = 0
    total = len(vessels)

    for ship_id, v in vessels.items():
        if not isinstance(v, dict):
            errors += 1
            if errors <= 3:
                print(f" Insert error for {ship_id} (): unexpected record type {type(v).__name__}")
            continue

        params, matched_by_name = _vessel_params(ship_id, v, name_to_mmsi)
        if matched_by_name:
            mmsi_found += 1

        cur.execute("SAVEPOINT mt_row")
        try:
            cur.execute(_UPSERT_SQL, params)
        except Exception as e:
            # Deliberately broad: a single bad row must not stop the import.
            # Undo only this row so the surrounding batch stays usable.
            cur.execute("ROLLBACK TO SAVEPOINT mt_row")
            errors += 1
            if errors <= 3:
                print(f" Insert error for {ship_id} ({params[1]}): {e}")
            continue
        cur.execute("RELEASE SAVEPOINT mt_row")

        upserted += 1
        if upserted % _COMMIT_EVERY == 0:
            conn.commit()
            print(f" {upserted}/{total} inserted...")

    conn.commit()
    return upserted, mmsi_found, errors


def _print_summary(cur, mmsi_found, errors):
    """Print row counts for mt_bulk_staging plus the counters from this run."""
    total = _scalar(cur, "SELECT count(*) FROM mt_bulk_staging")
    bulk_count = _scalar(cur, "SELECT count(*) FROM mt_bulk_staging WHERE gt_shiptype='6'")
    with_mmsi = _scalar(cur, "SELECT count(*) FROM mt_bulk_staging WHERE mmsi IS NOT NULL")
    with_pos = _scalar(cur, "SELECT count(*) FROM mt_bulk_staging WHERE lat IS NOT NULL")

    print("\nDone!")
    print(f" Total rows in mt_bulk_staging: {total}")
    print(f" Bulk carriers (GT=6): {bulk_count}")
    print(f" With MMSI: {with_mmsi}")
    print(f" With position: {with_pos}")
    print(f" MMSI found by name matching: {mmsi_found}")
    print(f" Insert errors: {errors}")


def main():
    """Load the checkpoint, upsert it into mt_bulk_staging and refresh vessels.

    Steps: read ``CHECKPOINT``; create the staging table and indexes if
    needed; build a name -> MMSI lookup from ``vessels``; upsert every
    checkpoint vessel; print a summary; copy staged positions onto ``vessels``
    rows with a matching MMSI. The connection is closed even when a step
    raises.
    """
    print("Loading checkpoint...")
    # Explicit encoding so the result does not depend on the platform default.
    with open(CHECKPOINT, encoding='utf-8') as f:
        cp = json.load(f)

    # Treat a missing or null "vessels" entry as an empty checkpoint.
    vessels = cp.get('vessels') or {}
    print(f"Total vessels in checkpoint: {len(vessels)}")

    conn = psycopg2.connect(DB_URL)
    try:
        with conn.cursor() as cur:
            _ensure_staging_table(cur)
            conn.commit()
            print("Table mt_bulk_staging ready.")

            print("Building name->MMSI lookup from existing vessels...")
            name_to_mmsi = _load_name_to_mmsi(cur)
            print(f" Found {len(name_to_mmsi)} vessels with MMSI in DB")

            _upserted, mmsi_found, errors = _upsert_vessels(conn, cur, vessels, name_to_mmsi)
            _print_summary(cur, mmsi_found, errors)

            print("\nUpdating vessels table for matched vessels...")
            cur.execute(_SYNC_VESSELS_SQL)
            rows_updated = cur.rowcount
            conn.commit()
            print(f" Updated {rows_updated} rows in vessels table with fresh positions")
    finally:
        # Closing discards any uncommitted work if a step above raised.
        conn.close()

    print("\nDB save complete!")
|
||
|
|
|
||
|
|
|
||
|
|
def _safe_float(val):
|
||
|
|
try:
|
||
|
|
return float(val)
|
||
|
|
except (TypeError, ValueError):
|
||
|
|
return None
|
||
|
|
|
||
|
|
|
||
|
|
# Run the import only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
|