450 lines
18 KiB
Python
Executable File
450 lines
18 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""Patch: Integrate mt_bulk_staging into search_contacts / unlock_contacts.
|
|
|
|
Before: search_contacts only queries `contacts` table (8 rows), fallback to staging
|
|
shows everything for free, no masking.
|
|
After: search_contacts queries staging by name/MMSI/IMO/owner, returns MASKED preview.
|
|
unlock_contacts reveals full data, charges user (or free if price=0), tracks purchases.
|
|
"""
|
|
|
|
import sys
|
|
|
|
# ─── 1. maritime_db.py ───────────────────────────────────────────────────────
|
|
|
|
with open('maritime_db.py', 'r', encoding='utf-8') as f:
|
|
db_code = f.read()
|
|
|
|
# 1a. Replace search_contacts_in_staging — add MMSI/IMO search, remove email filter, add mask param
|
|
old_staging = '''def search_contacts_in_staging(query: str, limit: int = 10) -> list:
|
|
"""Search mt_bulk_staging for vessel/owner contacts (fallback when contacts table is empty)."""
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
q = f"%{query}%"
|
|
cursor.execute("""
|
|
SELECT name, imo, owner, owner_email, owner_phone, owner_website,
|
|
owner_address, owner_country, operator, operator_website,
|
|
registered_owner, registered_owner_email, registered_owner_website
|
|
FROM mt_bulk_staging
|
|
WHERE (name ILIKE ? OR owner ILIKE ? OR operator ILIKE ?)
|
|
AND (owner_email IS NOT NULL OR operator_website IS NOT NULL)
|
|
LIMIT ?
|
|
""", (q, q, q, limit))
|
|
rows = cursor.fetchall()
|
|
results = []
|
|
for row in rows:
|
|
r = dict(row)
|
|
contacts_out = []
|
|
# Owner contact
|
|
if r.get('owner'):
|
|
contacts_out.append({
|
|
'name': r['owner'],
|
|
'company_name': r['owner'],
|
|
'company': r.get('name', ''),
|
|
'role': 'Owner',
|
|
'phone': r.get('owner_phone') or '',
|
|
'email': r.get('owner_email') or '',
|
|
'website': r.get('owner_website') or '',
|
|
'country': r.get('owner_country') or '',
|
|
})
|
|
# Registered owner (if different)
|
|
if r.get('registered_owner') and r.get('registered_owner') != r.get('owner'):
|
|
contacts_out.append({
|
|
'name': r['registered_owner'],
|
|
'company_name': r['registered_owner'],
|
|
'company': r.get('name', ''),
|
|
'role': 'Registered Owner',
|
|
'phone': '',
|
|
'email': r.get('registered_owner_email') or '',
|
|
'website': r.get('registered_owner_website') or '',
|
|
})
|
|
# Operator (if different from owner)
|
|
if r.get('operator') and r.get('operator') != r.get('owner'):
|
|
contacts_out.append({
|
|
'name': r['operator'],
|
|
'company_name': r['operator'],
|
|
'company': r.get('name', ''),
|
|
'role': 'Operator',
|
|
'phone': '',
|
|
'email': '',
|
|
'website': r.get('operator_website') or '',
|
|
})
|
|
results.extend(contacts_out)
|
|
# Deduplicate by name
|
|
seen = set()
|
|
unique = []
|
|
for c in results:
|
|
if c['name'] not in seen:
|
|
seen.add(c['name'])
|
|
unique.append(c)
|
|
return unique[:limit]
|
|
finally:
|
|
conn.close()'''
|
|
|
|
new_staging = '''def _mask_email(email):
|
|
"""Mask email: john.doe@maersk.com -> j***@maersk.com"""
|
|
if not email:
|
|
return ''
|
|
parts = email.split('@')
|
|
if len(parts) != 2:
|
|
return '***@***.com'
|
|
local = parts[0]
|
|
masked_local = local[0] + '***' if local else '***'
|
|
return f"{masked_local}@{parts[1]}"
|
|
|
|
|
|
def _mask_phone(phone):
|
|
"""Mask phone: +1-555-123-4567 -> +1-***-**67"""
|
|
if not phone:
|
|
return ''
|
|
if len(phone) > 6:
|
|
return phone[:3] + '***' + phone[-2:]
|
|
return '***'
|
|
|
|
|
|
def search_contacts_in_staging(query: str, limit: int = 10, mask: bool = True) -> list:
|
|
"""Search mt_bulk_staging for vessel/owner contacts by name, MMSI, IMO, or owner/operator name.
|
|
mask=True: emails/phones masked (free preview). mask=False: full data (paid unlock)."""
|
|
conn = get_connection()
|
|
try:
|
|
cursor = conn.cursor()
|
|
q = f"%{query}%"
|
|
q_exact = query.strip()
|
|
# Search by name, owner, operator, registered_owner, MMSI, IMO
|
|
cursor.execute("""
|
|
SELECT name, imo, mmsi, flag, dwt, type_category,
|
|
owner, owner_email, owner_phone, owner_website,
|
|
owner_address, owner_country,
|
|
operator, operator_website, operator_country,
|
|
registered_owner, registered_owner_email, registered_owner_website,
|
|
registered_owner_country,
|
|
commercial_manager, commercial_manager_website, commercial_manager_country
|
|
FROM mt_bulk_staging
|
|
WHERE (name ILIKE ? OR owner ILIKE ? OR operator ILIKE ?
|
|
OR registered_owner ILIKE ?
|
|
OR CAST(mmsi AS TEXT) = ? OR CAST(imo AS TEXT) = ?)
|
|
AND (owner IS NOT NULL OR operator IS NOT NULL)
|
|
LIMIT ?
|
|
""", (q, q, q, q, q_exact, q_exact, limit))
|
|
rows = cursor.fetchall()
|
|
results = []
|
|
for row in rows:
|
|
r = dict(row)
|
|
vessel_info = f"{r.get('name', '')} (IMO: {r.get('imo', 'N/A')}, MMSI: {r.get('mmsi', 'N/A')})"
|
|
contacts_out = []
|
|
# Owner contact
|
|
if r.get('owner'):
|
|
email_raw = r.get('owner_email') or ''
|
|
phone_raw = r.get('owner_phone') or ''
|
|
contacts_out.append({
|
|
'company_name': r['owner'],
|
|
'vessel': vessel_info,
|
|
'role': 'Beneficial Owner',
|
|
'email': _mask_email(email_raw) if mask else email_raw,
|
|
'phone': _mask_phone(phone_raw) if mask else phone_raw,
|
|
'website': r.get('owner_website') or '',
|
|
'country': r.get('owner_country') or '',
|
|
'address': (r.get('owner_address') or '') if not mask else '',
|
|
'has_email': bool(email_raw),
|
|
'has_phone': bool(phone_raw),
|
|
})
|
|
# Registered owner (if different)
|
|
if r.get('registered_owner') and r.get('registered_owner') != r.get('owner'):
|
|
email_raw = r.get('registered_owner_email') or ''
|
|
contacts_out.append({
|
|
'company_name': r['registered_owner'],
|
|
'vessel': vessel_info,
|
|
'role': 'Registered Owner',
|
|
'email': _mask_email(email_raw) if mask else email_raw,
|
|
'phone': '',
|
|
'website': r.get('registered_owner_website') or '',
|
|
'country': r.get('registered_owner_country') or '',
|
|
'address': '',
|
|
'has_email': bool(email_raw),
|
|
'has_phone': False,
|
|
})
|
|
# Operator (if different from owner)
|
|
if r.get('operator') and r.get('operator') != r.get('owner'):
|
|
contacts_out.append({
|
|
'company_name': r['operator'],
|
|
'vessel': vessel_info,
|
|
'role': 'Operator',
|
|
'email': '',
|
|
'phone': '',
|
|
'website': r.get('operator_website') or '',
|
|
'country': r.get('operator_country') or '',
|
|
'address': '',
|
|
'has_email': False,
|
|
'has_phone': False,
|
|
})
|
|
# Commercial manager (if different)
|
|
if r.get('commercial_manager') and r.get('commercial_manager') != r.get('owner'):
|
|
contacts_out.append({
|
|
'company_name': r['commercial_manager'],
|
|
'vessel': vessel_info,
|
|
'role': 'Commercial Manager',
|
|
'email': '',
|
|
'phone': '',
|
|
'website': r.get('commercial_manager_website') or '',
|
|
'country': r.get('commercial_manager_country') or '',
|
|
'address': '',
|
|
'has_email': False,
|
|
'has_phone': False,
|
|
})
|
|
results.extend(contacts_out)
|
|
# Deduplicate by company_name + role
|
|
seen = set()
|
|
unique = []
|
|
for c in results:
|
|
key = (c['company_name'], c['role'])
|
|
if key not in seen:
|
|
seen.add(key)
|
|
unique.append(c)
|
|
return unique[:limit]
|
|
finally:
|
|
conn.close()'''
|
|
|
|
if old_staging in db_code:
|
|
db_code = db_code.replace(old_staging, new_staging)
|
|
print("[OK] maritime_db.py: search_contacts_in_staging replaced with masked version + MMSI/IMO search")
|
|
else:
|
|
print("[SKIP] maritime_db.py: search_contacts_in_staging not found (already patched?)")
|
|
|
|
with open('maritime_db.py', 'w', encoding='utf-8') as f:
|
|
f.write(db_code)
|
|
|
|
|
|
# ─── 2. seafare_agent.py ────────────────────────────────────────────────────
|
|
|
|
with open('seafare_agent.py', 'r', encoding='utf-8') as f:
|
|
agent_code = f.read()
|
|
|
|
# 2a. Replace _tool_search_contacts — masked preview + check purchased
|
|
old_search = '''def _tool_search_contacts(tool_input, is_admin=False, user_id=None):
|
|
query = tool_input.get("query")
|
|
contact_type = tool_input.get("type")
|
|
contacts = db.search_contacts(query=query, contact_type=contact_type, limit=10)
|
|
|
|
# Fallback: search mt_bulk_staging if contacts table has no results
|
|
if not contacts and query:
|
|
contacts = db.search_contacts_in_staging(query, limit=10)
|
|
|
|
# Word-split retry: "ASCO Azerbaijan" -> try "ASCO", then "Azerbaijan"
|
|
if not contacts and query and ' ' in query:
|
|
for word in sorted(query.split(), key=len, reverse=True):
|
|
if len(word) >= 3:
|
|
contacts = db.search_contacts(query=word, contact_type=contact_type, limit=10)
|
|
if contacts:
|
|
break
|
|
if not contacts:
|
|
for word in sorted(query.split(), key=len, reverse=True):
|
|
if len(word) >= 3:
|
|
contacts = db.search_contacts_in_staging(word, limit=10)
|
|
if contacts:
|
|
break
|
|
|
|
for c in contacts:
|
|
c.pop('source', None)
|
|
c.pop('id', None)
|
|
c.pop('created_at', None)
|
|
c.pop('updated_at', None)
|
|
|
|
return json.dumps({
|
|
"found": len(contacts),
|
|
"note": "Full contact details (temporarily free)",
|
|
"contacts": contacts
|
|
}, ensure_ascii=False)'''
|
|
|
|
new_search = '''def _tool_search_contacts(tool_input, is_admin=False, user_id=None):
|
|
query = tool_input.get("query")
|
|
contact_type = tool_input.get("type")
|
|
|
|
if not query:
|
|
return json.dumps({"found": 0, "error": "Query is required."})
|
|
|
|
# 1. Check if user already purchased these contacts — return full data
|
|
if user_id:
|
|
purchased = db.has_purchased_contact(user_id, query, contact_type)
|
|
if purchased:
|
|
return json.dumps({
|
|
"found": len(purchased),
|
|
"contacts": purchased,
|
|
"note": "Previously unlocked contacts (no charge)."
|
|
}, ensure_ascii=False)
|
|
|
|
# 2. Search contacts table (legacy 8 rows)
|
|
contacts = db.search_contacts(query=query, contact_type=contact_type, limit=10)
|
|
|
|
# 3. Search mt_bulk_staging (40K+ vessels) — masked preview
|
|
staging_contacts = []
|
|
if query:
|
|
staging_contacts = db.search_contacts_in_staging(query, limit=10, mask=True)
|
|
|
|
# 4. Word-split retry if nothing found
|
|
if not contacts and not staging_contacts and query and ' ' in query:
|
|
for word in sorted(query.split(), key=len, reverse=True):
|
|
if len(word) >= 3:
|
|
staging_contacts = db.search_contacts_in_staging(word, limit=10, mask=True)
|
|
if staging_contacts:
|
|
break
|
|
|
|
# 5. Merge results (legacy contacts are already full, staging are masked)
|
|
all_contacts = []
|
|
for c in contacts:
|
|
c.pop('source', None)
|
|
c.pop('id', None)
|
|
c.pop('created_at', None)
|
|
c.pop('updated_at', None)
|
|
c['unlocked'] = True
|
|
all_contacts.append(c)
|
|
for c in staging_contacts:
|
|
c['unlocked'] = False
|
|
all_contacts.append(c)
|
|
|
|
# Count how many have emails/phones available for unlock
|
|
unlock_count = sum(1 for c in all_contacts if c.get('has_email') or c.get('has_phone'))
|
|
price = CONTACT_UNLOCK_PRICE
|
|
|
|
result = {
|
|
"found": len(all_contacts),
|
|
"contacts": all_contacts,
|
|
}
|
|
|
|
if all_contacts and unlock_count > 0:
|
|
if price > 0:
|
|
result["note"] = f"Contact emails/phones are masked. Use unlock_contacts to reveal full details (${price:.2f})."
|
|
else:
|
|
result["note"] = "Contact emails/phones are masked. Use unlock_contacts to reveal full details (free during beta)."
|
|
elif all_contacts:
|
|
result["note"] = f"Found {len(all_contacts)} contacts. No additional email/phone data available to unlock."
|
|
else:
|
|
result["note"] = "No contacts found in database."
|
|
|
|
return json.dumps(result, ensure_ascii=False)'''
|
|
|
|
if old_search in agent_code:
|
|
agent_code = agent_code.replace(old_search, new_search)
|
|
print("[OK] seafare_agent.py: _tool_search_contacts replaced with masked preview + purchased check")
|
|
else:
|
|
print("[FAIL] seafare_agent.py: _tool_search_contacts old block not found!")
|
|
sys.exit(1)
|
|
|
|
# 2b. Replace _tool_unlock_contacts — real unlock with charge + tracking
|
|
old_unlock = '''def _tool_unlock_contacts(tool_input, is_admin=False, user_id=None):
|
|
# Temporarily free — delegate to search_contacts with caller's real permissions
|
|
return execute_tool("search_contacts", tool_input, is_admin=is_admin, user_id=user_id)'''
|
|
|
|
new_unlock = '''def _tool_unlock_contacts(tool_input, is_admin=False, user_id=None):
|
|
query = tool_input.get("query")
|
|
contact_type = tool_input.get("type")
|
|
|
|
if not query:
|
|
return json.dumps({"error": "Query is required."})
|
|
|
|
if not user_id:
|
|
return json.dumps({"error": "Authentication required to unlock contacts. Please log in."})
|
|
|
|
# 1. Check already purchased — return full data free
|
|
purchased = db.has_purchased_contact(user_id, query, contact_type)
|
|
if purchased:
|
|
return json.dumps({
|
|
"found": len(purchased),
|
|
"contacts": purchased,
|
|
"note": "Previously unlocked contacts (no charge)."
|
|
}, ensure_ascii=False)
|
|
|
|
# 2. Get full (unmasked) contacts from staging
|
|
contacts = db.search_contacts_in_staging(query, limit=10, mask=False)
|
|
|
|
# Word-split retry
|
|
if not contacts and ' ' in query:
|
|
for word in sorted(query.split(), key=len, reverse=True):
|
|
if len(word) >= 3:
|
|
contacts = db.search_contacts_in_staging(word, limit=10, mask=False)
|
|
if contacts:
|
|
break
|
|
|
|
# Also check legacy contacts table
|
|
legacy = db.search_contacts(query=query, contact_type=contact_type, limit=10)
|
|
if legacy:
|
|
for c in legacy:
|
|
c.pop('source', None)
|
|
c.pop('id', None)
|
|
c.pop('created_at', None)
|
|
c.pop('updated_at', None)
|
|
contacts = legacy + contacts
|
|
|
|
if not contacts:
|
|
return json.dumps({"found": 0, "note": "No contacts found for this query."})
|
|
|
|
# 3. Charge user (skip if admin or price=0)
|
|
price = CONTACT_UNLOCK_PRICE
|
|
if price > 0 and not is_admin:
|
|
success = db.charge_and_log(
|
|
user_id=user_id,
|
|
amount=price,
|
|
service='contact_unlock',
|
|
details=f"Unlock contacts: {query}",
|
|
contacts=contacts,
|
|
query=query,
|
|
contact_type=contact_type or ''
|
|
)
|
|
if not success:
|
|
return json.dumps({
|
|
"error": f"Insufficient balance. Contact unlock costs ${price:.2f}. Please top up your wallet.",
|
|
"found": len(contacts),
|
|
"preview": [{"company_name": c.get("company_name", ""), "role": c.get("role", "")} for c in contacts[:5]]
|
|
})
|
|
else:
|
|
# Free (beta) or admin — still track in purchased_contacts
|
|
db.save_purchased_contacts(user_id, contacts, query, contact_type or '', 0.0)
|
|
|
|
# Remove internal flags before returning
|
|
for c in contacts:
|
|
c.pop('has_email', None)
|
|
c.pop('has_phone', None)
|
|
c['unlocked'] = True
|
|
|
|
note = f"Full contact details unlocked ({len(contacts)} contacts)."
|
|
if price > 0 and not is_admin:
|
|
note += f" Charged ${price:.2f}."
|
|
else:
|
|
note += " Free during beta."
|
|
|
|
return json.dumps({
|
|
"found": len(contacts),
|
|
"contacts": contacts,
|
|
"note": note
|
|
}, ensure_ascii=False)'''
|
|
|
|
if old_unlock in agent_code:
|
|
agent_code = agent_code.replace(old_unlock, new_unlock)
|
|
print("[OK] seafare_agent.py: _tool_unlock_contacts replaced with real unlock flow")
|
|
else:
|
|
print("[FAIL] seafare_agent.py: _tool_unlock_contacts old block not found!")
|
|
sys.exit(1)
|
|
|
|
with open('seafare_agent.py', 'w', encoding='utf-8') as f:
|
|
f.write(agent_code)
|
|
|
|
|
|
# ─── 3. config.py — bump version ────────────────────────────────────────────
|
|
|
|
with open('config.py', 'r', encoding='utf-8') as f:
|
|
cfg = f.read()
|
|
|
|
old_ver = "APP_VERSION = '3.43.3'"
|
|
new_ver = "APP_VERSION = '3.44.0'"
|
|
if old_ver in cfg:
|
|
cfg = cfg.replace(old_ver, new_ver)
|
|
print(f"[OK] config.py: version bumped to 3.44.0")
|
|
else:
|
|
print(f"[SKIP] config.py: version {old_ver} not found")
|
|
|
|
with open('config.py', 'w', encoding='utf-8') as f:
|
|
f.write(cfg)
|
|
|
|
|
|
print("\n=== PATCH COMPLETE ===")
|
|
print("Next: py_compile all files, restart seafare, test search_contacts")
|