Most RAG tutorials show you a Jupyter notebook. Run the cells, watch the magic happen. But notebooks don’t run themselves at 3 AM when new documents arrive.
Production RAG needs orchestration. And data engineers have been orchestrating pipelines for years. Let’s apply what we know.
Why Airflow for RAG?
Apache Airflow gives us everything a production RAG pipeline needs:
- Scheduling: Process new documents on a regular cadence
- Dependency management: Ensure embedding happens after chunking
- Retries: Handle transient API failures gracefully
- Monitoring: Know when something breaks before users complain
- Backfills: Reprocess historical data when you change your chunking strategy
If you’ve built ETL pipelines, this will feel familiar. Because the ingestion side of RAG is ETL.
The Pipeline Structure
Our RAG pipeline has four tasks:
extract_documents → transform_documents → load_documents → evaluate_pipeline
This mirrors the Extract-Transform-Load pattern:
- Extract: Load raw documents from source
- Transform: Chunk documents and generate embeddings
- Load: Write to the vector database
- Evaluate: Validate the pipeline worked correctly
The DAG Definition
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-engineering',
    'depends_on_past': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'rag_pipeline',
    default_args=default_args,
    description='RAG document processing pipeline',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['rag', 'llm'],
) as dag:

    extract = PythonOperator(
        task_id='extract_documents',
        python_callable=extract_documents,
    )

    transform = PythonOperator(
        task_id='transform_documents',
        python_callable=transform_documents,
    )

    load = PythonOperator(
        task_id='load_documents',
        python_callable=load_documents,
    )

    evaluate = PythonOperator(
        task_id='evaluate_pipeline',
        python_callable=evaluate_pipeline,
    )

    extract >> transform >> load >> evaluate
The >> operator defines task dependencies. Transform waits for extract. Load waits for transform. Simple.
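If you prefer an explicit function call once the task list grows, Airflow’s chain helper declares the same ordering:

from airflow.models.baseoperator import chain

# Equivalent to extract >> transform >> load >> evaluate
chain(extract, transform, load, evaluate)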
Task Implementation
Extract: Loading Documents
def extract_documents(**context):
    from pathlib import Path

    source_dir = Path("/opt/airflow/include/data/raw")
    documents = []
    for file_path in source_dir.glob("*.txt"):
        documents.append({
            'id': file_path.stem,
            'content': file_path.read_text(),
            'source': str(file_path),
        })

    context['ti'].xcom_push(key='documents', value=documents)
    return len(documents)
XCom (cross-communication) passes data between tasks. The extract task pushes documents; transform pulls them.
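One caveat: XCom values are serialized into Airflow’s metadata database, so pushing whole documents and embeddings through it works for small corpora but gets heavy at scale. A common alternative is to stage the payload on shared storage and pass only its path; a minimal sketch, with hypothetical stage_payload and load_payload helpers and an assumed staging directory visible to every worker:

import json
from pathlib import Path

# Assumption: a directory mounted on every Airflow worker
STAGING_DIR = Path("/opt/airflow/include/data/staging")

def stage_payload(ti, key, payload, run_date):
    """Write the payload to disk and push only its path over XCom."""
    STAGING_DIR.mkdir(parents=True, exist_ok=True)
    path = STAGING_DIR / f"{key}_{run_date}.json"
    path.write_text(json.dumps(payload))
    ti.xcom_push(key=f"{key}_path", value=str(path))

def load_payload(ti, task_id, key):
    """Downstream counterpart: pull the path, then read the payload."""
    path = ti.xcom_pull(task_ids=task_id, key=f"{key}_path")
    return json.loads(Path(path).read_text())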
Transform: Chunking and Embedding
def transform_documents(**context):
    from langchain.text_splitter import RecursiveCharacterTextSplitter
    from langchain_openai import OpenAIEmbeddings

    documents = context['ti'].xcom_pull(
        task_ids='extract_documents', key='documents'
    )

    splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000, chunk_overlap=200
    )
    embeddings = OpenAIEmbeddings()

    chunks = []
    for doc in documents:
        doc_chunks = splitter.split_text(doc['content'])
        for i, chunk_text in enumerate(doc_chunks):
            chunks.append({
                'id': f"{doc['id']}_chunk_{i}",
                'text': chunk_text,
                # One embedding API call per chunk; embed_documents() can batch these
                'embedding': embeddings.embed_query(chunk_text),
                'metadata': {'source': doc['source'], 'chunk_index': i}
            })

    context['ti'].xcom_push(key='chunks', value=chunks)
    return len(chunks)
Load: Writing to Vector Database
def load_documents(**context):
    import chromadb

    chunks = context['ti'].xcom_pull(
        task_ids='transform_documents', key='chunks'
    )

    client = chromadb.PersistentClient(path="/opt/airflow/include/data/chroma")
    collection = client.get_or_create_collection(name="documents")

    collection.upsert(
        ids=[c['id'] for c in chunks],
        embeddings=[c['embedding'] for c in chunks],
        documents=[c['text'] for c in chunks],
        metadatas=[c['metadata'] for c in chunks],
    )
    return len(chunks)
Note the upsert operation. Combined with the deterministic chunk IDs from the transform step, it makes the task idempotent: running it twice produces the same collection instead of duplicate records.
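Evaluate: Checking the Results
The final task is a sanity check rather than a full evaluation suite. A minimal sketch, assuming the same ChromaDB collection as the load step and using the chunk count that load_documents returns through XCom:

def evaluate_pipeline(**context):
    import chromadb

    # load_documents returns the chunk count; it arrives here as a return-value XCom
    expected = context['ti'].xcom_pull(task_ids='load_documents') or 0

    client = chromadb.PersistentClient(path="/opt/airflow/include/data/chroma")
    collection = client.get_or_create_collection(name="documents")

    # Check 1: the collection holds at least as many chunks as the load reported
    if collection.count() < expected:
        raise ValueError(
            f"Expected at least {expected} chunks, found {collection.count()}"
        )

    # Check 2: a smoke-test query with a stored embedding returns a hit
    if expected:
        sample = collection.get(limit=1, include=['embeddings'])
        results = collection.query(query_embeddings=sample['embeddings'], n_results=1)
        if not results['ids'][0]:
            raise ValueError("Smoke-test query returned no results")

    return collection.count()

A failed check raises, which fails the task and surfaces in the Airflow UI like any other error.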
Handling Failures
Airflow’s retry mechanism handles transient failures:
default_args = {
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'max_retry_delay': timedelta(minutes=30),
}
For the embedding task, this is crucial. OpenAI rate limits happen. Automatic retries with exponential backoff handle most issues without human intervention.
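These defaults apply DAG-wide. When only the embedding-heavy transform step needs a longer leash, the same settings can be overridden per task inside the with DAG block shown earlier; a sketch using standard operator arguments:

transform = PythonOperator(
    task_id='transform_documents',
    python_callable=transform_documents,
    # Per-task settings take precedence over default_args for this operator only
    retries=5,
    retry_delay=timedelta(minutes=2),
    retry_exponential_backoff=True,
    max_retry_delay=timedelta(minutes=30),
)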
Data Engineering Patterns Applied
Orchestration: Airflow manages dependencies, scheduling, and retries. You focus on the logic.
Idempotency: Tasks can run multiple times safely. Upserts instead of inserts. Deterministic chunk IDs.
Data Lineage: XCom provides a basic lineage trail. For production, add more explicit logging.
Modularity: Each task does one thing. Easy to test, as the sketch below shows, and easy to replace.
Observability: Airflow’s UI shows execution history, logs, and task duration.
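That testability is cheap to demonstrate. A minimal pytest sketch, with a hypothetical chunk_ids helper that mirrors the ID scheme in transform_documents, checks that chunking is deterministic, which is exactly what makes the upsert-based load idempotent:

# test_chunking.py - plain pytest, no Airflow scheduler required
from langchain.text_splitter import RecursiveCharacterTextSplitter

def chunk_ids(doc_id, text, chunk_size=1000, chunk_overlap=200):
    """Hypothetical helper mirroring the ID scheme used in transform_documents."""
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size, chunk_overlap=chunk_overlap
    )
    return [f"{doc_id}_chunk_{i}" for i, _ in enumerate(splitter.split_text(text))]

def test_chunk_ids_are_deterministic():
    text = "word " * 2000  # long enough to produce several chunks
    first = chunk_ids("doc1", text)
    second = chunk_ids("doc1", text)
    assert first == second   # same input, same IDs -> safe to upsert repeatedly
    assert len(first) > 1    # the splitter actually splits long documents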
What’s Next
Our pipeline now runs reliably. But which vector database should we use? ChromaDB is great for development, but what about production scale?
In the next post, we’ll benchmark different vector databases and help you choose the right one for your use case.
This is Part 3 of the “Data Engineering Meets AI” series. Read Part 2: Prompt Version Control