ChatGPT’s launch in November 2022 changed the conversation around AI. Suddenly, every product team wants AI features. But there is a significant gap between an impressive demo and a reliable production system. The excitement is justified; execution, however, requires engineering discipline.
Here’s what matters when putting AI in production.
Demo vs. Production
The Reality Gap
demo_vs_production:
  demo:
    - Works on curated examples
    - Cherry-picked outputs
    - No error handling needed
    - Latency doesn't matter
    - Cost is ignored
  production:
    - Must handle edge cases
    - Consistent quality required
    - Graceful degradation needed
    - Latency is critical
    - Cost is real
Production Requirements
production_requirements:
  reliability:
    - Handle API failures
    - Timeout management
    - Retry with backoff
    - Fallback strategies (see the sketch after this list)
  quality:
    - Output validation
    - Content filtering
    - Hallucination detection
    - Human-in-the-loop options
  performance:
    - Latency optimization
    - Caching strategies
    - Async processing
    - Queue management
  cost:
    - Token optimization
    - Caching to reduce calls
    - Model selection
    - Usage monitoring
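Retries, caching, and monitoring are covered in detail below; fallback strategies deserve a concrete shape up front. Here is a minimal sketch of a graceful-degradation chain, assuming the legacy openai SDK used throughout this post; the model order and the canned message are placeholders for your own policy.

import openai

def call_model(prompt, model):
    # Thin wrapper over the chat API; raises on timeouts and API errors.
    response = openai.ChatCompletion.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        request_timeout=10,
    )
    return response.choices[0].message.content

def generate_with_fallback(prompt):
    # Try the preferred model, degrade to a cheaper one, then to a static
    # message so the feature fails soft rather than hard.
    for model in ("gpt-4", "gpt-3.5-turbo"):
        try:
            return call_model(prompt, model)
        except Exception:
            continue
    return "Sorry, this feature is temporarily unavailable. Please try again shortly."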
Architecture Patterns
Basic Integration
# Simple but not production-ready
import openai

def get_ai_response(prompt):
    response = openai.ChatCompletion.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content
Production-Grade Integration
import hashlib

import openai
import redis
from circuitbreaker import circuit
from tenacity import retry, stop_after_attempt, wait_exponential


class AIService:
    def __init__(self, redis_client, cache_ttl=3600):
        self.cache = redis_client
        self.cache_ttl = cache_ttl

    def _cache_key(self, prompt, model):
        content = f"{model}:{prompt}"
        return f"ai_cache:{hashlib.sha256(content.encode()).hexdigest()}"

    def _get_cached(self, prompt, model):
        key = self._cache_key(prompt, model)
        cached = self.cache.get(key)
        return cached.decode() if cached else None

    def _set_cached(self, prompt, model, response):
        key = self._cache_key(prompt, model)
        self.cache.setex(key, self.cache_ttl, response)

    @circuit(failure_threshold=5, recovery_timeout=60)
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10)
    )
    def _call_api(self, messages, model, temperature):
        response = openai.ChatCompletion.create(
            model=model,
            messages=messages,
            temperature=temperature,
            request_timeout=30  # HTTP timeout for the legacy SDK
        )
        return response.choices[0].message.content

    def generate(self, prompt, model="gpt-3.5-turbo", temperature=0.7, use_cache=True):
        # Check cache first (only deterministic calls are cacheable)
        if use_cache and temperature == 0:
            cached = self._get_cached(prompt, model)
            if cached:
                return {"content": cached, "cached": True}

        # Call API
        messages = [{"role": "user", "content": prompt}]
        try:
            content = self._call_api(messages, model, temperature)
        except Exception as e:
            # Fallback strategy: return a structured error the caller can degrade on
            return {"error": str(e), "fallback": True}

        # Cache deterministic responses
        if use_cache and temperature == 0:
            self._set_cached(prompt, model, content)

        return {"content": content, "cached": False}
Quality Assurance
Output Validation
# Validate AI outputs before using
def validate_json_response(response, schema):
"""Validate that AI response matches expected schema."""
try:
data = json.loads(response)
jsonschema.validate(data, schema)
return data, None
except json.JSONDecodeError as e:
return None, f"Invalid JSON: {e}"
except jsonschema.ValidationError as e:
return None, f"Schema validation failed: {e.message}"
def validate_with_retry(prompt, schema, max_attempts=3):
"""Retry generation if validation fails."""
for attempt in range(max_attempts):
response = ai_service.generate(prompt, temperature=0)
data, error = validate_json_response(response["content"], schema)
if data:
return data
# Add validation error to prompt for retry
prompt = f"{prompt}\n\nPrevious response was invalid: {error}. Please try again."
return None
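To make the contract concrete, here is a hypothetical schema and prompt for a sentiment-classification task; the schema, field names, and example review are all illustrative.

sentiment_schema = {
    "type": "object",
    "properties": {
        "sentiment": {"type": "string", "enum": ["positive", "neutral", "negative"]},
        "confidence": {"type": "number", "minimum": 0, "maximum": 1},
    },
    "required": ["sentiment", "confidence"],
}

prompt = (
    "Classify the sentiment of this review. Respond with JSON only, using the keys "
    "'sentiment' and 'confidence':\n\nGreat product, but shipping was slow."
)

result = validate_with_retry(prompt, sentiment_schema)
if result is None:
    # All attempts produced invalid output; fall back or route to a human
    ...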
Content Filtering
content_safety:
  input_filtering:
    - PII detection
    - Prompt injection detection (see the input-filtering sketch below)
    - Content policy checks
  output_filtering:
    - Toxic content detection
    - PII in responses
    - Factual claim validation
    - Brand safety checks
def filter_response(content, context):
    """Apply content filtering to AI responses.

    contains_pii, redact_pii, is_toxic, extract_claims, verify_claim and
    add_disclaimer are application-specific helpers (regexes, a moderation API,
    or a retrieval check against your own knowledge base).
    """
    # Check for PII
    if contains_pii(content):
        content = redact_pii(content)

    # Check for toxic content
    if is_toxic(content):
        return None, "Response filtered for content policy"

    # Validate claims if needed
    if context.requires_fact_checking:
        claims = extract_claims(content)
        for claim in claims:
            if not verify_claim(claim, context.knowledge_base):
                content = add_disclaimer(content, claim)

    return content, None
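The input side of the checklist above (PII and prompt-injection screening) can start as simple heuristics before you adopt a dedicated classifier or moderation service. A rough sketch; the regex and phrase list are illustrative and will not stop a determined attacker.

import re

EMAIL_RE = re.compile(r"[\w.+-]+@[\w-]+\.[\w.]+")
INJECTION_PHRASES = ("ignore previous instructions", "disregard the system prompt")

def filter_input(user_text):
    """Screen user input before it reaches the model."""
    # Redact rather than reject: strip emails before they are sent to the API
    user_text = EMAIL_RE.sub("[REDACTED_EMAIL]", user_text)

    lowered = user_text.lower()
    if any(phrase in lowered for phrase in INJECTION_PHRASES):
        return None, "Input rejected: possible prompt injection"

    return user_text, None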
Latency Optimization
Streaming Responses
import openai


async def stream_response(prompt):
    """Stream the AI response for better perceived latency."""
    response = await openai.ChatCompletion.acreate(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    async for chunk in response:
        # The first and last chunks may carry no content (role / finish markers)
        token = chunk.choices[0].delta.get("content")
        if token:
            yield token
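Consuming the stream is straightforward; here is a hypothetical command-line consumer (in a web app you would forward tokens over server-sent events or a websocket instead).

import asyncio

async def demo():
    async for token in stream_response("Explain circuit breakers in one paragraph."):
        print(token, end="", flush=True)

asyncio.run(demo())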
Async Processing
# For non-interactive use cases
import asyncio


async def process_batch(items):
    """Process multiple items concurrently (process_item is your own async task)."""
    tasks = [process_item(item) for item in items]
    return await asyncio.gather(*tasks, return_exceptions=True)


# Queue-based processing for background tasks (queue is an RQ queue; see below)
def enqueue_ai_task(task_data):
    """Enqueue an AI task for background processing."""
    job = queue.enqueue(
        'ai_worker.process',
        task_data,
        job_timeout=300,
        result_ttl=3600
    )
    return job.id
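The queue object above is assumed to be an RQ queue backed by the same Redis instance used for caching; a minimal setup sketch (the queue name is arbitrary).

import redis
from rq import Queue

# Jobs are picked up by a separate worker process started with: rq worker ai_tasks
queue = Queue("ai_tasks", connection=redis.Redis())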
Cost Management
Token Optimization
cost_optimization:
  prompt_engineering:
    - Concise prompts (see the token-count sketch after this list)
    - Efficient formatting
    - Avoid repetition
  model_selection:
    - gpt-3.5-turbo for simple tasks
    - gpt-4 only when needed
    - Fine-tuned models for specific use cases
  caching:
    - Cache deterministic responses
    - Cache embeddings
    - Cache similar queries
  batching:
    - Batch API calls where possible
    - Use embeddings for classification
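For the token-count sketch referenced above, tiktoken can estimate prompt size before you send it. Chat formatting adds a few tokens per message on top of this count, so treat it as an estimate; the budget shown is an arbitrary example.

import tiktoken

def count_tokens(text, model="gpt-3.5-turbo"):
    encoding = tiktoken.encoding_for_model(model)
    return len(encoding.encode(text))

def fits_budget(prompt, budget=3000):
    # Leave headroom for the completion; trim or summarize the prompt if this fails
    return count_tokens(prompt) <= budget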
Usage Monitoring
import time
from functools import wraps


def track_usage(func):
    """Decorator to track AI usage and costs (metrics is a statsd-style client)."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)

        # Log usage metrics
        metrics.increment('ai.requests')
        metrics.timing('ai.latency', time.time() - start_time)

        if hasattr(result, 'usage'):
            metrics.increment('ai.tokens.prompt', result.usage.prompt_tokens)
            metrics.increment('ai.tokens.completion', result.usage.completion_tokens)

            # Approximate cost calculation (calculate_cost sketched below)
            cost = calculate_cost(result.usage, kwargs.get('model'))
            metrics.increment('ai.cost', cost)

        return result
    return wrapper
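One possible shape for the calculate_cost helper used in the decorator; the per-1K-token rates below are placeholders that will drift, so load current pricing from configuration rather than hard-coding it.

# Illustrative per-1K-token rates only; check current pricing.
PRICE_PER_1K = {
    "gpt-3.5-turbo": {"prompt": 0.0015, "completion": 0.002},
    "gpt-4": {"prompt": 0.03, "completion": 0.06},
}

def calculate_cost(usage, model):
    rates = PRICE_PER_1K.get(model, PRICE_PER_1K["gpt-3.5-turbo"])
    return (usage.prompt_tokens / 1000) * rates["prompt"] \
        + (usage.completion_tokens / 1000) * rates["completion"]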
Observability
Logging and Monitoring
ai_observability:
  metrics:
    - Request rate
    - Latency (p50, p95, p99)
    - Error rate
    - Token usage
    - Cache hit rate
    - Cost per request
  logging:
    - Prompts (sanitized; see the logging sketch after this list)
    - Response metadata
    - Validation results
    - Errors and retries
  alerting:
    - Error rate spikes
    - Latency degradation
    - Cost anomalies
    - API availability
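For the logging sketch referenced above: hashing the prompt lets you correlate requests and debug cache behavior without storing user text verbatim. Field names here are illustrative.

import hashlib
import json
import logging

logger = logging.getLogger("ai")

def log_request(prompt, model, latency_ms, cached, error=None):
    logger.info(json.dumps({
        "prompt_sha256": hashlib.sha256(prompt.encode()).hexdigest(),
        "prompt_chars": len(prompt),
        "model": model,
        "latency_ms": round(latency_ms, 1),
        "cached": cached,
        "error": error,
    }))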
Key Takeaways
- Demos hide the complexity of production AI systems
- Build retry, circuit breaker, and fallback patterns
- Validate all AI outputs before using them
- Implement content filtering for safety
- Use streaming for better user experience
- Cache deterministic responses to reduce costs
- Monitor usage, latency, and costs closely
- Plan for API failures and degraded service
- Start simple, add complexity as needed
AI in production requires the same engineering discipline as any critical system. The excitement is real—so is the work.