Event Sourcing Patterns: Practical Implementation

October 25, 2021

Event sourcing stores state as a sequence of events rather than as a single current-state record. Every change is recorded, providing a complete audit trail and the ability to reconstruct state at any point in time. But it’s not a silver bullet—it adds complexity that must be managed.

Here’s how to implement event sourcing practically.

When to Use Event Sourcing

Good Fit

event_sourcing_fits:
  audit_requirements:
    - Financial systems
    - Healthcare records
    - Compliance-heavy domains
    - Need complete history

  temporal_queries:
    - "What was state on date X?"
    - Historical analysis
    - Point-in-time reporting

  complex_domains:
    - Many state transitions
    - Business rules depend on history
    - Undo/redo requirements

  event_driven:
    - Already publishing events
    - Multiple downstream consumers
    - Integration requirements

Poor Fit

event_sourcing_poor_fit:
  simple_crud:
    - Basic CRUD operations
    - No audit requirements
    - History not valuable

  heavy_querying:
    - Complex ad-hoc queries
    - Reports from current state
    - (Unless combined with CQRS)

  team_inexperience:
    - Steep learning curve
    - Debugging is different
    - Requires architectural discipline

Core Concepts

Events

// Event is the contract shared by the domain events in this file. Aggregates
// consume events through Apply, and the event store persists them.
type Event interface {
    // EventType returns the event's type name. PostgresEventStore.Append
    // writes it to the event_type column.
    EventType() string
    // OccurredAt returns the event's timestamp. PostgresEventStore.Append
    // writes it to the occurred_at column.
    OccurredAt() time.Time
    // AggregateID returns the ID of the aggregate the event belongs to
    // (OrderCreated returns its OrderID).
    AggregateID() string
}

// Order events

// OrderCreated is the event that initialises an order: Order.Apply copies its
// ID, customer and items into the aggregate and marks it pending.
type OrderCreated struct {
    OrderID    string    `json:"orderId"`
    CustomerID string    `json:"customerId"`
    Items      []Item    `json:"items"`
    CreatedAt  time.Time `json:"createdAt"`
}

// EventType reports the event's type name, "OrderCreated".
func (oc OrderCreated) EventType() string {
    return "OrderCreated"
}

// OccurredAt reports the creation timestamp carried by the event.
func (oc OrderCreated) OccurredAt() time.Time {
    return oc.CreatedAt
}

// AggregateID reports the ID of the order this event belongs to.
func (oc OrderCreated) AggregateID() string {
    return oc.OrderID
}

// OrderItemAdded is the event produced by Order.AddItem; Order.Apply appends
// its Item to the aggregate's items.
type OrderItemAdded struct {
    OrderID string    `json:"orderId"`
    Item    Item      `json:"item"`
    AddedAt time.Time `json:"addedAt"`
}

// The accessors below make OrderItemAdded satisfy Event. Without them the
// type cannot appear in Order.Apply's type switch or be stored in
// Order.changes.

// EventType reports the event's type name, "OrderItemAdded".
func (e OrderItemAdded) EventType() string { return "OrderItemAdded" }

// OccurredAt reports when the item was added.
func (e OrderItemAdded) OccurredAt() time.Time { return e.AddedAt }

// AggregateID reports the ID of the order this event belongs to.
func (e OrderItemAdded) AggregateID() string { return e.OrderID }

// OrderShipped is the event produced by Order.Ship; Order.Apply moves the
// aggregate to the shipped status when it sees one.
type OrderShipped struct {
    OrderID    string    `json:"orderId"`
    TrackingID string    `json:"trackingId"`
    ShippedAt  time.Time `json:"shippedAt"`
}

// The accessors below make OrderShipped satisfy Event. Without them the type
// cannot appear in Order.Apply's type switch or be stored in Order.changes.

// EventType reports the event's type name, "OrderShipped".
func (e OrderShipped) EventType() string { return "OrderShipped" }

// OccurredAt reports when the order was shipped.
func (e OrderShipped) OccurredAt() time.Time { return e.ShippedAt }

// AggregateID reports the ID of the order this event belongs to.
func (e OrderShipped) AggregateID() string { return e.OrderID }

// OrderCancelled is the event that moves an order to the cancelled status
// when Order.Apply sees it.
type OrderCancelled struct {
    OrderID     string    `json:"orderId"`
    Reason      string    `json:"reason"`
    CancelledAt time.Time `json:"cancelledAt"`
}

// The accessors below make OrderCancelled satisfy Event. Without them the
// type cannot appear in Order.Apply's type switch.

// EventType reports the event's type name, "OrderCancelled".
func (e OrderCancelled) EventType() string { return "OrderCancelled" }

// OccurredAt reports when the order was cancelled.
func (e OrderCancelled) OccurredAt() time.Time { return e.CancelledAt }

// AggregateID reports the ID of the order this event belongs to.
func (e OrderCancelled) AggregateID() string { return e.OrderID }

Aggregates

// Order is the event-sourced order aggregate. Its state is only mutated
// through Apply; command methods (AddItem, Ship) validate, build an event,
// Apply it, and record it in changes.
type Order struct {
    id         string
    customerID string
    items      []Item
    status     OrderStatus
    // version is incremented by Apply once per applied event.
    // OrderRepository.Save derives the expected stream version from it.
    version    int

    // Uncommitted events: appended by the command methods and handed to
    // EventStore.Append by OrderRepository.Save.
    changes []Event
}

// Apply folds one event into the aggregate's state. It is used both to
// rebuild an order from stored events and by the command methods to apply a
// freshly produced event.
//
// The version is incremented for every call, including events whose type is
// not handled by the switch.
func (o *Order) Apply(event Event) {
    switch e := event.(type) {
    case OrderCreated:
        o.id = e.OrderID
        o.customerID = e.CustomerID
        // Copy rather than alias e.Items. Sharing the backing array would let
        // a later append (OrderItemAdded) write into the event's slice.
        o.items = append([]Item(nil), e.Items...)
        o.status = OrderStatusPending

    case OrderItemAdded:
        o.items = append(o.items, e.Item)

    case OrderShipped:
        o.status = OrderStatusShipped

    case OrderCancelled:
        o.status = OrderStatusCancelled
    }
    o.version++
}

// AddItem is the command that adds an item to a pending order. It rejects
// any other status, otherwise it builds an OrderItemAdded event stamped with
// the current time, applies it, and records it as an uncommitted change.
func (o *Order) AddItem(item Item) error {
    if pending := o.status == OrderStatusPending; !pending {
        return errors.New("cannot modify non-pending order")
    }

    var added OrderItemAdded
    added.OrderID = o.id
    added.Item = item
    added.AddedAt = time.Now()

    // Mutate state first, then queue the event for persistence.
    o.Apply(added)
    o.changes = append(o.changes, added)

    return nil
}

// Ship is the command that marks a pending, non-empty order as shipped. On
// success it builds an OrderShipped event carrying trackingID and the current
// time, applies it, and records it as an uncommitted change.
func (o *Order) Ship(trackingID string) error {
    switch {
    case o.status != OrderStatusPending:
        return errors.New("cannot ship non-pending order")
    case len(o.items) == 0:
        return errors.New("cannot ship empty order")
    }

    shipped := OrderShipped{
        ShippedAt:  time.Now(),
        TrackingID: trackingID,
        OrderID:    o.id,
    }

    // Mutate state first, then queue the event for persistence.
    o.Apply(shipped)
    o.changes = append(o.changes, shipped)

    return nil
}

Event Store

// EventStore is the persistence contract for event streams keyed by
// aggregate ID.
type EventStore interface {
    // Append events with optimistic concurrency: expectedVersion is the
    // stream version the caller believes is current. The Postgres
    // implementation returns ErrConcurrencyConflict when it differs.
    Append(ctx context.Context, aggregateID string, expectedVersion int, events []Event) error

    // Load all events for aggregate. The Postgres implementation returns them
    // ordered by version.
    Load(ctx context.Context, aggregateID string) ([]Event, error)

    // Subscribe to events, passing each one to handler.
    // NOTE(review): no implementation is visible here, so delivery semantics
    // (ordering, replay, blocking) cannot be stated — confirm.
    Subscribe(ctx context.Context, handler func(Event)) error
}

// PostgresEventStore stores events in an `events` table reached through db.
// NOTE(review): only Append and Load are defined in the visible source;
// Subscribe would be needed to satisfy EventStore — confirm.
type PostgresEventStore struct {
    db *sql.DB
}

// Append writes events to the aggregate's stream in a single transaction.
//
// It first reads the stream's highest stored version (0 when the stream is
// empty) and returns ErrConcurrencyConflict if it differs from
// expectedVersion. Events are then inserted with consecutive versions
// starting at expectedVersion+1. Any failure, including a payload that cannot
// be JSON-encoded, aborts the transaction so no partial batch is stored.
//
// NOTE(review): the SELECT-then-INSERT check alone cannot stop two concurrent
// transactions that both read the same MAX(version). Presumably the events
// table has a unique constraint on (aggregate_id, version) so the loser's
// INSERT fails — confirm against the schema.
func (s *PostgresEventStore) Append(ctx context.Context, aggregateID string, expectedVersion int, events []Event) error {
    tx, err := s.db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    // Rolls back on every early return. After a successful Commit the call
    // has no effect.
    defer tx.Rollback()

    // Check current version (optimistic concurrency)
    var currentVersion int
    err = tx.QueryRowContext(ctx,
        "SELECT COALESCE(MAX(version), 0) FROM events WHERE aggregate_id = $1",
        aggregateID,
    ).Scan(&currentVersion)
    if err != nil {
        return err
    }

    if currentVersion != expectedVersion {
        return ErrConcurrencyConflict
    }

    // Insert events
    for i, event := range events {
        eventData, err := json.Marshal(event)
        if err != nil {
            // Do not persist a row with a missing payload. The deferred
            // Rollback discards anything inserted so far.
            return err
        }
        version := expectedVersion + i + 1

        _, err = tx.ExecContext(ctx,
            `INSERT INTO events (aggregate_id, version, event_type, event_data, occurred_at)
             VALUES ($1, $2, $3, $4, $5)`,
            aggregateID, version, event.EventType(), eventData, event.OccurredAt(),
        )
        if err != nil {
            return err
        }
    }

    return tx.Commit()
}

// Load returns every stored event for aggregateID, ordered by version.
//
// Each row's event_type and event_data are handed to deserializeEvent. A row
// that cannot be scanned, or an error raised while iterating the result set,
// is returned to the caller instead of yielding a truncated or corrupt
// stream. A stream with no rows yields a nil slice and a nil error.
func (s *PostgresEventStore) Load(ctx context.Context, aggregateID string) ([]Event, error) {
    rows, err := s.db.QueryContext(ctx,
        `SELECT event_type, event_data FROM events
         WHERE aggregate_id = $1
         ORDER BY version`,
        aggregateID,
    )
    if err != nil {
        return nil, err
    }
    defer rows.Close()

    var events []Event
    for rows.Next() {
        var (
            eventType string
            eventData []byte
        )
        if err := rows.Scan(&eventType, &eventData); err != nil {
            return nil, err
        }

        events = append(events, deserializeEvent(eventType, eventData))
    }
    // Next returns false both at end-of-data and on failure. Err tells the
    // two apart.
    if err := rows.Err(); err != nil {
        return nil, err
    }

    return events, nil
}

CQRS Pattern

Separating Reads and Writes

cqrs_pattern:
  command_side:
    - Event sourced aggregates
    - Business logic validation
    - Produces events

  query_side:
    - Read-optimized projections
    - Denormalized for queries
    - Eventually consistent
Commands ──► Aggregates ──► Events ──► Event Store
                                           │
                                           ▼
                                     Projections
                                           │
                                           ▼
                                   Read Database
                                           │
                                           ▼
                                       Queries

Projections

// OrderProjection maintains the read-side view of orders: its Handle method
// translates events into writes against the orders_read table via db.
type OrderProjection struct {
    db *sql.DB
}

// Handle routes an event to the matching read-model update: OrderCreated
// inserts a row, OrderItemAdded updates the order's items, and
// OrderShipped/OrderCancelled update the status. Any other event type is
// ignored and nil is returned.
func (p *OrderProjection) Handle(event Event) error {
    if created, ok := event.(OrderCreated); ok {
        return p.insertOrder(created)
    }
    if added, ok := event.(OrderItemAdded); ok {
        return p.updateOrderItems(added)
    }

    // The remaining handled events only change the status column.
    var orderID, status string
    switch e := event.(type) {
    case OrderShipped:
        orderID, status = e.OrderID, "shipped"
    case OrderCancelled:
        orderID, status = e.OrderID, "cancelled"
    default:
        return nil
    }
    return p.updateOrderStatus(orderID, status)
}

// insertOrder adds a row to orders_read for a newly created order, with
// status 'pending', the number of items, and the total computed by
// calculateTotal.
func (p *OrderProjection) insertOrder(e OrderCreated) error {
    const insertSQL = `INSERT INTO orders_read (id, customer_id, status, item_count, total, created_at)
         VALUES ($1, $2, 'pending', $3, $4, $5)`

    itemCount := len(e.Items)
    total := calculateTotal(e.Items)

    if _, err := p.db.Exec(insertSQL, e.OrderID, e.CustomerID, itemCount, total, e.CreatedAt); err != nil {
        return err
    }
    return nil
}

// OrderQueryService is the query side of the CQRS split: it reads the
// orders_read projection table through db.
type OrderQueryService struct {
    db *sql.DB
}

// GetCustomerOrders queries orders_read for the given customer's orders,
// newest first (ORDER BY created_at DESC).
//
// NOTE(review): the rest of the body is elided in this listing ("// ...").
// The error check, row scanning into OrderSummary, rows.Close and the return
// statement are not shown and must exist for this to compile.
func (s *OrderQueryService) GetCustomerOrders(customerID string) ([]OrderSummary, error) {
    rows, err := s.db.Query(
        `SELECT id, status, item_count, total, created_at
         FROM orders_read
         WHERE customer_id = $1
         ORDER BY created_at DESC`,
        customerID,
    )
    // ...
}

Patterns and Practices

Snapshots

For aggregates with many events:

// Snapshot is a stored copy of an aggregate's state at a given version, used
// by OrderRepository.Load to avoid replaying the whole event stream.
type Snapshot struct {
    AggregateID string
    // Version is the stream version the snapshot was taken at. Load passes it
    // to LoadFromVersion to fetch only the later events.
    Version     int
    // State is the serialized aggregate. Load decodes it with json.Unmarshal.
    State       []byte
    CreatedAt   time.Time
}

// Load rebuilds the order identified by orderID.
//
// It first tries the snapshot store. A usable snapshot seeds the aggregate
// and its version, and only events after the snapshot's version are loaded
// and applied. The snapshot is an optimisation only: if it is missing, fails
// to load, or cannot be decoded, the order is rebuilt by replaying the stream
// from version 0. An error is returned only when the events cannot be loaded.
//
// NOTE(review): Order's fields are unexported, so json.Unmarshal restores
// state only if Order implements json.Unmarshaler — not visible here, confirm.
func (r *OrderRepository) Load(ctx context.Context, orderID string) (*Order, error) {
    var (
        order        Order
        startVersion int
    )

    // Try to load snapshot first
    snapshot, err := r.snapshotStore.Load(ctx, orderID)
    if err == nil && snapshot != nil {
        if uerr := json.Unmarshal(snapshot.State, &order); uerr != nil {
            // Corrupt snapshot: drop any partially decoded state and fall
            // back to a full replay.
            order = Order{}
        } else {
            // Apply counts versions from the current value, so the aggregate
            // must start at the snapshot's version. Otherwise Save would
            // compute the wrong expected version.
            order.version = snapshot.Version
            startVersion = snapshot.Version
        }
    }

    // Load events since snapshot
    events, err := r.eventStore.LoadFromVersion(ctx, orderID, startVersion)
    if err != nil {
        return nil, err
    }

    // Apply events
    for _, event := range events {
        order.Apply(event)
    }

    return &order, nil
}

// Save persists the order's uncommitted events and, when appropriate, takes a
// snapshot.
//
// order.version already counts the uncommitted events (Apply increments it),
// so the stream version the store is expected to hold is version minus the
// number of pending changes. On a successful append the pending changes are
// cleared, so a later Save does not try to append them again. On failure they
// are left in place and the error is returned.
//
// A snapshot is taken whenever the appended batch reaches or crosses a
// multiple of the snapshot interval, so multi-event batches cannot step over
// a boundary unnoticed.
func (r *OrderRepository) Save(ctx context.Context, order *Order) error {
    // Create snapshot every N events
    const snapshotInterval = 100

    // Save events
    expectedVersion := order.version - len(order.changes)
    if err := r.eventStore.Append(ctx, order.id, expectedVersion, order.changes); err != nil {
        return err
    }
    // The events are now committed. Keeping them would make the next Save
    // re-send them with a stale expected version.
    order.changes = nil

    if order.version/snapshotInterval > expectedVersion/snapshotInterval {
        // Invoked as in the original, without inspecting any result.
        r.createSnapshot(ctx, order)
    }

    return nil
}

Event Versioning

// Handle schema evolution

// OrderCreatedV1 is the legacy OrderCreated payload: it carries only the
// order and customer IDs. It is kept as a record of the old stored shape.
type OrderCreatedV1 struct {
    OrderID    string `json:"orderId"`
    CustomerID string `json:"customerId"`
}

// OrderCreatedV2 is the current OrderCreated payload. It is a superset of
// OrderCreatedV1, adding Items and CreatedAt.
type OrderCreatedV2 struct {
    OrderID    string    `json:"orderId"`
    CustomerID string    `json:"customerId"`
    Items      []Item    `json:"items"`
    CreatedAt  time.Time `json:"createdAt"`
}

// The accessors below let OrderCreatedV2 be returned as an Event by
// upcastOrderCreated.
// NOTE(review): "OrderCreated" is assumed to be the stored type name shared
// by both schema versions — confirm.

// EventType reports the event's type name.
func (e OrderCreatedV2) EventType() string { return "OrderCreated" }

// OccurredAt reports the creation timestamp (the zero time for upcast V1
// payloads).
func (e OrderCreatedV2) OccurredAt() time.Time { return e.CreatedAt }

// AggregateID reports the ID of the order this event belongs to.
func (e OrderCreatedV2) AggregateID() string { return e.OrderID }

// upcastOrderCreated decodes a stored OrderCreated payload of either schema
// version and returns it as an OrderCreatedV2.
//
// Because V2 only adds fields to V1, the payload is decoded straight into the
// V2 shape and whatever a V1 payload lacks is defaulted: Items becomes an
// empty, non-nil slice and CreatedAt stays the zero time. The defaults are
// deliberately deterministic. Using the current time would make every replay
// of the same stored event produce a different result.
func upcastOrderCreated(data []byte) Event {
    var v2 OrderCreatedV2
    // The signature cannot report errors, so a malformed payload yields
    // whatever fields decoded before the failure, as in the original.
    _ = json.Unmarshal(data, &v2)

    if v2.Items == nil {
        v2.Items = []Item{} // Default
    }
    return v2
}

Idempotent Projections

// Handle processes an event at most once per metadata.EventID: events already
// recorded as processed are skipped, otherwise the event is processed and then
// recorded. Errors from the lookup, the processing, or the recording are
// returned unchanged.
//
// NOTE(review): the check, the processing and the mark are three separate
// calls. Whether they share a transaction is not visible here, so a failure
// between processEvent and markProcessed may cause the event to be processed
// again — confirm.
func (p *OrderProjection) Handle(event Event, metadata EventMetadata) error {
    eventID := metadata.EventID

    switch done, err := p.isProcessed(eventID); {
    case err != nil:
        return err
    case done:
        // Seen before: nothing to do.
        return nil
    }

    if err := p.processEvent(event); err != nil {
        return err
    }
    return p.markProcessed(eventID)
}

Common Pitfalls

pitfalls:
  large_aggregates:
    problem: Too many events per aggregate
    solution: Smaller aggregates, snapshots

  chatty_events:
    problem: Event per field change
    solution: Meaningful domain events

  synchronous_projections:
    problem: Slow writes waiting for projections
    solution: Async projection, eventual consistency

  no_versioning:
    problem: Can't evolve event schemas
    solution: Version events, upcasters

  rebuilding_projections:
    problem: Takes forever with many events
    solution: Snapshots, parallel processing

Key Takeaways

Event sourcing is powerful but demanding. Apply it strategically where the benefits outweigh the complexity.