commit fd2ab321110f363123c295dbaa3862329aec7709
parent a0bf4b555a37ff061384f55a6807ac4235ed17bf
Author: Brian Graham <brian@buildingbetterteams.de>
Date: Fri, 27 Feb 2026 22:34:30 +0100
Add paper claim system for parallel scan agents
scripts/claim.py provides file-based locking to prevent two agents
from scanning the same paper:
- take: claim a paper (fails if already claimed)
- done/fail: release a claim
- list: show unclaimed papers ready to scan
- status: summary of scan progress
- Claims expire after 10 minutes (stale agent recovery)
Claims stored as papers/<slug>/.claimed_<timestamp> files.
Added to .gitignore along with paper.txt (regenerable).
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Diffstat:
2 files changed, 166 insertions(+), 0 deletions(-)
diff --git a/.gitignore b/.gitignore
@@ -3,6 +3,12 @@ papers/*/paper.pdf
papers/*/*.pdf
inbox/*.pdf
+# Claim locks (ephemeral, used by parallel scan agents)
+papers/*/.claimed_*
+
+# Extracted text (regenerable from PDFs)
+papers/*/paper.txt
+
# OS files
.DS_Store
Thumbs.db
diff --git a/scripts/claim.py b/scripts/claim.py
@@ -0,0 +1,160 @@
+#!/usr/bin/env python3
+"""
+Paper claim system for parallel scanning. Prevents two agents from
+working on the same paper. Claims expire after 10 minutes.
+
+Claims are stored as empty files: papers/<slug>/.claimed_<timestamp>
+
+Usage:
+ python scripts/claim.py list # List unclaimed papers ready to scan
+ python scripts/claim.py list --limit 10 # First 10 unclaimed
+ python scripts/claim.py take <slug> # Claim a paper (prints "ok" or "taken")
+ python scripts/claim.py done <slug> # Mark scan complete, remove claim
+ python scripts/claim.py fail <slug> # Release claim without completing
+ python scripts/claim.py status # Show claim summary
+"""
+
+import sys
+import time
+from pathlib import Path
+
+# Repository root: two directory levels above this script's resolved path.
+ROOT = Path(__file__).resolve().parent.parent
+# Directory that holds one sub-directory per paper slug.
+PAPERS_DIR = ROOT / "papers"
+
+# Claim files are named CLAIM_PREFIX followed by the Unix time of the claim.
+CLAIM_PREFIX = ".claimed_"
+# Age in seconds after which a claim file is treated as stale and removed.
+CLAIM_EXPIRY_SECONDS = 600 # 10 minutes
+
+
+def get_claim_file(slug):
+ """Find an active (non-expired) claim file for a slug."""
+ paper_dir = PAPERS_DIR / slug
+ if not paper_dir.exists():
+ return None
+ for f in paper_dir.glob(f"{CLAIM_PREFIX}*"):
+ try:
+ ts = float(f.name[len(CLAIM_PREFIX):])
+ if time.time() - ts < CLAIM_EXPIRY_SECONDS:
+ return f
+ else:
+ # Expired — clean up
+ f.unlink()
+ except (ValueError, OSError):
+ f.unlink()
+ return None
+
+
+def is_claimed(slug):
+ return get_claim_file(slug) is not None
+
+
+def claim(slug):
+ """Try to claim a paper. Returns True if claimed, False if already taken."""
+ if is_claimed(slug):
+ return False
+ paper_dir = PAPERS_DIR / slug
+ if not paper_dir.exists():
+ return False
+ claim_file = paper_dir / f"{CLAIM_PREFIX}{time.time():.3f}"
+ claim_file.touch()
+ return True
+
+
+def release(slug):
+ """Release a claim."""
+ paper_dir = PAPERS_DIR / slug
+ for f in paper_dir.glob(f"{CLAIM_PREFIX}*"):
+ f.unlink()
+
+
+def list_ready(limit=None):
+ """List paper slugs that have paper.txt, no scan.json, and no active claim."""
+ ready = []
+ for txt in sorted(PAPERS_DIR.glob("*/paper.txt")):
+ slug = txt.parent.name
+ scan = txt.parent / "scan.json"
+ if scan.exists():
+ continue
+ if is_claimed(slug):
+ continue
+ ready.append(slug)
+ if limit and len(ready) >= limit:
+ break
+ return ready
+
+
+def status():
+ """Show summary of claims and scan progress."""
+ total_txt = 0
+ total_scanned = 0
+ total_claimed = 0
+ total_unclaimed = 0
+
+ for txt in PAPERS_DIR.glob("*/paper.txt"):
+ total_txt += 1
+ slug = txt.parent.name
+ scan = txt.parent / "scan.json"
+ if scan.exists():
+ total_scanned += 1
+ elif is_claimed(slug):
+ total_claimed += 1
+ else:
+ total_unclaimed += 1
+
+ print(f"Papers with text: {total_txt}")
+ print(f" Scanned: {total_scanned}")
+ print(f" Claimed: {total_claimed}")
+ print(f" Available: {total_unclaimed}")
+
+
+def main():
+ args = sys.argv[1:]
+ if not args:
+ print("Usage: python scripts/claim.py [list|take|done|fail|status]")
+ sys.exit(1)
+
+ cmd = args[0]
+
+ if cmd == "list":
+ limit = None
+ for i, arg in enumerate(args):
+ if arg == "--limit" and i + 1 < len(args):
+ limit = int(args[i + 1])
+ ready = list_ready(limit)
+ for slug in ready:
+ print(slug)
+
+ elif cmd == "take":
+ if len(args) < 2:
+ print("Usage: python scripts/claim.py take <slug>")
+ sys.exit(1)
+ slug = args[1]
+ if claim(slug):
+ print("ok")
+ else:
+ print("taken")
+ sys.exit(1)
+
+ elif cmd == "done":
+ if len(args) < 2:
+ print("Usage: python scripts/claim.py done <slug>")
+ sys.exit(1)
+ release(args[1])
+ print("ok")
+
+ elif cmd == "fail":
+ if len(args) < 2:
+ print("Usage: python scripts/claim.py fail <slug>")
+ sys.exit(1)
+ release(args[1])
+ print("released")
+
+ elif cmd == "status":
+ status()
+
+ else:
+ print(f"Unknown command: {cmd}")
+ sys.exit(1)
+
+
+if __name__ == "__main__":
+ main()