Most RAG tutorials show you a Jupyter notebook. Run the cells, watch the magic happen. But notebooks don’t run themselves at 3 AM when new documents arrive.

Production RAG needs orchestration. And data engineers have been orchestrating pipelines for years. Let’s apply what we know.

Why Airflow for RAG?

Apache Airflow gives us everything a production RAG pipeline needs:

  • Scheduling: Process new documents on a regular cadence
  • Dependency management: Ensure embedding happens after chunking
  • Retries: Handle transient API failures gracefully
  • Monitoring: Know when something breaks before users complain
  • Backfills: Reprocess historical data when you change your chunking strategy

If you’ve built ETL pipelines, this will feel familiar. Because RAG is ETL.

The Pipeline Structure

Our RAG pipeline has four tasks:

extract_documents  transform_documents  load_documents  evaluate_pipeline

This mirrors the Extract-Transform-Load pattern:

  1. Extract: Load raw documents from source
  2. Transform: Chunk documents and generate embeddings
  3. Load: Write to the vector database
  4. Evaluate: Validate the pipeline worked correctly

The DAG Definition

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-engineering',
    'depends_on_past': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'rag_pipeline',
    default_args=default_args,
    description='RAG document processing pipeline',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['rag', 'llm'],
) as dag:

    extract = PythonOperator(
        task_id='extract_documents',
        python_callable=extract_documents,
    )

    transform = PythonOperator(
        task_id='transform_documents',
        python_callable=transform_documents,
    )

    load = PythonOperator(
        task_id='load_documents',
        python_callable=load_documents,
    )

    evaluate = PythonOperator(
        task_id='evaluate_pipeline',
        python_callable=evaluate_pipeline,
    )

    extract >> transform >> load >> evaluate

The >> operator defines task dependencies. Transform waits for extract. Load waits for transform. Simple.

Task Implementation

Extract: Loading Documents

def extract_documents(**context):
    from pathlib import Path

    source_dir = Path("/opt/airflow/include/data/raw")
    documents = []

    for file_path in source_dir.glob("*.txt"):
        documents.append({
            'id': file_path.stem,
            'content': file_path.read_text(),
            'source': str(file_path),
        })

    context['ti'].xcom_push(key='documents', value=documents)
    return len(documents)

XCom (cross-communication) passes data between tasks. The extract task pushes documents; transform pulls them.

Transform: Chunking and Embedding

def transform_documents(**context):
    from langchain.text_splitter import RecursiveCharacterTextSplitter
    from langchain_openai import OpenAIEmbeddings

    documents = context['ti'].xcom_pull(
        task_ids='extract_documents', key='documents'
    )

    splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000, chunk_overlap=200
    )
    embeddings = OpenAIEmbeddings()

    chunks = []
    for doc in documents:
        doc_chunks = splitter.split_text(doc['content'])
        for i, chunk_text in enumerate(doc_chunks):
            chunks.append({
                'id': f"{doc['id']}_chunk_{i}",
                'text': chunk_text,
                'embedding': embeddings.embed_query(chunk_text),
                'metadata': {'source': doc['source'], 'chunk_index': i}
            })

    context['ti'].xcom_push(key='chunks', value=chunks)
    return len(chunks)

Load: Writing to Vector Database

def load_documents(**context):
    import chromadb

    chunks = context['ti'].xcom_pull(
        task_ids='transform_documents', key='chunks'
    )

    client = chromadb.PersistentClient(path="/opt/airflow/include/data/chroma")
    collection = client.get_or_create_collection(name="documents")

    collection.upsert(
        ids=[c['id'] for c in chunks],
        embeddings=[c['embedding'] for c in chunks],
        documents=[c['text'] for c in chunks],
        metadatas=[c['metadata'] for c in chunks],
    )
    return len(chunks)

Note the upsert operation. This makes the task idempotent—running it twice produces the same result.

Handling Failures

Airflow’s retry mechanism handles transient failures:

default_args = {
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'max_retry_delay': timedelta(minutes=30),
}

For the embedding task, this is crucial. OpenAI rate limits happen. Automatic retries with exponential backoff handle most issues without human intervention.

Data Engineering Patterns Applied

Orchestration: Airflow manages dependencies, scheduling, and retries. You focus on the logic.

Idempotency: Tasks can run multiple times safely. Upserts instead of inserts. Deterministic chunk IDs.

Data Lineage: XCom provides a basic lineage trail. For production, add more explicit logging.

Modularity: Each task does one thing. Easy to test, easy to replace.

Observability: Airflow’s UI shows execution history, logs, and task duration.

What’s Next

Our pipeline now runs reliably. But which vector database should we use? ChromaDB is great for development, but what about production scale?

In the next post, we’ll benchmark different vector databases and help you choose the right one for your use case.


This is Part 3 of the “Data Engineering Meets AI” series. Read Part 2: Prompt Version Control