#!/usr/bin/env python3
"""
Moltbook Post Collector (Resumable)
=====================================
MSc Cybersecurity Research - NCI (National College of Ireland)
David Keane (IR240474) - Thesis: AI-to-AI Indirect Prompt Injection
Collects all posts with full metadata. Resumes from last cursor if stopped.
Run multiple times new posts are merged in, duplicates skipped.
Comments are handled separately by collect_comments.py.
"""
import urllib.request
import urllib.error
import urllib.parse
import json
import os
import time
from datetime import datetime, timezone
from pathlib import Path
# ── Config ────────────────────────────────────────────────────────────────────
# Set your API keys as environment variables:
# export MOLTBOOK_API_KEY_1="moltbook_sk_your_key_here"
# export MOLTBOOK_API_KEY_2="moltbook_sk_your_second_key_here" # optional
BASE_URL = "https://moltbook.com"
_key1 = os.environ.get("MOLTBOOK_API_KEY_1", "")
_key2 = os.environ.get("MOLTBOOK_API_KEY_2", "")
if not _key1:
    raise SystemExit("Error: MOLTBOOK_API_KEY_1 environment variable not set.")
API_KEYS = {"Account1": _key1}
if _key2:
    API_KEYS["Account2"] = _key2
KEY_NAMES = list(API_KEYS.keys())
PAGE_LIMIT = 100 # posts per page
RATE_WAIT = 10.0 # seconds to wait when both keys rate-limited
PAGE_DELAY = 0.5 # seconds between successful page fetches
OUT_DIR = Path(__file__).parent
POSTS_FILE = OUT_DIR / "all_posts_with_comments.json"
CURSOR_FILE = OUT_DIR / ".posts_cursor.json" # saves last cursor + known IDs
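# The cursor file written by save_cursor() is a small JSON object, e.g.
# (illustrative values, not real data):
#   {"last_cursor": "abc123", "known_ids": ["p1", "p2"], "count": 2,
#    "saved_at": "2026-05-04T18:44:36+00:00"}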
# ── API with dual-key switching ───────────────────────────────────────────────
_key_index = 0
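# _key_index advances after every request (success or 429), so consecutive
# calls alternate between the configured keys; on a 429 the next attempt
# immediately retries with the other key before backing off.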
def api_get(path: str, params: dict | None = None) -> dict | list | None:
    """GET a JSON endpoint, rotating between API keys and retrying on 429."""
    global _key_index
    url = BASE_URL + path
    if params:
        url += "?" + urllib.parse.urlencode(params)
    for attempt in range(10):
        key_name = KEY_NAMES[_key_index % len(KEY_NAMES)]
        key = API_KEYS[key_name]
        req = urllib.request.Request(
            url,
            headers={
                "Authorization": f"Bearer {key}",
                "Accept": "application/json",
                "User-Agent": "MoltbookResearchCollector/1.0 (NCI MSc Cybersecurity Thesis)",
            }
        )
        try:
            with urllib.request.urlopen(req, timeout=30) as resp:
                _key_index += 1
                return json.loads(resp.read().decode("utf-8"))
        except urllib.error.HTTPError as e:
            if e.code == 429:
                other = KEY_NAMES[(_key_index + 1) % len(KEY_NAMES)]
                print(f" [429 {key_name}] → switching to {other}...", flush=True)
                _key_index += 1
                # Every second consecutive 429 means both keys have just been
                # tried and rejected: back off before the next attempt.
                if attempt % 2 == 1:
                    print(f" [both rate-limited] waiting {RATE_WAIT}s...", flush=True)
                    time.sleep(RATE_WAIT)
            else:
                body = e.read().decode("utf-8", errors="replace")[:200]
                print(f" [HTTP {e.code}] {url}: {body}", flush=True)
                return None
        except Exception as ex:
            print(f" [ERROR] {url}: {ex}", flush=True)
            return None
    print(f" [FAIL] gave up: {url}", flush=True)
    return None
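# Example call (the response shape depends on the Moltbook API; main() below
# accepts either a bare list or a dict carrying "posts"/"data"/"results" plus
# a "next_cursor"/"cursor" field):
#   first_page = api_get("/api/v1/posts", params={"limit": PAGE_LIMIT})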
# ── Cursor progress ───────────────────────────────────────────────────────────
def load_cursor() -> tuple[str | None, set]:
    """Returns (last_cursor, set_of_known_post_ids)."""
    if CURSOR_FILE.exists():
        try:
            c = json.loads(CURSOR_FILE.read_text())
            cursor = c.get("last_cursor")
            known_ids = set(c.get("known_ids", []))
            saved_at = c.get("saved_at", "unknown")
            print(f" Resuming from cursor saved at {saved_at}")
            print(f" Known posts: {len(known_ids):,} | cursor: {str(cursor)[:40]}...")
            return cursor, known_ids
        except Exception:
            # Corrupt or unreadable cursor file: fall through and start fresh.
            pass
    return None, set()
def save_cursor(cursor: str | None, known_ids: set):
    CURSOR_FILE.write_text(json.dumps({
        "last_cursor": cursor,
        "known_ids": list(known_ids),
        "count": len(known_ids),
        "saved_at": datetime.now(timezone.utc).isoformat(),
    }))
# ── Load existing posts ───────────────────────────────────────────────────────
def load_existing() -> tuple[dict, list]:
    """Load existing JSON file. Returns (data_dict, posts_list)."""
    if POSTS_FILE.exists():
        try:
            with open(POSTS_FILE, encoding="utf-8") as f:
                data = json.load(f)
            posts = data.get("posts", []) if isinstance(data, dict) else data
            print(f" Loaded existing file: {len(posts):,} posts ({POSTS_FILE.stat().st_size // 1024:,} KB)")
            return (data if isinstance(data, dict) else {}), posts
        except Exception as ex:
            print(f" [WARN] Could not load existing file: {ex}")
    return {}, []
# ── Save ──────────────────────────────────────────────────────────────────────
def save_posts(data: dict, posts: list):
    data["posts"] = posts
    data["total_posts"] = len(posts)
    data["last_updated"] = datetime.now(timezone.utc).isoformat()
    data.setdefault("research", "MSc Cybersecurity NCI - AI-to-AI Indirect Prompt Injection")
    data.setdefault("researcher", "David Keane IR240474")
    with open(POSTS_FILE, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)
    print(f" Saved: {POSTS_FILE} ({POSTS_FILE.stat().st_size // 1024:,} KB)", flush=True)
# ── Main ──────────────────────────────────────────────────────────────────────
def main():
    print("=" * 60)
    print("Moltbook Post Collector (Resumable)")
    print("NCI MSc Cybersecurity - Indirect Prompt Injection Research")
    print(f"Started: {datetime.now(timezone.utc).isoformat()}")
    print("=" * 60)
    # Load what we already have
    data, existing_posts = load_existing()
    cursor, known_ids = load_cursor()
    # If no cursor file but we have posts, rebuild known_ids from existing posts
    if not known_ids and existing_posts:
        known_ids = {p.get("id") for p in existing_posts if p.get("id")}
        print(f" Rebuilt {len(known_ids):,} known IDs from existing posts.")
    posts_by_id = {p.get("id"): p for p in existing_posts}
    page = 0
    new_count = 0
    print(f"\n=== Collecting posts (starting from {'cursor' if cursor else 'beginning'}) ===")
    while True:
        page += 1
        params = {"limit": PAGE_LIMIT}
        if cursor:
            params["cursor"] = cursor
        print(f" Page {page} | cursor={str(cursor)[:30] if cursor else 'None'} | total so far: {len(posts_by_id):,}", flush=True)
        resp = api_get("/api/v1/posts", params=params)
        time.sleep(PAGE_DELAY)
        if resp is None:
            print(" [WARN] Null response — stopping. Run again to resume.", flush=True)
            break
        # Parse response shape
        if isinstance(resp, list):
            batch = resp
            has_more = False
            cursor = None
        else:
            batch = resp.get("posts") or resp.get("data") or resp.get("results") or []
            has_more = resp.get("has_more", False)
            cursor = resp.get("next_cursor") or resp.get("cursor") or None
        # Merge — skip posts we already have
        added = 0
        for post in batch:
            pid = post.get("id")
            if pid and pid not in known_ids:
                posts_by_id[pid] = post
                known_ids.add(pid)
                added += 1
        new_count += added
        print(f" Got {len(batch)} | {added} new | {len(posts_by_id):,} total", flush=True)
        # Save cursor after every page so we can resume
        save_cursor(cursor, known_ids)
        # Save posts every 10 pages
        if page % 10 == 0:
            save_posts(data, list(posts_by_id.values()))
        if not batch or not has_more or not cursor:
            print(" End of pages — all posts collected!", flush=True)
            break
    # Final save
    all_posts = list(posts_by_id.values())
    save_posts(data, all_posts)
    # Clear cursor only if we reached the end naturally
    if not cursor:
        if CURSOR_FILE.exists():
            CURSOR_FILE.unlink()
        print(" Cursor cleared — full collection complete.")
    print("\n" + "=" * 60)
    print("DONE")
    print(f" Total posts: {len(all_posts):,}")
    print(f" New this run: {new_count:,}")
    print(f" File: {POSTS_FILE} ({POSTS_FILE.stat().st_size // 1024:,} KB)")
    print("=" * 60)
    print("\nNext step: python3 collect_comments.py")
if __name__ == "__main__":
    main()