montana/Русский/Логистика/patch_contacts_integration.py

450 lines
18 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
"""Patch: Integrate mt_bulk_staging into search_contacts / unlock_contacts.
Before: search_contacts only queries `contacts` table (8 rows), fallback to staging
shows everything for free, no masking.
After: search_contacts queries staging by name/MMSI/IMO/owner, returns MASKED preview.
unlock_contacts reveals full data, charges user (or free if price=0), tracks purchases.
"""
import sys
# ─── 1. maritime_db.py ───────────────────────────────────────────────────────
with open('maritime_db.py', 'r', encoding='utf-8') as f:
db_code = f.read()
# 1a. Replace search_contacts_in_staging — add MMSI/IMO search, remove email filter, add mask param
old_staging = '''def search_contacts_in_staging(query: str, limit: int = 10) -> list:
"""Search mt_bulk_staging for vessel/owner contacts (fallback when contacts table is empty)."""
conn = get_connection()
try:
cursor = conn.cursor()
q = f"%{query}%"
cursor.execute("""
SELECT name, imo, owner, owner_email, owner_phone, owner_website,
owner_address, owner_country, operator, operator_website,
registered_owner, registered_owner_email, registered_owner_website
FROM mt_bulk_staging
WHERE (name ILIKE ? OR owner ILIKE ? OR operator ILIKE ?)
AND (owner_email IS NOT NULL OR operator_website IS NOT NULL)
LIMIT ?
""", (q, q, q, limit))
rows = cursor.fetchall()
results = []
for row in rows:
r = dict(row)
contacts_out = []
# Owner contact
if r.get('owner'):
contacts_out.append({
'name': r['owner'],
'company_name': r['owner'],
'company': r.get('name', ''),
'role': 'Owner',
'phone': r.get('owner_phone') or '',
'email': r.get('owner_email') or '',
'website': r.get('owner_website') or '',
'country': r.get('owner_country') or '',
})
# Registered owner (if different)
if r.get('registered_owner') and r.get('registered_owner') != r.get('owner'):
contacts_out.append({
'name': r['registered_owner'],
'company_name': r['registered_owner'],
'company': r.get('name', ''),
'role': 'Registered Owner',
'phone': '',
'email': r.get('registered_owner_email') or '',
'website': r.get('registered_owner_website') or '',
})
# Operator (if different from owner)
if r.get('operator') and r.get('operator') != r.get('owner'):
contacts_out.append({
'name': r['operator'],
'company_name': r['operator'],
'company': r.get('name', ''),
'role': 'Operator',
'phone': '',
'email': '',
'website': r.get('operator_website') or '',
})
results.extend(contacts_out)
# Deduplicate by name
seen = set()
unique = []
for c in results:
if c['name'] not in seen:
seen.add(c['name'])
unique.append(c)
return unique[:limit]
finally:
conn.close()'''
new_staging = '''def _mask_email(email):
"""Mask email: john.doe@maersk.com -> j***@maersk.com"""
if not email:
return ''
parts = email.split('@')
if len(parts) != 2:
return '***@***.com'
local = parts[0]
masked_local = local[0] + '***' if local else '***'
return f"{masked_local}@{parts[1]}"
def _mask_phone(phone):
"""Mask phone: +1-555-123-4567 -> +1-***-**67"""
if not phone:
return ''
if len(phone) > 6:
return phone[:3] + '***' + phone[-2:]
return '***'
def search_contacts_in_staging(query: str, limit: int = 10, mask: bool = True) -> list:
"""Search mt_bulk_staging for vessel/owner contacts by name, MMSI, IMO, or owner/operator name.
mask=True: emails/phones masked (free preview). mask=False: full data (paid unlock)."""
conn = get_connection()
try:
cursor = conn.cursor()
q = f"%{query}%"
q_exact = query.strip()
# Search by name, owner, operator, registered_owner, MMSI, IMO
cursor.execute("""
SELECT name, imo, mmsi, flag, dwt, type_category,
owner, owner_email, owner_phone, owner_website,
owner_address, owner_country,
operator, operator_website, operator_country,
registered_owner, registered_owner_email, registered_owner_website,
registered_owner_country,
commercial_manager, commercial_manager_website, commercial_manager_country
FROM mt_bulk_staging
WHERE (name ILIKE ? OR owner ILIKE ? OR operator ILIKE ?
OR registered_owner ILIKE ?
OR CAST(mmsi AS TEXT) = ? OR CAST(imo AS TEXT) = ?)
AND (owner IS NOT NULL OR operator IS NOT NULL)
LIMIT ?
""", (q, q, q, q, q_exact, q_exact, limit))
rows = cursor.fetchall()
results = []
for row in rows:
r = dict(row)
vessel_info = f"{r.get('name', '')} (IMO: {r.get('imo', 'N/A')}, MMSI: {r.get('mmsi', 'N/A')})"
contacts_out = []
# Owner contact
if r.get('owner'):
email_raw = r.get('owner_email') or ''
phone_raw = r.get('owner_phone') or ''
contacts_out.append({
'company_name': r['owner'],
'vessel': vessel_info,
'role': 'Beneficial Owner',
'email': _mask_email(email_raw) if mask else email_raw,
'phone': _mask_phone(phone_raw) if mask else phone_raw,
'website': r.get('owner_website') or '',
'country': r.get('owner_country') or '',
'address': (r.get('owner_address') or '') if not mask else '',
'has_email': bool(email_raw),
'has_phone': bool(phone_raw),
})
# Registered owner (if different)
if r.get('registered_owner') and r.get('registered_owner') != r.get('owner'):
email_raw = r.get('registered_owner_email') or ''
contacts_out.append({
'company_name': r['registered_owner'],
'vessel': vessel_info,
'role': 'Registered Owner',
'email': _mask_email(email_raw) if mask else email_raw,
'phone': '',
'website': r.get('registered_owner_website') or '',
'country': r.get('registered_owner_country') or '',
'address': '',
'has_email': bool(email_raw),
'has_phone': False,
})
# Operator (if different from owner)
if r.get('operator') and r.get('operator') != r.get('owner'):
contacts_out.append({
'company_name': r['operator'],
'vessel': vessel_info,
'role': 'Operator',
'email': '',
'phone': '',
'website': r.get('operator_website') or '',
'country': r.get('operator_country') or '',
'address': '',
'has_email': False,
'has_phone': False,
})
# Commercial manager (if different)
if r.get('commercial_manager') and r.get('commercial_manager') != r.get('owner'):
contacts_out.append({
'company_name': r['commercial_manager'],
'vessel': vessel_info,
'role': 'Commercial Manager',
'email': '',
'phone': '',
'website': r.get('commercial_manager_website') or '',
'country': r.get('commercial_manager_country') or '',
'address': '',
'has_email': False,
'has_phone': False,
})
results.extend(contacts_out)
# Deduplicate by company_name + role
seen = set()
unique = []
for c in results:
key = (c['company_name'], c['role'])
if key not in seen:
seen.add(key)
unique.append(c)
return unique[:limit]
finally:
conn.close()'''
if old_staging in db_code:
db_code = db_code.replace(old_staging, new_staging)
print("[OK] maritime_db.py: search_contacts_in_staging replaced with masked version + MMSI/IMO search")
else:
print("[SKIP] maritime_db.py: search_contacts_in_staging not found (already patched?)")
with open('maritime_db.py', 'w', encoding='utf-8') as f:
f.write(db_code)
# ─── 2. seafare_agent.py ────────────────────────────────────────────────────
with open('seafare_agent.py', 'r', encoding='utf-8') as f:
agent_code = f.read()
# 2a. Replace _tool_search_contacts — masked preview + check purchased
old_search = '''def _tool_search_contacts(tool_input, is_admin=False, user_id=None):
query = tool_input.get("query")
contact_type = tool_input.get("type")
contacts = db.search_contacts(query=query, contact_type=contact_type, limit=10)
# Fallback: search mt_bulk_staging if contacts table has no results
if not contacts and query:
contacts = db.search_contacts_in_staging(query, limit=10)
# Word-split retry: "ASCO Azerbaijan" -> try "ASCO", then "Azerbaijan"
if not contacts and query and ' ' in query:
for word in sorted(query.split(), key=len, reverse=True):
if len(word) >= 3:
contacts = db.search_contacts(query=word, contact_type=contact_type, limit=10)
if contacts:
break
if not contacts:
for word in sorted(query.split(), key=len, reverse=True):
if len(word) >= 3:
contacts = db.search_contacts_in_staging(word, limit=10)
if contacts:
break
for c in contacts:
c.pop('source', None)
c.pop('id', None)
c.pop('created_at', None)
c.pop('updated_at', None)
return json.dumps({
"found": len(contacts),
"note": "Full contact details (temporarily free)",
"contacts": contacts
}, ensure_ascii=False)'''
new_search = '''def _tool_search_contacts(tool_input, is_admin=False, user_id=None):
query = tool_input.get("query")
contact_type = tool_input.get("type")
if not query:
return json.dumps({"found": 0, "error": "Query is required."})
# 1. Check if user already purchased these contacts — return full data
if user_id:
purchased = db.has_purchased_contact(user_id, query, contact_type)
if purchased:
return json.dumps({
"found": len(purchased),
"contacts": purchased,
"note": "Previously unlocked contacts (no charge)."
}, ensure_ascii=False)
# 2. Search contacts table (legacy 8 rows)
contacts = db.search_contacts(query=query, contact_type=contact_type, limit=10)
# 3. Search mt_bulk_staging (40K+ vessels) — masked preview
staging_contacts = []
if query:
staging_contacts = db.search_contacts_in_staging(query, limit=10, mask=True)
# 4. Word-split retry if nothing found
if not contacts and not staging_contacts and query and ' ' in query:
for word in sorted(query.split(), key=len, reverse=True):
if len(word) >= 3:
staging_contacts = db.search_contacts_in_staging(word, limit=10, mask=True)
if staging_contacts:
break
# 5. Merge results (legacy contacts are already full, staging are masked)
all_contacts = []
for c in contacts:
c.pop('source', None)
c.pop('id', None)
c.pop('created_at', None)
c.pop('updated_at', None)
c['unlocked'] = True
all_contacts.append(c)
for c in staging_contacts:
c['unlocked'] = False
all_contacts.append(c)
# Count how many have emails/phones available for unlock
unlock_count = sum(1 for c in all_contacts if c.get('has_email') or c.get('has_phone'))
price = CONTACT_UNLOCK_PRICE
result = {
"found": len(all_contacts),
"contacts": all_contacts,
}
if all_contacts and unlock_count > 0:
if price > 0:
result["note"] = f"Contact emails/phones are masked. Use unlock_contacts to reveal full details (${price:.2f})."
else:
result["note"] = "Contact emails/phones are masked. Use unlock_contacts to reveal full details (free during beta)."
elif all_contacts:
result["note"] = f"Found {len(all_contacts)} contacts. No additional email/phone data available to unlock."
else:
result["note"] = "No contacts found in database."
return json.dumps(result, ensure_ascii=False)'''
if old_search in agent_code:
agent_code = agent_code.replace(old_search, new_search)
print("[OK] seafare_agent.py: _tool_search_contacts replaced with masked preview + purchased check")
else:
print("[FAIL] seafare_agent.py: _tool_search_contacts old block not found!")
sys.exit(1)
# 2b. Replace _tool_unlock_contacts — real unlock with charge + tracking
old_unlock = '''def _tool_unlock_contacts(tool_input, is_admin=False, user_id=None):
# Temporarily free — delegate to search_contacts with caller's real permissions
return execute_tool("search_contacts", tool_input, is_admin=is_admin, user_id=user_id)'''
new_unlock = '''def _tool_unlock_contacts(tool_input, is_admin=False, user_id=None):
query = tool_input.get("query")
contact_type = tool_input.get("type")
if not query:
return json.dumps({"error": "Query is required."})
if not user_id:
return json.dumps({"error": "Authentication required to unlock contacts. Please log in."})
# 1. Check already purchased — return full data free
purchased = db.has_purchased_contact(user_id, query, contact_type)
if purchased:
return json.dumps({
"found": len(purchased),
"contacts": purchased,
"note": "Previously unlocked contacts (no charge)."
}, ensure_ascii=False)
# 2. Get full (unmasked) contacts from staging
contacts = db.search_contacts_in_staging(query, limit=10, mask=False)
# Word-split retry
if not contacts and ' ' in query:
for word in sorted(query.split(), key=len, reverse=True):
if len(word) >= 3:
contacts = db.search_contacts_in_staging(word, limit=10, mask=False)
if contacts:
break
# Also check legacy contacts table
legacy = db.search_contacts(query=query, contact_type=contact_type, limit=10)
if legacy:
for c in legacy:
c.pop('source', None)
c.pop('id', None)
c.pop('created_at', None)
c.pop('updated_at', None)
contacts = legacy + contacts
if not contacts:
return json.dumps({"found": 0, "note": "No contacts found for this query."})
# 3. Charge user (skip if admin or price=0)
price = CONTACT_UNLOCK_PRICE
if price > 0 and not is_admin:
success = db.charge_and_log(
user_id=user_id,
amount=price,
service='contact_unlock',
details=f"Unlock contacts: {query}",
contacts=contacts,
query=query,
contact_type=contact_type or ''
)
if not success:
return json.dumps({
"error": f"Insufficient balance. Contact unlock costs ${price:.2f}. Please top up your wallet.",
"found": len(contacts),
"preview": [{"company_name": c.get("company_name", ""), "role": c.get("role", "")} for c in contacts[:5]]
})
else:
# Free (beta) or admin — still track in purchased_contacts
db.save_purchased_contacts(user_id, contacts, query, contact_type or '', 0.0)
# Remove internal flags before returning
for c in contacts:
c.pop('has_email', None)
c.pop('has_phone', None)
c['unlocked'] = True
note = f"Full contact details unlocked ({len(contacts)} contacts)."
if price > 0 and not is_admin:
note += f" Charged ${price:.2f}."
else:
note += " Free during beta."
return json.dumps({
"found": len(contacts),
"contacts": contacts,
"note": note
}, ensure_ascii=False)'''
if old_unlock in agent_code:
agent_code = agent_code.replace(old_unlock, new_unlock)
print("[OK] seafare_agent.py: _tool_unlock_contacts replaced with real unlock flow")
else:
print("[FAIL] seafare_agent.py: _tool_unlock_contacts old block not found!")
sys.exit(1)
with open('seafare_agent.py', 'w', encoding='utf-8') as f:
f.write(agent_code)
# ─── 3. config.py — bump version ────────────────────────────────────────────
with open('config.py', 'r', encoding='utf-8') as f:
cfg = f.read()
old_ver = "APP_VERSION = '3.43.3'"
new_ver = "APP_VERSION = '3.44.0'"
if old_ver in cfg:
cfg = cfg.replace(old_ver, new_ver)
print(f"[OK] config.py: version bumped to 3.44.0")
else:
print(f"[SKIP] config.py: version {old_ver} not found")
with open('config.py', 'w', encoding='utf-8') as f:
f.write(cfg)
print("\n=== PATCH COMPLETE ===")
print("Next: py_compile all files, restart seafare, test search_contacts")