Agentic Workflows in Production

April 1, 2024

AI agents—systems that reason, plan, and take actions—are moving from impressive demos to production deployments. But the gap between a demo agent and a reliable production agent is significant. Reliability, safety, and observability require careful engineering.

Here’s how to build production-ready agentic workflows.

From Demo to Production

The Gap

# Side-by-side contrast of a demo agent and a production agent.
demo_vs_production:
  # Traits listed for an agent that only has to work in a demo.
  demo_agent:
    - Works for cherry-picked examples
    - Failures are interesting
    - No cost constraints
    - Single user testing

  # The corresponding traits once the agent is deployed to production.
  production_agent:
    - Must handle edge cases
    - Failures affect users
    - Cost matters at scale
    - Concurrent users

Production Requirements

# Checklist of production requirements, grouped into four categories.
production_requirements:
  # Behaving predictably and recovering when something goes wrong.
  reliability:
    - Consistent behavior
    - Graceful failure handling
    - Timeout management
    - Recovery mechanisms

  # Limiting what the agent may do and keeping actions reviewable.
  safety:
    - Constrained action space
    - Human approval for risky actions
    - Audit trail
    - Rollback capability

  # Keeping token usage, latency, and spend under control.
  efficiency:
    - Token optimization
    - Parallel execution where possible
    - Caching of repeated operations
    - Cost controls

  # Being able to see what the agent did and why it failed.
  observability:
    - Step-by-step logging
    - Performance metrics
    - Error tracking
    - Debugging support

Architecture Patterns

Supervised Agent Architecture

class SupervisedAgent:
    """Agent with human-in-the-loop for critical actions.

    Every step is planned with ``_plan_action``. Actions that
    ``_requires_approval`` flags are sent to the approval service before they
    are executed. ``_plan_action`` and ``_execute_action`` are used here but
    are not defined in this snippet.
    """

    # Action names that always need sign-off. Defined once at class level (as
    # a frozenset for O(1) membership tests) instead of on every call.
    HIGH_RISK_ACTIONS = frozenset(
        {"delete", "send_email", "make_purchase", "modify_production"}
    )
    # Actions whose estimated cost is strictly above this also need sign-off.
    APPROVAL_COST_THRESHOLD = 10

    def __init__(self, llm, tools, approval_service, approval_timeout: int = 300):
        """Store the collaborators used by the agent.

        Args:
            llm: Language-model client (stored; not used directly in this
                snippet).
            tools: Tools available to the agent (stored; not used directly in
                this snippet).
            approval_service: Object exposing an awaitable
                ``request_approval(action=..., context=..., timeout=...)``.
            approval_timeout: Timeout forwarded to the approval service for
                each request. Defaults to 300 (5 minutes).
        """
        self.llm = llm
        self.tools = tools
        self.approval_service = approval_service
        self.approval_timeout = approval_timeout

    async def run(self, task: str, max_steps: int = 10) -> AgentResult:
        """Plan and execute actions until completion or ``max_steps``.

        Args:
            task: Description of the task, placed in the planning context.
            max_steps: Maximum number of planning iterations. Rejected
                actions count against this limit.

        Returns:
            A successful ``AgentResult`` when the planner emits an action of
            type ``"complete"``; otherwise a failed one with the error
            ``"Max steps reached"``.
        """
        steps = []
        context = {"task": task, "history": []}

        for _ in range(max_steps):
            # Plan next action
            action = await self._plan_action(context)

            if action.type == "complete":
                return AgentResult(success=True, steps=steps, result=action.result)

            # Check if action requires approval
            if self._requires_approval(action):
                approved = await self.approval_service.request_approval(
                    action=action,
                    context=context,
                    timeout=self.approval_timeout,
                )
                if not approved:
                    steps.append(StepResult(action=action, status="rejected"))
                    # Record the rejection in the planning context too.
                    # Otherwise the planner sees an unchanged context and can
                    # keep proposing the same action until max_steps runs out.
                    # NOTE(review): assumes _plan_action tolerates the extra
                    # "status" key in history entries - confirm.
                    context["history"].append(
                        {"action": action, "result": None, "status": "rejected"}
                    )
                    continue

            # Execute action
            result = await self._execute_action(action)
            steps.append(StepResult(action=action, result=result))

            # Update context
            context["history"].append({"action": action, "result": result})

        return AgentResult(success=False, steps=steps, error="Max steps reached")

    def _requires_approval(self, action: Action) -> bool:
        """Return True if ``action`` must be approved by a human first.

        An action needs approval when its name is in ``HIGH_RISK_ACTIONS`` or
        its ``estimated_cost`` is strictly greater than
        ``APPROVAL_COST_THRESHOLD``.
        """
        return (
            action.name in self.HIGH_RISK_ACTIONS
            or action.estimated_cost > self.APPROVAL_COST_THRESHOLD
        )

Bounded Agent

class BoundedAgent:
    """Agent with strict constraints on actions and resources.

    Each run is limited by a step count, an elapsed-time budget, a cost
    budget, and an allow-list of tools. ``_plan_action`` and
    ``_execute_with_cost`` are used here but are not defined in this snippet.
    """

    def __init__(self, config: AgentConfig):
        """Copy the limits out of ``config``.

        Reads ``allowed_tools``, ``max_steps``, ``max_cost`` and ``timeout``
        from ``config``. The allowed tools are stored as a set.
        """
        self.allowed_tools = set(config.allowed_tools)
        self.max_steps = config.max_steps
        self.max_cost = config.max_cost
        self.timeout = config.timeout
        self.current_cost = 0

    async def run(self, task: str) -> AgentResult:
        """Work on ``task`` until a final result or a limit is hit.

        Returns:
            A successful ``AgentResult`` carrying ``result.value`` once a step
            reports ``is_final``; otherwise an ``AgentResult`` whose ``error``
            names the limit that stopped the run.
        """
        # The cost budget is per run, like the timeout and the step limit.
        # Without this reset, one run that exhausts the budget would make
        # every later run on the same instance fail immediately.
        self.current_cost = 0
        # Monotonic clock: elapsed time must not jump if the wall clock is
        # adjusted while the agent is running.
        started = time.monotonic()

        for _ in range(self.max_steps):
            # Check constraints
            if time.monotonic() - started > self.timeout:
                return AgentResult(error="Timeout exceeded")

            if self.current_cost > self.max_cost:
                return AgentResult(error="Cost limit exceeded")

            # Get next action
            action = await self._plan_action(task)

            # Validate action is allowed; a blocked action still uses up a step.
            if action.tool not in self.allowed_tools:
                logger.warning("Blocked disallowed tool: %s", action.tool)
                continue

            # Execute with cost tracking
            result, cost = await self._execute_with_cost(action)
            self.current_cost += cost

            if result.is_final:
                return AgentResult(success=True, result=result.value)

        return AgentResult(error="Max steps without completion")

Workflow Agent

class WorkflowAgent:
    """Agent that follows predefined workflow with AI decision points.

    The workflow is a dict with a ``"start"`` step name and a ``"steps"``
    mapping of step name to step definition. ``_ai_decision``,
    ``_execute_action``, ``_evaluate_condition`` and ``_get_next_step`` are
    used here but are not defined in this snippet.
    """

    def __init__(self, workflow_definition: dict):
        """Keep the workflow definition and start with empty state."""
        self.workflow = workflow_definition
        self.state = {}

    async def run(self, input_data: dict) -> WorkflowResult:
        """Walk the workflow from its start step until the step ``"end"``.

        Each step's result is stored in ``self.state["step_results"]`` under
        the step's name. No cycle detection is performed, so a workflow whose
        transitions never reach ``"end"`` will not terminate.

        Args:
            input_data: Stored as ``self.state["input"]`` for the steps to use.

        Returns:
            A ``WorkflowResult`` wrapping the final state.

        Raises:
            KeyError: If a transition leads to a step that is not defined.
            ValueError: If a step has a ``"type"`` other than
                ``"ai_decision"``, ``"action"`` or ``"condition"``.
        """
        self.state = {"input": input_data, "step_results": {}}
        current_step = self.workflow["start"]

        while current_step != "end":
            try:
                step_def = self.workflow["steps"][current_step]
            except KeyError:
                raise KeyError(
                    f"Workflow references undefined step: {current_step!r}"
                ) from None

            step_type = step_def["type"]
            if step_type == "ai_decision":
                result = await self._ai_decision(step_def)
            elif step_type == "action":
                result = await self._execute_action(step_def)
            elif step_type == "condition":
                result = self._evaluate_condition(step_def)
            else:
                # Fail loudly. Falling through would leave `result` unbound on
                # the first step, or silently reuse the previous step's result.
                raise ValueError(
                    f"Unknown step type {step_type!r} in step {current_step!r}"
                )

            self.state["step_results"][current_step] = result
            current_step = self._get_next_step(step_def, result)

        return WorkflowResult(state=self.state)

# Workflow definition
# NOTE(review): how the "outputs" labels of classify_request map to the
# handle_* steps is decided by WorkflowAgent._get_next_step, which is not
# shown here - confirm the routing. The "invalid" output has no dedicated
# handler step in this example.
workflow = {
    "start": "classify_request",
    "steps": {
        "classify_request": {
            "type": "ai_decision",
            "prompt": "Classify this request: {input.request}",
            "outputs": ["simple", "complex", "invalid"],
        },
        "handle_simple": {
            "type": "action",
            "tool": "simple_handler",
            "next": "end",
        },
        "handle_complex": {
            "type": "ai_decision",
            "prompt": "Break down into steps: {input.request}",
            "next": "execute_steps",
        },
        # handle_complex transitions here. Without this entry the step lookup
        # in WorkflowAgent.run fails with KeyError.
        "execute_steps": {
            "type": "action",
            "tool": "step_executor",  # placeholder tool name - TODO confirm
            "next": "end",
        },
    },
}

Reliability Patterns

Retry and Recovery

class ResilientAgent:
    """Agent helpers for retrying actions and resuming from checkpoints.

    ``_execute``, ``_continue_from`` and ``checkpoint_store`` are used here
    but are not defined in this snippet.
    """

    async def execute_with_retry(
        self, action: Action, max_retries: int = 3, base_delay: float = 1.0
    ):
        """Execute ``action``, retrying on ``RetryableError`` with backoff.

        Args:
            action: The action passed to ``_execute``.
            max_retries: Total number of attempts (not additional retries).
                Must be at least 1.
            base_delay: Delay passed to ``asyncio.sleep`` after the first
                failure; it doubles after each further failure.

        Returns:
            Whatever ``_execute`` returns on the first successful attempt.

        Raises:
            ValueError: If ``max_retries`` is less than 1.
            RetryableError: If the final attempt still fails.
            NonRetryableError: Immediately, without retrying.
        """
        # Without this guard a non-positive max_retries skips the loop and
        # returns None without ever executing the action.
        if max_retries < 1:
            raise ValueError(f"max_retries must be at least 1, got {max_retries}")

        for attempt in range(max_retries):
            try:
                return await self._execute(action)
            except RetryableError:
                if attempt == max_retries - 1:
                    raise
            except NonRetryableError:
                raise
            # Exponential backoff: base_delay, 2x, 4x, ...
            await asyncio.sleep(base_delay * 2 ** attempt)

    async def run_with_checkpoint(self, task: str) -> AgentResult:
        """Resume from checkpoint on failure.

        Loads any checkpoint stored for ``task`` and continues from its steps
        and context (or from empty ones). On failure the steps and context
        are saved and the exception is re-raised. On success the checkpoint
        is cleared.

        NOTE(review): the objects saved are the ones handed to
        ``_continue_from``; this presumably relies on ``_continue_from``
        updating them in place - confirm.
        """
        checkpoint = await self.checkpoint_store.get(task)

        if checkpoint:
            steps = checkpoint.steps
            context = checkpoint.context
        else:
            steps = []
            context = {}

        try:
            result = await self._continue_from(steps, context)
        except Exception:
            await self.checkpoint_store.save(task, steps, context)
            raise
        else:
            # Clear outside the try: a failure to clear after a successful
            # run must not be treated as a task failure that saves a
            # checkpoint.
            await self.checkpoint_store.clear(task)
            return result

Timeout Management

async def run_with_timeout(self, task: str, timeout: int = 300):
    """Run ``_run_internal(task)`` under a time limit.

    Returns the inner result when it finishes within ``timeout``. When the
    limit is hit, ``_cleanup`` is awaited and a failed ``AgentResult`` is
    returned, carrying whatever ``self.state`` holds under ``"last_result"``.
    """
    try:
        outcome = await asyncio.wait_for(self._run_internal(task), timeout=timeout)
    except asyncio.TimeoutError:
        # Clean up first, then report a graceful failure.
        await self._cleanup()
        last_seen = self.state.get("last_result")
        return AgentResult(
            success=False,
            error="Task timed out",
            partial_result=last_seen,
        )
    return outcome

Observability

Comprehensive Logging

class ObservableAgent:
    """Agent whose runs and individual steps are wrapped in tracing spans.

    ``is_complete``, ``_plan_action`` and ``_execute`` are used here but are
    not defined in this snippet.
    """

    def __init__(self, tracer, metrics):
        self.tracer, self.metrics = tracer, metrics

    async def _traced_step(self):
        """Plan and execute one action inside an ``agent_step`` span."""
        with self.tracer.start_span("agent_step") as span:
            planned = await self._plan_action()
            span.set_attribute("action", planned.name)

            outcome = await self._execute(planned)
            span.set_attribute("result_type", type(outcome).__name__)

    async def run(self, task: str) -> AgentResult:
        """Run steps until ``is_complete()`` is true, emitting spans and metrics.

        The first 100 characters of ``task`` are attached to the run span. On
        success the step count and a success counter are recorded. On any
        exception it is recorded on the span, a failure counter is
        incremented, and the exception is re-raised.
        """
        with self.tracer.start_span("agent_run") as run_span:
            run_span.set_attribute("task", task[:100])
            completed = 0

            try:
                while not self.is_complete():
                    await self._traced_step()
                    completed += 1

                self.metrics.histogram("agent.steps", completed)
                self.metrics.increment("agent.success")
                return AgentResult(success=True)

            except Exception as failure:
                run_span.record_exception(failure)
                self.metrics.increment("agent.failure")
                raise

Key Takeaways

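Agentic workflows are powerful, but production use demands more than a working demo: constrain the action space, require human approval for risky actions, bound steps, cost, and time, retry and checkpoint so failures are recoverable, and trace every step. Build them carefully.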