Designing Event-Sourced Systems: Patterns and Pitfalls

March 19, 2018

Event sourcing stores the full history of changes as a sequence of immutable events. Instead of storing current state, you store everything that happened. Current state is derived by replaying events.

The approach offers powerful benefits but introduces significant complexity. Here’s how to design event-sourced systems that work.

Core Concepts

Events as First-Class Citizens

Events are immutable facts about what happened:

{
  "event_type": "OrderPlaced",
  "event_id": "evt_abc123",
  "timestamp": "2018-03-19T10:15:30Z",
  "aggregate_id": "order_789",
  "data": {
    "customer_id": "cust_456",
    "items": [
      {"product_id": "prod_123", "quantity": 2, "price": 29.99}
    ],
    "total": 59.98
  }
}

Key properties:

- Immutable: once written, an event is never changed or deleted
- Past tense: names describe something that already happened (OrderPlaced, not PlaceOrder)
- Self-describing: type, id, timestamp, and aggregate id travel with the data
- Ordered: the events for an aggregate form a sequence

Event Store

The event store is an append-only log:

OrderPlaced(order_789, {items: [...], total: 59.98})
PaymentReceived(order_789, {amount: 59.98, method: "credit"})
OrderShipped(order_789, {carrier: "FedEx", tracking: "123456"})

Operations:

- Append events for an aggregate (the only write)
- Read an aggregate’s events in order
- Subscribe to new events as they are appended
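
These operations can be sketched as a minimal in-memory store. The class name and shape here are illustrative, not a specific library; a real store must also provide durable, ordered writes and optimistic concurrency checks.

```python
from collections import defaultdict

class InMemoryEventStore:
    """Append-only log of events, indexed by aggregate_id. Illustrative only."""

    def __init__(self):
        self._events = defaultdict(list)   # aggregate_id -> ordered events
        self._subscribers = []

    def append(self, aggregate_id, event):
        # Events are only ever appended, never updated or deleted.
        self._events[aggregate_id].append(event)
        for subscriber in self._subscribers:
            subscriber(event)

    def get_events(self, aggregate_id):
        # Returns the aggregate's events in append order.
        return list(self._events[aggregate_id])

    def subscribe(self, handler):
        # Handlers see every new event, e.g. to update projections.
        self._subscribers.append(handler)
```

Notifying subscribers inline keeps the sketch short; real systems hand events to subscribers asynchronously, as discussed under projections below.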

Aggregates

Aggregates are the boundaries of consistency:

class Order:
    def __init__(self, events=None):
        self.id = None
        self.status = None
        self.items = []
        self.total = 0

        for event in events or []:
            self.apply(event)

    def apply(self, event):
        if event.type == "OrderPlaced":
            self.id = event.aggregate_id
            self.status = "placed"
            self.items = event.data["items"]
            self.total = event.data["total"]
        elif event.type == "PaymentReceived":
            self.status = "paid"
        elif event.type == "OrderShipped":
            self.status = "shipped"
        # ... other event handlers

    def ship(self, carrier, tracking):
        if self.status != "paid":
            raise InvalidStateError("Cannot ship unpaid order")
        return OrderShipped(self.id, carrier, tracking)

Aggregates:

- Rebuild their state by replaying their own events
- Enforce invariants before emitting new events
- Define the transactional boundary: one aggregate, one atomic write

Why Event Sourcing

Complete Audit Trail

Every state change is captured. You can answer:

- What was this order’s state at any point in the past?
- Which events led to the current state?
- When did a value change, and what triggered the change?

Debugging, compliance, and analytics benefit from complete history.

Temporal Queries

Query historical state:

def get_order_state_at(order_id, timestamp):
    events = event_store.get_events(
        aggregate_id=order_id,
        until=timestamp
    )
    return Order(events)

Useful for reporting, reconciliation, and debugging.

Decoupled Systems

Events enable loose coupling:

Order Service → OrderPlaced event → Inventory Service
                                 → Notification Service
                                 → Analytics Service

Services react to events without tight integration.
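
The fan-out above can be sketched as a dispatcher that forwards each event to any number of independent handlers. The dispatcher and the downstream "services" here are hypothetical stand-ins, reduced to callables:

```python
class EventDispatcher:
    """Forwards each event to every registered handler, in order."""

    def __init__(self):
        self._handlers = []

    def register(self, handler):
        self._handlers.append(handler)

    def dispatch(self, event):
        # Each downstream service reacts independently; none knows the others.
        for handler in self._handlers:
            handler(event)

# Hypothetical downstream services, each reduced to a list-appending callable.
inventory_events, notification_events = [], []
dispatcher = EventDispatcher()
dispatcher.register(inventory_events.append)
dispatcher.register(notification_events.append)
dispatcher.dispatch({"event_type": "OrderPlaced", "aggregate_id": "order_789"})
```

Adding a new consumer (say, analytics) is one more `register` call; the Order Service itself never changes.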

Flexibility for Future Needs

Events capture intent, not just state. New projections can be created from historical events: a report nobody anticipated at design time can be built today and backfilled by replaying the full log.

Design Patterns

CQRS (Command Query Responsibility Segregation)

Separate write and read models:

Commands → Event Store → Events → Read Model
              ↑                       ↓
         Aggregates              Query Handlers

Write side: event-sourced aggregates enforce business rules.
Read side: denormalized views optimized for queries.

-- Read model: Denormalized order view
CREATE TABLE order_summary (
    order_id VARCHAR PRIMARY KEY,
    customer_name VARCHAR,
    status VARCHAR,
    total DECIMAL,
    item_count INTEGER,
    created_at TIMESTAMP,
    updated_at TIMESTAMP
);

Read models are updated by projecting events.

Projections

Transform events into read models:

class OrderSummaryProjection:
    def __init__(self, db):
        self.db = db

    def handle(self, event):
        if event.type == "OrderPlaced":
            self.db.execute("""
                INSERT INTO order_summary (order_id, status, total, created_at)
                VALUES (%s, 'placed', %s, %s)
            """, (event.aggregate_id, event.data["total"], event.timestamp))

        elif event.type == "OrderShipped":
            self.db.execute("""
                UPDATE order_summary SET status = 'shipped', updated_at = %s
                WHERE order_id = %s
            """, (event.timestamp, event.aggregate_id))

Projections can be rebuilt from scratch by replaying all events.
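
A rebuild is just "reset the read model, then replay". A minimal sketch, with a toy counting projection standing in for a real SQL-backed one:

```python
class CountingProjection:
    """Toy read model: counts orders placed. reset() and handle() mirror
    what a real projection does against its table."""

    def __init__(self):
        self.orders_placed = 0

    def reset(self):
        # For a SQL read model this would be TRUNCATE order_summary.
        self.orders_placed = 0

    def handle(self, event):
        if event["type"] == "OrderPlaced":
            self.orders_placed += 1

def rebuild_projection(projection, all_events):
    # Drop the derived state, then replay history from the beginning.
    projection.reset()
    for event in all_events:
        projection.handle(event)
```

Because the event log is the source of truth, a buggy projection is fixed by correcting the handler and rebuilding, not by patching rows.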

Snapshotting

For aggregates with many events, snapshots avoid replaying everything:

def load_aggregate(aggregate_id):
    snapshot = snapshot_store.get_latest(aggregate_id)

    if snapshot:
        aggregate = deserialize(snapshot.data)
        events = event_store.get_events(
            aggregate_id=aggregate_id,
            after_version=snapshot.version
        )
    else:
        aggregate = Order()
        events = event_store.get_events(aggregate_id=aggregate_id)

    for event in events:
        aggregate.apply(event)

    return aggregate

Create snapshots periodically (every N events).
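
A sketch of the write-side cadence that pairs with load_aggregate above. The stores are reduced to plain lists and dicts, and the threshold is illustrative:

```python
SNAPSHOT_EVERY = 3  # illustrative; production values are often 50-1000

def append_with_snapshot(event, log, snapshots, state):
    """Append an event; every SNAPSHOT_EVERY events, record a snapshot.

    `log` stands in for the event store, `snapshots` maps version to a
    point-in-time copy of state, and `state` is a toy aggregate state dict.
    """
    log.append(event)
    state.update(event)                   # toy 'apply'
    version = len(log)
    if version % SNAPSHOT_EVERY == 0:
        # Persist a copy so future loads replay at most SNAPSHOT_EVERY events.
        snapshots[version] = dict(state)
```

Snapshots are an optimization, never the source of truth: they can be deleted and regenerated from the log at any time.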

Sagas/Process Managers

Coordinate multi-aggregate transactions:

class OrderFulfillmentSaga:
    def handle(self, event):
        if event.type == "OrderPlaced":
            self.reserve_inventory(event.data["items"])

        elif event.type == "InventoryReserved":
            self.charge_payment(event.data["order_id"])

        elif event.type == "PaymentCharged":
            self.ship_order(event.data["order_id"])

        elif event.type == "InventoryReservationFailed":
            self.cancel_order(event.data["order_id"])

Sagas maintain eventual consistency across aggregates.

Common Pitfalls

Events as Commands

Wrong:

{"event_type": "UpdateCustomerAddress", "new_address": {...}}

Right:

{"event_type": "CustomerAddressChanged", "previous_address": {...}, "new_address": {...}}

Events describe facts, not instructions. Commands are requests; events are results.

Missing Context

Events should be understandable without additional queries:

Wrong:

{"event_type": "OrderItemRemoved", "item_id": "123"}

Right:

{
  "event_type": "OrderItemRemoved",
  "item_id": "123",
  "product_name": "Widget",
  "quantity_removed": 2,
  "price_at_time": 29.99,
  "reason": "customer_request"
}

Future processing shouldn’t require external lookups.

Schema Evolution Ignored

Events are forever. Plan for schema changes:

def deserialize_event(event):
    if event.version == 1:
        # Old format
        return migrate_v1_to_current(event.data)
    elif event.version == 2:
        # Current format
        return event.data

Strategies:

- Version events and upcast old versions on read (as above)
- Make only additive changes: new optional fields, never repurposed ones
- Never delete or rename fields that historical events depend on

Giant Aggregates

Large aggregates cause problems:

- Loading means replaying long event streams
- Concurrent writers contend on a single stream
- Invariants blur as unrelated concerns accumulate

Keep aggregates small. Split if they grow large.

Synchronous Projections

Updating projections synchronously couples read and write performance:

# Don't do this
def handle_command(command):
    event = aggregate.process(command)
    event_store.append(event)
    projection.handle(event)  # Blocks command processing

Project asynchronously:

def handle_command(command):
    event = aggregate.process(command)
    event_store.append(event)
    # Projection updated asynchronously via subscription

Accept eventual consistency in read models.
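
A minimal sketch of the subscription side, using an in-process queue and a worker thread to stand in for a real subscription mechanism. The projection here is a toy; the point is that the write path only enqueues and returns:

```python
import queue
import threading

class StatusProjection:
    """Toy read model: the latest status for each order."""

    def __init__(self):
        self.status = {}

    def handle(self, event):
        self.status[event["aggregate_id"]] = event["status"]

def run_projection_worker(projection, event_queue):
    # Drains the queue in the background; the read model lags the write path.
    while True:
        event = event_queue.get()
        if event is None:          # sentinel: stop the worker
            break
        projection.handle(event)

events = queue.Queue()
projection = StatusProjection()
worker = threading.Thread(target=run_projection_worker, args=(projection, events))
worker.start()

# Write path: append to the store (elided) and enqueue; returns immediately.
events.put({"aggregate_id": "order_789", "status": "placed"})
events.put(None)
worker.join()   # in reality the worker runs for the process lifetime
```

The window between enqueue and handle is exactly the eventual consistency readers must tolerate.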

Ignoring Idempotency

Events may be delivered multiple times. Projections must handle duplicates:

def handle(self, event):
    if self.already_processed(event.event_id):
        return  # Idempotent

    # Process event
    self.mark_processed(event.event_id)

Operational Considerations

Event Store Selection

Options:

- Purpose-built event stores (e.g. EventStore)
- A distributed log such as Kafka
- An append-only table in a relational database

Consider:

- Ordering and optimistic-concurrency guarantees
- Subscription support for feeding projections
- Throughput and retention at your event volume
- Operational maturity and team familiarity

Monitoring

Track:

- Event append latency and throughput
- Projection lag (how far read models trail the write side)
- Event store size and growth rate

Alert on:

- Projection lag beyond acceptable staleness
- Failed or stalled projections
- Append errors or spikes in concurrency conflicts

Disaster Recovery

Events are your source of truth:

- Back up the event store first; everything else is derivable
- Treat read models as disposable: rebuild them by replay
- Rehearse replay regularly so recovery time is measured, not guessed

When Not to Use Event Sourcing

Event sourcing adds complexity. Don’t use it when:

- The domain is simple CRUD with no interesting history
- Nobody needs an audit trail or temporal queries
- Strong read-after-write consistency is a hard requirement everywhere
- The team is new to the pattern and deadlines are tight

Start with traditional state-based persistence. Adopt event sourcing when the benefits justify the complexity.

Key Takeaways

- Events are immutable facts; name them in past-tense business language
- Keep aggregates small and let sagas coordinate across them
- Pair event sourcing with CQRS and asynchronous projections
- Plan for schema evolution and idempotency from day one

Event sourcing is powerful for the right problems. The key is understanding both the benefits and the costs.