#!/usr/bin/env python3
"""
Merge UN/LOCODE seaports with existing world_ports.json
Downloads: https://github.com/cristan/improved-un-locodes
Run: python merge_unlocode.py
"""
import csv
import json
import os
import re
import sys
from collections import Counter
# All data files live next to this script, regardless of the CWD it runs from.
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
UNLOCODE_CSV = os.path.join(SCRIPT_DIR, 'unlocode_improved.csv')  # improved UN/LOCODE dump (see module docstring)
COUNTRY_CSV = os.path.join(SCRIPT_DIR, 'unlocode_countries.csv')  # country code -> country name table
EXISTING_JSON = os.path.join(SCRIPT_DIR, 'world_ports.json')  # merge input (read if present)
OUTPUT_JSON = os.path.join(SCRIPT_DIR, 'world_ports.json')  # merge output — NOTE: overwrites the input in place
def assign_region(lat, lon):
    """Map a coordinate pair to a coarse shipping-region code.

    Mirrors the region logic in assemble_ports.py and
    marinetraffic_parser.py. Rule order matters: the first
    matching rule wins, and anything unmatched falls through
    to 'OTHER'.
    """
    rules = (
        ('USWC', lon < -100 and lat > 25),
        ('USEC', lon < -30 and lat > 25),
        ('CARIB', lon < -30 and -5 < lat <= 25),
        ('SAW', lon < -70 and lat <= -5),
        ('SAE', lon < -30 and lat <= -5),
        ('AUSNZ', lon > 100 and lat < -10),
        ('EASIA', lon > 100),
        ('SASIA', 60 < lon <= 100 and lat > 0),
        ('GULF', 40 < lon <= 60 and lat > 10),
        ('SAFR', 10 < lon < 45 and lat < -20),
        ('ERED', 30 < lon <= 45 and -10 < lat < 20),
        ('WAFR', -5 < lon < 15 and -5 < lat < 15),
        ('BSEA', 25 < lon <= 40 and 40 < lat <= 48),
        ('NEUR', lon < 40 and lat > 48),
        ('MED', -10 <= lon <= 40 and 25 < lat <= 48),
    )
    for code, matched in rules:
        if matched:
            return code
    return 'OTHER'
def load_country_names(path=None):
    """Load the country code -> country name mapping.

    Args:
        path: CSV file to read; defaults to COUNTRY_CSV. The file must
            have 'CountryCode' and 'CountryName' columns.

    Returns:
        dict mapping 2-letter country code to cleaned country name.
    """
    src = path or COUNTRY_CSV
    countries = {}
    with open(src, 'r', encoding='utf-8') as f:
        reader = csv.DictReader(f)
        for row in reader:
            code = row['CountryCode'].strip()
            name = row['CountryName'].strip()
            # Clean up names like "United Arab Emirates (the)" or
            # "Bolivia (Plurinational State of)" — drop the trailing
            # parenthetical qualifier.
            name = re.sub(r'\s*\(the\)\s*$', '', name)
            name = re.sub(r'\s*\(.*?\)\s*$', '', name).strip()
            countries[code] = name
    return countries
def make_key(name):
    """Turn a port name into a lowercase underscore-separated key."""
    normalized = name.lower().strip()
    # Drop apostrophe-like characters entirely, then collapse every
    # other run of non-alphanumerics into a single underscore.
    normalized = re.sub(r"[''`]", "", normalized)
    normalized = re.sub(r'[^a-z0-9]+', '_', normalized)
    return normalized.strip('_')
def load_existing(path=None):
    """Load the existing world_ports.json, if any.

    Args:
        path: JSON file to read; defaults to EXISTING_JSON.

    Returns:
        dict of existing ports, or {} when the file does not exist
        (first run) — a warning is printed in that case.
    """
    src = path or EXISTING_JSON
    if not os.path.exists(src):
        print("WARNING: No existing world_ports.json found")
        return {}
    with open(src, 'r', encoding='utf-8') as f:
        return json.load(f)
def parse_unlocode_seaports(country_names, csv_path=None):
    """Parse UN/LOCODE CSV — ALL seaports with coordinates, no filtering.

    Args:
        country_names: mapping of country code -> country name (used to
            resolve display names; falls back to the raw code).
        csv_path: CSV file to read; defaults to UNLOCODE_CSV.

    Returns:
        list of port dicts with keys: unlocode, name, country,
        country_code, lat, lon, size.
    """
    src = csv_path or UNLOCODE_CSV
    ports = []
    skipped_no_coords = 0
    skipped_no_func = 0
    with open(src, 'r', encoding='utf-8') as f:
        reader = csv.DictReader(f)
        for row in reader:
            func = row.get('Function', '')
            # Position 0 of the function string = '1' marks a seaport.
            if not func or func[0] != '1':
                skipped_no_func += 1
                continue
            coords_dec = row.get('CoordinatesDecimal', '').strip()
            if not coords_dec:
                skipped_no_coords += 1
                continue
            try:
                lat_str, lon_str = coords_dec.split(',')
                lat = float(lat_str.strip())
                lon = float(lon_str.strip())
            except (ValueError, IndexError):
                # Malformed pair: wrong field count or non-numeric parts.
                skipped_no_coords += 1
                continue
            # Validate coordinates are within geographic bounds.
            if not (-90 <= lat <= 90) or not (-180 <= lon <= 180):
                skipped_no_coords += 1
                continue
            # Count the distinct functions ('-', '0' and spaces are
            # placeholders) — used below as a proxy for port importance.
            n_functions = sum(1 for c in func if c not in '-0 ')
            country_code = row.get('Country', '').strip()
            location_code = row.get('Location', '').strip()
            unlocode = f"{country_code}{location_code}"
            # Prefer the diacritics-free name for stable ASCII keys.
            name = row.get('NameWoDiacritics', '').strip() or row.get('Name', '').strip()
            if not name:
                continue
            country = country_names.get(country_code, country_code)
            # Determine size based on function count.
            if n_functions >= 4:
                size = 'large'
            elif n_functions >= 2:
                size = 'medium'
            else:
                size = 'small'
            ports.append({
                'unlocode': unlocode,
                'name': name,
                'country': country,
                'country_code': country_code,
                'lat': round(lat, 5),
                'lon': round(lon, 5),
                'size': size,
            })
    print(f"UN/LOCODE: {len(ports)} seaports with coords")
    print(f" Skipped (no port function): {skipped_no_func}")
    print(f" Skipped (no/bad coords): {skipped_no_coords}")
    return ports
def main():
    """Merge UN/LOCODE seaports into world_ports.json and print statistics.

    Pipeline: load country names, load the existing port index, parse the
    UN/LOCODE dump, merge (existing entries always win), validate
    coordinate ranges, write compact JSON, then print summary stats.
    Exits with status 1 if any merged port has out-of-range coordinates.
    """
    print("=" * 60)
    print("UN/LOCODE Merge Tool")
    print("=" * 60)

    # 1. Load country names
    print("\n1. Loading country names...")
    country_names = load_country_names()
    print(f" {len(country_names)} countries loaded")

    # 2. Load existing ports
    print("\n2. Loading existing world_ports.json...")
    existing = load_existing()
    print(f" {len(existing)} existing ports")

    # Index existing ports by UN/LOCODE so already-imported entries
    # can be skipped cheaply during the merge.
    existing_unlocodes = {}
    for key, port in existing.items():
        if port.get('unlocode'):
            existing_unlocodes[port['unlocode']] = key

    # 3. Parse UN/LOCODE
    print("\n3. Parsing UN/LOCODE seaports...")
    unlocode_ports = parse_unlocode_seaports(country_names)

    # 4. Merge — start with existing entries; they have priority.
    print("\n4. Merging...")
    merged = dict(existing)
    added = 0
    duplicates = 0
    for port in unlocode_ports:
        unlocode = port['unlocode']
        key = make_key(port['name'])
        # Skip if this UN/LOCODE already exists in our data.
        if unlocode in existing_unlocodes:
            duplicates += 1
            continue
        # Skip if the derived key already exists...
        if key in merged:
            # ...but backfill the UN/LOCODE if the existing entry lacks one.
            if not merged[key].get('unlocode'):
                merged[key]['unlocode'] = unlocode
            duplicates += 1
            continue
        size = port.get('size', 'small')
        region = assign_region(port['lat'], port['lon'])
        radius_nm = {'large': 12, 'medium': 8, 'small': 5}.get(size, 5)
        merged[key] = {
            'key': key,
            'name': port['name'],
            'country': port['country'],
            'country_code': port['country_code'],
            'unlocode': unlocode,
            'lat': port['lat'],
            'lon': port['lon'],
            'size': size,
            'region': region,
            'radius_nm': radius_nm,
        }
        added += 1
    print(f" Added: {added} new ports")
    print(f" Duplicates skipped: {duplicates}")

    # 5. Validate coordinate ranges before writing anything out.
    print("\n5. Validating...")
    errors = 0
    for key, port in merged.items():
        if not (-90 <= port['lat'] <= 90):
            print(f" ERROR: {key} lat={port['lat']}")
            errors += 1
        if not (-180 <= port['lon'] <= 180):
            print(f" ERROR: {key} lon={port['lon']}")
            errors += 1
    if errors:
        print(f"\n{errors} validation errors!")
        sys.exit(1)
    print(f" All {len(merged)} ports validated OK")

    # 6. Write compact JSON (no indent, no ASCII escaping) to keep the
    # file small. NOTE: this overwrites EXISTING_JSON in place.
    print(f"\n6. Writing {len(merged)} ports -> {OUTPUT_JSON}")
    with open(OUTPUT_JSON, 'w', encoding='utf-8') as f:
        json.dump(merged, f, separators=(',', ':'), ensure_ascii=False)

    # 7. Stats
    print(f"\n{'=' * 60}")
    print(f"RESULT: {len(existing)} existing + {added} new = {len(merged)} total ports")
    print(f"{'=' * 60}")
    print("\nBy region:")
    regions = Counter(p['region'] for p in merged.values())
    for r, c in sorted(regions.items(), key=lambda x: -x[1]):
        print(f" {r}: {c}")
    print("\nBy size:")
    sizes = Counter(p['size'] for p in merged.values())
    for s, c in sorted(sizes.items()):
        print(f" {s}: {c}")
    print("\nTop 10 countries:")
    countries = Counter(p.get('country', 'Unknown') for p in merged.values())
    for country, c in countries.most_common(10):
        print(f" {country}: {c}")


if __name__ == '__main__':
    main()