AI agents—systems that reason, plan, and take actions—are moving from impressive demos to production deployments. But the gap between a demo agent and a reliable production agent is significant. Reliability, safety, and observability require careful engineering.
Here’s how to build production-ready agentic workflows.
From Demo to Production
The Gap
demo_vs_production:
  demo_agent:
    - Works for cherry-picked examples
    - Failures are interesting
    - No cost constraints
    - Single user testing
  production_agent:
    - Must handle edge cases
    - Failures affect users
    - Cost matters at scale
    - Concurrent users
Production Requirements
production_requirements:
  reliability:
    - Consistent behavior
    - Graceful failure handling
    - Timeout management
    - Recovery mechanisms
  safety:
    - Constrained action space
    - Human approval for risky actions
    - Audit trail
    - Rollback capability
  efficiency:
    - Token optimization
    - Parallel execution where possible
    - Caching of repeated operations
    - Cost controls
  observability:
    - Step-by-step logging
    - Performance metrics
    - Error tracking
    - Debugging support
Architecture Patterns
Supervised Agent Architecture
class SupervisedAgent:
    """Agent with human-in-the-loop for critical actions.

    Every step is planned with ``self._plan_action(context)`` and carried out
    with ``self._execute_action(action)``. Neither method is defined in this
    snippet, so a concrete agent has to provide them.
    """

    # Action names that always need a human sign-off, whatever they cost.
    HIGH_RISK_ACTIONS = frozenset(
        {"delete", "send_email", "make_purchase", "modify_production"}
    )

    def __init__(self, llm, tools, approval_service, *,
                 approval_cost_threshold=10, approval_timeout=300):
        """Store the collaborators and the approval policy.

        Args:
            llm: Stored on the instance as ``self.llm``.
            tools: Stored on the instance as ``self.tools``.
            approval_service: Object whose awaitable ``request_approval``
                is called for actions that need a sign-off.
            approval_cost_threshold: Actions whose ``estimated_cost`` is
                strictly greater than this need approval (default 10).
            approval_timeout: Value passed as ``timeout`` to
                ``request_approval`` (default 300).
        """
        self.llm = llm
        self.tools = tools
        self.approval_service = approval_service
        self.approval_cost_threshold = approval_cost_threshold
        self.approval_timeout = approval_timeout

    async def run(self, task: str, max_steps: int = 10) -> "AgentResult":
        """Plan and execute actions until completion or ``max_steps``.

        Args:
            task: Task description placed in the planning context.
            max_steps: Maximum number of planning iterations.

        Returns:
            An ``AgentResult`` with ``success=True`` once the planner emits
            an action of type ``"complete"``, otherwise ``success=False``
            with the error ``"Max steps reached"``.
        """
        steps = []
        context = {"task": task, "history": []}

        for _ in range(max_steps):
            # Plan next action
            action = await self._plan_action(context)

            if action.type == "complete":
                return AgentResult(success=True, steps=steps, result=action.result)

            # Check if action requires approval
            if self._requires_approval(action):
                approved = await self.approval_service.request_approval(
                    action=action,
                    context=context,
                    timeout=self.approval_timeout,
                )
                if not approved:
                    steps.append(StepResult(action=action, status="rejected"))
                    # Record the refusal in the history as well: without it
                    # the next planning call receives an unchanged context
                    # and can propose the very same action again.
                    context["history"].append(
                        {"action": action, "result": None, "status": "rejected"}
                    )
                    continue

            # Execute action
            result = await self._execute_action(action)
            steps.append(StepResult(action=action, result=result))

            # Update context
            context["history"].append({"action": action, "result": result})

        return AgentResult(success=False, steps=steps, error="Max steps reached")

    def _requires_approval(self, action: "Action") -> bool:
        """Return True when ``action`` must be approved before it runs.

        An action needs approval when its ``name`` is in
        ``HIGH_RISK_ACTIONS`` or its ``estimated_cost`` exceeds
        ``self.approval_cost_threshold``.
        """
        return (
            action.name in self.HIGH_RISK_ACTIONS
            or action.estimated_cost > self.approval_cost_threshold
        )
Bounded Agent
class BoundedAgent:
    """Agent with strict constraints on actions and resources.

    Planning (``_plan_action``) and execution (``_execute_with_cost``) are
    called but not defined in this snippet; a concrete agent supplies them.
    """

    def __init__(self, config: "AgentConfig"):
        """Copy the limits from ``config`` onto the instance.

        Reads ``allowed_tools``, ``max_steps``, ``max_cost`` and ``timeout``
        from ``config`` and starts the accumulated cost at zero.
        """
        self.allowed_tools = set(config.allowed_tools)
        self.max_steps = config.max_steps
        self.max_cost = config.max_cost
        self.timeout = config.timeout
        self.current_cost = 0

    async def run(self, task: str) -> "AgentResult":
        """Run ``task`` within the step, time, cost and tool limits.

        Returns:
            A successful ``AgentResult`` carrying ``result.value`` as soon
            as an executed action reports ``is_final``; otherwise a failed
            ``AgentResult`` whose ``error`` names the limit that was hit.
        """
        # NOTE(review): current_cost is not reset here, so the cost budget
        # carries over between runs on the same instance - confirm that a
        # per-instance (rather than per-run) budget is intended.
        # Monotonic clock: elapsed time must not be affected by wall-clock
        # adjustments while the agent is running.
        started_at = time.monotonic()

        for _ in range(self.max_steps):
            # Check constraints
            if time.monotonic() - started_at > self.timeout:
                return AgentResult(success=False, error="Timeout exceeded")
            if self.current_cost > self.max_cost:
                return AgentResult(success=False, error="Cost limit exceeded")

            # Get next action
            action = await self._plan_action(task)

            # Validate action is allowed; a blocked action still uses a step
            if action.tool not in self.allowed_tools:
                logger.warning("Blocked disallowed tool: %s", action.tool)
                continue

            # Execute with cost tracking
            result, cost = await self._execute_with_cost(action)
            self.current_cost += cost

            if result.is_final:
                return AgentResult(success=True, result=result.value)

        return AgentResult(success=False, error="Max steps without completion")
Workflow Agent
class WorkflowAgent:
    """Agent that follows predefined workflow with AI decision points.

    The step handlers (``_ai_decision``, ``_execute_action``,
    ``_evaluate_condition``) and the router (``_get_next_step``) are called
    but not defined in this snippet; a concrete agent supplies them.
    """

    def __init__(self, workflow_definition: dict):
        """Keep the workflow definition and start with an empty state."""
        self.workflow = workflow_definition
        self.state = {}

    async def run(self, input_data: dict, max_steps: int = 100) -> "WorkflowResult":
        """Walk the workflow from its ``"start"`` step until ``"end"``.

        Args:
            input_data: Stored under ``state["input"]`` for the run.
            max_steps: Upper bound on executed steps, so a workflow whose
                routing never reaches ``"end"`` cannot loop forever.

        Returns:
            A ``WorkflowResult`` built from the final state, which holds
            each executed step's result under ``state["step_results"]``.

        Raises:
            KeyError: A step name is not defined under ``"steps"``.
            ValueError: A step has a ``"type"`` this agent does not handle.
            RuntimeError: ``max_steps`` steps ran without reaching ``"end"``.
        """
        self.state = {"input": input_data, "step_results": {}}
        current_step = self.workflow["start"]
        executed = 0

        while current_step != "end":
            if executed >= max_steps:
                raise RuntimeError(
                    f"Workflow did not reach 'end' within {max_steps} steps"
                )

            steps = self.workflow["steps"]
            if current_step not in steps:
                raise KeyError(f"Workflow step {current_step!r} is not defined")
            step_def = steps[current_step]

            step_type = step_def["type"]
            if step_type == "ai_decision":
                result = await self._ai_decision(step_def)
            elif step_type == "action":
                result = await self._execute_action(step_def)
            elif step_type == "condition":
                result = self._evaluate_condition(step_def)
            else:
                # Without this branch `result` would be unbound on the first
                # step, or silently reuse the previous step's result later.
                raise ValueError(
                    f"Unknown step type {step_type!r} in step {current_step!r}"
                )

            self.state["step_results"][current_step] = result
            current_step = self._get_next_step(step_def, result)
            executed += 1

        return WorkflowResult(state=self.state)
# Workflow definition
# "start" names the first step; every "next" must name a key under "steps"
# or the sentinel "end".
workflow = {
    "start": "classify_request",
    "steps": {
        # NOTE(review): this step has no "next"; how an output label is
        # turned into the following step is decided by `_get_next_step`,
        # which is not shown here. No step is defined for the "invalid"
        # label - confirm how it is meant to be routed.
        "classify_request": {
            "type": "ai_decision",
            "prompt": "Classify this request: {input.request}",
            "outputs": ["simple", "complex", "invalid"],
        },
        "handle_simple": {
            "type": "action",
            "tool": "simple_handler",
            "next": "end",
        },
        "handle_complex": {
            "type": "ai_decision",
            "prompt": "Break down into steps: {input.request}",
            "next": "execute_steps",
        },
        # Target of handle_complex's "next". Without this entry the workflow
        # points at an undefined step. The tool name is a placeholder.
        "execute_steps": {
            "type": "action",
            "tool": "step_executor",
            "next": "end",
        },
    },
}
Reliability Patterns
Retry and Recovery
class ResilientAgent:
    """Agent helpers for retrying actions and resuming from checkpoints.

    The methods rely on ``self._execute``, ``self._continue_from`` and
    ``self.checkpoint_store``, none of which is defined in this snippet;
    a concrete agent has to provide them.
    """

    async def execute_with_retry(self, action: "Action", max_retries: int = 3):
        """Execute ``action``, retrying on ``RetryableError``.

        Sleeps ``2 ** attempt`` seconds between attempts (1, 2, 4, ...).

        Args:
            action: Passed unchanged to ``self._execute``.
            max_retries: Total number of attempts; must be at least 1.

        Returns:
            Whatever ``self._execute(action)`` returns.

        Raises:
            ValueError: ``max_retries`` is less than 1.
            RetryableError: The final attempt also failed.
        """
        if max_retries < 1:
            # With zero attempts the loop body would never run and the
            # method would return None without executing anything.
            raise ValueError("max_retries must be at least 1")

        for attempt in range(max_retries):
            try:
                return await self._execute(action)
            except RetryableError:
                if attempt == max_retries - 1:
                    raise
                await asyncio.sleep(2 ** attempt)
            # Any other exception, NonRetryableError included, is not caught
            # and propagates to the caller immediately.

    async def run_with_checkpoint(self, task: str) -> "AgentResult":
        """Resume from checkpoint on failure.

        Loads the checkpoint stored for ``task`` (if any), continues from
        its steps and context, and clears it once the run finishes. If
        anything inside that sequence raises an ``Exception``, the current
        ``steps`` and ``context`` are saved before the error is re-raised.
        """
        checkpoint = await self.checkpoint_store.get(task)
        if checkpoint:
            steps = checkpoint.steps
            context = checkpoint.context
        else:
            steps = []
            context = {}

        try:
            result = await self._continue_from(steps, context)
            await self.checkpoint_store.clear(task)
            return result
        except Exception:
            await self.checkpoint_store.save(task, steps, context)
            raise
Timeout Management
async def run_with_timeout(self, task: str, timeout: int = 300):
    """Run ``self._run_internal(task)`` with an overall time limit.

    Args:
        task: Passed unchanged to ``self._run_internal``.
        timeout: Limit handed to ``asyncio.wait_for`` (default 300).

    Returns:
        The value produced by ``self._run_internal(task)`` when it finishes
        in time. On timeout, ``self._cleanup()`` is awaited and a failed
        ``AgentResult`` is returned whose ``partial_result`` is
        ``self.state.get("last_result")``.
    """
    try:
        outcome = await asyncio.wait_for(self._run_internal(task), timeout=timeout)
    except asyncio.TimeoutError:
        # Clean up first, then report a graceful failure with whatever
        # the last recorded result was.
        await self._cleanup()
        last_seen = self.state.get("last_result")
        return AgentResult(
            success=False,
            error="Task timed out",
            partial_result=last_seen,
        )
    else:
        return outcome
Observability
Comprehensive Logging
class ObservableAgent:
    """Agent whose run is wrapped in tracing spans and metrics.

    ``is_complete``, ``_plan_action`` and ``_execute`` are called but not
    defined in this snippet; a concrete agent has to provide them.
    """

    def __init__(self, tracer, metrics):
        """Store the tracer and metrics client used by ``run``."""
        self.tracer = tracer
        self.metrics = metrics

    async def run(self, task: str) -> "AgentResult":
        """Run steps until ``is_complete()``, tracing each one.

        Opens an ``agent_run`` span tagged with the first 100 characters of
        ``task`` and one ``agent_step`` span per step, tagged with the
        action name and the result's type name.

        Metrics: ``agent.success`` or ``agent.failure`` is incremented
        depending on the outcome, and the ``agent.steps`` histogram receives
        the number of completed steps in both cases.

        Returns:
            ``AgentResult(success=True)`` when the loop finishes.

        Raises:
            Exception: Any error from a step is recorded on the run span
                and re-raised.
        """
        with self.tracer.start_span("agent_run") as span:
            span.set_attribute("task", task[:100])
            step_count = 0

            try:
                while not self.is_complete():
                    with self.tracer.start_span("agent_step") as step_span:
                        action = await self._plan_action()
                        step_span.set_attribute("action", action.name)

                        result = await self._execute(action)
                        step_span.set_attribute("result_type", type(result).__name__)

                    step_count += 1
            except Exception as exc:
                span.record_exception(exc)
                self.metrics.increment("agent.failure")
                raise
            else:
                self.metrics.increment("agent.success")
                return AgentResult(success=True)
            finally:
                # Emit the step count for failed runs too: how far a run
                # got before breaking is what one needs when debugging.
                self.metrics.histogram("agent.steps", step_count)
Key Takeaways
- Production agents need reliability, safety, and observability
- Bound actions: allow lists, cost limits, timeouts
- Human-in-the-loop for high-risk actions
- Workflow agents combine structure with AI flexibility
- Implement retry, checkpoint, and recovery patterns
- Comprehensive logging enables debugging
- Start with constrained agents, expand carefully
- Monitor step counts, costs, and success rates
- Test with adversarial inputs
- Production agents are systems, not just prompts
Agentic workflows are powerful. Build them carefully.