Event sourcing stores state as a sequence of events rather than as a single, mutable current state. Every change is recorded as an event, which provides a complete audit trail and the ability to reconstruct state at any point in time. But it's not a silver bullet: it adds complexity that must be managed.
Here’s how to implement event sourcing practically.
When to Use Event Sourcing
Good Fit
event_sourcing_fits:
audit_requirements:
- Financial systems
- Healthcare records
- Compliance-heavy domains
- Need complete history
temporal_queries:
- "What was state on date X?"
- Historical analysis
- Point-in-time reporting
complex_domains:
- Many state transitions
- Business rules depend on history
- Undo/redo requirements
event_driven:
- Already publishing events
- Multiple downstream consumers
- Integration requirements
Poor Fit
event_sourcing_poor_fit:
simple_crud:
- Basic CRUD operations
- No audit requirements
- History not valuable
heavy_querying:
- Complex ad-hoc queries
- Reports from current state
- (Unless combined with CQRS)
team_inexperience:
- Steep learning curve
- Debugging is different
- Requires architectural discipline
Core Concepts
Events
// Event interface
type Event interface {
EventType() string
OccurredAt() time.Time
AggregateID() string
}
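The event structs below use an Item value type, and the aggregate later uses an OrderStatus enum; neither is defined in the listing, so here is a minimal sketch (the field names are illustrative assumptions, not part of the original):
// Assumed supporting types (illustrative; adjust to your domain)
type Item struct {
	SKU      string  `json:"sku"`
	Quantity int     `json:"quantity"`
	Price    float64 `json:"price"`
}

type OrderStatus string

const (
	OrderStatusPending   OrderStatus = "pending"
	OrderStatusShipped   OrderStatus = "shipped"
	OrderStatusCancelled OrderStatus = "cancelled"
)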
// Order events
type OrderCreated struct {
OrderID string `json:"orderId"`
CustomerID string `json:"customerId"`
Items []Item `json:"items"`
CreatedAt time.Time `json:"createdAt"`
}
func (e OrderCreated) EventType() string { return "OrderCreated" }
func (e OrderCreated) OccurredAt() time.Time { return e.CreatedAt }
func (e OrderCreated) AggregateID() string { return e.OrderID }
type OrderItemAdded struct {
OrderID string `json:"orderId"`
Item Item `json:"item"`
AddedAt time.Time `json:"addedAt"`
}
type OrderShipped struct {
OrderID string `json:"orderId"`
TrackingID string `json:"trackingId"`
ShippedAt time.Time `json:"shippedAt"`
}
type OrderCancelled struct {
OrderID string `json:"orderId"`
Reason string `json:"reason"`
CancelledAt time.Time `json:"cancelledAt"`
}
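The remaining event types satisfy the same Event interface; their methods are presumably elided above for brevity. For OrderItemAdded they would look like this (OrderShipped and OrderCancelled follow the same pattern):
func (e OrderItemAdded) EventType() string     { return "OrderItemAdded" }
func (e OrderItemAdded) OccurredAt() time.Time { return e.AddedAt }
func (e OrderItemAdded) AggregateID() string   { return e.OrderID }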
Aggregates
// Order aggregate
type Order struct {
id string
customerID string
items []Item
status OrderStatus
version int
// Uncommitted events
changes []Event
}
// Apply events to rebuild state
func (o *Order) Apply(event Event) {
switch e := event.(type) {
case OrderCreated:
o.id = e.OrderID
o.customerID = e.CustomerID
o.items = e.Items
o.status = OrderStatusPending
case OrderItemAdded:
o.items = append(o.items, e.Item)
case OrderShipped:
o.status = OrderStatusShipped
case OrderCancelled:
o.status = OrderStatusCancelled
}
o.version++
}
// Commands produce events
func (o *Order) AddItem(item Item) error {
if o.status != OrderStatusPending {
return errors.New("cannot modify non-pending order")
}
event := OrderItemAdded{
OrderID: o.id,
Item: item,
AddedAt: time.Now(),
}
o.Apply(event)
o.changes = append(o.changes, event)
return nil
}
func (o *Order) Ship(trackingID string) error {
if o.status != OrderStatusPending {
return errors.New("cannot ship non-pending order")
}
if len(o.items) == 0 {
return errors.New("cannot ship empty order")
}
event := OrderShipped{
OrderID: o.id,
TrackingID: trackingID,
ShippedAt: time.Now(),
}
o.Apply(event)
o.changes = append(o.changes, event)
return nil
}
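Two pieces the listing leaves implicit are the creation command and the replay helper; a sketch of both (the names are illustrative):
// NewOrder is the creation command: it produces the initial OrderCreated event
// and applies it to a fresh aggregate.
func NewOrder(orderID, customerID string, items []Item) *Order {
	o := &Order{}
	event := OrderCreated{
		OrderID:    orderID,
		CustomerID: customerID,
		Items:      items,
		CreatedAt:  time.Now(),
	}
	o.Apply(event)
	o.changes = append(o.changes, event)
	return o
}

// LoadOrderFromHistory rebuilds the aggregate by replaying stored events in order.
func LoadOrderFromHistory(events []Event) *Order {
	o := &Order{}
	for _, event := range events {
		o.Apply(event)
	}
	return o
}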
Event Store
type EventStore interface {
// Append events with optimistic concurrency
Append(ctx context.Context, aggregateID string, expectedVersion int, events []Event) error
// Load all events for aggregate
Load(ctx context.Context, aggregateID string) ([]Event, error)
// Subscribe to events
Subscribe(ctx context.Context, handler func(Event)) error
}
// PostgreSQL implementation
type PostgresEventStore struct {
db *sql.DB
}
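The implementation assumes an append-only events table along these lines (a sketch; the column names match the queries in Append and Load, and the unique constraint on aggregate and version is what keeps the optimistic concurrency check safe under concurrent writers):
const createEventsTable = `
CREATE TABLE IF NOT EXISTS events (
    id           BIGSERIAL PRIMARY KEY,
    aggregate_id TEXT        NOT NULL,
    version      INT         NOT NULL,
    event_type   TEXT        NOT NULL,
    event_data   JSONB       NOT NULL,
    occurred_at  TIMESTAMPTZ NOT NULL,
    UNIQUE (aggregate_id, version)
);`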
func (s *PostgresEventStore) Append(ctx context.Context, aggregateID string, expectedVersion int, events []Event) error {
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()
// Check current version (optimistic concurrency)
var currentVersion int
err = tx.QueryRowContext(ctx,
"SELECT COALESCE(MAX(version), 0) FROM events WHERE aggregate_id = $1",
aggregateID,
).Scan(&currentVersion)
if err != nil {
return err
}
if currentVersion != expectedVersion {
return ErrConcurrencyConflict
}
// Insert events
for i, event := range events {
eventData, err := json.Marshal(event)
if err != nil {
    return err
}
version := expectedVersion + i + 1
_, err = tx.ExecContext(ctx,
`INSERT INTO events (aggregate_id, version, event_type, event_data, occurred_at)
VALUES ($1, $2, $3, $4, $5)`,
aggregateID, version, event.EventType(), eventData, event.OccurredAt(),
)
if err != nil {
return err
}
}
return tx.Commit()
}
func (s *PostgresEventStore) Load(ctx context.Context, aggregateID string) ([]Event, error) {
rows, err := s.db.QueryContext(ctx,
`SELECT event_type, event_data FROM events
WHERE aggregate_id = $1
ORDER BY version`,
aggregateID,
)
if err != nil {
return nil, err
}
defer rows.Close()
var events []Event
for rows.Next() {
var eventType string
var eventData []byte
if err := rows.Scan(&eventType, &eventData); err != nil {
    return nil, err
}
event := deserializeEvent(eventType, eventData)
events = append(events, event)
}
if err := rows.Err(); err != nil {
    return nil, err
}
return events, nil
}
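Load relies on a deserializeEvent helper that is not shown; a minimal sketch that switches on the stored event type (a production version would return an error for unknown types or failed unmarshals instead of a nil event):
// deserializeEvent maps a stored event type back to its concrete Go type.
func deserializeEvent(eventType string, data []byte) Event {
	switch eventType {
	case "OrderCreated":
		var e OrderCreated
		json.Unmarshal(data, &e)
		return e
	case "OrderItemAdded":
		var e OrderItemAdded
		json.Unmarshal(data, &e)
		return e
	case "OrderShipped":
		var e OrderShipped
		json.Unmarshal(data, &e)
		return e
	case "OrderCancelled":
		var e OrderCancelled
		json.Unmarshal(data, &e)
		return e
	default:
		return nil
	}
}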
CQRS Pattern
Separating Reads and Writes
cqrs_pattern:
command_side:
- Event sourced aggregates
- Business logic validation
- Produces events
query_side:
- Read-optimized projections
- Denormalized for queries
- Eventually consistent
Commands ──► Aggregates ──► Events ──► Event Store
                                            │
                                            ▼
                                       Projections
                                            │
                                            ▼
                                      Read Database
                                            │
                                            ▼
                                        Queries
Projections
// Projection handler
type OrderProjection struct {
db *sql.DB
}
func (p *OrderProjection) Handle(event Event) error {
switch e := event.(type) {
case OrderCreated:
return p.insertOrder(e)
case OrderItemAdded:
return p.updateOrderItems(e)
case OrderShipped:
return p.updateOrderStatus(e.OrderID, "shipped")
case OrderCancelled:
return p.updateOrderStatus(e.OrderID, "cancelled")
}
return nil
}
func (p *OrderProjection) insertOrder(e OrderCreated) error {
_, err := p.db.Exec(
`INSERT INTO orders_read (id, customer_id, status, item_count, total, created_at)
VALUES ($1, $2, 'pending', $3, $4, $5)`,
e.OrderID, e.CustomerID, len(e.Items), calculateTotal(e.Items), e.CreatedAt,
)
return err
}
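The updateOrderStatus handler referenced above is a single UPDATE against the read table; a sketch (updateOrderItems follows the same shape):
func (p *OrderProjection) updateOrderStatus(orderID, status string) error {
	_, err := p.db.Exec(
		`UPDATE orders_read SET status = $1 WHERE id = $2`,
		status, orderID,
	)
	return err
}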
// Query from projection
type OrderQueryService struct {
db *sql.DB
}
func (s *OrderQueryService) GetCustomerOrders(customerID string) ([]OrderSummary, error) {
rows, err := s.db.Query(
`SELECT id, status, item_count, total, created_at
FROM orders_read
WHERE customer_id = $1
ORDER BY created_at DESC`,
customerID,
)
// ...
}
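To keep the read model current, the projection is wired to the event store's Subscribe method at startup; a sketch of that wiring (in practice you would also track the projection's position and retry failures):
func runOrderProjection(ctx context.Context, store EventStore, projection *OrderProjection) error {
	return store.Subscribe(ctx, func(event Event) {
		// Projections are eventually consistent; a failed event should be retried or parked
		if err := projection.Handle(event); err != nil {
			log.Printf("projection failed for %s: %v", event.EventType(), err)
		}
	})
}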
Patterns and Practices
Snapshots
For aggregates with many events:
type Snapshot struct {
AggregateID string
Version int
State []byte
CreatedAt time.Time
}
func (r *OrderRepository) Load(ctx context.Context, orderID string) (*Order, error) {
// Try to load snapshot first
snapshot, err := r.snapshotStore.Load(ctx, orderID)
var order Order
var startVersion int
if err == nil && snapshot != nil {
    // Restore from snapshot. Note: this requires the aggregate state to
    // round-trip through JSON (an exported snapshot struct or custom
    // marshaling); Order's unexported fields will not survive a plain Unmarshal.
    if err := json.Unmarshal(snapshot.State, &order); err != nil {
        return nil, err
    }
    startVersion = snapshot.Version
}
// Load events since snapshot
events, err := r.eventStore.LoadFromVersion(ctx, orderID, startVersion)
if err != nil {
return nil, err
}
// Apply events
for _, event := range events {
order.Apply(event)
}
return &order, nil
}
func (r *OrderRepository) Save(ctx context.Context, order *Order) error {
// Save events; the expected version is the version before the new changes were applied
err := r.eventStore.Append(ctx, order.id, order.version-len(order.changes), order.changes)
if err != nil {
    return err
}
// Events are persisted; clear the uncommitted changes so they are not appended on the next save
order.changes = nil
// Create snapshot every N events
if order.version%100 == 0 {
r.createSnapshot(ctx, order)
}
return nil
}
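createSnapshot is not shown; a sketch, assuming a snapshotStore with a Save method and an exported state struct (the aggregate's unexported fields cannot be serialized directly by encoding/json):
// orderSnapshotState is an exported-field copy of the aggregate's state so it
// can round-trip through JSON (illustrative; not part of the original listing).
type orderSnapshotState struct {
	ID         string      `json:"id"`
	CustomerID string      `json:"customerId"`
	Items      []Item      `json:"items"`
	Status     OrderStatus `json:"status"`
}

func (r *OrderRepository) createSnapshot(ctx context.Context, order *Order) error {
	state, err := json.Marshal(orderSnapshotState{
		ID:         order.id,
		CustomerID: order.customerID,
		Items:      order.items,
		Status:     order.status,
	})
	if err != nil {
		return err
	}
	return r.snapshotStore.Save(ctx, Snapshot{
		AggregateID: order.id,
		Version:     order.version,
		State:       state,
		CreatedAt:   time.Now(),
	})
}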
Event Versioning
// Handle schema evolution
type OrderCreatedV1 struct {
OrderID string `json:"orderId"`
CustomerID string `json:"customerId"`
}
type OrderCreatedV2 struct {
OrderID string `json:"orderId"`
CustomerID string `json:"customerId"`
Items []Item `json:"items"`
CreatedAt time.Time `json:"createdAt"`
}
// Upcaster transforms old events into the latest schema
func upcastOrderCreated(data []byte) Event {
    // Try the latest schema first; a v2 payload will have its items populated
    var v2 OrderCreatedV2
    if err := json.Unmarshal(data, &v2); err != nil {
        return nil
    }
    if v2.Items != nil {
        return v2
    }
    // Otherwise it is a v1 payload: copy the fields that exist and default the rest
    var v1 OrderCreatedV1
    if err := json.Unmarshal(data, &v1); err != nil {
        return nil
    }
    return OrderCreatedV2{
        OrderID:    v1.OrderID,
        CustomerID: v1.CustomerID,
        Items:      []Item{},   // Default
        CreatedAt:  time.Now(), // Default: v1 did not record a timestamp
    }
}
Idempotent Projections
func (p *OrderProjection) Handle(event Event, metadata EventMetadata) error {
// Check if already processed (idempotency)
processed, err := p.isProcessed(metadata.EventID)
if err != nil {
return err
}
if processed {
return nil // Already handled
}
// Process event
err = p.processEvent(event)
if err != nil {
return err
}
// Mark as processed
return p.markProcessed(metadata.EventID)
}
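isProcessed and markProcessed are not shown; a common approach is a processed_events table keyed by event ID (a sketch; in practice the mark and the projection update belong in the same transaction so the handler is atomic):
func (p *OrderProjection) isProcessed(eventID string) (bool, error) {
	var exists bool
	err := p.db.QueryRow(
		`SELECT EXISTS (SELECT 1 FROM processed_events WHERE event_id = $1)`,
		eventID,
	).Scan(&exists)
	return exists, err
}

func (p *OrderProjection) markProcessed(eventID string) error {
	_, err := p.db.Exec(
		`INSERT INTO processed_events (event_id) VALUES ($1)
		 ON CONFLICT (event_id) DO NOTHING`,
		eventID,
	)
	return err
}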
Common Pitfalls
pitfalls:
large_aggregates:
problem: Too many events per aggregate
solution: Smaller aggregates, snapshots
chatty_events:
problem: Event per field change
solution: Meaningful domain events
synchronous_projections:
problem: Slow writes waiting for projections
solution: Async projection, eventual consistency
no_versioning:
problem: Can't evolve event schemas
solution: Version events, upcasters
rebuilding_projections:
problem: Takes forever with many events
solution: Snapshots, parallel processing
Key Takeaways
- Event sourcing stores state as a sequence of events
- Aggregates apply events to rebuild state, commands produce new events
- Event stores provide append-only storage with optimistic concurrency
- CQRS separates reads (projections) from writes (events)
- Use snapshots for aggregates with many events
- Version events and use upcasters for schema evolution
- Make projections idempotent for reliable replay
- Not everything needs event sourcing—use where value justifies complexity
- Start with aggregates that truly need audit trail or temporal queries
- Consider eventual consistency implications for read models
Event sourcing is powerful but demanding. Apply it strategically where the benefits outweigh the complexity.