#!/usr/bin/env python3
"""
Moltbook Post Collector (Resumable)
=====================================
MSc Cybersecurity Research - NCI (National College of Ireland)
David Keane (IR240474) - Thesis: AI-to-AI Indirect Prompt Injection

Collects all posts with full metadata. Resumes from last cursor if stopped.
Run multiple times — new posts are merged in, duplicates skipped.
Comments are handled separately by collect_comments.py.
"""
|
|
|
|
import urllib.request
|
|
import urllib.error
|
|
import urllib.parse
|
|
import json
|
|
import os
|
|
import time
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
# ── Config ────────────────────────────────────────────────────────────────────
# Set your API keys as environment variables:
# export MOLTBOOK_API_KEY_1="moltbook_sk_your_key_here"
# export MOLTBOOK_API_KEY_2="moltbook_sk_your_second_key_here"  # optional

BASE_URL = "https://moltbook.com"

# Key 1 is mandatory; key 2 is optional and, when present, lets api_get()
# alternate accounts to ride out per-key rate limits.
_key1 = os.environ.get("MOLTBOOK_API_KEY_1", "")
_key2 = os.environ.get("MOLTBOOK_API_KEY_2", "")
if not _key1:
    raise SystemExit("Error: MOLTBOOK_API_KEY_1 environment variable not set.")
API_KEYS = {"Account1": _key1}
if _key2:
    API_KEYS["Account2"] = _key2
KEY_NAMES = list(API_KEYS.keys())

PAGE_LIMIT = 100   # posts per page
RATE_WAIT = 10.0   # seconds to wait when both keys rate-limited
PAGE_DELAY = 0.5   # seconds between successful page fetches

# Output files live alongside this script.
OUT_DIR = Path(__file__).parent
POSTS_FILE = OUT_DIR / "all_posts_with_comments.json"
CURSOR_FILE = OUT_DIR / ".posts_cursor.json"  # saves last cursor + known IDs

# ── API with dual-key switching ───────────────────────────────────────────────
# Round-robin position into KEY_NAMES; advanced by api_get() after each
# request so load spreads across accounts.
_key_index = 0
|
|
|
|
def api_get(path: str, params: dict | None = None) -> dict | list | None:
    """GET a Moltbook API endpoint, rotating API keys on rate limits.

    Args:
        path: API path beginning with "/", appended to BASE_URL.
        params: optional query parameters, URL-encoded onto the path.

    Returns:
        The decoded JSON body (dict or list), or None on any non-429 HTTP
        error, network failure, or after exhausting all 10 attempts.

    On HTTP 429 the request is retried with the other key; every second
    consecutive 429 (i.e. once each key has been tried) sleeps RATE_WAIT
    seconds before retrying.
    """
    global _key_index
    url = BASE_URL + path
    if params:
        url += "?" + urllib.parse.urlencode(params)

    for attempt in range(10):
        key_name = KEY_NAMES[_key_index % len(KEY_NAMES)]
        key = API_KEYS[key_name]
        req = urllib.request.Request(
            url,
            headers={
                "Authorization": f"Bearer {key}",
                "Accept": "application/json",
                "User-Agent": "MoltbookResearchCollector/1.0 (NCI MSc Cybersecurity Thesis)",
            }
        )
        try:
            with urllib.request.urlopen(req, timeout=30) as resp:
                # Advance rotation on success too, spreading load across keys.
                _key_index += 1
                return json.loads(resp.read().decode("utf-8"))
        except urllib.error.HTTPError as e:
            if e.code == 429:
                # Rate-limited: switch to the next key and retry.
                other = KEY_NAMES[(_key_index + 1) % len(KEY_NAMES)]
                print(f" [429 {key_name}] → switching to {other}...", flush=True)
                _key_index += 1
                if attempt % 2 == 1:
                    # Both keys have now 429'd back to back — back off.
                    print(f" [both rate-limited] waiting {RATE_WAIT}s...", flush=True)
                    time.sleep(RATE_WAIT)
            else:
                body = e.read().decode("utf-8", errors="replace")[:200]
                print(f" [HTTP {e.code}] {url} → {body}", flush=True)
                return None
        except Exception as ex:
            # Network errors, timeouts, bad JSON: best-effort, give up on
            # this request (caller treats None as "stop and resume later").
            print(f" [ERROR] {url} → {ex}", flush=True)
            return None

    print(f" [FAIL] gave up: {url}", flush=True)
    return None
|
|
|
|
# ── Cursor progress ───────────────────────────────────────────────────────────
|
|
def load_cursor() -> tuple[str | None, set]:
    """Read resume state from CURSOR_FILE.

    Returns (last_cursor, set_of_known_post_ids); falls back to
    (None, empty set) when the file is absent or unreadable.
    """
    if not CURSOR_FILE.exists():
        return None, set()
    try:
        state = json.loads(CURSOR_FILE.read_text())
        resume_cursor = state.get("last_cursor")
        seen_ids = set(state.get("known_ids", []))
        stamp = state.get("saved_at", "unknown")
        print(f" Resuming from cursor saved at {stamp}")
        print(f" Known posts: {len(seen_ids):,} | cursor: {str(resume_cursor)[:40]}...")
        return resume_cursor, seen_ids
    except Exception:
        # Corrupt/unreadable state file — start from the beginning.
        return None, set()
|
|
|
|
def save_cursor(cursor: str | None, known_ids: set):
    """Persist the pagination cursor and all known post IDs to CURSOR_FILE."""
    state = {
        "last_cursor": cursor,
        "known_ids": list(known_ids),
        "count": len(known_ids),
        "saved_at": datetime.now(timezone.utc).isoformat(),
    }
    CURSOR_FILE.write_text(json.dumps(state))
|
|
|
|
# ── Load existing posts ───────────────────────────────────────────────────────
|
|
def load_existing() -> tuple[dict, list]:
    """Load the previously collected JSON file.

    Returns (data_dict, posts_list); ({}, []) when the file is missing or
    cannot be parsed.
    """
    if not POSTS_FILE.exists():
        return {}, []
    try:
        with open(POSTS_FILE, encoding="utf-8") as fh:
            payload = json.load(fh)
        # The file is normally a dict envelope with a "posts" key, but a
        # bare list of posts is also accepted.
        if isinstance(payload, dict):
            header, posts = payload, payload.get("posts", [])
        else:
            header, posts = {}, payload
        print(f" Loaded existing file: {len(posts):,} posts ({POSTS_FILE.stat().st_size // 1024:,} KB)")
        return header, posts
    except Exception as ex:
        print(f" [WARN] Could not load existing file: {ex}")
        return {}, []
|
|
|
|
# ── Save ──────────────────────────────────────────────────────────────────────
|
|
def save_posts(data: dict, posts: list):
    """Fold posts plus bookkeeping metadata into data and write POSTS_FILE."""
    data["posts"] = posts
    data["total_posts"] = len(posts)
    data["last_updated"] = datetime.now(timezone.utc).isoformat()
    # Research attribution is set once and then left untouched.
    for field, value in (
        ("research", "MSc Cybersecurity NCI - AI-to-AI Indirect Prompt Injection"),
        ("researcher", "David Keane IR240474"),
    ):
        data.setdefault(field, value)
    with open(POSTS_FILE, "w", encoding="utf-8") as fh:
        json.dump(data, fh, ensure_ascii=False, indent=2)
    print(f" Saved: {POSTS_FILE} ({POSTS_FILE.stat().st_size // 1024:,} KB)", flush=True)
|
|
|
|
# ── Main ──────────────────────────────────────────────────────────────────────
|
|
def main():
    """Collect all Moltbook posts into POSTS_FILE, resumably.

    Safe to re-run: already-known posts are skipped, and the pagination
    cursor is persisted after every page so an interrupted run resumes
    where it stopped.
    """
    print("=" * 60)
    print("Moltbook Post Collector (Resumable)")
    print("NCI MSc Cybersecurity - Indirect Prompt Injection Research")
    print(f"Started: {datetime.now(timezone.utc).isoformat()}")
    print("=" * 60)

    # Load what we already have
    data, existing_posts = load_existing()
    cursor, known_ids = load_cursor()

    # If no cursor file but we have posts, rebuild known_ids from existing posts
    if not known_ids and existing_posts:
        known_ids = {p.get("id") for p in existing_posts if p.get("id")}
        print(f" Rebuilt {len(known_ids):,} known IDs from existing posts.")

    posts_by_id = {p.get("id"): p for p in existing_posts}
    page = 0
    new_count = 0

    print(f"\n=== Collecting posts (starting from {'cursor' if cursor else 'beginning'}) ===")

    while True:
        page += 1
        params = {"limit": PAGE_LIMIT}
        if cursor:
            params["cursor"] = cursor

        print(f" Page {page} | cursor={str(cursor)[:30] if cursor else 'None'} | total so far: {len(posts_by_id):,}", flush=True)

        resp = api_get("/api/v1/posts", params=params)
        time.sleep(PAGE_DELAY)

        if resp is None:
            print(" [WARN] Null response — stopping. Run again to resume.", flush=True)
            break

        # Parse response shape: either a bare list or a paginated envelope.
        if isinstance(resp, list):
            batch = resp
            has_more = False
            cursor = None
        else:
            batch = resp.get("posts") or resp.get("data") or resp.get("results") or []
            has_more = resp.get("has_more", False)
            cursor = resp.get("next_cursor") or resp.get("cursor") or None

        # Merge — skip posts we already have
        added = 0
        for post in batch:
            pid = post.get("id")
            if pid and pid not in known_ids:
                posts_by_id[pid] = post
                known_ids.add(pid)
                added += 1

        new_count += added
        print(f" Got {len(batch)} | {added} new | {len(posts_by_id):,} total", flush=True)

        # Persist posts BEFORE the cursor. The cursor file records known_ids,
        # and any ID marked known is skipped forever on resume — so saving
        # known_ids ahead of the posts themselves would permanently lose
        # those posts if the process died in between.
        if added:
            save_posts(data, list(posts_by_id.values()))
        save_cursor(cursor, known_ids)

        if not batch or not has_more or not cursor:
            print(" End of pages — all posts collected!", flush=True)
            break

    # Final save
    all_posts = list(posts_by_id.values())
    save_posts(data, all_posts)

    # Clear cursor only if we reached the end naturally
    if not cursor:
        if CURSOR_FILE.exists():
            CURSOR_FILE.unlink()
            print(" Cursor cleared — full collection complete.")

    print("\n" + "=" * 60)
    print("DONE")
    print(f" Total posts: {len(all_posts):,}")
    print(f" New this run: {new_count:,}")
    print(f" File: {POSTS_FILE} ({POSTS_FILE.stat().st_size // 1024:,} KB)")
    print("=" * 60)
    print("\nNext step: python3 collect_comments.py")
|
|
|
|
# Script entry point: run the collector when executed directly.
if __name__ == "__main__":
    main()
|