#!/usr/bin/env python3 """ Equasis Parser — Free Maritime Database Ship owners, operators, managers, inspections Ɉ MONTANA PROTOCOL — ML-DSA-65 (FIPS 204) """ import os import re import json import time import requests from typing import Optional, Dict, List from bs4 import BeautifulSoup # Credentials from environment EQUASIS_USER = os.environ.get("EQUASIS_USER") EQUASIS_PASS = os.environ.get("EQUASIS_PASS") EQUASIS_BASE = "https://www.equasis.org/EquasisWeb" class EquasisParser: """Parser for Equasis maritime database""" def __init__(self, username: str = None, password: str = None): self.username = username or EQUASIS_USER self.password = password or EQUASIS_PASS self.session = requests.Session() self.session.headers.update({ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', }) self.logged_in = False def login(self) -> bool: """Login to Equasis""" if not self.username or not self.password: print("Equasis credentials not configured") return False try: # Get login page first (for cookies) self.session.get(f"{EQUASIS_BASE}/public/HomePage") # Login — field is j_email resp = self.session.post( f"{EQUASIS_BASE}/authen/HomePage?fs=HomePage", data={ 'j_email': self.username, 'j_password': self.password, 'submit': 'Login' }, allow_redirects=True ) # After successful login page contains search form and "Welcome" if 'Search' in resp.text or 'Welcome' in resp.text: self.logged_in = True print("Equasis login successful") return True else: print("Equasis login failed") return False except Exception as e: print(f"Login error: {e}") return False def _check_rate_limit(self) -> bool: """Check if we can make another request today (limit ~400 to stay safe under 500).""" try: import maritime_db as db remaining = db.get_equasis_remaining() if remaining <= 0: print(f"Equasis daily limit reached (400/day). Try again tomorrow.") return False return True except Exception: return True # If DB unavailable, allow request def _increment_counter(self): """Increment daily request counter.""" try: import maritime_db as db db.increment_equasis_counter() except Exception: pass def _get_cache(self, cache_type: str, cache_key: str): """Check cache for Equasis data.""" try: import maritime_db as db return db.get_equasis_cache(cache_key, cache_type) except Exception: return None def _set_cache(self, cache_type: str, cache_key: str, data): """Store Equasis data in cache.""" try: import maritime_db as db db.set_equasis_cache(cache_key, cache_type, data) except Exception: pass def search_vessel(self, query: str) -> List[Dict]: """ Search vessel by name, IMO, or MMSI Returns list of matching vessels """ # Check cache first cached = self._get_cache('search', query.lower().strip()) if cached is not None: return cached if not self._check_rate_limit(): return [] if not self.logged_in: if not self.login(): return [] try: search_data = { 'P_PAGE': '1', 'P_PAGE_COMP': '1', 'P_PAGE_SHIP': '1', 'P_ENTREE_HOME': query, 'P_ENTREE_HOME_HIDDEN': query, 'checkbox-ship': 'Ship', 'advancedSearch': '', } self._increment_counter() time.sleep(1) # Be polite — 1 req/sec resp = self.session.post( f"{EQUASIS_BASE}/restricted/Search?fs=HomePage", data=search_data ) if resp.status_code != 200: print(f"Search failed: HTTP {resp.status_code}") return [] soup = BeautifulSoup(resp.text, 'html.parser') results = [] # Results table: header row has 6 , data rows have for IMO + 5 table = soup.find('table', class_='table-striped') if not table: return results for row in table.find_all('tr')[1:]: # Skip header th = row.find('th') cells = row.find_all('td') # Data rows have 1 (IMO) + 5 ; skip mobile rows with < 5 if not th or len(cells) < 5: continue vessel = { 'imo': th.get_text(strip=True), 'name': cells[0].get_text(strip=True), 'gross_tonnage': cells[1].get_text(strip=True), 'type': cells[2].get_text(strip=True), 'year_built': cells[3].get_text(strip=True), 'flag': re.sub(r'\s+', ' ', cells[4].get_text(strip=True)).strip(), } results.append(vessel) # Cache results if results: self._set_cache('search', query.lower().strip(), results) return results except Exception as e: print(f"Search error: {e}") return [] def get_vessel_details(self, imo: str) -> Dict: """ Get detailed vessel information including owners/operators """ # Check cache first cached = self._get_cache('details', imo) if cached is not None: return cached if not self._check_rate_limit(): return {} if not self.logged_in: if not self.login(): return {} try: self._increment_counter() time.sleep(1) # Be polite resp = self.session.get( f"{EQUASIS_BASE}/restricted/ShipInfo", params={'fs': 'Search', 'P_IMO': imo} ) if resp.status_code != 200: print(f"ShipInfo failed: HTTP {resp.status_code}") return {} soup = BeautifulSoup(resp.text, 'html.parser') vessel = {'imo': imo} # Ship name + IMO from h4 h4 = soup.find('h4', class_='color-gris-bleu-copyright') if h4: text = h4.get_text(strip=True) # Format: "EVER GIVEN- IMO n°9811000" match = re.match(r'^(.+?)\s*-\s*IMO', text) if match: vessel['name'] = match.group(1).strip() # Parse div.row fields for vessel data # Each row has: label div + value div for row_div in soup.find_all('div', class_='row'): text = row_div.get_text(strip=True) # Match "LabelValue" patterns patterns = [ (r'^Flag\((.+?)\)$', 'flag'), (r'^Flag(.+)$', 'flag'), (r'^Call Sign(.+)$', 'callsign'), (r'^MMSI(\d+)$', 'mmsi'), (r'^Gross tonnage(\d+)', 'gross_tonnage'), (r'^DWT(\d+)', 'deadweight'), (r'^Type of ship(.+?)(?:\(|$)', 'type'), (r'^Year of build(\d+)', 'year_built'), (r'^Status(.+?)(?:\(|$)', 'status'), ] for pattern, key in patterns: m = re.match(pattern, text) if m and key not in vessel: vessel[key] = m.group(1).strip() # Parse companies table vessel['companies'] = self._parse_companies(soup) # Cache result if vessel.get('name'): self._set_cache('details', imo, vessel) return vessel except Exception as e: print(f"Details error: {e}") return {} def _parse_companies(self, soup: BeautifulSoup) -> List[Dict]: """Parse company information from vessel page""" companies = [] # Find the first table with company data # Header: IMO number | Role | Name of company | Address | Date of effect | Details for table in soup.find_all('table', class_='tableLS'): header_row = table.find('tr') if not header_row: continue headers = [th.get_text(strip=True).lower() for th in header_row.find_all(['th', 'td'])] if not any('role' in h for h in headers): continue for row in table.find_all('tr')[1:]: cells = row.find_all('td') if len(cells) < 3: continue company = {} for i, cell in enumerate(cells): if i >= len(headers): break val = cell.get_text(strip=True) h = headers[i] if 'imo' in h: company['imo_number'] = val elif 'role' in h: role = val.lower() if 'registered owner' in role: company['role'] = 'owner' elif 'ship manager' in role or 'commercial manager' in role: company['role'] = 'manager' elif 'ism manager' in role: company['role'] = 'ism_manager' elif 'operator' in role: company['role'] = 'operator' elif 'technical manager' in role: company['role'] = 'technical_manager' else: company['role'] = role elif 'name' in h: company['name'] = val elif 'address' in h: company['address'] = val elif 'date' in h: company['date_effect'] = val if company.get('name'): companies.append(company) if companies: break # Only first matching table return companies def get_company_contacts(self, company_name: str) -> Dict: """ Search for company details Note: Equasis has limited company contact info """ # Check cache first cached = self._get_cache('contacts', company_name.lower().strip()) if cached is not None: return cached if not self._check_rate_limit(): return {} if not self.logged_in: if not self.login(): return {} try: self._increment_counter() time.sleep(1) # Be polite resp = self.session.get( f"{EQUASIS_BASE}/restricted/CompanyInfo", params={'P_COMPANY': company_name} ) soup = BeautifulSoup(resp.text, 'html.parser') company = {'name': company_name} # Parse company info from div.row structure for row_div in soup.find_all('div', class_='row'): text = row_div.get_text(strip=True) if text.startswith('Address'): company['address'] = text[7:].strip() elif text.startswith('Country'): company['country'] = text[7:].strip() elif text.startswith('Telephone'): company['phone'] = text[9:].strip() # Fallback: parse from table for row in soup.find_all('tr'): cells = row.find_all('td') if len(cells) >= 2: label = cells[0].get_text(strip=True).lower() value = cells[1].get_text(strip=True) if 'address' in label and 'address' not in company: company['address'] = value elif 'country' in label and 'country' not in company: company['country'] = value elif ('telephone' in label or 'phone' in label) and 'phone' not in company: company['phone'] = value elif 'email' in label and 'email' not in company: company['email'] = value # Cache result if company.get('name') and (company.get('address') or company.get('country')): self._set_cache('contacts', company_name.lower().strip(), company) return company except Exception as e: print(f"Company search error: {e}") return {} # ============================================================================= # CONVENIENCE FUNCTIONS # ============================================================================= _parser = None def get_parser() -> EquasisParser: """Get singleton parser instance""" global _parser if _parser is None: _parser = EquasisParser() return _parser def search_vessel(query: str) -> List[Dict]: """Quick vessel search""" return get_parser().search_vessel(query) def get_vessel(imo: str) -> Dict: """Get vessel with owner/operator info""" return get_parser().get_vessel_details(imo) def get_contacts(company_name: str) -> Dict: """Get company contacts""" return get_parser().get_company_contacts(company_name) if __name__ == "__main__": from dotenv import load_dotenv load_dotenv() parser = EquasisParser() if parser.login(): print("\nSearching for 'EVER GIVEN'...") results = parser.search_vessel("EVER GIVEN") for v in results[:3]: print(f" - {v}") if results: imo = results[0].get('imo') print(f"\nGetting details for IMO {imo}...") details = parser.get_vessel_details(imo) print(json.dumps(details, indent=2, ensure_ascii=False))