1141 lines
46 KiB
Python
1141 lines
46 KiB
Python
|
|
#!/usr/bin/env python3
|
||
|
|
"""
|
||
|
|
AIS Data Provider — Unified real-time vessel tracking
|
||
|
|
SeaFare Montana
|
||
|
|
|
||
|
|
Priority chain: AISStream (WebSocket) → AISHub (REST) → Digitraffic (REST) → MarineTraffic (scraping)
|
||
|
|
|
||
|
|
Env vars:
|
||
|
|
AISSTREAM_API_KEY — free, register at aisstream.io
|
||
|
|
AISHUB_USERNAME — contributor ID from aishub.net
|
||
|
|
DIGITRAFFIC_ENABLED — '1' (default) or '0' to disable Finnish AIS
|
||
|
|
"""
|
||
|
|
|
||
|
|
import json
|
||
|
|
import logging
|
||
|
|
import math
|
||
|
|
import os
|
||
|
|
import threading
|
||
|
|
import time
|
||
|
|
from datetime import datetime, timedelta
|
||
|
|
from typing import Dict, List, Optional, Set
|
||
|
|
|
||
|
|
import requests
|
||
|
|
|
||
|
|
import config
|
||
|
|
import maritime_db as db
|
||
|
|
|
||
|
|
logger = logging.getLogger('ais_provider')
|
||
|
|
|
||
|
|
# --- Configuration ---
|
||
|
|
AISSTREAM_API_KEY = os.environ.get('AISSTREAM_API_KEY', '')
|
||
|
|
AISHUB_USERNAME = os.environ.get('AISHUB_USERNAME', '')
|
||
|
|
DIGITRAFFIC_ENABLED = os.environ.get('DIGITRAFFIC_ENABLED', '1')
|
||
|
|
|
||
|
|
# NAV_STATUS_MAP imported from marinetraffic_parser at module level
try:
    from marinetraffic_parser import NAV_STATUS_MAP, classify_vessel_type
except ImportError:
    # Fallback: minimal subset of the ITU-R M.1371 navigational-status
    # table (codes 9-13 and reserved values are intentionally omitted;
    # lookups for them return None via dict.get()).
    NAV_STATUS_MAP = {
        0: 'underway', 1: 'at anchor', 2: 'not under command',
        3: 'restricted maneuverability', 4: 'constrained by draught',
        5: 'moored', 6: 'aground', 7: 'fishing', 8: 'underway sailing',
        14: 'ais-sart', 15: 'undefined',
    }
    # Sentinel: callers must check for None before calling this.
    classify_vessel_type = None
|
|
# =============================================================================
|
||
|
|
# AISSTREAM CLIENT (WebSocket, real-time)
|
||
|
|
# =============================================================================
|
||
|
|
|
||
|
|
class AISStreamClient:
|
||
|
|
"""
|
||
|
|
Persistent WebSocket client for AISStream.io.
|
||
|
|
Uses synchronous websocket-client (no asyncio) in a background thread.
|
||
|
|
Stores positions to DB via maritime_db.add_position().
|
||
|
|
|
||
|
|
Deferred start: WebSocket connects only when first MMSI or bounding box is
|
||
|
|
subscribed (avoids wasteful global subscription on free tier).
|
||
|
|
"""
|
||
|
|
|
||
|
|
WS_URL = "wss://stream.aisstream.io/v0/stream"
|
||
|
|
MAX_MMSI_FILTER = 50
|
||
|
|
RECONNECT_BASE = 5 # seconds
|
||
|
|
RECONNECT_CAP = 60 # seconds
|
||
|
|
|
||
|
|
def __init__(self, api_key: str):
|
||
|
|
self.api_key = api_key
|
||
|
|
self._tracked_mmsis: Set[str] = set()
|
||
|
|
self._bounding_boxes: list = []
|
||
|
|
self._thread: Optional[threading.Thread] = None
|
||
|
|
self._running = False
|
||
|
|
self._connected = False
|
||
|
|
self._reconnect_count = 0
|
||
|
|
self._last_message_time: Optional[float] = None
|
||
|
|
self._positions_received = 0
|
||
|
|
self._lock = threading.Lock()
|
||
|
|
self._resub_flag = False
|
||
|
|
self._started = False
|
||
|
|
self._last_error = None
|
||
|
|
self._ws = None # websocket-client WebSocket instance
|
||
|
|
|
||
|
|
def start(self):
|
||
|
|
"""Mark client as ready. Actual WebSocket connects on first subscribe."""
|
||
|
|
self._running = True
|
||
|
|
self._started = True
|
||
|
|
logger.info("AISStream client ready (deferred connect)")
|
||
|
|
|
||
|
|
def _ensure_thread(self):
|
||
|
|
"""Start WebSocket thread if not already running."""
|
||
|
|
if self._thread and self._thread.is_alive():
|
||
|
|
return
|
||
|
|
if not self._started:
|
||
|
|
return
|
||
|
|
self._thread = threading.Thread(target=self._run_loop, daemon=True,
|
||
|
|
name="aisstream-ws")
|
||
|
|
self._thread.start()
|
||
|
|
logger.info("AISStream WebSocket thread started")
|
||
|
|
|
||
|
|
def stop(self):
|
||
|
|
"""Graceful shutdown."""
|
||
|
|
self._running = False
|
||
|
|
if self._ws:
|
||
|
|
try:
|
||
|
|
self._ws.close()
|
||
|
|
except Exception:
|
||
|
|
pass
|
||
|
|
if self._thread:
|
||
|
|
self._thread.join(timeout=5)
|
||
|
|
logger.info("AISStream client stopped")
|
||
|
|
|
||
|
|
def subscribe_mmsi(self, mmsi: str):
|
||
|
|
"""Add MMSI to tracking set (max 50)."""
|
||
|
|
added = False
|
||
|
|
with self._lock:
|
||
|
|
if len(self._tracked_mmsis) < self.MAX_MMSI_FILTER:
|
||
|
|
before = len(self._tracked_mmsis)
|
||
|
|
self._tracked_mmsis.add(str(mmsi))
|
||
|
|
added = len(self._tracked_mmsis) > before
|
||
|
|
if added:
|
||
|
|
self._resub_flag = True
|
||
|
|
self._ensure_thread()
|
||
|
|
|
||
|
|
def unsubscribe_mmsi(self, mmsi: str):
|
||
|
|
"""Remove MMSI from tracking set."""
|
||
|
|
with self._lock:
|
||
|
|
self._tracked_mmsis.discard(str(mmsi))
|
||
|
|
self._resub_flag = True
|
||
|
|
|
||
|
|
def subscribe_area(self, lat_min: float, lon_min: float,
|
||
|
|
lat_max: float, lon_max: float):
|
||
|
|
"""Set bounding box filter for area queries."""
|
||
|
|
with self._lock:
|
||
|
|
bbox = [[lat_min, lon_min], [lat_max, lon_max]]
|
||
|
|
if len(self._bounding_boxes) >= 3:
|
||
|
|
self._bounding_boxes.pop(0)
|
||
|
|
self._bounding_boxes.append(bbox)
|
||
|
|
self._resub_flag = True
|
||
|
|
self._ensure_thread()
|
||
|
|
|
||
|
|
def is_connected(self) -> bool:
|
||
|
|
return self._connected and self._running
|
||
|
|
|
||
|
|
def thread_alive(self) -> bool:
|
||
|
|
return self._thread is not None and self._thread.is_alive()
|
||
|
|
|
||
|
|
def get_stats(self) -> dict:
|
||
|
|
alive = self.thread_alive()
|
||
|
|
stats = {
|
||
|
|
'connected': self._connected or alive,
|
||
|
|
'positions_received': self._positions_received,
|
||
|
|
'tracked_mmsis': len(self._tracked_mmsis),
|
||
|
|
'bounding_boxes': len(self._bounding_boxes),
|
||
|
|
'last_message': self._last_message_time,
|
||
|
|
'reconnect_count': self._reconnect_count,
|
||
|
|
'thread_alive': alive,
|
||
|
|
}
|
||
|
|
if self._last_error:
|
||
|
|
stats['last_error'] = str(self._last_error)[:200]
|
||
|
|
return stats
|
||
|
|
|
||
|
|
# --- Short-lived sync query (runs in calling thread, works on Render) ---
|
||
|
|
|
||
|
|
def query_position_sync(self, mmsi: str, max_seconds: float = 8) -> Optional[dict]:
|
||
|
|
"""Short-lived sync WebSocket: connect, subscribe by MMSI, collect 1 position, close.
|
||
|
|
Runs in the calling thread (Flask request) — guaranteed to work on Render."""
|
||
|
|
return self._query_sync(mmsis=[str(mmsi)], bboxes=None,
|
||
|
|
max_seconds=max_seconds, target_mmsi=str(mmsi))
|
||
|
|
|
||
|
|
def query_area_sync(self, lat_min: float, lon_min: float,
|
||
|
|
lat_max: float, lon_max: float,
|
||
|
|
max_seconds: float = 4) -> List[dict]:
|
||
|
|
"""Short-lived sync WebSocket: connect, subscribe by bbox, collect positions, close."""
|
||
|
|
bbox = [[lat_min, lon_min], [lat_max, lon_max]]
|
||
|
|
positions = self._query_sync(mmsis=None, bboxes=[bbox], max_seconds=max_seconds)
|
||
|
|
return positions if isinstance(positions, list) else []
|
||
|
|
|
||
|
|
    def _query_sync(self, mmsis: Optional[List[str]] = None,
                    bboxes: Optional[list] = None,
                    max_seconds: float = 8,
                    target_mmsi: Optional[str] = None) -> "Union[dict, List[dict], None]":
        """Core sync query: short-lived WebSocket connection.

        Connects, subscribes (to *mmsis* and/or *bboxes*), then drains
        messages until *max_seconds* elapse. Return shape depends on mode:
        - target_mmsi set → first matching position dict, or None
        - otherwise       → list of all positions collected
        Side effect: every received message is also persisted to the DB
        via _handle_message() (best-effort, errors swallowed).
        """
        import websocket as ws_lib

        sub = {
            "APIKey": self.api_key,
            # The API requires at least one bounding box even for pure
            # MMSI queries — fall back to the whole globe.
            "BoundingBoxes": bboxes or [[[-90, -180], [90, 180]]],
            "FilterMessageTypes": ["PositionReport", "ShipStaticData"]
        }
        if mmsis:
            sub["FiltersShipMMSI"] = mmsis

        positions = []
        try:
            ws = ws_lib.create_connection(self.WS_URL, timeout=15,
                                          header={"User-Agent": "SeaFare-Montana/1.0"})
        except Exception as e:
            logger.warning(f"AISStream sync query connect failed: {e}")
            # Return type matches the mode even on connect failure
            return None if target_mmsi else []

        try:
            ws.send(json.dumps(sub))
            # Short recv timeout so the outer deadline is honored closely
            ws.settimeout(2.0)
            deadline = time.time() + max_seconds

            while time.time() < deadline:
                try:
                    raw = ws.recv()
                    if not raw:
                        continue
                    data = json.loads(raw)

                    if 'error' in data:
                        logger.warning(f"AISStream sync error: {data['error']}")
                        break

                    # Store to DB (same as _handle_message)
                    try:
                        self._handle_message(raw)
                    except Exception:
                        pass

                    msg_type = data.get('MessageType', '')
                    if msg_type == 'PositionReport':
                        pos = data.get('Message', {}).get('PositionReport', {})
                        # UserID is authoritative; MetaData.MMSI is the fallback
                        mmsi_str = str(pos.get('UserID', data.get('MetaData', {}).get('MMSI', '')))
                        lat = pos.get('Latitude', 0)
                        lon = pos.get('Longitude', 0)
                        # Reject AIS "not available" sentinels (91 / 181)
                        if abs(lat) <= 90 and abs(lon) <= 180:
                            p = {
                                'mmsi': mmsi_str,
                                'latitude': lat,
                                'longitude': lon,
                                'speed': pos.get('Sog'),
                                'course': pos.get('Cog'),
                                'heading': pos.get('TrueHeading'),
                                'timestamp': data.get('MetaData', {}).get('time_utc'),
                                'source': 'aisstream',
                            }
                            positions.append(p)
                            self._positions_received += 1
                            self._last_message_time = time.time()

                            # If looking for specific MMSI and found it — return immediately
                            if target_mmsi and mmsi_str == target_mmsi:
                                return p

                except ws_lib.WebSocketTimeoutException:
                    continue  # normal: no message within 2s, re-check deadline
                except ws_lib.WebSocketConnectionClosedException:
                    break
        finally:
            try:
                ws.close()
            except Exception:
                pass

        if target_mmsi:
            # Didn't find the specific MMSI
            return None
        return positions
|
||
|
|
|
||
|
|
# --- Internal (persistent thread, no asyncio) ---
|
||
|
|
|
||
|
|
    def _run_loop(self):
        """Thread target: synchronous WebSocket loop with auto-reconnect.
        Catches BaseException to survive gunicorn worker signals."""
        try:
            self._run_loop_inner()
        except BaseException as e:
            # Intentionally broad: gunicorn may deliver SystemExit /
            # KeyboardInterrupt into worker threads. Record the cause so
            # get_stats() can surface it.
            self._last_error = f"_run_loop KILLED: {type(e).__name__}: {e}"
            logger.error(f"AISStream _run_loop killed by {type(e).__name__}: {e}")
            # Re-enable for watchdog restart
            self._running = True
|
||
|
|
|
||
|
|
    STARTUP_DELAY = 15  # seconds — let gunicorn finish worker setup before I/O

    def _run_loop_inner(self):
        """Actual reconnect loop.

        After an initial STARTUP_DELAY sleep, repeatedly: wait until there
        is something to subscribe to, connect and listen, then reconnect
        with capped exponential backoff on any failure.
        """
        logger.info("AISStream sync _run_loop STARTED, waiting %ds for gunicorn init",
                    self.STARTUP_DELAY)
        self._last_error = f"startup delay ({self.STARTUP_DELAY}s)..."
        time.sleep(self.STARTUP_DELAY)
        logger.info("AISStream startup delay done, entering main loop")
        while self._running:
            # Wait until we have something to subscribe to
            while self._running and not self._tracked_mmsis and not self._bounding_boxes:
                time.sleep(1)
            if not self._running:
                break

            try:
                self._connect_and_listen()
            except BaseException as e:
                # Broad catch: survive signals delivered into this thread
                self._last_error = f"connection error: {type(e).__name__}: {e}"
                logger.warning(f"AISStream connection error: {type(e).__name__}: {e}")

            self._connected = False
            if not self._running:
                break

            # Reconnect with exponential backoff (RECONNECT_BASE doubling,
            # exponent clamped at 6, total capped at RECONNECT_CAP)
            delay = min(self.RECONNECT_BASE * (2 ** min(self._reconnect_count, 6)),
                        self.RECONNECT_CAP)
            self._reconnect_count += 1
            logger.info(f"AISStream reconnecting in {delay}s (attempt {self._reconnect_count})")
            time.sleep(delay)

        self._last_error = f"_run_loop exited, _running={self._running}"
        logger.info("AISStream _run_loop exited")
|
||
|
|
|
||
|
|
    def _connect_and_listen(self):
        """Connect to AISStream via sync websocket-client, listen for messages.

        Blocks until the connection drops or self._running goes False.
        _last_error doubles as a progress breadcrumb so get_stats() can
        show exactly where a connection attempt stalled.
        """
        import websocket as ws_lib  # websocket-client package

        self._last_error = "pre-connect: resolving DNS..."
        logger.info("AISStream connecting (sync)...")

        # Pre-resolve DNS (helps diagnose where it hangs)
        import socket
        try:
            addr = socket.getaddrinfo("stream.aisstream.io", 443, socket.AF_INET)
            self._last_error = f"DNS ok ({addr[0][4][0]}), creating connection..."
        except Exception as e:
            self._last_error = f"DNS failed: {e}"
            raise

        self._ws = ws_lib.create_connection(
            self.WS_URL, timeout=20,
            header={"User-Agent": "SeaFare-Montana/1.0"}
        )
        self._last_error = "connected, sending subscription..."
        try:
            # Send subscription
            self._resub_flag = False
            sub = self._build_subscription()
            self._ws.send(json.dumps(sub))
            self._connected = True
            self._reconnect_count = 0  # successful connect resets backoff
            self._last_error = "connected, receiving data..."
            logger.info(f"AISStream subscribed (MMSIs: {len(self._tracked_mmsis)}, "
                        f"BBoxes: {len(self._bounding_boxes)})")

            self._ws.settimeout(5.0)  # recv timeout for checking resub/running

            while self._running:
                # Re-subscribe if needed
                if self._resub_flag:
                    self._resub_flag = False
                    sub = self._build_subscription()
                    self._ws.send(json.dumps(sub))
                    logger.info(f"AISStream re-subscribed")

                try:
                    raw = self._ws.recv()
                    if raw:
                        try:
                            self._handle_message(raw)
                        except Exception as e:
                            # One bad message must not drop the connection
                            logger.debug(f"AISStream message error: {e}")
                except ws_lib.WebSocketTimeoutException:
                    continue  # Timeout is normal — loop back to check flags
                except ws_lib.WebSocketConnectionClosedException:
                    self._last_error = "connection closed by server"
                    logger.warning("AISStream connection closed by server")
                    break
        finally:
            # Always leave the socket closed and the state flags clean —
            # the caller (_run_loop_inner) decides whether to reconnect.
            self._connected = False
            try:
                self._ws.close()
            except Exception:
                pass
            self._ws = None
|
||
|
|
|
||
|
|
def _build_subscription(self) -> dict:
|
||
|
|
"""Build AISStream subscription message. No global fallback — only explicit filters."""
|
||
|
|
sub = {
|
||
|
|
"APIKey": self.api_key,
|
||
|
|
"BoundingBoxes": self._bounding_boxes if self._bounding_boxes else [
|
||
|
|
[[-90, -180], [90, 180]] # Required by API, but filtered by MMSI
|
||
|
|
],
|
||
|
|
"FilterMessageTypes": ["PositionReport", "ShipStaticData"]
|
||
|
|
}
|
||
|
|
with self._lock:
|
||
|
|
if self._tracked_mmsis:
|
||
|
|
sub["FiltersShipMMSI"] = list(self._tracked_mmsis)[:self.MAX_MMSI_FILTER]
|
||
|
|
return sub
|
||
|
|
|
||
|
|
    def _handle_message(self, raw: str):
        """Parse AISStream message and store position in DB.

        Handles two message types:
        - PositionReport → db.add_position(...)
        - ShipStaticData → db.upsert_vessel(...) plus destination/ETA
          written to the positions table.
        May raise on malformed JSON — callers wrap this in try/except.
        """
        data = json.loads(raw)

        # Check for error response
        if 'error' in data:
            self._last_error = f"API error: {data['error']}"
            logger.warning(f"AISStream error: {data['error']}")
            return

        msg_type = data.get('MessageType', '')
        meta = data.get('MetaData', {})

        if msg_type == 'PositionReport':
            pos = data.get('Message', {}).get('PositionReport', {})
            # UserID is authoritative; MetaData.MMSI is the fallback
            mmsi = str(pos.get('UserID', meta.get('MMSI', '')))
            if not mmsi or mmsi == '0':
                return

            lat = pos.get('Latitude', 0)
            lon = pos.get('Longitude', 0)

            # AIS uses 91/181 as "not available" sentinels
            if abs(lat) > 90 or abs(lon) > 180:
                return

            nav_code = pos.get('NavigationStatus')
            nav_status = NAV_STATUS_MAP.get(nav_code) if nav_code is not None else None
            timestamp = meta.get('time_utc', datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S'))

            try:
                db.add_position(
                    mmsi=mmsi, lat=lat, lon=lon,
                    speed=pos.get('Sog'), course=pos.get('Cog'),
                    heading=pos.get('TrueHeading'),
                    status=nav_status,
                    destination=None,  # Not in PositionReport
                    eta=None,
                    timestamp=timestamp,
                    source='aisstream'
                )
            except Exception as db_err:
                # Best-effort persistence: a DB hiccup must not kill the stream
                logger.debug(f"AISStream DB write error for {mmsi}: {db_err}")

            # Counters update even if the DB write failed — they measure
            # stream health, not persistence.
            self._positions_received += 1
            self._last_message_time = time.time()

        elif msg_type == 'ShipStaticData':
            static = data.get('Message', {}).get('ShipStaticData', {})
            mmsi = str(meta.get('MMSI', ''))
            ship_name = meta.get('ShipName', '').strip()
            if mmsi and ship_name:
                try:
                    vessel_data = {
                        'mmsi': mmsi,
                        'name': ship_name,
                    }
                    # AIS type code (e.g. 70=cargo, 80=tanker)
                    type_code = static.get('Type')
                    if type_code is not None:
                        vessel_data['type_code'] = int(type_code)
                    imo_raw = static.get('ImoNumber')
                    # '0' means "no IMO assigned" in AIS static data
                    if imo_raw and str(imo_raw) != '0':
                        vessel_data['imo'] = str(imo_raw)
                    callsign = static.get('CallSign', '').strip()
                    if callsign:
                        vessel_data['callsign'] = callsign
                    dim = static.get('Dimension', {})
                    if dim:
                        # presumably A/B are bow/stern offsets and C/D
                        # port/starboard, so A+B = length, C+D = beam —
                        # standard AIS convention; confirm against feed.
                        a, b = dim.get('A', 0), dim.get('B', 0)
                        c_val, d_val = dim.get('C', 0), dim.get('D', 0)
                        length = a + b
                        width = c_val + d_val
                        if length > 0:
                            vessel_data['length'] = length
                        if width > 0:
                            vessel_data['width'] = width
                    db.upsert_vessel(vessel_data)
                    # Destination/ETA belong to positions table, not vessels
                    dest = (static.get('Destination') or '').strip()
                    if dest:
                        eta_raw = static.get('Eta')
                        eta_str = None
                        if isinstance(eta_raw, dict) and eta_raw.get('Month'):
                            # AIS ETA carries no year — formatted "MM-DD HH:MM"
                            eta_str = f"{eta_raw.get('Month',0):02d}-{eta_raw.get('Day',0):02d} {eta_raw.get('Hour',0):02d}:{eta_raw.get('Minute',0):02d}"
                        elif isinstance(eta_raw, str) and eta_raw.strip():
                            eta_str = eta_raw.strip()
                        db.update_position_destination(mmsi, dest, eta_str)
                except Exception as e:
                    logger.debug(f"ShipStaticData error for {mmsi}: {e}")
|
||
|
|
|
||
|
|
|
||
|
|
# =============================================================================
|
||
|
|
# AISHUB CLIENT (REST, on-demand)
|
||
|
|
# =============================================================================
|
||
|
|
|
||
|
|
class AISHubClient:
    """
    REST client for AISHub.net API.
    Strict rate limit: max 1 request per minute.

    All public getters persist returned positions to the DB as a side
    effect, then hand back normalized vessel dicts.
    """

    BASE_URL = "https://data.aishub.net/ws.php"
    MIN_REQUEST_INTERVAL = 61  # seconds (1 min + 1s safety)

    def __init__(self, username: str):
        # AISHub contributor ID; doubles as the API credential
        self.username = username
        self._last_request_time = 0.0
        # Serializes requests so the rate limit holds across threads
        self._lock = threading.Lock()

    def can_request(self) -> bool:
        """Check if rate limit allows a request right now."""
        return (time.time() - self._last_request_time) >= self.MIN_REQUEST_INTERVAL

    def get_vessel_position(self, mmsi: str) -> Optional[dict]:
        """Get position for a single vessel by MMSI.

        Returns the first normalized vessel dict, or None when AISHub
        has no data. Persists all returned positions to the DB.
        """
        vessels = self._request({'mmsi': mmsi})
        if vessels:
            v = vessels[0]
            self._store_positions(vessels)
            return v
        return None

    def get_vessel_by_imo(self, imo: str) -> Optional[dict]:
        """Get position for a single vessel by IMO."""
        vessels = self._request({'imo': imo})
        if vessels:
            self._store_positions(vessels)
            return vessels[0]
        return None

    def get_vessels_in_area(self, lat_min: float, lat_max: float,
                            lon_min: float, lon_max: float) -> List[dict]:
        """Get all vessels in a geographic bounding box."""
        vessels = self._request({
            'latmin': lat_min, 'latmax': lat_max,
            'lonmin': lon_min, 'lonmax': lon_max
        })
        if vessels:
            self._store_positions(vessels)
        return vessels or []

    def _request(self, extra_params: dict) -> Optional[List[dict]]:
        """Execute HTTP GET with rate limiting.

        Blocks (sleeps) until MIN_REQUEST_INTERVAL has elapsed since the
        previous request, then issues the GET. Returns a list of parsed
        vessel dicts, or None on any HTTP / API / parse error.
        """
        with self._lock:
            elapsed = time.time() - self._last_request_time
            if elapsed < self.MIN_REQUEST_INTERVAL:
                wait = self.MIN_REQUEST_INTERVAL - elapsed
                logger.info(f"AISHub rate limit: waiting {wait:.0f}s")
                time.sleep(wait)
            self._last_request_time = time.time()

        params = {
            'username': self.username,
            'format': '1',  # human-readable
            'output': 'json',
            'compress': '0',
        }
        params.update(extra_params)

        try:
            resp = requests.get(self.BASE_URL, params=params, timeout=15)
            if resp.status_code != 200:
                logger.warning(f"AISHub HTTP {resp.status_code}")
                return None

            data = resp.json()
            # AISHub returns list of objects (or error string)
            if isinstance(data, list):
                return [self._parse_vessel(item) for item in data if isinstance(item, dict)]
            if isinstance(data, dict) and data.get('ERROR'):
                logger.warning(f"AISHub error: {data['ERROR']}")
                return None
            return None
        except requests.RequestException as e:
            logger.warning(f"AISHub request failed: {e}")
            return None
        except (ValueError, KeyError) as e:
            logger.warning(f"AISHub parse error: {e}")
            return None

    def _parse_vessel(self, item: dict) -> dict:
        """Normalize AISHub response to internal format."""
        nav_code = item.get('NAVSTAT')
        # NOTE(review): int(nav_code) raises ValueError on a non-numeric
        # NAVSTAT; the (ValueError, KeyError) handler in _request then
        # discards the whole batch — confirm this is acceptable.
        nav_status = NAV_STATUS_MAP.get(int(nav_code)) if nav_code is not None else None
        return {
            'mmsi': str(item.get('MMSI', '')),
            'name': (item.get('NAME') or '').strip(),
            'imo': str(item.get('IMO', '')) if item.get('IMO') else None,
            'latitude': item.get('LATITUDE'),
            'longitude': item.get('LONGITUDE'),
            'speed': item.get('SOG'),
            'course': item.get('COG'),
            'heading': item.get('HEADING'),
            'status': nav_status,
            'type_code': item.get('TYPE'),
            'destination': (item.get('DEST') or '').strip(),
            'eta': item.get('ETA', ''),
            'draught': item.get('DRAUGHT'),
            'callsign': (item.get('CALLSIGN') or '').strip(),
            'timestamp': item.get('TIME', datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')),
            'source': 'aishub',
        }

    def _store_positions(self, vessels: List[dict]):
        """Batch-store positions in DB (best-effort, per-row errors swallowed)."""
        for v in vessels:
            mmsi = v.get('mmsi')
            lat = v.get('latitude')
            lon = v.get('longitude')
            if mmsi and lat is not None and lon is not None:
                try:
                    db.add_position(
                        mmsi=mmsi, lat=lat, lon=lon,
                        speed=v.get('speed'), course=v.get('course'),
                        heading=v.get('heading'), status=v.get('status'),
                        destination=v.get('destination'), eta=v.get('eta'),
                        timestamp=v.get('timestamp'),
                        source='aishub'
                    )
                except Exception:
                    # DB cache is advisory — never fail the API response
                    pass
|
||
|
|
|
||
|
|
|
||
|
|
# =============================================================================
|
||
|
|
# DIGITRAFFIC CLIENT (REST, Finnish/Baltic AIS, free, no key)
|
||
|
|
# =============================================================================
|
||
|
|
|
||
|
|
class DigitrafficClient:
    """Finnish Digitraffic — free real-time AIS, no API key needed.
    Coverage: Finnish/Baltic waters.
    API: https://meri.digitraffic.fi/api/ais/v1/
    License: CC BY 4.0

    Public getters persist parsed positions to the DB as a side effect.
    """

    BASE_URL = "https://meri.digitraffic.fi/api/ais/v1"
    MIN_REQUEST_INTERVAL = 10  # seconds between requests (be polite)

    def __init__(self):
        self._last_request_time = 0.0
        # Serializes requests so the politeness interval holds across threads
        self._lock = threading.Lock()

    def can_request(self) -> bool:
        """Check if rate limit allows a request right now."""
        return (time.time() - self._last_request_time) >= self.MIN_REQUEST_INTERVAL

    def get_vessel_position(self, mmsi: str) -> Optional[dict]:
        """Get single vessel position by MMSI.

        Accepts either a single GeoJSON Feature or a FeatureCollection
        response shape; returns the normalized vessel dict or None.
        """
        data = self._request(f"/locations/{mmsi}")
        if not data:
            return None
        feature = data
        if data.get('type') == 'FeatureCollection':
            features = data.get('features', [])
            feature = features[0] if features else None
        if not feature:
            return None
        vessel = self._parse_feature(feature)
        if vessel:
            self._store_positions([vessel])
        return vessel

    def get_vessels_in_area(self, lat_min: float, lat_max: float,
                            lon_min: float, lon_max: float) -> List[dict]:
        """Get all vessels, filter by bounding box client-side.

        NOTE: fetches the full snapshot on every call and filters locally
        (this endpoint takes no server-side bbox parameters).
        """
        data = self._request("/locations")
        if not data or data.get('type') != 'FeatureCollection':
            return []
        vessels = []
        for feature in data.get('features', []):
            coords = feature.get('geometry', {}).get('coordinates', [])
            if len(coords) >= 2:
                # GeoJSON coordinate order is [lon, lat]
                lon, lat = coords[0], coords[1]
                if lat_min <= lat <= lat_max and lon_min <= lon <= lon_max:
                    v = self._parse_feature(feature)
                    if v:
                        vessels.append(v)
        if vessels:
            self._store_positions(vessels)
        return vessels

    def _request(self, path: str) -> Optional[dict]:
        """Execute HTTP GET with rate limiting.

        Sleeps to honor MIN_REQUEST_INTERVAL, then returns the decoded
        JSON body, or None on any HTTP/network error.
        """
        with self._lock:
            elapsed = time.time() - self._last_request_time
            if elapsed < self.MIN_REQUEST_INTERVAL:
                time.sleep(self.MIN_REQUEST_INTERVAL - elapsed)
            self._last_request_time = time.time()
        try:
            resp = requests.get(f"{self.BASE_URL}{path}", timeout=15,
                                headers={'Accept-Encoding': 'gzip'})
            if resp.status_code != 200:
                logger.warning(f"Digitraffic HTTP {resp.status_code}")
                return None
            return resp.json()
        except Exception as e:
            logger.warning(f"Digitraffic request failed: {e}")
            return None

    def _parse_feature(self, feature: dict) -> Optional[dict]:
        """Parse GeoJSON feature to internal vessel format.

        Returns None for features with missing coordinates, an exact
        (0, 0) null-island position, or no MMSI.
        """
        props = feature.get('properties', {})
        coords = feature.get('geometry', {}).get('coordinates', [])
        if len(coords) < 2:
            return None
        lon, lat = coords[0], coords[1]
        if lat == 0 and lon == 0:
            return None
        nav_code = props.get('navStat')
        nav_status = NAV_STATUS_MAP.get(nav_code) if nav_code is not None else None
        mmsi = str(props.get('mmsi', ''))
        if not mmsi:
            return None
        return {
            'mmsi': mmsi,
            # Location endpoint carries no static data — name/imo/callsign
            # stay empty here.
            'name': '',
            'imo': None,
            'latitude': lat,
            'longitude': lon,
            'speed': props.get('sog'),
            'course': props.get('cog'),
            'heading': props.get('heading'),
            'status': nav_status,
            'type_code': props.get('shipType'),
            'destination': '',
            'eta': '',
            'draught': props.get('draught'),
            'callsign': '',
            'timestamp': props.get('timestampExternal', datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')),
            'source': 'digitraffic',
        }

    def _store_positions(self, vessels: List[dict]):
        """Store positions in DB (best-effort, per-row errors swallowed)."""
        for v in vessels:
            mmsi = v.get('mmsi')
            lat = v.get('latitude')
            lon = v.get('longitude')
            if mmsi and lat is not None and lon is not None:
                try:
                    db.add_position(
                        mmsi=mmsi, lat=lat, lon=lon,
                        speed=v.get('speed'), course=v.get('course'),
                        heading=v.get('heading'), status=v.get('status'),
                        destination=v.get('destination'), eta=v.get('eta'),
                        timestamp=v.get('timestamp'),
                        source='digitraffic'
                    )
                except Exception:
                    # DB cache is advisory — never fail the API response
                    pass
|
||
|
|
|
||
|
|
|
||
|
|
# =============================================================================
|
||
|
|
# AIS PROVIDER (unified facade)
|
||
|
|
# =============================================================================
|
||
|
|
|
||
|
|
class AISProvider:
|
||
|
|
"""
|
||
|
|
Unified AIS data provider with priority chain:
|
||
|
|
1. AISStream (real-time DB cache from WebSocket)
|
||
|
|
2. AISHub (REST query, 1 req/min)
|
||
|
|
3. Digitraffic (Finnish/Baltic AIS, free, no key)
|
||
|
|
4. MarineTraffic (scraping fallback)
|
||
|
|
"""
|
||
|
|
|
||
|
|
POSITION_FRESH_MINUTES = 30 # single vessel: accept positions up to 30 min
|
||
|
|
AREA_FRESH_MINUTES = 30 # area queries: cache TTL
|
||
|
|
|
||
|
|
def __init__(self):
|
||
|
|
self.aisstream: Optional[AISStreamClient] = None
|
||
|
|
self.aishub: Optional[AISHubClient] = None
|
||
|
|
self.digitraffic: Optional[DigitrafficClient] = None
|
||
|
|
|
||
|
|
    # Major global shipping areas for pre-subscription on startup.
    # At most 3 entries — AISStreamClient.subscribe_area evicts the oldest
    # box beyond that cap. Format: [[lat_min, lon_min], [lat_max, lon_max]].
    _STARTUP_AREAS = [
        [[25, -10], [45, 40]],   # Mediterranean + Black Sea
        [[48, -10], [62, 15]],   # North Sea + English Channel + Baltic
        [[36, 46], [47, 55]],    # Caspian Sea
    ]
|
||
|
|
|
||
|
|
    def initialize(self):
        """Start available AIS clients based on env vars.

        - AISSTREAM_API_KEY set → AISStream WebSocket client (deferred
          connect), pre-subscribed to _STARTUP_AREAS, plus a watchdog
          thread that restarts a dead WebSocket thread.
        - AISHUB_USERNAME set → AISHub REST client.
        - DIGITRAFFIC_ENABLED == '1' → Digitraffic REST client.
        """
        if AISSTREAM_API_KEY:
            self.aisstream = AISStreamClient(AISSTREAM_API_KEY)
            self.aisstream.start()
            # Pre-subscribe to major shipping lanes so data flows before first user request
            for bbox in self._STARTUP_AREAS:  # max 3 bounding boxes
                self.aisstream.subscribe_area(bbox[0][0], bbox[0][1], bbox[1][0], bbox[1][1])
            logger.info("AISStream client started with pre-subscribed shipping lanes")
            # Start watchdog thread to auto-restart dead AISStream thread
            self._watchdog_thread = threading.Thread(
                target=self._watchdog_loop, daemon=True, name="ais-watchdog")
            self._watchdog_thread.start()
        else:
            logger.info("AISStream: no API key — skipped")

        if AISHUB_USERNAME:
            self.aishub = AISHubClient(AISHUB_USERNAME)
            logger.info("AISHub client initialized")
        else:
            logger.info("AISHub: no username — skipped")

        if DIGITRAFFIC_ENABLED == '1':
            self.digitraffic = DigitrafficClient()
            logger.info("Digitraffic client initialized (Finnish AIS)")
        else:
            logger.info("Digitraffic: disabled via env")
|
||
|
|
|
||
|
|
    # Class-level default; shutdown() sets the instance attribute False
    # to stop the watchdog loop.
    _watchdog_running = True

    def _watchdog_loop(self):
        """Background watchdog: checks AISStream thread every 30s, restarts if dead.
        Uses BaseException to survive gunicorn worker signals."""
        logger.info("AIS Watchdog STARTED")
        while self._watchdog_running and self.aisstream:
            try:
                time.sleep(30)
                # Reaches into AISStreamClient private state by design —
                # recovery must work even when the client's own flags are
                # left inconsistent by a killed thread.
                if not self.aisstream._running:
                    self.aisstream._running = True
                    self.aisstream._started = True
                    logger.warning("AIS Watchdog: re-enabled _running flag")
                thread = self.aisstream._thread
                if thread is None or not thread.is_alive():
                    logger.warning(f"AIS Watchdog: thread dead (last_error={self.aisstream._last_error})")
                    self.aisstream._ensure_thread()
                    # If the direct restart didn't take, fall back to
                    # re-issuing area subscriptions (subscribe_area also
                    # calls _ensure_thread internally).
                    if not self.aisstream._thread or not self.aisstream._thread.is_alive():
                        for bbox in self._STARTUP_AREAS:
                            self.aisstream.subscribe_area(bbox[0][0], bbox[0][1], bbox[1][0], bbox[1][1])
                        logger.info("AIS Watchdog: re-subscribed to shipping lanes")
            except BaseException as e:
                # Broad on purpose: the watchdog must never die to a signal
                logger.error(f"AIS Watchdog error: {type(e).__name__}: {e}")
                try:
                    time.sleep(15)
                except BaseException:
                    pass
        logger.info("AIS Watchdog stopped")
|
||
|
|
|
||
|
|
def shutdown(self):
    """Stop the watchdog loop and tear down the AISStream client, if present."""
    # The watchdog thread checks this flag each cycle and exits when cleared.
    self._watchdog_running = False
    stream = self.aisstream
    if stream:
        stream.stop()
|
||
|
|
def get_vessel_position(self, mmsi: str = None, imo: str = None,
                        name: str = None) -> Optional[dict]:
    """
    Get current position for a single vessel.
    Priority: DB (recent) → AISHub REST → stale DB.

    Args:
        mmsi: MMSI to query directly (preferred).
        imo: IMO number; resolved to an MMSI via the vessel DB if mmsi absent.
        name: vessel name; last-resort DB lookup if mmsi and imo fail.

    Returns:
        Position dict with at least 'latitude'/'longitude'/'source' keys,
        or None when the vessel cannot be resolved or located. Stale cached
        results are flagged with 'stale': True and source 'cache_stale'.
    """
    resolved_mmsi = mmsi

    # Resolve MMSI from IMO/name via DB
    if not resolved_mmsi and (imo or name):
        vessel = None
        if imo:
            vessel = db.get_vessel(imo=imo)
        if not vessel and name:
            vessel = db.get_vessel(name=name)
        if vessel:
            resolved_mmsi = vessel.get('mmsi')

    # Without an MMSI no source can be queried.
    if not resolved_mmsi:
        return None

    # 1. Check DB for recent position (from AISStream or previous queries)
    pos = db.get_last_position(resolved_mmsi)
    if pos and self._is_fresh(pos, self.POSITION_FRESH_MINUTES):
        pos['source'] = pos.get('source', 'cache')
        return pos

    # 2. AISStream: persistent thread OR sync fallback
    if self.aisstream:
        if self.aisstream.thread_alive():
            # Background thread is alive — subscribe and poll DB
            # (the reader thread writes positions into the DB as they arrive).
            self.aisstream.subscribe_mmsi(resolved_mmsi)
            for _ in range(5):
                time.sleep(1.5)
                pos = db.get_last_position(resolved_mmsi)
                if pos and self._is_fresh(pos, self.POSITION_FRESH_MINUTES):
                    pos['source'] = pos.get('source', 'aisstream')
                    return pos
        else:
            # Thread dead (Render) — short-lived sync query in request thread
            try:
                result = self.aisstream.query_position_sync(resolved_mmsi, max_seconds=8)
                if result:
                    return result
                # Also check DB (query_position_sync stores to DB)
                pos = db.get_last_position(resolved_mmsi)
                if pos and self._is_fresh(pos, self.POSITION_FRESH_MINUTES):
                    pos['source'] = pos.get('source', 'aisstream')
                    return pos
            except Exception as e:
                logger.warning(f"AISStream sync query failed: {e}")

    # 3. AISHub REST query
    if self.aishub and self.aishub.can_request():
        try:
            result = self.aishub.get_vessel_position(resolved_mmsi)
            if result:
                return result
        except Exception as e:
            logger.warning(f"AISHub query failed: {e}")

    # If we have IMO and no MMSI result, try IMO on AISHub
    # NOTE(review): `pos` here may be a stale (non-fresh) record from step 1
    # or 2, which then skips this IMO fallback — confirm that is intentional.
    if not pos and imo and self.aishub and self.aishub.can_request():
        try:
            result = self.aishub.get_vessel_by_imo(imo)
            if result:
                return result
        except Exception as e:
            logger.warning(f"AISHub IMO query failed: {e}")

    # 4. Digitraffic REST query (Finnish waters, free, no key)
    if self.digitraffic and self.digitraffic.can_request():
        try:
            result = self.digitraffic.get_vessel_position(resolved_mmsi)
            if result:
                return result
        except Exception as e:
            logger.warning(f"Digitraffic query failed: {e}")

    # 5. MarineTraffic scraping fallback
    try:
        import marinetraffic_parser as mt
        parser = mt.get_parser()
        mt_pos = parser.get_vessel_page(resolved_mmsi)
        # get_vessel_page returns 'latitude'/'longitude' keys (not 'lat'/'lon')
        mt_lat = mt_pos.get('latitude') or mt_pos.get('lat') if mt_pos else None
        mt_lon = mt_pos.get('longitude') or mt_pos.get('lon') if mt_pos else None
        if mt_lat and mt_lon:
            mt_lat = float(mt_lat)
            mt_lon = float(mt_lon)
            # Cache the scraped fix so subsequent calls hit the DB fast path.
            db.add_position(
                mmsi=resolved_mmsi,
                lat=mt_lat, lon=mt_lon,
                speed=mt_pos.get('speed'), course=mt_pos.get('course'),
                timestamp=datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S'),
                source='marinetraffic'
            )
            return {
                'mmsi': resolved_mmsi,
                'latitude': mt_lat,
                'longitude': mt_lon,
                'speed': mt_pos.get('speed'),
                'course': mt_pos.get('course'),
                'heading': mt_pos.get('heading'),
                'status': mt_pos.get('nav_status') or mt_pos.get('status'),
                'destination': mt_pos.get('destination'),
                'timestamp': datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S'),
                'source': 'marinetraffic',
            }
    except Exception as e:
        logger.debug(f"MarineTraffic fallback failed: {e}")

    # 6. Return stale DB position if exists (better than nothing)
    if pos:
        pos['source'] = 'cache_stale'
        pos['stale'] = True
        return pos

    return None
||
|
|
def get_vessels_in_area(self, lat: float, lon: float, radius_nm: float = 15,
                        vessel_type: str = None, min_dwt: float = None,
                        max_dwt: float = None) -> List[dict]:
    """Collect vessels within radius_nm of (lat, lon) from every live source.

    Results from the DB cache, AISStream, Digitraffic and AISHub are merged
    in priority order, de-duplicated by MMSI (first source wins), then
    filtered by type and DWT.
    """
    # Turn the radius into a bounding box: one degree of latitude is ~60 nm,
    # and longitude degrees shrink by cos(lat) — clamped so the division
    # cannot blow up near the poles.
    half_lat = radius_nm / 60.0
    half_lon = radius_nm / (60.0 * max(math.cos(math.radians(lat)), 0.01))
    lat_min, lat_max = lat - half_lat, lat + half_lat
    lon_min, lon_max = lon - half_lon, lon + half_lon

    merged: List[dict] = []
    seen_mmsis: Set[str] = set()

    def _absorb(candidates):
        # De-duplicate by MMSI; earlier (higher-priority) sources win.
        for candidate in (candidates or []):
            key = candidate.get('mmsi')
            if key and key not in seen_mmsis:
                seen_mmsis.add(key)
                merged.append(candidate)

    # 1. DB cache (fast, already have data from previous scrapes)
    _absorb(db.get_positions_in_area(lat_min, lat_max, lon_min, lon_max,
                                     max_age_minutes=self.AREA_FRESH_MINUTES))

    # 2. MarineTraffic data comes from mt_bulk_staging (background z:4 scan,
    # ~31K vessels). The free MT tile API only works at z:2-4 — tiles are too
    # large for port-level live queries, so mt_bulk_staging is queried
    # separately in _tool_search_vessels_near_port().

    # 3. AISStream: persistent thread OR sync fallback
    if self.aisstream:
        if self.aisstream.thread_alive():
            self.aisstream.subscribe_area(lat_min, lon_min, lat_max, lon_max)
        elif len(merged) < 20:
            # Thread dead — use sync query to supplement results
            try:
                _absorb(self.aisstream.query_area_sync(
                    lat_min, lon_min, lat_max, lon_max, max_seconds=6))
            except Exception as e:
                logger.warning(f"AISStream sync area query failed: {e}")

    # 4. Digitraffic (free, no key — Finnish/Baltic)
    if self.digitraffic and self.digitraffic.can_request():
        try:
            _absorb(self.digitraffic.get_vessels_in_area(
                lat_min, lat_max, lon_min, lon_max))
        except Exception as e:
            logger.warning(f"Digitraffic area query failed: {e}")

    # 5. AISHub (if configured)
    if len(merged) < 10 and self.aishub and self.aishub.can_request():
        try:
            _absorb(self.aishub.get_vessels_in_area(
                lat_min, lat_max, lon_min, lon_max))
        except Exception as e:
            logger.warning(f"AISHub area query failed: {e}")

    # Filter results
    return self._filter_vessels(merged, vessel_type, min_dwt, max_dwt)
|
||
|
|
def get_status(self) -> dict:
    """Summarize the availability of each AIS source (consumed by /health)."""
    status = {}

    if not self.aisstream:
        status['aisstream'] = 'not configured'
    else:
        stream_stats = self.aisstream.get_stats()
        # Surface watchdog liveness alongside the stream stats; the attribute
        # only exists when initialize() started the watchdog thread.
        stream_stats['watchdog_alive'] = (
            hasattr(self, '_watchdog_thread') and self._watchdog_thread.is_alive())
        status['aisstream'] = stream_stats

    status['aishub'] = (
        {'configured': True, 'can_request': self.aishub.can_request()}
        if self.aishub else 'not configured')
    status['digitraffic'] = (
        {'configured': True, 'can_request': self.digitraffic.can_request()}
        if self.digitraffic else 'not configured')
    return status
||
|
|
# --- Helpers ---
|
||
|
|
|
||
|
|
@staticmethod
|
||
|
|
def _is_fresh(position: dict, max_age_minutes: int) -> bool:
|
||
|
|
"""Check if a position is fresh enough. Handles str (SQLite) and datetime (PostgreSQL)."""
|
||
|
|
ts = position.get('timestamp')
|
||
|
|
if not ts:
|
||
|
|
return False
|
||
|
|
try:
|
||
|
|
if isinstance(ts, datetime):
|
||
|
|
ts_dt = ts.replace(tzinfo=None) if ts.tzinfo else ts
|
||
|
|
elif isinstance(ts, str):
|
||
|
|
ts_clean = ts.replace('Z', '').replace('+00:00', '')
|
||
|
|
ts_dt = datetime.fromisoformat(ts_clean)
|
||
|
|
else:
|
||
|
|
return False
|
||
|
|
age = datetime.utcnow() - ts_dt
|
||
|
|
return age < timedelta(minutes=max_age_minutes)
|
||
|
|
except (ValueError, TypeError):
|
||
|
|
return False
|
||
|
|
|
||
|
|
@staticmethod
def _filter_vessels(vessels: List[dict], vessel_type: str = None,
                    min_dwt: float = None, max_dwt: float = None) -> List[dict]:
    """Apply freight/type/DWT filters and return the surviving vessels."""
    filtered = vessels

    # Global toggle: drop AIS type codes below the freight threshold;
    # vessels with no type code pass through unfiltered.
    if config.FREIGHT_VESSELS_ONLY:
        filtered = [
            v for v in filtered
            if v.get('type_code') is None
            or (v.get('type_code') or 0) >= config.FREIGHT_MIN_TYPE_CODE
        ]

    if vessel_type:
        wanted = vessel_type.lower()
        filtered = [
            v for v in filtered
            if v.get('type_category') == vessel_type
            or wanted in (v.get('type', '') or '').lower()
        ]

    # DWT bounds: vessels with unknown DWT are excluded once a bound is set.
    if min_dwt is not None:
        filtered = [v for v in filtered
                    if v.get('dwt') is not None and float(v['dwt']) >= min_dwt]
    if max_dwt is not None:
        filtered = [v for v in filtered
                    if v.get('dwt') is not None and float(v['dwt']) <= max_dwt]

    return filtered
|
||
|
|
|
||
|
|
# =============================================================================
|
||
|
|
# SINGLETON & STARTUP
|
||
|
|
# =============================================================================
|
||
|
|
|
||
|
|
# Process-wide singleton, created lazily by get_provider() under _provider_lock.
_provider: Optional[AISProvider] = None
_provider_lock = threading.Lock()
|
||
|
|
|
||
|
|
|
||
|
|
def get_provider() -> AISProvider:
    """Return the process-wide AIS provider, creating it on first use.

    Double-checked locking: the unlocked check keeps the hot path cheap,
    the locked re-check guards against a racing first caller.
    """
    global _provider
    if _provider is not None:
        return _provider
    with _provider_lock:
        if _provider is None:
            _provider = AISProvider()
            _provider.initialize()
    return _provider
|
||
|
|
|
||
|
|
|
||
|
|
def start_ais_provider():
    """Boot the singleton AIS provider and print which sources are active.

    Called once at startup from seafare_api.py.
    """
    provider = get_provider()

    def _flag(client):
        # Render one ON/OFF toggle for the startup banner.
        return 'ON' if client else 'OFF'

    ais_on = _flag(provider.aisstream)
    hub_on = _flag(provider.aishub)
    dt_on = _flag(provider.digitraffic)
    print(f" AIS Provider: AISStream={ais_on}, AISHub={hub_on}, Digitraffic={dt_on}")
    return provider
|
||
|
|
|
||
|
|
|
||
|
|
# =============================================================================
|
||
|
|
# STANDALONE TEST
|
||
|
|
# =============================================================================
|
||
|
|
|
||
|
|
if __name__ == '__main__':
    # Standalone smoke test: report configuration, start the provider,
    # query one well-known vessel, then shut everything down.
    logging.basicConfig(level=logging.DEBUG)
    print("AIS Provider — standalone test")
    print(f" AISSTREAM_API_KEY: {'set' if AISSTREAM_API_KEY else 'not set'}")
    print(f" AISHUB_USERNAME: {'set' if AISHUB_USERNAME else 'not set'}")
    print(f" DIGITRAFFIC_ENABLED: {DIGITRAFFIC_ENABLED}")

    provider = AISProvider()
    provider.initialize()

    print("\nProvider status:", json.dumps(provider.get_status(), indent=2, default=str))

    # Test: get position for a well-known MMSI (EVER GIVEN = 353136000)
    print("\nTesting get_vessel_position(mmsi='353136000')...")
    pos = provider.get_vessel_position(mmsi='353136000')
    if pos:
        print(f" Position: lat={pos.get('latitude')}, lon={pos.get('longitude')}, "
              f"speed={pos.get('speed')}, source={pos.get('source')}")
    else:
        print(" No position data available")

    # Give the WebSocket reader a window to deliver live messages.
    if provider.aisstream:
        print("\nWaiting 15s for AISStream data...")
        time.sleep(15)
        stats = provider.aisstream.get_stats()
        print(f" Positions received: {stats['positions_received']}")

    provider.shutdown()
    print("\nDone.")
|