Message queues enable asynchronous communication between services, providing reliability, load leveling, and loose coupling. But they also introduce operational complexity. Here are the patterns that make message queues work reliably.
When to Use Queues
Good Use Cases
Async processing:
User submits order → Queue → Process payment (async)
                           → Send confirmation email
                           → Update inventory
Load leveling:
Traffic spike → Queue buffers requests → Workers process at capacity
Decoupling:
Service A publishes events
Services B, C, D consume independently
A doesn't know or care about consumers
Poor Use Cases
Synchronous requirements:
User expects immediate response
Don't queue if you need sync response
Simple request-response:
HTTP/gRPC is simpler for point-to-point
Don't add queue complexity unnecessarily
Core Patterns
Work Queue
Distribute work across multiple workers:
Producer → [Queue] → Consumer 1
                   → Consumer 2
                   → Consumer 3
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)  # Durable queue survives restarts

# Producer
channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body=task_data,
    properties=pika.BasicProperties(
        delivery_mode=2,  # Persistent message
    ))

# Consumer
def callback(ch, method, properties, body):
    process_task(body)
    ch.basic_ack(delivery_tag=method.delivery_tag)  # Ack only after successful processing

channel.basic_qos(prefetch_count=1)  # Hand each worker one message at a time
channel.basic_consume(queue='task_queue', on_message_callback=callback)
channel.start_consuming()
Pub/Sub
Broadcast to multiple subscribers:
Producer → [Exchange] → Queue A → Consumer A
                      → Queue B → Consumer B
                      → Queue C → Consumer C
# Publisher
channel.exchange_declare(exchange='events', exchange_type='fanout')
channel.basic_publish(exchange='events', routing_key='', body=event_data)
# Subscriber
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='events', queue=queue_name)
channel.basic_consume(queue=queue_name, on_message_callback=callback)
Topic Routing
Route messages by routing-key pattern:
Producer → [Exchange] → Queue: orders.created → Order Service
                      → Queue: orders.*       → Analytics Service
                      → Queue: #.payment      → Payment Service
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
channel.basic_publish(
    exchange='topic_logs',
    routing_key='orders.created',
    body=message)
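On the consuming side, each service binds its own queue with a pattern. A minimal sketch (queue name is illustrative) using RabbitMQ's wildcard rules, where '*' matches exactly one word and '#' matches zero or more:
# Subscriber
channel.queue_declare(queue='analytics')
channel.queue_bind(exchange='topic_logs', queue='analytics',
                   routing_key='orders.*')  # '*' = one word, '#' = zero or more
channel.basic_consume(queue='analytics', on_message_callback=callback)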
Request-Reply
Synchronous request/response semantics over an asynchronous transport:
import uuid

# Client
correlation_id = str(uuid.uuid4())
response_queue = channel.queue_declare(queue='', exclusive=True).method.queue
channel.basic_publish(
    exchange='',
    routing_key='rpc_queue',
    properties=pika.BasicProperties(
        reply_to=response_queue,
        correlation_id=correlation_id,
    ),
    body=request)

# Wait for the response that matches our correlation_id
for method, props, body in channel.consume(response_queue, auto_ack=True):
    if props.correlation_id == correlation_id:
        response = body
        break
channel.cancel()  # Stop consuming from the reply queue
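The server side mirrors this: it reads reply_to and correlation_id from the request and echoes the correlation ID back alongside the result. A minimal sketch, assuming a hypothetical handle_request function:
# Server
def on_request(ch, method, props, body):
    result = handle_request(body)  # Hypothetical business logic
    ch.basic_publish(
        exchange='',
        routing_key=props.reply_to,  # Reply on the client's private queue
        properties=pika.BasicProperties(correlation_id=props.correlation_id),
        body=result)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)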
Reliability Patterns
Message Persistence
Survive broker restart:
# RabbitMQ: both the queue and the message must be durable
channel.queue_declare(queue='task_queue', durable=True)
channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body=task_data,
    properties=pika.BasicProperties(
        delivery_mode=2,  # Persistent message
    ))

# Kafka: persistent by design; messages are written to the on-disk log
producer.send('topic', value=message)
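How durable a Kafka write really is still depends on producer settings. A sketch with kafka-python (broker address is an assumption); acks='all' waits for all in-sync replicas before confirming:
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',  # Assumed broker address
    acks='all')  # Confirm only after all in-sync replicas have the message
producer.send('topic', value=message)
producer.flush()  # Block until outstanding sends are acknowledged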
Publisher Confirms
Know when the broker has actually accepted a message:
channel.confirm_delivery()  # Broker now confirms (acks) each publish on this channel
try:
    channel.basic_publish(exchange='', routing_key='task_queue',
                          body=task_data, mandatory=True)
except pika.exceptions.UnroutableError:
    handle_undeliverable(task_data)  # Hypothetical handler: log, retry, or dead-letter
Consumer Acknowledgments
Don’t lose messages:
def callback(ch, method, properties, body):
    try:
        process(body)
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception:
        # Requeue for retry; beware poison messages that fail forever (see sketch below)
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
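requeue=True alone can spin forever on a poison message. One common remedy is to cap retries and then reject without requeue so the message dead-letters. A sketch, using a custom 'x-retry-count' header (an application convention, not a pika built-in):
MAX_RETRIES = 3  # Assumed policy

def callback(ch, method, properties, body):
    retries = (properties.headers or {}).get('x-retry-count', 0)
    try:
        process(body)
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception:
        if retries < MAX_RETRIES:
            # Republish with an incremented count, then drop the original
            ch.basic_publish(
                exchange='',
                routing_key=method.routing_key,  # Back onto the same queue
                body=body,
                properties=pika.BasicProperties(
                    headers={'x-retry-count': retries + 1}))
            ch.basic_ack(delivery_tag=method.delivery_tag)
        else:
            # Give up: reject without requeue so it dead-letters (next section)
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)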
Dead Letter Queues
Handle failed messages:
# Declare the dead-letter exchange and the queue that collects failures
channel.exchange_declare(exchange='dlx', exchange_type='direct')
channel.queue_declare(queue='failed_tasks')
channel.queue_bind(exchange='dlx', queue='failed_tasks', routing_key='failed_tasks')

# Declare the main queue with dead-letter routing
channel.queue_declare(
    queue='tasks',
    arguments={
        'x-dead-letter-exchange': 'dlx',
        'x-dead-letter-routing-key': 'failed_tasks'
    })
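Something still has to drain the DLQ. A sketch of a consumer that inspects RabbitMQ's x-death header (added automatically when a message is dead-lettered) before archiving the message; log_failure is a hypothetical helper:
def on_dead_letter(ch, method, properties, body):
    death_info = (properties.headers or {}).get('x-death', [])  # Why/where it died
    log_failure(body, death_info)  # Hypothetical: record for inspection or replay
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue='failed_tasks', on_message_callback=on_dead_letter)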
Idempotent Consumers
Handle duplicate messages:
def process_message(message):
    message_id = message.properties.message_id
    if already_processed(message_id):
        return  # Duplicate delivery; safe to skip
    do_processing(message.body)
    mark_processed(message_id)
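already_processed and mark_processed need shared, fast storage. A sketch backed by Redis (the key schema and 24-hour TTL are assumptions); note that check-then-mark is not atomic, so strict once-only effects need SET NX or a database transaction around the processing step:
import redis

r = redis.Redis()  # Assumed local instance

def already_processed(message_id):
    return r.exists(f'processed:{message_id}')

def mark_processed(message_id):
    r.set(f'processed:{message_id}', 1, ex=86400)  # Keep dedup keys for 24h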
Ordering Guarantees
Per-Partition Ordering (Kafka)
# Messages with same key go to same partition
producer.send('orders', key=user_id.encode(), value=order)
# All orders for user_id processed in order
Single Queue Ordering (RabbitMQ)
# Single consumer for strict ordering
channel.basic_qos(prefetch_count=1)
# Process one at a time
When Ordering Doesn’t Matter
Many cases don’t need ordering:
- Independent events
- Events that carry timestamps, so consumers can reorder or discard stale ones
- Last-write-wins semantics (sketched below)
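For last-write-wins, a consumer can simply drop anything older than what it has stored. A minimal sketch, assuming events carry a timestamp and that current_version and store are hypothetical storage helpers:
def apply_event(event):
    stored = current_version(event.key)
    if stored is None or event.timestamp > stored.timestamp:
        store(event.key, event)  # Newer event wins
    # Older or duplicate events are ignored, so delivery order doesn't matter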
Scaling Patterns
Consumer Groups (Kafka)
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'orders',
    group_id='order-processors',  # Consumers sharing this id split the partitions
    auto_offset_reset='earliest'
)
# Multiple consumers in a group share partitions; max useful consumers = partition count
Competing Consumers (RabbitMQ)
Queue → Consumer 1 (ack before next)
      → Consumer 2
      → Consumer 3
Backpressure
Handle overflow:
# Limit in-flight messages per consumer
channel.basic_qos(prefetch_count=100)

# Or shed load at the producer when the queue is too deep
if queue_depth > MAX_DEPTH:
    return 'server_busy', 503
Technology Comparison
RabbitMQ
Strengths:
- Flexible routing
- Multiple protocols (AMQP, MQTT, STOMP)
- Per-message ACK
Best for:
- Traditional message queuing
- Complex routing requirements
- RPC patterns
Kafka
Strengths:
- High throughput
- Persistent log
- Replay capability
- Stream processing
Best for:
- Event sourcing
- High volume ingestion
- Stream processing
SQS
Strengths:
- Fully managed (no brokers to operate)
- Simple
- Scales automatically
Best for:
- AWS-native applications
- Simple queuing needs
- Minimal operational overhead
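For completeness, the basic SQS loop with boto3 (the queue URL is a placeholder): send, receive with long polling, and delete to acknowledge:
import boto3

sqs = boto3.client('sqs')
queue_url = 'https://sqs.us-east-1.amazonaws.com/123456789012/tasks'  # Placeholder

sqs.send_message(QueueUrl=queue_url, MessageBody=task_json)

resp = sqs.receive_message(QueueUrl=queue_url,
                           MaxNumberOfMessages=10,
                           WaitTimeSeconds=20)  # Long polling
for msg in resp.get('Messages', []):
    process(msg['Body'])
    # Deleting is SQS's acknowledgment; undeleted messages become visible again
    sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=msg['ReceiptHandle'])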
Monitoring
Key Metrics
queue_depth:
  warning:  > 10000
  critical: > 100000
consumer_lag:
  warning:  > 1000 messages
  critical: > 10000 messages
processing_rate:
  track: messages per second
  alert: sudden drops
error_rate:
  warning:  > 1%
  critical: > 5%
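Consumer lag can be computed directly from Kafka offsets. A sketch with kafka-python for a single partition (broker address and partition choice are assumptions; production setups usually rely on dedicated lag exporters):
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                         group_id='order-processors')
tp = TopicPartition('orders', 0)
consumer.assign([tp])
latest = consumer.end_offsets([tp])[tp]  # Head of the partition
lag = latest - consumer.position(tp)     # Messages not yet consumed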
Dead Letter Monitoring
# Alert on DLQ growth
if dlq_depth > 0:
    alert("Messages in dead letter queue")
Key Takeaways
- Use queues for async processing, load leveling, and decoupling
- Work queues distribute load; pub/sub broadcasts to many consumers
- Enable persistence and acknowledgments for reliability
- Use dead letter queues to handle failed messages
- Make consumers idempotent; duplicates happen
- Order is expensive; only require it when needed
- Scale with consumer groups or competing consumers
- Monitor queue depth, consumer lag, and error rates
- Choose RabbitMQ for flexibility, Kafka for throughput, SQS for simplicity
Message queues enable reliable async systems when used with the right patterns.