Building Data Pipelines That Don't Break

April 24, 2017

Data pipelines have a reputation for brittleness. They break when source schemas change. They fail silently, corrupting downstream data. They’re difficult to debug and tedious to maintain.

This reputation is earned, but not inevitable. With careful design, data pipelines can be as reliable as any other production system.

Why Pipelines Break

Understanding failure modes helps prevent them:

Schema Evolution

Upstream systems change schemas without warning. New fields appear. Fields are renamed. Types change. Pipelines expecting specific schemas fail.

Data Quality Issues

Bad data enters the pipeline: null values in required fields, malformed timestamps, duplicate records, values outside expected ranges.

Pipelines assuming clean data corrupt downstream systems.

Volume Spikes

Traffic increases beyond expected levels. Pipelines that worked at 1,000 records/minute fail at 10,000.

Dependency Failures

External systems become unavailable: databases go down, APIs time out or rate-limit, networks partition.

Pipelines without failure handling stop completely.

Silent Failures

The worst failure: pipelines that appear to work but produce wrong results. No errors, but data is incorrect or incomplete.

Design Principles

Schema Flexibility

Don’t assume rigid schemas:

Schema-on-read: Store raw data as received. Apply schema when reading. Tolerates upstream changes.

Explicit schema versioning: Track schema versions. Handle multiple versions in pipeline logic.

Graceful degradation: Log unknown fields rather than failing on them; fill missing optional fields with defaults.

import logging

log = logging.getLogger(__name__)

def process_record(record):
    # Required fields - fail loudly if missing
    user_id = record['user_id']

    # Optional fields - default if missing
    email = record.get('email', None)

    # Unknown fields - log and continue rather than failing
    known_fields = {'user_id', 'email', 'name', ...}
    unknown = set(record.keys()) - known_fields
    if unknown:
        log.warning(f"Unknown fields: {unknown}")

    return process(user_id, email, ...)

Idempotency

Operations should be safe to retry:

Same result on re-run: Processing the same data twice produces the same outcome.

Deduplication: Detect and handle duplicate records.

Upsert over insert: Update if exists, insert if not. Handles replays gracefully.

def write_record(record):
    # Upsert - safe to retry; a replay updates the row instead of duplicating it
    db.execute("""
        INSERT INTO users (id, data)
        VALUES (%s, %s)
        ON CONFLICT (id) DO UPDATE SET data = EXCLUDED.data
    """, (record.id, record.data))

Data Validation

Validate data at pipeline boundaries:

Input validation: Check incoming data against expectations. Reject or quarantine invalid records.

Output validation: Verify pipeline output meets quality standards before committing.

Assertions: Explicit checks for invariants.

class ValidationError(ValueError):
    """Raised when a record fails input validation."""

def validate_record(record):
    if not record.get('user_id'):
        raise ValidationError("Missing user_id")

    age = record.get('age')
    if age is not None and not 0 <= age <= 150:
        raise ValidationError(f"Invalid age: {age}")

    return record

Dead Letter Queues

Don’t lose failed records:

Input → Processing → Success → Output
              ↓
            Failure
              ↓
         Dead Letter Queue

Failed records go to a dead letter queue for investigation and reprocessing.
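A minimal sketch of the pattern in Python. The `dlq` object (anything with a send() method: a queue producer, an object-store writer, a database table) and the `process` callable are illustrative stand-ins, not a specific library API:

import json
import logging
import traceback

log = logging.getLogger(__name__)

def process_with_dlq(records, process, dlq):
    """Process records, routing failures to a dead letter queue."""
    for record in records:
        try:
            process(record)
        except Exception as exc:
            # Keep the original record plus enough context to debug
            # and replay it later.
            dlq.send(json.dumps({
                "record": record,
                "error": str(exc),
                "traceback": traceback.format_exc(),
            }))
            log.warning("Record routed to DLQ: %s", exc)

Storing the error message and traceback alongside the original record makes investigation and replay far easier than a bare failure count.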

Checkpointing

For long-running pipelines, save progress:

Checkpoint state: Periodically save current position.

Restart from checkpoint: On failure, resume from last checkpoint rather than restarting.

Consistent checkpoints: Ensure checkpoint represents consistent state.
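A rough sketch of file-based checkpointing. The `source.read_batches(after=...)` interface, which yields (batch, new_position) pairs, is hypothetical; production pipelines usually store checkpoints in a database or lean on the framework's own offset tracking:

import json
import os

CHECKPOINT_FILE = "checkpoint.json"  # illustrative; often a database row or object-store key

def load_checkpoint():
    """Return the last committed position, or None on a fresh start."""
    if os.path.exists(CHECKPOINT_FILE):
        with open(CHECKPOINT_FILE) as f:
            return json.load(f)["position"]
    return None

def save_checkpoint(position):
    """Write progress atomically so a crash never leaves a half-written checkpoint."""
    tmp = CHECKPOINT_FILE + ".tmp"
    with open(tmp, "w") as f:
        json.dump({"position": position}, f)
    os.replace(tmp, CHECKPOINT_FILE)

def run(source, process_batch):
    position = load_checkpoint()
    for batch, new_position in source.read_batches(after=position):
        process_batch(batch)
        save_checkpoint(new_position)  # commit progress only after the batch succeeds

Saving the checkpoint only after a batch fully succeeds, combined with idempotent writes, means a crash at worst reprocesses one batch.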

Monitoring and Alerting

Key Metrics

Throughput: Records processed per time period. Sudden drops indicate problems.

Latency: Time from source to destination. Increasing latency indicates bottlenecks.

Error rate: Failed records / total records. Any sustained non-zero error rate needs investigation.

Lag: For streaming, how far behind real-time. Growing lag indicates processing can’t keep up.

Data freshness: Age of newest data in destination. Stale data indicates pipeline issues.
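As an illustration, a minimal in-process tracker for a few of these metrics might look like the following; in practice you would export the counters to a metrics backend (Prometheus, StatsD, CloudWatch) rather than keep them in memory:

import time

class PipelineMetrics:
    """Minimal in-process counters for error rate and lag."""

    def __init__(self):
        self.processed = 0
        self.failed = 0
        self.latest_event_time = None  # epoch seconds of the newest event seen

    def record_success(self, event_time):
        self.processed += 1
        self.latest_event_time = event_time

    def record_failure(self):
        self.failed += 1

    @property
    def error_rate(self):
        total = self.processed + self.failed
        return self.failed / total if total else 0.0

    @property
    def lag_seconds(self):
        """How far behind real time the pipeline is (streaming)."""
        if self.latest_event_time is None:
            return None
        return time.time() - self.latest_event_time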

Alerting

Alert on: pipeline run failures, sustained error rates, growing lag, stale destination data, and throughput dropping well below normal.
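A toy sketch of evaluating those conditions against thresholds; in real deployments these rules normally live in the monitoring system itself (alerting rules, alarms) rather than in pipeline code, and the metric and threshold names here are illustrative:

def evaluate_alerts(metrics, thresholds):
    """Return the alert messages that should fire given current metric values."""
    alerts = []
    if metrics.get("consecutive_failures", 0) > 0:
        alerts.append("Pipeline run failed")
    if metrics.get("error_rate", 0.0) > thresholds["error_rate"]:
        alerts.append("Error rate above threshold")
    if metrics.get("lag_seconds", 0) > thresholds["lag_seconds"]:
        alerts.append("Lag growing beyond threshold")
    if metrics.get("freshness_seconds", 0) > thresholds["freshness_seconds"]:
        alerts.append("Destination data is stale")
    return alerts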

Data Quality Monitoring

Check output data quality:

-- Example quality checks
SELECT
  COUNT(*) as row_count,
  COUNT(*) FILTER (WHERE user_id IS NULL) as null_user_ids,
  COUNT(DISTINCT user_id) as unique_users,
  MIN(created_at) as earliest_record,
  MAX(created_at) as latest_record
FROM output_table
WHERE created_at > NOW() - INTERVAL '1 day'

Testing

Unit Tests

Test transformation logic in isolation: pure functions with known inputs and expected outputs, as in the sketch below.
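For example, with pytest (the module and function names, mypipeline.transforms and normalize_email, are illustrative):

# test_transforms.py -- run with pytest
from mypipeline.transforms import normalize_email

def test_normalize_email_lowercases_and_strips():
    assert normalize_email("  Alice@Example.COM ") == "alice@example.com"

def test_normalize_email_rejects_missing_at_sign():
    assert normalize_email("not-an-email") is None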

Integration Tests

Test against realistic data: run the pipeline on a representative sample of production-like data in a staging environment and check the output.

Data Tests

Run tests on actual output: assert row counts, null rates, uniqueness, and freshness on the tables the pipeline produces, as sketched below.
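A sketch of running such checks from Python, assuming a DB-API connection (psycopg2, for example) and the output_table from the quality query above:

def check_output_quality(conn):
    """Fail loudly if yesterday's output violates basic expectations."""
    cur = conn.cursor()
    cur.execute("""
        SELECT
          COUNT(*) AS row_count,
          COUNT(*) FILTER (WHERE user_id IS NULL) AS null_user_ids
        FROM output_table
        WHERE created_at > NOW() - INTERVAL '1 day'
    """)
    row_count, null_user_ids = cur.fetchone()

    assert row_count > 0, "No rows written in the last day"
    assert null_user_ids == 0, f"{null_user_ids} rows missing user_id"

Run this as the last step of the pipeline so bad output fails the run instead of quietly reaching consumers.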

Pipeline Tests

Test full pipeline behavior: run a small, known input through the entire pipeline and compare against expected output, including failure paths.
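A sketch of an end-to-end test with pytest; run_pipeline and the JSONL input format are stand-ins for your pipeline's real entry point:

# test_pipeline.py -- run with pytest
import json
from mypipeline import run_pipeline

def test_pipeline_end_to_end(tmp_path):
    input_file = tmp_path / "input.jsonl"
    input_file.write_text(json.dumps({"user_id": 1, "email": " A@Example.COM "}) + "\n")

    output = run_pipeline(str(input_file))

    assert output == [{"user_id": 1, "email": "a@example.com"}]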

Operational Practices

Backfill Capability

Pipelines need to reprocess historical data: bugs get fixed, transformation logic changes, new fields are added.

Design for easy backfill: parameterize runs by date range, keep writes idempotent, and make sure backfill runs don't interfere with live processing. A sketch follows.
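One way to make backfills routine is a date-parameterized entry point. This sketch assumes day-partitioned, idempotent writes; the body of run_for_day is a placeholder for your pipeline's real processing:

import argparse
from datetime import datetime, timedelta

def run_for_day(day):
    """Process one day's partition. Idempotent: re-running a day replaces
    that day's output instead of appending to it."""
    print(f"processing partition {day.isoformat()}")
    # pipeline.process_partition(day)  # hypothetical entry point

def backfill(start, end):
    day = start
    while day <= end:
        run_for_day(day)
        day += timedelta(days=1)

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Reprocess a historical date range")
    parser.add_argument("--start", required=True, help="YYYY-MM-DD")
    parser.add_argument("--end", required=True, help="YYYY-MM-DD")
    args = parser.parse_args()
    start = datetime.strptime(args.start, "%Y-%m-%d").date()
    end = datetime.strptime(args.end, "%Y-%m-%d").date()
    backfill(start, end)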

Version Control

Pipeline code is code. Treat it accordingly: keep it in version control, review changes, run it through CI, and deploy it like any other service.

Runbooks

Document operational procedures: how to restart the pipeline, how to run a backfill, how to inspect and replay the dead letter queue, and who to escalate to when data looks wrong.

Tool Selection

Batch Processing

Apache Spark: Distributed processing for large datasets.

Apache Airflow: Workflow orchestration. DAGs define pipeline dependencies.

dbt: SQL-based transformations. Great for analytics pipelines.

Stream Processing

Apache Kafka: Distributed streaming platform. Often the backbone of real-time pipelines.

Apache Flink: Stream processing with exactly-once semantics.

AWS Kinesis / GCP Dataflow: Managed streaming services.

Choose Based On

Data volume, latency requirements (batch versus streaming), your team's existing expertise, and the infrastructure you already operate. The tool your team can run reliably beats the theoretically better one.

Key Takeaways

Expect change: schemas evolve, volumes grow, and dependencies fail. Design for it instead of assuming stability.

Make operations idempotent and validate data at every boundary.

Never lose failed records: route them to a dead letter queue and checkpoint progress so restarts are cheap.

Monitor throughput, latency, error rate, lag, and freshness, and alert on sustained anomalies.

Treat pipelines like any other production system: version control, tests, runbooks, and routine backfills.