Putting AI in Production: Practical Considerations

January 9, 2023

ChatGPT’s launch in November 2022 changed the conversation around AI. Suddenly, every product team wants AI features. But there’s a significant gap between an impressive demo and a reliable production system. The excitement is justified; execution still requires engineering discipline.

Here’s what matters when putting AI in production.

Demo vs. Production

The Reality Gap

demo_vs_production:
  demo:
    - Works on curated examples
    - Cherry-picked outputs
    - No error handling needed
    - Latency doesn't matter
    - Cost is ignored

  production:
    - Must handle edge cases
    - Consistent quality required
    - Graceful degradation needed
    - Latency is critical
    - Cost is real

Production Requirements

production_requirements:
  reliability:
    - Handle API failures
    - Timeout management
    - Retry with backoff
    - Fallback strategies

  quality:
    - Output validation
    - Content filtering
    - Hallucination detection
    - Human-in-the-loop options

  performance:
    - Latency optimization
    - Caching strategies
    - Async processing
    - Queue management

  cost:
    - Token optimization
    - Caching to reduce calls
    - Model selection
    - Usage monitoring

Architecture Patterns

Basic Integration

# Simple but not production-ready
import openai

def get_ai_response(prompt):
    response = openai.ChatCompletion.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content

Production-Grade Integration

import openai
from tenacity import retry, stop_after_attempt, wait_exponential
from circuitbreaker import circuit
import redis
import hashlib

class AIService:
    def __init__(self, redis_client, cache_ttl=3600):
        self.cache = redis_client
        self.cache_ttl = cache_ttl

    def _cache_key(self, prompt, model):
        content = f"{model}:{prompt}"
        return f"ai_cache:{hashlib.sha256(content.encode()).hexdigest()}"

    def _get_cached(self, prompt, model):
        key = self._cache_key(prompt, model)
        cached = self.cache.get(key)
        return cached.decode() if cached else None

    def _set_cached(self, prompt, model, response):
        key = self._cache_key(prompt, model)
        self.cache.setex(key, self.cache_ttl, response)

    @circuit(failure_threshold=5, recovery_timeout=60)
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10)
    )
    def _call_api(self, messages, model, temperature):
        response = openai.ChatCompletion.create(
            model=model,
            messages=messages,
            temperature=temperature,
            request_timeout=30  # per-request timeout; the pre-1.0 SDK expects request_timeout
        )
        return response.choices[0].message.content

    def generate(self, prompt, model="gpt-3.5-turbo", temperature=0.7, use_cache=True):
        # Only deterministic (temperature=0) outputs are safe to cache
        if use_cache and temperature == 0:
            cached = self._get_cached(prompt, model)
            if cached:
                return {"content": cached, "cached": True}

        # Call API
        messages = [{"role": "user", "content": prompt}]
        try:
            content = self._call_api(messages, model, temperature)
        except Exception as e:
            # Fallback strategy
            return {"error": str(e), "fallback": True}

        # Cache deterministic responses
        if use_cache and temperature == 0:
            self._set_cached(prompt, model, content)

        return {"content": content, "cached": False}

Quality Assurance

Output Validation

# Validate AI outputs before using
import json
import jsonschema

def validate_json_response(response, schema):
    """Validate that AI response matches expected schema."""
    try:
        data = json.loads(response)
        jsonschema.validate(data, schema)
        return data, None
    except json.JSONDecodeError as e:
        return None, f"Invalid JSON: {e}"
    except jsonschema.ValidationError as e:
        return None, f"Schema validation failed: {e.message}"

def validate_with_retry(prompt, schema, max_attempts=3):
    """Retry generation if validation fails."""
    for _ in range(max_attempts):
        response = ai_service.generate(prompt, temperature=0)
        if "content" not in response:
            continue  # API call failed and fell back; try again
        data, error = validate_json_response(response["content"], schema)
        if data:
            return data
        # Feed the validation error back into the prompt for the next attempt
        prompt = f"{prompt}\n\nPrevious response was invalid: {error}. Please try again."
    return None
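
A quick usage example with a hypothetical schema for a sentiment-classification task:

sentiment_schema = {
    "type": "object",
    "properties": {
        "sentiment": {"type": "string", "enum": ["positive", "negative", "neutral"]},
        "confidence": {"type": "number", "minimum": 0, "maximum": 1},
    },
    "required": ["sentiment", "confidence"],
}

prompt = (
    'Classify the sentiment of: "The update broke my workflow." '
    'Respond with JSON containing keys sentiment and confidence.'
)
result = validate_with_retry(prompt, sentiment_schema)  # dict or None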

Content Filtering

content_safety:
  input_filtering:
    - PII detection
    - Prompt injection detection
    - Content policy checks

  output_filtering:
    - Toxic content detection
    - PII in responses
    - Factual claim validation
    - Brand safety checks

def filter_response(content, context):
    """Apply content filtering to AI responses.

    contains_pii, redact_pii, is_toxic, extract_claims, verify_claim, and
    add_disclaimer are application-specific helpers (regexes, a moderation
    API, or your own classifiers); implement them for your domain.
    """
    # Check for PII
    if contains_pii(content):
        content = redact_pii(content)

    # Check for toxic content
    if is_toxic(content):
        return None, "Response filtered for content policy"

    # Validate claims if needed
    if context.requires_fact_checking:
        claims = extract_claims(content)
        for claim in claims:
            if not verify_claim(claim, context.knowledge_base):
                content = add_disclaimer(content, claim)

    return content, None
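
The input side deserves the same treatment. Here is a minimal sketch of pre-call filtering; the keyword patterns are an illustrative assumption, not a complete defense against prompt injection (use a classifier or a moderation endpoint in practice):

import re

# Illustrative patterns only; real injection detection needs more than regexes
INJECTION_PATTERNS = [
    r"ignore (all |the )?(previous|above) instructions",
    r"disregard your (system )?prompt",
]

def filter_input(user_input):
    """Screen user input before it reaches the model."""
    for pattern in INJECTION_PATTERNS:
        if re.search(pattern, user_input, re.IGNORECASE):
            return None, "Input rejected: possible prompt injection"
    if contains_pii(user_input):  # same application-specific helper as above
        user_input = redact_pii(user_input)
    return user_input, None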

Latency Optimization

Streaming Responses

async def stream_response(prompt):
    """Stream AI response for better perceived latency."""
    response = await openai.ChatCompletion.acreate(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )

    async for chunk in response:
        delta = chunk.choices[0].delta
        # Not every chunk carries content (the first chunk is often role-only)
        if delta.get("content"):
            yield delta["content"]
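
Consuming the generator is straightforward, for example printing tokens as they arrive:

import asyncio

async def demo():
    async for token in stream_response("Explain embeddings in two sentences."):
        print(token, end="", flush=True)

asyncio.run(demo())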

Async Processing

# For non-interactive use cases
import asyncio

async def process_batch(items):
    """Process items concurrently; process_item is your per-item coroutine."""
    # return_exceptions=True keeps one failure from sinking the whole batch
    tasks = [process_item(item) for item in items]
    return await asyncio.gather(*tasks, return_exceptions=True)

# Queue-based processing for background tasks (shown with an RQ-style queue)
def enqueue_ai_task(task_data):
    """Enqueue an AI task for background processing."""
    job = queue.enqueue(
        'ai_worker.process',
        task_data,
        job_timeout=300,
        result_ttl=3600
    )
    return job.id
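
On the worker side, the 'ai_worker.process' function referenced above would wrap the same hardened service. A sketch, where save_result stands in for your persistence layer:

# ai_worker.py
def process(task_data):
    """Run the AI call in the background and persist the result."""
    result = ai_service.generate(task_data["prompt"], temperature=0)
    if result.get("fallback"):
        # Raising lets the queue record the failure and apply its retry policy
        raise RuntimeError(result["error"])
    save_result(task_data["id"], result["content"])  # hypothetical persistence helper
    return result["content"]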

Cost Management

Token Optimization

cost_optimization:
  prompt_engineering:
    - Concise prompts
    - Efficient formatting
    - Avoid repetition

  model_selection:
    - gpt-3.5-turbo for simple tasks
    - gpt-4 only when needed
    - Fine-tuned models for specific use cases

  caching:
    - Cache deterministic responses
    - Cache embeddings
    - Cache similar queries

  batching:
    - Batch API calls where possible
    - Use embeddings for classification
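
A concrete way to enforce the prompt-engineering items above is to count tokens before paying for them. A sketch using tiktoken (cl100k_base is the encoding used by the gpt-3.5 family; the budget number is an arbitrary assumption):

import tiktoken

_enc = tiktoken.get_encoding("cl100k_base")

def check_budget(prompt, max_prompt_tokens=2000):
    """Count prompt tokens and reject oversized prompts before the API call."""
    n = len(_enc.encode(prompt))
    if n > max_prompt_tokens:
        raise ValueError(f"Prompt is {n} tokens; limit is {max_prompt_tokens}")
    return n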

Usage Monitoring

import time
from functools import wraps

def track_usage(func):
    """Decorator to track AI usage and costs.

    `metrics` is assumed to be a statsd-style client exposing increment()
    and timing().
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)

        # Log usage metrics
        metrics.increment('ai.requests')
        metrics.timing('ai.latency', time.time() - start_time)

        if hasattr(result, 'usage'):
            metrics.increment('ai.tokens.prompt', result.usage.prompt_tokens)
            metrics.increment('ai.tokens.completion', result.usage.completion_tokens)
            # Approximate cost calculation
            cost = calculate_cost(result.usage, kwargs.get('model'))
            metrics.increment('ai.cost', cost)

        return result
    return wrapper
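
calculate_cost is left undefined above; a minimal sketch with per-1K-token prices that are placeholders (verify against the provider's current price list):

# Placeholder prices per 1K tokens; do not rely on these numbers
PRICES = {
    "gpt-3.5-turbo": {"prompt": 0.002, "completion": 0.002},
    "gpt-4": {"prompt": 0.03, "completion": 0.06},
}

def calculate_cost(usage, model):
    p = PRICES.get(model) or PRICES["gpt-3.5-turbo"]
    return (usage.prompt_tokens / 1000) * p["prompt"] \
         + (usage.completion_tokens / 1000) * p["completion"]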

Observability

Logging and Monitoring

ai_observability:
  metrics:
    - Request rate
    - Latency (p50, p95, p99)
    - Error rate
    - Token usage
    - Cache hit rate
    - Cost per request

  logging:
    - Prompts (sanitized)
    - Response metadata
    - Validation results
    - Errors and retries

  alerting:
    - Error rate spikes
    - Latency degradation
    - Cost anomalies
    - API availability
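
For the logging items, one useful pattern is to log a hash of the prompt rather than the raw text, so requests can be correlated without retaining user content. A minimal sketch:

import hashlib
import json
import logging

logger = logging.getLogger("ai_requests")

def log_request(prompt, model, latency_ms, usage, cache_hit):
    """Emit one structured record per AI request, with the prompt sanitized."""
    logger.info(json.dumps({
        "prompt_sha256": hashlib.sha256(prompt.encode()).hexdigest(),
        "model": model,
        "latency_ms": latency_ms,
        "prompt_tokens": getattr(usage, "prompt_tokens", None),
        "completion_tokens": getattr(usage, "completion_tokens", None),
        "cache_hit": cache_hit,
    }))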

Key Takeaways

AI in production requires the same engineering discipline as any critical system. The excitement is real, and so is the work.