220 lines
7.2 KiB
Python
220 lines
7.2 KiB
Python
|
|
#!/usr/bin/env python3
|
|||
|
|
"""
|
|||
|
|
Supervisor for mt_detail_sweep.py.

Runs the sweep in subprocess batches and kills a batch if it hangs (overall
run timeout exceeded, or no output for too long).

A background reader thread feeds subprocess output into a queue so it can be
read with a timeout instead of blocking.
|
|||
|
|
"""
|
|||
|
|
import subprocess, sys, os, time, psycopg2, threading, queue
|
|||
|
|
|
|||
|
|
# The sweep script lives next to this supervisor.
SCRIPT = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'mt_detail_sweep.py')

# Staging-database connection string. The DB is reached on 127.0.0.1:15432;
# the supervisor's retry message mentions an SSH tunnel. Prefer setting
# MT_DB_URL in the environment: the literal fallback only keeps existing setups
# working. It commits a password to source control and should be rotated and
# removed.
DB_URL = os.environ.get('MT_DB_URL', 'postgresql://seafare:SF_m0ntana_2026@127.0.0.1:15432/seafare_db')

BATCH_SIZE = 50        # Vessels per subprocess run (passed to the sweep as --limit)
DELAY = 0.8            # Seconds between vessel fetches (passed to the sweep as --delay)

# Per-run timeout: 50 vessels x ~4s + login ~60s = ~260s; 360s leaves headroom.
RUN_TIMEOUT = 360      # Max seconds per run before hard kill
INACTIVITY_MAX = 60    # Kill if the run prints nothing for this many seconds
MAX_RUNS = 500         # Safety limit on the number of subprocess runs

# NOTE(review): kill_processes() relies on Windows `taskkill`. On Windows this
# path resolves against the current drive's root (e.g. C:\tmp). Confirm that
# directory exists.
LOG = '/tmp/mt_detail_live.txt'
|
|||
|
|
|
|||
|
|
|
|||
|
|
def get_remaining():
    """Return how many staging rows still need detail data, or -1 on any error.

    A row in ``mt_bulk_staging`` counts as remaining when ``draught`` or
    ``companies_json`` is NULL. Failures (connect, query, fetch) are reported as
    -1 instead of being raised. The supervisor loop in this file treats a
    negative count as "DB unreachable" and retries.
    """
    conn = None
    try:
        conn = psycopg2.connect(DB_URL, connect_timeout=8)
        cur = conn.cursor()
        cur.execute('SELECT count(*) FROM mt_bulk_staging WHERE draught IS NULL OR companies_json IS NULL')
        return cur.fetchone()[0]
    except Exception:
        return -1
    finally:
        # Close on every path. Previously a failing execute/fetch leaked the
        # connection.
        if conn is not None:
            try:
                conn.close()
            except Exception:
                pass  # never let cleanup turn the -1 contract into a raise
|
|||
|
|
|
|||
|
|
|
|||
|
|
def enqueue_output(pipe, q):
    """Pump lines from *pipe* into *q* until EOF, then enqueue a ``None`` sentinel.

    Meant to run on a background thread so the consumer can poll *q* with a
    timeout instead of blocking on the pipe. A read error ends the pump quietly.
    The sentinel is enqueued however the loop ends.
    """
    try:
        while True:
            chunk = pipe.readline()
            if chunk == '':
                break  # EOF
            q.put(chunk)
    except Exception:
        pass  # a closed/broken pipe simply ends the stream
    finally:
        q.put(None)  # tells the consumer no more lines will arrive
|
|||
|
|
|
|||
|
|
|
|||
|
|
def kill_processes():
    """Force-kill leftover Chrome processes after a batch subprocess has finished.

    Waits 3 s so Chrome can close on its own, then runs
    ``taskkill /F /IM chrome.exe`` (a Windows command). It waits 2 s more so
    the processes are gone before the next batch starts. ``/IM`` matches by
    image name, so this kills every chrome.exe process the user is allowed to
    terminate, not only the ones the sweep started.

    Best-effort: if ``taskkill`` is missing (non-Windows host), fails to
    start, or hangs past its timeout, the error is ignored.
    """
    time.sleep(3)  # give Chrome time to close itself first
    try:
        # Argument list with no shell. The previous `>NUL 2>&1` shell redirect
        # created a stray file literally named "NUL" when run outside Windows.
        subprocess.run(
            ['taskkill', '/F', '/IM', 'chrome.exe'],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            timeout=30,
        )
    except (OSError, subprocess.SubprocessError):
        pass  # taskkill unavailable or hung; nothing more to clean up here
    time.sleep(2)  # wait for Chrome to fully die before the next run
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _stop_process(proc, grace=3, wait_timeout=10):
    """Terminate *proc*, escalate to ``kill()`` after *grace* seconds, then reap it.

    Exceptions from ``wait`` (e.g. ``subprocess.TimeoutExpired``) propagate.
    """
    proc.terminate()
    time.sleep(grace)
    if proc.poll() is None:
        proc.kill()
    proc.wait(timeout=wait_timeout)


def _watch_process(proc, q, t_start, write_line):
    """Copy *proc*'s queued output to the log until it exits or a limit trips.

    *q* is filled by ``enqueue_output`` on a reader thread. Returns ``''`` when
    the process exited by itself. Otherwise returns the reason it must be killed
    (``RUN_TIMEOUT ...`` or ``INACTIVITY ...``).
    """
    last_output_time = time.time()
    while True:
        try:
            line = q.get(timeout=1.0)
        except queue.Empty:
            line = None

        if line is None:
            # Either the reader's EOF sentinel or a 1 s poll timeout. Decide by
            # process state rather than EOF: the pipe can stay open after the
            # script exits if something else still holds it.
            if proc.poll() is not None:
                return ''
        else:
            last_output_time = time.time()
            write_line(line.rstrip())

        elapsed = time.time() - t_start
        if elapsed > RUN_TIMEOUT:
            return f'RUN_TIMEOUT ({elapsed:.0f}s > {RUN_TIMEOUT}s)'

        inactive = time.time() - last_output_time
        if inactive > INACTIVITY_MAX:
            return f'INACTIVITY ({inactive:.0f}s > {INACTIVITY_MAX}s)'


def _drain_queue(q, write_line):
    """Write any lines still waiting in *q* to the log, skipping the ``None`` sentinel."""
    while True:
        try:
            item = q.get_nowait()
        except queue.Empty:
            return
        if item is not None:
            write_line(item.rstrip())


def main():
    """Run mt_detail_sweep.py in batches until no staging rows remain.

    Each run launches the sweep for BATCH_SIZE vessels and mirrors its combined
    stdout/stderr into LOG. It kills the run on RUN_TIMEOUT or after
    INACTIVITY_MAX seconds of silence, cleans up Chrome, and measures progress
    with get_remaining(). Stops when nothing is left, after MAX_RUNS runs, or
    on Ctrl+C.
    """
    # Line-buffered; truncates the previous session's log.
    log_f = open(LOG, 'w', encoding='utf-8', errors='replace', buffering=1)

    def write_line(text):
        """Append *text* to LOG; if the write fails, reopen the file once (best-effort)."""
        nonlocal log_f
        try:
            log_f.write(text + '\n')
            log_f.flush()
        except Exception:
            try:
                log_f.close()  # don't leak the broken handle
            except Exception:
                pass
            try:
                log_f = open(LOG, 'a', encoding='utf-8', errors='replace', buffering=1)
                log_f.write(text + '\n')
                log_f.flush()
            except Exception:
                pass  # logging must never stop the sweep

    def log(msg):
        ts = time.strftime('%H:%M:%S')
        write_line(f"[SUPERVISOR {ts}] {msg}")

    total_committed = 0
    run = 0
    try:
        log(f"Starting supervisor. Script: {SCRIPT}")
        log(f"Batch: {BATCH_SIZE} | Delay: {DELAY}s | Run timeout: {RUN_TIMEOUT}s | Inactivity: {INACTIVITY_MAX}s")

        while run < MAX_RUNS:
            remaining = get_remaining()
            if remaining == 0:
                log("DONE — no more vessels to process!")
                break
            if remaining < 0:
                # DB failures do not count against MAX_RUNS; keep retrying.
                log("DB connection failed — check SSH tunnel. Retrying in 15s...")
                time.sleep(15)
                continue

            run += 1
            log(f"=== Run {run} | Remaining: {remaining} ===")

            cmd = [
                sys.executable, '-u', SCRIPT,  # -u: unbuffered child output
                '--limit', str(BATCH_SIZE),
                '--delay', str(DELAY),
            ]

            t_start = time.time()
            proc = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                cwd=os.path.dirname(SCRIPT),
                text=True,
                encoding='utf-8',
                errors='replace',
                bufsize=1,
            )

            # A daemon reader thread turns the blocking pipe into a queue that
            # can be polled with a timeout.
            q = queue.Queue()
            reader_thread = threading.Thread(target=enqueue_output, args=(proc.stdout, q), daemon=True)
            reader_thread.start()

            kill_reason = ''
            try:
                kill_reason = _watch_process(proc, q, t_start, write_line)
            except KeyboardInterrupt:
                log("Interrupted by user")
                try:
                    proc.terminate()
                    proc.wait(timeout=10)
                except Exception:
                    pass
                break
            except Exception as e:
                log(f" Main loop error: {e} — continuing")
                try:
                    _stop_process(proc)  # make sure it is really gone and reaped
                except Exception:
                    pass

            timed_out = bool(kill_reason)
            if timed_out:
                log(f" Killing subprocess: {kill_reason}")
                try:
                    _stop_process(proc)
                except Exception as e:
                    log(f" Kill error: {e}")

            try:
                kill_processes()
            except Exception as e:
                log(f" kill_processes error: {e}")

            # Give the reader a moment to reach EOF and log any output it had
            # not handed over yet. Then close the pipe so file descriptors do
            # not pile up over hundreds of runs. If the reader is still blocked
            # (something still holds the pipe), leave the pipe open: closing it
            # under a blocked read could block this thread as well.
            reader_thread.join(timeout=2)
            _drain_queue(q, write_line)
            if not reader_thread.is_alive():
                proc.stdout.close()

            exit_code = proc.returncode
            elapsed = time.time() - t_start
            log(f" Run {run} done: exit={exit_code} elapsed={elapsed:.0f}s timed_out={timed_out}"
                + (f" [{kill_reason}]" if timed_out else ""))

            # Progress is measured as the drop in the remaining-row count.
            try:
                new_remaining = get_remaining()
                if new_remaining >= 0 and new_remaining < remaining:
                    committed = remaining - new_remaining
                    total_committed += committed
                    log(f" Committed: {committed} | Total: {total_committed} | Still need: {new_remaining}")
                else:
                    log(f" No new commits (remaining={new_remaining})")
            except Exception as e:
                log(f" get_remaining error: {e}")
                new_remaining = remaining  # assume unchanged

            # remaining > 0 always holds here, so only new_remaining matters.
            if new_remaining != 0:
                log(" Waiting 8s before next run...")
                time.sleep(8)
    except KeyboardInterrupt:
        # Ctrl+C outside a streaming run (sleeps, cleanup, DB checks).
        log("Interrupted by user")
    finally:
        log(f"Supervisor done. {total_committed} vessels committed across {run} runs.")
        log_f.close()
|
|||
|
|
|
|||
|
|
|
|||
|
|
if __name__ == "__main__":
    # Start the supervisor only when executed as a script, never on import.
    main()
|