Skip to main content

Production Observability with MLflow Tracing

· 7 min read

Instrument a customer-facing chatbot for production traffic, then query and analyze trace data to build latency dashboards, error rate monitors, and token usage reports.

Prerequisites
pip install mlflow openai

What You'll Build

import mlflow
import openai

# Send traces to a locally running MLflow tracking server.
mlflow.set_tracking_uri("http://127.0.0.1:5000")
# Group all chatbot traces under one experiment.
mlflow.set_experiment("production-chatbot")

# One-time switch: every OpenAI API call is captured as a span automatically.
mlflow.openai.autolog()

# Assumes OPENAI_API_KEY is set in the environment -- verify in deployment.
client = openai.OpenAI()


@mlflow.trace
def support_chatbot(
    user_message: str,
    conversation_history: list[dict] | None = None,
) -> str:
    """Answer a customer-support question via the OpenAI chat API.

    Runs inside an MLflow trace span; the nested OpenAI call is
    captured automatically by autologging.
    """
    system_prompt = (
        "You are a helpful customer support agent "
        "for a SaaS platform. Be concise and "
        "actionable."
    )
    history = conversation_history or []
    # System prompt first, then any prior turns, then the new user message.
    messages = (
        [{"role": "system", "content": system_prompt}]
        + history
        + [{"role": "user", "content": user_message}]
    )

    completion = client.chat.completions.create(
        model="gpt-5.4-mini",
        messages=messages,
    )
    return completion.choices[0].message.content

Test the agent to verify tracing works:

# Smoke test: one call should produce one trace in the MLflow UI.
answer = support_chatbot("How do I reset my password?")
print(answer)
# Check http://127.0.0.1:5000 — you should see a trace with
# spans for support_chatbot and the OpenAI chat completion.

Use mlflow.update_current_trace() to attach production metadata -- user identity, session tracking, and environment labels. Tags are mutable (editable after the trace is logged), while metadata is immutable and suited for values fixed at trace creation time.

import uuid


@mlflow.trace
def support_chatbot(
    user_message: str,
    user_id: str,
    session_id: str,
    conversation_history: list[dict] | None = None,
) -> str:
    """Answer a support question, tagging the trace with production context.

    Tags (environment, app_version) are mutable and filterable; the reserved
    mlflow.trace.user / mlflow.trace.session metadata keys group traces by
    user and session in the MLflow UI.
    """
    # Attach production context to the current trace.
    mlflow.update_current_trace(
        tags={"environment": "production", "app_version": "2.1.0"},
        metadata={
            "mlflow.trace.user": user_id,
            "mlflow.trace.session": session_id,
        },
    )

    system_prompt = (
        "You are a helpful customer support agent "
        "for a SaaS platform. Be concise and "
        "actionable."
    )
    history = conversation_history or []
    # System prompt first, then any prior turns, then the new user message.
    messages = (
        [{"role": "system", "content": system_prompt}]
        + history
        + [{"role": "user", "content": user_message}]
    )

    completion = client.chat.completions.create(
        model="gpt-5.4-mini",
        messages=messages,
    )
    return completion.choices[0].message.content


# Simulate a production request
answer = support_chatbot(
    user_message="How do I upgrade my plan?",
    user_id="user-8f3a2b",
    session_id=uuid.uuid4().hex,  # fresh session id per conversation
)
print(answer)

mlflow.trace.user and mlflow.trace.session are reserved metadata keys that MLflow uses to group traces by user and session in the UI. The environment and app_version tags let you filter traces by deployment context.

By default on OSS MLflow, traces are logged synchronously -- the application blocks until the trace is persisted. In production, set the MLFLOW_ENABLE_ASYNC_TRACE_LOGGING environment variable to decouple tracing from your application's critical path.

import os

# Enable async trace logging (non-blocking): trace export moves off the
# request path onto background worker threads.
os.environ["MLFLOW_ENABLE_ASYNC_TRACE_LOGGING"] = "true"

# Optional tuning knobs: worker-pool size and bounded-queue capacity.
# Size these for your expected trace throughput.
os.environ.update(
    {
        "MLFLOW_ASYNC_TRACE_LOGGING_MAX_WORKERS": "10",
        "MLFLOW_ASYNC_TRACE_LOGGING_MAX_QUEUE_SIZE": "1000",
    }
)

Set these environment variables before MLflow is imported or initialized — in a real service, export them in the deployment environment (container env, systemd unit, shell profile) rather than in code that runs after `import mlflow`, as the inline snippet above does only for demonstration. With async logging enabled, trace export happens in background threads. If the queue fills up under extreme load, new traces are dropped with a warning rather than blocking your application.

note

Async logging flushes automatically at program exit. In long-running services, traces are exported continuously in the background with no manual flush required.

At high request volumes, tracing every request is unnecessary and expensive. Use the MLFLOW_TRACE_SAMPLING_RATIO environment variable to sample a fraction of traces.

# Sample 10% of traces in production; the rest are not recorded.
os.environ.update({"MLFLOW_TRACE_SAMPLING_RATIO": "0.1"})

You can also override the sampling ratio per-function using the sampling_ratio_override parameter on @mlflow.trace. This is useful when you want full visibility into a critical code path while sampling the rest.

# Billing is a critical path: override the global sampling ratio so
# every billing request is traced.
@mlflow.trace(sampling_ratio_override=1.0)
def process_billing_request(user_id: str, action: str):
    """Handle one billing inquiry; always traced (ratio override 1.0)."""
    # Tag for filtering, and attach the user via the reserved metadata key.
    mlflow.update_current_trace(
        tags={"request_type": "billing"},
        metadata={"mlflow.trace.user": user_id},
    )
    conversation = [
        {"role": "system", "content": "Handle billing inquiries."},
        {"role": "user", "content": action},
    ]
    reply = client.chat.completions.create(
        model="gpt-5.4-mini",
        messages=conversation,
    )
    return reply.choices[0].message.content


# FAQ traffic is high volume and low risk: trace only 5% of calls.
@mlflow.trace(sampling_ratio_override=0.05)
def handle_faq(question: str):
    """Answer a common question; sampled at 5% to limit trace volume."""
    conversation = [
        {"role": "system", "content": "Answer common questions briefly."},
        {"role": "user", "content": question},
    ]
    reply = client.chat.completions.create(
        model="gpt-5.4-mini",
        messages=conversation,
    )
    return reply.choices[0].message.content

Simulate realistic production traffic to populate traces for analysis.

import random
import uuid

# A small pool of synthetic users and a rotation of common questions.
user_ids = [f"user-{i:04d}" for i in range(20)]
questions = [
    "How do I reset my password?",
    "Can I export my data?",
    "What's the API rate limit?",
    "How do I add team members?",
    "My integration isn't working.",
    "How do I cancel my subscription?",
    "Where are the API docs?",
    "How do I enable SSO?",
]

# Each iteration issues one chatbot call and therefore logs one trace.
for _ in range(30):
    uid = random.choice(user_ids)
    sid = uuid.uuid4().hex  # one fresh session per simulated request
    question = random.choice(questions)
    try:
        support_chatbot(
            user_message=question,
            user_id=uid,
            session_id=sid,
        )
    except Exception:
        # In production, errors are captured in the trace
        # status automatically
        pass

Use mlflow.search_traces() to query traces by tags, metadata, status, and time ranges.

import time

# Get all traces from the experiment
all_traces = mlflow.search_traces(
    return_type="list",
)
print(f"Total traces: {len(all_traces)}")

# Filter by tag: only production traces
prod_traces = mlflow.search_traces(
    filter_string="tag.environment = 'production'",
    return_type="list",
)
print(f"Production traces: {len(prod_traces)}")

# Filter by status: find errors
error_traces = mlflow.search_traces(
    filter_string="trace.status = 'ERROR'",
    return_type="list",
)
print(f"Error traces: {len(error_traces)}")

# Filter by time: traces from the last hour
# (trace timestamps are epoch milliseconds, hence the * 1000)
one_hour_ago = int((time.time() - 3600) * 1000)
recent_traces = mlflow.search_traces(
    filter_string=(
        f"trace.timestamp_ms > {one_hour_ago}"
    ),
    return_type="list",
)
print(f"Traces in last hour: {len(recent_traces)}")

# Filter by metadata: traces for a specific user
# (backticks quote the dotted reserved key mlflow.trace.user)
user_traces = mlflow.search_traces(
    filter_string=(
        "metadata.`mlflow.trace.user` = 'user-0001'"
    ),
    return_type="list",
)
print(f"Traces for user-0001: {len(user_traces)}")

# Combine filters: slow production errors
slow_errors = mlflow.search_traces(
    filter_string=(
        "tag.environment = 'production' "
        "AND trace.status = 'ERROR' "
        "AND trace.execution_time_ms > 5000"
    ),
    return_type="list",
)
print(f"Slow production errors: {len(slow_errors)}")

Compute latency distributions, error rates, and token usage patterns from trace data.

import json
from collections import defaultdict  # hoisted: imports belong at the top (PEP 8)

traces = mlflow.search_traces(return_type="list")


def _percentile(sorted_values, q):
    """Nearest-rank percentile of an ascending-sorted, non-empty list.

    int(len * 0.5) == len // 2, so p50 matches the classic median index.
    """
    return sorted_values[int(len(sorted_values) * q)]


# --- Latency analysis ---
latencies = sorted(
    t.info.execution_time_ms
    for t in traces
    if t.info.execution_time_ms is not None
)
if latencies:
    p50 = _percentile(latencies, 0.50)
    p95 = _percentile(latencies, 0.95)
    p99 = _percentile(latencies, 0.99)
    print(f"Latency p50={p50}ms p95={p95}ms p99={p99}ms")
    # Example: Latency p50=1200ms p95=3400ms p99=5100ms


# --- Error rate ---
total = len(traces)
errors = sum(
    1 for t in traces if t.info.status == "ERROR"
)
if total > 0:
    error_rate = errors / total * 100
    print(f"Error rate: {error_rate:.1f}% ({errors}/{total})")
    # Example: Error rate: 3.3% (1/30)


# --- Token usage from trace metadata ---
# MLflow stores aggregate token counts as a JSON string under the
# reserved mlflow.trace.tokenUsage metadata key.
total_input_tokens = 0
total_output_tokens = 0
for t in traces:
    token_meta = t.info.request_metadata.get(
        "mlflow.trace.tokenUsage"
    )
    if token_meta:
        usage = json.loads(token_meta)
        total_input_tokens += usage.get("input_tokens", 0)
        total_output_tokens += usage.get("output_tokens", 0)

print(
    f"Token usage: {total_input_tokens} input, "
    f"{total_output_tokens} output, "
    f"{total_input_tokens + total_output_tokens} total"
)
# Example: Token usage: 4500 input, 6200 output, 10700 total


# --- Per-user latency breakdown ---
user_latencies = defaultdict(list)
for t in traces:
    uid = t.info.request_metadata.get(
        "mlflow.trace.user", "unknown"
    )
    if t.info.execution_time_ms is not None:
        user_latencies[uid].append(t.info.execution_time_ms)

print("\nPer-user average latency:")
for uid, lats in sorted(user_latencies.items()):
    avg = sum(lats) / len(lats)
    print(f"  {uid}: {avg:.0f}ms ({len(lats)} traces)")

Build a reusable monitoring function that queries trace metrics and fires alerts when thresholds are breached. Run this on a schedule (cron, Airflow, etc.) to catch issues early.

import time


def check_production_health(
    lookback_minutes: int = 30,
    error_rate_threshold: float = 5.0,
    p95_latency_threshold_ms: int = 5000,
):
    """Query recent production traces and print alerts on threshold breaches.

    Args:
        lookback_minutes: Size of the monitoring window.
        error_rate_threshold: Maximum acceptable error rate, in percent.
        p95_latency_threshold_ms: Maximum acceptable p95 latency, in ms.
    """
    # Trace timestamps are epoch milliseconds, hence the * 1000.
    cutoff_ms = int(
        (time.time() - lookback_minutes * 60) * 1000
    )

    traces = mlflow.search_traces(
        filter_string=(
            f"trace.timestamp_ms > {cutoff_ms} "
            "AND tag.environment = 'production'"
        ),
        return_type="list",
    )

    if not traces:
        print("No traces in monitoring window.")
        return

    # Error rate check (total > 0 is guaranteed by the early return above).
    total = len(traces)
    errors = sum(
        1 for t in traces if t.info.status == "ERROR"
    )
    error_rate = errors / total * 100

    if error_rate > error_rate_threshold:
        print(
            f"ALERT: Error rate {error_rate:.1f}% "
            f"exceeds {error_rate_threshold}% threshold"
        )

    # Latency check: nearest-rank p95, computed once and reused below.
    latencies = sorted(
        t.info.execution_time_ms
        for t in traces
        if t.info.execution_time_ms is not None
    )
    p95 = latencies[int(len(latencies) * 0.95)] if latencies else None
    if p95 is not None and p95 > p95_latency_threshold_ms:
        print(
            f"ALERT: p95 latency {p95}ms "
            f"exceeds {p95_latency_threshold_ms}ms "
            f"threshold"
        )

    # Summary (p95 omitted when no trace reported an execution time).
    summary = (
        f"Health check: {total} traces, "
        f"{error_rate:.1f}% error rate"
    )
    if p95 is not None:
        summary += f", p95={p95}ms"
    print(summary)


# Run the health check
check_production_health(
lookback_minutes=60,
error_rate_threshold=5.0,
p95_latency_threshold_ms=5000,
)
# Example output:
# Health check: 30 traces, 3.3% error rate, p95=3400ms

Results

After completing this cookbook, you have:

  1. Structured trace metadata -- every trace carries user ID, session ID, environment, and app version, making it filterable and groupable in the MLflow UI.
  2. Non-blocking trace export -- async logging keeps your application's response time unaffected by tracing overhead.
  3. Sampling controls -- global and per-function sampling ratios reduce storage and compute costs at high throughput, while preserving full visibility into critical paths.
  4. Queryable observability data -- mlflow.search_traces() with tag, metadata, status, and time filters gives you the building blocks for any dashboard or alert.
  5. Monitoring alerts -- a reusable health check function that catches error spikes and latency regressions before users notice.

Next Steps