Your LLM application works in development. You deploy it. Users complain it’s slow. Or wrong. Or both.
“What happened?” you ask.
Without observability, you’re debugging in the dark. You don’t know which retrieval step was slow, what context was passed to the model, or why the response was off.
Data engineers solved this problem years ago with distributed tracing. Let’s apply it to LLM applications.
Why LLM Observability Is Different
Traditional observability tracks request latency and error rates. LLM applications need more:
- Prompt content: What did we actually send to the model?
- Token usage: How many tokens did this cost?
- Retrieval quality: What documents did we retrieve? Were they relevant?
- Response content: What did the model return?
- Latency breakdown: How much time was spent in retrieval vs. generation?
You need to trace the full journey: user query → retrieval → prompt construction → LLM call → response.
OpenTelemetry for LLM Tracing
OpenTelemetry is the industry standard for observability. It works for LLM applications too.
Here’s the core setup:
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry.sdk.resources import Resource

def setup_tracer(service_name: str):
    """Set up OpenTelemetry tracer."""
    resource = Resource(attributes={
        "service.name": service_name
    })
    provider = TracerProvider(resource=resource)
    # ConsoleSpanExporter prints spans to stdout -- handy in development,
    # swapped for an OTLP exporter in production (see below)
    processor = BatchSpanProcessor(ConsoleSpanExporter())
    provider.add_span_processor(processor)
    trace.set_tracer_provider(provider)
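A minimal usage sketch (the service and span names are placeholders):

# Call once at application startup
setup_tracer("rag-service")

# Then create spans anywhere in the codebase
tracer = trace.get_tracer(__name__)
with tracer.start_as_current_span("smoke_test") as span:
    span.set_attribute("check.status", "ok")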
Tracing LLM Calls
Create a decorator that automatically traces LLM calls:
from opentelemetry.trace import Status, StatusCode
from functools import wraps

def trace_llm_call(func):
    """Decorator to trace LLM calls."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        tracer = trace.get_tracer(__name__)
        with tracer.start_as_current_span(f"llm.{func.__name__}") as span:
            # Record input prompt
            if "prompt" in kwargs:
                span.set_attribute("llm.prompt", kwargs["prompt"])
            elif len(args) > 0:
                span.set_attribute("llm.prompt", args[0])
            try:
                result = func(*args, **kwargs)
                # Record output
                if isinstance(result, str):
                    span.set_attribute("llm.completion", result)
                elif hasattr(result, "content"):
                    span.set_attribute("llm.completion", result.content)
                # Record token usage
                if hasattr(result, "usage"):
                    span.set_attribute("llm.tokens.prompt", result.usage.prompt_tokens)
                    span.set_attribute("llm.tokens.completion", result.usage.completion_tokens)
                    span.set_attribute("llm.tokens.total", result.usage.total_tokens)
                span.set_status(Status(StatusCode.OK))
                return result
            except Exception as e:
                span.record_exception(e)
                span.set_status(Status(StatusCode.ERROR, str(e)))
                raise
    return wrapper
Now any LLM call is automatically traced:
@trace_llm_call
def ask_question(prompt: str):
    from langchain_openai import OpenAI
    llm = OpenAI()
    return llm.invoke(prompt)

# This call is now fully traced
result = ask_question(prompt="What is the capital of France?")
Tracing the Full RAG Pipeline
A RAG application has multiple steps. Trace each one (the embed, vector_db, and llm helpers below stand in for your own embedding, vector store, and model clients):
def answer_question(query: str):
    tracer = trace.get_tracer(__name__)
    with tracer.start_as_current_span("rag.answer_question") as parent_span:
        parent_span.set_attribute("rag.query", query)

        # Step 1: Embed the query
        with tracer.start_as_current_span("rag.embed_query") as span:
            query_embedding = embed(query)
            span.set_attribute("embedding.dimensions", len(query_embedding))

        # Step 2: Retrieve documents
        with tracer.start_as_current_span("rag.retrieve") as span:
            docs = vector_db.query(query_embedding, n_results=5)
            span.set_attribute("retrieval.num_docs", len(docs))
            span.set_attribute("retrieval.sources", [d.source for d in docs])

        # Step 3: Build prompt
        with tracer.start_as_current_span("rag.build_prompt") as span:
            context = "\n".join([d.content for d in docs])
            prompt = f"Context: {context}\n\nQuestion: {query}"
            span.set_attribute("prompt.length", len(prompt))

        # Step 4: Call LLM
        with tracer.start_as_current_span("rag.generate") as span:
            response = llm.invoke(prompt)
            span.set_attribute("response.length", len(response))

        parent_span.set_attribute("rag.response", response)
        return response
Now you can see exactly where time is spent and what data flows through each step.
Key Metrics to Track
Beyond tracing, collect these metrics:
Latency metrics:
- End-to-end response time
- Retrieval latency
- LLM generation latency
Quality metrics:
- Retrieval relevance scores
- Response length
- Error rates
Cost metrics:
- Token usage per request
- Cost per request
- Daily/monthly spend

Here's how to record token usage and latency as OpenTelemetry metrics:
from opentelemetry import metrics

meter = metrics.get_meter(__name__)

token_counter = meter.create_counter(
    "llm.tokens.total",
    description="Total tokens used"
)

latency_histogram = meter.create_histogram(
    "llm.latency",
    description="LLM call latency in milliseconds"
)

def track_llm_metrics(tokens: int, latency_ms: float, model: str):
    token_counter.add(tokens, {"model": model})
    latency_histogram.record(latency_ms, {"model": model})
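Cost per request falls out of the token counts. A minimal sketch, using illustrative per-1K-token prices; the model name and rates are placeholders, so substitute your provider's current pricing:

# Placeholder prices per 1K tokens in USD -- replace with your provider's actual rates
PRICE_PER_1K = {
    "gpt-4o-mini": {"prompt": 0.00015, "completion": 0.0006},
}

cost_counter = meter.create_counter(
    "llm.cost.usd",
    description="Estimated LLM spend in USD"
)

def track_cost(model: str, prompt_tokens: int, completion_tokens: int):
    prices = PRICE_PER_1K.get(model)
    if prices is None:
        return  # unknown model: skip the cost estimate
    cost = (
        (prompt_tokens / 1000) * prices["prompt"]
        + (completion_tokens / 1000) * prices["completion"]
    )
    cost_counter.add(cost, {"model": model})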
Alerting on Anomalies
Set up alerts for:
- Latency spikes: P99 latency > 5 seconds
- Error rate increases: Error rate > 1%
- Cost anomalies: Daily spend > 2x average
- Token usage spikes: Unusual token consumption patterns
# Example: Check for latency anomalies
# send_alert is a placeholder for your notification integration (Slack, PagerDuty, email, ...)
def check_latency_anomaly(current_p99: float, baseline_p99: float):
    if current_p99 > baseline_p99 * 2:
        send_alert(
            title="LLM Latency Spike",
            message=f"P99 latency {current_p99}ms exceeds 2x baseline ({baseline_p99}ms)"
        )
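The same pattern covers cost anomalies. A sketch, assuming you already aggregate daily spend elsewhere (send_alert is the same placeholder as above):

def check_cost_anomaly(todays_spend: float, avg_daily_spend: float):
    # Flag a day that costs more than twice the trailing daily average
    if todays_spend > avg_daily_spend * 2:
        send_alert(
            title="LLM Cost Anomaly",
            message=f"Today's spend ${todays_spend:.2f} is more than 2x the daily average (${avg_daily_spend:.2f})"
        )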
Debugging with Traces
When something goes wrong, traces tell the story:
[rag.answer_question] 2340ms
├── [rag.embed_query] 45ms
├── [rag.retrieve] 120ms - retrieved 5 docs
├── [rag.build_prompt] 2ms - prompt length 4500 chars
└── [rag.generate] 2170ms - ERROR: Rate limit exceeded
Now you know: the LLM call failed due to rate limiting, and the prompt was 4,500 characters of context (maybe too large?).
Production Backends
For production, export traces to a proper backend:
- Jaeger: Open source, self-hosted
- Zipkin: Open source, lightweight
- Honeycomb: SaaS, excellent for high-cardinality data
- Datadog: SaaS, full observability platform
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

def setup_production_tracer(service_name: str, otlp_endpoint: str):
    resource = Resource(attributes={"service.name": service_name})
    provider = TracerProvider(resource=resource)
    otlp_exporter = OTLPSpanExporter(endpoint=otlp_endpoint)
    provider.add_span_processor(BatchSpanProcessor(otlp_exporter))
    trace.set_tracer_provider(provider)
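For example, pointing at a local OTLP collector (4317 is the default OTLP/gRPC port; the endpoint and service name below are placeholders):

setup_production_tracer(
    service_name="rag-service",
    otlp_endpoint="http://localhost:4317"  # e.g. an OpenTelemetry Collector or vendor agent
)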
Data Engineering Patterns Applied
Distributed Tracing: The same technique we use for microservices applies to LLM pipelines. Each step is a span; the full request is a trace.
Metrics and Monitoring: Track the numbers that matter. Token usage is your cloud bill. Latency is your user experience.
Alerting: Don’t wait for users to tell you something’s wrong. Detect anomalies automatically.
Data Lineage: Traces provide lineage. You can see exactly what documents were retrieved and what prompt was constructed for any request.
Conclusion
Observability transforms debugging from guesswork to investigation. When your LLM application misbehaves, you’ll know exactly where to look.
This concludes the “Data Engineering Meets AI” series. We’ve covered:
- RAG as ETL
- Prompt version control
- Airflow orchestration
- Vector database selection
- LLM observability
The theme throughout: AI applications aren’t magic. They’re software systems. Apply the same engineering discipline you’d apply to any production system, and you’ll build AI applications that actually work.
This is Part 5 of the “Data Engineering Meets AI” series. Read Part 4: Vector DB Comparison