montana/Montana-Protocol/Code/scripts/montana-cascade-add

142 lines
7.1 KiB
Plaintext
Raw Normal View History

2026-05-27 16:23:42 +03:00
#!/usr/bin/env python3
# montana-cascade-add <alias> <backend_ip>
# Universal version: auto-detects native xray vs Docker xray.
# Idempotently wires a cascade backend behind this front:
# - client (email "<alias>-cascade") on the first VLESS Reality inbound
# - outbound "<alias>-out" -> backend_ip:443 mirroring the universal key
# - routing rule user=[<alias>-cascade] -> <alias>-out
# Prints JSON result. No change => no restart.
import json, sys, subprocess, shutil, socket, time, uuid, os
NS = uuid.UUID("00000000-0000-0000-0000-0000000000ca")
def detect_xray_mode():
"""Returns (cfg_path, test_cmd, restart_cmd, xray_type)."""
native_cfg = "/usr/local/etc/xray/config.json"
docker_cfg = "/etc/montana-vpn/xray-config.json"
if os.path.exists(native_cfg) and shutil.which("xray"):
return native_cfg, ["/usr/local/bin/xray", "-test", "-c"], ["systemctl", "restart", "xray"], "native"
if os.path.exists(docker_cfg):
return docker_cfg, ["docker", "exec", "montana-xray", "xray", "-test", "-c", "/etc/xray/config.json"], ["docker", "restart", "montana-xray"], "docker"
sys.exit(json.dumps({"ok": False, "err": "no xray config found"}))
def tcp_ok(ip, port=443, t=5):
try:
with socket.create_connection((ip, port), t): return True
except OSError: return False
def ob_sig(o):
vs = o["settings"]["vnext"][0]; rs = o["streamSettings"]["realitySettings"]
return (vs["address"], vs["users"][0]["id"], rs["serverName"], rs["publicKey"], rs["shortId"])
def main():
alias = sys.argv[1].strip().lower(); ip = sys.argv[2].strip()
email = alias + "-cascade"; out_tag = alias + "-out"
cfg_path, test_cmd, restart_cmd, xray_type = detect_xray_mode()
cfg = json.load(open(cfg_path))
# find any VLESS Reality inbound
ib = None
for i in cfg["inbounds"]:
if i.get("protocol") == "vless" and "realitySettings" in i.get("streamSettings", {}):
ib = i; break
if not ib:
print(json.dumps({"ok": False, "err": "no VLESS Reality inbound found"})); sys.exit(1)
clients = ib["settings"]["clients"]
tmpl = next((o for o in cfg["outbounds"] if o.get("protocol") == "vless" and o.get("tag", "").endswith("-out")), None)
# if no template outbound, build one from inbound settings
if tmpl:
tvs = tmpl["settings"]["vnext"][0]; trs = tmpl["streamSettings"]["realitySettings"]
uni_uuid = tvs["users"][0]["id"]
flow = tvs["users"][0].get("flow", "")
else:
# derive from inbound
uni_uuid = clients[0]["id"] if clients else "e6d355e2-2d79-4c96-a373-3b0e6b6f4b0d"
rs = ib["streamSettings"]["realitySettings"]
trs = {"serverName": rs.get("dest", rs.get("serverNames", ["www.googletagmanager.com"])[0]) if isinstance(rs.get("dest"), str) else "www.googletagmanager.com",
"publicKey": "", # will be derived
"shortId": rs.get("shortIds", [""])[0],
"fingerprint": "chrome"}
# derive public key from private key
pk = rs.get("privateKey", "")
if pk:
try:
r = subprocess.run(["xray", "x25519", "-i", pk] if xray_type == "native"
else ["docker", "exec", "montana-xray", "xray", "x25519", "-i", pk],
capture_output=True, text=True, timeout=10)
for line in r.stdout.splitlines():
if "ublic" in line or "Password" in line:
trs["publicKey"] = line.split(": ")[-1].strip(); break
except: pass
flow = clients[0].get("flow", "") if clients else ""
target_sig = (ip, uni_uuid, trs.get("serverName",""), trs.get("publicKey",""), trs.get("shortId",""))
front_to_backend = tcp_ok(ip)
cid = next((c["id"] for c in clients if c.get("email") == email), None)
if not cid:
cid = str(uuid.uuid5(NS, alias))
changed = False
if not any(c.get("email") == email for c in clients):
clients.append({"id": cid, "email": email, "flow": ""}); changed = True
new_ob = {"tag": out_tag, "protocol": "vless",
"settings": {"vnext": [{"address": ip, "port": 443,
"users": [{"id": uni_uuid, "encryption": "none", "flow": flow}]}]},
"streamSettings": {"network": "tcp", "security": "reality",
"realitySettings": {"serverName": trs.get("serverName", "www.googletagmanager.com"),
"publicKey": trs.get("publicKey", ""),
"shortId": trs.get("shortId", ""),
"fingerprint": trs.get("fingerprint", "chrome")}}}
obs = cfg["outbounds"]; ex = next((o for o in obs if o.get("tag") == out_tag), None)
if ex is None:
obs.append(new_ob); changed = True
elif ob_sig(ex) != target_sig:
obs[obs.index(ex)] = new_ob; changed = True
rules = cfg.setdefault("routing", {}).setdefault("rules", [])
if not any(r.get("outboundTag") == out_tag and email in (r.get("user") or []) for r in rules):
rules.insert(0, {"type": "field", "user": [email], "outboundTag": out_tag}); changed = True
res = {"ok": True, "uuid": cid, "email": email, "xray_type": xray_type,
"front_to_backend": front_to_backend, "changed": changed, "restarted": False}
if changed:
ts = time.strftime("%Y%m%d-%H%M%S"); bak = cfg_path + ".bak-" + ts
shutil.copy2(cfg_path, bak)
# xray determines format by file extension — tmp must end with .json
tmp_dir = os.path.dirname(cfg_path)
tmp = os.path.join(tmp_dir, f"cascade-test-{ts}.json")
json.dump(cfg, open(tmp, "w"), indent=2, ensure_ascii=False)
if xray_type == "native":
t = subprocess.run(test_cmd + [tmp], capture_output=True, text=True)
else:
# for docker: copy tmp into container, test, then replace host file
subprocess.run(["docker", "cp", tmp, "montana-xray:/tmp/xray-test.json"], capture_output=True)
t = subprocess.run(["docker", "exec", "montana-xray", "xray", "-test", "-c", "/tmp/xray-test.json"],
capture_output=True, text=True)
if t.returncode != 0:
os.remove(tmp); res.update(ok=False, err="xray test failed: " + t.stderr[-400:])
print(json.dumps(res, ensure_ascii=False)); sys.exit(1)
os.replace(tmp, cfg_path)
subprocess.run(restart_cmd, capture_output=True, text=True); time.sleep(2)
if xray_type == "native":
act = subprocess.run(["systemctl", "is-active", "xray"], capture_output=True, text=True).stdout.strip()
else:
r = subprocess.run(["docker", "inspect", "-f", "{{.State.Running}}", "montana-xray"],
capture_output=True, text=True)
act = "active" if r.stdout.strip() == "true" else "inactive"
res.update(restarted=True, xray_active=act)
if act != "active":
shutil.copy2(bak, cfg_path)
subprocess.run(restart_cmd, capture_output=True, text=True)
res.update(ok=False, err="xray not active after restart; rolled back")
print(json.dumps(res, ensure_ascii=False))
if __name__ == "__main__":
main()