#!/usr/bin/env python3
"""
AIS Data Provider — Unified real-time vessel tracking
SeaFare Montana

Priority chain:
    AISStream (WebSocket) → AISHub (REST) → Digitraffic (REST) → MarineTraffic (scraping)

Env vars:
    AISSTREAM_API_KEY   — free, register at aisstream.io
    AISHUB_USERNAME     — contributor ID from aishub.net
    DIGITRAFFIC_ENABLED — '1' (default) or '0' to disable Finnish AIS
"""
import json
import logging
import math
import os
import threading
import time
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Set, Union

import requests

import config
import maritime_db as db

logger = logging.getLogger('ais_provider')

# --- Configuration ---
AISSTREAM_API_KEY = os.environ.get('AISSTREAM_API_KEY', '')
AISHUB_USERNAME = os.environ.get('AISHUB_USERNAME', '')
DIGITRAFFIC_ENABLED = os.environ.get('DIGITRAFFIC_ENABLED', '1')

# NAV_STATUS_MAP imported from marinetraffic_parser at module level.
# Fall back to a local copy of the standard AIS navigation-status codes
# when that module is unavailable.
try:
    from marinetraffic_parser import NAV_STATUS_MAP, classify_vessel_type
except ImportError:
    NAV_STATUS_MAP = {
        0: 'underway',
        1: 'at anchor',
        2: 'not under command',
        3: 'restricted maneuverability',
        4: 'constrained by draught',
        5: 'moored',
        6: 'aground',
        7: 'fishing',
        8: 'underway sailing',
        14: 'ais-sart',
        15: 'undefined',
    }
    classify_vessel_type = None


# =============================================================================
# AISSTREAM CLIENT (WebSocket, real-time)
# =============================================================================

class AISStreamClient:
    """
    Persistent WebSocket client for AISStream.io.

    Uses synchronous websocket-client (no asyncio) in a background thread.
    Stores positions to DB via maritime_db.add_position().

    Deferred start: WebSocket connects only when first MMSI or bounding box
    is subscribed (avoids wasteful global subscription on free tier).
    """

    WS_URL = "wss://stream.aisstream.io/v0/stream"
    MAX_MMSI_FILTER = 50   # cap on the MMSI filter list sent to the API
    RECONNECT_BASE = 5     # seconds — base of exponential backoff
    RECONNECT_CAP = 60     # seconds — maximum backoff delay

    def __init__(self, api_key: str):
        self.api_key = api_key
        self._tracked_mmsis: Set[str] = set()
        self._bounding_boxes: list = []
        self._thread: Optional[threading.Thread] = None
        self._running = False
        self._connected = False
        self._reconnect_count = 0
        self._last_message_time: Optional[float] = None
        self._positions_received = 0
        self._lock = threading.Lock()
        self._resub_flag = False    # set when subscription filters change
        self._started = False
        self._last_error = None     # last error string, surfaced in get_stats()
        self._ws = None             # websocket-client WebSocket instance

    def start(self):
        """Mark client as ready. Actual WebSocket connects on first subscribe."""
        self._running = True
        self._started = True
        logger.info("AISStream client ready (deferred connect)")

    def _ensure_thread(self):
        """Start WebSocket thread if not already running."""
        if self._thread and self._thread.is_alive():
            return
        if not self._started:
            return
        self._thread = threading.Thread(target=self._run_loop, daemon=True,
                                        name="aisstream-ws")
        self._thread.start()
        logger.info("AISStream WebSocket thread started")

    def stop(self):
        """Graceful shutdown."""
        self._running = False
        if self._ws:
            try:
                self._ws.close()
            except Exception:
                pass
        if self._thread:
            self._thread.join(timeout=5)
        logger.info("AISStream client stopped")

    def subscribe_mmsi(self, mmsi: str):
        """Add MMSI to tracking set (max 50)."""
        added = False
        with self._lock:
            if len(self._tracked_mmsis) < self.MAX_MMSI_FILTER:
                before = len(self._tracked_mmsis)
                self._tracked_mmsis.add(str(mmsi))
                added = len(self._tracked_mmsis) > before
        if added:
            self._resub_flag = True
        self._ensure_thread()

    def unsubscribe_mmsi(self, mmsi: str):
        """Remove MMSI from tracking set."""
        with self._lock:
            self._tracked_mmsis.discard(str(mmsi))
            self._resub_flag = True

    def subscribe_area(self, lat_min: float, lon_min: float,
                       lat_max: float, lon_max: float):
        """Set bounding box filter for area queries (at most 3 boxes kept, FIFO)."""
        with self._lock:
            bbox = [[lat_min, lon_min],
                    [lat_max, lon_max]]
            if len(self._bounding_boxes) >= 3:
                self._bounding_boxes.pop(0)
            self._bounding_boxes.append(bbox)
            self._resub_flag = True
        self._ensure_thread()

    def is_connected(self) -> bool:
        return self._connected and self._running

    def thread_alive(self) -> bool:
        return self._thread is not None and self._thread.is_alive()

    def get_stats(self) -> dict:
        """Snapshot of client health for the /health endpoint."""
        alive = self.thread_alive()
        stats = {
            'connected': self._connected or alive,
            'positions_received': self._positions_received,
            'tracked_mmsis': len(self._tracked_mmsis),
            'bounding_boxes': len(self._bounding_boxes),
            'last_message': self._last_message_time,
            'reconnect_count': self._reconnect_count,
            'thread_alive': alive,
        }
        if self._last_error:
            stats['last_error'] = str(self._last_error)[:200]
        return stats

    # --- Short-lived sync query (runs in calling thread, works on Render) ---

    def query_position_sync(self, mmsi: str, max_seconds: float = 8) -> Optional[dict]:
        """Short-lived sync WebSocket: connect, subscribe by MMSI, collect 1 position, close.

        Runs in the calling thread (Flask request) — guaranteed to work on Render."""
        return self._query_sync(mmsis=[str(mmsi)], bboxes=None,
                                max_seconds=max_seconds, target_mmsi=str(mmsi))

    def query_area_sync(self, lat_min: float, lon_min: float,
                        lat_max: float, lon_max: float,
                        max_seconds: float = 4) -> List[dict]:
        """Short-lived sync WebSocket: connect, subscribe by bbox, collect positions, close."""
        bbox = [[lat_min, lon_min], [lat_max, lon_max]]
        positions = self._query_sync(mmsis=None, bboxes=[bbox],
                                     max_seconds=max_seconds)
        return positions if isinstance(positions, list) else []

    def _query_sync(self, mmsis: Optional[List[str]] = None,
                    bboxes: Optional[list] = None,
                    max_seconds: float = 8,
                    target_mmsi: Optional[str] = None) -> Union[Optional[dict], List[dict]]:
        """Core sync query: short-lived WebSocket connection.

        Returns a single position dict (or None) when target_mmsi is given,
        otherwise a list of position dicts.
        """
        import websocket as ws_lib

        sub = {
            "APIKey": self.api_key,
            # API requires at least one bounding box even for MMSI filters
            "BoundingBoxes": bboxes or [[[-90, -180], [90, 180]]],
            "FilterMessageTypes": ["PositionReport", "ShipStaticData"]
        }
        if mmsis:
            sub["FiltersShipMMSI"] = mmsis

        positions = []
        try:
            ws = ws_lib.create_connection(self.WS_URL, timeout=15,
                                          header={"User-Agent": "SeaFare-Montana/1.0"})
        except Exception as e:
            logger.warning(f"AISStream sync query connect failed: {e}")
            return None if target_mmsi else []

        try:
            ws.send(json.dumps(sub))
            ws.settimeout(2.0)
            deadline = time.time() + max_seconds
            while time.time() < deadline:
                try:
                    raw = ws.recv()
                    if not raw:
                        continue
                    data = json.loads(raw)
                    if 'error' in data:
                        logger.warning(f"AISStream sync error: {data['error']}")
                        break
                    # Store to DB (same as _handle_message)
                    try:
                        self._handle_message(raw)
                    except Exception:
                        pass
                    msg_type = data.get('MessageType', '')
                    if msg_type == 'PositionReport':
                        pos = data.get('Message', {}).get('PositionReport', {})
                        mmsi_str = str(pos.get('UserID',
                                               data.get('MetaData', {}).get('MMSI', '')))
                        lat = pos.get('Latitude', 0)
                        lon = pos.get('Longitude', 0)
                        if abs(lat) <= 90 and abs(lon) <= 180:
                            p = {
                                'mmsi': mmsi_str,
                                'latitude': lat,
                                'longitude': lon,
                                'speed': pos.get('Sog'),
                                'course': pos.get('Cog'),
                                'heading': pos.get('TrueHeading'),
                                'timestamp': data.get('MetaData', {}).get('time_utc'),
                                'source': 'aisstream',
                            }
                            positions.append(p)
                            self._positions_received += 1
                            self._last_message_time = time.time()
                            # If looking for specific MMSI and found it — return immediately
                            if target_mmsi and mmsi_str == target_mmsi:
                                return p
                except ws_lib.WebSocketTimeoutException:
                    continue
                except ws_lib.WebSocketConnectionClosedException:
                    break
        finally:
            try:
                ws.close()
            except Exception:
                pass

        if target_mmsi:
            # Didn't find the specific MMSI
            return None
        return positions

    # --- Internal (persistent thread, no asyncio) ---

    def _run_loop(self):
        """Thread target: synchronous WebSocket loop with auto-reconnect.

        Catches BaseException to survive gunicorn worker signals."""
        try:
            self._run_loop_inner()
        except BaseException as e:
            self._last_error = f"_run_loop KILLED: {type(e).__name__}: {e}"
            logger.error(f"AISStream _run_loop killed by {type(e).__name__}: {e}")
            # Re-enable for watchdog restart
            self._running = True

    STARTUP_DELAY = 15  # seconds — let gunicorn finish worker setup before I/O

    def _run_loop_inner(self):
        """Actual reconnect loop."""
        logger.info("AISStream sync _run_loop STARTED, waiting %ds for gunicorn init",
                    self.STARTUP_DELAY)
        self._last_error = f"startup delay ({self.STARTUP_DELAY}s)..."
        time.sleep(self.STARTUP_DELAY)
        logger.info("AISStream startup delay done, entering main loop")
        while self._running:
            # Wait until we have something to subscribe to
            while self._running and not self._tracked_mmsis and not self._bounding_boxes:
                time.sleep(1)
            if not self._running:
                break
            try:
                self._connect_and_listen()
            except BaseException as e:
                self._last_error = f"connection error: {type(e).__name__}: {e}"
                logger.warning(f"AISStream connection error: {type(e).__name__}: {e}")
            self._connected = False
            if not self._running:
                break
            # Reconnect with exponential backoff
            delay = min(self.RECONNECT_BASE * (2 ** min(self._reconnect_count, 6)),
                        self.RECONNECT_CAP)
            self._reconnect_count += 1
            logger.info(f"AISStream reconnecting in {delay}s (attempt {self._reconnect_count})")
            time.sleep(delay)
        self._last_error = f"_run_loop exited, _running={self._running}"
        logger.info("AISStream _run_loop exited")

    def _connect_and_listen(self):
        """Connect to AISStream via sync websocket-client, listen for messages."""
        import websocket as ws_lib  # websocket-client package
        self._last_error = "pre-connect: resolving DNS..."
        logger.info("AISStream connecting (sync)...")

        # Pre-resolve DNS (helps diagnose where it hangs)
        import socket
        try:
            addr = socket.getaddrinfo("stream.aisstream.io", 443, socket.AF_INET)
            self._last_error = f"DNS ok ({addr[0][4][0]}), creating connection..."
        except Exception as e:
            self._last_error = f"DNS failed: {e}"
            raise

        self._ws = ws_lib.create_connection(
            self.WS_URL, timeout=20,
            header={"User-Agent": "SeaFare-Montana/1.0"}
        )
        self._last_error = "connected, sending subscription..."
        try:
            # Send subscription
            self._resub_flag = False
            sub = self._build_subscription()
            self._ws.send(json.dumps(sub))
            self._connected = True
            self._reconnect_count = 0
            self._last_error = "connected, receiving data..."
            logger.info(f"AISStream subscribed (MMSIs: {len(self._tracked_mmsis)}, "
                        f"BBoxes: {len(self._bounding_boxes)})")

            self._ws.settimeout(5.0)  # recv timeout for checking resub/running
            while self._running:
                # Re-subscribe if needed
                if self._resub_flag:
                    self._resub_flag = False
                    sub = self._build_subscription()
                    self._ws.send(json.dumps(sub))
                    logger.info(f"AISStream re-subscribed")
                try:
                    raw = self._ws.recv()
                    if raw:
                        try:
                            self._handle_message(raw)
                        except Exception as e:
                            logger.debug(f"AISStream message error: {e}")
                except ws_lib.WebSocketTimeoutException:
                    continue  # Timeout is normal — loop back to check flags
                except ws_lib.WebSocketConnectionClosedException:
                    self._last_error = "connection closed by server"
                    logger.warning("AISStream connection closed by server")
                    break
        finally:
            self._connected = False
            try:
                self._ws.close()
            except Exception:
                pass
            self._ws = None

    def _build_subscription(self) -> dict:
        """Build AISStream subscription message. No global fallback — only explicit filters."""
        sub = {
            "APIKey": self.api_key,
            "BoundingBoxes": self._bounding_boxes if self._bounding_boxes else [
                [[-90, -180], [90, 180]]  # Required by API, but filtered by MMSI
            ],
            "FilterMessageTypes": ["PositionReport", "ShipStaticData"]
        }
        with self._lock:
            if self._tracked_mmsis:
                sub["FiltersShipMMSI"] = list(self._tracked_mmsis)[:self.MAX_MMSI_FILTER]
        return sub

    def _handle_message(self, raw: str):
        """Parse AISStream message and store position in DB."""
        data = json.loads(raw)

        # Check for error response
        if 'error' in data:
            self._last_error = f"API error: {data['error']}"
            logger.warning(f"AISStream error: {data['error']}")
            return

        msg_type = data.get('MessageType', '')
        meta = data.get('MetaData', {})

        if msg_type == 'PositionReport':
            pos = data.get('Message', {}).get('PositionReport', {})
            mmsi = str(pos.get('UserID', meta.get('MMSI', '')))
            if not mmsi or mmsi == '0':
                return
            lat = pos.get('Latitude', 0)
            lon = pos.get('Longitude', 0)
            # AIS uses 91/181 as "not available" sentinels
            if abs(lat) > 90 or abs(lon) > 180:
                return
            nav_code = pos.get('NavigationStatus')
            nav_status = NAV_STATUS_MAP.get(nav_code) if nav_code is not None else None
            timestamp = meta.get('time_utc',
                                 datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S'))
            try:
                db.add_position(
                    mmsi=mmsi, lat=lat, lon=lon,
                    speed=pos.get('Sog'),
                    course=pos.get('Cog'),
                    heading=pos.get('TrueHeading'),
                    status=nav_status,
                    destination=None,  # Not in PositionReport
                    eta=None,
                    timestamp=timestamp,
                    source='aisstream'
                )
            except Exception as db_err:
                logger.debug(f"AISStream DB write error for {mmsi}: {db_err}")
            self._positions_received += 1
            self._last_message_time = time.time()

        elif msg_type == 'ShipStaticData':
            static = data.get('Message', {}).get('ShipStaticData', {})
            mmsi = str(meta.get('MMSI', ''))
            ship_name = meta.get('ShipName', '').strip()
            if mmsi and ship_name:
                try:
                    vessel_data = {
                        'mmsi': mmsi,
                        'name': ship_name,
                    }
                    # AIS type code (e.g. 70=cargo, 80=tanker)
                    type_code = static.get('Type')
                    if type_code is not None:
                        vessel_data['type_code'] = int(type_code)
                    imo_raw = static.get('ImoNumber')
                    if imo_raw and str(imo_raw) != '0':
                        vessel_data['imo'] = str(imo_raw)
                    callsign = static.get('CallSign', '').strip()
                    if callsign:
                        vessel_data['callsign'] = callsign
                    # Dimensions are split in AIS: A+B = length, C+D = width
                    dim = static.get('Dimension', {})
                    if dim:
                        a, b = dim.get('A', 0), dim.get('B', 0)
                        c_val, d_val = dim.get('C', 0), dim.get('D', 0)
                        length = a + b
                        width = c_val + d_val
                        if length > 0:
                            vessel_data['length'] = length
                        if width > 0:
                            vessel_data['width'] = width
                    db.upsert_vessel(vessel_data)
                    # Destination/ETA belong to positions table, not vessels
                    dest = (static.get('Destination') or '').strip()
                    if dest:
                        eta_raw = static.get('Eta')
                        eta_str = None
                        if isinstance(eta_raw, dict) and eta_raw.get('Month'):
                            eta_str = f"{eta_raw.get('Month',0):02d}-{eta_raw.get('Day',0):02d} {eta_raw.get('Hour',0):02d}:{eta_raw.get('Minute',0):02d}"
                        elif isinstance(eta_raw, str) and eta_raw.strip():
                            eta_str = eta_raw.strip()
                        db.update_position_destination(mmsi, dest, eta_str)
                except Exception as e:
                    logger.debug(f"ShipStaticData error for {mmsi}: {e}")


# =============================================================================
# AISHUB CLIENT (REST, on-demand)
# =============================================================================

class AISHubClient:
    """
    REST client for AISHub.net API.

    Strict rate limit: max 1 request per minute.
    """

    BASE_URL = "https://data.aishub.net/ws.php"
    MIN_REQUEST_INTERVAL = 61  # seconds (1 min + 1s safety)

    def __init__(self, username: str):
        self.username = username
        self._last_request_time = 0.0
        self._lock = threading.Lock()

    def can_request(self) -> bool:
        """Check if rate limit allows a request right now."""
        return (time.time() - self._last_request_time) >= self.MIN_REQUEST_INTERVAL

    def get_vessel_position(self, mmsi: str) -> Optional[dict]:
        """Get position for a single vessel by MMSI."""
        vessels = self._request({'mmsi': mmsi})
        if vessels:
            v = vessels[0]
            self._store_positions(vessels)
            return v
        return None

    def get_vessel_by_imo(self, imo: str) -> Optional[dict]:
        """Get position for a single vessel by IMO."""
        vessels = self._request({'imo': imo})
        if vessels:
            self._store_positions(vessels)
            return vessels[0]
        return None

    def get_vessels_in_area(self, lat_min: float, lat_max: float,
                            lon_min: float, lon_max: float) -> List[dict]:
        """Get all vessels in a geographic bounding box."""
        vessels = self._request({
            'latmin': lat_min, 'latmax': lat_max,
            'lonmin': lon_min, 'lonmax': lon_max
        })
        if vessels:
            self._store_positions(vessels)
        return vessels or []

    def _request(self, extra_params: dict) -> Optional[List[dict]]:
        """Execute HTTP GET with rate limiting (blocks until the 1/min window opens)."""
        with self._lock:
            elapsed = time.time() - self._last_request_time
            if elapsed < self.MIN_REQUEST_INTERVAL:
                wait = self.MIN_REQUEST_INTERVAL - elapsed
                logger.info(f"AISHub rate limit: waiting {wait:.0f}s")
                time.sleep(wait)
            self._last_request_time = time.time()

            params = {
                'username': self.username,
                'format': '1',  # human-readable
                'output': 'json',
                'compress': '0',
            }
            params.update(extra_params)

            try:
                resp = requests.get(self.BASE_URL, params=params, timeout=15)
                if resp.status_code != 200:
                    logger.warning(f"AISHub HTTP {resp.status_code}")
                    return None
                data = resp.json()
                # AISHub returns list of objects (or error string)
                if isinstance(data, list):
                    return [self._parse_vessel(item) for item in data
                            if isinstance(item, dict)]
                if isinstance(data, dict) and data.get('ERROR'):
                    logger.warning(f"AISHub error: {data['ERROR']}")
                    return None
                return None
            except requests.RequestException as e:
                logger.warning(f"AISHub request failed: {e}")
                return None
            except (ValueError, KeyError) as e:
                logger.warning(f"AISHub parse error: {e}")
                return None

    def _parse_vessel(self, item: dict) -> dict:
        """Normalize AISHub response to internal format."""
        nav_code = item.get('NAVSTAT')
        nav_status = NAV_STATUS_MAP.get(int(nav_code)) if nav_code is not None else None
        return {
            'mmsi': str(item.get('MMSI', '')),
            'name': (item.get('NAME') or '').strip(),
            'imo': str(item.get('IMO', '')) if item.get('IMO') else None,
            'latitude': item.get('LATITUDE'),
            'longitude': item.get('LONGITUDE'),
            'speed': item.get('SOG'),
            'course': item.get('COG'),
            'heading': item.get('HEADING'),
            'status': nav_status,
            'type_code': item.get('TYPE'),
            'destination': (item.get('DEST') or '').strip(),
            'eta': item.get('ETA', ''),
            'draught': item.get('DRAUGHT'),
            'callsign': (item.get('CALLSIGN') or '').strip(),
            'timestamp': item.get('TIME', datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')),
            'source': 'aishub',
        }

    def _store_positions(self, vessels: List[dict]):
        """Batch-store positions in DB (best-effort, errors ignored per vessel)."""
        for v in vessels:
            mmsi = v.get('mmsi')
            lat = v.get('latitude')
            lon = v.get('longitude')
            if mmsi and lat is not None and lon is not None:
                try:
                    db.add_position(
                        mmsi=mmsi, lat=lat, lon=lon,
                        speed=v.get('speed'), course=v.get('course'),
                        heading=v.get('heading'), status=v.get('status'),
                        destination=v.get('destination'), eta=v.get('eta'),
                        timestamp=v.get('timestamp'), source='aishub'
                    )
                except Exception:
                    pass
# =============================================================================
# DIGITRAFFIC CLIENT (REST, Finnish/Baltic AIS, free, no key)
# =============================================================================

class DigitrafficClient:
    """Finnish Digitraffic — free real-time AIS, no API key needed.

    Coverage: Finnish/Baltic waters.
    API: https://meri.digitraffic.fi/api/ais/v1/
    License: CC BY 4.0
    """

    BASE_URL = "https://meri.digitraffic.fi/api/ais/v1"
    MIN_REQUEST_INTERVAL = 10  # seconds between requests (be polite)

    def __init__(self):
        self._last_request_time = 0.0
        self._lock = threading.Lock()

    def can_request(self) -> bool:
        """Check if rate limit allows a request right now."""
        return (time.time() - self._last_request_time) >= self.MIN_REQUEST_INTERVAL

    def get_vessel_position(self, mmsi: str) -> Optional[dict]:
        """Get single vessel position by MMSI."""
        data = self._request(f"/locations/{mmsi}")
        if not data:
            return None
        # Endpoint may return either a single Feature or a FeatureCollection
        feature = data
        if data.get('type') == 'FeatureCollection':
            features = data.get('features', [])
            feature = features[0] if features else None
        if not feature:
            return None
        vessel = self._parse_feature(feature)
        if vessel:
            self._store_positions([vessel])
        return vessel

    def get_vessels_in_area(self, lat_min: float, lat_max: float,
                            lon_min: float, lon_max: float) -> List[dict]:
        """Get all vessels, filter by bounding box client-side."""
        data = self._request("/locations")
        if not data or data.get('type') != 'FeatureCollection':
            return []
        vessels = []
        for feature in data.get('features', []):
            coords = feature.get('geometry', {}).get('coordinates', [])
            if len(coords) >= 2:
                # GeoJSON order is [lon, lat]
                lon, lat = coords[0], coords[1]
                if lat_min <= lat <= lat_max and lon_min <= lon <= lon_max:
                    v = self._parse_feature(feature)
                    if v:
                        vessels.append(v)
        if vessels:
            self._store_positions(vessels)
        return vessels

    def _request(self, path: str) -> Optional[dict]:
        """Execute HTTP GET with rate limiting (blocks until the window opens)."""
        with self._lock:
            elapsed = time.time() - self._last_request_time
            if elapsed < self.MIN_REQUEST_INTERVAL:
                time.sleep(self.MIN_REQUEST_INTERVAL - elapsed)
            self._last_request_time = time.time()
            try:
                resp = requests.get(f"{self.BASE_URL}{path}", timeout=15,
                                    headers={'Accept-Encoding': 'gzip'})
                if resp.status_code != 200:
                    logger.warning(f"Digitraffic HTTP {resp.status_code}")
                    return None
                return resp.json()
            except Exception as e:
                logger.warning(f"Digitraffic request failed: {e}")
                return None

    def _parse_feature(self, feature: dict) -> Optional[dict]:
        """Parse GeoJSON feature to internal vessel format."""
        props = feature.get('properties', {})
        coords = feature.get('geometry', {}).get('coordinates', [])
        if len(coords) < 2:
            return None
        lon, lat = coords[0], coords[1]
        if lat == 0 and lon == 0:
            # Null island — treat as missing position
            return None
        nav_code = props.get('navStat')
        nav_status = NAV_STATUS_MAP.get(nav_code) if nav_code is not None else None
        mmsi = str(props.get('mmsi', ''))
        if not mmsi:
            return None
        return {
            'mmsi': mmsi,
            'name': '',
            'imo': None,
            'latitude': lat,
            'longitude': lon,
            'speed': props.get('sog'),
            'course': props.get('cog'),
            'heading': props.get('heading'),
            'status': nav_status,
            'type_code': props.get('shipType'),
            'destination': '',
            'eta': '',
            'draught': props.get('draught'),
            'callsign': '',
            'timestamp': props.get('timestampExternal',
                                   datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')),
            'source': 'digitraffic',
        }

    def _store_positions(self, vessels: List[dict]):
        """Store positions in DB (best-effort, errors ignored per vessel)."""
        for v in vessels:
            mmsi = v.get('mmsi')
            lat = v.get('latitude')
            lon = v.get('longitude')
            if mmsi and lat is not None and lon is not None:
                try:
                    db.add_position(
                        mmsi=mmsi, lat=lat, lon=lon,
                        speed=v.get('speed'), course=v.get('course'),
                        heading=v.get('heading'), status=v.get('status'),
                        destination=v.get('destination'), eta=v.get('eta'),
                        timestamp=v.get('timestamp'), source='digitraffic'
                    )
                except Exception:
                    pass
# =============================================================================
# AIS PROVIDER (unified facade)
# =============================================================================

class AISProvider:
    """
    Unified AIS data provider with priority chain:
        1. AISStream (real-time DB cache from WebSocket)
        2. AISHub (REST query, 1 req/min)
        3. Digitraffic (Finnish/Baltic AIS, free, no key)
        4. MarineTraffic (scraping fallback)
    """

    POSITION_FRESH_MINUTES = 30  # single vessel: accept positions up to 30 min
    AREA_FRESH_MINUTES = 30      # area queries: cache TTL

    def __init__(self):
        self.aisstream: Optional[AISStreamClient] = None
        self.aishub: Optional[AISHubClient] = None
        self.digitraffic: Optional[DigitrafficClient] = None

    # Major global shipping areas for pre-subscription on startup
    _STARTUP_AREAS = [
        [[25, -10], [45, 40]],   # Mediterranean + Black Sea
        [[48, -10], [62, 15]],   # North Sea + English Channel + Baltic
        [[36, 46], [47, 55]],    # Caspian Sea
    ]

    def initialize(self):
        """Start available AIS clients based on env vars."""
        if AISSTREAM_API_KEY:
            self.aisstream = AISStreamClient(AISSTREAM_API_KEY)
            self.aisstream.start()
            # Pre-subscribe to major shipping lanes so data flows before first user request
            for bbox in self._STARTUP_AREAS:  # max 3 bounding boxes
                self.aisstream.subscribe_area(bbox[0][0], bbox[0][1],
                                              bbox[1][0], bbox[1][1])
            logger.info("AISStream client started with pre-subscribed shipping lanes")
            # Start watchdog thread to auto-restart dead AISStream thread
            self._watchdog_thread = threading.Thread(
                target=self._watchdog_loop, daemon=True, name="ais-watchdog")
            self._watchdog_thread.start()
        else:
            logger.info("AISStream: no API key — skipped")

        if AISHUB_USERNAME:
            self.aishub = AISHubClient(AISHUB_USERNAME)
            logger.info("AISHub client initialized")
        else:
            logger.info("AISHub: no username — skipped")

        if DIGITRAFFIC_ENABLED == '1':
            self.digitraffic = DigitrafficClient()
            logger.info("Digitraffic client initialized (Finnish AIS)")
        else:
            logger.info("Digitraffic: disabled via env")

    _watchdog_running = True

    def _watchdog_loop(self):
        """Background watchdog: checks AISStream thread every 30s, restarts if dead.

        Uses BaseException to survive gunicorn worker signals."""
        logger.info("AIS Watchdog STARTED")
        while self._watchdog_running and self.aisstream:
            try:
                time.sleep(30)
                if not self.aisstream._running:
                    self.aisstream._running = True
                    self.aisstream._started = True
                    logger.warning("AIS Watchdog: re-enabled _running flag")
                thread = self.aisstream._thread
                if thread is None or not thread.is_alive():
                    logger.warning(f"AIS Watchdog: thread dead "
                                   f"(last_error={self.aisstream._last_error})")
                    self.aisstream._ensure_thread()
                    if not self.aisstream._thread or not self.aisstream._thread.is_alive():
                        for bbox in self._STARTUP_AREAS:
                            self.aisstream.subscribe_area(bbox[0][0], bbox[0][1],
                                                          bbox[1][0], bbox[1][1])
                        logger.info("AIS Watchdog: re-subscribed to shipping lanes")
            except BaseException as e:
                logger.error(f"AIS Watchdog error: {type(e).__name__}: {e}")
                try:
                    time.sleep(15)
                except BaseException:
                    pass
        logger.info("AIS Watchdog stopped")

    def shutdown(self):
        """Graceful shutdown."""
        self._watchdog_running = False
        if self.aisstream:
            self.aisstream.stop()

    def get_vessel_position(self, mmsi: str = None, imo: str = None,
                            name: str = None) -> Optional[dict]:
        """
        Get current position for a single vessel.

        Priority: DB (recent) → AISHub REST → stale DB.
        """
        resolved_mmsi = mmsi

        # Resolve MMSI from IMO/name via DB
        if not resolved_mmsi and (imo or name):
            vessel = None
            if imo:
                vessel = db.get_vessel(imo=imo)
            if not vessel and name:
                vessel = db.get_vessel(name=name)
            if vessel:
                resolved_mmsi = vessel.get('mmsi')
        if not resolved_mmsi:
            return None

        # 1. Check DB for recent position (from AISStream or previous queries)
        pos = db.get_last_position(resolved_mmsi)
        if pos and self._is_fresh(pos, self.POSITION_FRESH_MINUTES):
            pos['source'] = pos.get('source', 'cache')
            return pos

        # 2. AISStream: persistent thread OR sync fallback
        if self.aisstream:
            if self.aisstream.thread_alive():
                # Background thread is alive — subscribe and poll DB
                self.aisstream.subscribe_mmsi(resolved_mmsi)
                for _ in range(5):
                    time.sleep(1.5)
                    pos = db.get_last_position(resolved_mmsi)
                    if pos and self._is_fresh(pos, self.POSITION_FRESH_MINUTES):
                        pos['source'] = pos.get('source', 'aisstream')
                        return pos
            else:
                # Thread dead (Render) — short-lived sync query in request thread
                try:
                    result = self.aisstream.query_position_sync(resolved_mmsi,
                                                                max_seconds=8)
                    if result:
                        return result
                    # Also check DB (query_position_sync stores to DB)
                    pos = db.get_last_position(resolved_mmsi)
                    if pos and self._is_fresh(pos, self.POSITION_FRESH_MINUTES):
                        pos['source'] = pos.get('source', 'aisstream')
                        return pos
                except Exception as e:
                    logger.warning(f"AISStream sync query failed: {e}")

        # 3. AISHub REST query
        if self.aishub and self.aishub.can_request():
            try:
                result = self.aishub.get_vessel_position(resolved_mmsi)
                if result:
                    return result
            except Exception as e:
                logger.warning(f"AISHub query failed: {e}")

        # If we have IMO and no MMSI result, try IMO on AISHub
        if not pos and imo and self.aishub and self.aishub.can_request():
            try:
                result = self.aishub.get_vessel_by_imo(imo)
                if result:
                    return result
            except Exception as e:
                logger.warning(f"AISHub IMO query failed: {e}")

        # 4. Digitraffic REST query (Finnish waters, free, no key)
        if self.digitraffic and self.digitraffic.can_request():
            try:
                result = self.digitraffic.get_vessel_position(resolved_mmsi)
                if result:
                    return result
            except Exception as e:
                logger.warning(f"Digitraffic query failed: {e}")

        # 5. MarineTraffic scraping fallback
        try:
            import marinetraffic_parser as mt
            parser = mt.get_parser()
            mt_pos = parser.get_vessel_page(resolved_mmsi)
            # get_vessel_page returns 'latitude'/'longitude' keys (not 'lat'/'lon')
            mt_lat = (mt_pos.get('latitude') or mt_pos.get('lat')) if mt_pos else None
            mt_lon = (mt_pos.get('longitude') or mt_pos.get('lon')) if mt_pos else None
            if mt_lat and mt_lon:
                mt_lat = float(mt_lat)
                mt_lon = float(mt_lon)
                db.add_position(
                    mmsi=resolved_mmsi, lat=mt_lat, lon=mt_lon,
                    speed=mt_pos.get('speed'),
                    course=mt_pos.get('course'),
                    timestamp=datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S'),
                    source='marinetraffic'
                )
                return {
                    'mmsi': resolved_mmsi,
                    'latitude': mt_lat,
                    'longitude': mt_lon,
                    'speed': mt_pos.get('speed'),
                    'course': mt_pos.get('course'),
                    'heading': mt_pos.get('heading'),
                    'status': mt_pos.get('nav_status') or mt_pos.get('status'),
                    'destination': mt_pos.get('destination'),
                    'timestamp': datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S'),
                    'source': 'marinetraffic',
                }
        except Exception as e:
            logger.debug(f"MarineTraffic fallback failed: {e}")

        # 6. Return stale DB position if exists (better than nothing)
        if pos:
            pos['source'] = 'cache_stale'
            pos['stale'] = True
            return pos
        return None

    def get_vessels_in_area(self, lat: float, lon: float, radius_nm: float = 15,
                            vessel_type: str = None, min_dwt: float = None,
                            max_dwt: float = None) -> List[dict]:
        """
        Get vessels in geographic area.

        Uses ALL sources and merges results for maximum coverage.
        """
        # Convert nm radius to degree deltas (1 degree lat ≈ 60 nm);
        # clamp cos(lat) so polar queries don't divide by ~0
        lat_delta = radius_nm / 60.0
        cos_lat = max(math.cos(math.radians(lat)), 0.01)
        lon_delta = radius_nm / (60.0 * cos_lat)
        lat_min = lat - lat_delta
        lat_max = lat + lat_delta
        lon_min = lon - lon_delta
        lon_max = lon + lon_delta

        vessels = []
        existing_mmsis = set()

        def _merge(new_list):
            # De-duplicate by MMSI, first source wins
            for v in (new_list or []):
                mmsi = v.get('mmsi')
                if mmsi and mmsi not in existing_mmsis:
                    existing_mmsis.add(mmsi)
                    vessels.append(v)

        # 1. DB cache (fast, already have data from previous scrapes)
        _merge(db.get_positions_in_area(lat_min, lat_max, lon_min, lon_max,
                                        max_age_minutes=self.AREA_FRESH_MINUTES))

        # 2. MarineTraffic data comes from mt_bulk_staging (background z:4 scan, ~31K vessels)
        #    Free MT tile API only works at z:2-4 — tiles too large for port-level live queries.
        #    mt_bulk_staging is queried separately in _tool_search_vessels_near_port().

        # 3. AISStream: persistent thread OR sync fallback
        if self.aisstream:
            if self.aisstream.thread_alive():
                self.aisstream.subscribe_area(lat_min, lon_min, lat_max, lon_max)
            elif len(vessels) < 20:
                # Thread dead — use sync query to supplement results
                try:
                    ais_vessels = self.aisstream.query_area_sync(
                        lat_min, lon_min, lat_max, lon_max, max_seconds=6)
                    _merge(ais_vessels)
                except Exception as e:
                    logger.warning(f"AISStream sync area query failed: {e}")

        # 4. Digitraffic (free, no key — Finnish/Baltic)
        if self.digitraffic and self.digitraffic.can_request():
            try:
                _merge(self.digitraffic.get_vessels_in_area(
                    lat_min, lat_max, lon_min, lon_max))
            except Exception as e:
                logger.warning(f"Digitraffic area query failed: {e}")

        # 5. AISHub (if configured)
        if len(vessels) < 10 and self.aishub and self.aishub.can_request():
            try:
                _merge(self.aishub.get_vessels_in_area(
                    lat_min, lat_max, lon_min, lon_max))
            except Exception as e:
                logger.warning(f"AISHub area query failed: {e}")

        # Filter results
        return self._filter_vessels(vessels, vessel_type, min_dwt, max_dwt)

    def get_status(self) -> dict:
        """Get provider status for /health endpoint."""
        result = {}
        if self.aisstream:
            stats = self.aisstream.get_stats()
            stats['watchdog_alive'] = (hasattr(self, '_watchdog_thread')
                                       and self._watchdog_thread.is_alive())
            result['aisstream'] = stats
        else:
            result['aisstream'] = 'not configured'
        if self.aishub:
            result['aishub'] = {'configured': True,
                                'can_request': self.aishub.can_request()}
        else:
            result['aishub'] = 'not configured'
        if self.digitraffic:
            result['digitraffic'] = {'configured': True,
                                     'can_request': self.digitraffic.can_request()}
        else:
            result['digitraffic'] = 'not configured'
        return result

    # --- Helpers ---

    @staticmethod
    def _is_fresh(position: dict, max_age_minutes: int) -> bool:
        """Check if a position is fresh enough.

        Handles str (SQLite) and datetime (PostgreSQL)."""
        ts = position.get('timestamp')
        if not ts:
            return False
        try:
            if isinstance(ts, datetime):
                ts_dt = ts.replace(tzinfo=None) if ts.tzinfo else ts
            elif isinstance(ts, str):
                ts_clean = ts.replace('Z', '').replace('+00:00', '')
                ts_dt = datetime.fromisoformat(ts_clean)
            else:
                return False
            age = datetime.utcnow() - ts_dt
            return age < timedelta(minutes=max_age_minutes)
        except (ValueError, TypeError):
            return False

    @staticmethod
    def _filter_vessels(vessels: List[dict], vessel_type: str = None,
                        min_dwt: float = None, max_dwt: float = None) -> List[dict]:
        """Filter vessels by type and DWT."""
        result = vessels
        if config.FREIGHT_VESSELS_ONLY:
            # Keep unknown-type vessels; drop those below the freight type-code floor
            result = [v for v in result
                      if v.get('type_code') is None
                      or (v.get('type_code') or 0) >= config.FREIGHT_MIN_TYPE_CODE]
        if vessel_type:
            result = [v for v in result
                      if v.get('type_category') == vessel_type
                      or vessel_type.lower() in (v.get('type', '') or '').lower()]
        if min_dwt is not None:
            result = [v for v in result
                      if v.get('dwt') is not None and float(v['dwt']) >= min_dwt]
        if max_dwt is not None:
            result = [v for v in result
                      if v.get('dwt') is not None and float(v['dwt']) <= max_dwt]
        return result


# =============================================================================
# SINGLETON & STARTUP
# =============================================================================

_provider: Optional[AISProvider] = None
_provider_lock = threading.Lock()


def get_provider() -> AISProvider:
    """Get or create the singleton AIS provider (double-checked locking)."""
    global _provider
    if _provider is None:
        with _provider_lock:
            if _provider is None:
                _provider = AISProvider()
                _provider.initialize()
    return _provider


def start_ais_provider():
    """Initialize and start the AIS provider.

    Called from seafare_api.py."""
    provider = get_provider()
    ais_on = 'ON' if provider.aisstream else 'OFF'
    hub_on = 'ON' if provider.aishub else 'OFF'
    dt_on = 'ON' if provider.digitraffic else 'OFF'
    print(f"  AIS Provider: AISStream={ais_on}, AISHub={hub_on}, Digitraffic={dt_on}")
    return provider


# =============================================================================
# STANDALONE TEST
# =============================================================================

if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    print("AIS Provider — standalone test")
    print(f"  AISSTREAM_API_KEY: {'set' if AISSTREAM_API_KEY else 'not set'}")
    print(f"  AISHUB_USERNAME: {'set' if AISHUB_USERNAME else 'not set'}")
    print(f"  DIGITRAFFIC_ENABLED: {DIGITRAFFIC_ENABLED}")

    provider = AISProvider()
    provider.initialize()
    print("\nProvider status:", json.dumps(provider.get_status(), indent=2, default=str))

    # Test: get position for a well-known MMSI (EVER GIVEN = 353136000)
    print("\nTesting get_vessel_position(mmsi='353136000')...")
    pos = provider.get_vessel_position(mmsi='353136000')
    if pos:
        print(f"  Position: lat={pos.get('latitude')}, lon={pos.get('longitude')}, "
              f"speed={pos.get('speed')}, source={pos.get('source')}")
    else:
        print("  No position data available")

    if provider.aisstream:
        print("\nWaiting 15s for AISStream data...")
        time.sleep(15)
        stats = provider.aisstream.get_stats()
        print(f"  Positions received: {stats['positions_received']}")

    provider.shutdown()
    print("\nDone.")