Neptune + Aurora (Beta): Massive Ingest

The beta environment uses Neptune (graph storage) and Aurora pgvector (vector embeddings) as its primary backend in place of Neo4j, so that ingestion can scale to millions of papers.

Architecture

┌─────────────────────────────────────────────────────────────────────────┐
│                         Input Sources                                     │
│   PubMed (1970s–present) │ PMC Full-Text │ bioRxiv │ PDFs │ CSV/TSV     │
└────────────────────────────────┬────────────────────────────────────────┘
┌────────────────────────────────▼────────────────────────────────────────┐
│                    Extraction Workers (scale: 1–10)                       │
│                                                                          │
│  • Fetch papers from source APIs                                         │
│  • Chunk text (512 tokens, 64 overlap)                                   │
│  • LLM entity extraction (50–100 concurrent Bedrock calls)               │
│  • Schema-based validation (reject garbage entities)                     │
│  • Push results to SQS                                                   │
│                                                                          │
│  Code: pipeline/ingest/sqs_extraction_worker.py                          │
│  Infra: ECS Fargate (4 vCPU / 16 GB)                                    │
└────────────────────────────────┬────────────────────────────────────────┘
┌────────────────────────────────▼────────────────────────────────────────┐
│                         SQS Queue                                        │
│                                                                          │
│  • Main queue: graphrag-beta-extraction-queue (7 day retention)          │
│  • Dead letter queue: graphrag-beta-extraction-dlq (14 day, 3 retries)  │
│  • Decouples extraction from ingestion — scale independently            │
│                                                                          │
│  Infra: cdk_resources/stacks/extraction_queue.py                         │
└────────────────────────────────┬────────────────────────────────────────┘
┌────────────────────────────────▼────────────────────────────────────────┐
│                    Ingestion Workers (scale: 1–5)                         │
│                                                                          │
│  • Read messages from SQS (long-poll, auto-throttle)                    │
│  • Batch UNWIND to Neptune (5000 nodes/tx)                              │
│  • Batch INSERT to Aurora pgvector (100 embeddings/flush)               │
│  • Self-throttling — won't overwhelm the database                       │
│                                                                          │
│  Code: pipeline/ingest/sqs_ingestion_worker.py                           │
│  Infra: ECS Fargate (1 vCPU / 4 GB)                                     │
└─────────────────────────────────────────────────────────────────────────┘
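The chunking step in the extraction worker (512 tokens, 64 overlap) can be illustrated with a sliding window. This is a minimal sketch, not the project's TokenChunker: the function name `chunk_tokens` is hypothetical, and whitespace splitting stands in for whatever tokenizer the pipeline actually uses.

```python
from typing import List


def chunk_tokens(tokens: List[str], size: int = 512, overlap: int = 64) -> List[List[str]]:
    """Split a token list into windows of `size` tokens that share `overlap` tokens."""
    if not 0 <= overlap < size:
        raise ValueError("overlap must be non-negative and smaller than size")
    step = size - overlap  # with the defaults, each window starts 448 tokens later
    chunks: List[List[str]] = []
    for start in range(0, len(tokens), step):
        chunks.append(tokens[start:start + size])
        if start + size >= len(tokens):
            break  # this window already reaches the end of the document
    return chunks


# Assumption: whitespace tokens as a stand-in for real model tokens.
sample_tokens = [str(i) for i in range(1000)]
sample_chunks = chunk_tokens(sample_tokens)
```

With 1,000 tokens this yields three chunks, and the last 64 tokens of each chunk reappear at the start of the next one, so an entity mention that straddles a boundary is still seen whole in at least one chunk.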

Data Flow

| Step | Component | Output |
|---|---|---|
| 1. Fetch | PubMed/PMC/bioRxiv API | Raw text + metadata |
| 2. Chunk | TokenChunker (512 tokens) | Text chunks with doc_id |
| 3. Extract | Bedrock LLM (Llama 3.1) | Nodes + relationships JSON |
| 4. Validate | KGSchema (YAML-driven) | Filtered entities (no garbage) |
| 5. Queue | SQS | Buffered messages |
| 6. Write Graph | Neptune (OpenCypher UNWIND) | Labeled nodes + typed edges |
| 7. Write Vectors | Aurora pgvector (HNSW) | 768-dim embeddings |
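The validation step (step 4) can be pictured as a filter over the LLM output. The sketch below is an assumption about how such a filter might look, not the contents of src/models/kg_schema.py: the `SCHEMA` dict is a hypothetical stand-in for schema/kg_schema.yaml, and the entity and relationship names are invented for illustration.

```python
from typing import Dict, List, Tuple

# Hypothetical schema; the real one is loaded from schema/kg_schema.yaml.
SCHEMA = {
    "entity_types": {"Gene", "Protein", "Disease"},
    "relationships": {
        ("Gene", "ENCODES", "Protein"),
        ("Protein", "ASSOCIATED_WITH", "Disease"),
    },
}


def validate(nodes: List[Dict], rels: List[Dict], schema: Dict = SCHEMA) -> Tuple[List[Dict], List[Dict]]:
    """Keep nodes with a known type and a non-empty name, then keep only
    relationships whose endpoints survived and whose triple is allowed."""
    kept_nodes = {
        n["id"]: n
        for n in nodes
        if n.get("type") in schema["entity_types"] and n.get("name", "").strip()
    }
    kept_rels = []
    for r in rels:
        src, dst = kept_nodes.get(r["source"]), kept_nodes.get(r["target"])
        if src and dst and (src["type"], r["type"], dst["type"]) in schema["relationships"]:
            kept_rels.append(r)
    return list(kept_nodes.values()), kept_rels


raw_nodes = [
    {"id": "n1", "type": "Gene", "name": "BRCA1"},
    {"id": "n2", "type": "Protein", "name": "BRCA1 protein"},
    {"id": "n3", "type": "Figure", "name": "Fig. 2"},  # unknown type: rejected
]
raw_rels = [
    {"source": "n1", "type": "ENCODES", "target": "n2"},
    {"source": "n1", "type": "ENCODES", "target": "n3"},  # endpoint rejected
]
clean_nodes, clean_rels = validate(raw_nodes, raw_rels)
```

Filtering relationships after nodes means a rejected entity cannot leave a dangling edge in the queue message.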

Running

Direct Mode (single process, no SQS)

For smaller runs (< 10K papers), use the direct pipeline:

uv run python -m pipeline.ingest.scaled_ingest \
  --queries-file data/queries_bulk.txt \
  --source pubmed \
  --max-results 1000 \
  --concurrency 50

Environment variables:

NEPTUNE_ENDPOINT=neptunedbcluster-xxx.cluster-xxx.eu-north-1.neptune.amazonaws.com
AURORA_ENDPOINT=ragstack-beta-xxx.cluster-xxx.eu-north-1.rds.amazonaws.com
AURORA_PASSWORD=xxx
BEDROCK_MODEL_ID=us.meta.llama3-1-8b-instruct-v1:0
BEDROCK_REGION=us-east-1
AWS_REGION=eu-north-1
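A small loader can fail fast when one of these variables is missing. This is a sketch under assumptions: `IngestConfig` and `load_config` are hypothetical names, and treating all six variables as required is a guess rather than documented behavior of scaled_ingest.

```python
import os
from dataclasses import dataclass
from typing import Mapping

REQUIRED = (
    "NEPTUNE_ENDPOINT",
    "AURORA_ENDPOINT",
    "AURORA_PASSWORD",
    "BEDROCK_MODEL_ID",
    "BEDROCK_REGION",
    "AWS_REGION",
)


@dataclass(frozen=True)
class IngestConfig:
    neptune_endpoint: str
    aurora_endpoint: str
    aurora_password: str
    bedrock_model_id: str
    bedrock_region: str
    aws_region: str


def load_config(env: Mapping[str, str] = os.environ) -> IngestConfig:
    """Build the config from an environment mapping, naming every missing variable."""
    missing = [name for name in REQUIRED if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return IngestConfig(**{name.lower(): env[name] for name in REQUIRED})
```

Note that the example values above place Bedrock in us-east-1 while Neptune and Aurora are in eu-north-1, which is why the two region variables are kept separate.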

SQS Mode (decoupled, for massive scale)

For millions of papers, use the SQS-decoupled pipeline:

# Terminal 1: Extraction worker (scale to N instances)
SQS_QUEUE_URL=https://sqs.eu-north-1.amazonaws.com/357836458011/graphrag-beta-extraction-queue \
uv run python -m pipeline.ingest.sqs_extraction_worker \
  -q data/queries_bulk.txt --source pubmed -n 5000 -c 100

# Terminal 2: Ingestion worker (throttled DB writes)
SQS_QUEUE_URL=https://sqs.eu-north-1.amazonaws.com/357836458011/graphrag-beta-extraction-queue \
uv run python -m pipeline.ingest.sqs_ingestion_worker
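The consumer side of this setup can be sketched as a single long-poll iteration. The code below is illustrative and is not taken from sqs_ingestion_worker.py: `drain_once` and `handle` are hypothetical names, and the JSON message body is an assumption. The `sqs` argument is expected to behave like a boto3 SQS client; only `receive_message` and `delete_message` are used.

```python
import json
from typing import Any, Callable, Dict


def drain_once(sqs: Any, queue_url: str, handle: Callable[[Dict], None],
               wait_seconds: int = 20) -> int:
    """Long-poll the queue once and delete only the messages that were handled."""
    response = sqs.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=10,        # SQS returns at most 10 messages per call
        WaitTimeSeconds=wait_seconds,  # long polling; 20 seconds is the SQS maximum
    )
    processed = 0
    for message in response.get("Messages", []):
        try:
            handle(json.loads(message["Body"]))
        except Exception:
            # Leave the message on the queue. It becomes visible again after the
            # visibility timeout, and repeated failures send it to the DLQ.
            continue
        sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=message["ReceiptHandle"])
        processed += 1
    return processed


# Usage with a real client (requires boto3 and AWS credentials):
#   import boto3, os
#   sqs = boto3.client("sqs", region_name="eu-north-1")
#   while True:
#       drain_once(sqs, os.environ["SQS_QUEUE_URL"], write_to_databases)
# where write_to_databases is a hypothetical function that writes to Neptune and Aurora.
```

Deleting a message only after the handler returns is what lets the dead letter queue catch payloads that fail repeatedly.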

Weekly Scheduled Worker

The weekly worker runs every Sunday at 02:00 UTC via EventBridge → ECS. To trigger it manually:

# Manual trigger
uv run python -m pipeline.ingest.weekly_worker

The worker automatically ingests papers published in the last 7 days across all queries in data/queries_bulk.txt.
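The 7-day window can be expressed as a date filter appended to each query. This sketch assumes PubMed as the source and uses PubMed's publication-date range syntax; the function names are hypothetical, and how weekly_worker.py actually restricts the date range is not documented here.

```python
from datetime import date, timedelta
from typing import Tuple


def last_seven_days(today: date) -> Tuple[str, str]:
    """Return (start, end) of the 7-day window ending today, as YYYY/MM/DD strings."""
    start = today - timedelta(days=7)
    return start.strftime("%Y/%m/%d"), today.strftime("%Y/%m/%d")


def with_date_filter(query: str, today: date) -> str:
    """Wrap a PubMed query so it only matches papers published in the window."""
    start, end = last_seven_days(today)
    return f'({query}) AND ("{start}"[Date - Publication] : "{end}"[Date - Publication])'


weekly_query = with_date_filter("proteomics biomarkers", date(2024, 3, 10))
```

Other sources (PMC, bioRxiv) use different date parameters, so a real worker would need one such helper per source.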

From the Frontend UI

The beta React frontend (https://graphrag-beta-react.mlapps.olink.systems) has an ingestion form:

  1. Select source (PubMed / PMC / bioRxiv / PDF)
  2. Enter search query
  3. Set max results (up to 50,000)
  4. Select target: Neptune + Aurora (recommended)
  5. Submit → background job with real-time progress

API: POST /v1/ingestion/jobs with target: "neptune"
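A request to this endpoint might be built as follows. Only the path and the `target: "neptune"` field come from this page; the other field names (`source`, `query`, `max_results`), the base URL, and the function name are assumptions for illustration, so check the API schema before relying on them.

```python
import json
import urllib.request

MAX_RESULTS_LIMIT = 50_000  # upper bound offered by the frontend form


def build_job_request(base_url: str, source: str, query: str,
                      max_results: int) -> urllib.request.Request:
    """Build (but do not send) a POST request that creates an ingestion job."""
    if not 1 <= max_results <= MAX_RESULTS_LIMIT:
        raise ValueError(f"max_results must be between 1 and {MAX_RESULTS_LIMIT}")
    payload = {
        "source": source,            # assumed field name
        "query": query,              # assumed field name
        "max_results": max_results,  # assumed field name
        "target": "neptune",         # documented above
    }
    return urllib.request.Request(
        url=base_url.rstrip("/") + "/v1/ingestion/jobs",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Hypothetical base URL; replace it with the real API host.
job_request = build_job_request("https://api.example.invalid/", "pubmed", "olink proteomics", 1000)
# To send it: urllib.request.urlopen(job_request)
```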

Key Files

| File | Purpose |
|---|---|
| pipeline/ingest/scaled_ingest.py | Core pipeline: async LLM pool + Neptune writer + Aurora writer + checkpointing |
| pipeline/ingest/sqs_bridge.py | SQS producer/consumer interface |
| pipeline/ingest/sqs_extraction_worker.py | Extraction worker (produces to SQS) |
| pipeline/ingest/sqs_ingestion_worker.py | Ingestion worker (consumes from SQS) |
| pipeline/ingest/weekly_worker.py | Scheduled weekly ingest (last 7 days) |
| schema/kg_schema.yaml | Entity types, relationship constraints, extraction rules |
| src/models/kg_schema.py | Schema loader + prompt generator + validator |
| cdk_resources/stacks/extraction_queue.py | SQS queue + DLQ CDK construct |
| cdk_resources/stacks/ingest_task_stack.py | ECS Fargate task definition (4 vCPU / 16 GB) |
| cdk_resources/stacks/weekly_ingest_schedule.py | EventBridge cron rule |
| Dockerfile.ingest | Container image for ingestion tasks |
| scripts/run_ingest_task.sh | One-liner to launch ECS ingestion task |

Scale Estimates

| Scale | Papers | Extraction Time | Ingestion Time | Cost |
|---|---|---|---|---|
| Test | 100 | 30 sec | 10 sec | $0.08 |
| Small | 10,000 | 7 min | 3 min | $8 |
| Medium | 100,000 | 1.5 hours | 30 min | $80 |
| Large | 1,000,000 | 11 hours | 5 hours | $800 |
| Full (since 1970s) | 5,000,000+ | 2.5 days | 1 day | $4,000 |

Extraction time assumes 50 concurrent Bedrock calls; scale to 100+ for faster throughput. Ingestion time assumes a single worker; scale to 3–5 workers for faster DB writes.
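For run sizes between the listed rows, a rough extrapolation from the "Large" row (1,000,000 papers, 11 hours, 5 hours, $800) gives a first estimate. The function below is a back-of-the-envelope sketch: it assumes time scales linearly with paper count and inversely with concurrency or worker count, which ignores Bedrock rate limits, database contention, and fixed startup overhead.

```python
from typing import Dict

# Baseline taken from the "Large" row: 50 concurrent calls, one ingestion worker.
BASE_PAPERS = 1_000_000
BASE_EXTRACTION_HOURS = 11.0
BASE_INGESTION_HOURS = 5.0
BASE_COST_USD = 800.0
BASE_CONCURRENCY = 50


def estimate(papers: int, concurrency: int = 50, workers: int = 1) -> Dict[str, float]:
    """Linear extrapolation of time and cost; ideal scaling is assumed."""
    scale = papers / BASE_PAPERS
    return {
        "extraction_hours": BASE_EXTRACTION_HOURS * scale * (BASE_CONCURRENCY / concurrency),
        "ingestion_hours": BASE_INGESTION_HOURS * scale / workers,
        "cost_usd": BASE_COST_USD * scale,
    }


baseline = estimate(1_000_000)
scaled_out = estimate(1_000_000, concurrency=100, workers=5)
```

Under these assumptions, doubling concurrency to 100 and running 5 ingestion workers brings a 1,000,000-paper run to roughly 5.5 hours of extraction and 1 hour of ingestion, with cost unchanged. The estimate is least reliable for very small runs such as the "Test" row, where fixed overhead dominates.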

Neptune vs Neo4j

| | Neo4j (Prod) | Neptune (Beta) |
|---|---|---|
| Write speed | ~500 rels/sec (via tunnel) | ~5,000 rels/sec (UNWIND, VPC-internal) |
| Concurrent writes | Single connection | Multiple workers via SQS |
| Scaling | Vertical only (single ECS task) | Serverless auto-scale (1–128 NCU) |
| Vector search | Built-in (limited) | Aurora pgvector (HNSW, dedicated) |
| Cost at idle | ~$50/month (always-on) | ~$30/month (scales to near-zero) |
| Backup/restore | EFS snapshots (slow restore) | Continuous automated backups |

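Much of the write-speed difference comes from sending many rows per query with UNWIND instead of one statement per relationship. The sketch below shows one way to prepare such batches; it is not the project's Neptune writer. The function name, the `id` property, and the row layout are assumptions, and the 5,000-row batch size follows the per-transaction figure from the architecture diagram. Relationship types cannot be passed as query parameters in openCypher, so rows are grouped by type and the type is written into the query text after a basic check.

```python
from collections import defaultdict
from typing import Dict, Iterator, List, Tuple


def relationship_writes(rels: List[Dict], batch_size: int = 5000) -> Iterator[Tuple[str, Dict]]:
    """Yield (query, parameters) pairs, one per batch of same-typed relationships."""
    by_type: Dict[str, List[Dict]] = defaultdict(list)
    for rel in rels:
        if not rel["type"].isidentifier():
            # The type is interpolated into the query, so reject anything unusual.
            raise ValueError(f"unsafe relationship type: {rel['type']!r}")
        by_type[rel["type"]].append({"source": rel["source"], "target": rel["target"]})
    for rel_type, rows in by_type.items():
        query = (
            "UNWIND $rows AS row "
            "MATCH (a {id: row.source}), (b {id: row.target}) "
            f"MERGE (a)-[:{rel_type}]->(b)"
        )
        for start in range(0, len(rows), batch_size):
            yield query, {"rows": rows[start:start + batch_size]}


example_rels = (
    [{"source": f"g{i}", "type": "ENCODES", "target": f"p{i}"} for i in range(12_000)]
    + [{"source": f"d{i}", "type": "TREATS", "target": f"x{i}"} for i in range(3)]
)
writes = list(relationship_writes(example_rels))
```

Each yielded pair can then be sent to the cluster's openCypher endpoint by whichever client the worker uses; 12,003 relationships become four requests instead of 12,003.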
Future Directions

  1. Auto-scaling extraction workers — CloudWatch alarm on SQS queue depth triggers additional ECS tasks
  2. Bedrock batch inference — use Bedrock's batch API for 50% cost reduction on large runs
  3. Incremental deduplication — detect and merge duplicate entities across ingestion runs
  4. Citation graph — extract paper-to-paper citations as relationships
  5. Full-text indexing — Neptune fulltext search for keyword queries alongside graph traversal
  6. Embedding model upgrade — migrate from all-mpnet-base-v2 (768d) to domain-specific biomedical model
  7. Real-time streaming — EventBridge pipe from PubMed RSS → SQS → extraction → Neptune (sub-hour latency for new papers)
  8. Multi-region — replicate Neptune to us-east-1 for lower-latency Bedrock calls