
Ingestion Pipeline

Overview

The ingestion pipeline transforms raw scientific data into a queryable knowledge graph. It supports multiple data sources, LLM-powered entity extraction, multi-strategy entity consolidation, and relationship consolidation with full evidence preservation.

flowchart LR
    subgraph sources["Data Sources"]
        S1[PubMed]
        S2[bioRxiv]
        S3[PMC]
        S4[PDF]
        S5[CSV / Parquet]
    end

    subgraph extract["Per-Chunk Extraction"]
        E1[LLM Entity\nExtraction]
        E2[Ontology\nFiltering]
        E3[Protein\nLinking]
        E1 --> E2 --> E3
    end

    subgraph post["Post-Processing"]
        P1[Node Labeling\n& Standardization]
        P2[Entity\nConsolidation]
        P3[Relationship\nConsolidation]
        P4[Vector\nEmbeddings]
        P1 --> P2 --> P3 --> P4
    end

    sources --> extract --> post

Data Sources

| Source | CLI flag | Description |
| --- | --- | --- |
| PubMed | --source pubmed (default) | Abstracts via Entrez API |
| bioRxiv | --source biorxiv | Full-text preprints with PDF extraction |
| PMC | --source pmc | Open-access full-text via BioC API |
| PDF | --source pdf | Local PDF files with multi-strategy extraction |
| CSV/Parquet/TXT | separate script: enrichment_main.py | Tabular data with column analysis |

Pipeline Stages

Stage 1: Ontology and Dictionary Ingestion

Ingest disease ontologies and protein dictionaries to establish canonical IDs:

# Disease ontology (MONDO IDs)
uv run python enrichment_main.py \
  --file src/utils/mondo.obo \
  --database olink1 \
  --column-handlers "0:mondo-id,1:disease-name,2:synonyms"

# Protein dictionaries (UniProt IDs)
uv run python enrichment_main.py \
  --file src/utils/uniprot_ids_human.csv \
  --database olink1 \
  --column-handlers "0:uniprot-id,1:protein-name,2:gene-symbol,3:synonyms"
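The --column-handlers value pairs zero-based column positions with handler names. The parser inside enrichment_main.py is not shown on this page, so the following is only a minimal sketch, assuming the spec is a comma-separated list of index:handler pairs; parse_column_handlers and apply_handlers are hypothetical helpers.

```python
from typing import Dict, List


def parse_column_handlers(spec: str) -> Dict[int, str]:
    """Parse "0:mondo-id,1:disease-name" into {0: "mondo-id", 1: "disease-name"} (hypothetical helper)."""
    handlers: Dict[int, str] = {}
    for pair in spec.split(","):
        pair = pair.strip()
        if not pair:
            continue  # tolerate a trailing comma
        index, sep, handler = pair.partition(":")
        if not sep or not index.strip().isdigit() or not handler.strip():
            raise ValueError(f"malformed handler entry: {pair!r}")
        col = int(index)
        if col in handlers:
            raise ValueError(f"column {col} assigned twice")
        handlers[col] = handler.strip()
    return handlers


def apply_handlers(row: List[str], handlers: Dict[int, str]) -> Dict[str, str]:
    """Map one row of cell values to {handler_name: value}, skipping columns the row lacks."""
    return {name: row[col] for col, name in handlers.items() if col < len(row)}
```

For the protein dictionary above, "0:uniprot-id,1:protein-name,2:gene-symbol,3:synonyms" parses to {0: 'uniprot-id', 1: 'protein-name', 2: 'gene-symbol', 3: 'synonyms'}.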

Stage 2: Literature Ingestion

# PubMed abstracts
uv run python ingest_main.py \
  --search-term "cardiovascular disease protein biomarker" \
  --max-results 500 --database olink1 --service local

# Using a queries file (one MeSH query per line)
uv run python ingest_main.py \
  --queries-file data/queries.txt \
  --max-results 100 --database olink1

# bioRxiv preprints
uv run python ingest_main.py \
  --source biorxiv --search-term "protein biomarker" \
  --max-results 50 --database olink1

# PMC full-text
uv run python ingest_main.py \
  --source pmc --search-term "cardiovascular disease protein biomarker" \
  --max-results 50 --database olink1

# Local PDFs
uv run python ingest_main.py \
  --source pdf --pdf-files paper1.pdf paper2.pdf \
  --database olink1

Stage 3: Entity Consolidation

Multi-strategy deduplication merges entities into canonical nodes:

uv run python -m pipeline.processors.entity_resolver \
  --database olink1 --operation full

Strategies (in order):

  1. UniProt ID matching — exact match on uniprot_id (highest accuracy)
  2. Synonym/gene symbol matching — case-insensitive synonym comparison
  3. Fuzzy name matching — 85% similarity threshold for typos/abbreviations
  4. MONDO ID matching — links diseases to ontology hierarchy
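The strategy order above can be sketched as a cascade that returns the first hit. This is a minimal illustration, not the entity_resolver implementation: the entity dictionaries, the synonyms and mondo_id field names, and match_entity itself are assumptions, and difflib's SequenceMatcher ratio stands in for whatever similarity metric the resolver actually applies at the 85% threshold.

```python
from difflib import SequenceMatcher
from typing import Dict, List, Optional

FUZZY_THRESHOLD = 0.85  # the documented 85% threshold; SequenceMatcher is an assumed stand-in metric


def _names(entity: Dict) -> set:
    """Lower-cased primary name plus synonyms (gene symbols are treated as synonyms here)."""
    return {n.strip().lower() for n in [entity["name"], *entity.get("synonyms", [])]}


def match_entity(candidate: Dict, canonical: List[Dict]) -> Optional[Dict]:
    """Return the first canonical entity that matches, trying strategies in priority order."""
    # 1. Exact UniProt ID match (highest accuracy).
    uid = candidate.get("uniprot_id")
    if uid:
        for ent in canonical:
            if ent.get("uniprot_id") == uid:
                return ent

    # 2. Case-insensitive synonym / gene symbol overlap.
    cand_names = _names(candidate)
    for ent in canonical:
        if cand_names & _names(ent):
            return ent

    # 3. Fuzzy name match: keep the best-scoring entity at or above the threshold.
    name = candidate["name"].strip().lower()
    best, best_score = None, 0.0
    for ent in canonical:
        score = SequenceMatcher(None, name, ent["name"].strip().lower()).ratio()
        if score >= FUZZY_THRESHOLD and score > best_score:
            best, best_score = ent, score
    if best is not None:
        return best

    # 4. MONDO ID match, relevant only to disease entities.
    mondo = candidate.get("mondo_id")
    if mondo:
        for ent in canonical:
            if ent.get("mondo_id") == mondo:
                return ent
    return None
```

Ordering the checks from most to least precise means a cheap exact identifier match always wins over a fuzzy guess, which is the point of ranking UniProt matching first.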

Stage 4: Relationship Consolidation

Merges duplicate edges while preserving all evidence:

uv run python ingest_main.py \
  --consolidate-relationships --database olink1

Before: 47 duplicate ASSOCIATES_WITH edges between TP53 and Cancer
After: 1 consolidated edge with evidence_sources: 47, all 47 PMIDs, confidence statistics, and temporal tracking
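The before/after example can be reproduced with a group-by on (source, relationship type, target). A minimal sketch, assuming each raw edge is a dict carrying a pmid, a numeric confidence and an ISO-8601 date; apart from evidence_sources, the property names (pmids, confidence_min/max/mean, first_seen/last_seen) are hypothetical stand-ins for the real edge schema.

```python
from collections import defaultdict
from statistics import mean
from typing import Dict, List


def consolidate_edges(edges: List[Dict]) -> List[Dict]:
    """Merge duplicate (source, type, target) edges into one edge, keeping all evidence."""
    groups: Dict[tuple, List[Dict]] = defaultdict(list)
    for e in edges:
        groups[(e["source"], e["type"], e["target"])].append(e)

    merged = []
    for (src, rel, tgt), dupes in groups.items():
        confidences = [d["confidence"] for d in dupes]
        dates = sorted(d["date"] for d in dupes)  # ISO-8601 strings sort chronologically
        merged.append({
            "source": src, "type": rel, "target": tgt,
            "evidence_sources": len(dupes),              # one per collapsed duplicate edge
            "pmids": sorted({d["pmid"] for d in dupes}),  # every supporting paper, deduplicated
            "confidence_min": min(confidences),
            "confidence_max": max(confidences),
            "confidence_mean": mean(confidences),
            "first_seen": dates[0],                      # simple temporal tracking
            "last_seen": dates[-1],
        })
    return merged
```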

Stage 5: Vector Embeddings

Required for semantic search:

uv run python ingest_main.py \
  --add-graph-embeddings --database olink1 --service bedrock
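Semantic search over the embedded graph comes down to a nearest-neighbour lookup: embed the query, then rank nodes by cosine similarity to it. The sketch below does a linear scan over toy three-dimensional vectors; it assumes nothing about the Bedrock embedding model or how the vectors are stored, and a production setup would normally delegate this ranking to a vector index. cosine and top_k are hypothetical helpers.

```python
import math
from typing import Dict, List, Sequence, Tuple


def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine similarity; returns 0.0 if either vector has zero length."""
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(y * y for y in b))
    return dot / (na * nb) if na and nb else 0.0


def top_k(query_vec: Sequence[float], node_vecs: Dict[str, List[float]], k: int = 3) -> List[Tuple[str, float]]:
    """Rank nodes by similarity to the query embedding and keep the k best."""
    scored = [(node_id, cosine(query_vec, vec)) for node_id, vec in node_vecs.items()]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:k]
```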

Stage 6: Validation

uv run python -m pipeline.processors.entity_resolver \
  --database olink1 --operation validate

Two-Phase ETL (Extract-Only + Bulk Load)

For large-scale runs or environments where Neo4j deadlocks are a concern, decouple extraction from database writes:

Phase 1 — Extract to JSONL (no Neo4j writes, max parallelism):

uv run python ingest_main.py \
  --extract-only /tmp/extraction_output \
  --search-term "cardiovascular disease protein biomarker" \
  --max-results 1000 --database olink1 --service bedrock

Phase 2 — Bulk-load into Neo4j (single-threaded, no conflicts):

uv run python ingest_main.py \
  --load-from /tmp/extraction_output \
  --database olink1 --load-batch-size 500 --embed-chunks

The --embed-chunks flag generates vector embeddings for chunk text during the bulk load. The bulk loader auto-detects both per-query subdirectories and flat layouts.
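Phase 2 reads the JSONL output back in fixed-size batches. A minimal sketch of that read path, assuming one JSON record per line and files ending in .jsonl placed either directly in the output directory or one level down in per-query subdirectories; iter_batches and find_jsonl_files are hypothetical, and the actual Neo4j write for each batch is left out.

```python
import json
from pathlib import Path
from typing import Iterator, List


def find_jsonl_files(root: Path) -> List[Path]:
    """Collect JSONL files from a flat layout and from one level of per-query subdirectories."""
    flat = sorted(root.glob("*.jsonl"))
    nested = sorted(root.glob("*/*.jsonl"))
    return flat + nested


def iter_batches(root: Path, batch_size: int = 500) -> Iterator[List[dict]]:
    """Yield lists of at most batch_size records, streaming files line by line."""
    batch: List[dict] = []
    for path in find_jsonl_files(root):
        with path.open(encoding="utf-8") as fh:
            for line in fh:
                line = line.strip()
                if not line:
                    continue  # skip blank lines
                batch.append(json.loads(line))
                if len(batch) == batch_size:
                    yield batch
                    batch = []
    if batch:
        yield batch  # final partial batch
```

Streaming records instead of loading whole files keeps memory flat for large extraction runs, and a single consumer writing one batch at a time is what removes the concurrent-write conflicts.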

Parallel Ingestion

For bulk overnight runs across all sources into Neo4j:

uv run python parallel_ingest.py \
  -q data/queries_bulk.txt \
  --biorxiv-queries-file data/queries_biorxiv.txt \
  -d olink1 -s bedrock \
  --pubmed-max 500 --pmc-max 200 --biorxiv-max 100

| Flag | Default | Description |
| --- | --- | --- |
| --queries-file / -q | required | Queries file (PubMed/PMC) |
| --biorxiv-queries-file | value of -q | Separate queries file for bioRxiv |
| --pubmed-max | 500 | Max results per query for PubMed |
| --pmc-max | 200 | Max results per query for PMC |
| --biorxiv-max | 100 | Max results per query for bioRxiv |
| --skip-node-labeling | off | Skip post-ingestion node labeling (saves hours) |
| --skip-pubmed, --skip-pmc, --skip-biorxiv | off | Skip the corresponding source |
| --dry-run | off | Preview the run without executing it |
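The table's defaults and fallbacks can be summarised as a job plan. The sketch below mirrors only the documented flags with argparse and is not parallel_ingest.py's actual parser; plan_jobs is a hypothetical helper showing how the skip flags drop sources and how bioRxiv falls back to the -q file.

```python
import argparse
from typing import List, Tuple


def build_parser() -> argparse.ArgumentParser:
    """Sketch of the documented flags only; the real script defines more (e.g. -d, -s)."""
    p = argparse.ArgumentParser(description="Sketch of parallel_ingest.py's documented flags")
    p.add_argument("-q", "--queries-file", required=True)
    p.add_argument("--biorxiv-queries-file", default=None)  # None means: reuse -q
    p.add_argument("--pubmed-max", type=int, default=500)
    p.add_argument("--pmc-max", type=int, default=200)
    p.add_argument("--biorxiv-max", type=int, default=100)
    p.add_argument("--skip-node-labeling", action="store_true")  # parsed, not used by plan_jobs
    for src in ("pubmed", "pmc", "biorxiv"):
        p.add_argument(f"--skip-{src}", action="store_true")
    p.add_argument("--dry-run", action="store_true")
    return p


def plan_jobs(args: argparse.Namespace) -> List[Tuple[str, str, int]]:
    """Return (source, queries_file, max_results) for every source not skipped."""
    jobs = []
    if not args.skip_pubmed:
        jobs.append(("pubmed", args.queries_file, args.pubmed_max))
    if not args.skip_pmc:
        jobs.append(("pmc", args.queries_file, args.pmc_max))
    if not args.skip_biorxiv:
        jobs.append(("biorxiv", args.biorxiv_queries_file or args.queries_file, args.biorxiv_max))
    return jobs
```

In this sketch, --dry-run would correspond to printing plan_jobs(args) instead of launching anything.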

Performance

Use --skip-node-labeling during bulk ingestion and run node labeling separately on a schedule. For overnight runs through an AWS SSM tunnel, keep the tunnel alive and stop macOS from sleeping with:

caffeinate -s bash scripts/tunnel_keepalive.sh

Scaled Ingestion (Neptune + Aurora)

For ingesting millions of papers using the SQS-decoupled pipeline with Neptune + Aurora pgvector, see Neptune Massive Ingest — the next-generation architecture for production-scale KG building.

Detailed Guides

For deeper dives into specific pipeline components:

| Guide | Description |
| --- | --- |
| Extraction Architecture & Rationale | Design decisions behind the LLM extraction pipeline |
| KG Creation Guide | Step-by-step knowledge graph assembly |
| Entity Consolidation | Multi-strategy deduplication in depth |
| PDF Extraction | Multi-strategy PDF text and table extraction |
| Preprint Integration | bioRxiv and preprint-specific handling |
| Multimodal Processing | Image and table extraction from documents |

Pipeline Enhancements

  • Token-based chunking — tiktoken (512 tokens, 64 overlap) with sentence boundary preservation
  • Gleaning extraction — multiple LLM passes per chunk to catch missed entities
  • Progress tracking — structured JSON events to pipeline_progress.json
  • Cost tracking — token usage and estimated USD per LLM call
  • Audit logging — JSON-line entries to audit.log
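The chunking bullet describes a sliding window over tokens. A minimal sketch of the windowing step with the documented sizes (512 tokens, 64 overlap), operating on a list of token IDs such as those returned by tiktoken's Encoding.encode; the encoding the pipeline uses is not stated here, and sentence-boundary preservation is deliberately left out, so these windows may cut mid-sentence. chunk_tokens is a hypothetical helper.

```python
from typing import List, Sequence


def chunk_tokens(tokens: Sequence[int], size: int = 512, overlap: int = 64) -> List[List[int]]:
    """Split token IDs into windows of `size` tokens; consecutive windows share `overlap` tokens."""
    if not 0 <= overlap < size:
        raise ValueError("overlap must be in [0, size)")
    step = size - overlap
    chunks: List[List[int]] = []
    for start in range(0, len(tokens), step):
        chunks.append(list(tokens[start:start + size]))
        if start + size >= len(tokens):
            break  # this window already reaches the end; avoid a redundant tail chunk
    return chunks
```

With the defaults, a 1,000-token document yields three windows of 512, 512 and 104 tokens, each sharing 64 tokens with its neighbour.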