Synchronous request-response works until it doesn’t. High load, slow operations, or service dependencies create bottlenecks. Asynchronous patterns—message queues, event streams, pub/sub—solve these problems but introduce complexity.
Here’s how to build async architectures that work.
When to Go Async
Sync vs. Async Trade-offs
synchronous:
  pros:
    - Simple mental model
    - Immediate feedback
    - Easy error handling
  cons:
    - Coupling
    - Timeout issues
    - Scaling bottlenecks

asynchronous:
  pros:
    - Decoupling
    - Better resilience
    - Horizontal scaling
  cons:
    - Eventual consistency
    - Harder debugging
    - Message handling complexity
When Async Makes Sense
good_for_async:
  slow_operations:
    - Email/notification sending
    - Report generation
    - File processing
  spike_absorption:
    - Order processing
    - User signups
    - Event ingestion
  decoupling:
    - Independent team releases
    - Different scaling needs
    - Technology heterogeneity
  fan_out:
    - Multiple consumers
    - Different processing needs
    - Event broadcasting
Message Queue Patterns
Point-to-Point
point_to_point:
  pattern: One producer, one consumer
  use_case: Task distribution
  guarantee: Each message delivered to a single consumer
  example:
    - Job queue
    - Background processing
    - Work distribution
# RabbitMQ work queue
import pika

# Producer
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)

channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body='task payload',
    properties=pika.BasicProperties(
        delivery_mode=2,  # Persistent: survives broker restart
    )
)

# Consumer
def callback(ch, method, properties, body):
    process_task(body)  # Application-specific work
    ch.basic_ack(delivery_tag=method.delivery_tag)  # Ack only after success

channel.basic_qos(prefetch_count=1)  # At most one unacked message per worker
channel.basic_consume(queue='task_queue', on_message_callback=callback)
channel.start_consuming()
Publish/Subscribe
pub_sub:
  pattern: One producer, multiple consumers
  use_case: Event broadcasting
  guarantee: All subscribers receive the message
  example:
    - User signup triggers email, analytics, onboarding
    - Order placed notifies inventory, shipping, notifications
# RabbitMQ fanout exchange
# Publisher
channel.exchange_declare(exchange='user_events', exchange_type='fanout')
channel.basic_publish(
    exchange='user_events',
    routing_key='',  # Fanout exchanges ignore the routing key
    body='{"event": "user_signup", "user_id": "123"}'
)

# Subscriber 1 (email)
channel.exchange_declare(exchange='user_events', exchange_type='fanout')
result = channel.queue_declare(queue='', exclusive=True)  # Broker-named queue, dies with the connection
queue_name = result.method.queue
channel.queue_bind(exchange='user_events', queue=queue_name)
channel.basic_consume(queue=queue_name,
                      on_message_callback=handle_user_event)  # Application callback
channel.start_consuming()

# Subscriber 2 (analytics) - separate process
# Same binding pattern, different queue
Request/Reply
request_reply:
  pattern: Async request with correlated response
  use_case: RPC over messaging
  guarantee: Caller receives a correlated response (or times out)
  implementation:
    - Correlation ID matches responses to requests
    - Reply-to queue for the response
    - Timeout handling
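A minimal client-side sketch with pika shows the moving parts. The rpc_queue name, the RpcClient wrapper, and the polling timeout loop are illustrative assumptions, not a prescribed API:

# Request/reply over RabbitMQ: correlation ID + reply-to queue (sketch)
import time
import uuid
import pika

class RpcClient:
    def __init__(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters('localhost'))
        self.channel = self.connection.channel()
        # Exclusive, broker-named queue where replies arrive
        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue
        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True
        )
        self.response = None
        self.corr_id = None

    def on_response(self, ch, method, properties, body):
        # Only accept the reply that matches our request
        if properties.correlation_id == self.corr_id:
            self.response = body

    def call(self, payload, timeout=5.0):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',  # Assumed server-side queue name
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id,
            ),
            body=payload
        )
        # Drive I/O until the reply arrives or the deadline passes
        deadline = time.monotonic() + timeout
        while self.response is None and time.monotonic() < deadline:
            self.connection.process_data_events(time_limit=0.1)
        if self.response is None:
            raise TimeoutError('No reply within timeout')
        return self.response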
Event Streaming with Kafka
When to Use Kafka
kafka_use_cases:
  high_throughput:
    - Event ingestion
    - Metrics collection
    - Log aggregation
  event_replay:
    - Event sourcing
    - Data recovery
    - New consumer catch-up
  ordering:
    - Per-partition ordering
    - Transaction sequencing
Kafka Patterns
# Kafka producer
from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['kafka:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    acks='all',  # Wait for all in-sync replicas
    retries=3
)

# Partition by key for ordering; 'order' is the dict payload for this example
producer.send(
    'orders',
    key=order['customer_id'].encode(),  # Same customer = same partition
    value=order
)

# Kafka consumer
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'orders',
    bootstrap_servers=['kafka:9092'],
    group_id='order-processor',
    auto_offset_reset='earliest',
    enable_auto_commit=False,  # Manual commit for reliability
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

for message in consumer:
    try:
        process_order(message.value)  # Application-specific work
        consumer.commit()  # Commit the offset only after success
    except Exception as e:
        handle_error(message, e)
Consumer Groups
consumer_groups:
  purpose: Parallel consumption with coordination
  behavior:
    - Partitions distributed among consumers
    - Each partition owned by exactly one consumer in the group
    - Rebalancing on consumer join/leave
  scaling:
    - Add consumers up to the partition count
    - More partitions = more parallelism
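Scaling out is just starting another process with the same group_id; Kafka rebalances the partitions across the group. A minimal sketch, reusing the names from the consumer example above:

# Second consumer instance: same topic, same group_id.
# Joining the group triggers a rebalance, and the topic's
# partitions are split between the two instances.
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'orders',
    bootstrap_servers=['kafka:9092'],
    group_id='order-processor'  # Same group as the first instance
)
consumer.poll(timeout_ms=1000)  # First poll joins the group
print(consumer.assignment())    # Partitions this instance now owns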
Message Reliability
Delivery Guarantees
delivery_guarantees:
  at_most_once:
    description: Message may be lost
    implementation: Fire and forget
    use_case: Metrics where loss is acceptable
  at_least_once:
    description: Message may be duplicated
    implementation: Ack after processing
    use_case: Most business operations
  exactly_once:
    description: No loss, no duplicates
    implementation: Idempotent consumers + transactions
    use_case: Financial transactions
    complexity: High
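In practice, the difference between at-most-once and at-least-once often comes down to where the ack sits relative to the work. A pika sketch (process is a placeholder handler):

# At-most-once: ack first, then process.
# A crash after the ack loses the message.
def at_most_once(ch, method, properties, body):
    ch.basic_ack(delivery_tag=method.delivery_tag)
    process(body)

# At-least-once: process first, then ack.
# A crash before the ack redelivers the message, so
# process() may run twice and must be idempotent.
def at_least_once(ch, method, properties, body):
    process(body)
    ch.basic_ack(delivery_tag=method.delivery_tag)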
Idempotent Processing
def process_order_idempotently(message):
    order_id = message['order_id']

    # Check if already processed
    if order_repository.exists(order_id):
        logger.info(f"Order {order_id} already processed, skipping")
        return

    # Process inside a transaction; a unique constraint on order_id
    # catches the race where two consumers pass the check at once
    try:
        with db.transaction():
            order = create_order(message)
            order_repository.save(order)
    except DuplicateKeyError:
        # Another consumer won the race
        logger.info(f"Order {order_id} processed by another consumer")
Dead Letter Queues
def process_with_dlq(message, max_retries=3):
    retry_count = message.headers.get('x-retry-count', 0)
    try:
        process_message(message)
    except RetryableError as e:
        if retry_count < max_retries:
            # Requeue with incremented retry count
            requeue_with_delay(message, retry_count + 1)
        else:
            # Retries exhausted: send to DLQ for manual investigation
            send_to_dlq(message, str(e))
    except NonRetryableError as e:
        # No point retrying: straight to the DLQ
        send_to_dlq(message, str(e))
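The requeue_with_delay and send_to_dlq helpers are left abstract above. One way to sketch them on RabbitMQ is a retry queue with per-message TTL that dead-letters back onto the main queue; the queue names and delay value here are assumptions for illustration:

# Sketch of the DLQ helpers on RabbitMQ (queue names are assumptions)
import pika

# Retry queue: expired messages dead-letter back onto the main queue.
# NB: per-message TTL only expires messages at the queue head.
channel.queue_declare(queue='task_queue.retry', durable=True, arguments={
    'x-dead-letter-exchange': '',
    'x-dead-letter-routing-key': 'task_queue',
})
channel.queue_declare(queue='task_queue.dlq', durable=True)

def requeue_with_delay(message, retry_count, delay_ms=5000):
    channel.basic_publish(
        exchange='',
        routing_key='task_queue.retry',
        body=message.body,
        properties=pika.BasicProperties(
            headers={**(message.headers or {}), 'x-retry-count': retry_count},
            expiration=str(delay_ms),  # Per-message TTL, in milliseconds
            delivery_mode=2,
        )
    )

def send_to_dlq(message, reason):
    channel.basic_publish(
        exchange='',
        routing_key='task_queue.dlq',
        body=message.body,
        properties=pika.BasicProperties(
            headers={**(message.headers or {}), 'x-failure-reason': reason},
            delivery_mode=2,
        )
    )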
Observability
Message Tracing
tracing_strategy:
  correlation_id:
    - Include in all messages
    - Propagate through the service chain
    - Use for log correlation
  distributed_tracing:
    - Span per message production
    - Span per message consumption
    - Link spans via trace context
# Propagate trace context in messages
from opentelemetry.propagate import inject

def publish_message(topic, message):
    headers = {}
    inject(headers)  # Inject W3C trace context into the carrier dict
    producer.send(
        topic,
        value=message,
        headers=[(k, v.encode()) for k, v in headers.items()]  # Kafka headers are bytes
    )
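On the consumer side, the matching step is extracting the context from the message headers before starting the processing span. A sketch (the tracer name and handler are illustrative):

# Continue the trace on the consumer side
from opentelemetry import trace
from opentelemetry.propagate import extract

tracer = trace.get_tracer('order-consumer')  # Illustrative tracer name

def handle_message(message):
    # Kafka headers arrive as (key, bytes) pairs; rebuild the carrier dict
    carrier = {k: v.decode() for k, v in (message.headers or [])}
    ctx = extract(carrier)
    # Child span linked to the producer's span via the extracted context
    with tracer.start_as_current_span('process-message', context=ctx):
        process_order(message.value)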
Metrics
queue_metrics:
  producer:
    - Messages published
    - Publish latency
    - Publish errors
  queue:
    - Queue depth
    - Age of oldest message
    - Consumer lag (Kafka)
  consumer:
    - Messages consumed
    - Processing time
    - Error rate
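Consumer lag is worth measuring directly: end offset minus current position, per partition. A kafka-python sketch, reusing the topic and group names from the earlier examples:

# Measure consumer lag: end offset minus committed position, per partition
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(
    bootstrap_servers=['kafka:9092'],
    group_id='order-processor'  # position() reads this group's committed offsets
)
partitions = [
    TopicPartition('orders', p)
    for p in consumer.partitions_for_topic('orders')
]
consumer.assign(partitions)

end_offsets = consumer.end_offsets(partitions)  # Latest offset per partition
for tp in partitions:
    lag = end_offsets[tp] - consumer.position(tp)
    print(f"{tp.topic}[{tp.partition}] lag={lag}")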
Anti-Patterns
async_anti_patterns:
  unbounded_queues:
    problem: Queue grows without limit when consumers fall behind
    solution: Backpressure, alerting on depth (see the sketch below)
  missing_idempotency:
    problem: Duplicate processing on retry
    solution: Always design for at-least-once
  sync_over_async:
    problem: Blocking wait for an async response
    solution: Go truly async, or use a plain synchronous call
  no_dlq:
    problem: Poison messages block the queue
    solution: DLQ for failed messages
  ignoring_ordering:
    problem: Race conditions on related messages
    solution: Partition key or sequence handling
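For the unbounded-queue case, RabbitMQ can enforce the bound at declaration time; with x-overflow set to reject-publish, the broker pushes back on producers instead of growing without limit (the cap below is an arbitrary example):

# Bounded queue: broker rejects publishes once the limit is hit,
# turning consumer slowness into producer backpressure
channel.queue_declare(
    queue='task_queue',
    durable=True,
    arguments={
        'x-max-length': 100_000,          # Example cap, tune per workload
        'x-overflow': 'reject-publish',   # Reject new messages instead of dropping old ones
    }
)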
Key Takeaways
- Go async for slow operations, spike absorption, and decoupling
- Point-to-point for task distribution; pub/sub for event broadcasting
- Kafka for high throughput, replay, and ordering requirements
- Design for at-least-once delivery with idempotent consumers
- Use dead letter queues for poison messages
- Consumer groups enable horizontal scaling
- Include correlation IDs for tracing
- Monitor queue depth, consumer lag, and processing errors
- Partition by key when ordering matters
- Async adds complexity—use when benefits justify it
Async is powerful but not free. The complexity tax is real.