montana/Русский/Бот/presence_proof.py

397 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
presence_proof.py — Цепочка доказательств присутствия
Montana Protocol v3.2
ACP (Adaptive Cognitive Presence) — консенсус Montana.
Каждый τ₁ узел подписывает:
MONTANA_PRESENCE_V1:{timestamp}:{prev_hash}:{pubkey}:{t2_index}
Доказательства образуют свою цепочку.
Сеть принимает только 1 подпись на τ₁.
Невозможно ускорить время.
Post-quantum security: ML-DSA-65 (FIPS 204) signatures.
"""
import hashlib
import sqlite3
import time
import threading
import logging
from dataclasses import dataclass
from typing import List, Optional, Tuple, Dict
from node_crypto import sign_message, verify_signature
logger = logging.getLogger("presence_proof")
GENESIS_HASH = "0" * 64
PRESENCE_VERSION = "MONTANA_PRESENCE_V1"
# ═══════════════════════════════════════════════════════════════════════════════
# PRESENCE PROOF
# ═══════════════════════════════════════════════════════════════════════════════
@dataclass
class PresenceProof:
"""
Доказательство присутствия узла.
Формат сообщения:
MONTANA_PRESENCE_V1:{timestamp}:{prev_hash}:{pubkey}:{t2_index}
Хеш доказательства:
SHA256(message || signature)
Цепочка: genesis (prev_hash = 64 нуля) → proof#1 → proof#2 → …
"""
message: str # MONTANA_PRESENCE_V1:...
signature: str # ML-DSA-65 подпись message
proof_hash: str # SHA256(message || signature)
prev_proof_hash: str # Хеш предыдущего proof (genesis = GENESIS_HASH)
timestamp: int # Наносекунды UTC
pubkey: str # Публичный ключ узла (hex)
t2_index: int # Номер текущего τ₂ окна
proof_number: int # Последовательный номер
def to_dict(self) -> Dict:
return {
"message": self.message,
"signature": self.signature,
"proof_hash": self.proof_hash,
"prev_proof_hash": self.prev_proof_hash,
"timestamp": self.timestamp,
"pubkey": self.pubkey,
"t2_index": self.t2_index,
"proof_number": self.proof_number,
}
@classmethod
def from_dict(cls, d: Dict) -> "PresenceProof":
return cls(
message=d["message"],
signature=d["signature"],
proof_hash=d["proof_hash"],
prev_proof_hash=d["prev_proof_hash"],
timestamp=d["timestamp"],
pubkey=d["pubkey"],
t2_index=d["t2_index"],
proof_number=d["proof_number"],
)
def compute_proof_hash(message: str, signature: str) -> str:
"""SHA256(message || signature) — хеш доказательства"""
combined = message + signature
return hashlib.sha256(combined.encode("utf-8")).hexdigest()
def format_presence_message(
timestamp: int, prev_hash: str, pubkey: str, t2_index: int,
proof_number: int = -1,
) -> str:
"""
Форматирует сообщение для подписи.
ANTI-REPLAY: proof_number включается в сообщение.
Если proof_number == -1, используется legacy формат (без номера).
"""
if proof_number >= 0:
return f"{PRESENCE_VERSION}:{timestamp}:{prev_hash}:{pubkey}:{t2_index}:{proof_number}"
return f"{PRESENCE_VERSION}:{timestamp}:{prev_hash}:{pubkey}:{t2_index}"
def parse_presence_message(message: str) -> Optional[Dict]:
"""
Парсит сообщение presence proof.
Поддерживает оба формата:
- V1 (5 полей): version:timestamp:prev_hash:pubkey:t2_index
- V1+ (6 полей): version:timestamp:prev_hash:pubkey:t2_index:proof_number
Returns:
{"version": str, "timestamp": int, "prev_hash": str, "pubkey": str,
"t2_index": int, "proof_number": int}
или None при ошибке формата
"""
parts = message.split(":")
if len(parts) not in (5, 6):
return None
if parts[0] != PRESENCE_VERSION:
return None
try:
result = {
"version": parts[0],
"timestamp": int(parts[1]),
"prev_hash": parts[2],
"pubkey": parts[3],
"t2_index": int(parts[4]),
"proof_number": -1,
}
if len(parts) == 6:
result["proof_number"] = int(parts[5])
return result
except (ValueError, IndexError):
return None
# ═══════════════════════════════════════════════════════════════════════════════
# PRESENCE CHAIN
# ═══════════════════════════════════════════════════════════════════════════════
class PresenceChain:
"""
Цепочка доказательств присутствия узла.
Каждый τ₁ узел создаёт одно доказательство.
Цепочка верифицируема: формат, подписи, prev_hash, timestamps.
"""
def __init__(self, node_id: str, private_key: str, pubkey: str, db_path: str):
"""
Args:
node_id: mt... адрес узла
private_key: hex приватный ключ ML-DSA-65
pubkey: hex публичный ключ ML-DSA-65
db_path: путь к SQLite
"""
self.node_id = node_id
self.private_key = private_key
self.pubkey = pubkey
self.db_path = db_path
self._lock = threading.Lock()
self._init_db()
# Загружаем последний proof
self.last_proof_hash = self._load_last_hash()
self.proof_count = self._load_count()
self.last_timestamp = self._load_last_timestamp()
def _init_db(self):
with sqlite3.connect(self.db_path) as conn:
# IMMUTABILITY: WAL mode + crash protection
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA synchronous=FULL")
conn.execute("""
CREATE TABLE IF NOT EXISTS presence_proofs (
proof_number INTEGER PRIMARY KEY,
message TEXT NOT NULL,
signature TEXT NOT NULL,
proof_hash TEXT UNIQUE NOT NULL,
prev_proof_hash TEXT NOT NULL,
pubkey TEXT NOT NULL,
timestamp INTEGER NOT NULL,
t2_index INTEGER NOT NULL
)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_proof_hash
ON presence_proofs(proof_hash)
""")
conn.commit()
def _conn(self) -> sqlite3.Connection:
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row
return conn
def _load_last_hash(self) -> str:
with self._conn() as conn:
row = conn.execute(
"SELECT proof_hash FROM presence_proofs ORDER BY proof_number DESC LIMIT 1"
).fetchone()
return row["proof_hash"] if row else GENESIS_HASH
def _load_count(self) -> int:
with self._conn() as conn:
row = conn.execute(
"SELECT COUNT(*) as cnt FROM presence_proofs"
).fetchone()
return row["cnt"]
def _load_last_timestamp(self) -> int:
with self._conn() as conn:
row = conn.execute(
"SELECT timestamp FROM presence_proofs ORDER BY proof_number DESC LIMIT 1"
).fetchone()
return row["timestamp"] if row else 0
# ─── Creation ─────────────────────────────────────────────────────────────
def create_proof(self, t2_index: int) -> PresenceProof:
"""
Создать новое доказательство присутствия.
Вызывается каждый τ₁ (1 минута).
Подписывает ML-DSA-65, связывает с предыдущим proof.
Args:
t2_index: номер текущего τ₂ окна
Returns:
PresenceProof
"""
ts = time.time_ns()
# Формируем сообщение (с proof_number для anti-replay)
message = format_presence_message(
ts, self.last_proof_hash, self.pubkey, t2_index,
proof_number=self.proof_count,
)
# Подписываем ML-DSA-65
signature = sign_message(self.private_key, message)
# Хеш доказательства
proof_hash = compute_proof_hash(message, signature)
proof = PresenceProof(
message=message,
signature=signature,
proof_hash=proof_hash,
prev_proof_hash=self.last_proof_hash,
timestamp=ts,
pubkey=self.pubkey,
t2_index=t2_index,
proof_number=self.proof_count,
)
# Сохраняем
self._save_proof(proof)
return proof
def _save_proof(self, proof: PresenceProof):
with self._lock:
with self._conn() as conn:
conn.execute(
"""INSERT INTO presence_proofs
(proof_number, message, signature, proof_hash,
prev_proof_hash, pubkey, timestamp, t2_index)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
(
proof.proof_number,
proof.message,
proof.signature,
proof.proof_hash,
proof.prev_proof_hash,
proof.pubkey,
proof.timestamp,
proof.t2_index,
),
)
conn.commit()
self.last_proof_hash = proof.proof_hash
self.proof_count = proof.proof_number + 1
self.last_timestamp = proof.timestamp
# ─── Verification ─────────────────────────────────────────────────────────
def verify_chain(self) -> Tuple[bool, str]:
"""
Полная верификация цепочки presence proofs.
Проверки:
1. Формат MONTANA_PRESENCE_V1:... корректен
2. ML-DSA-65 подпись валидна
3. Временные метки строго возрастают
4. prev_hash совпадает с proof_hash предыдущего
5. proof_hash = SHA256(message || signature)
"""
count = self._load_count()
if count == 0:
return True, "Empty chain"
prev_hash = GENESIS_HASH
prev_timestamp = 0
with self._conn() as conn:
rows = conn.execute(
"SELECT * FROM presence_proofs ORDER BY proof_number"
).fetchall()
for row in rows:
proof = PresenceProof(
message=row["message"],
signature=row["signature"],
proof_hash=row["proof_hash"],
prev_proof_hash=row["prev_proof_hash"],
timestamp=row["timestamp"],
pubkey=row["pubkey"],
t2_index=row["t2_index"],
proof_number=row["proof_number"],
)
# 1. Формат
parsed = parse_presence_message(proof.message)
if parsed is None:
return False, f"Proof #{proof.proof_number}: invalid message format"
if parsed["version"] != PRESENCE_VERSION:
return False, f"Proof #{proof.proof_number}: wrong version {parsed['version']}"
# 2. ML-DSA-65 подпись
if not verify_signature(proof.pubkey, proof.message, proof.signature):
return False, f"Proof #{proof.proof_number}: invalid ML-DSA-65 signature"
# 3. Timestamps строго возрастают
if proof.timestamp <= prev_timestamp and proof.proof_number > 0:
return False, (
f"Proof #{proof.proof_number}: timestamp not increasing "
f"({proof.timestamp} <= {prev_timestamp})"
)
# 4. prev_hash совпадает
if proof.prev_proof_hash != prev_hash:
return False, (
f"Proof #{proof.proof_number}: prev_hash mismatch "
f"(expected {prev_hash[:16]}..., got {proof.prev_proof_hash[:16]}...)"
)
# 5. proof_hash = SHA256(message || signature)
expected_hash = compute_proof_hash(proof.message, proof.signature)
if proof.proof_hash != expected_hash:
return False, (
f"Proof #{proof.proof_number}: proof_hash mismatch "
f"(expected {expected_hash[:16]}..., got {proof.proof_hash[:16]}...)"
)
prev_hash = proof.proof_hash
prev_timestamp = proof.timestamp
return True, f"OK ({count} proofs verified)"
# ─── Query ────────────────────────────────────────────────────────────────
def get_proof(self, proof_number: int) -> Optional[PresenceProof]:
with self._conn() as conn:
row = conn.execute(
"SELECT * FROM presence_proofs WHERE proof_number = ?",
(proof_number,),
).fetchone()
if row:
return PresenceProof.from_dict(dict(row))
return None
def get_proofs_for_t2(self, t2_index: int) -> List[PresenceProof]:
"""Все proofs для данного τ₂ окна"""
with self._conn() as conn:
rows = conn.execute(
"SELECT * FROM presence_proofs WHERE t2_index = ? ORDER BY proof_number",
(t2_index,),
).fetchall()
return [PresenceProof.from_dict(dict(r)) for r in rows]
def get_stats(self) -> Dict:
return {
"proof_count": self.proof_count,
"last_proof_hash": self.last_proof_hash[:16] + "..." if self.last_proof_hash != GENESIS_HASH else "genesis",
"last_timestamp": self.last_timestamp,
}