Neptune + Aurora (Beta): Massive Ingest
The beta environment uses Neptune (graph storage) + Aurora pgvector (vector embeddings) as the primary backend, replacing Neo4j for scalable ingestion of millions of papers.
Architecture
┌─────────────────────────────────────────────────────────────────────────┐
│ Input Sources │
│ PubMed (1970s–present) │ PMC Full-Text │ bioRxiv │ PDFs │ CSV/TSV │
└────────────────────────────────┬────────────────────────────────────────┘
│
┌────────────────────────────────▼────────────────────────────────────────┐
│ Extraction Workers (scale: 1–10) │
│ │
│ • Fetch papers from source APIs │
│ • Chunk text (512 tokens, 64 overlap) │
│ • LLM entity extraction (50–100 concurrent Bedrock calls) │
│ • Schema-based validation (reject garbage entities) │
│ • Push results to SQS │
│ │
│ Code: pipeline/ingest/sqs_extraction_worker.py │
│ Infra: ECS Fargate (4 vCPU / 16 GB) │
└────────────────────────────────┬────────────────────────────────────────┘
│
┌────────────────────────────────▼────────────────────────────────────────┐
│ SQS Queue │
│ │
│ • Main queue: graphrag-beta-extraction-queue (7 day retention) │
│ • Dead letter queue: graphrag-beta-extraction-dlq (14 day, 3 retries) │
│ • Decouples extraction from ingestion — scale independently │
│ │
│ Infra: cdk_resources/stacks/extraction_queue.py │
└────────────────────────────────┬────────────────────────────────────────┘
│
┌────────────────────────────────▼────────────────────────────────────────┐
│ Ingestion Workers (scale: 1–5) │
│ │
│ • Read messages from SQS (long-poll, auto-throttle) │
│ • Batch UNWIND to Neptune (5000 nodes/tx) │
│ • Batch INSERT to Aurora pgvector (100 embeddings/flush) │
│ • Self-throttling — won't overwhelm the database │
│ │
│ Code: pipeline/ingest/sqs_ingestion_worker.py │
│ Infra: ECS Fargate (1 vCPU / 4 GB) │
└─────────────────────────────────────────────────────────────────────────┘
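The chunking step in the extraction workers (512 tokens with a 64-token overlap) can be illustrated with a sliding window. This is a minimal sketch, not the pipeline's actual chunker: the function name `chunk_tokens` is hypothetical, and a plain list of strings stands in for whatever tokenizer the real `TokenChunker` uses.

```python
from typing import List


def chunk_tokens(tokens: List[str], size: int = 512, overlap: int = 64) -> List[List[str]]:
    """Split a token list into windows of `size` that share `overlap` tokens."""
    if size <= 0 or not 0 <= overlap < size:
        raise ValueError("require size > 0 and 0 <= overlap < size")
    step = size - overlap  # with the defaults, each window starts 448 tokens after the previous one
    chunks: List[List[str]] = []
    for start in range(0, len(tokens), step):
        chunks.append(tokens[start:start + size])
        if start + size >= len(tokens):
            break  # this window already reaches the end of the text
    return chunks


# Assumption: numbered placeholder tokens stand in for real tokenizer output.
tokens = [f"t{i}" for i in range(1000)]
chunks = chunk_tokens(tokens)
```

With 1,000 tokens this yields three chunks; the last 64 tokens of each chunk are repeated at the start of the next, so an entity mention that straddles a boundary still appears whole in at least one chunk.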
Data Flow
| Step | Component | Output |
|---|---|---|
| 1. Fetch | PubMed/PMC/bioRxiv API | Raw text + metadata |
| 2. Chunk | TokenChunker (512 tokens) | Text chunks with doc_id |
| 3. Extract | Bedrock LLM (Llama 3.1) | Nodes + relationships JSON |
| 4. Validate | KGSchema (YAML-driven) | Filtered entities (no garbage) |
| 5. Queue | SQS | Buffered messages |
| 6. Write Graph | Neptune (OpenCypher UNWIND) | Labeled nodes + typed edges |
| 7. Write Vectors | Aurora pgvector (HNSW) | 768-dim embeddings |
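Step 6 writes nodes with batched openCypher `UNWIND` statements. The sketch below shows one way such batches could be prepared; it only builds query strings and parameter maps and does not talk to Neptune. The node shape (`label`, `id`, `name`) and the function name `build_node_batches` are assumptions for illustration, not the pipeline's actual data model. Labels cannot be passed as query parameters in Cypher, so the sketch groups rows by label and checks each label before interpolating it.

```python
from collections import defaultdict
from typing import Dict, Iterator, List, Tuple

BATCH_SIZE = 5000  # nodes per transaction, matching the figure quoted for the ingestion workers


def build_node_batches(
    nodes: List[dict], batch_size: int = BATCH_SIZE
) -> Iterator[Tuple[str, Dict[str, list]]]:
    """Yield (query, parameters) pairs, one per label and per batch of rows."""
    by_label: Dict[str, List[dict]] = defaultdict(list)
    for node in nodes:
        # Assumed node shape: {"label": ..., "id": ..., "name": ...}
        by_label[node["label"]].append({"id": node["id"], "name": node["name"]})

    for label, rows in by_label.items():
        if not label.isidentifier():
            raise ValueError(f"unsafe label: {label!r}")
        query = (
            "UNWIND $rows AS row "
            f"MERGE (n:{label} {{id: row.id}}) "
            "SET n.name = row.name"
        )
        for i in range(0, len(rows), batch_size):
            yield query, {"rows": rows[i:i + batch_size]}
```

Using `MERGE` on a stable `id` keeps retries idempotent, which matters when a message is redelivered from the queue.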
Running
Direct Mode (single process, no SQS)
For smaller runs (< 10K papers), use the direct pipeline:
uv run python -m pipeline.ingest.scaled_ingest \
--queries-file data/queries_bulk.txt \
--source pubmed \
--max-results 1000 \
--concurrency 50
Environment variables:
NEPTUNE_ENDPOINT=neptunedbcluster-xxx.cluster-xxx.eu-north-1.neptune.amazonaws.com
AURORA_ENDPOINT=ragstack-beta-xxx.cluster-xxx.eu-north-1.rds.amazonaws.com
AURORA_PASSWORD=xxx
BEDROCK_MODEL_ID=us.meta.llama3-1-8b-instruct-v1:0
BEDROCK_REGION=us-east-1
AWS_REGION=eu-north-1
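A pipeline entry point would typically fail fast when one of these variables is missing. The following is a minimal sketch of such a check, assuming all six variables are required; `IngestConfig` and `load_config` are hypothetical names and are not taken from `scaled_ingest.py`.

```python
import os
from dataclasses import dataclass
from typing import Mapping

REQUIRED = (
    "NEPTUNE_ENDPOINT",
    "AURORA_ENDPOINT",
    "AURORA_PASSWORD",
    "BEDROCK_MODEL_ID",
    "BEDROCK_REGION",
    "AWS_REGION",
)


@dataclass(frozen=True)
class IngestConfig:
    neptune_endpoint: str
    aurora_endpoint: str
    aurora_password: str
    bedrock_model_id: str
    bedrock_region: str
    aws_region: str


def load_config(env: Mapping[str, str] = os.environ) -> IngestConfig:
    """Read the settings above from the environment and report every missing one at once."""
    missing = [name for name in REQUIRED if not env.get(name)]
    if missing:
        raise RuntimeError("missing environment variables: " + ", ".join(missing))
    # Field names are the lower-cased variable names.
    return IngestConfig(**{name.lower(): env[name] for name in REQUIRED})
```

Note that `BEDROCK_REGION` and `AWS_REGION` differ in the example values above, so the two should be kept as separate settings rather than collapsed into one.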
SQS Mode (decoupled, for massive scale)
For millions of papers, use the SQS-decoupled pipeline:
# Terminal 1: Extraction worker (scale to N instances)
SQS_QUEUE_URL=https://sqs.eu-north-1.amazonaws.com/357836458011/graphrag-beta-extraction-queue \
uv run python -m pipeline.ingest.sqs_extraction_worker \
-q data/queries_bulk.txt --source pubmed -n 5000 -c 100
# Terminal 2: Ingestion worker (throttled DB writes)
SQS_QUEUE_URL=https://sqs.eu-north-1.amazonaws.com/357836458011/graphrag-beta-extraction-queue \
uv run python -m pipeline.ingest.sqs_ingestion_worker
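The consumer side of this setup relies on SQS long polling and on deleting a message only after it has been written successfully. The sketch below illustrates that pattern under stated assumptions: `drain_once` is a hypothetical helper, message bodies are assumed to be JSON, and `sqs` is any object exposing the `receive_message` and `delete_message` calls of a boto3 SQS client. It is not the code in `sqs_ingestion_worker.py`.

```python
import json
from typing import Any, Callable


def drain_once(
    sqs: Any,
    queue_url: str,
    handle: Callable[[dict], None],
    wait_seconds: int = 20,
    max_messages: int = 10,
) -> int:
    """Receive one batch with long polling and return how many messages were handled."""
    response = sqs.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=max_messages,  # SQS allows at most 10 per call
        WaitTimeSeconds=wait_seconds,      # SQS allows at most 20 seconds
    )
    processed = 0
    for message in response.get("Messages", []):
        try:
            handle(json.loads(message["Body"]))
        except Exception:
            # Do not delete: the message becomes visible again after the
            # visibility timeout and moves to the DLQ after repeated failures.
            continue
        sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=message["ReceiptHandle"])
        processed += 1
    return processed
```

In a real worker, `sqs` would be created with `boto3.client("sqs", region_name="eu-north-1")` and `drain_once` would be called in a loop, with `handle` performing the Neptune and Aurora writes.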
Weekly Scheduled Worker
Runs every Sunday at 02:00 UTC via EventBridge → ECS. It automatically ingests papers published in the last 7 days across all queries in data/queries_bulk.txt.
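The 7-day window can be computed from the run date. The sketch below is an assumption about how that could look, not the logic in `weekly_worker.py`; `last_week_window` is a hypothetical name. The `YYYY/MM/DD` format matches what the PubMed E-utilities accept for `mindate` and `maxdate`. An EventBridge cron expression for the schedule described above would be `cron(0 2 ? * SUN *)`, although the actual rule in `weekly_ingest_schedule.py` is not shown here.

```python
from datetime import date, timedelta
from typing import Optional, Tuple


def last_week_window(today: Optional[date] = None, days: int = 7) -> Tuple[str, str]:
    """Return (start, end) as YYYY/MM/DD strings covering the last `days` days."""
    end = today or date.today()
    start = end - timedelta(days=days)
    return start.strftime("%Y/%m/%d"), end.strftime("%Y/%m/%d")


# Example: a run on Sunday 10 March 2024 covers 3 March to 10 March.
window = last_week_window(date(2024, 3, 10))
```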
From the Frontend UI
The beta React frontend (https://graphrag-beta-react.mlapps.olink.systems) has an ingestion form:
- Select source (PubMed / PMC / bioRxiv / PDF)
- Enter search query
- Set max results (up to 50,000)
- Select target: Neptune + Aurora (recommended)
- Submit → background job with real-time progress
API: POST /v1/ingestion/jobs with target: "neptune"
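The same job can in principle be submitted without the UI. The sketch below only builds the HTTP request and does not send it. Only the path and `target: "neptune"` come from the text above; the host `API_BASE`, the other payload field names (`source`, `query`, `max_results`), and the absence of authentication headers are all assumptions, so check them against the real API before use.

```python
import json
import urllib.request

# Hypothetical placeholder: the real API host is not given in this document.
API_BASE = "https://api.example.invalid"


def build_job_request(source: str, query: str, max_results: int) -> urllib.request.Request:
    """Build (but do not send) a POST request for an ingestion job."""
    if not 1 <= max_results <= 50_000:
        raise ValueError("max_results must be between 1 and 50,000")
    payload = {
        "source": source,            # assumed field name
        "query": query,              # assumed field name
        "max_results": max_results,  # assumed field name
        "target": "neptune",         # from the API description above
    }
    return urllib.request.Request(
        f"{API_BASE}/v1/ingestion/jobs",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_job_request("pubmed", "plasma proteomics", 1000)
```

Passing `request` to `urllib.request.urlopen` would submit the job once `API_BASE` points at the real service.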
Key Files
| File | Purpose |
|---|---|
| pipeline/ingest/scaled_ingest.py | Core pipeline: async LLM pool + Neptune writer + Aurora writer + checkpointing |
| pipeline/ingest/sqs_bridge.py | SQS producer/consumer interface |
| pipeline/ingest/sqs_extraction_worker.py | Extraction worker (produces to SQS) |
| pipeline/ingest/sqs_ingestion_worker.py | Ingestion worker (consumes from SQS) |
| pipeline/ingest/weekly_worker.py | Scheduled weekly ingest (last 7 days) |
| schema/kg_schema.yaml | Entity types, relationship constraints, extraction rules |
| src/models/kg_schema.py | Schema loader + prompt generator + validator |
| cdk_resources/stacks/extraction_queue.py | SQS queue + DLQ CDK construct |
| cdk_resources/stacks/ingest_task_stack.py | ECS Fargate task definition (4 vCPU / 16 GB) |
| cdk_resources/stacks/weekly_ingest_schedule.py | EventBridge cron rule |
| Dockerfile.ingest | Container image for ingestion tasks |
| scripts/run_ingest_task.sh | One-liner to launch ECS ingestion task |
Scale Estimates
| Scale | Papers | Extraction Time | Ingestion Time | Cost |
|---|---|---|---|---|
| Test | 100 | 30 sec | 10 sec | $0.08 |
| Small | 10,000 | 7 min | 3 min | $8 |
| Medium | 100,000 | 1.5 hours | 30 min | $80 |
| Large | 1,000,000 | 11 hours | 5 hours | $800 |
| Full (since 1970s) | 5,000,000+ | 2.5 days | 1 day | $4,000 |
Extraction times assume 50 concurrent Bedrock calls; scale to 100+ for faster throughput. Ingestion times assume a single worker; scale to 3–5 workers for faster DB writes.
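The rows from Small upward follow roughly constant per-paper rates, which makes it possible to estimate other run sizes. The sketch below derives those rates from the Small row (10,000 papers: $8, 7 min extraction, 3 min ingestion). It assumes throughput scales linearly with Bedrock concurrency and with the number of ingestion workers, which the table does not guarantee, so treat the output as a rough planning figure. The `estimate` function is hypothetical.

```python
from typing import Dict

# Rates derived from the "Small" row of the table above.
COST_PER_PAPER_USD = 8 / 10_000
EXTRACT_PAPERS_PER_SEC = 10_000 / (7 * 60)  # at 50 concurrent Bedrock calls
INGEST_PAPERS_PER_SEC = 10_000 / (3 * 60)   # with a single ingestion worker


def estimate(papers: int, concurrency: int = 50, ingest_workers: int = 1) -> Dict[str, float]:
    """Estimate cost and wall-clock hours, assuming linear scaling (an assumption)."""
    extract_rate = EXTRACT_PAPERS_PER_SEC * concurrency / 50
    ingest_rate = INGEST_PAPERS_PER_SEC * ingest_workers
    return {
        "cost_usd": papers * COST_PER_PAPER_USD,
        "extraction_hours": papers / extract_rate / 3600,
        "ingestion_hours": papers / ingest_rate / 3600,
    }


large = estimate(1_000_000)
```

For 1,000,000 papers this gives about $800, just under 12 hours of extraction, and 5 hours of ingestion, in line with the Large row. The Test row is slower per paper than these rates predict, so the estimate is least reliable for very small runs.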
Neptune vs Neo4j
| | Neo4j (Prod) | Neptune (Beta) |
|---|---|---|
| Write speed | ~500 rels/sec (via tunnel) | ~5,000 rels/sec (UNWIND, VPC-internal) |
| Concurrent writes | Single connection | Multiple workers via SQS |
| Scaling | Vertical only (single ECS task) | Serverless auto-scale (1–128 NCU) |
| Vector search | Built-in (limited) | Aurora pgvector (HNSW, dedicated) |
| Cost at idle | ~$50/month (always-on) | ~$30/month (scales to near-zero) |
| Backup/restore | EFS snapshots (slow restore) | Continuous automated backups |
Future Directions
- Auto-scaling extraction workers — CloudWatch alarm on SQS queue depth triggers additional ECS tasks
- Bedrock batch inference — use Bedrock's batch API for 50% cost reduction on large runs
- Incremental deduplication — detect and merge duplicate entities across ingestion runs
- Citation graph — extract paper-to-paper citations as relationships
- Full-text indexing — Neptune fulltext search for keyword queries alongside graph traversal
- Embedding model upgrade — migrate from all-mpnet-base-v2 (768d) to domain-specific biomedical model
- Real-time streaming — EventBridge pipe from PubMed RSS → SQS → extraction → Neptune (sub-hour latency for new papers)
- Multi-region — replicate Neptune to us-east-1 for lower-latency Bedrock calls