montana/Russian/Site/messenger-backend/app.py
2026-05-18 18:05:32 +03:00

334 lines
13 KiB
Python

import json
import os
import secrets
import sqlite3
import time
import urllib.request
import urllib.error
from contextlib import contextmanager
from functools import wraps
from pathlib import Path
from threading import Thread
import hmac, hashlib, base64
from flask import Flask, request, jsonify
from flask_cors import CORS
from nacl.signing import VerifyKey
from nacl.exceptions import BadSignatureError
app = Flask(__name__)
CORS(app)
DB_PATH = os.environ.get("MONTANA_DB_PATH", "/var/lib/montana-messenger/db.sqlite3")
PEERS_PATH = os.environ.get("MONTANA_PEERS_PATH", "/etc/montana-site/peers.json")
SHARED_SECRET = os.environ.get("MONTANA_SHARED_SECRET", "change-me-in-prod")
NODE_NAME = os.environ.get("MONTANA_NODE_NAME", "unknown")
CHALLENGE_TTL_MS = 5 * 60 * 1000
SESSION_TTL_MS = 30 * 24 * 60 * 60 * 1000
def now_ms() -> int:
return int(time.time() * 1000)
@contextmanager
def db():
Path(DB_PATH).parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(DB_PATH, isolation_level=None, timeout=10)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA foreign_keys=ON")
try:
yield conn
finally:
conn.close()
def init_schema():
schema = (Path(__file__).parent / "schema.sql").read_text()
with db() as conn:
conn.executescript(schema)
def load_peers():
try:
return json.loads(Path(PEERS_PATH).read_text()).get("peers", [])
except Exception:
return []
# ─── JWT-like signed challenge (stateless, любой узел может verify) ───
def b64u_encode(b: bytes) -> str:
return base64.urlsafe_b64encode(b).rstrip(b"=").decode("ascii")
def b64u_decode(s: str) -> bytes:
pad = "=" * (-len(s) % 4)
return base64.urlsafe_b64decode(s + pad)
def make_challenge_token(account_id: str, challenge: bytes, expires_at: int) -> str:
payload = json.dumps({"a": account_id, "c": challenge.hex(), "e": expires_at}, separators=(",", ":")).encode()
body = b64u_encode(payload)
sig = hmac.new(SHARED_SECRET.encode(), body.encode(), hashlib.sha256).digest()
return body + "." + b64u_encode(sig)
def parse_challenge_token(token: str):
try:
body, sig = token.split(".", 1)
expected = b64u_encode(hmac.new(SHARED_SECRET.encode(), body.encode(), hashlib.sha256).digest())
if not hmac.compare_digest(sig, expected):
return None
data = json.loads(b64u_decode(body))
if data["e"] < now_ms():
return None
return data["a"], bytes.fromhex(data["c"]), data["e"]
except Exception:
return None
# ─── Replicate events (gossip) ───
def replicate_async(event: dict):
"""Бросает событие всем peer-узлам в фоновом потоке."""
Thread(target=_replicate_sync, args=(event,), daemon=True).start()
def _replicate_sync(event: dict):
peers = load_peers()
body = json.dumps(event).encode()
sig = hmac.new(SHARED_SECRET.encode(), body, hashlib.sha256).hexdigest()
for peer in peers:
url = peer["url"] + "/api/internal/replicate"
try:
req = urllib.request.Request(
url,
data=body,
headers={"Content-Type": "application/json", "X-Montana-Sig": sig, "X-Montana-Origin": NODE_NAME},
method="POST",
)
urllib.request.urlopen(req, timeout=5)
except Exception:
pass # fire-and-forget; узел догонит при следующем write
def apply_event(event: dict):
"""Применяет реплицируемое событие к локальной БД."""
kind = event.get("kind")
if kind == "account":
with db() as conn:
conn.execute(
"INSERT INTO accounts (account_id, encryption_pubkey, created_at_ms, last_seen_at_ms) VALUES (?, ?, ?, ?) "
"ON CONFLICT(account_id) DO UPDATE SET last_seen_at_ms=MAX(accounts.last_seen_at_ms, excluded.last_seen_at_ms), "
"encryption_pubkey=COALESCE(excluded.encryption_pubkey, accounts.encryption_pubkey)",
(event["account_id"], event.get("encryption_pubkey"), event.get("created_at_ms", now_ms()), event.get("last_seen_at_ms", now_ms())),
)
elif kind == "session":
with db() as conn:
conn.execute(
"INSERT INTO sessions (token, account_id, expires_at_ms, created_at_ms) VALUES (?, ?, ?, ?) "
"ON CONFLICT(token) DO NOTHING",
(event["token"], event["account_id"], event["expires_at_ms"], event["created_at_ms"]),
)
elif kind == "message":
with db() as conn:
conn.execute(
"INSERT INTO accounts (account_id, created_at_ms) VALUES (?, ?) ON CONFLICT(account_id) DO NOTHING",
(event["recipient"], event.get("created_at_ms", now_ms())),
)
conn.execute(
"INSERT INTO messages (msg_id, sender, recipient, ciphertext, nonce, created_at_ms) VALUES (?, ?, ?, ?, ?, ?) "
"ON CONFLICT(msg_id) DO NOTHING",
(event["msg_id"], event["sender"], event["recipient"], event["ciphertext"], event["nonce"], event["created_at_ms"]),
)
elif kind == "read":
with db() as conn:
placeholders = ",".join("?" * len(event["msg_ids"]))
conn.execute(
f"UPDATE messages SET read_at_ms=? WHERE msg_id IN ({placeholders}) AND recipient=?",
(event["read_at_ms"], *event["msg_ids"], event["account_id"]),
)
# ─── auth_required decorator ───
def auth_required(fn):
@wraps(fn)
def wrapper(*a, **kw):
h = request.headers.get("Authorization", "")
if not h.startswith("Bearer "):
return jsonify({"error": "no_auth"}), 401
token = h[7:]
with db() as conn:
row = conn.execute(
"SELECT account_id FROM sessions WHERE token=? AND expires_at_ms > ?",
(token, now_ms()),
).fetchone()
if not row:
return jsonify({"error": "invalid_token"}), 401
request.account_id = row["account_id"]
return fn(*a, **kw)
return wrapper
# ─── Public API ───
@app.post("/api/auth/challenge")
def auth_challenge():
data = request.get_json(force=True) or {}
account_id = (data.get("accountId") or "").lower()
if len(account_id) != 64:
return jsonify({"error": "bad_account_id"}), 400
challenge = secrets.token_bytes(32)
expires_at = now_ms() + CHALLENGE_TTL_MS
token = make_challenge_token(account_id, challenge, expires_at)
return jsonify({"challengeId": token, "challenge": challenge.hex(), "expiresAt": expires_at, "node": NODE_NAME})
@app.post("/api/auth/verify")
def auth_verify():
data = request.get_json(force=True) or {}
token = data.get("challengeId", "")
sig_hex = data.get("signature", "")
account_id_claimed = (data.get("accountId") or "").lower()
enc_pub = (data.get("encryptionPubkey") or "").lower() or None
parsed = parse_challenge_token(token)
if not parsed:
return jsonify({"error": "challenge_expired_or_invalid"}), 400
aid_in_token, challenge_bytes, expires = parsed
if aid_in_token != account_id_claimed:
return jsonify({"error": "account_mismatch"}), 400
try:
VerifyKey(bytes.fromhex(account_id_claimed)).verify(challenge_bytes, bytes.fromhex(sig_hex))
except (BadSignatureError, ValueError):
return jsonify({"error": "bad_signature"}), 401
session_token = secrets.token_urlsafe(32)
session_expires = now_ms() + SESSION_TTL_MS
created = now_ms()
with db() as conn:
conn.execute(
"INSERT INTO accounts (account_id, encryption_pubkey, created_at_ms, last_seen_at_ms) VALUES (?, ?, ?, ?) "
"ON CONFLICT(account_id) DO UPDATE SET last_seen_at_ms=excluded.last_seen_at_ms, "
"encryption_pubkey=COALESCE(excluded.encryption_pubkey, accounts.encryption_pubkey)",
(account_id_claimed, enc_pub, created, created),
)
conn.execute(
"INSERT INTO sessions (token, account_id, expires_at_ms, created_at_ms) VALUES (?, ?, ?, ?)",
(session_token, account_id_claimed, session_expires, created),
)
replicate_async({"kind": "account", "account_id": account_id_claimed, "encryption_pubkey": enc_pub, "created_at_ms": created, "last_seen_at_ms": created})
replicate_async({"kind": "session", "token": session_token, "account_id": account_id_claimed, "expires_at_ms": session_expires, "created_at_ms": created})
return jsonify({"sessionToken": session_token, "accountId": account_id_claimed, "expiresAt": session_expires, "node": NODE_NAME})
@app.post("/api/msg/send")
@auth_required
def msg_send():
data = request.get_json(force=True) or {}
recipient = (data.get("recipient") or "").lower()
ciphertext = data.get("ciphertext", "")
nonce = data.get("nonce", "")
if len(recipient) != 64 or not ciphertext or not nonce:
return jsonify({"error": "bad_input"}), 400
msg_id = secrets.token_hex(16)
created = now_ms()
with db() as conn:
conn.execute("INSERT INTO accounts (account_id, created_at_ms) VALUES (?, ?) ON CONFLICT(account_id) DO NOTHING", (recipient, created))
conn.execute(
"INSERT INTO messages (msg_id, sender, recipient, ciphertext, nonce, created_at_ms) VALUES (?, ?, ?, ?, ?, ?)",
(msg_id, request.account_id, recipient, ciphertext, nonce, created),
)
replicate_async({"kind": "message", "msg_id": msg_id, "sender": request.account_id, "recipient": recipient, "ciphertext": ciphertext, "nonce": nonce, "created_at_ms": created})
return jsonify({"msgId": msg_id, "createdAt": created, "node": NODE_NAME})
@app.get("/api/msg/inbox")
@auth_required
def msg_inbox():
since = int(request.args.get("since", "0"))
with db() as conn:
rows = conn.execute(
"SELECT msg_id, sender, recipient, ciphertext, nonce, created_at_ms, read_at_ms "
"FROM messages WHERE (recipient=? OR sender=?) AND created_at_ms > ? ORDER BY created_at_ms ASC LIMIT 1000",
(request.account_id, request.account_id, since),
).fetchall()
return jsonify({"messages": [{"msgId": r["msg_id"], "sender": r["sender"], "recipient": r["recipient"], "ciphertext": r["ciphertext"], "nonce": r["nonce"], "createdAt": r["created_at_ms"], "readAt": r["read_at_ms"]} for r in rows]})
@app.get("/api/msg/contacts")
@auth_required
def msg_contacts():
with db() as conn:
rows = conn.execute(
"SELECT CASE WHEN sender=? THEN recipient ELSE sender END AS aid, MAX(created_at_ms) AS last "
"FROM messages WHERE sender=? OR recipient=? GROUP BY 1 ORDER BY 2 DESC LIMIT 200",
(request.account_id, request.account_id, request.account_id),
).fetchall()
return jsonify({"contacts": [{"accountId": r["aid"], "lastMsgAt": r["last"]} for r in rows]})
@app.post("/api/msg/read")
@auth_required
def msg_read():
data = request.get_json(force=True) or {}
msg_ids = data.get("msgIds") or []
if not isinstance(msg_ids, list) or not msg_ids:
return jsonify({"error": "bad_input"}), 400
ts = now_ms()
placeholders = ",".join("?" * len(msg_ids))
with db() as conn:
conn.execute(f"UPDATE messages SET read_at_ms=? WHERE msg_id IN ({placeholders}) AND recipient=?", (ts, *msg_ids, request.account_id))
replicate_async({"kind": "read", "msg_ids": msg_ids, "account_id": request.account_id, "read_at_ms": ts})
return jsonify({"ok": True})
@app.get("/api/account/<account_id>")
@auth_required
def account_info(account_id):
account_id = account_id.lower()
if len(account_id) != 64:
return jsonify({"error": "bad_account_id"}), 400
with db() as conn:
row = conn.execute("SELECT account_id, encryption_pubkey, last_seen_at_ms FROM accounts WHERE account_id=?", (account_id,)).fetchone()
if not row:
return jsonify({"accountId": account_id, "encryptionPubkey": None, "lastSeenAt": None})
return jsonify({"accountId": row["account_id"], "encryptionPubkey": row["encryption_pubkey"], "lastSeenAt": row["last_seen_at_ms"]})
# ─── Internal: replication endpoint ───
@app.post("/api/internal/replicate")
def replicate_in():
raw = request.get_data()
sig = request.headers.get("X-Montana-Sig", "")
expected = hmac.new(SHARED_SECRET.encode(), raw, hashlib.sha256).hexdigest()
if not hmac.compare_digest(sig, expected):
return jsonify({"error": "bad_sig"}), 401
try:
event = json.loads(raw)
apply_event(event)
except Exception as e:
return jsonify({"error": str(e)}), 400
return jsonify({"ok": True, "node": NODE_NAME})
@app.get("/api/health")
def health():
try:
with db() as conn:
conn.execute("SELECT 1").fetchone()
peers = [p["name"] for p in load_peers()]
return jsonify({"ok": True, "service": "montana-messenger-relay", "node": NODE_NAME, "peers": peers, "ts": now_ms()})
except Exception as e:
return jsonify({"ok": False, "error": str(e)}), 500
init_schema()
if __name__ == "__main__":
app.run(host="127.0.0.1", port=5010, debug=False)