Async Architecture: Message Queues and Event Systems

July 25, 2022

Synchronous request-response works until it doesn’t. High load, slow operations, or service dependencies create bottlenecks. Asynchronous patterns—message queues, event streams, pub/sub—solve these problems but introduce complexity.

Here’s how to build async architectures that work.

When to Go Async

Sync vs. Async Trade-offs

synchronous:
  pros:
    - Simple mental model
    - Immediate feedback
    - Easy error handling
  cons:
    - Coupling
    - Timeout issues
    - Scaling bottlenecks

asynchronous:
  pros:
    - Decoupling
    - Better resilience
    - Horizontal scaling
  cons:
    - Eventual consistency
    - Harder debugging
    - Message handling complexity

When Async Makes Sense

good_for_async:
  slow_operations:
    - Email/notification sending
    - Report generation
    - File processing

  spike_absorption:
    - Order processing
    - User signups
    - Event ingestion

  decoupling:
    - Independent team releases
    - Different scaling needs
    - Technology heterogeneity

  fan_out:
    - Multiple consumers
    - Different processing needs
    - Event broadcasting

Message Queue Patterns

Point-to-Point

point_to_point:
  pattern: One producer, one consumer
  use_case: Task distribution
  guarantee: Each message delivered to a single consumer

  example:
    - Job queue
    - Background processing
    - Work distribution
# RabbitMQ work queue
import pika

# Producer
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)  # Queue survives broker restarts

channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body='task payload',
    properties=pika.BasicProperties(
        delivery_mode=2,  # Persistent message
    )
)

# Consumer
def callback(ch, method, properties, body):
    process_task(body)  # Application-specific work
    ch.basic_ack(delivery_tag=method.delivery_tag)  # Ack only after success

channel.basic_qos(prefetch_count=1)  # At most one unacked message per worker
channel.basic_consume(queue='task_queue', on_message_callback=callback)
channel.start_consuming()

Publish/Subscribe

pub_sub:
  pattern: One producer, multiple consumers
  use_case: Event broadcasting
  guarantee: Every subscriber receives a copy of the message

  example:
    - User signup triggers email, analytics, onboarding
    - Order placed notifies inventory, shipping, notifications
# RabbitMQ fanout exchange
# Publisher
channel.exchange_declare(exchange='user_events', exchange_type='fanout')
channel.basic_publish(
    exchange='user_events',
    routing_key='',  # Ignored by fanout exchanges
    body='{"event": "user_signup", "user_id": "123"}'
)

# Subscriber 1 (email)
def handle_event(ch, method, properties, body):
    send_welcome_email(body)  # Application-specific handling

channel.exchange_declare(exchange='user_events', exchange_type='fanout')
result = channel.queue_declare(queue='', exclusive=True)  # Auto-named queue per subscriber
queue_name = result.method.queue
channel.queue_bind(exchange='user_events', queue=queue_name)
channel.basic_consume(queue=queue_name, on_message_callback=handle_event, auto_ack=True)
channel.start_consuming()

# Subscriber 2 (analytics) - separate process
# Same binding pattern with its own exclusive queue, so it also receives every event

Request/Reply

request_reply:
  pattern: Async request with correlated response
  use_case: RPC over messaging
  guarantee: Caller receives a correlated response, or times out

  implementation:
    - Correlation ID matches request/response
    - Reply-to queue for response
    - Timeout handling
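
The implementation is straightforward to sketch with RabbitMQ. Below is a minimal client, assuming a server consuming from a queue named rpc_queue; the 5-second timeout is an illustrative choice.

# Request/reply client (pika)
import time
import uuid
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Exclusive auto-named queue for receiving the reply
result = channel.queue_declare(queue='', exclusive=True)
reply_queue = result.method.queue

correlation_id = str(uuid.uuid4())
response = None

def on_reply(ch, method, properties, body):
    global response
    if properties.correlation_id == correlation_id:  # Match reply to request
        response = body

channel.basic_consume(queue=reply_queue, on_message_callback=on_reply, auto_ack=True)

channel.basic_publish(
    exchange='',
    routing_key='rpc_queue',  # Assumed server queue
    properties=pika.BasicProperties(
        reply_to=reply_queue,
        correlation_id=correlation_id,
    ),
    body='request payload',
)

# Poll for the reply instead of blocking forever
deadline = time.monotonic() + 5.0
while response is None and time.monotonic() < deadline:
    connection.process_data_events(time_limit=0.2)

if response is None:
    raise TimeoutError('No reply within 5 seconds')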

Event Streaming with Kafka

When Kafka

kafka_use_cases:
  high_throughput:
    - Event ingestion
    - Metrics collection
    - Log aggregation

  event_replay:
    - Event sourcing
    - Data recovery
    - New consumer catch-up

  ordering:
    - Per-partition ordering
    - Transaction sequencing

Kafka Patterns

# Kafka producer
from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['kafka:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    acks='all',  # Wait for all in-sync replicas
    retries=3
)

order = {'order_id': 'o-1001', 'customer_id': 'c-42', 'total': 99.50}

# Partition by key for ordering
producer.send(
    'orders',
    key=order['customer_id'].encode(),  # Same customer = same partition = ordered
    value=order
)
producer.flush()  # Block until buffered messages are actually sent
# Kafka consumer
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'orders',
    bootstrap_servers=['kafka:9092'],
    group_id='order-processor',
    auto_offset_reset='earliest',
    enable_auto_commit=False,  # Manual commit for reliability
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

for message in consumer:
    try:
        process_order(message.value)  # Application-specific handling
        consumer.commit()  # Commit only after successful processing
    except Exception as e:
        handle_error(message, e)  # Offset stays uncommitted: redelivered on restart

Consumer Groups

consumer_groups:
  purpose: Parallel consumption with coordination
  behavior:
    - Partitions distributed among consumers
    - Each partition assigned to exactly one consumer (a consumer may own several)
    - Rebalancing on consumer join/leave

  scaling:
    - Add consumers up to partition count
    - More partitions = more parallelism
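
You can watch the assignment happen with kafka-python. Run this snippet in several processes sharing a group_id (reusing the earlier topic and group names) and each instance prints a disjoint set of partitions.

# Inspect this instance's partition assignment
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'orders',
    bootstrap_servers=['kafka:9092'],
    group_id='order-processor',
)

consumer.poll(timeout_ms=1000)  # Triggers group join and partition assignment
print(sorted(tp.partition for tp in consumer.assignment()))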

Message Reliability

Delivery Guarantees

delivery_guarantees:
  at_most_once:
    description: Message may be lost
    implementation: Fire and forget
    use_case: Metrics where loss is acceptable

  at_least_once:
    description: Message may be duplicated
    implementation: Ack after processing
    use_case: Most business operations

  exactly_once:
    description: No loss, no duplicates
    implementation: Idempotent consumers + transactions
    use_case: Financial transactions
    complexity: High
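
In practice the guarantee comes down to where the ack happens relative to the work. A sketch with pika-style callbacks; process() stands in for application logic.

# Ack placement determines the delivery guarantee
def at_most_once(ch, method, properties, body):
    ch.basic_ack(delivery_tag=method.delivery_tag)  # Ack first...
    process(body)  # ...so a crash here silently loses the message

def at_least_once(ch, method, properties, body):
    process(body)  # A crash here leaves the message unacked...
    ch.basic_ack(delivery_tag=method.delivery_tag)  # ...so the broker redelivers it (possible duplicate)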

Idempotent Processing

def process_order_idempotently(message):
    order_id = message['order_id']

    # Fast path: skip if already processed (cheap check, but racy on its own)
    if order_repository.exists(order_id):
        logger.info(f"Order {order_id} already processed, skipping")
        return

    # Real guard: a unique constraint on order_id enforced by the database
    try:
        with db.transaction():
            order = create_order(message)
            order_repository.save(order)
    except DuplicateKeyError:
        # Another consumer won the race between the exists() check and the save
        logger.info(f"Order {order_id} processed by another consumer")

Dead Letter Queues

def process_with_dlq(message, max_retries=3):
    retry_count = message.headers.get('x-retry-count', 0)  # Header name is a convention

    try:
        process_message(message)
    except RetryableError as e:
        if retry_count < max_retries:
            # Requeue with incremented retry count, ideally with backoff delay
            requeue_with_delay(message, retry_count + 1)
        else:
            # Retries exhausted: park in DLQ for manual investigation
            send_to_dlq(message, str(e))
    except NonRetryableError as e:
        # No point retrying (e.g. malformed payload): straight to DLQ
        send_to_dlq(message, str(e))
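
requeue_with_delay and send_to_dlq above are application-specific. One possible send_to_dlq for RabbitMQ republishes the payload to a parking queue with the failure reason attached; the queue name, header key, and message.body attribute are assumptions.

# Park a failed message in a dedicated queue for later inspection
def send_to_dlq(message, reason):
    channel.queue_declare(queue='task_queue.dlq', durable=True)
    channel.basic_publish(
        exchange='',
        routing_key='task_queue.dlq',
        body=message.body,
        properties=pika.BasicProperties(
            delivery_mode=2,  # DLQ contents must survive restarts
            headers={'x-failure-reason': reason},
        ),
    )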

Observability

Message Tracing

tracing_strategy:
  correlation_id:
    - Include in all messages
    - Propagate through service chain
    - Use for log correlation

  distributed_tracing:
    - Span per message production
    - Span per message consumption
    - Link spans via trace context
# Propagate trace context in messages
from opentelemetry.propagate import inject

def publish_message(topic, message):
    headers = {}
    inject(headers)  # Inject current trace context into the carrier dict

    producer.send(
        topic,
        value=message,
        headers=[(k, v.encode()) for k, v in headers.items()]  # Kafka expects (str, bytes) tuples
    )
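
On the consuming side, the mirror image: rebuild the context from the message headers and start the consumer span inside it. process_message is an application-specific placeholder.

# Restore trace context from message headers
from opentelemetry import trace
from opentelemetry.propagate import extract

tracer = trace.get_tracer(__name__)

def consume_message(message):
    # Kafka delivers headers as (key, bytes) tuples; rebuild the carrier dict
    carrier = {k: v.decode() for k, v in (message.headers or [])}
    ctx = extract(carrier)

    # The consumer span joins the producer's trace
    with tracer.start_as_current_span('consume-message', context=ctx):
        process_message(message.value)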

Metrics

queue_metrics:
  producer:
    - Messages published
    - Publish latency
    - Publish errors

  queue:
    - Queue depth
    - Age of oldest message
    - Consumer lag (Kafka)

  consumer:
    - Messages consumed
    - Processing time
    - Error rate
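
Consumer lag is worth singling out: it can be computed straight from kafka-python when broker-side tooling isn't available. A sketch reusing the consumer from earlier.

# Per-partition lag = latest broker offset - this consumer's position
def consumer_lag(consumer):
    partitions = consumer.assignment()
    end_offsets = consumer.end_offsets(list(partitions))  # Latest offset per partition
    return {
        tp.partition: end_offsets[tp] - consumer.position(tp)
        for tp in partitions
    }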

Anti-Patterns

async_anti_patterns:
  unbounded_queues:
    problem: Queue grows forever on consumer issues
    solution: Backpressure, alerting on depth (see the sketch after this list)

  missing_idempotency:
    problem: Duplicate processing on retry
    solution: Always design for at-least-once

  sync_over_async:
    problem: Blocking wait for async response
    solution: Go truly async, or just make a synchronous call

  no_dlq:
    problem: Poison messages block queue
    solution: DLQ for failed messages

  ignoring_ordering:
    problem: Race conditions on related messages
    solution: Partition key or sequence handling
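
For the unbounded-queues point, RabbitMQ lets you read queue depth cheaply with a passive declare. The threshold and alert() helper here are illustrative.

# Alert when queue depth crosses a threshold
result = channel.queue_declare(queue='task_queue', passive=True)  # Check existence, don't create
depth = result.method.message_count
if depth > 10_000:
    alert(f'task_queue depth {depth}: consumers lagging or down')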

Key Takeaways

Async is powerful but not free. Reach for it where decoupling, spike absorption, or fan-out genuinely pay off; design for at-least-once delivery, make every consumer idempotent, and wire in DLQs and tracing from the start. The complexity tax is real.