montana/CLI/mlkem768.py

358 lines
11 KiB
Python
Raw Permalink Normal View History

"""
ML-KEM-768 Post-Quantum Key Encapsulation for Montana Protocol
FIPS 203 Standard
Шифрование приватного ключа ML-DSA-65 с помощью когнитивного ключа.
Совместимо с iOS реализацией.
Dependencies:
pip install kyber-py cryptography
"""
import hashlib
import os
from typing import Optional, Tuple
# ML-KEM-768 через kyber-py (чистый Python, детерминированная генерация)
try:
from kyber_py.ml_kem import ML_KEM_768
HAS_KYBER = True
except ImportError:
HAS_KYBER = False
# AES-GCM через cryptography
try:
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
HAS_CRYPTO = True
except ImportError:
HAS_CRYPTO = False
# Константы ML-KEM-768 (FIPS 203)
PUBLIC_KEY_SIZE = 1184
SECRET_KEY_SIZE = 2400
CIPHERTEXT_SIZE = 1088
SHARED_SECRET_SIZE = 32
KEYPAIR_SEED_SIZE = 48 # kyber-py требует 48 байт
# ML-DSA-65 размеры
MLDSA65_PRIVATE_KEY_SIZE = 4032
MLDSA65_PUBLIC_KEY_SIZE = 1952
# Salt для PBKDF2 (должен совпадать с iOS)
KEM_KEY_SALT = b"MONTANA_KEM_KEY_V1"
def check_dependencies() -> bool:
"""Проверка зависимостей"""
if not HAS_KYBER:
print("ОШИБКА: Установи kyber-py:")
print(" pip install kyber-py")
return False
if not HAS_CRYPTO:
print("ОШИБКА: Установи cryptography:")
print(" pip install cryptography")
return False
return True
def pbkdf2_derive(password: str, salt: bytes, iterations: int, key_length: int) -> bytes:
"""PBKDF2-HMAC-SHA256 деривация"""
normalized = " ".join(password.lower().split())
return hashlib.pbkdf2_hmac(
"sha256",
normalized.encode("utf-8"),
salt,
iterations,
dklen=key_length
)
def keypair_from_seed(seed: bytes) -> Optional[Tuple[bytes, bytes]]:
"""
Детерминированная генерация ML-KEM-768 ключевой пары из seed.
Args:
seed: 64-byte seed
Returns:
(secret_key, public_key) или None при ошибке
"""
if len(seed) != KEYPAIR_SEED_SIZE:
print(f"[MLKEM768] Invalid seed size: {len(seed)}, expected {KEYPAIR_SEED_SIZE}")
return None
try:
# ML_KEM_768 — singleton, устанавливаем seed и генерируем
ML_KEM_768.set_drbg_seed(seed)
# Генерируем ключевую пару детерминированно
public_key, secret_key = ML_KEM_768.keygen()
print(f"[MLKEM768] Generated keypair: pk={len(public_key)}, sk={len(secret_key)}")
return (secret_key, public_key)
except Exception as e:
print(f"[MLKEM768] Keypair generation failed: {e}")
return None
def encapsulate(public_key: bytes, seed: Optional[bytes] = None) -> Optional[Tuple[bytes, bytes]]:
"""
Инкапсуляция для получения shared secret.
Args:
public_key: ML-KEM-768 публичный ключ
seed: Опциональный seed для детерминированной инкапсуляции
Returns:
(ciphertext, shared_secret) или None
"""
if len(public_key) != PUBLIC_KEY_SIZE:
print(f"[MLKEM768] Invalid public key size: {len(public_key)}")
return None
try:
if seed:
ML_KEM_768.set_drbg_seed(seed)
shared_secret, ciphertext = ML_KEM_768.encaps(public_key)
print(f"[MLKEM768] Encapsulated: ct={len(ciphertext)}, ss={len(shared_secret)}")
return (ciphertext, shared_secret)
except Exception as e:
print(f"[MLKEM768] Encapsulation failed: {e}")
return None
def decapsulate(ciphertext: bytes, secret_key: bytes) -> Optional[bytes]:
"""
Декапсуляция для восстановления shared secret.
Args:
ciphertext: ML-KEM-768 ciphertext
secret_key: ML-KEM-768 секретный ключ
Returns:
shared_secret (32 bytes) или None
"""
if len(ciphertext) != CIPHERTEXT_SIZE:
print(f"[MLKEM768] Invalid ciphertext size: {len(ciphertext)}")
return None
if len(secret_key) != SECRET_KEY_SIZE:
print(f"[MLKEM768] Invalid secret key size: {len(secret_key)}")
return None
try:
shared_secret = ML_KEM_768.decaps(secret_key, ciphertext)
print(f"[MLKEM768] Decapsulated: ss={len(shared_secret)}")
return shared_secret
except Exception as e:
print(f"[MLKEM768] Decapsulation failed: {e}")
return None
def aes_gcm_encrypt(data: bytes, key: bytes, nonce: bytes) -> Optional[bytes]:
"""AES-256-GCM шифрование"""
if len(key) != 32:
print(f"[AES] Invalid key size: {len(key)}")
return None
if len(nonce) != 12:
print(f"[AES] Invalid nonce size: {len(nonce)}")
return None
try:
aesgcm = AESGCM(key)
return aesgcm.encrypt(nonce, data, None)
except Exception as e:
print(f"[AES] Encryption failed: {e}")
return None
def aes_gcm_decrypt(data: bytes, key: bytes, nonce: bytes) -> Optional[bytes]:
"""AES-256-GCM расшифровка"""
if len(key) != 32:
print(f"[AES] Invalid key size: {len(key)}")
return None
if len(nonce) != 12:
print(f"[AES] Invalid nonce size: {len(nonce)}")
return None
try:
aesgcm = AESGCM(key)
return aesgcm.decrypt(nonce, data, None)
except Exception as e:
print(f"[AES] Decryption failed: {e}")
return None
def encrypt_private_key(private_key: bytes, cognitive_key: str) -> Optional[bytes]:
"""
Зашифровать ML-DSA-65 приватный ключ с помощью когнитивного ключа.
Схема: ML-KEM-768 + AES-256-GCM
Args:
private_key: ML-DSA-65 приватный ключ (4032 bytes)
cognitive_key: Когнитивный ключ пользователя
Returns:
Зашифрованные данные или None
"""
if not check_dependencies():
return None
if len(private_key) != MLDSA65_PRIVATE_KEY_SIZE:
print(f"[MLKEM768] Invalid private key size: {len(private_key)}")
return None
# 1. Деривация seed из когнитивного ключа (600K итераций)
seed = pbkdf2_derive(cognitive_key, KEM_KEY_SALT, 600_000, KEYPAIR_SEED_SIZE)
# 2. Генерация ML-KEM-768 keypair детерминированно
keys = keypair_from_seed(seed)
if not keys:
return None
secret_key, public_key = keys
# 3. Инкапсуляция для получения shared secret
encaps_result = encapsulate(public_key)
if not encaps_result:
return None
ciphertext, shared_secret = encaps_result
# 4. AES-GCM шифрование приватного ключа
nonce = os.urandom(12)
encrypted = aes_gcm_encrypt(private_key, shared_secret, nonce)
if not encrypted:
return None
# 5. Упаковка: ciphertext (1088) + nonce (12) + encrypted (4032 + 16 tag)
result = ciphertext + nonce + encrypted
print(f"[MLKEM768] Encrypted private key: {len(result)} bytes")
return result
def decrypt_private_key(encrypted_data: bytes, cognitive_key: str) -> Optional[bytes]:
"""
Расшифровать ML-DSA-65 приватный ключ с помощью когнитивного ключа.
Args:
encrypted_data: Зашифрованные данные
cognitive_key: Когнитивный ключ пользователя
Returns:
ML-DSA-65 приватный ключ или None
"""
if not check_dependencies():
return None
# Минимальный размер: ciphertext (1088) + nonce (12) + priv (4032) + tag (16)
min_size = CIPHERTEXT_SIZE + 12 + MLDSA65_PRIVATE_KEY_SIZE + 16
if len(encrypted_data) < min_size:
print(f"[MLKEM768] Encrypted data too small: {len(encrypted_data)}, expected >= {min_size}")
return None
# 1. Деривация seed из когнитивного ключа (600K итераций)
seed = pbkdf2_derive(cognitive_key, KEM_KEY_SALT, 600_000, KEYPAIR_SEED_SIZE)
# 2. Генерация ML-KEM-768 keypair детерминированно
keys = keypair_from_seed(seed)
if not keys:
return None
secret_key, public_key = keys
# 3. Распаковка
kem_ciphertext = encrypted_data[:CIPHERTEXT_SIZE]
nonce = encrypted_data[CIPHERTEXT_SIZE:CIPHERTEXT_SIZE + 12]
aes_ciphertext = encrypted_data[CIPHERTEXT_SIZE + 12:]
# 4. Декапсуляция для восстановления shared secret
shared_secret = decapsulate(kem_ciphertext, secret_key)
if not shared_secret:
print("[MLKEM768] Decapsulation failed - wrong cognitive key?")
return None
# 5. AES-GCM расшифровка
decrypted = aes_gcm_decrypt(aes_ciphertext, shared_secret, nonce)
if not decrypted:
print("[MLKEM768] AES decryption failed")
return None
# 6. Проверка размера
if len(decrypted) != MLDSA65_PRIVATE_KEY_SIZE:
print(f"[MLKEM768] Decrypted key wrong size: {len(decrypted)}")
return None
print(f"[MLKEM768] Decrypted private key: {len(decrypted)} bytes")
return decrypted
# === Тестирование ===
def test_mlkem768():
"""Тест ML-KEM-768 шифрования"""
print("=" * 50)
print("ML-KEM-768 + AES-256-GCM Test")
print("=" * 50 + "\n")
if not check_dependencies():
return False
# Тестовые данные
test_private_key = os.urandom(MLDSA65_PRIVATE_KEY_SIZE)
cognitive_key = "это мой уникальный когнитивный ключ для проверки постквантового шифрования приватного ключа"
print(f"Private key size: {len(test_private_key)} bytes")
print(f"Cognitive key: {cognitive_key[:40]}...")
print()
# Шифруем
print("Encrypting...")
encrypted = encrypt_private_key(test_private_key, cognitive_key)
if not encrypted:
print("❌ FAIL: Encryption failed")
return False
print(f"Encrypted size: {len(encrypted)} bytes")
print(f" - KEM ciphertext: {CIPHERTEXT_SIZE} bytes")
print(f" - Nonce: 12 bytes")
print(f" - AES ciphertext: {len(encrypted) - CIPHERTEXT_SIZE - 12} bytes")
print()
# Расшифровываем с правильным ключом
print("Decrypting with correct key...")
decrypted = decrypt_private_key(encrypted, cognitive_key)
if not decrypted:
print("❌ FAIL: Decryption failed")
return False
# Проверяем
if decrypted == test_private_key:
print("\n✅ SUCCESS: Decrypted key matches original")
else:
print("\n❌ FAIL: Keys don't match")
return False
# Тест с неправильным ключом
print("\nDecrypting with wrong key...")
wrong_decrypted = decrypt_private_key(encrypted, "wrong cognitive key")
if wrong_decrypted is None:
print("✅ Correctly rejected wrong key")
else:
print("❌ FAIL: Should have rejected wrong key")
return False
print("\n" + "=" * 50)
print("ALL TESTS PASSED")
print("=" * 50)
return True
if __name__ == "__main__":
test_mlkem768()