#!/usr/bin/env python3
"""
Vessel Collector — background data enrichment for the Mediterranean, Baltic
and Caspian regions.

Discovers vessels via AIS, enriches them via Equasis (owner/operator/manager
companies) and stores the results to the vessels and contacts tables.
Enrichment stops once the remaining Equasis request budget drops below
``config.COLLECTOR_EQUASIS_RESERVE``.

Run: auto-started as a daemon thread from seafare_api.py (see start_collector).
"""
import threading
import time
import logging

import config
import maritime_db as db

logger = logging.getLogger('vessel_collector')

# Areas scanned each cycle; each entry is passed to
# provider.get_vessels_in_area(lat, lon, radius_nm).
COLLECT_REGIONS = [
    {'name': 'Mediterranean', 'lat': 37.0, 'lon': 18.0, 'radius_nm': 500},
    {'name': 'Baltic', 'lat': 57.0, 'lon': 18.0, 'radius_nm': 400},
    {'name': 'Caspian', 'lat': 41.0, 'lon': 51.0, 'radius_nm': 300},
]

# Seconds to pause after each vessel that issued at least one Equasis request.
_EQUASIS_POLITENESS_DELAY = 2

# Default wait before the first cycle so the host app can stabilize; can be
# overridden with an optional config.COLLECTOR_INITIAL_DELAY_SECONDS.
_DEFAULT_INITIAL_DELAY_SECONDS = 3600


def _type_code(vessel):
    """Return the vessel's 'type_code' as an int, or 0 if missing/unparseable.

    Coercing here keeps one malformed record (e.g. a string code) from raising
    in the threshold comparison and aborting the whole region scan.
    """
    try:
        return int(vessel.get('type_code') or 0)
    except (TypeError, ValueError):
        return 0


def _discover_vessels(provider):
    """Scan every region in COLLECT_REGIONS via AIS.

    Keeps vessels that have an MMSI and whose type code is at least
    config.COLLECTOR_MIN_TYPE_CODE. A region whose AIS query fails is
    reported and skipped (vessels already collected from it are kept).

    Returns:
        dict mapping MMSI -> {'mmsi', 'name', 'imo', 'type_code'}. A vessel
        seen in several regions keeps the data from the last region scanned.
    """
    found = {}
    for region in COLLECT_REGIONS:
        try:
            vessels = provider.get_vessels_in_area(
                region['lat'], region['lon'], region['radius_nm'])
            kept = 0
            for v in vessels:
                mmsi = v.get('mmsi')
                tc = _type_code(v)
                if mmsi and tc >= config.COLLECTOR_MIN_TYPE_CODE:
                    found[mmsi] = {
                        'mmsi': mmsi,
                        'name': v.get('name', ''),
                        'imo': v.get('imo', ''),
                        'type_code': tc,
                    }
                    kept += 1
            print(f"[Collector] {region['name']}: {len(vessels)} vessels, "
                  f"{kept} cargo/tanker")
        except Exception as e:
            print(f"[Collector] {region['name']} AIS error: {e}")
    return found


def _save_contacts(details, mmsi):
    """Store each named company in Equasis ``details`` as a contact for ``mmsi``.

    A failing db.add_contact call is reported and skipped so one bad record
    does not abort the remaining companies.

    Returns:
        Number of contacts successfully added.
    """
    added = 0
    for company in details.get('companies') or []:
        company_name = company.get('name')
        if not company_name:
            continue
        try:
            db.add_contact(
                company.get('role', 'owner'), company_name,
                country=company.get('country', ''),
                address=company.get('address', ''),
                vessels_mmsi=mmsi, source='equasis')
        except Exception as e:
            print(f"[Collector] Contact save error {mmsi} "
                  f"({company_name}): {e}")
            continue
        added += 1
    return added


def _collect_cycle():
    """Run one collection cycle: discover (AIS) -> enrich (Equasis) -> save.

    Phase 1 gathers candidate vessels from every region. Phase 2 asks the DB
    which of them need enrichment (TTL and per-cycle limit come from config).
    Phase 3 queries Equasis for each queued vessel (name search first when
    the IMO is unknown, then full details), upserts the vessel and stores its
    companies as contacts. Progress is reported with print(); returns None.
    """
    from ais_provider import get_provider
    from equasis_parser import get_parser as get_equasis

    # Phase 1: AIS discovery
    all_vessels = _discover_vessels(get_provider())
    if not all_vessels:
        print("[Collector] No vessels discovered")
        return

    # Phase 2: Queue for enrichment
    queue = db.get_vessels_needing_enrichment(
        list(all_vessels.keys()),
        ttl_days=config.COLLECTOR_ENRICHMENT_TTL_DAYS,
        limit=config.COLLECTOR_MAX_PER_CYCLE)
    if not queue:
        print(f"[Collector] All {len(all_vessels)} vessels already enriched")
        return

    reserve = config.COLLECTOR_EQUASIS_RESERVE
    remaining = db.get_equasis_remaining()
    if remaining < reserve:
        print(f"[Collector] Equasis budget low ({remaining}), skipping enrichment")
        return
    print(f"[Collector] Enrichment queue: {len(queue)} vessels "
          f"(equasis remaining: {remaining})")

    # Phase 3: Equasis enrichment
    equasis = get_equasis()
    enriched = 0
    contacts_added = 0
    for mmsi in queue:
        if remaining < reserve:
            print("[Collector] Equasis reserve reached, stopping")
            break

        vessel_info = all_vessels.get(mmsi, {})
        name = vessel_info.get('name', '')
        imo = vessel_info.get('imo', '')
        requests_made = 0
        try:
            # Look up the IMO by name when AIS did not supply one.
            if not imo and name:
                # Count the request before making it, so a call that raises
                # still counts against the local budget estimate.
                remaining -= 1
                requests_made += 1
                results = equasis.search_vessel(name)
                if results:
                    imo = results[0].get('imo', '')
            if not imo:
                continue
            # The search may have used the last request above the reserve;
            # re-check before spending another on the details page.
            if remaining < reserve:
                print("[Collector] Equasis reserve reached, stopping")
                break

            # Get full details (owner, operator, companies)
            remaining -= 1
            requests_made += 1
            details = equasis.get_vessel_details(imo)
            if details:
                details['mmsi'] = mmsi
                db.upsert_vessel(details)
                enriched += 1
                contacts_added += _save_contacts(details, mmsi)
        except Exception as e:
            print(f"[Collector] Enrichment error {mmsi} ({name}): {e}")
        finally:
            # Politeness delay after any vessel that hit Equasis — including
            # the 'search found no IMO' path, which previously skipped it.
            if requests_made:
                time.sleep(_EQUASIS_POLITENESS_DELAY)

    print(f"[Collector] Cycle done: discovered={len(all_vessels)}, "
          f"enriched={enriched}, contacts={contacts_added}, "
          f"equasis_remaining={remaining}")


def _collector_loop():
    """Background loop: wait an initial delay, then run _collect_cycle forever.

    Cycles are spaced config.COLLECTOR_CYCLE_HOURS apart (measured from the
    end of one cycle to the start of the next). Any exception from a cycle is
    reported and logged with its traceback, and the loop keeps running.
    """
    interval = config.COLLECTOR_CYCLE_HOURS * 3600
    initial_delay = getattr(config, 'COLLECTOR_INITIAL_DELAY_SECONDS',
                            _DEFAULT_INITIAL_DELAY_SECONDS)
    # Wait for app to stabilize before first scan
    time.sleep(initial_delay)
    while True:
        try:
            print("[Collector] Starting cycle...")
            _collect_cycle()
        except Exception as e:
            print(f"[Collector] Loop error: {e}")
            # Top-level boundary: keep the traceback, which print() drops.
            logger.exception("Unhandled error in collector cycle")
        time.sleep(interval)


def start_collector():
    """Start the collector loop in a daemon thread named 'vessel-collector'.

    Returns:
        The started threading.Thread (callers may ignore it).
    """
    t = threading.Thread(target=_collector_loop, daemon=True,
                         name='vessel-collector')
    t.start()
    print(f"[Collector] Started (cycle every {config.COLLECTOR_CYCLE_HOURS}h)")
    return t