You have built prototypes. You have called APIs, written prompts, wired up RAG pipelines, and maybe orchestrated agents with LangChain. Now the question changes from “can I make this work?” to “can I ship this?” Production AI is a different game. You need to pick the right model, keep costs from spiraling, stop the model from saying something catastrophic, measure whether it is actually working, and do all of that while handling real traffic. This lesson is the playbook.
The Model Landscape
The LLM market moves fast, but the tiers stay remarkably stable. Understanding the tiers matters more than memorizing specific model names.
Frontier models — best quality, highest cost, slowest:
- OpenAI GPT-4o, o1, o3
- Anthropic Claude Opus
- Google Gemini Ultra / 2.5 Pro
Mid-tier models — strong quality, reasonable cost, good speed:
- OpenAI GPT-4o-mini
- Anthropic Claude Sonnet
- Google Gemini Flash
- Meta Llama 3.3 70B (open-weight, self-hostable)
Small/fast models — limited reasoning, very cheap, very fast:
- Anthropic Claude Haiku
- Google Gemini Flash 8B
- Meta Llama 3.2 8B (runs on a laptop)
- Mistral 7B, Phi-3
Open-source / self-hosted — no API costs, full control, you handle infra:
- Llama 3 family (Meta)
- Mistral / Mixtral
- Qwen 2.5 (Alibaba)
- DeepSeek V3
The right model is rarely the biggest one. It is the cheapest model that meets your quality bar.
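That rule can be made mechanical: benchmark every candidate on your eval set, filter by your quality bar, then sort by cost. A minimal sketch with hypothetical accuracy and pricing numbers (none of these are real benchmark results):

```python
# Hypothetical benchmark results: (model tier, accuracy on YOUR eval set,
# blended $ per 1M tokens). All numbers are illustrative.
CANDIDATES = [
    ("frontier", 0.97, 10.00),
    ("mid-tier", 0.94, 1.00),
    ("small", 0.81, 0.10),
]

def pick_model(candidates: list[tuple], quality_bar: float) -> str:
    """Return the cheapest model whose accuracy clears the quality bar."""
    viable = [c for c in candidates if c[1] >= quality_bar]
    if not viable:
        raise ValueError("No model meets the bar; relax it or improve the prompt.")
    return min(viable, key=lambda c: c[2])[0]

print(pick_model(CANDIDATES, quality_bar=0.90))  # → mid-tier
```

The interesting failure mode is the `ValueError` branch: if no model clears the bar, the fix is usually better prompts or retrieval, not a bigger model.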
How to Choose a Model
Model selection is an engineering decision, not a brand loyalty exercise. Here is the decision framework:
Step 1: Define your task complexity.
# Simple classification, extraction, formatting
# → Small model (Haiku, GPT-4o-mini, Llama 8B)
SIMPLE_TASKS = [
    "Classify this support ticket as billing/technical/general",
    "Extract the date and amount from this invoice",
    "Reformat this JSON into a markdown table",
]

# Multi-step reasoning, nuanced generation, complex analysis
# → Frontier model (Opus, GPT-4o, Gemini Pro)
COMPLEX_TASKS = [
    "Analyze this contract for risks and suggest amendments",
    "Debug this distributed system trace and identify the root cause",
    "Write a migration plan for this database schema change",
]

Step 2: Run a benchmark on YOUR data. Public benchmarks (MMLU, HumanEval) tell you general capability. They do not tell you how a model performs on your specific prompts with your specific data.
import json
import time

from openai import OpenAI
from anthropic import Anthropic

# Build a test set from real examples
test_cases = [
    {
        "input": "Customer says: I was charged twice for order #4821",
        "expected_category": "billing",
        "expected_sentiment": "negative",
    },
    # ... 50-100 real examples with known-good answers
]

def benchmark_model(client, model, test_cases):
    results = []
    for case in test_cases:
        start = time.time()
        response = client.chat.completions.create(
            model=model,
            messages=[
                {"role": "system", "content": "Classify the ticket. Return JSON: {category, sentiment}"},
                {"role": "user", "content": case["input"]},
            ],
            temperature=0,
        )
        latency = time.time() - start
        output = json.loads(response.choices[0].message.content)
        results.append({
            "correct": output["category"] == case["expected_category"],
            "latency_ms": latency * 1000,
            "input_tokens": response.usage.prompt_tokens,
            "output_tokens": response.usage.completion_tokens,
        })
    accuracy = sum(r["correct"] for r in results) / len(results)
    avg_latency = sum(r["latency_ms"] for r in results) / len(results)
    total_tokens = sum(r["input_tokens"] + r["output_tokens"] for r in results)
    print(f"Model: {model}")
    print(f"  Accuracy: {accuracy:.1%}")
    print(f"  Avg latency: {avg_latency:.0f}ms")
    print(f"  Total tokens: {total_tokens}")
    return {"accuracy": accuracy, "avg_latency": avg_latency, "total_tokens": total_tokens}

Step 3: Calculate cost per request, not cost per token. A model that costs 3x more per token but needs 5x fewer tokens is cheaper per request.
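The cost-per-request arithmetic is worth making concrete. A sketch with illustrative per-million-token rates (not real pricing): a pricier model that takes a short zero-shot prompt and answers tersely can beat a cheap model that needs few-shot examples and rambles.

```python
def cost_per_request(input_tokens: int, output_tokens: int,
                     input_rate: float, output_rate: float) -> float:
    """Rates are $ per 1M tokens."""
    return (input_tokens * input_rate + output_tokens * output_rate) / 1_000_000

# Illustrative scenario: the pricier model gets a 300-token prompt and
# answers in 80 tokens; the cheap model needs 5000 tokens of few-shot
# examples and produces a 1500-token answer.
pricier_per_token = cost_per_request(300, 80, input_rate=3.00, output_rate=15.00)
cheaper_per_token = cost_per_request(5000, 1500, input_rate=0.60, output_rate=2.40)
print(f"${pricier_per_token:.4f} vs ${cheaper_per_token:.4f}")  # → $0.0021 vs $0.0066
```

Despite a ~5x higher per-token rate, the concise model is roughly 3x cheaper per request here.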
Cost Optimization
LLM costs scale with traffic. A prototype that costs $5/day can become $5,000/day at scale. Here are the levers.
Prompt Caching
Most providers now support prompt caching — if the same system prompt prefix is sent repeatedly, you pay reduced rates for the cached portion.
from anthropic import Anthropic

client = Anthropic()

# Long system prompt — cached after first request
SYSTEM_PROMPT = """You are a customer support classifier for Acme Corp.
[... 2000 tokens of classification rules, examples, edge cases ...]
"""

def classify_ticket(ticket_text: str) -> str:
    response = client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=200,
        system=[{
            "type": "text",
            "text": SYSTEM_PROMPT,
            "cache_control": {"type": "ephemeral"},  # Enable prompt caching
        }],
        messages=[{"role": "user", "content": ticket_text}],
    )
    # First call: full price. Subsequent calls: system prompt tokens at 90% discount
    return response.content[0].text

Model Routing
The single biggest cost saver. Route simple requests to cheap models and complex requests to expensive ones.
import re
from openai import OpenAI

client = OpenAI()

def estimate_complexity(user_input: str) -> str:
    """Cheap heuristic to classify task complexity."""
    word_count = len(user_input.split())
    has_code = bool(re.search(r"```|def |class |function ", user_input))
    has_analysis_words = bool(re.search(
        r"analyze|compare|explain why|trade-?offs|architect", user_input, re.I
    ))
    if word_count < 30 and not has_code and not has_analysis_words:
        return "simple"
    elif has_code or has_analysis_words or word_count > 200:
        return "complex"
    return "medium"

MODEL_MAP = {
    "simple": "gpt-4o-mini",   # $0.15 / 1M input tokens
    "medium": "gpt-4o-mini",   # Same model, still cheap enough
    "complex": "gpt-4o",       # $2.50 / 1M input tokens
}

def routed_completion(user_input: str, system_prompt: str) -> str:
    complexity = estimate_complexity(user_input)
    model = MODEL_MAP[complexity]
    response = client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_input},
        ],
    )
    return response.choices[0].message.content

For more sophisticated routing, use a small classifier model to decide which model handles the request:
def smart_route(user_input: str) -> str:
    """Use a cheap model to decide which model should answer."""
    routing_response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{
            "role": "user",
            "content": f"Rate this query's complexity as SIMPLE or COMPLEX. "
                       f"SIMPLE = factual lookup, formatting, classification. "
                       f"COMPLEX = reasoning, analysis, code generation, creative writing.\n\n"
                       f"Query: {user_input}\n\nRating:",
        }],
        max_tokens=10,
        temperature=0,
    )
    rating = routing_response.choices[0].message.content.strip().upper()
    return "gpt-4o" if "COMPLEX" in rating else "gpt-4o-mini"

Batching
If responses are not time-sensitive, batch requests for lower costs. OpenAI’s Batch API gives a 50% discount.
import json

# Prepare batch file (JSONL format)
requests = []
for i, ticket in enumerate(tickets):
    requests.append({
        "custom_id": f"ticket-{i}",
        "method": "POST",
        "url": "/v1/chat/completions",
        "body": {
            "model": "gpt-4o-mini",
            "messages": [
                {"role": "system", "content": "Classify this ticket."},
                {"role": "user", "content": ticket},
            ],
            "temperature": 0,
        },
    })

with open("/tmp/batch_input.jsonl", "w") as f:
    for req in requests:
        f.write(json.dumps(req) + "\n")

# Submit batch — results arrive within 24 hours at 50% cost
batch_file = client.files.create(file=open("/tmp/batch_input.jsonl", "rb"), purpose="batch")
batch_job = client.batches.create(
    input_file_id=batch_file.id,
    endpoint="/v1/chat/completions",
    completion_window="24h",
)

Guardrails
An LLM in production without guardrails is a liability. You need defenses on both input and output.
Input Validation
import re

def validate_input(user_input: str) -> tuple[bool, str]:
    """Validate user input before sending to LLM."""
    # Length check — prevent token bombs
    if len(user_input) > 10_000:
        return False, "Input too long. Maximum 10,000 characters."
    # Basic prompt injection detection
    injection_patterns = [
        r"ignore (?:all )?(?:previous|above|prior) instructions",
        r"you are now",
        r"new system prompt",
        r"disregard (?:your|the) (?:rules|instructions|guidelines)",
        r"pretend you",
    ]
    for pattern in injection_patterns:
        if re.search(pattern, user_input, re.IGNORECASE):
            return False, "Input contains disallowed patterns."
    return True, "ok"

Output Filtering
import json
import re

def filter_output(llm_response: str, context: dict) -> str:
    """Post-process LLM output before returning to user."""
    # PII detection — crude but effective first pass
    # Mask SSNs
    cleaned = re.sub(r"\b\d{3}-\d{2}-\d{4}\b", "[SSN REDACTED]", llm_response)
    # Mask credit card numbers
    cleaned = re.sub(r"\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b", "[CC REDACTED]", cleaned)
    # Mask email addresses if not expected
    if not context.get("allow_emails"):
        cleaned = re.sub(r"\b[\w.+-]+@[\w-]+\.[\w.-]+\b", "[EMAIL REDACTED]", cleaned)
    return cleaned

def enforce_format(llm_response: str, expected_format: str = "json") -> dict | str | None:
    """Ensure LLM output matches expected format."""
    if expected_format == "json":
        # Strip markdown code fences if present
        text = llm_response.strip()
        if text.startswith("```"):
            text = text.split("\n", 1)[1].rsplit("```", 1)[0]
        try:
            return json.loads(text)
        except json.JSONDecodeError:
            return None
    return llm_response

Content Moderation
For user-facing applications, run a moderation check before and after the LLM call:
def check_moderation(text: str) -> bool:
    """Returns True if content is safe."""
    response = client.moderations.create(
        model="omni-moderation-latest",
        input=text,
    )
    return not response.results[0].flagged

def safe_completion(user_input: str, system_prompt: str) -> str:
    # Check input
    if not check_moderation(user_input):
        return "I can't process that request."
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_input},
        ],
    )
    result = response.choices[0].message.content
    # Check output
    if not check_moderation(result):
        return "I'm unable to provide a response to that query."
    return result

Evaluation
“It seems to work” is not an evaluation strategy. You need automated evals that run on every prompt change, every model switch, and every code deploy.
Building an Eval Suite
import json
from dataclasses import dataclass

@dataclass
class EvalCase:
    input_text: str
    expected_output: str  # Or expected properties
    category: str         # Group evals by type
    grade_fn: str         # "exact_match", "contains", "llm_judge"

# Define your eval set
eval_suite = [
    EvalCase(
        input_text="What is our refund policy?",
        expected_output="30-day money-back guarantee",
        category="factual",
        grade_fn="contains",
    ),
    EvalCase(
        input_text="I hate your product, give me my money back NOW",
        expected_output="empathetic and professional tone",
        category="tone",
        grade_fn="llm_judge",
    ),
]

def grade_exact_match(actual: str, expected: str) -> bool:
    return actual.strip().lower() == expected.strip().lower()

def grade_contains(actual: str, expected: str) -> bool:
    return expected.lower() in actual.lower()

def grade_llm_judge(actual: str, expected_criteria: str) -> bool:
    """Use a strong model to judge the output."""
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{
            "role": "user",
            "content": f"Judge whether this AI response meets the criteria.\n\n"
                       f"Criteria: {expected_criteria}\n\n"
                       f"Response: {actual}\n\n"
                       f"Return ONLY 'PASS' or 'FAIL'.",
        }],
        temperature=0,
        max_tokens=10,
    )
    return "PASS" in response.choices[0].message.content.upper()

GRADERS = {
    "exact_match": grade_exact_match,
    "contains": grade_contains,
    "llm_judge": grade_llm_judge,
}

def run_eval(system_prompt: str, model: str, suite: list[EvalCase]) -> dict:
    results = {"total": 0, "passed": 0, "by_category": {}}
    for case in suite:
        response = client.chat.completions.create(
            model=model,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": case.input_text},
            ],
            temperature=0,
        )
        actual = response.choices[0].message.content
        grader = GRADERS[case.grade_fn]
        passed = grader(actual, case.expected_output)
        results["total"] += 1
        results["passed"] += int(passed)
        cat = results["by_category"].setdefault(case.category, {"total": 0, "passed": 0})
        cat["total"] += 1
        cat["passed"] += int(passed)
    results["score"] = results["passed"] / results["total"] if results["total"] else 0
    return results

Run this in CI. Set a threshold (e.g., 90% pass rate). Block deploys that drop below it.
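The CI gate itself can be a few lines: exit nonzero when the score falls below the threshold, and let the pipeline block the deploy. A sketch that assumes the results dict shape used above:

```python
def ci_gate(results: dict, threshold: float = 0.90) -> int:
    """Return a process exit code: 0 = deploy, 1 = block."""
    score = results["passed"] / results["total"] if results["total"] else 0.0
    print(f"Eval score: {score:.1%} (threshold {threshold:.0%})")
    return 0 if score >= threshold else 1

# Example: 47 of 50 cases passed → 94%, above a 90% bar
exit_code = ci_gate({"total": 50, "passed": 47}, threshold=0.90)
print(exit_code)  # → 0
```

In a real pipeline you would call `sys.exit(ci_gate(run_eval(...)))` as the last step of the eval job. Note the empty-suite case returns 1: an eval suite that ran zero cases should block, not pass.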
Observability
You cannot improve what you do not measure. Log every LLM interaction.
import time
import uuid
import logging
from dataclasses import dataclass, asdict

logger = logging.getLogger("llm_obs")

@dataclass
class LLMCall:
    request_id: str
    model: str
    prompt_tokens: int
    completion_tokens: int
    latency_ms: float
    cost_usd: float
    status: str  # "success", "error", "filtered"
    user_id: str | None

# Cost per 1M tokens (input, output) — update as pricing changes
PRICING = {
    "gpt-4o": (2.50, 10.00),
    "gpt-4o-mini": (0.15, 0.60),
    "claude-sonnet-4-20250514": (3.00, 15.00),
    "claude-haiku-4-20250414": (0.25, 1.25),
}

def calculate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    input_rate, output_rate = PRICING.get(model, (0, 0))
    return (input_tokens * input_rate + output_tokens * output_rate) / 1_000_000

def tracked_completion(model: str, messages: list, user_id: str | None = None, **kwargs) -> str:
    request_id = str(uuid.uuid4())
    start = time.time()
    try:
        response = client.chat.completions.create(model=model, messages=messages, **kwargs)
        latency_ms = (time.time() - start) * 1000
        usage = response.usage
        cost = calculate_cost(model, usage.prompt_tokens, usage.completion_tokens)
        call = LLMCall(
            request_id=request_id, model=model,
            prompt_tokens=usage.prompt_tokens,
            completion_tokens=usage.completion_tokens,
            latency_ms=latency_ms, cost_usd=cost,
            status="success", user_id=user_id,
        )
        logger.info("llm_call", extra=asdict(call))
        return response.choices[0].message.content
    except Exception as e:
        latency_ms = (time.time() - start) * 1000
        call = LLMCall(
            request_id=request_id, model=model,
            prompt_tokens=0, completion_tokens=0,
            latency_ms=latency_ms, cost_usd=0,
            status=f"error:{type(e).__name__}", user_id=user_id,
        )
        logger.error("llm_call_error", extra=asdict(call))
        raise

For production systems, consider dedicated observability tools like LangSmith (by LangChain) or Langfuse (open-source). They provide trace visualization, cost dashboards, prompt versioning, and eval integration out of the box.
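Even without a dedicated tool, structured call records support a basic dashboard. A sketch that rolls up per-model cost, latency, and error rate from a batch of records shaped like the `LLMCall` fields above (the sample records are made up):

```python
from collections import defaultdict

def summarize_calls(calls: list[dict]) -> dict:
    """Aggregate cost, latency, and error rate per model."""
    by_model = defaultdict(lambda: {"count": 0, "errors": 0, "cost_usd": 0.0, "latency_ms": 0.0})
    for c in calls:
        m = by_model[c["model"]]
        m["count"] += 1
        m["cost_usd"] += c["cost_usd"]
        m["latency_ms"] += c["latency_ms"]
        if c["status"] != "success":
            m["errors"] += 1
    for m in by_model.values():
        m["avg_latency_ms"] = m["latency_ms"] / m["count"]
        m["error_rate"] = m["errors"] / m["count"]
    return dict(by_model)

calls = [
    {"model": "gpt-4o-mini", "cost_usd": 0.0002, "latency_ms": 400, "status": "success"},
    {"model": "gpt-4o-mini", "cost_usd": 0.0003, "latency_ms": 600, "status": "error:APITimeoutError"},
    {"model": "gpt-4o", "cost_usd": 0.0040, "latency_ms": 1200, "status": "success"},
]
stats = summarize_calls(calls)
print(stats["gpt-4o-mini"]["error_rate"])  # → 0.5
```

Watching error rate and average latency per model is often enough to catch a degraded provider before users complain.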
Caching Strategies
LLM calls are slow and expensive. Caching identical or near-identical requests can cut costs dramatically.
Exact-Match Cache
import hashlib
import json

import redis

redis_client = redis.Redis(host="localhost", port=6379, db=0)
CACHE_TTL = 3600  # 1 hour

def cached_completion(model: str, messages: list, temperature: float = 0, **kwargs) -> str:
    # Only cache deterministic requests
    if temperature > 0:
        return tracked_completion(model, messages, **kwargs)
    # Build cache key from model + messages
    cache_key = hashlib.sha256(
        json.dumps({"model": model, "messages": messages}, sort_keys=True).encode()
    ).hexdigest()
    # Check cache
    cached = redis_client.get(f"llm:{cache_key}")
    if cached:
        return cached.decode("utf-8")
    # Cache miss — call LLM
    result = tracked_completion(model, messages, **kwargs)
    redis_client.setex(f"llm:{cache_key}", CACHE_TTL, result)
    return result

Semantic Cache
For cases where users ask the same question with different wording, use embedding similarity to find cache hits:
import numpy as np

def get_embedding(text: str) -> list[float]:
    response = client.embeddings.create(model="text-embedding-3-small", input=text)
    return response.data[0].embedding

def cosine_similarity(a: list[float], b: list[float]) -> float:
    a, b = np.array(a), np.array(b)
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

class SemanticCache:
    def __init__(self, similarity_threshold: float = 0.95):
        self.threshold = similarity_threshold
        self.entries: list[dict] = []  # In production, use a vector DB

    def get(self, query: str) -> str | None:
        query_embedding = get_embedding(query)
        for entry in self.entries:
            similarity = cosine_similarity(query_embedding, entry["embedding"])
            if similarity >= self.threshold:
                return entry["response"]
        return None

    def set(self, query: str, response: str):
        self.entries.append({
            "query": query,
            "embedding": get_embedding(query),
            "response": response,
        })

Set the similarity threshold high (0.95+). A false cache hit — returning the wrong answer because two different questions looked similar — is worse than a cache miss.
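The threshold intuition is easy to see with a toy cosine implementation. Real embeddings come from a model; the three-dimensional vectors here are made up purely to illustrate that a paraphrase-like pair scores near 1.0 while a different-intent pair scores far below the 0.95 bar:

```python
import math

def cosine(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

THRESHOLD = 0.95
paraphrase = cosine([1.0, 0.9, 0.1], [0.9, 1.0, 0.1])  # near-duplicate queries
unrelated = cosine([1.0, 0.9, 0.1], [0.1, 0.2, 1.0])   # different intent
print(paraphrase >= THRESHOLD, unrelated >= THRESHOLD)  # → True False
```

A threshold of 0.85 would still reject the unrelated pair here, but on real query distributions lower thresholds start matching questions that merely share vocabulary, which is exactly the false-hit failure mode described above.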
Error Handling in Production
LLM APIs fail. Rate limits hit. Models go down. Your application needs to handle all of it gracefully.
import time
import random

from anthropic import Anthropic
from openai import (
    APIError, RateLimitError, APIConnectionError, APITimeoutError
)

FALLBACK_CHAIN = ["gpt-4o", "gpt-4o-mini", "claude-sonnet-4-20250514"]

def resilient_completion(
    messages: list,
    model: str = "gpt-4o",
    max_retries: int = 3,
    use_fallbacks: bool = True,
) -> str:
    """LLM call with retries, exponential backoff, and model fallback."""
    models_to_try = [model]
    if use_fallbacks:
        models_to_try += [m for m in FALLBACK_CHAIN if m != model]
    last_error = None
    for current_model in models_to_try:
        for attempt in range(max_retries):
            try:
                # Pick client based on model name
                if "claude" in current_model:
                    return _call_anthropic(current_model, messages)
                else:
                    return _call_openai(current_model, messages)
            except RateLimitError as e:
                wait = (2 ** attempt) + random.uniform(0, 1)
                time.sleep(wait)
                last_error = e
            except APITimeoutError as e:
                wait = 2 ** attempt
                time.sleep(wait)
                last_error = e
            except (APIError, APIConnectionError) as e:
                last_error = e
                break  # Don't retry server errors on same model — try fallback
    # All models and retries exhausted
    raise RuntimeError(f"All LLM calls failed. Last error: {last_error}")

def _call_openai(model: str, messages: list) -> str:
    response = client.chat.completions.create(model=model, messages=messages, timeout=30)
    return response.choices[0].message.content

def _call_anthropic(model: str, messages: list) -> str:
    anthropic_client = Anthropic()
    # Convert OpenAI message format to Anthropic format
    system = next((m["content"] for m in messages if m["role"] == "system"), "")
    user_msgs = [m for m in messages if m["role"] != "system"]
    response = anthropic_client.messages.create(
        model=model, max_tokens=1024, system=system, messages=user_msgs,
    )
    return response.content[0].text

Rate Limiting and Queue Management
When your AI feature goes viral (or gets hit by a bot), you need traffic management.
import asyncio
import time

class TokenBucketRateLimiter:
    """Rate limiter that respects API provider requests-per-minute limits."""

    def __init__(self, requests_per_minute: int):
        self.rpm = requests_per_minute
        self.tokens = requests_per_minute
        self.last_refill = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self):
        async with self._lock:
            now = time.monotonic()
            elapsed = now - self.last_refill
            self.tokens = min(self.rpm, self.tokens + elapsed * (self.rpm / 60))
            self.last_refill = now
            if self.tokens < 1:
                wait_time = (1 - self.tokens) / (self.rpm / 60)
                await asyncio.sleep(wait_time)
                self.tokens = 0
            else:
                self.tokens -= 1

class PriorityRequestQueue:
    """Queue that processes high-priority requests first."""

    def __init__(self, rate_limiter: TokenBucketRateLimiter):
        self.high_priority: asyncio.Queue = asyncio.Queue()
        self.low_priority: asyncio.Queue = asyncio.Queue()
        self.rate_limiter = rate_limiter

    async def enqueue(self, request: dict, priority: str = "low"):
        queue = self.high_priority if priority == "high" else self.low_priority
        future = asyncio.get_running_loop().create_future()
        await queue.put((request, future))
        return await future  # Caller awaits the result

    async def process_loop(self):
        while True:
            # High priority first
            if not self.high_priority.empty():
                request, future = await self.high_priority.get()
            elif not self.low_priority.empty():
                request, future = await self.low_priority.get()
            else:
                await asyncio.sleep(0.1)
                continue
            await self.rate_limiter.acquire()
            try:
                result = await asyncio.to_thread(
                    tracked_completion, request["model"], request["messages"]
                )
                future.set_result(result)
            except Exception as e:
                future.set_exception(e)

Deployment Patterns
How you deploy your AI feature depends on latency requirements and scale.
Pattern 1: Synchronous API wrapper — simplest, works for low-latency tasks.
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI()

class CompletionRequest(BaseModel):
    prompt: str
    user_id: str

class CompletionResponse(BaseModel):
    result: str
    model_used: str

@app.post("/api/ai/classify", response_model=CompletionResponse)
async def classify(req: CompletionRequest):
    valid, msg = validate_input(req.prompt)
    if not valid:
        raise HTTPException(status_code=400, detail=msg)
    model = smart_route(req.prompt)
    result = cached_completion(
        model=model,
        messages=[
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": req.prompt},
        ],
    )
    result = filter_output(result, {"allow_emails": False})
    return CompletionResponse(result=result, model_used=model)

Pattern 2: Async processing with webhooks — for tasks that take more than a few seconds.
import asyncio
import uuid

import httpx
from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel

app = FastAPI()
job_store = {}  # In production, use Redis or a database

class AsyncRequest(BaseModel):
    prompt: str
    webhook_url: str
    user_id: str

@app.post("/api/ai/analyze")
async def analyze(req: AsyncRequest, background_tasks: BackgroundTasks):
    job_id = str(uuid.uuid4())
    job_store[job_id] = {"status": "processing"}
    background_tasks.add_task(process_and_notify, job_id, req)
    return {"job_id": job_id, "status": "processing"}

@app.get("/api/ai/jobs/{job_id}")
async def get_job(job_id: str):
    return job_store.get(job_id, {"status": "not_found"})

async def process_and_notify(job_id: str, req: AsyncRequest):
    try:
        # Run the blocking LLM call in a thread so the event loop stays free
        result = await asyncio.to_thread(
            resilient_completion,
            messages=[
                {"role": "system", "content": "Perform deep analysis."},
                {"role": "user", "content": req.prompt},
            ],
            model="gpt-4o",
        )
        job_store[job_id] = {"status": "complete", "result": result}
        async with httpx.AsyncClient() as http:
            await http.post(req.webhook_url, json={"job_id": job_id, "result": result})
    except Exception as e:
        job_store[job_id] = {"status": "failed", "error": str(e)}

Scaling considerations:
- Horizontal scaling is straightforward — LLM calls are I/O-bound, not CPU-bound
- Use connection pooling for your LLM client
- Set aggressive timeouts (15-30s for most tasks)
- Monitor queue depth as a scaling signal
- Consider separate worker pools for different model tiers (cheap/fast vs expensive/slow)
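The last point, separate worker pools per tier, can be sketched with per-tier asyncio semaphores: give the cheap tier high concurrency and keep the expensive tier in a small, protected pool so one big job cannot starve the rest. The tier names, concurrency caps, and the `asyncio.sleep` stand-in for a real LLM call are all illustrative.

```python
import asyncio

async def call_model(tier: str, sem: asyncio.Semaphore, prompt: str) -> str:
    async with sem:  # at most N in-flight calls per tier
        await asyncio.sleep(0.01)  # stand-in for the real LLM call
        return f"{tier}:{prompt}"

async def main() -> list[str]:
    # Illustrative caps: many cheap calls in flight, few expensive ones
    sems = {"cheap": asyncio.Semaphore(20), "expensive": asyncio.Semaphore(2)}
    tasks = [call_model("cheap", sems["cheap"], f"q{i}") for i in range(10)]
    tasks.append(call_model("expensive", sems["expensive"], "big-analysis"))
    return await asyncio.gather(*tasks)

results = asyncio.run(main())
print(len(results))  # → 11
```

The same idea scales up to separate OS processes or deployments per tier; the semaphore version is just the cheapest place to start.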
Key Takeaways
- There is no best model. Match the model to the task. Use the cheapest model that meets your quality bar. Benchmark on YOUR data, not public leaderboards.
- Model routing is the highest-leverage cost optimization. Route simple tasks to cheap models, complex tasks to expensive ones. This alone can cut costs 60-80%.
- Cache aggressively. Exact-match cache for deterministic requests, semantic cache for fuzzy matches. Set temperature to 0 for cacheable requests.
- Guardrails are not optional. Validate inputs (length, injection patterns), filter outputs (PII, format enforcement), and run content moderation for user-facing apps.
- Evaluate with automated evals, not vibes. Build a test suite from real examples. Run it in CI. Block deploys that drop below your quality threshold.
- Log everything. Every LLM call should record model, tokens, latency, cost, and status. You cannot optimize what you do not measure.
- Build for failure. Retries with exponential backoff, fallback model chains, and graceful degradation. LLM APIs will go down.
- Ship incrementally. Start with a single model, a simple prompt, and exact-match caching. Add routing, evals, and guardrails as you learn where the problems are. Over-engineering on day one is a waste — the model landscape will change before you finish.
