Event-Driven Architecture: Patterns and Practices

July 6, 2020

Event-driven architecture has become a common foundation for modern distributed systems. Instead of calling each other directly, services communicate through events, which enables loose coupling, scalability, and resilience.

Here’s how to build event-driven systems effectively.

Core Concepts

Events vs Commands

Event:
  what: Something that happened (past tense)
  example: "OrderPlaced", "UserCreated", "PaymentCompleted"
  characteristic: Immutable, can have multiple consumers
  coupling: Producer doesn't know/care about consumers

Command:
  what: Request to do something (imperative)
  example: "PlaceOrder", "CreateUser", "ProcessPayment"
  characteristic: Directed at specific service
  coupling: Sender knows about receiver
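
The distinction can be sketched in code. This is a minimal illustration rather than a prescribed design: the class names come from the examples above, while the fields and the `handle_place_order` function are assumptions made for the sketch.

```python
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass(frozen=True)
class PlaceOrder:
    """Command: a request addressed to one handler, which may reject it."""
    customer_id: str
    sku: str
    quantity: int


@dataclass(frozen=True)
class OrderPlaced:
    """Event: a fact that has already happened, so it is immutable."""
    order_id: str
    placed_at: str


def handle_place_order(cmd: PlaceOrder) -> OrderPlaced:
    # A command can fail validation; an event never can, it only records.
    if cmd.quantity <= 0:
        raise ValueError("quantity must be positive")
    # Hypothetical ID scheme, used only to keep the sketch deterministic.
    order_id = f"order-{cmd.customer_id}-{cmd.sku}"
    return OrderPlaced(order_id=order_id,
                       placed_at=datetime.now(timezone.utc).isoformat())
```

The command has exactly one handler. The resulting event could be published to any number of consumers without the handler knowing about them.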

Event Types

Domain Events: Business-meaningful occurrences

{
  "eventType": "OrderShipped",
  "orderId": "12345",
  "shippedAt": "2020-07-06T10:30:00Z",
  "carrier": "UPS",
  "trackingNumber": "1Z999AA10123456784"
}

Integration Events: Cross-service communication

{
  "eventType": "InventoryReserved",
  "correlationId": "order-12345",
  "items": [
    {"sku": "ABC123", "quantity": 2, "warehouseId": "WH-001"}
  ]
}

Change Data Capture (CDC): Database changes as events

{
  "operation": "UPDATE",
  "table": "orders",
  "before": {"status": "pending"},
  "after": {"status": "shipped"},
  "timestamp": "2020-07-06T10:30:00Z"
}

Architecture Patterns

Event Notification

The event is a minimal notification; consumers fetch the details they need:

┌─────────────┐    Event: OrderCreated(id=123)    ┌─────────────┐
│   Orders    │──────────────────────────────────►│  Shipping   │
│   Service   │                                   │   Service   │
└─────────────┘                                   └─────────────┘
                                                        │
                                                        │ GET /orders/123
                                                        ▼
                                                  ┌─────────────┐
                                                  │   Orders    │
                                                  │     API     │
                                                  └─────────────┘

Pros: Small events, no duplication

Cons: Requires callback, coupling to API

Event-Carried State Transfer

The event contains all the data consumers need:

{
  "eventType": "OrderCreated",
  "order": {
    "id": "123",
    "customerId": "456",
    "items": [...],
    "shippingAddress": {...},
    "total": 99.99
  }
}

Pros: No callback needed, consumers have data

Cons: Larger events, data duplication

Event Sourcing

Store state as a sequence of events:

Events:
├── OrderCreated { orderId: 123, items: [...] }
├── ItemAdded { orderId: 123, sku: "ABC" }
├── PaymentReceived { orderId: 123, amount: 99.99 }
└── OrderShipped { orderId: 123, trackingNo: "..." }

Current State = fold(events)

Pros: Complete audit trail, time travel, replay

Cons: Complexity, eventual consistency
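
The fold over events can be sketched with `functools.reduce`. The event names follow the list above; the state shape (`items`, `paid`, `status`) and the `apply` function are assumptions made for illustration.

```python
from functools import reduce


def apply(state: dict, event: dict) -> dict:
    """Return the new state after one event; never mutates the old state."""
    kind = event["type"]
    if kind == "OrderCreated":
        return {"orderId": event["orderId"], "items": list(event["items"]),
                "paid": 0.0, "status": "created"}
    if kind == "ItemAdded":
        return {**state, "items": state["items"] + [event["sku"]]}
    if kind == "PaymentReceived":
        return {**state, "paid": state["paid"] + event["amount"], "status": "paid"}
    if kind == "OrderShipped":
        return {**state, "status": "shipped"}
    return state  # unknown event types are ignored


def current_state(events: list) -> dict:
    # Current State = fold(events), starting from an empty state.
    return reduce(apply, events, {})


events = [
    {"type": "OrderCreated", "orderId": 123, "items": ["XYZ"]},
    {"type": "ItemAdded", "orderId": 123, "sku": "ABC"},
    {"type": "PaymentReceived", "orderId": 123, "amount": 99.99},
    {"type": "OrderShipped", "orderId": 123},
]
state = current_state(events)
```

Folding only a prefix of the list, for example `current_state(events[:2])`, gives the state as it was at that point, which is what makes time travel possible.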

CQRS

Separate read and write models:

┌─────────────────────────────────────────────────────────┐
│                                                          │
│  Commands ──► Write Model ──► Events ──► Read Model     │
│                                              │           │
│                                              ▼           │
│                                          Queries        │
│                                                          │
└─────────────────────────────────────────────────────────┘

Often combined with event sourcing.
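
A minimal in-memory sketch of the flow in the diagram, assuming a hypothetical `OrderPlaced` event and a revenue summary as the read model. In a real system the projection would typically run asynchronously from a broker, so the read side lags the write side.

```python
class OrderWriteModel:
    """Write side: validates commands and emits events."""

    def __init__(self):
        self.events = []

    def place_order(self, order_id: str, total: float) -> dict:
        if total <= 0:
            raise ValueError("total must be positive")
        event = {"eventType": "OrderPlaced", "orderId": order_id, "total": total}
        self.events.append(event)
        return event


class OrderSummaryReadModel:
    """Read side: a denormalized view built only from events."""

    def __init__(self):
        self.order_count = 0
        self.revenue = 0.0

    def project(self, event: dict) -> None:
        if event["eventType"] == "OrderPlaced":
            self.order_count += 1
            self.revenue += event["total"]

    def summary(self) -> dict:
        return {"orders": self.order_count, "revenue": self.revenue}


write_model = OrderWriteModel()
read_model = OrderSummaryReadModel()

# Commands go to the write model; the resulting events feed the read model.
for order_id, total in [("123", 10.0), ("124", 5.5)]:
    read_model.project(write_model.place_order(order_id, total))
```

Queries only ever touch `read_model.summary()`, so the read side can be reshaped or rebuilt from the event list without changing the write side.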

Message Brokers

Apache Kafka

Distributed log for high-throughput event streaming:

# Producer
from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

producer.send('orders', {
    'eventType': 'OrderCreated',
    'orderId': '123',
    'timestamp': '2020-07-06T10:00:00Z'
})
producer.flush()  # send() is asynchronous; flush before the process exits

# Consumer
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'orders',
    bootstrap_servers=['localhost:9092'],
    group_id='shipping-service',
    auto_offset_reset='earliest'
)

for message in consumer:
    event = json.loads(message.value)
    handle_event(event)

Characteristics:

RabbitMQ

Traditional message broker:

import json

import pika

# Publisher
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='orders', exchange_type='fanout')

channel.basic_publish(
    exchange='orders',
    routing_key='',
    body=json.dumps({'orderId': '123'})
)

Characteristics:

AWS EventBridge

Managed event bus:

import json

import boto3

events = boto3.client('events')

events.put_events(
    Entries=[{
        'Source': 'orders.service',
        'DetailType': 'OrderCreated',
        'Detail': json.dumps({'orderId': '123'}),
        'EventBusName': 'default'
    }]
)

Characteristics:

Event Design

Event Schema

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "type": "object",
  "properties": {
    "eventId": { "type": "string", "format": "uuid" },
    "eventType": { "type": "string" },
    "timestamp": { "type": "string", "format": "date-time" },
    "version": { "type": "string" },
    "source": { "type": "string" },
    "correlationId": { "type": "string" },
    "data": { "type": "object" }
  },
  "required": ["eventId", "eventType", "timestamp", "data"]
}

Envelope Pattern

Consistent wrapper for all events:

{
  "eventId": "550e8400-e29b-41d4-a716-446655440000",
  "eventType": "com.company.orders.OrderCreated",
  "timestamp": "2020-07-06T10:30:00Z",
  "version": "1.0",
  "source": "orders-service",
  "correlationId": "request-12345",
  "data": {
    "orderId": "123",
    "customerId": "456",
    "items": [...]
  }
}
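
A small helper can keep envelopes consistent across producers. This is a sketch using only the standard library; `make_envelope` and `missing_fields` are hypothetical names, and the required fields mirror the schema shown earlier.

```python
import uuid
from datetime import datetime, timezone

# Mirrors the "required" list in the event schema.
REQUIRED_FIELDS = ("eventId", "eventType", "timestamp", "data")


def make_envelope(event_type, data, source, correlation_id=None, version="1.0"):
    """Wrap a payload in the standard envelope."""
    return {
        "eventId": str(uuid.uuid4()),
        "eventType": event_type,
        # ISO 8601 in UTC; isoformat() renders the offset as +00:00, not Z.
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "version": version,
        "source": source,
        # Start a new correlation chain if the caller did not pass one.
        "correlationId": correlation_id or str(uuid.uuid4()),
        "data": data,
    }


def missing_fields(envelope: dict) -> list:
    """Return the required envelope fields that are absent."""
    return [field for field in REQUIRED_FIELDS if field not in envelope]


envelope = make_envelope(
    "com.company.orders.OrderCreated",
    {"orderId": "123", "customerId": "456"},
    source="orders-service",
    correlation_id="request-12345",
)
```

A consumer could call `missing_fields` before dispatching and route any envelope with a non-empty result to error handling.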

Versioning Events

Schemas evolve; handle it gracefully:

# Option 1: Versioned event types
"eventType": "OrderCreated.v1"
"eventType": "OrderCreated.v2"

# Option 2: Version field in envelope
"version": "2.0"

# Option 3: Additive changes only (backward compatible)
# Add fields, never remove or rename

Consumer handling:

def handle_order_created(event):
    version = event.get('version', '1.0')

    if version == '1.0':
        # Handle v1 schema
        customer_id = event['data']['customerId']
    else:
        # Handle v2 schema (customer is a nested object)
        customer_id = event['data']['customer']['id']

    return customer_id
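
An alternative to branching inside every handler is to convert old events to the newest shape at the edge, often called upcasting. The sketch below assumes the same v1 and v2 shapes as the handler above; `upcast_order_created` is a hypothetical name.

```python
def upcast_order_created(event: dict) -> dict:
    """Convert a v1 OrderCreated event to the v2 shape; pass v2 through."""
    if event.get("version", "1.0") != "1.0":
        return event  # already at the newer version

    data = dict(event["data"])  # copy so the original event is untouched
    # v1 has a flat customerId; v2 nests it under customer.id.
    data["customer"] = {"id": data.pop("customerId")}
    return {**event, "version": "2.0", "data": data}


v1_event = {"version": "1.0", "data": {"orderId": "123", "customerId": "456"}}
v2_event = upcast_order_created(v1_event)
```

With this in place, handlers only need to understand the v2 schema, and support for an old version is removed in one spot.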

Patterns for Reliability

Idempotent Consumers

Events may be delivered more than once, so handlers must tolerate duplicates:

def handle_order_created(event):
    event_id = event['eventId']

    # Check if already processed
    if db.event_processed(event_id):
        return  # Skip duplicate

    # Process event
    create_shipment(event['data'])

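    # Mark as processed (ideally in the same transaction as the side effect;
    # otherwise a crash between the two steps causes a reprocess)
    db.mark_processed(event_id)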
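
One way to close the gap between "check" and "mark" is to let a unique constraint do the deduplication inside the same transaction as the side effect. The sketch below uses an in-memory SQLite database; the table names are assumptions, and the shipment insert stands in for `create_shipment`.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE processed_events (event_id TEXT PRIMARY KEY)")
conn.execute("CREATE TABLE shipments (order_id TEXT)")


def handle_order_created(event: dict) -> bool:
    """Return True if the event was processed, False if it was a duplicate."""
    try:
        with conn:  # one transaction: commits on success, rolls back on error
            # The primary key rejects an event ID that was already recorded.
            conn.execute(
                "INSERT INTO processed_events (event_id) VALUES (?)",
                (event["eventId"],),
            )
            conn.execute(
                "INSERT INTO shipments (order_id) VALUES (?)",
                (event["data"]["orderId"],),
            )
        return True
    except sqlite3.IntegrityError:
        return False  # duplicate delivery, nothing was changed
```

This only covers side effects that live in the same database. Calls to external systems still need their own idempotency key.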

Outbox Pattern

Publish events reliably by writing them in the same database transaction as the state change:

# Write to database and outbox atomically
with db.transaction():
    db.save_order(order)
    db.save_outbox_event({
        'topic': 'orders',
        'event': {'eventType': 'OrderCreated', 'data': order.to_dict()},
        'created_at': now()
    })

# Separate process publishes from outbox
def outbox_publisher():
    events = db.get_unpublished_events()
    for event in events:
        kafka.send(event['topic'], event['event'])
        db.mark_published(event['id'])
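
The same idea as a runnable sketch against an in-memory SQLite database. The table layout and the `send` callback (standing in for a broker client) are assumptions for illustration.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id TEXT PRIMARY KEY, total REAL)")
conn.execute(
    "CREATE TABLE outbox ("
    "id INTEGER PRIMARY KEY AUTOINCREMENT, topic TEXT, payload TEXT, "
    "published INTEGER DEFAULT 0)"
)


def place_order(order_id: str, total: float) -> None:
    event = {"eventType": "OrderCreated",
             "data": {"orderId": order_id, "total": total}}
    with conn:  # the order row and the outbox row commit or roll back together
        conn.execute("INSERT INTO orders (id, total) VALUES (?, ?)",
                     (order_id, total))
        conn.execute("INSERT INTO outbox (topic, payload) VALUES (?, ?)",
                     ("orders", json.dumps(event)))


def publish_pending(send) -> int:
    """Send unpublished outbox rows in insertion order; return how many."""
    rows = conn.execute(
        "SELECT id, topic, payload FROM outbox WHERE published = 0 ORDER BY id"
    ).fetchall()
    for row_id, topic, payload in rows:
        send(topic, json.loads(payload))
        with conn:  # mark only after the send returned without raising
            conn.execute("UPDATE outbox SET published = 1 WHERE id = ?",
                         (row_id,))
    return len(rows)
```

If the publisher crashes after `send` but before the update, the row is sent again on the next run. Delivery is therefore at-least-once, which is why consumers need to be idempotent.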

Dead Letter Queues

Handle failures gracefully:

# Failed messages go to DLQ
Consumer → Process → Success
              │
              └── Failure (after retries)
                      │
                      ▼
              Dead Letter Queue
                      │
                      ▼
              Alert/Manual Review
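
The flow above can be sketched as a retry loop that hands the message to a dead letter list once attempts are exhausted. The `consume` function, the list standing in for a real DLQ, and the default of three attempts are all assumptions; a production consumer would usually add backoff between attempts.

```python
def consume(message, handler, dead_letters, max_attempts=3):
    """Try the handler up to max_attempts times, then dead-letter the message.

    Returns True on success, False if the message went to the DLQ.
    """
    last_error = None
    for _ in range(max_attempts):
        try:
            handler(message)
            return True
        except Exception as exc:  # broad on purpose: any failure counts
            last_error = exc

    # Keep the original message and the failure reason for manual review.
    dead_letters.append({
        "message": message,
        "error": repr(last_error),
        "attempts": max_attempts,
    })
    return False
```

Storing the error next to the message means whoever reviews the queue does not have to dig through logs to find out why it failed.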

Saga Pattern

Coordinate multi-service transactions:

Order Saga:
1. CreateOrder → OrderCreated
2. ReserveInventory → InventoryReserved
3. ProcessPayment → PaymentCompleted
4. ShipOrder → OrderShipped

Compensation on failure:
- PaymentFailed → ReleaseInventory → CancelOrder

# Saga orchestrator
class OrderSaga:
    def start(self, order):
        self.publish('CreateOrder', order)

    def handle_order_created(self, event):
        self.publish('ReserveInventory', event.order_id)

    def handle_inventory_reserved(self, event):
        self.publish('ProcessPayment', event.order_id)

    def handle_payment_completed(self, event):
        self.publish('ShipOrder', event.order_id)

    def handle_payment_failed(self, event):
        # Compensation
        self.publish('ReleaseInventory', event.order_id)
        self.publish('CancelOrder', event.order_id)
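
The compensation logic is easier to see in a synchronous simplification, where each step is a function call instead of a published command. Everything here (`run_saga`, `SagaStepFailed`, the no-op steps) is hypothetical; a real orchestrator would react to events as in the class above and persist its progress.

```python
class SagaStepFailed(Exception):
    """Raised by a step to signal a business failure."""


def run_saga(steps, order_id):
    """Run (name, action, compensation) steps; undo completed ones on failure."""
    log = []
    completed = []
    for name, action, compensation in steps:
        try:
            action(order_id)
        except SagaStepFailed:
            log.append(f"{name} failed")
            # Compensate in reverse order of completion.
            for done_name, undo in reversed(completed):
                undo(order_id)
                log.append(f"{done_name} compensated")
            return False, log
        log.append(f"{name} ok")
        completed.append((name, compensation))
    return True, log


def noop(order_id):
    pass


def decline_payment(order_id):
    raise SagaStepFailed(f"payment declined for order {order_id}")


steps = [
    ("CreateOrder", noop, noop),        # compensation stands in for CancelOrder
    ("ReserveInventory", noop, noop),   # compensation stands in for ReleaseInventory
    ("ProcessPayment", decline_payment, noop),
    ("ShipOrder", noop, noop),
]
ok, log = run_saga(steps, "123")
```

With the payment step failing, inventory is released before the order is cancelled, matching the compensation order listed above, and `ShipOrder` is never attempted.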

Observability

Event Tracing

Correlation IDs track requests across services:

def handle_api_request(request):
    correlation_id = request.headers.get('X-Correlation-ID', generate_id())

    # Include in all events
    publish_event({
        'correlationId': correlation_id,
        'eventType': 'OrderCreated',
        ...
    })

def handle_event(event):
    correlation_id = event['correlationId']

    # Log with correlation ID
    logger.info('Processing event', extra={'correlationId': correlation_id})

    # Pass to downstream events
    publish_event({
        'correlationId': correlation_id,
        ...
    })

Metrics

Essential metrics:
  - Events published (by type)
  - Events consumed (by type, consumer)
  - Consumer lag
  - Processing duration
  - Error rate
  - Dead letter queue depth

Consumer Lag Monitoring

# Alert when consumers fall behind
consumer_lag:
  warning: > 1000 messages
  critical: > 10000 messages

processing_delay:
  warning: > 5 minutes
  critical: > 30 minutes
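
Lag itself is simple arithmetic: for each partition, the latest offset in the log minus the offset the consumer group has committed. The sketch below takes both as plain dicts keyed by partition, since how you fetch them depends on the broker and client; the function names are assumptions and the thresholds match the config above.

```python
def consumer_lag(end_offsets: dict, committed_offsets: dict) -> int:
    """Total messages not yet consumed, summed over partitions."""
    total = 0
    for partition, end in end_offsets.items():
        # A partition with no commit yet counts from offset 0.
        committed = committed_offsets.get(partition, 0)
        total += max(end - committed, 0)
    return total


def severity(lag: int, warning: int = 1000, critical: int = 10000) -> str:
    """Map a lag value to an alert level."""
    if lag > critical:
        return "critical"
    if lag > warning:
        return "warning"
    return "ok"


end_offsets = {0: 5000, 1: 7000}
committed_offsets = {0: 4800, 1: 5500}
lag = consumer_lag(end_offsets, committed_offsets)  # 200 + 1500 = 1700
level = severity(lag)
```

Message-count lag says little without the processing rate, which is why the config pairs it with a time-based processing delay.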

Key Takeaways

Event-driven architecture enables loose coupling and scalability. The patterns require discipline but pay off in resilient, evolvable systems.