#!/usr/bin/env python3
"""
Merge UN/LOCODE seaports with existing world_ports.json

Downloads: https://github.com/cristan/improved-un-locodes

Run: python merge_unlocode.py
"""
|
|
import csv
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
from collections import Counter
|
|
|
|
# All paths are resolved relative to this script's own directory.
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
# Input: improved UN/LOCODE entries CSV (see module docstring for source).
UNLOCODE_CSV = os.path.join(SCRIPT_DIR, 'unlocode_improved.csv')
# Input: country code -> country name CSV.
COUNTRY_CSV = os.path.join(SCRIPT_DIR, 'unlocode_countries.csv')
# Input: current port index. NOTE: output intentionally overwrites this
# same file (merge happens in place).
EXISTING_JSON = os.path.join(SCRIPT_DIR, 'world_ports.json')
OUTPUT_JSON = os.path.join(SCRIPT_DIR, 'world_ports.json')
def assign_region(lat, lon):
    """Same logic as assemble_ports.py and marinetraffic_parser.py"""
    # Ordered rule table: the FIRST matching predicate wins, exactly
    # mirroring the original if/elif cascade — do not reorder.
    rules = (
        (lambda la, lo: lo < -100 and la > 25, 'USWC'),
        (lambda la, lo: lo < -30 and la > 25, 'USEC'),
        (lambda la, lo: lo < -30 and -5 < la <= 25, 'CARIB'),
        (lambda la, lo: lo < -70 and la <= -5, 'SAW'),
        (lambda la, lo: lo < -30 and la <= -5, 'SAE'),
        (lambda la, lo: lo > 100 and la < -10, 'AUSNZ'),
        (lambda la, lo: lo > 100, 'EASIA'),
        (lambda la, lo: 60 < lo <= 100 and la > 0, 'SASIA'),
        (lambda la, lo: 40 < lo <= 60 and la > 10, 'GULF'),
        (lambda la, lo: 10 < lo < 45 and la < -20, 'SAFR'),
        (lambda la, lo: 30 < lo <= 45 and -10 < la < 20, 'ERED'),
        (lambda la, lo: -5 < lo < 15 and -5 < la < 15, 'WAFR'),
        (lambda la, lo: 25 < lo <= 40 and 40 < la <= 48, 'BSEA'),
        (lambda la, lo: lo < 40 and la > 48, 'NEUR'),
        (lambda la, lo: -10 <= lo <= 40 and 25 < la <= 48, 'MED'),
    )
    for predicate, region in rules:
        if predicate(lat, lon):
            return region
    # Fallback for coordinates matching none of the boxes above.
    return 'OTHER'
def load_country_names():
    """Load country code -> country name mapping"""
    mapping = {}
    with open(COUNTRY_CSV, 'r', encoding='utf-8') as fh:
        for record in csv.DictReader(fh):
            code = record['CountryCode'].strip()
            raw_name = record['CountryName'].strip()
            # Clean up names like "United Arab Emirates (the)":
            # drop a literal trailing "(the)" first, then any other
            # trailing parenthetical.
            raw_name = re.sub(r'\s*\(the\)\s*$', '', raw_name)
            mapping[code] = re.sub(r'\s*\(.*?\)\s*$', '', raw_name).strip()
    return mapping
def make_key(name):
    """Convert port name to underscore key"""
    # Lowercase, drop apostrophe-like characters, then collapse every
    # run of non-alphanumerics into a single underscore.
    lowered = name.lower().strip()
    lowered = re.sub(r"[''`]", "", lowered)
    return re.sub(r'[^a-z0-9]+', '_', lowered).strip('_')
def load_existing():
    """Load existing world_ports.json"""
    if os.path.exists(EXISTING_JSON):
        with open(EXISTING_JSON, 'r', encoding='utf-8') as fh:
            return json.load(fh)
    # Missing index is not fatal — the merge simply starts from empty.
    print("WARNING: No existing world_ports.json found")
    return {}
def parse_unlocode_seaports(country_names):
    """Parse UN/LOCODE CSV — ALL seaports with coordinates, no filtering."""
    ports = []
    skipped_no_coords = 0
    skipped_no_func = 0

    with open(UNLOCODE_CSV, 'r', encoding='utf-8') as fh:
        for row in csv.DictReader(fh):
            func = row.get('Function', '')
            # Position 0 = port (seaport)
            if not func.startswith('1'):
                skipped_no_func += 1
                continue

            raw_coords = row.get('CoordinatesDecimal', '').strip()

            # Parse "lat,lon"; anything malformed leaves lat/lon as None.
            lat = lon = None
            if raw_coords:
                pieces = raw_coords.split(',')
                if len(pieces) == 2:
                    try:
                        lat = float(pieces[0].strip())
                        lon = float(pieces[1].strip())
                    except ValueError:
                        lat = lon = None

            # Reject missing, unparseable, or out-of-range coordinates.
            if lat is None or not (-90 <= lat <= 90) or not (-180 <= lon <= 180):
                skipped_no_coords += 1
                continue

            # Count active function designators (everything except
            # placeholder chars) as a rough importance proxy.
            n_functions = sum(c not in '-0 ' for c in func)

            country_code = row.get('Country', '').strip()
            location_code = row.get('Location', '').strip()
            name = row.get('NameWoDiacritics', '').strip() or row.get('Name', '').strip()

            if not name:
                continue

            # Determine size based on function count
            if n_functions >= 4:
                size = 'large'
            elif n_functions >= 2:
                size = 'medium'
            else:
                size = 'small'

            ports.append({
                'unlocode': country_code + location_code,
                'name': name,
                'country': country_names.get(country_code, country_code),
                'country_code': country_code,
                'lat': round(lat, 5),
                'lon': round(lon, 5),
                'size': size,
            })

    print(f"UN/LOCODE: {len(ports)} seaports with coords")
    print(f" Skipped (no port function): {skipped_no_func}")
    print(f" Skipped (no/bad coords): {skipped_no_coords}")
    return ports
def main():
    """Entry point: merge UN/LOCODE seaports into world_ports.json.

    Pipeline: load country names, load the existing port index, parse the
    UN/LOCODE CSV, merge new ports (existing entries always win), validate
    coordinates, write the merged JSON back in place, print statistics.
    """
    print("=" * 60)
    print("UN/LOCODE Merge Tool")
    print("=" * 60)

    # 1. Load country names
    print("\n1. Loading country names...")
    country_names = load_country_names()
    print(f" {len(country_names)} countries loaded")

    # 2. Load existing ports
    print("\n2. Loading existing world_ports.json...")
    existing = load_existing()
    print(f" {len(existing)} existing ports")

    # Build UNLOCODE index of existing ports
    # (unlocode string -> existing key), used to skip known ports below.
    existing_unlocodes = {}
    existing_keys = set()  # NOTE(review): populated but never read — candidate for removal
    for key, port in existing.items():
        existing_keys.add(key)
        if port.get('unlocode'):
            existing_unlocodes[port['unlocode']] = key

    # 3. Parse UN/LOCODE
    print("\n3. Parsing UN/LOCODE seaports...")
    unlocode_ports = parse_unlocode_seaports(country_names)

    # 4. Merge
    print("\n4. Merging...")
    merged = dict(existing)  # Start with existing (they have priority)
    added = 0
    updated_coords = 0  # NOTE(review): never incremented or reported — dead counter
    duplicates = 0

    for port in unlocode_ports:
        unlocode = port['unlocode']
        key = make_key(port['name'])

        # Skip if UNLOCODE already exists in our data
        if unlocode in existing_unlocodes:
            duplicates += 1
            continue

        # Skip if key already exists
        if key in merged:
            # But update UNLOCODE if missing
            # (enriches an existing entry without touching its coords)
            if not merged[key].get('unlocode'):
                merged[key]['unlocode'] = unlocode
            duplicates += 1
            continue

        size = port.get('size', 'small')
        region = assign_region(port['lat'], port['lon'])

        # Approximate port-area radius in nautical miles by size class.
        radius_nm = {'large': 12, 'medium': 8, 'small': 5}.get(size, 5)
        merged[key] = {
            'key': key,
            'name': port['name'],
            'country': port['country'],
            'country_code': port['country_code'],
            'unlocode': unlocode,
            'lat': port['lat'],
            'lon': port['lon'],
            'size': size,
            'region': region,
            'radius_nm': radius_nm,
        }
        added += 1

    print(f" Added: {added} new ports")
    print(f" Duplicates skipped: {duplicates}")

    # 5. Validate — every merged entry must have sane lat/lon; any error aborts.
    print("\n5. Validating...")
    errors = 0
    for key, port in merged.items():
        if not (-90 <= port['lat'] <= 90):
            print(f" ERROR: {key} lat={port['lat']}")
            errors += 1
        if not (-180 <= port['lon'] <= 180):
            print(f" ERROR: {key} lon={port['lon']}")
            errors += 1
    if errors:
        print(f"\n{errors} validation errors!")
        sys.exit(1)
    print(f" All {len(merged)} ports validated OK")

    # 6. Write JSON (compact separators, keep non-ASCII names readable)
    print(f"\n6. Writing {len(merged)} ports -> {OUTPUT_JSON}")
    with open(OUTPUT_JSON, 'w', encoding='utf-8') as f:
        json.dump(merged, f, separators=(',', ':'), ensure_ascii=False)

    # 7. Stats
    print(f"\n{'=' * 60}")
    print(f"RESULT: {len(existing)} existing + {added} new = {len(merged)} total ports")
    print(f"{'=' * 60}")

    print("\nBy region:")
    regions = Counter(p['region'] for p in merged.values())
    for r, c in sorted(regions.items(), key=lambda x: -x[1]):
        print(f" {r}: {c}")

    print("\nBy size:")
    sizes = Counter(p['size'] for p in merged.values())
    for s, c in sorted(sizes.items()):
        print(f" {s}: {c}")

    print("\nTop 10 countries:")
    countries = Counter(p.get('country', 'Unknown') for p in merged.values())
    for country, c in countries.most_common(10):
        print(f" {country}: {c}")
# Script entry point — run the merge only when executed directly.
if __name__ == '__main__':
    main()