#!/usr/bin/env python3
"""
Merge UN/LOCODE seaports with existing world_ports.json

Downloads: https://github.com/cristan/improved-un-locodes
Run: python merge_unlocode.py
"""

import csv
import json
import os
import re
import sys
from collections import Counter

SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
UNLOCODE_CSV = os.path.join(SCRIPT_DIR, 'unlocode_improved.csv')
COUNTRY_CSV = os.path.join(SCRIPT_DIR, 'unlocode_countries.csv')
EXISTING_JSON = os.path.join(SCRIPT_DIR, 'world_ports.json')
OUTPUT_JSON = os.path.join(SCRIPT_DIR, 'world_ports.json')

# Detection radius (nautical miles) assigned to newly added ports, by size.
RADIUS_NM_BY_SIZE = {'large': 12, 'medium': 8, 'small': 5}

# Two same-named entries closer than this (in degrees, per axis) are treated
# as the same physical port; farther apart they are distinct ports that
# merely share a name (e.g. the many "Georgetown"s).
SAME_PORT_TOLERANCE_DEG = 1.0

_TRAILING_THE_RE = re.compile(r'\s*\(the\)\s*$')
_TRAILING_PAREN_RE = re.compile(r'\s*\(.*?\)\s*$')
_APOSTROPHE_RE = re.compile(r"['`]")
_NON_ALNUM_RE = re.compile(r'[^a-z0-9]+')


def assign_region(lat, lon):
    """Map a coordinate to a coarse trade-region code.

    Same logic as assemble_ports.py and marinetraffic_parser.py; the order of
    the checks matters because several boxes overlap, so the first match wins.
    Do not reorder or "fix" boundaries here without changing those scripts too.

    Args:
        lat: Latitude in decimal degrees.
        lon: Longitude in decimal degrees.

    Returns:
        One of the region codes below, or 'OTHER' when no box matches.
    """
    if lon < -100 and lat > 25:
        return 'USWC'
    if lon < -30 and lat > 25:
        return 'USEC'
    if lon < -30 and -5 < lat <= 25:
        return 'CARIB'
    if lon < -70 and lat <= -5:
        return 'SAW'
    if lon < -30 and lat <= -5:
        return 'SAE'
    if lon > 100 and lat < -10:
        return 'AUSNZ'
    if lon > 100:
        return 'EASIA'
    if 60 < lon <= 100 and lat > 0:
        return 'SASIA'
    if 40 < lon <= 60 and lat > 10:
        return 'GULF'
    if 10 < lon < 45 and lat < -20:
        return 'SAFR'
    if 30 < lon <= 45 and -10 < lat < 20:
        return 'ERED'
    if -5 < lon < 15 and -5 < lat < 15:
        return 'WAFR'
    if 25 < lon <= 40 and 40 < lat <= 48:
        return 'BSEA'
    if lon < 40 and lat > 48:
        return 'NEUR'
    if -10 <= lon <= 40 and 25 < lat <= 48:
        return 'MED'
    return 'OTHER'


def load_country_names(path=COUNTRY_CSV):
    """Load the country code -> country name mapping.

    Args:
        path: CSV file with 'CountryCode' and 'CountryName' columns.

    Returns:
        Dict mapping country code to a cleaned-up country name.

    Raises:
        KeyError: If a required column is missing from the CSV header.
        OSError: If the file cannot be opened.
    """
    countries = {}
    # utf-8-sig reads plain UTF-8 too, but also strips a BOM that would
    # otherwise be glued onto the first header name.
    with open(path, 'r', encoding='utf-8-sig', newline='') as f:
        for row in csv.DictReader(f):
            # DictReader fills missing trailing fields with None.
            code = (row['CountryCode'] or '').strip()
            name = (row['CountryName'] or '').strip()
            if not code:
                continue
            # Clean up names like "United Arab Emirates (the)"
            name = _TRAILING_THE_RE.sub('', name)
            name = _TRAILING_PAREN_RE.sub('', name).strip()
            countries[code] = name
    return countries


def make_key(name):
    """Convert a port name to a lowercase underscore key.

    Apostrophes and backticks are dropped, every other run of characters
    outside [a-z0-9] becomes a single underscore, and leading/trailing
    underscores are trimmed. May return '' for names with no ASCII
    alphanumerics.
    """
    key = name.lower().strip()
    key = _APOSTROPHE_RE.sub('', key)
    key = _NON_ALNUM_RE.sub('_', key)
    return key.strip('_')


def load_existing(path=EXISTING_JSON):
    """Load the existing ports JSON, or return {} (with a warning) if absent."""
    if not os.path.exists(path):
        print("WARNING: No existing world_ports.json found")
        return {}
    with open(path, 'r', encoding='utf-8') as f:
        return json.load(f)


def parse_seaport_row(row, country_names):
    """Turn one UN/LOCODE CSV row into a port record.

    Args:
        row: Dict for one CSV row (values may be None for short rows).
        country_names: Mapping of country code -> country name.

    Returns:
        A ``(port, skip_reason)`` tuple. On success ``port`` is a dict and
        ``skip_reason`` is None; otherwise ``port`` is None and
        ``skip_reason`` is 'no_func', 'no_coords' or 'no_name'.
    """
    func = row.get('Function') or ''
    # Position 0 = port (seaport)
    if not func or func[0] != '1':
        return None, 'no_func'

    coords_dec = (row.get('CoordinatesDecimal') or '').strip()
    if not coords_dec:
        return None, 'no_coords'
    try:
        lat_str, lon_str = coords_dec.split(',')
        lat = float(lat_str)
        lon = float(lon_str)
    except ValueError:
        # Wrong number of components or a non-numeric component.
        return None, 'no_coords'
    # Range check also rejects NaN, since every comparison with NaN is False.
    if not (-90 <= lat <= 90 and -180 <= lon <= 180):
        return None, 'no_coords'

    name = ((row.get('NameWoDiacritics') or '').strip()
            or (row.get('Name') or '').strip())
    if not name:
        return None, 'no_name'

    country_code = (row.get('Country') or '').strip()
    location_code = (row.get('Location') or '').strip()

    # Size is a proxy: the more function flags a location has, the larger
    # we assume it is.
    n_functions = sum(1 for c in func if c not in '-0 ')
    if n_functions >= 4:
        size = 'large'
    elif n_functions >= 2:
        size = 'medium'
    else:
        size = 'small'

    port = {
        'unlocode': f"{country_code}{location_code}",
        'name': name,
        'country': country_names.get(country_code, country_code),
        'country_code': country_code,
        'lat': round(lat, 5),
        'lon': round(lon, 5),
        'size': size,
    }
    return port, None


def parse_unlocode_seaports(country_names, path=UNLOCODE_CSV):
    """Parse UN/LOCODE CSV — ALL seaports with coordinates, no filtering.

    Args:
        country_names: Mapping of country code -> country name.
        path: UN/LOCODE CSV file to read.

    Returns:
        List of port dicts as produced by ``parse_seaport_row``.
    """
    ports = []
    skipped = Counter()
    with open(path, 'r', encoding='utf-8-sig', newline='') as f:
        for row in csv.DictReader(f):
            port, reason = parse_seaport_row(row, country_names)
            if port is None:
                skipped[reason] += 1
            else:
                ports.append(port)

    print(f"UN/LOCODE: {len(ports)} seaports with coords")
    print(f" Skipped (no port function): {skipped['no_func']}")
    print(f" Skipped (no/bad coords): {skipped['no_coords']}")
    print(f" Skipped (no name): {skipped['no_name']}")
    return ports


def is_same_port(entry, port, tolerance_deg=SAME_PORT_TOLERANCE_DEG):
    """Return True if two same-named records plausibly describe one port.

    Records are compared by position only. If either record lacks usable
    coordinates the comparison is impossible, and the records are treated as
    the same port (the conservative, name-only behaviour).
    """
    try:
        dlat = abs(float(entry['lat']) - float(port['lat']))
        dlon = abs(float(entry['lon']) - float(port['lon']))
    except (KeyError, TypeError, ValueError):
        return True
    dlon = min(dlon, 360 - dlon)  # handle the antimeridian wrap
    return dlat <= tolerance_deg and dlon <= tolerance_deg


def merge_ports(existing, unlocode_ports):
    """Merge parsed UN/LOCODE ports into the existing port table.

    Existing entries always win. A UN/LOCODE port is skipped when its code is
    already known, or when an entry with the same name key sits at (roughly)
    the same position; in the latter case a missing 'unlocode' is backfilled.
    Same-named ports at a different position are added under a
    disambiguated key ("<name>_<country code>", then "<name>_<unlocode>").

    Args:
        existing: Dict of key -> port dict (not mutated).
        unlocode_ports: Iterable of port dicts from ``parse_seaport_row``.

    Returns:
        ``(merged, added, duplicates)``: the merged dict and two counters.
    """
    merged = dict(existing)  # Start with existing (they have priority)
    known_unlocodes = {
        p['unlocode'] for p in existing.values() if p.get('unlocode')
    }
    added = 0
    duplicates = 0

    for port in unlocode_ports:
        unlocode = port['unlocode']
        if unlocode and unlocode in known_unlocodes:
            duplicates += 1
            continue

        base_key = make_key(port['name']) or make_key(unlocode)
        candidates = [base_key]
        country_suffix = make_key(port.get('country_code') or '')
        if country_suffix:
            candidates.append(f"{base_key}_{country_suffix}")
        if unlocode:
            candidates.append(f"{base_key}_{make_key(unlocode)}")

        new_key = None
        for candidate in candidates:
            entry = merged.get(candidate)
            if entry is None:
                new_key = candidate
                break
            if is_same_port(entry, port):
                # Same port already present: only backfill a missing code.
                # Replace rather than mutate so `existing` stays untouched.
                if unlocode and not entry.get('unlocode'):
                    merged[candidate] = {**entry, 'unlocode': unlocode}
                    known_unlocodes.add(unlocode)
                break

        if new_key is None:
            duplicates += 1
            continue

        size = port.get('size', 'small')
        merged[new_key] = {
            'key': new_key,
            'name': port['name'],
            'country': port['country'],
            'country_code': port['country_code'],
            'unlocode': unlocode,
            'lat': port['lat'],
            'lon': port['lon'],
            'size': size,
            'region': assign_region(port['lat'], port['lon']),
            'radius_nm': RADIUS_NM_BY_SIZE.get(size, 5),
        }
        if unlocode:
            # Guards against the same code appearing twice in the CSV.
            known_unlocodes.add(unlocode)
        added += 1

    return merged, added, duplicates


def validate_ports(ports):
    """Return a list of "<key> <field>=<value>" messages for bad coordinates.

    A coordinate is bad when it is missing, non-numeric, or out of range.
    """
    errors = []
    for key, port in ports.items():
        for field, limit in (('lat', 90), ('lon', 180)):
            value = port.get(field)
            is_number = (isinstance(value, (int, float))
                         and not isinstance(value, bool))
            if not is_number or not -limit <= value <= limit:
                errors.append(f"{key} {field}={value}")
    return errors


def write_ports(ports, path):
    """Write the port table as compact JSON, replacing ``path`` atomically.

    The output path is also the input path, so the data is first written to
    a sibling temp file and only moved into place once serialisation has
    succeeded; a failed run never truncates the existing file.
    """
    tmp_path = f"{path}.tmp"
    try:
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(ports, f, separators=(',', ':'), ensure_ascii=False)
        os.replace(tmp_path, path)
    finally:
        # Only still present if something above failed.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)


def main():
    """Run the full load -> parse -> merge -> validate -> write pipeline."""
    print("=" * 60)
    print("UN/LOCODE Merge Tool")
    print("=" * 60)

    # 1. Load country names
    print("\n1. Loading country names...")
    country_names = load_country_names()
    print(f" {len(country_names)} countries loaded")

    # 2. Load existing ports
    print("\n2. Loading existing world_ports.json...")
    existing = load_existing()
    print(f" {len(existing)} existing ports")

    # 3. Parse UN/LOCODE
    print("\n3. Parsing UN/LOCODE seaports...")
    unlocode_ports = parse_unlocode_seaports(country_names)

    # 4. Merge
    print("\n4. Merging...")
    merged, added, duplicates = merge_ports(existing, unlocode_ports)
    print(f" Added: {added} new ports")
    print(f" Duplicates skipped: {duplicates}")

    # 5. Validate
    print("\n5. Validating...")
    errors = validate_ports(merged)
    for message in errors:
        print(f" ERROR: {message}")
    if errors:
        print(f"\n{len(errors)} validation errors!")
        sys.exit(1)
    print(f" All {len(merged)} ports validated OK")

    # 6. Write JSON
    print(f"\n6. Writing {len(merged)} ports -> {OUTPUT_JSON}")
    write_ports(merged, OUTPUT_JSON)

    # 7. Stats
    print(f"\n{'=' * 60}")
    print(f"RESULT: {len(existing)} existing + {added} new = {len(merged)} total ports")
    print(f"{'=' * 60}")

    # Older entries may predate the 'region'/'size' fields, hence the
    # defaults: stats must not crash after the file was already written.
    print("\nBy region:")
    regions = Counter(str(p.get('region') or 'Unknown') for p in merged.values())
    for r, c in sorted(regions.items(), key=lambda x: -x[1]):
        print(f" {r}: {c}")

    print("\nBy size:")
    sizes = Counter(str(p.get('size') or 'Unknown') for p in merged.values())
    for s, c in sorted(sizes.items()):
        print(f" {s}: {c}")

    print("\nTop 10 countries:")
    countries = Counter(p.get('country', 'Unknown') for p in merged.values())
    for country, c in countries.most_common(10):
        print(f" {country}: {c}")


if __name__ == '__main__':
    main()