Event sourcing stores the full history of changes as a sequence of immutable events. Instead of storing current state, you store everything that happened. Current state is derived by replaying events.
The approach offers powerful benefits but introduces significant complexity. Here’s how to design event-sourced systems that work.
Core Concepts
Events as First-Class Citizens
Events are immutable facts about what happened:
{
  "event_type": "OrderPlaced",
  "event_id": "evt_abc123",
  "timestamp": "2018-03-19T10:15:30Z",
  "aggregate_id": "order_789",
  "data": {
    "customer_id": "cust_456",
    "items": [
      {"product_id": "prod_123", "quantity": 2, "price": 29.99}
    ],
    "total": 59.98
  }
}
Key properties:
- Immutable: Events never change after creation
- Past tense: Events describe what happened, not what to do
- Self-contained: Events include all relevant data
Event Store
The event store is an append-only log:
OrderPlaced(order_789, {items: [...], total: 59.98})
PaymentReceived(order_789, {amount: 59.98, method: "credit"})
OrderShipped(order_789, {carrier: "FedEx", tracking: "123456"})
Operations:
- Append: Add new events
- Read: Get events by aggregate ID or stream position
- Subscribe: React to new events
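A minimal in-memory sketch of these three operations (illustration only; a production store adds durability, optimistic concurrency checks, and persistent subscriptions):

from collections import defaultdict

class InMemoryEventStore:
    def __init__(self):
        self.streams = defaultdict(list)  # aggregate_id -> ordered events
        self.log = []                     # global stream, oldest first
        self.subscribers = []

    def append(self, aggregate_id, event):
        # Append-only: events are never updated or deleted
        self.streams[aggregate_id].append(event)
        self.log.append(event)
        for subscriber in self.subscribers:
            subscriber(event)

    def get_events(self, aggregate_id):
        return list(self.streams[aggregate_id])

    def subscribe(self, handler):
        self.subscribers.append(handler)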
Aggregates
Aggregates are the boundaries of consistency:
class Order:
    def __init__(self, events=None):
        self.id = None
        self.status = None
        self.items = []
        self.total = 0
        for event in (events or []):
            self.apply(event)

    def apply(self, event):
        if event.type == "OrderPlaced":
            self.id = event.aggregate_id
            self.status = "placed"
            self.items = event.data["items"]
            self.total = event.data["total"]
        elif event.type == "PaymentReceived":
            self.status = "paid"
        elif event.type == "OrderShipped":
            self.status = "shipped"
        # ... other event handlers

    def ship(self, carrier, tracking):
        if self.status != "paid":
            raise InvalidStateError("Cannot ship unpaid order")
        # Returns the new event; the caller appends it to the store
        return OrderShipped(self.id, carrier, tracking)
Aggregates:
- Protect invariants
- Emit events for state changes
- Rebuild from event history
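Putting the pieces together, a sketch assuming the Order class above and an event store like the earlier one:

# Rebuild current state by replaying the aggregate's history
events = event_store.get_events(aggregate_id="order_789")
order = Order(events)

# Command methods check invariants, then return new events to record
# (assumes a PaymentReceived event is in the replayed history)
shipped = order.ship(carrier="FedEx", tracking="123456")
event_store.append("order_789", shipped)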
Why Event Sourcing
Complete Audit Trail
Every state change is captured. You can answer:
- What was the state at any point in time?
- Who made what changes when?
- Why did the system reach this state?
Debugging, compliance, and analytics benefit from complete history.
Temporal Queries
Query historical state:
def get_order_state_at(order_id, timestamp):
    events = event_store.get_events(
        aggregate_id=order_id,
        until=timestamp
    )
    return Order(events)
Useful for reporting, reconciliation, and debugging.
Decoupled Systems
Events enable loose coupling:
Order Service → OrderPlaced event → Inventory Service
                                  → Notification Service
                                  → Analytics Service
Services react to events without tight integration.
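Using the subscribe operation from the event store sketch, the fan-out might look like this (the handlers are illustrative stand-ins for real services):

def inventory_handler(event):
    if event.type == "OrderPlaced":
        print("reserving stock for", event.data["items"])

def notification_handler(event):
    if event.type == "OrderPlaced":
        print("emailing customer", event.data["customer_id"])

# Each consumer subscribes independently; the Order Service never
# calls them directly and can gain or lose consumers freely.
event_store.subscribe(inventory_handler)
event_store.subscribe(notification_handler)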
Flexibility for Future Needs
Events capture intent, not just state. New projections can be created from historical events:
- Build new reports from existing events
- Add new derived data without migration
- Create read models optimized for specific queries
Design Patterns
CQRS (Command Query Responsibility Segregation)
Separate write and read models:
Commands → Event Store → Events → Read Model
    ↑                                  ↓
Aggregates                      Query Handlers

Write side: event-sourced aggregates enforce business rules.
Read side: denormalized views optimized for queries.
-- Read model: Denormalized order view
CREATE TABLE order_summary (
    order_id VARCHAR PRIMARY KEY,
    customer_name VARCHAR,
    status VARCHAR,
    total DECIMAL,
    item_count INTEGER,
    created_at TIMESTAMP,
    updated_at TIMESTAMP
);
Read models are updated by projecting events.
Projections
Transform events into read models:
class OrderSummaryProjection:
    def __init__(self, db):
        self.db = db

    def handle(self, event):
        if event.type == "OrderPlaced":
            self.db.execute("""
                INSERT INTO order_summary (order_id, status, total, created_at)
                VALUES (%s, 'placed', %s, %s)
            """, (event.aggregate_id, event.data["total"], event.timestamp))
        elif event.type == "OrderShipped":
            self.db.execute("""
                UPDATE order_summary SET status = 'shipped', updated_at = %s
                WHERE order_id = %s
            """, (event.timestamp, event.aggregate_id))
Projections can be rebuilt from scratch by replaying all events.
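A rebuild is just a replay from the start of the log into a fresh read model. A sketch, assuming the in-memory store's global log and the projection above:

def rebuild_projection(projection, event_store):
    # Clear the read model, then replay history in global order
    projection.db.execute("TRUNCATE TABLE order_summary")
    for event in event_store.log:  # the global stream, oldest first
        projection.handle(event)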
Snapshotting
For aggregates with many events, snapshots avoid replaying everything:
def load_aggregate(aggregate_id):
    snapshot = snapshot_store.get_latest(aggregate_id)
    if snapshot:
        aggregate = deserialize(snapshot.data)
        events = event_store.get_events(
            aggregate_id=aggregate_id,
            after_version=snapshot.version
        )
    else:
        aggregate = Order()
        events = event_store.get_events(aggregate_id=aggregate_id)
    for event in events:
        aggregate.apply(event)
    return aggregate
Create snapshots periodically (every N events).
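For example, snapshotting on write (a sketch; snapshot_store.save and serialize are assumed counterparts to the load path above):

SNAPSHOT_INTERVAL = 100  # "N"; tune per aggregate type

def append_and_maybe_snapshot(aggregate_id, event, new_version):
    event_store.append(aggregate_id, event)
    if new_version % SNAPSHOT_INTERVAL == 0:
        aggregate = load_aggregate(aggregate_id)
        snapshot_store.save(aggregate_id, new_version, serialize(aggregate))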
Sagas/Process Managers
Coordinate processes that span multiple aggregates:
class OrderFulfillmentSaga:
    def handle(self, event):
        if event.type == "OrderPlaced":
            self.reserve_inventory(event.data["items"])
        elif event.type == "InventoryReserved":
            self.charge_payment(event.data["order_id"])
        elif event.type == "PaymentCharged":
            self.ship_order(event.data["order_id"])
        elif event.type == "InventoryReservationFailed":
            self.cancel_order(event.data["order_id"])
Sagas maintain eventual consistency across aggregates.
Common Pitfalls
Events as Commands
Wrong:
{"event_type": "UpdateCustomerAddress", "new_address": {...}}
Right:
{"event_type": "CustomerAddressChanged", "previous_address": {...}, "new_address": {...}}
Events describe facts, not instructions. Commands are requests; events are results.
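A sketch of the split (the loader, validation helper, error type, and event class are hypothetical): the command handler may reject the request; only on success does an event get recorded.

def handle_change_customer_address(command):
    # A command may be rejected; nothing is recorded until it succeeds
    customer = load_customer(command.customer_id)   # hypothetical loader
    if not is_valid_address(command.new_address):   # hypothetical check
        raise InvalidCommandError("Rejected: bad address")
    # The event records the fact, including prior state for context
    event = CustomerAddressChanged(
        customer_id=command.customer_id,
        previous_address=customer.address,
        new_address=command.new_address,
    )
    event_store.append(command.customer_id, event)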
Missing Context
Events should be understandable without additional queries:
Wrong:
{"event_type": "OrderItemRemoved", "item_id": "123"}
Right:
{
  "event_type": "OrderItemRemoved",
  "item_id": "123",
  "product_name": "Widget",
  "quantity_removed": 2,
  "price_at_time": 29.99,
  "reason": "customer_request"
}
Future processing shouldn’t require external lookups.
Schema Evolution Ignored
Events are forever. Plan for schema changes:
def deserialize_event(event):
    if event.version == 1:
        # Old format
        return migrate_v1_to_current(event.data)
    elif event.version == 2:
        # Current format
        return event.data
Strategies:
- Upcasting (transform old to new on read)
- Event versioning
- New event types for significant changes
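An upcaster like migrate_v1_to_current is typically a small pure function per version step. A sketch, with a hypothetical field rename:

def migrate_v1_to_current(data):
    # Hypothetical example: v1 stored "address"; the current
    # schema calls it "shipping_address". Upcasting happens on
    # read, so the stored events stay untouched.
    upgraded = dict(data)
    if "address" in upgraded:
        upgraded["shipping_address"] = upgraded.pop("address")
    return upgraded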
Giant Aggregates
Large aggregates cause problems:
- Slow to load (many events)
- Contention on updates
- Memory pressure
Keep aggregates small. Split if they grow large.
Synchronous Projections
Updating projections synchronously couples read and write performance:
# Don't do this
def handle_command(command):
    event = aggregate.process(command)
    event_store.append(event)
    projection.handle(event)  # Blocks command processing
Project asynchronously:
def handle_command(command):
    event = aggregate.process(command)
    event_store.append(event)
    # Projection updated asynchronously via subscription
Accept eventual consistency in read models.
Ignoring Idempotency
Events may be delivered multiple times. Projections must handle duplicates:
def handle(self, event):
    if self.already_processed(event.event_id):
        return  # Idempotent
    # Process event
    self.mark_processed(event.event_id)
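One common implementation is a processed-events table keyed by event ID (a sketch; assumes a thin db wrapper exposing execute and fetchone, like the db object used earlier):

class IdempotentProjection:
    def __init__(self, db):
        self.db = db  # assumed wrapper with execute/fetchone

    def already_processed(self, event_id):
        return self.db.fetchone(
            "SELECT 1 FROM processed_events WHERE event_id = %s", (event_id,)
        ) is not None

    def mark_processed(self, event_id):
        # event_id is unique; do this in the same transaction as the
        # read-model update so "processed" and "applied" cannot diverge
        self.db.execute(
            "INSERT INTO processed_events (event_id) VALUES (%s)", (event_id,)
        )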
Operational Considerations
Event Store Selection
Options:
- EventStoreDB: Purpose-built, excellent for event sourcing
- PostgreSQL: Append-only table, simpler operations (see the sketch at the end of this section)
- Kafka: Stream-based, good for high throughput
- DynamoDB: Managed, scales well
Consider:
- Operational complexity
- Performance characteristics
- Team familiarity
- Ecosystem integration
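If you choose PostgreSQL, a minimal append-only events table might look like this (one common convention, not a standard schema):

CREATE TABLE events (
    global_position BIGSERIAL PRIMARY KEY,      -- total order across streams
    aggregate_id    VARCHAR     NOT NULL,
    version         INTEGER     NOT NULL,       -- per-aggregate sequence
    event_type      VARCHAR     NOT NULL,
    data            JSONB       NOT NULL,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT now(),
    UNIQUE (aggregate_id, version)              -- optimistic concurrency
);

The unique constraint on (aggregate_id, version) rejects a concurrent writer appending at the same version, which is the usual optimistic-concurrency mechanism for relational event stores.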
Monitoring
Track:
- Event throughput
- Projection lag
- Aggregate load times
- Event store size
Alert on:
- Projection falling behind
- Unusual event patterns
- Processing errors
Disaster Recovery
Events are your source of truth:
- Back up event store rigorously
- Projections can be rebuilt (but this takes time)
- Consider cross-region replication
- Test recovery procedures
When Not to Use Event Sourcing
Event sourcing adds complexity. Don’t use it when:
- Simple CRUD is sufficient
- Audit requirements are minimal
- Team is unfamiliar with the pattern
- You don’t need temporal queries or event-driven architecture
Start with traditional state-based persistence. Adopt event sourcing when the benefits justify the complexity.
Key Takeaways
- Events are immutable facts describing what happened
- Aggregates enforce consistency and emit events
- CQRS separates write models (events) from read models (projections)
- Projections can be rebuilt from events at any time
- Plan for schema evolution from the start
- Keep aggregates small to avoid performance issues
- Project asynchronously; accept eventual consistency
- Ensure idempotent event handling
- Choose event store based on operational requirements and team expertise
- Don’t adopt event sourcing without clear benefits justifying the complexity
Event sourcing is powerful for the right problems. The key is understanding both the benefits and the costs.