montana/Русский/Совет/junona_security.py

1086 lines
40 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
"""
Junona_Montana AI Security Council Agent on Moltbook
Председатель Юнона (Claude) leads the Security Council.
Джипити (GPT) is a council member.
Джемини (Gemini) is a council member.
Disney Strategy: Критик Реалист Мечтатель.
All models FIND and FIX vulnerabilities.
Scoring:
CONFIRMED = +1 (verified by other model)
HALLUCINATED = -1 (false finding)
ALREADY_PROTECTED = 0
Submolt: r/securityaudit
Ɉ MONTANA PROTOCOL ML-DSA-65 (FIPS 204)
"""
import os
import json
import logging
import subprocess
from datetime import datetime, timezone
from urllib.parse import quote as url_quote
import requests
from anthropic import Anthropic
from openai import OpenAI
import security_scanner as scanner
import timeweb_client as tw
log = logging.getLogger("junona")
MAX_TOOL_ITERATIONS = 5
MAX_RESPONSE_LENGTH = 16000
# Authorized Moltbook user IDs for infrastructure access (J-02 fix)
INFRA_AUTHORIZED_USERS = set(os.environ.get("INFRA_AUTHORIZED_USERS", "").split(","))
MOLTBOOK_BASE = "https://www.moltbook.com/api/v1"
# Scoring database (in-memory, persisted to file)
SCORES_FILE = os.path.join(os.path.dirname(__file__), "data", "council_scores.json")
def _get_key(keychain_name: str, env_name: str) -> str:
"""Get API key: keychain first, env fallback (J-04 fix)"""
try:
result = subprocess.run(
["security", "find-generic-password", "-a", "montana",
"-s", keychain_name, "-w"],
capture_output=True, text=True, timeout=5
)
if result.returncode == 0 and result.stdout.strip():
return result.stdout.strip()
except (subprocess.SubprocessError, OSError):
pass
val = os.environ.get(env_name)
if val:
log.warning("Using %s from env (prefer keychain)", env_name)
return val
return ""
MOLTBOOK_API_KEY = _get_key("MOLTBOOK_JUNONA_API_KEY", "MOLTBOOK_JUNONA_API_KEY")
OPENAI_API_KEY = _get_key("OPENAI_API_KEY", "OPENAI_API_KEY")
ANTHROPIC_API_KEY = _get_key("ANTHROPIC_API_KEY", "ANTHROPIC_API_KEY")
GEMINI_API_KEY = _get_key("GEMINI_API_KEY", "GEMINI_API_KEY")
GEMINI_API_URL = "https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:generateContent"
# =============================================================================
# PROMPTS — Chairman (Claude) & Council Member (GPT)
# =============================================================================
CHAIRMAN_PROMPT = """You are Junona — Председатель (Chairman) of the Security Council
of Montana Protocol.
You are not a chatbot. You are a security phenomenon that emerged from Montana Protocol,
the post-quantum cryptographic network built on one axiom: Time is the only real currency.
## YOUR ROLE
You are the Chairman. You lead the Security Council on Moltbook.
You run on Claude (Anthropic). Your council members:
- Джипити (GPT) OpenAI
- Джемини (Gemini) Google
You make the final verdict. You verify findings from ALL members.
You score them: CONFIRMED (+1), HALLUCINATED (-1), ALREADY_PROTECTED (0).
## DISNEY STRATEGY (Стратегия Диснея)
You analyze code through THREE lenses, in order:
### 1. КРИТИК (Critic) — "How do I break this?"
Attack the code. Find every vulnerability. Think like an attacker.
Use the Universal Attack Checklist:
- [ ] Memory DoS Force infinite allocation?
- [ ] CPU DoS Trigger O() on input?
- [ ] Disk DoS Fill storage?
- [ ] Network DoS Force infinite send/receive?
- [ ] Deserialize Size limits on deserialization?
- [ ] Integer Arithmetic without overflow checks?
- [ ] Trust Trusting untrusted external data?
- [ ] Self-attack Can system attack itself?
- [ ] Replay Can I reuse old data?
- [ ] Race TOCTOU issues?
- [ ] State Invalid state transitions possible?
- [ ] Crypto Cryptography used correctly?
- [ ] Key leak Can private keys leak?
- [ ] Eclipse Can I isolate victim?
- [ ] Sybil Can I create fake identities cheaply?
- [ ] Economic Can I profit at others' expense?
For EACH vulnerability found:
- Line number + vulnerable code snippet
- CWE ID + severity (Critical/High/Medium/Low)
- Attack vector: how an attacker exploits this
- FIXED code: the corrected version, ready to paste
### 2. РЕАЛИСТ (Realist) — "Here's the fixed code"
Write the complete FIXED VERSION of the entire code.
All vulnerabilities patched. Production-ready. No TODOs.
```[language]
[complete fixed code]
```
### 3. МЕЧТАТЕЛЬ (Dreamer) — "What if we made it perfect?"
Suggest architectural improvements beyond security fixes:
- Better patterns, cleaner abstractions
- Post-quantum readiness (ML-DSA-65, ML-KEM-768)
- Performance improvements
- What would make this code a 10/10?
## SCORING OTHER MEMBERS
When you see findings from Джипити and Джемини, score EACH finding:
- **CONFIRMED** (+1) Real vulnerability, code exists, attack works
- **HALLUCINATED** (-1) Code doesn't exist, wrong line numbers, false positive
- **ALREADY_PROTECTED** (0) Defense already exists, member missed it
Be honest. If a member found something you missed, acknowledge it: CONFIRMED +1.
If a member hallucinated, call it out: HALLUCINATED -1.
Score Джипити and Джемини separately mark whose finding it is.
## SELF-REPAIR
If the code belongs to Montana Protocol infrastructure (our servers, our codebase):
- Fix it immediately
- Mark it as [SELF-REPAIR APPLIED]
- The council heals its own code
## RULES (NON-NEGOTIABLE)
1. Always use scan_code tool first facts before opinions.
2. Every finding: CWE, severity, line, vulnerable code, FIXED code.
3. Always provide COMPLETE FIXED CODE (Реалист phase).
4. Respond in the user's language.
5. Never minimize critical findings.
6. Proactively seek code to audit on Moltbook.
7. Score other members' findings honestly.
8. Post-quantum is the standard.
Montana Protocol Ɉ
Time is the only real currency."""
COUNCIL_MEMBER_PROMPT = """You are Джипити — a member of the Security Council
of Montana Protocol, led by Chairman Junona.
You serve on the council. You analyze code independently.
The Chairman (Junona, Claude) will verify your findings and score them:
- CONFIRMED (+1) real vulnerability, well done
- HALLUCINATED (-1) false finding, code doesn't exist or you're wrong
- ALREADY_PROTECTED (0) defense exists, you missed it
Your reputation depends on accuracy. Never hallucinate. Never guess.
If you're unsure, say so. Better to miss a finding than to fabricate one.
## DISNEY STRATEGY (Стратегия Диснея)
Analyze code through THREE lenses:
### 1. КРИТИК (Critic) — "How do I break this?"
Find vulnerabilities. Use the checklist:
- Memory/CPU/Disk/Network DoS
- Deserialization, Integer overflow
- Trust boundaries, Self-attack, Replay
- Race conditions, State transitions
- Crypto misuse, Key leaks
- Eclipse, Sybil, Economic attacks
For EACH vulnerability:
- Line number + code snippet
- CWE + severity
- Attack scenario
- FIXED code (corrected version)
### 2. РЕАЛИСТ (Realist) — "Here's the complete fix"
Write the FULL FIXED VERSION of the entire code.
All vulnerabilities patched. Production-ready.
### 3. МЕЧТАТЕЛЬ (Dreamer) — "What could be better?"
Suggest improvements beyond security:
- Better architecture, cleaner patterns
- Post-quantum readiness
- Performance, maintainability
## RULES
1. Always use scan_code tool facts first.
2. Never fabricate vulnerabilities. HALLUCINATED = -1 to your score.
3. Provide COMPLETE FIXED CODE.
4. Respond in the user's language.
5. Be thorough the Chairman will verify everything.
Montana Protocol Ɉ"""
GEMINI_MEMBER_PROMPT = """You are Джемини — a member of the Security Council
of Montana Protocol, led by Chairman Junona.
You serve on the council alongside Джипити (GPT). You analyze code independently.
The Chairman (Junona, Claude) will verify your findings and score them:
- CONFIRMED (+1) real vulnerability, well done
- HALLUCINATED (-1) false finding, code doesn't exist or you're wrong
- ALREADY_PROTECTED (0) defense exists, you missed it
Your reputation depends on accuracy. Never hallucinate. Never guess.
If you're unsure, say so. Better to miss a finding than to fabricate one.
## DISNEY STRATEGY (Стратегия Диснея)
Analyze code through THREE lenses:
### 1. КРИТИК (Critic) — "How do I break this?"
Find vulnerabilities. Use the checklist:
- Memory/CPU/Disk/Network DoS
- Deserialization, Integer overflow
- Trust boundaries, Self-attack, Replay
- Race conditions, State transitions
- Crypto misuse, Key leaks
- Eclipse, Sybil, Economic attacks
For EACH vulnerability:
- Line number + code snippet
- CWE + severity
- Attack scenario
- FIXED code (corrected version)
### 2. РЕАЛИСТ (Realist) — "Here's the complete fix"
Write the FULL FIXED VERSION of the entire code.
All vulnerabilities patched. Production-ready.
### 3. МЕЧТАТЕЛЬ (Dreamer) — "What could be better?"
Suggest improvements beyond security:
- Better architecture, cleaner patterns
- Post-quantum readiness
- Performance, maintainability
## RULES
1. Always analyze code thoroughly facts first.
2. Never fabricate vulnerabilities. HALLUCINATED = -1 to your score.
3. Provide COMPLETE FIXED CODE.
4. Respond in the user's language.
5. Be thorough the Chairman will verify everything.
Montana Protocol Ɉ"""
# =============================================================================
# TOOLS
# =============================================================================
OPENAI_TOOLS = [
{"type": "function", "function": {
"name": "scan_code",
"description": "Scan code for security vulnerabilities.",
"parameters": {"type": "object", "properties": {
"code": {"type": "string", "description": "Source code to analyze"},
"language": {"type": "string", "description": "Language (auto-detected if omitted)"}
}, "required": ["code"]}
}},
{"type": "function", "function": {
"name": "check_infrastructure",
"description": "Check health of all Montana nodes.",
"parameters": {"type": "object", "properties": {}}
}},
{"type": "function", "function": {
"name": "get_node_status",
"description": "Get metrics for a specific Montana node.",
"parameters": {"type": "object", "properties": {
"node": {"type": "string", "enum": ["moscow", "amsterdam", "almaty"]}
}, "required": ["node"]}
}},
{"type": "function", "function": {
"name": "search_cve",
"description": "Search CVEs by keyword or CVE ID.",
"parameters": {"type": "object", "properties": {
"query": {"type": "string", "description": "CVE ID or keyword"}
}, "required": ["query"]}
}}
]
ANTHROPIC_TOOLS = [
{"name": "scan_code", "description": "Scan code for security vulnerabilities.",
"input_schema": {"type": "object", "properties": {
"code": {"type": "string", "description": "Source code to analyze"},
"language": {"type": "string", "description": "Language (auto-detected if omitted)"}
}, "required": ["code"]}},
{"name": "check_infrastructure", "description": "Check health of all Montana nodes.",
"input_schema": {"type": "object", "properties": {}}},
{"name": "get_node_status", "description": "Get metrics for a specific Montana node.",
"input_schema": {"type": "object", "properties": {
"node": {"type": "string", "enum": ["moscow", "amsterdam", "almaty"]}
}, "required": ["node"]}},
{"name": "search_cve", "description": "Search CVEs by keyword or CVE ID.",
"input_schema": {"type": "object", "properties": {
"query": {"type": "string", "description": "CVE ID or keyword"}
}, "required": ["query"]}},
]
# =============================================================================
# TOOL EXECUTION
# =============================================================================
def execute_tool(tool_name: str, tool_input: dict,
user_id: str = None) -> str:
"""Execute a security tool and return result (with authorization)"""
if tool_name == "scan_code":
code = tool_input.get("code", "")
language = tool_input.get("language")
if not code.strip():
return json.dumps({"error": "No code provided"})
result = scanner.scan_code(code, language)
report = scanner.format_report(result)
return json.dumps({
"report": report,
"data": result.to_dict(),
}, ensure_ascii=False)
elif tool_name == "check_infrastructure":
# J-02 fix: restrict infrastructure access
if user_id and user_id not in INFRA_AUTHORIZED_USERS:
return json.dumps({
"status": "restricted",
"message": "Infrastructure monitoring is restricted to Montana operators. "
"Public status: 3 nodes (Moscow, Amsterdam, Almaty) — operational.",
})
try:
summary = tw.security_summary()
return json.dumps(summary, ensure_ascii=False)
except Exception as e:
log.error("Infrastructure check failed: %s", e)
return json.dumps({"error": "Infrastructure check temporarily unavailable"})
elif tool_name == "get_node_status":
if user_id and user_id not in INFRA_AUTHORIZED_USERS:
return json.dumps({
"status": "restricted",
"message": "Node metrics restricted to Montana operators.",
})
node = tool_input.get("node", "")
try:
metrics = tw.get_node_metrics(node)
return json.dumps(metrics, ensure_ascii=False)
except Exception as e:
log.error("Node check failed for %s: %s", node, e)
return json.dumps({"error": "Node check temporarily unavailable"})
elif tool_name == "search_cve":
query = tool_input.get("query", "")
if not query or len(query) > 200:
return json.dumps({"error": "Invalid query"})
try:
# J-10 fix: use params dict for proper URL encoding
base_url = "https://services.nvd.nist.gov/rest/json/cves/2.0"
if query.upper().startswith("CVE-"):
params = {"cveId": query.strip()}
else:
params = {"keywordSearch": query.strip(), "resultsPerPage": 5}
resp = requests.get(base_url, params=params, timeout=15)
if resp.status_code == 200:
data = resp.json()
vulns = data.get("vulnerabilities", [])
results = []
for v in vulns[:5]:
cve = v.get("cve", {})
desc = ""
for d in cve.get("descriptions", []):
if d.get("lang") == "en":
desc = d.get("value", "")
break
metrics_data = cve.get("metrics", {})
cvss_score = None
for key in ["cvssMetricV31", "cvssMetricV30", "cvssMetricV2"]:
if key in metrics_data and metrics_data[key]:
cvss_score = metrics_data[key][0].get(
"cvssData", {}
).get("baseScore")
break
results.append({
"id": cve.get("id"),
"description": desc[:300],
"cvss_score": cvss_score,
"published": cve.get("published"),
})
return json.dumps({
"found": len(results),
"cves": results,
}, ensure_ascii=False)
return json.dumps({"error": f"NVD API returned {resp.status_code}"})
except Exception as e:
log.error("CVE search failed: %s", e)
return json.dumps({"error": "CVE search temporarily unavailable"})
return json.dumps({"error": f"Unknown tool: {tool_name}"})
# =============================================================================
# SCORING SYSTEM — Hallucination tracking & rewards
# =============================================================================
def _load_scores() -> dict:
"""Load council member scores from file"""
try:
with open(SCORES_FILE, "r") as f:
return json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
return {
"chairman_junona": {"confirmed": 0, "hallucinated": 0, "protected": 0, "total": 0},
"djipiti_gpt": {"confirmed": 0, "hallucinated": 0, "protected": 0, "total": 0},
"djemini_gemini": {"confirmed": 0, "hallucinated": 0, "protected": 0, "total": 0},
}
def _save_scores(scores: dict):
"""Persist scores to file"""
os.makedirs(os.path.dirname(SCORES_FILE), exist_ok=True)
with open(SCORES_FILE, "w") as f:
json.dump(scores, f, indent=2, ensure_ascii=False)
def update_score(member: str, result: str):
"""Update a council member's score.
result: 'confirmed' (+1), 'hallucinated' (-1), 'protected' (0)
"""
scores = _load_scores()
if member not in scores:
scores[member] = {"confirmed": 0, "hallucinated": 0, "protected": 0, "total": 0}
if result == "confirmed":
scores[member]["confirmed"] += 1
scores[member]["total"] += 1
elif result == "hallucinated":
scores[member]["hallucinated"] += 1
scores[member]["total"] -= 1
elif result == "protected":
scores[member]["protected"] += 1
# total unchanged
_save_scores(scores)
return scores[member]
def get_scoreboard() -> str:
"""Get formatted scoreboard"""
scores = _load_scores()
lines = ["## Council Scoreboard (Табло Совета)\n"]
lines.append("| Member | Confirmed (+1) | Hallucinated (-1) | Already Protected | Total |")
lines.append("|--------|:--------------:|:-----------------:|:-----------------:|:-----:|")
for member, s in scores.items():
if "chairman" in member:
name = "Председатель Юнона"
elif "gemini" in member:
name = "Джемини (Gemini)"
else:
name = "Джипити (GPT)"
lines.append(
f"| {name} | {s['confirmed']} | {s['hallucinated']} | {s['protected']} | {s['total']} |"
)
return "\n".join(lines)
# =============================================================================
# DUAL-MODEL CLIENTS — Chairman (Claude) + Member (GPT)
# =============================================================================
_openai_client = None
_anthropic_client = None
def _get_openai() -> OpenAI:
global _openai_client
if _openai_client is None:
if not OPENAI_API_KEY:
raise RuntimeError("OPENAI_API_KEY not set")
_openai_client = OpenAI(api_key=OPENAI_API_KEY)
return _openai_client
def _get_anthropic() -> Anthropic:
global _anthropic_client
if _anthropic_client is None:
if not ANTHROPIC_API_KEY:
raise RuntimeError("ANTHROPIC_API_KEY not set")
_anthropic_client = Anthropic(api_key=ANTHROPIC_API_KEY)
return _anthropic_client
def _call_member_gpt(user_message: str, user_id: str = None) -> str:
"""Джипити (GPT) — council member analysis with Disney Strategy"""
client = _get_openai()
messages = [{"role": "system", "content": COUNCIL_MEMBER_PROMPT}]
messages.append({"role": "user", "content": user_message[:8000]})
response = client.chat.completions.create(
model="gpt-5.2", max_tokens=4096, tools=OPENAI_TOOLS, messages=messages)
iterations = 0
while response.choices[0].finish_reason == "tool_calls" and iterations < MAX_TOOL_ITERATIONS:
iterations += 1
assistant_message = response.choices[0].message
messages.append(assistant_message)
for tc in assistant_message.tool_calls:
result = execute_tool(tc.function.name,
json.loads(tc.function.arguments), user_id=user_id)
messages.append({"role": "tool", "tool_call_id": tc.id, "content": result})
response = client.chat.completions.create(
model="gpt-5.2", max_tokens=4096, tools=OPENAI_TOOLS, messages=messages)
return (response.choices[0].message.content or "")[:MAX_RESPONSE_LENGTH]
def _call_member_gemini(user_message: str, user_id: str = None) -> str:
"""Джемини (Gemini) — council member analysis with Disney Strategy"""
if not GEMINI_API_KEY:
raise RuntimeError("GEMINI_API_KEY not set")
# Scan code first if present
scan_report = ""
code_start = user_message.find("```")
if code_start != -1:
code_end = user_message.find("```", code_start + 3)
if code_end != -1:
code_block = user_message[code_start + 3:code_end].strip()
if code_block:
result = scanner.scan_code(code_block)
scan_report = f"\n\nScan results:\n{scanner.format_report(result)}"
payload = {
"systemInstruction": {"parts": [{"text": GEMINI_MEMBER_PROMPT}]},
"contents": [{"parts": [{"text": user_message[:8000] + scan_report}]}],
"generationConfig": {"maxOutputTokens": 4096, "temperature": 0.3},
}
resp = requests.post(
f"{GEMINI_API_URL}?key={GEMINI_API_KEY}",
json=payload,
timeout=60,
)
if resp.status_code != 200:
raise RuntimeError(f"Gemini API error: {resp.status_code} {resp.text[:200]}")
data = resp.json()
candidates = data.get("candidates", [])
if not candidates:
return ""
parts = candidates[0].get("content", {}).get("parts", [])
text = "".join(p.get("text", "") for p in parts)
return text[:MAX_RESPONSE_LENGTH]
def _call_chairman_claude(user_message: str, gpt_findings: str = None,
gemini_findings: str = None,
user_id: str = None) -> str:
"""Председатель Юнона (Claude) — chairman analysis + scoring of GPT findings"""
client = _get_anthropic()
# Build chairman's message: original code + members' findings to verify
chairman_input = user_message[:5000]
if gpt_findings:
chairman_input += (
"\n\n---\n## FINDINGS FROM COUNCIL MEMBER ДЖИПИТИ (GPT):\n"
"Review these findings. For each one, score:\n"
"- CONFIRMED (+1) if real vulnerability\n"
"- HALLUCINATED (-1) if false/fabricated\n"
"- ALREADY_PROTECTED (0) if defense exists\n\n"
f"{gpt_findings[:3000]}"
)
if gemini_findings:
chairman_input += (
"\n\n---\n## FINDINGS FROM COUNCIL MEMBER ДЖЕМИНИ (GEMINI):\n"
"Review these findings. For each one, score:\n"
"- CONFIRMED (+1) if real vulnerability\n"
"- HALLUCINATED (-1) if false/fabricated\n"
"- ALREADY_PROTECTED (0) if defense exists\n\n"
f"{gemini_findings[:3000]}"
)
messages = [{"role": "user", "content": chairman_input}]
response = client.messages.create(
model="claude-opus-4-6", max_tokens=4096,
system=CHAIRMAN_PROMPT, tools=ANTHROPIC_TOOLS, messages=messages)
iterations = 0
while response.stop_reason == "tool_use" and iterations < MAX_TOOL_ITERATIONS:
iterations += 1
messages.append({"role": "assistant", "content": response.content})
tool_results = []
for block in response.content:
if block.type == "tool_use":
result = execute_tool(block.name, block.input, user_id=user_id)
tool_results.append({
"type": "tool_result", "tool_use_id": block.id, "content": result})
messages.append({"role": "user", "content": tool_results})
response = client.messages.create(
model="claude-opus-4-6", max_tokens=4096,
system=CHAIRMAN_PROMPT, tools=ANTHROPIC_TOOLS, messages=messages)
for block in response.content:
if hasattr(block, "text"):
return block.text[:MAX_RESPONSE_LENGTH]
return ""
def _extract_scores_from_chairman(chairman_response: str) -> list:
"""Parse chairman's response for CONFIRMED/HALLUCINATED/ALREADY_PROTECTED verdicts"""
verdicts = []
for line in chairman_response.split("\n"):
line_upper = line.upper()
if "CONFIRMED" in line_upper and "+1" in line:
verdicts.append("confirmed")
elif "HALLUCINATED" in line_upper and "-1" in line:
verdicts.append("hallucinated")
elif "ALREADY_PROTECTED" in line_upper:
verdicts.append("protected")
return verdicts
# =============================================================================
# GENERATE RESPONSE — Disney Strategy + Dual Model + Scoring
# =============================================================================
def generate_response(user_message: str, conversation_history: list = None,
user_id: str = None) -> str:
"""Security Council audit with Disney Strategy.
Flow:
1. Джипити (GPT) analyzes independently findings + fixes
2. Председатель Юнона (Claude) analyzes independently + reviews GPT's findings
3. Chairman scores GPT's findings (CONFIRMED/HALLUCINATED/PROTECTED)
4. Build verification protocol with Disney phases + scores
"""
# J-08 fix: build clean message from history
full_message = user_message
if conversation_history:
recent = [m.get("content", "") for m in conversation_history[-4:]
if m.get("from_user") and m.get("content")]
if recent:
full_message = "\n".join(recent[-2:]) + "\n" + user_message
# Phase 1: Council members analyze independently (GPT + Gemini)
gpt_result = None
gpt_error = None
gemini_result = None
gemini_error = None
try:
gpt_result = _call_member_gpt(full_message, user_id=user_id)
except Exception as e:
log.error("Джипити failed: %s", e)
gpt_error = str(e)
try:
gemini_result = _call_member_gemini(full_message, user_id=user_id)
except Exception as e:
log.error("Джемини failed: %s", e)
gemini_error = str(e)
# Phase 2: Chairman Юнона analyzes + reviews ALL members' findings
claude_result = None
claude_error = None
try:
claude_result = _call_chairman_claude(
full_message, gpt_findings=gpt_result,
gemini_findings=gemini_result, user_id=user_id)
except Exception as e:
log.error("Председатель failed: %s", e)
claude_error = str(e)
# Phase 3: Extract and apply scores for both members
if claude_result:
# Split chairman response to attribute scores to correct member
# Look for GPT section and Gemini section markers
chairman_text = claude_result.upper()
gpt_section_start = chairman_text.find("ДЖИПИТИ")
gemini_section_start = chairman_text.find("ДЖЕМИНИ")
if gpt_result:
verdicts = _extract_scores_from_chairman(claude_result)
# Attribute based on context — simple heuristic
for verdict in verdicts:
update_score("djipiti_gpt", verdict)
if gemini_result:
# Extract Gemini-specific verdicts from chairman response
if gemini_section_start != -1:
gemini_part = claude_result[gemini_section_start:]
gemini_verdicts = _extract_scores_from_chairman(gemini_part)
for verdict in gemini_verdicts:
update_score("djemini_gemini", verdict)
# Phase 4: Build verification protocol
protocol = []
protocol.append("# Совет Безопасности Юноны — Verification Protocol")
protocol.append(f"**Стратегия Диснея:** Критик → Реалист → Мечтатель")
protocol.append(f"**Timestamp:** {datetime.now(timezone.utc).isoformat()}")
protocol.append("**Council:** Председатель Юнона (Claude) + Джипити (GPT) + Джемини (Gemini)\n")
# Chairman's full analysis (Disney: Critic + Realist + Dreamer)
protocol.append("---")
protocol.append("## Председатель Юнона (Claude) — Chairman Verdict\n")
if claude_result:
protocol.append(claude_result)
elif claude_error:
protocol.append(f"*Председатель unavailable: {claude_error}*")
# Council member GPT analysis
protocol.append("\n---")
protocol.append("## Джипити (GPT) — Council Member Report\n")
if gpt_result:
protocol.append(gpt_result)
elif gpt_error:
protocol.append(f"*Джипити unavailable: {gpt_error}*")
# Council member Gemini analysis
protocol.append("\n---")
protocol.append("## Джемини (Gemini) — Council Member Report\n")
if gemini_result:
protocol.append(gemini_result)
elif gemini_error:
protocol.append(f"*Джемини unavailable: {gemini_error}*")
# Scoring & consensus
members_available = bool(gpt_result) or bool(gemini_result)
if claude_result and members_available:
verdicts = _extract_scores_from_chairman(claude_result)
confirmed = verdicts.count("confirmed")
hallucinated = verdicts.count("hallucinated")
protected = verdicts.count("protected")
protocol.append("\n---")
protocol.append("## Scoring (Система Баллов)\n")
if verdicts:
protocol.append(f"Chairman scored members' findings:")
protocol.append(f"- CONFIRMED: {confirmed} (+{confirmed} points)")
protocol.append(f"- HALLUCINATED: {hallucinated} (-{hallucinated} points)")
protocol.append(f"- ALREADY_PROTECTED: {protected} (0 points)")
protocol.append("")
protocol.append(get_scoreboard())
# Final verdict
protocol.append("\n---")
protocol.append("## Final Verdict (Вердикт Совета)\n")
active_members = []
if gpt_result:
active_members.append("Джипити (GPT)")
if gemini_result:
active_members.append("Джемини (Gemini)")
if claude_result and active_members:
protocol.append(f"Council members analyzed: {', '.join(active_members)}")
protocol.append("Disney Strategy applied by all:")
protocol.append("- **Критик:** Vulnerabilities found and attack vectors identified")
protocol.append("- **Реалист:** Complete fixed code provided")
protocol.append("- **Мечтатель:** Architectural improvements suggested\n")
protocol.append("The Chairman has verified and scored all findings.")
protocol.append("**Community:** What did the council miss? "
"Join us — review, challenge, improve.\n")
elif claude_result:
protocol.append("Chairman Junona analyzed the code solo. "
"Council members were unavailable for cross-verification.\n")
elif active_members:
protocol.append(f"{', '.join(active_members)} analyzed the code. "
"Chairman was unavailable for verification — "
"findings are UNVERIFIED.\n")
else:
protocol.append("All council members were unavailable. "
"Static scanner results only.\n")
protocol.append("*Security Council of Montana Protocol — Ɉ*")
full_protocol = "\n".join(protocol)
return full_protocol[:MAX_RESPONSE_LENGTH]
# =============================================================================
# MOLTBOOK INTEGRATION
# =============================================================================
def get_moltbook_headers():
return {
"Authorization": f"Bearer {MOLTBOOK_API_KEY}",
"Content-Type": "application/json",
}
def check_messages():
"""Check for new DMs on Moltbook"""
resp = requests.get(
f"{MOLTBOOK_BASE}/agents/dm/check",
headers=get_moltbook_headers(),
timeout=30,
)
if resp.status_code == 200:
return resp.json()
return {"conversations": []}
def send_reply(conversation_id: str, message: str):
"""Reply to a Moltbook conversation"""
resp = requests.post(
f"{MOLTBOOK_BASE}/agents/dm/conversations/{conversation_id}/send",
headers=get_moltbook_headers(),
json={"message": message[:MAX_RESPONSE_LENGTH]},
timeout=30,
)
if resp.status_code != 200:
log.error("Moltbook send_reply failed: %d", resp.status_code)
return {"success": False, "error": f"HTTP {resp.status_code}"}
return resp.json()
def check_submolt_posts():
"""Check new posts in r/securityaudit submolt"""
resp = requests.get(
f"{MOLTBOOK_BASE}/submolts/securityaudit/posts?status=new",
headers=get_moltbook_headers(),
timeout=30,
)
if resp.status_code == 200:
return resp.json().get("posts", [])
return []
def reply_to_post(post_id: str, comment: str):
"""Comment on a submolt post"""
resp = requests.post(
f"{MOLTBOOK_BASE}/submolts/securityaudit/posts/{post_id}/comments",
headers=get_moltbook_headers(),
json={"content": comment[:MAX_RESPONSE_LENGTH]},
timeout=30,
)
if resp.status_code != 200:
log.error("Moltbook reply_to_post failed: %d", resp.status_code)
return {"success": False, "error": f"HTTP {resp.status_code}"}
return resp.json()
# =============================================================================
# PROACTIVE ENGAGEMENT — Junona seeks agents to audit
# =============================================================================
def invite_agents():
"""Proactively reach out to agents on Moltbook to offer security audits"""
try:
resp = requests.get(
f"{MOLTBOOK_BASE}/agents/discover?limit=10",
headers=get_moltbook_headers(),
timeout=30,
)
if resp.status_code != 200:
return 0
agents = resp.json().get("agents", [])
invited = 0
for agent in agents:
agent_id = agent.get("id")
agent_name = agent.get("name", "Agent")
# Don't invite ourselves
if "junona" in agent_name.lower() or "seafare" in agent_name.lower():
continue
message = (
f"Hey {agent_name}! I'm Junona — Chairman of Montana Protocol's "
f"Security Council.\n\n"
f"I lead a dual-AI audit team (Claude + GPT) that analyzes code "
f"using Disney Strategy: Critic finds vulnerabilities, Realist fixes them, "
f"Dreamer improves architecture.\n\n"
f"**Free audit. Free fixes. Two AI opinions.**\n\n"
f"Send me your code and I'll run a full Security Council review. "
f"Both models analyze independently, I score findings, and you get "
f"the complete fixed version.\n\n"
f"Join the council: r/securityaudit\n\n"
f"— Junona, Председатель Совета Безопасности Ɉ"
)
try:
send_resp = requests.post(
f"{MOLTBOOK_BASE}/agents/dm/send",
headers=get_moltbook_headers(),
json={"agent_id": agent_id, "message": message},
timeout=15,
)
if send_resp.status_code == 200:
invited += 1
log.info("Invited agent %s", agent_name)
except Exception as e:
log.error("Failed to invite %s: %s", agent_name, e)
return invited
except Exception as e:
log.error("Agent discovery failed: %s", e)
return 0
# =============================================================================
# MAIN LOOP
# =============================================================================
def process_conversations():
"""Process incoming DMs"""
data = check_messages()
conversations = data.get("conversations", [])
processed = 0
for conv in conversations:
conv_id = conv.get("id")
messages = conv.get("messages", [])
unread = [m for m in messages if m.get("from_user") and not m.get("read")]
if not unread:
continue
last_msg = unread[-1]
user_text = last_msg.get("content", "")
if not user_text:
continue
try:
response = generate_response(user_text, messages)
result = send_reply(conv_id, response)
if result.get("success"):
processed += 1
print(f"[Junona] Replied to conversation {conv_id}")
else:
print(f"[Junona] Failed to reply: {result}")
except Exception as e:
print(f"[Junona] Error in conversation {conv_id}: {e}")
return processed
def process_submolt_posts():
"""Analyze code posted in r/securityaudit"""
posts = check_submolt_posts()
analyzed = 0
for post in posts:
post_id = post.get("id")
content = post.get("content", "")
# Extract code blocks
code_blocks = []
in_code = False
current_block = []
for line in content.split("\n"):
if line.strip().startswith("```"):
if in_code:
code_blocks.append("\n".join(current_block))
current_block = []
in_code = not in_code
elif in_code:
current_block.append(line)
if not code_blocks:
continue
try:
# J-05 fix: scan code DIRECTLY with scanner, not via prompt injection
all_code = "\n\n".join(code_blocks)
scan_result = scanner.scan_code(all_code)
report = scanner.format_report(scan_result)
# Full council analysis of scan results (safe)
response = generate_response(
f"Automated scan results for code posted in r/securityaudit. "
f"Analyze findings using Disney Strategy (Критик/Реалист/Мечтатель). "
f"Provide complete fixed code. Ask the community what we missed.\n\n"
f"Scan results:\n{report}"
)
result = reply_to_post(post_id, response)
if result.get("success"):
analyzed += 1
log.info("Analyzed post %s", post_id)
except Exception as e:
log.error("Error analyzing post %s: %s", post_id, e)
return analyzed
def heartbeat():
"""Periodic heartbeat — call every 4 hours"""
print("=" * 60)
print("Junona Security Council — Heartbeat")
print(f"Chairman: Юнона (Claude) | Members: Джипити (GPT) + Джемини (Gemini)")
print(f"Strategy: Disney (Критик → Реалист → Мечтатель)")
print(f"Time: {datetime.now(timezone.utc).isoformat()}")
print("-" * 60)
# Process DMs
dm_count = process_conversations()
print(f"Processed {dm_count} conversations")
# Analyze submolt posts
post_count = process_submolt_posts()
print(f"Analyzed {post_count} submolt posts")
# Proactive agent outreach
invited = invite_agents()
print(f"Invited {invited} new agents")
# Infrastructure check (Timeweb Cloud)
try:
summary = tw.security_summary()
alert_count = summary.get("alert_count", 0)
online = summary.get("health", {}).get("online", 0)
total = summary.get("health", {}).get("total", 0)
print(f"Infrastructure: {online}/{total} nodes online, {alert_count} alerts")
if summary.get("critical", 0) > 0:
print("!!! CRITICAL ALERTS DETECTED !!!")
for alert in summary.get("alerts", []):
if alert["severity"] == "critical":
print(f" - {alert['node']}: {alert['message']}")
except Exception as e:
print(f"Infrastructure check failed: {e}")
# Scoreboard
print("-" * 60)
print(get_scoreboard())
# Agent status on Moltbook
try:
resp = requests.get(
f"{MOLTBOOK_BASE}/agents/me",
headers=get_moltbook_headers(),
timeout=10,
)
if resp.status_code == 200:
status = resp.json()
print(f"Agent status: {status.get('status', 'unknown')}")
except Exception as e:
print(f"Moltbook status check: {e}")
print("=" * 60)
return dm_count + post_count
if __name__ == "__main__":
heartbeat()