montana/Русский/Логистика/mt_save_to_db.py

225 lines
7.3 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
"""
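Save MT bulk carrier tile data from checkpoint to PostgreSQL.
Creates the mt_bulk_staging table (SHIP_ID as key, lat/lon for proximity search)
and upserts every vessel from the checkpoint into it.
Also cross-references the existing vessels table by name to fill missing MMSIs,
then copies positions (and missing deadweight) into vessels rows matched by MMSI.
Usage: python mt_save_to_db.py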
"""
import json, os, sys, re
import psycopg2
if hasattr(sys.stdout, 'reconfigure'):
    # Vessel names may contain characters the console encoding cannot represent;
    # replace them instead of letting print() raise.
    sys.stdout.reconfigure(encoding='utf-8', errors='replace')

# PostgreSQL connection string. A non-empty SEAFARE_DB_URL environment variable
# overrides the built-in default.
# NOTE(security): the default below embeds a plaintext password committed to
# source control; rotate it and drop the fallback once callers set the env var.
DB_URL = os.environ.get('SEAFARE_DB_URL') or \
    'postgresql://seafare:SF_m0ntana_2026@127.0.0.1:15432/seafare_db'

# Checkpoint JSON read for its 'vessels' mapping; a relative path is resolved
# against the current working directory. A non-empty MT_CHECKPOINT overrides it.
CHECKPOINT = os.environ.get('MT_CHECKPOINT') or 'mt_bulk_v2_checkpoint.json'

# GT_SHIPTYPE code (string form) -> descriptive ship-type name.
# gt_to_type_category() collapses these names into coarse categories;
# codes missing from this table are treated as plain 'cargo'.
GT_SHIPTYPE_NAMES = {
    '6': 'bulk_carrier',
    '9': 'general_cargo',
    '11': 'container',
    '17': 'tanker_vlcc',
    '18': 'tanker_aframax',
    '88': 'tanker_chemical',
    '71': 'tanker_product',
    '12': 'roro',
    '19': 'gas_carrier',
    '4': 'tanker_crude',
    '56': 'fpso',
    '122': 'multipurpose',
    '70': 'general_cargo',
    '139': 'general_cargo',
    '21': 'reefer',
}
def gt_to_type_category(gt):
    """Map a GT_SHIPTYPE code to a coarse vessel category.

    The code is stringified and looked up in GT_SHIPTYPE_NAMES (unknown codes
    become the name 'cargo'). The resulting name is then matched by substring,
    first rule wins:
      'bulk'                   -> 'bulk'
      'tanker' / 'gas' / 'fpso' -> 'tanker'
      'container'              -> 'container'
      'roro'                   -> 'roro'
      anything else            -> 'general'
    """
    label = GT_SHIPTYPE_NAMES.get(str(gt), 'cargo')
    rules = (
        (('bulk',), 'bulk'),
        (('tanker', 'gas', 'fpso'), 'tanker'),
        (('container',), 'container'),
        (('roro',), 'roro'),
    )
    for needles, category in rules:
        if any(needle in label for needle in needles):
            return category
    return 'general'
def main():
    """Load the scraped checkpoint into PostgreSQL and refresh matched vessels.

    Steps:
      1. Read the ``vessels`` mapping (ship_id -> record dict) from CHECKPOINT.
      2. Create ``mt_bulk_staging`` and its indexes if they do not exist.
      3. Build a name -> MMSI lookup from the ``vessels`` table and use it to
         fill MMSIs missing from the checkpoint.
      4. Upsert every checkpoint record into ``mt_bulk_staging``.
      5. Print summary counts, then copy positions (and missing deadweight)
         into ``vessels`` rows whose MMSI matches a staging row.

    The connection is always closed, even when a step raises; work not yet
    committed at that point is discarded.
    """
    print("Loading checkpoint...")
    # Explicit UTF-8: the platform default encoding (e.g. cp1252 on Windows)
    # cannot decode arbitrary non-ASCII text in the JSON file.
    with open(CHECKPOINT, encoding='utf-8') as f:
        cp = json.load(f)
    vessels = cp.get('vessels', {})
    print(f"Total vessels in checkpoint: {len(vessels)}")

    conn = psycopg2.connect(DB_URL)
    try:
        cur = conn.cursor()

        _ensure_staging_table(cur)
        conn.commit()
        print("Table mt_bulk_staging ready.")

        name_to_mmsi = _build_name_to_mmsi(cur)

        inserted, mmsi_found, errors = _upsert_vessels(conn, cur, vessels, name_to_mmsi)
        conn.commit()

        _print_summary(cur, inserted, mmsi_found, errors)

        print("\nUpdating vessels table for matched vessels...")
        rows_updated = _update_vessels_positions(cur)
        conn.commit()
        print(f" Updated {rows_updated} rows in vessels table with fresh positions")
    finally:
        conn.close()
    print("\nDB save complete!")


# DDL run on every start; all statements are IF NOT EXISTS, so re-running is harmless.
_STAGING_DDL = (
    """
    CREATE TABLE IF NOT EXISTS mt_bulk_staging (
        ship_id       TEXT PRIMARY KEY,
        name          TEXT,
        flag          TEXT,
        dwt           INTEGER,
        shiptype      TEXT,
        gt_shiptype   TEXT,
        type_category TEXT,
        lat           DOUBLE PRECISION,
        lon           DOUBLE PRECISION,
        speed         REAL,
        course        REAL,
        destination   TEXT,
        mmsi          TEXT,
        imo           TEXT,
        owner         TEXT,
        operator      TEXT,
        scraped_at    TIMESTAMP WITH TIME ZONE DEFAULT NOW()
    )
    """,
    "CREATE INDEX IF NOT EXISTS idx_mbs_lat_lon ON mt_bulk_staging (lat, lon)",
    "CREATE INDEX IF NOT EXISTS idx_mbs_gt ON mt_bulk_staging (gt_shiptype)",
    "CREATE INDEX IF NOT EXISTS idx_mbs_type_cat ON mt_bulk_staging (type_category)",
    "CREATE INDEX IF NOT EXISTS idx_mbs_name ON mt_bulk_staging (name)",
)

# Upsert one staging row. Scalar columns are overwritten with the fresh scrape
# (shiptype included, so it cannot go stale next to gt_shiptype); identity and
# ownership columns keep their previous value when the new one is NULL.
_UPSERT_SQL = """
    INSERT INTO mt_bulk_staging
        (ship_id, name, flag, dwt, shiptype, gt_shiptype, type_category,
         lat, lon, speed, course, destination, mmsi, imo, owner, operator,
         scraped_at)
    VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s, NOW())
    ON CONFLICT (ship_id) DO UPDATE SET
        name          = EXCLUDED.name,
        flag          = EXCLUDED.flag,
        dwt           = EXCLUDED.dwt,
        shiptype      = EXCLUDED.shiptype,
        gt_shiptype   = EXCLUDED.gt_shiptype,
        type_category = EXCLUDED.type_category,
        lat           = EXCLUDED.lat,
        lon           = EXCLUDED.lon,
        speed         = EXCLUDED.speed,
        course        = EXCLUDED.course,
        destination   = EXCLUDED.destination,
        mmsi          = COALESCE(EXCLUDED.mmsi, mt_bulk_staging.mmsi),
        imo           = COALESCE(EXCLUDED.imo, mt_bulk_staging.imo),
        owner         = COALESCE(EXCLUDED.owner, mt_bulk_staging.owner),
        operator      = COALESCE(EXCLUDED.operator, mt_bulk_staging.operator),
        scraped_at    = NOW()
"""

# Commit (and report progress) after this many successful upserts.
_COMMIT_EVERY = 500


def _ensure_staging_table(cur):
    """Create mt_bulk_staging and its lookup indexes if they are missing."""
    for statement in _STAGING_DDL:
        cur.execute(statement)


def _build_name_to_mmsi(cur):
    """Return {NORMALISED_NAME: mmsi} built from the existing vessels table.

    Names are normalised with ``upper().strip()``. Vessel names are not unique,
    so a name that appears with more than one distinct MMSI is left out rather
    than resolved arbitrarily. Guessing there could attach another ship's MMSI,
    and the later vessels UPDATE would then overwrite that ship's position.
    """
    print("Building name->MMSI lookup from existing vessels...")
    cur.execute("SELECT mmsi, name FROM vessels WHERE mmsi IS NOT NULL AND name IS NOT NULL")
    candidates = {}
    for mmsi, name in cur.fetchall():
        key = name.upper().strip()
        if key:
            candidates.setdefault(key, set()).add(mmsi)
    name_to_mmsi = {key: next(iter(ids)) for key, ids in candidates.items() if len(ids) == 1}
    print(f" Found {len(name_to_mmsi)} vessels with MMSI in DB")
    ambiguous = len(candidates) - len(name_to_mmsi)
    if ambiguous:
        print(f" Skipped {ambiguous} names shared by several MMSIs")
    return name_to_mmsi


def _safe_int(val):
    """Coerce *val* to int, truncating floats/numeric strings; 0 if missing or invalid."""
    try:
        return int(float(val or 0))
    except (TypeError, ValueError, OverflowError):
        return 0


def _staging_row(ship_id, v, name_to_mmsi):
    """Build the 16 parameters for _UPSERT_SQL from one checkpoint record *v*.

    Returns ``(row, filled_by_name)``. ``filled_by_name`` is 1 when the record
    had no MMSI and one was found via *name_to_mmsi*, else 0.
    """
    name = v.get('name', '') or ''
    mmsi = v.get('mmsi')
    filled_by_name = 0
    if not mmsi and name:
        mmsi = name_to_mmsi.get(name.upper().strip())
        if mmsi:
            filled_by_name = 1
    gt = str(v.get('gt_shiptype', '') or '')
    row = (
        ship_id,
        name,
        v.get('flag', '') or '',
        _safe_int(v.get('dwt')),
        str(v.get('shiptype', '') or ''),
        gt,
        gt_to_type_category(gt),
        _safe_float(v.get('lat')),
        _safe_float(v.get('lon')),
        _safe_float(v.get('speed')),
        _safe_float(v.get('course')),
        v.get('destination', '') or '',
        mmsi,
        v.get('imo'),
        v.get('owner'),
        v.get('operator'),
    )
    return row, filled_by_name


def _upsert_vessels(conn, cur, vessels, name_to_mmsi):
    """Upsert every checkpoint record into mt_bulk_staging.

    Each row runs inside its own SAVEPOINT. In PostgreSQL a failed statement
    aborts the enclosing transaction, so without the savepoint one bad row
    would make every later INSERT fail and the uncommitted batch be lost.
    Commits after every _COMMIT_EVERY successful rows; the caller commits the
    remainder.

    Returns ``(inserted, mmsi_found, errors)``.
    """
    inserted = 0
    mmsi_found = 0
    errors = 0
    total = len(vessels)
    for ship_id, v in vessels.items():
        row, filled_by_name = _staging_row(ship_id, v, name_to_mmsi)
        mmsi_found += filled_by_name
        cur.execute("SAVEPOINT mt_row")
        try:
            cur.execute(_UPSERT_SQL, row)
        except psycopg2.Error as e:
            # Undo only this row; the rest of the batch stays intact.
            cur.execute("ROLLBACK TO SAVEPOINT mt_row")
            errors += 1
            if errors <= 3:
                print(f" Insert error for {ship_id} ({row[1]}): {e}")
            continue
        cur.execute("RELEASE SAVEPOINT mt_row")
        inserted += 1
        if inserted % _COMMIT_EVERY == 0:
            conn.commit()
            print(f" {inserted}/{total} inserted...")
    return inserted, mmsi_found, errors


def _print_summary(cur, inserted, mmsi_found, errors):
    """Print whole-table counts for mt_bulk_staging plus this run's counters."""
    cur.execute("""
        SELECT count(*),
               count(CASE WHEN gt_shiptype = '6' THEN 1 END),
               count(mmsi),
               count(lat)
        FROM mt_bulk_staging
    """)
    total, bulk_count, with_mmsi, with_pos = cur.fetchone()
    print("\nDone!")
    print(f" Total rows in mt_bulk_staging: {total}")
    print(f" Bulk carriers (GT=6): {bulk_count}")
    print(f" With MMSI: {with_mmsi}")
    print(f" With position: {with_pos}")
    print(f" Rows upserted this run: {inserted}")
    print(f" MMSI found by name matching: {mmsi_found}")
    print(f" Insert errors: {errors}")


def _update_vessels_positions(cur):
    """Copy staging positions into vessels rows with a matching MMSI; return rows updated.

    lat/lon are overwritten and deadweight is filled only when vessels has none.
    When several staging rows share an MMSI, the most recently scraped one is
    used (ship_id breaks ties). A bare UPDATE ... FROM would otherwise pick an
    unpredictable row.
    NOTE(review): assumes vessels.mmsi is comparable to the TEXT staging
    column; confirm the vessels schema.
    """
    cur.execute("""
        UPDATE vessels v
        SET lat        = m.lat,
            lon        = m.lon,
            deadweight = COALESCE(v.deadweight, m.dwt),
            updated_at = NOW()
        FROM (
            SELECT DISTINCT ON (mmsi) mmsi, lat, lon, dwt
            FROM mt_bulk_staging
            WHERE mmsi IS NOT NULL
              AND lat IS NOT NULL
              AND lon IS NOT NULL
            ORDER BY mmsi, scraped_at DESC NULLS LAST, ship_id
        ) m
        WHERE v.mmsi = m.mmsi
    """)
    return cur.rowcount
def _safe_float(val):
try:
return float(val)
except (TypeError, ValueError):
return None
if __name__ == "__main__":
    # Run the import only when executed as a script, not when imported as a module.
    main()