Async Job Processing Patterns

December 17, 2018

Not everything should happen in the request-response cycle. Sending emails, processing payments, generating reports, and syncing data are better handled asynchronously. But async processing introduces complexity: jobs fail, queues back up, and ordering matters.

Here’s how to build reliable async job processing systems.

When to Use Async

Move to Background When

The work is slow: generating reports, encoding media, syncing data to external systems.

The work is unreliable: calls to third-party APIs that can time out or rate-limit you.

The work can be delayed: the user doesn't need the result in the response, like a confirmation email.

The work should be retried: operations that must eventually succeed, like capturing a payment or delivering a webhook.

Keep Synchronous When

User expects immediate response: form validation, authentication, reading the data for the current page.

Work is fast and reliable: a single indexed database write that rarely fails.

Architecture Patterns

Simple Queue Processing

Producer → Queue → Worker

# Producer
def create_order(order):
    db.save(order)
    queue.enqueue('send_confirmation_email', order_id=order.id)
    return order

# Worker
@worker.task('send_confirmation_email')
def send_confirmation_email(order_id):
    order = db.get_order(order_id)
    email.send(order.customer_email, template='confirmation', order=order)

Fan-Out Pattern

One event triggers multiple jobs:

# Single event, multiple handlers
def order_placed(order_id):
    queue.enqueue('send_confirmation', order_id=order_id)
    queue.enqueue('notify_warehouse', order_id=order_id)
    queue.enqueue('update_inventory', order_id=order_id)
    queue.enqueue('track_analytics', order_id=order_id)

Priority Queues

Different priorities for different work:

# High priority - user-facing
queue.enqueue('send_password_reset', email=email, queue='high')

# Default priority
queue.enqueue('sync_to_crm', user_id=user_id, queue='default')

# Low priority - batch work
queue.enqueue('generate_report', report_id=report_id, queue='low')

Configure workers per queue:

# 4 workers for high priority
# 2 workers for default
# 1 worker for low priority

Dead Letter Queues

Capture failed jobs for investigation:

Main Queue → Worker → Success
                ↓
              Failure (after retries)
                ↓
         Dead Letter Queue → Manual Review

@worker.task('process_payment', max_retries=3)
def process_payment(order_id):
    try:
        # Process payment
        pass
    except PermanentError as e:
        # Permanent errors won't succeed on retry: record them in the DLQ
        # for manual review and re-raise to mark the job failed
        dlq.send('process_payment', order_id=order_id, error=str(e))
        raise
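
Once the root cause is fixed, dead-lettered jobs can be replayed. A minimal sketch, assuming a hypothetical dlq client with fetch_all() and delete() alongside the same queue interface used above:

def replay_dead_letters(task_name):
    # Re-enqueue jobs that landed in the DLQ after the underlying bug or
    # outage has been fixed (dlq.fetch_all/delete are hypothetical APIs)
    for entry in dlq.fetch_all(task_name):
        logger.info("Replaying dead-lettered job", task=task_name, args=entry.args)
        queue.enqueue(task_name, **entry.args)
        dlq.delete(entry)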

Reliability Patterns

Idempotency

Jobs may run multiple times (retries, duplicates):

@worker.task('send_email')
def send_email(job_id, recipient, template):
    # Check if already processed
    if db.job_processed(job_id):
        return  # Idempotent - already done

    # Process
    email.send(recipient, template)

    # Mark as processed
    db.mark_processed(job_id)

Or use idempotency keys:

def charge_payment(idempotency_key, amount):
    # External service handles idempotency
    return stripe.charge(
        amount=amount,
        idempotency_key=idempotency_key
    )

Exactly-Once Processing

True exactly-once is hard. Options:

At-least-once + idempotency: the most common approach

Transactional outbox: database write and queue publish made atomic

Stream processing: Kafka with exactly-once semantics

# Transactional outbox pattern
def create_order(order):
    with db.transaction():
        db.save(order)
        db.save_outbox_message(
            topic='orders',
            message={'type': 'order_created', 'order_id': order.id}
        )

# Separate process reads outbox, publishes to queue, marks as sent
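
A sketch of that relay process, assuming hypothetical db.get_unsent_outbox_messages() and db.mark_outbox_sent() helpers. Note it still delivers at-least-once (a crash between enqueue and mark produces a duplicate), so consumers must be idempotent:

def relay_outbox(batch_size=100):
    # Poll the outbox table and publish anything not yet sent
    for message in db.get_unsent_outbox_messages(limit=batch_size):
        queue.enqueue(message.topic, **message.payload)
        db.mark_outbox_sent(message.id)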

Retry with Backoff

@worker.task(
    max_retries=5,
    retry_backoff=True,
    retry_backoff_max=3600,  # Max 1 hour between retries
    retry_on=(ConnectionError, Timeout, RateLimitError)
)
def sync_external_service(item_id):
    external_api.sync(item_id)

Exponential backoff prevents overwhelming recovering services.
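
If your worker library doesn't compute the delay for you, a sketch of the usual formula (exponential growth capped at a maximum, with jitter so retries don't arrive in lockstep):

import random

def backoff_delay(attempt, base=2, max_delay=3600):
    # attempt 1 -> up to 2s, attempt 2 -> up to 4s, attempt 3 -> up to 8s, ...
    delay = min(base ** attempt, max_delay)
    # Full jitter: pick a random delay up to the cap for this attempt
    return random.uniform(0, delay)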

Timeouts

Jobs should have time limits:

@worker.task(soft_time_limit=300, time_limit=360)
def process_large_file(file_id):
    # 5 minute soft limit (can handle gracefully)
    # 6 minute hard limit (process killed)
    pass
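
The soft_time_limit/time_limit pair above is Celery-style (an assumption; the post doesn't name a framework). In Celery, hitting the soft limit raises an exception inside the task so you can clean up before the hard kill:

from celery.exceptions import SoftTimeLimitExceeded

@worker.task(soft_time_limit=300, time_limit=360)
def process_large_file(file_id):
    try:
        run_processing(file_id)  # hypothetical helper doing the real work
    except SoftTimeLimitExceeded:
        # Soft limit hit: clean up partial output before the hard limit kills the process
        cleanup_partial_output(file_id)  # hypothetical helper
        raise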

Health Checks

Monitor job processing health:

# Queue depth
queue_size = queue.size()
if queue_size > 10000:
    alert("Queue backing up")

# Processing rate
jobs_per_minute = metrics.get('jobs_processed_per_minute')
if jobs_per_minute < expected_rate * 0.5:
    alert("Processing rate dropped")

# Error rate
error_rate = metrics.get('job_error_rate')
if error_rate > 0.05:
    alert("High job error rate")

Job Design

Small, Focused Jobs

# Bad - one giant job
@worker.task
def process_order(order_id):
    validate_order(order_id)
    charge_payment(order_id)
    update_inventory(order_id)
    send_confirmation(order_id)
    notify_warehouse(order_id)
    update_analytics(order_id)

# Good - separate jobs
@worker.task
def charge_payment(order_id): ...

@worker.task
def update_inventory(order_id): ...

@worker.task
def send_confirmation(order_id): ...

Job Arguments

Pass IDs, not objects:

# Bad - large payload, stale data
queue.enqueue('process_order', order=order_object)

# Good - fetch fresh data
queue.enqueue('process_order', order_id=order.id)

@worker.task
def process_order(order_id):
    order = db.get_order(order_id)  # Fresh data
    # Process...

Failure Information

Include context for debugging:

import traceback

@worker.task
def process_item(item_id, context=None):
    try:
        item = db.get_item(item_id)
        # Process...
    except Exception as e:
        logger.error(
            "Job failed",
            item_id=item_id,
            context=context,
            error=str(e),
            traceback=traceback.format_exc()
        )
        raise

Queue Technologies

Redis-Based

Sidekiq (Ruby), RQ (Python), Bull (Node.js):

Pros: simple to run if you already operate Redis, low latency, mature libraries with retries, scheduling, and dashboards built in.

Cons: Redis is an in-memory store, so durability is limited and a crash can lose jobs; very deep queues eat memory.
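
To make the comparison concrete, the priority-queue example above maps almost directly onto RQ (shown here as one possibility, not a recommendation):

from redis import Redis
from rq import Queue

# One Queue object per priority level, all backed by the same Redis instance
high = Queue('high', connection=Redis())
# send_password_reset is the task function from the earlier example
high.enqueue(send_password_reset, 'user@example.com')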

Message Brokers

RabbitMQ:

Pros: durable queues with per-message acknowledgements, flexible routing via exchanges, mature operational tooling.

Cons: one more piece of infrastructure to run and tune; the routing topology adds complexity you may not need.

Kafka:

Pros: very high throughput, a durable replayable log, strong ordering within a partition, exactly-once semantics with Kafka Streams.

Cons: heavyweight to operate; the partition and consumer-group model is overkill for simple background jobs.

Managed Services

AWS SQS, Google Cloud Tasks, Azure Queue Storage:

Pros: nothing to operate, effectively unlimited scale, pay per use.

Cons: vendor lock-in, less control over latency and delivery semantics (standard SQS queues, for example, are at-least-once with no strict ordering).

Monitoring and Observability

Key Metrics

# Queue metrics
jobs_enqueued_total
jobs_processed_total
jobs_failed_total
queue_depth
processing_latency_seconds

# Worker metrics
workers_active
workers_idle
memory_usage
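
A sketch of emitting a few of these from worker code with prometheus_client (one option among many; the metric names follow the list above):

import time
from prometheus_client import Counter, Histogram

JOBS_PROCESSED = Counter('jobs_processed_total', 'Jobs completed successfully', ['task'])
JOBS_FAILED = Counter('jobs_failed_total', 'Jobs that raised an exception', ['task'])
LATENCY = Histogram('processing_latency_seconds', 'Seconds from enqueue to completion', ['task'])

def run_job(task_name, func, enqueued_at, *args, **kwargs):
    try:
        func(*args, **kwargs)
        JOBS_PROCESSED.labels(task=task_name).inc()
    except Exception:
        JOBS_FAILED.labels(task=task_name).inc()
        raise
    finally:
        LATENCY.labels(task=task_name).observe(time.time() - enqueued_at)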

Dashboards

Track: queue depth, processing latency, throughput, and error rate per queue, plus active workers and memory usage.

Alerting

Alert on: queues backing up past a depth threshold, processing rates dropping well below the expected rate, and error rates above a few percent.

Key Takeaways

Async processing is essential for scalable systems, but it trades the simplicity of the request-response cycle for new failure modes. Design for reliability from the start: keep jobs small and idempotent, pass IDs instead of objects, retry with backoff, route permanent failures to a dead letter queue, and watch queue depth, throughput, and error rates in production.