montana/Русский/Логистика/mt_supervisor.py

220 lines
7.2 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
"""
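Supervisor for mt_detail_sweep.py.
Runs the sweep as a subprocess in batches (one batch of vessels per run) and
kills a run that exceeds its time budget or stops producing output. The
child's output is read on a background thread and handed over through a
queue, so the supervisor can enforce those timeouts without blocking.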
"""
import os
import queue
import subprocess
import sys
import tempfile
import threading
import time

import psycopg2
# The worker script lives next to this supervisor.
SCRIPT = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'mt_detail_sweep.py')
# Staging DB connection string (127.0.0.1:15432, reached via an SSH tunnel).
# Can be overridden with the MT_DB_URL environment variable.
# NOTE(review): the default embeds a plaintext password; prefer supplying it
# through MT_DB_URL and removing it from source control.
DB_URL = os.environ.get(
    'MT_DB_URL',
    'postgresql://seafare:SF_m0ntana_2026@127.0.0.1:15432/seafare_db',
)
BATCH_SIZE = 50  # Vessels per subprocess run (passed to the sweep as --limit)
DELAY = 0.8  # Seconds between vessel fetches (passed to the sweep as --delay)
# Per-run timeout: 50 vessels x ~4s + login ~60s = ~260s, plus generous headroom.
RUN_TIMEOUT = 360  # Max seconds per run before hard kill
INACTIVITY_MAX = 60  # Kill a run that prints nothing for this many seconds
MAX_RUNS = 500  # Safety cap on the number of subprocess runs
# Live log for supervisor messages and child output. Keep the historical /tmp
# path where that directory exists. Otherwise fall back to the platform temp
# dir: on native Windows '/tmp' means '<drive>:\tmp', which usually does not
# exist and would make opening the log fail.
LOG = ('/tmp/mt_detail_live.txt' if os.path.isdir('/tmp')
       else os.path.join(tempfile.gettempdir(), 'mt_detail_live.txt'))
def get_remaining():
    """Return how many staging rows still need detail data, or -1 on failure.

    Counts rows of mt_bulk_staging whose draught or companies_json is still
    NULL. Any error (connect timeout, SQL error, ...) is reported as -1 rather
    than raised. The connection is always closed, including when the query
    itself fails after a successful connect.
    """
    conn = None
    try:
        conn = psycopg2.connect(DB_URL, connect_timeout=8)
        with conn.cursor() as cur:
            cur.execute('SELECT count(*) FROM mt_bulk_staging '
                        'WHERE draught IS NULL OR companies_json IS NULL')
            return cur.fetchone()[0]
    except Exception:
        return -1
    finally:
        if conn is not None:
            try:
                conn.close()
            except Exception:
                pass  # best-effort close; the result is already decided
def enqueue_output(pipe, q):
    """Pump lines from *pipe* into queue *q* until EOF, then enqueue ``None``.

    Intended to run on a background thread so the consumer can poll the queue
    with a timeout instead of blocking on ``readline``. A read error ends the
    pump quietly. The ``None`` sentinel is enqueued however the loop exits.
    """
    try:
        while True:
            text = pipe.readline()
            if text == '':
                break  # EOF: the writer closed its end of the pipe
            q.put(text)
    except Exception:
        pass  # e.g. the pipe was closed underneath us; treated like EOF
    finally:
        q.put(None)  # sentinel: pipe closed
def kill_processes(pre_wait=3, post_wait=2):
    """Force-kill leftover Chrome processes after a sweep run.

    Waits *pre_wait* seconds so Chrome can close on its own, then runs the
    Windows command ``taskkill /F /IM chrome.exe``. Finally it waits
    *post_wait* seconds so the processes are gone before the next run starts.

    NOTE: ``/IM chrome.exe`` matches every chrome.exe on the machine, not only
    the ones started by the sweep.

    The kill is best-effort. If ``taskkill`` is missing (non-Windows), fails,
    or hangs past 30s, the error is ignored. The command is run without a
    shell, so no stray ``NUL`` file is created on POSIX.
    """
    time.sleep(pre_wait)  # Give Chrome time to close itself first
    try:
        subprocess.run(
            ['taskkill', '/F', '/IM', 'chrome.exe'],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            timeout=30,
            check=False,
        )
    except (OSError, subprocess.SubprocessError):
        pass  # command unavailable or timed out; cleanup is best-effort
    time.sleep(post_wait)  # Wait for chrome to fully die before next run
def _stop_process(proc, log, grace=3, wait_timeout=10):
    """Stop child *proc* and reap it: terminate, escalate to kill, then wait.

    Sends terminate(). If the child is still alive after *grace* seconds it is
    killed and waited on for up to *wait_timeout* seconds. Failures are
    reported through *log* instead of raised, so cleanup never aborts the
    supervisor loop.
    """
    try:
        proc.terminate()
        try:
            proc.wait(timeout=grace)
        except subprocess.TimeoutExpired:
            proc.kill()
            proc.wait(timeout=wait_timeout)
    except Exception as e:
        log(f" Kill error: {e}")


def _pump_output(proc, q, t_start, write_line):
    """Copy child output from *q* to the log until the child exits or times out.

    *q* is fed by enqueue_output(); a ``None`` item is its EOF sentinel.
    Returns '' when the child exited on its own. Otherwise it returns a
    description of the limit the child broke, so the caller can kill it:
    RUN_TIMEOUT measured from *t_start*, or INACTIVITY_MAX since the last line.
    """
    last_output_time = time.time()
    while True:
        try:
            line = q.get(timeout=1.0)
        except queue.Empty:
            line = None
        if line is None:
            # EOF sentinel or a quiet second. Stop only once the child has
            # actually exited; otherwise fall through to the timeout checks.
            if proc.poll() is not None:
                return ''
        else:
            last_output_time = time.time()
            write_line(line.rstrip())
        elapsed = time.time() - t_start
        if elapsed > RUN_TIMEOUT:
            return f'RUN_TIMEOUT ({elapsed:.0f}s > {RUN_TIMEOUT}s)'
        inactive = time.time() - last_output_time
        if inactive > INACTIVITY_MAX:
            return f'INACTIVITY ({inactive:.0f}s > {INACTIVITY_MAX}s)'


def main():
    """Drive mt_detail_sweep.py in batches until no vessels remain.

    Each run launches SCRIPT with ``--limit BATCH_SIZE --delay DELAY`` and
    copies its combined stdout/stderr into LOG alongside the supervisor's own
    messages. A run is stopped when it exceeds RUN_TIMEOUT or stays silent for
    INACTIVITY_MAX seconds.

    After every run, leftover Chrome processes are cleaned up and progress is
    re-counted with get_remaining(). A failed count (< 0) is retried every 15s
    without using up one of the MAX_RUNS runs. Ctrl+C stops the current child
    (if any) and ends the supervisor. The log file is closed on every exit path.
    """
    # Line-buffered so the log can be followed live while runs are in progress.
    log_f = open(LOG, 'w', encoding='utf-8', errors='replace', buffering=1)

    def write_line(text):
        """Append *text* to the log; on failure reopen the file and retry once."""
        nonlocal log_f
        try:
            log_f.write(text + '\n')
            log_f.flush()
            return
        except Exception:
            pass
        # Re-open log file if it was closed/corrupted; drop the old handle first.
        try:
            log_f.close()
        except Exception:
            pass
        try:
            log_f = open(LOG, 'a', encoding='utf-8', errors='replace', buffering=1)
            log_f.write(text + '\n')
            log_f.flush()
        except Exception:
            pass  # logging is best-effort; never let it stop the supervisor

    def log(msg):
        ts = time.strftime('%H:%M:%S')
        write_line(f"[SUPERVISOR {ts}] {msg}")

    total_committed = 0
    run = 0
    try:
        log(f"Starting supervisor. Script: {SCRIPT}")
        log(f"Batch: {BATCH_SIZE} | Delay: {DELAY}s | Run timeout: {RUN_TIMEOUT}s | Inactivity: {INACTIVITY_MAX}s")
        try:
            while run < MAX_RUNS:
                remaining = get_remaining()
                if remaining == 0:
                    log("DONE — no more vessels to process!")
                    break
                if remaining < 0:
                    # DB unreachable: retry without consuming a run.
                    log("DB connection failed — check SSH tunnel. Retrying in 15s...")
                    time.sleep(15)
                    continue
                run += 1
                log(f"=== Run {run} | Remaining: {remaining} ===")
                cmd = [
                    sys.executable, '-u', SCRIPT,
                    '--limit', str(BATCH_SIZE),
                    '--delay', str(DELAY),
                ]
                t_start = time.time()
                proc = subprocess.Popen(
                    cmd,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.STDOUT,
                    cwd=os.path.dirname(SCRIPT),
                    text=True,
                    encoding='utf-8',
                    errors='replace',
                    bufsize=1,
                )
                # A daemon reader thread turns the blocking pipe into a queue
                # that can be polled with a timeout.
                q = queue.Queue()
                threading.Thread(target=enqueue_output, args=(proc.stdout, q), daemon=True).start()
                try:
                    kill_reason = _pump_output(proc, q, t_start, write_line)
                except KeyboardInterrupt:
                    log("Interrupted by user")
                    _stop_process(proc, log)
                    break
                except Exception as e:
                    log(f" Main loop error: {e} — continuing")
                    # Reap the child so it cannot overlap with the next run.
                    _stop_process(proc, log)
                    kill_reason = ''
                timed_out = bool(kill_reason)
                if timed_out:
                    log(f" Killing subprocess: {kill_reason}")
                    _stop_process(proc, log)
                try:
                    kill_processes()
                except Exception as e:
                    log(f" kill_processes error: {e}")
                exit_code = proc.returncode
                elapsed = time.time() - t_start
                log(f" Run {run} done: exit={exit_code} elapsed={elapsed:.0f}s timed_out={timed_out}"
                    + (f" [{kill_reason}]" if timed_out else ""))
                # Count committed vessels (get_remaining reports errors as -1).
                new_remaining = get_remaining()
                if 0 <= new_remaining < remaining:
                    committed = remaining - new_remaining
                    total_committed += committed
                    log(f" Committed: {committed} | Total: {total_committed} | Still need: {new_remaining}")
                else:
                    log(f" No new commits (remaining={new_remaining})")
                if new_remaining != 0:
                    log(" Waiting 8s before next run...")
                    time.sleep(8)
        except KeyboardInterrupt:
            # Ctrl+C between runs (e.g. during a sleep): stop cleanly.
            log("Interrupted by user")
        log(f"Supervisor done. {total_committed} vessels committed across {run} runs.")
    finally:
        log_f.close()
if __name__ == "__main__":
    # Start supervising only when executed as a script, not when imported.
    main()