664 lines
31 KiB
Python
664 lines
31 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
test_timechain.py — 9 Проверок Готовности Таймчейна
|
||
Montana Protocol v3.0
|
||
|
||
Каждый тест соответствует одной проверке из спецификации.
|
||
Все 9 PASS = Таймчейн готов.
|
||
|
||
Post-quantum: ML-DSA-65 (FIPS 204) — реальные подписи, не моки.
|
||
"""
|
||
|
||
import os
|
||
import sys
|
||
import time
|
||
import json
|
||
import hashlib
|
||
import tempfile
|
||
import unittest
|
||
import shutil
|
||
|
||
# Добавляем путь к модулям
|
||
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
||
|
||
from node_crypto import (
|
||
generate_keypair,
|
||
public_key_to_address,
|
||
sign_message,
|
||
verify_signature,
|
||
validate_address,
|
||
)
|
||
from transaction import (
|
||
Transaction,
|
||
TxInput,
|
||
TxOutput,
|
||
UTXOSet,
|
||
create_coinbase_tx,
|
||
create_transfer_tx,
|
||
validate_transaction,
|
||
)
|
||
from timechain import (
|
||
TimeChain,
|
||
Tau1Window,
|
||
Tau2Window,
|
||
merkle_root,
|
||
GENESIS_HASH,
|
||
TAU1_PER_TAU2,
|
||
)
|
||
from nts_anchor import NTSAnchorService
|
||
from presence_proof import (
|
||
PresenceChain,
|
||
PresenceProof,
|
||
compute_proof_hash,
|
||
format_presence_message,
|
||
parse_presence_message,
|
||
PRESENCE_VERSION,
|
||
)
|
||
|
||
|
||
class TestTimeChainReadiness(unittest.TestCase):
|
||
"""
|
||
9 проверок готовности Таймчейна Montana Protocol.
|
||
|
||
Каждый тест использует реальную ML-DSA-65 криптографию.
|
||
Временные БД создаются в /tmp и удаляются после теста.
|
||
"""
|
||
|
||
@classmethod
|
||
def setUpClass(cls):
|
||
"""Генерируем ключи один раз для всех тестов (ML-DSA-65 keygen медленный)"""
|
||
print("\n🔑 Generating ML-DSA-65 keys for tests...")
|
||
|
||
# Узел A (создатель окон)
|
||
cls.priv_a, cls.pub_a = generate_keypair()
|
||
cls.addr_a = public_key_to_address(cls.pub_a)
|
||
|
||
# Узел B (получатель)
|
||
cls.priv_b, cls.pub_b = generate_keypair()
|
||
cls.addr_b = public_key_to_address(cls.pub_b)
|
||
|
||
# Узел C (для double-spend теста)
|
||
cls.priv_c, cls.pub_c = generate_keypair()
|
||
cls.addr_c = public_key_to_address(cls.pub_c)
|
||
|
||
print(f" Node A: {cls.addr_a}")
|
||
print(f" Node B: {cls.addr_b}")
|
||
print(f" Node C: {cls.addr_c}")
|
||
|
||
def setUp(self):
|
||
"""Создаём временную директорию для каждого теста"""
|
||
self.test_dir = tempfile.mkdtemp(prefix="montana_test_")
|
||
self.db_path = os.path.join(self.test_dir, "timechain.db")
|
||
|
||
def tearDown(self):
|
||
"""Удаляем временную директорию"""
|
||
shutil.rmtree(self.test_dir, ignore_errors=True)
|
||
|
||
def _create_chain(self, with_nts: bool = True) -> TimeChain:
|
||
"""Создать TimeChain с ключами узла A (strict_timestamps=False for rapid test execution)"""
|
||
nts = NTSAnchorService(mock_mode=True) if with_nts else None
|
||
chain = TimeChain(
|
||
node_id=self.addr_a,
|
||
private_key=self.priv_a,
|
||
db_path=self.db_path,
|
||
public_key=self.pub_a,
|
||
strict_timestamps=False,
|
||
nts_service=nts,
|
||
)
|
||
return chain
|
||
|
||
# ═══════════════════════════════════════════════════════════════════════════
|
||
# ПРОВЕРКА 1: Горизонтальная целостность τ₁
|
||
# ═══════════════════════════════════════════════════════════════════════════
|
||
|
||
def test_1_horizontal_integrity(self):
|
||
"""
|
||
Берём любой слайс τ₁, вычисляем хеш предыдущего, сравниваем
|
||
с prev_tau1_hash. Проходим от последнего до генезиса.
|
||
Все хеши сходятся — цепочка не нарушена.
|
||
Изменил бит в старом слайсе — все хеши после него рассыпаются.
|
||
"""
|
||
print("\n═══ ПРОВЕРКА 1: Горизонтальная целостность τ₁ ═══")
|
||
chain = self._create_chain()
|
||
|
||
# Создаём генезис
|
||
genesis = chain.create_genesis_window()
|
||
self.assertEqual(genesis.prev_tau1_hash, GENESIS_HASH)
|
||
self.assertEqual(genesis.window_number, 0)
|
||
|
||
# Создаём 10 τ₁ окон
|
||
for i in range(10):
|
||
coinbase = create_coinbase_tx(self.addr_a, 60, t2_index=0)
|
||
chain.create_tau1_window([coinbase])
|
||
|
||
# Верификация цепочки
|
||
ok, msg = chain.verify_tau1_chain()
|
||
self.assertTrue(ok, f"Chain verification failed: {msg}")
|
||
print(f" ✅ {msg}")
|
||
|
||
# Проверяем что prev_hash каждого окна = hash предыдущего
|
||
prev_hash = GENESIS_HASH
|
||
for i in range(11): # genesis + 10
|
||
window = chain.db.get_tau1(i)
|
||
self.assertIsNotNone(window, f"Window #{i} not found")
|
||
self.assertEqual(
|
||
window.prev_tau1_hash, prev_hash,
|
||
f"Window #{i}: prev_hash mismatch"
|
||
)
|
||
prev_hash = window.window_hash()
|
||
|
||
print(f" ✅ 11 окон: горизонтальная цепочка целостна")
|
||
|
||
# ═══════════════════════════════════════════════════════════════════════════
|
||
# ПРОВЕРКА 2: Вертикальная матрёшка
|
||
# ═══════════════════════════════════════════════════════════════════════════
|
||
|
||
def test_2_vertical_matryoshka(self):
|
||
"""
|
||
Берём слайс τ₂. В нём tau1_headers (10 хешей) и tau1_merkle_root.
|
||
Строим Merkle tree из 10 хешей, вычисляем корень, сравниваем.
|
||
Совпадает — матрёшка корректна.
|
||
"""
|
||
print("\n═══ ПРОВЕРКА 2: Вертикальная матрёшка τ₂→τ₁ ═══")
|
||
chain = self._create_chain()
|
||
|
||
# Genesis + 10 τ₁ окон
|
||
chain.create_genesis_window()
|
||
tau1_hashes = []
|
||
for i in range(TAU1_PER_TAU2):
|
||
coinbase = create_coinbase_tx(self.addr_a, 60, t2_index=0)
|
||
window = chain.create_tau1_window([coinbase])
|
||
tau1_hashes.append(window.window_hash())
|
||
|
||
# Финализация τ₂
|
||
coinbase_t2 = [create_coinbase_tx(self.addr_a, 600, t2_index=0)]
|
||
tau2 = chain.finalize_tau2(coinbase_t2, halving_coefficient=1.0)
|
||
self.assertIsNotNone(tau2, "τ₂ should be created")
|
||
|
||
# Проверяем merkle root
|
||
expected_root = merkle_root(tau2.tau1_headers)
|
||
self.assertEqual(tau2.tau1_merkle_root, expected_root)
|
||
print(f" ✅ τ₂ Merkle root совпадает: {expected_root[:16]}...")
|
||
|
||
# Верификация через chain.verify_tau2_matryoshka
|
||
ok, msg = chain.verify_tau2_matryoshka(0)
|
||
self.assertTrue(ok, f"Matryoshka verification failed: {msg}")
|
||
print(f" ✅ {msg}")
|
||
|
||
# Проверяем что τ₁ headers в τ₂ совпадают с реальными хешами
|
||
# (genesis не входит в первый набор τ₁ headers для τ₂,
|
||
# потому что pending_tau1_headers начинает заполняться после genesis)
|
||
self.assertEqual(len(tau2.tau1_headers), TAU1_PER_TAU2)
|
||
print(f" ✅ τ₂ содержит {len(tau2.tau1_headers)} хешей τ₁")
|
||
|
||
# ═══════════════════════════════════════════════════════════════════════════
|
||
# ПРОВЕРКА 3: Валидация транзакций
|
||
# ═══════════════════════════════════════════════════════════════════════════
|
||
|
||
def test_3_transaction_validation(self):
|
||
"""
|
||
Для каждой транзакции в каждом τ₁:
|
||
- Найти UTXO по (tx_hash, output_idx)
|
||
- Проверить что не потрачен
|
||
- Проверить address(input.pubkey) == UTXO.address
|
||
- Проверить ML_DSA_65.verify(pubkey, tx_data, signature)
|
||
- Проверить sum(inputs) ≥ sum(outputs)
|
||
"""
|
||
print("\n═══ ПРОВЕРКА 3: Валидация транзакций ═══")
|
||
chain = self._create_chain()
|
||
chain.create_genesis_window()
|
||
|
||
# Эмиссия: A получает 350 Ɉ
|
||
coinbase = create_coinbase_tx(self.addr_a, 350, t2_index=0)
|
||
chain.create_tau1_window([coinbase])
|
||
|
||
# Проверяем coinbase валидация
|
||
ok, msg = validate_transaction(coinbase, chain.utxo_set)
|
||
self.assertTrue(ok, f"Coinbase validation failed: {msg}")
|
||
print(f" ✅ Coinbase валидна: {msg}")
|
||
|
||
# Transfer: A → B (100 Ɉ), сдача 250 на A
|
||
utxos_a = chain.utxo_set.get_utxos_for_address(self.addr_a)
|
||
self.assertTrue(len(utxos_a) > 0, "A should have UTXOs")
|
||
|
||
transfer = create_transfer_tx(
|
||
utxos_to_spend=[(utxos_a[0]["tx_hash"], utxos_a[0]["output_idx"], utxos_a[0]["amount"])],
|
||
recipient=self.addr_b,
|
||
amount=100,
|
||
change_address=self.addr_a,
|
||
private_key=self.priv_a,
|
||
public_key=self.pub_a,
|
||
)
|
||
|
||
# Валидация transfer
|
||
ok, msg = validate_transaction(transfer, chain.utxo_set)
|
||
self.assertTrue(ok, f"Transfer validation failed: {msg}")
|
||
print(f" ✅ Transfer валидна: {msg}")
|
||
|
||
# Проверяем что подпись ML-DSA-65 работает
|
||
for inp in transfer.inputs:
|
||
verified = verify_signature(inp.pubkey, transfer.tx_hash, inp.signature)
|
||
self.assertTrue(verified, "ML-DSA-65 signature should be valid")
|
||
print(f" ✅ ML-DSA-65 подписи валидны")
|
||
|
||
# Проверяем address(pubkey) == UTXO.address
|
||
derived = public_key_to_address(self.pub_a)
|
||
self.assertEqual(derived, self.addr_a)
|
||
print(f" ✅ address(pubkey) == UTXO.address")
|
||
|
||
# Проверяем невалидную транзакцию (подделанный tx_hash)
|
||
bad_tx = Transaction(
|
||
inputs=transfer.inputs,
|
||
outputs=transfer.outputs,
|
||
timestamp=transfer.timestamp,
|
||
tx_type="transfer",
|
||
tx_hash="0" * 64, # Подделанный хеш
|
||
)
|
||
ok, msg = validate_transaction(bad_tx, chain.utxo_set)
|
||
self.assertFalse(ok, "Transaction with fake hash should be invalid")
|
||
print(f" ✅ Подделанный tx_hash отвергнут: {msg}")
|
||
|
||
# ═══════════════════════════════════════════════════════════════════════════
|
||
# ПРОВЕРКА 4: UTXO и балансы
|
||
# ═══════════════════════════════════════════════════════════════════════════
|
||
|
||
def test_4_utxo_and_balances(self):
|
||
"""
|
||
Отправляешь 100 Ɉ с адреса A на адрес B.
|
||
Вход — UTXO адреса A (350 Ɉ).
|
||
Выходы — 100 Ɉ на B + 250 Ɉ сдача на A.
|
||
Старый UTXO потрачен, два новых создаются.
|
||
balance(A) = 250, balance(B) = 100.
|
||
Инвариант: sum(all UTXO) = total emission.
|
||
"""
|
||
print("\n═══ ПРОВЕРКА 4: UTXO и балансы ═══")
|
||
chain = self._create_chain()
|
||
chain.create_genesis_window()
|
||
|
||
# Эмиссия: A получает 350 Ɉ
|
||
coinbase = create_coinbase_tx(self.addr_a, 350, t2_index=0)
|
||
chain.create_tau1_window([coinbase])
|
||
|
||
# Проверяем баланс A
|
||
balance_a = chain.utxo_set.get_balance(self.addr_a)
|
||
self.assertEqual(balance_a, 350)
|
||
print(f" Balance A после эмиссии: {balance_a} Ɉ ✅")
|
||
|
||
# Transfer: A → B (100), сдача A (250)
|
||
utxos_a = chain.utxo_set.get_utxos_for_address(self.addr_a)
|
||
transfer = create_transfer_tx(
|
||
utxos_to_spend=[(utxos_a[0]["tx_hash"], utxos_a[0]["output_idx"], utxos_a[0]["amount"])],
|
||
recipient=self.addr_b,
|
||
amount=100,
|
||
change_address=self.addr_a,
|
||
private_key=self.priv_a,
|
||
public_key=self.pub_a,
|
||
)
|
||
chain.create_tau1_window([transfer])
|
||
|
||
# Проверяем балансы
|
||
balance_a = chain.utxo_set.get_balance(self.addr_a)
|
||
balance_b = chain.utxo_set.get_balance(self.addr_b)
|
||
self.assertEqual(balance_a, 250, f"Expected A=250, got {balance_a}")
|
||
self.assertEqual(balance_b, 100, f"Expected B=100, got {balance_b}")
|
||
print(f" Balance A: {balance_a} Ɉ ✅")
|
||
print(f" Balance B: {balance_b} Ɉ ✅")
|
||
|
||
# Инвариант: sum(all UTXO) = total emission
|
||
total = chain.utxo_set.total_unspent()
|
||
self.assertEqual(total, 350, f"Total UTXO should be 350 (emission), got {total}")
|
||
print(f" Total supply: {total} Ɉ = emission ✅")
|
||
|
||
# Supply invariant
|
||
ok, msg = chain.utxo_set.verify_supply_invariant()
|
||
self.assertTrue(ok, f"Supply invariant failed: {msg}")
|
||
print(f" Supply invariant: {msg} ✅")
|
||
|
||
# ═══════════════════════════════════════════════════════════════════════════
|
||
# ПРОВЕРКА 5: Эмиссия
|
||
# ═══════════════════════════════════════════════════════════════════════════
|
||
|
||
def test_5_emission(self):
|
||
"""
|
||
Каждый τ₂: coinbase-транзакции, coins = seconds × halving_coefficient.
|
||
TIME_BANK -= 600 секунд.
|
||
После халвинга: тот же участник за 450 секунд получает 225 Ɉ (коэф 0.5).
|
||
"""
|
||
print("\n═══ ПРОВЕРКА 5: Эмиссия ═══")
|
||
chain = self._create_chain()
|
||
chain.create_genesis_window()
|
||
|
||
# Создаём 10 τ₁ (для накопления headers)
|
||
for i in range(TAU1_PER_TAU2):
|
||
chain.create_tau1_window([])
|
||
|
||
# Эпоха 0: halving_coefficient = 1.0
|
||
# Участник A: 450 секунд → 450 Ɉ
|
||
# Участник B: 600 секунд → 600 Ɉ
|
||
coinbase_txs = [
|
||
create_coinbase_tx(self.addr_a, 450, t2_index=0),
|
||
create_coinbase_tx(self.addr_b, 600, t2_index=0),
|
||
]
|
||
tau2 = chain.finalize_tau2(coinbase_txs, halving_coefficient=1.0)
|
||
self.assertIsNotNone(tau2)
|
||
self.assertEqual(tau2.total_emissions, 1050)
|
||
print(f" τ₂ #0: emissions = {tau2.total_emissions} Ɉ (450+600) ✅")
|
||
|
||
# Балансы
|
||
balance_a = chain.utxo_set.get_balance(self.addr_a)
|
||
balance_b = chain.utxo_set.get_balance(self.addr_b)
|
||
self.assertEqual(balance_a, 450)
|
||
self.assertEqual(balance_b, 600)
|
||
print(f" A = {balance_a} Ɉ, B = {balance_b} Ɉ ✅")
|
||
|
||
# TIME_BANK
|
||
self.assertEqual(chain.time_bank_spent, 600)
|
||
print(f" TIME_BANK spent: {chain.time_bank_spent} sec ✅")
|
||
|
||
# Эпоха 1: halving_coefficient = 0.5
|
||
# 10 ещё τ₁
|
||
for i in range(TAU1_PER_TAU2):
|
||
chain.create_tau1_window([])
|
||
|
||
coinbase_txs_halved = [
|
||
create_coinbase_tx(self.addr_a, 225, t2_index=1), # 450 × 0.5 = 225
|
||
]
|
||
tau2_halved = chain.finalize_tau2(coinbase_txs_halved, halving_coefficient=0.5)
|
||
self.assertIsNotNone(tau2_halved)
|
||
self.assertEqual(tau2_halved.total_emissions, 225)
|
||
self.assertEqual(tau2_halved.halving_coefficient, 0.5)
|
||
print(f" τ₂ #1 (halved): emissions = {tau2_halved.total_emissions} Ɉ, coef = 0.5 ✅")
|
||
|
||
# Проверяем что A теперь 450 + 225 = 675
|
||
balance_a = chain.utxo_set.get_balance(self.addr_a)
|
||
self.assertEqual(balance_a, 675)
|
||
print(f" A total = {balance_a} Ɉ (450 + 225) ✅")
|
||
|
||
# ═══════════════════════════════════════════════════════════════════════════
|
||
# ПРОВЕРКА 6: Double Spend
|
||
# ═══════════════════════════════════════════════════════════════════════════
|
||
|
||
def test_6_double_spend(self):
|
||
"""
|
||
Пытаешься потратить один UTXO дважды:
|
||
Транзакция на B и одновременно на C, ссылаясь на тот же UTXO.
|
||
Сеть принимает первую, вторую отвергает.
|
||
"""
|
||
print("\n═══ ПРОВЕРКА 6: Double Spend ═══")
|
||
chain = self._create_chain()
|
||
chain.create_genesis_window()
|
||
|
||
# Эмиссия: A получает 500 Ɉ
|
||
coinbase = create_coinbase_tx(self.addr_a, 500, t2_index=0)
|
||
chain.create_tau1_window([coinbase])
|
||
|
||
utxos_a = chain.utxo_set.get_utxos_for_address(self.addr_a)
|
||
self.assertEqual(len(utxos_a), 1)
|
||
utxo = utxos_a[0]
|
||
|
||
# TX1: A → B (300), сдача 200 на A
|
||
tx1 = create_transfer_tx(
|
||
utxos_to_spend=[(utxo["tx_hash"], utxo["output_idx"], utxo["amount"])],
|
||
recipient=self.addr_b,
|
||
amount=300,
|
||
change_address=self.addr_a,
|
||
private_key=self.priv_a,
|
||
public_key=self.pub_a,
|
||
)
|
||
|
||
# TX2: A → C (400), сдача 100 на A (тот же UTXO!)
|
||
tx2 = create_transfer_tx(
|
||
utxos_to_spend=[(utxo["tx_hash"], utxo["output_idx"], utxo["amount"])],
|
||
recipient=self.addr_c,
|
||
amount=400,
|
||
change_address=self.addr_a,
|
||
private_key=self.priv_a,
|
||
public_key=self.pub_a,
|
||
)
|
||
|
||
# Первая проходит
|
||
chain.create_tau1_window([tx1])
|
||
balance_b = chain.utxo_set.get_balance(self.addr_b)
|
||
self.assertEqual(balance_b, 300)
|
||
print(f" TX1 (A→B 300): ACCEPTED ✅")
|
||
|
||
# Вторая ДОЛЖНА быть отвергнута (UTXO уже потрачен)
|
||
ok, msg = validate_transaction(tx2, chain.utxo_set)
|
||
self.assertFalse(ok, "Double spend should be rejected")
|
||
self.assertIn("already spent", msg)
|
||
print(f" TX2 (A→C 400, same UTXO): REJECTED ✅ ({msg})")
|
||
|
||
# Попытка включить в окно тоже должна провалиться
|
||
with self.assertRaises(ValueError):
|
||
chain.create_tau1_window([tx2])
|
||
print(f" Double spend в окне: REJECTED ✅")
|
||
|
||
# ═══════════════════════════════════════════════════════════════════════════
|
||
# ПРОВЕРКА 7: Presence Proof Chain
|
||
# ═══════════════════════════════════════════════════════════════════════════
|
||
|
||
def test_7_presence_proof_chain(self):
|
||
"""
|
||
Genesis → proof#1 → proof#2 → …
|
||
Формат MONTANA_PRESENCE_V1 корректен.
|
||
ML-DSA-65 подписи валидны.
|
||
Временные метки строго вперёд.
|
||
prev_hash совпадает с proof_hash предыдущего.
|
||
"""
|
||
print("\n═══ ПРОВЕРКА 7: Presence Proof Chain ═══")
|
||
|
||
presence = PresenceChain(
|
||
node_id=self.addr_a,
|
||
private_key=self.priv_a,
|
||
pubkey=self.pub_a,
|
||
db_path=self.db_path,
|
||
)
|
||
|
||
# Создаём 5 proofs
|
||
proofs = []
|
||
for i in range(5):
|
||
proof = presence.create_proof(t2_index=0)
|
||
proofs.append(proof)
|
||
time.sleep(0.01) # Гарантируем разные timestamps
|
||
|
||
self.assertEqual(presence.proof_count, 5)
|
||
print(f" Создано {presence.proof_count} proofs ✅")
|
||
|
||
# Верификация цепочки
|
||
ok, msg = presence.verify_chain()
|
||
self.assertTrue(ok, f"Presence chain verification failed: {msg}")
|
||
print(f" Верификация: {msg} ✅")
|
||
|
||
# Проверяем формат каждого proof
|
||
for proof in proofs:
|
||
parsed = parse_presence_message(proof.message)
|
||
self.assertIsNotNone(parsed, f"Proof #{proof.proof_number}: invalid format")
|
||
self.assertEqual(parsed["version"], PRESENCE_VERSION)
|
||
self.assertEqual(parsed["pubkey"], self.pub_a)
|
||
|
||
print(f" Формат MONTANA_PRESENCE_V1: OK ✅")
|
||
|
||
# Проверяем цепочку prev_hash
|
||
prev_hash = GENESIS_HASH
|
||
for proof in proofs:
|
||
self.assertEqual(proof.prev_proof_hash, prev_hash)
|
||
expected = compute_proof_hash(proof.message, proof.signature)
|
||
self.assertEqual(proof.proof_hash, expected)
|
||
prev_hash = proof.proof_hash
|
||
|
||
print(f" Цепочка prev_hash: OK ✅")
|
||
|
||
# Проверяем подписи ML-DSA-65
|
||
for proof in proofs:
|
||
verified = verify_signature(proof.pubkey, proof.message, proof.signature)
|
||
self.assertTrue(verified, f"Proof #{proof.proof_number}: invalid signature")
|
||
|
||
print(f" ML-DSA-65 подписи: OK ✅")
|
||
|
||
# ═══════════════════════════════════════════════════════════════════════════
|
||
# ПРОВЕРКА 8: Синхронизация узлов
|
||
# ═══════════════════════════════════════════════════════════════════════════
|
||
|
||
def test_8_node_sync(self):
|
||
"""
|
||
Создаём chain на node A. Копируем БД на node B.
|
||
Node B верифицирует от генезиса.
|
||
Должен прийти к тому же UTXO set и тем же балансам.
|
||
"""
|
||
print("\n═══ ПРОВЕРКА 8: Синхронизация узлов ═══")
|
||
|
||
# Node A создаёт chain
|
||
chain_a = self._create_chain()
|
||
chain_a.create_genesis_window()
|
||
|
||
# Эмиссия + transfer
|
||
coinbase = create_coinbase_tx(self.addr_a, 500, t2_index=0)
|
||
chain_a.create_tau1_window([coinbase])
|
||
|
||
utxos = chain_a.utxo_set.get_utxos_for_address(self.addr_a)
|
||
transfer = create_transfer_tx(
|
||
utxos_to_spend=[(utxos[0]["tx_hash"], utxos[0]["output_idx"], utxos[0]["amount"])],
|
||
recipient=self.addr_b,
|
||
amount=200,
|
||
change_address=self.addr_a,
|
||
private_key=self.priv_a,
|
||
public_key=self.pub_a,
|
||
)
|
||
chain_a.create_tau1_window([transfer])
|
||
|
||
# Балансы на A
|
||
balance_a_on_a = chain_a.utxo_set.get_balance(self.addr_a)
|
||
balance_b_on_a = chain_a.utxo_set.get_balance(self.addr_b)
|
||
print(f" Node A: A={balance_a_on_a}, B={balance_b_on_a}")
|
||
|
||
# Node B: открываем ту же БД (симуляция синхронизации)
|
||
chain_b = TimeChain(
|
||
node_id=self.addr_b,
|
||
private_key=self.priv_b,
|
||
db_path=self.db_path,
|
||
public_key=self.pub_b,
|
||
strict_timestamps=False,
|
||
)
|
||
# Регистрируем узел A для верификации его подписей
|
||
chain_b.register_node(self.addr_a, self.pub_a)
|
||
|
||
# Верификация на B
|
||
ok, msg = chain_b.verify_full_chain()
|
||
self.assertTrue(ok, f"Node B verification failed: {msg}")
|
||
print(f" Node B verification: {msg} ✅")
|
||
|
||
# Балансы совпадают
|
||
balance_a_on_b = chain_b.utxo_set.get_balance(self.addr_a)
|
||
balance_b_on_b = chain_b.utxo_set.get_balance(self.addr_b)
|
||
self.assertEqual(balance_a_on_a, balance_a_on_b)
|
||
self.assertEqual(balance_b_on_a, balance_b_on_b)
|
||
print(f" Node B: A={balance_a_on_b}, B={balance_b_on_b} (identical) ✅")
|
||
|
||
# UTXO counts
|
||
self.assertEqual(
|
||
chain_a.utxo_set.utxo_count(),
|
||
chain_b.utxo_set.utxo_count(),
|
||
)
|
||
print(f" UTXO count: {chain_a.utxo_set.utxo_count()} = {chain_b.utxo_set.utxo_count()} ✅")
|
||
|
||
# ═══════════════════════════════════════════════════════════════════════════
|
||
# ПРОВЕРКА 9: API
|
||
# ═══════════════════════════════════════════════════════════════════════════
|
||
|
||
def test_9_api(self):
|
||
"""
|
||
get_balance() возвращает confirmed, utxo_count, total_supply.
|
||
get_stats() возвращает tau1_count, tau2_count, total_supply, utxo_count.
|
||
"""
|
||
print("\n═══ ПРОВЕРКА 9: API ═══")
|
||
chain = self._create_chain()
|
||
chain.create_genesis_window()
|
||
|
||
# Эмиссия
|
||
for i in range(TAU1_PER_TAU2):
|
||
coinbase = create_coinbase_tx(self.addr_a, 60, t2_index=0)
|
||
chain.create_tau1_window([coinbase])
|
||
|
||
# τ₂ finalize
|
||
coinbase_t2 = [create_coinbase_tx(self.addr_a, 600, t2_index=0)]
|
||
chain.finalize_tau2(coinbase_t2, halving_coefficient=1.0)
|
||
|
||
# Balance API
|
||
balance = chain.get_balance(self.addr_a)
|
||
self.assertIn("confirmed", balance)
|
||
self.assertIn("utxo_count", balance)
|
||
self.assertIn("total_supply", balance)
|
||
self.assertEqual(balance["confirmed"], 600 + 600) # 10 coinbase в τ₁ + 1 coinbase в τ₂
|
||
print(f" Balance API: {balance} ✅")
|
||
|
||
# Stats API
|
||
stats = chain.get_stats()
|
||
self.assertEqual(stats["tau1_count"], 11) # genesis + 10
|
||
self.assertEqual(stats["tau2_count"], 1)
|
||
self.assertGreater(stats["total_supply"], 0)
|
||
self.assertGreater(stats["utxo_count"], 0)
|
||
print(f" Stats API: τ₁={stats['tau1_count']}, τ₂={stats['tau2_count']}, supply={stats['total_supply']} ✅")
|
||
|
||
# ═══════════════════════════════════════════════════════════════════════════
|
||
# БОНУС: Крипто-слой тест из спецификации
|
||
# ═══════════════════════════════════════════════════════════════════════════
|
||
|
||
def test_crypto_layer(self):
|
||
"""
|
||
Генерируешь seed, выводишь ключи, получаешь адрес,
|
||
подписываешь строку «Montana Genesis»,
|
||
верифицируешь подпись — работает.
|
||
|
||
(Детерминистичность seed→keys требует BIP-39 + KDF,
|
||
здесь тестируем ML-DSA-65 sign/verify цикл)
|
||
"""
|
||
print("\n═══ БОНУС: Крипто-слой ═══")
|
||
|
||
# Генерация
|
||
priv, pub = generate_keypair()
|
||
addr = public_key_to_address(pub)
|
||
self.assertTrue(validate_address(addr))
|
||
self.assertEqual(len(addr), 42)
|
||
self.assertTrue(addr.startswith("mt"))
|
||
print(f" Адрес: {addr} ✅")
|
||
|
||
# Подпись
|
||
message = "Montana Genesis"
|
||
sig = sign_message(priv, message)
|
||
self.assertTrue(len(sig) > 0)
|
||
print(f" Подпись: {len(bytes.fromhex(sig))} bytes ✅")
|
||
|
||
# Верификация
|
||
verified = verify_signature(pub, message, sig)
|
||
self.assertTrue(verified, "Signature should be valid")
|
||
print(f" Верификация: OK ✅")
|
||
|
||
# Изменение сообщения → верификация падает
|
||
wrong_verified = verify_signature(pub, "Montana Genesis!", sig)
|
||
self.assertFalse(wrong_verified, "Changed message should fail")
|
||
print(f" Изменённое сообщение отвергнуто ✅")
|
||
|
||
# Чужой ключ → верификация падает
|
||
_, pub2 = generate_keypair()
|
||
foreign_verified = verify_signature(pub2, message, sig)
|
||
self.assertFalse(foreign_verified, "Foreign key should fail")
|
||
print(f" Чужой ключ отвергнут ✅")
|
||
|
||
# address(pubkey) → addr
|
||
derived = public_key_to_address(pub)
|
||
self.assertEqual(derived, addr)
|
||
print(f" address(pubkey) = {derived} = {addr} ✅")
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════════════════════
|
||
# MAIN
|
||
# ═══════════════════════════════════════════════════════════════════════════════
|
||
|
||
if __name__ == "__main__":
|
||
print("=" * 70)
|
||
print(" Montana Protocol — 9 Проверок Готовности Таймчейна")
|
||
print(" ML-DSA-65 (FIPS 204) — Post-Quantum Security")
|
||
print("=" * 70)
|
||
|
||
unittest.main(verbosity=2)
|