Data Engineering Patterns for the Modern Stack

May 17, 2021

Data engineering has evolved from writing ETL scripts to building sophisticated data platforms. The modern data stack emphasizes modularity, real-time capabilities, and self-service analytics. Understanding current patterns is essential for building data infrastructure that scales.

Here’s what’s shaping data engineering today.

The Modern Data Stack

Architecture Overview

┌─────────────────────────────────────────────────────────────────┐
│                    Data Sources                                  │
│  Databases, APIs, Events, Files, SaaS Applications              │
└───────────────────────────┬─────────────────────────────────────┘
                            │
                            ▼
┌─────────────────────────────────────────────────────────────────┐
│                    Ingestion Layer                               │
│  Fivetran, Airbyte, Kafka, Debezium                             │
└───────────────────────────┬─────────────────────────────────────┘
                            │
                            ▼
┌─────────────────────────────────────────────────────────────────┐
│                    Storage Layer                                 │
│  Snowflake, BigQuery, Databricks, Redshift                      │
└───────────────────────────┬─────────────────────────────────────┘
                            │
                            ▼
┌─────────────────────────────────────────────────────────────────┐
│                    Transformation Layer                          │
│  dbt, Spark, SQL                                                │
└───────────────────────────┬─────────────────────────────────────┘
                            │
                            ▼
┌─────────────────────────────────────────────────────────────────┐
│                    Serving Layer                                 │
│  BI Tools, Reverse ETL, ML Platforms, APIs                      │
└─────────────────────────────────────────────────────────────────┘

Key Characteristics

modern_stack_traits:
  cloud_native:
    - Managed services
    - Elastic scaling
    - Pay-per-use

  modular:
    - Best-of-breed tools
    - Replaceable components
    - Standard interfaces

  elt_over_etl:
    - Load raw data first
    - Transform in warehouse
    - Preserve source data

  analytics_engineering:
    - SQL-first transformation
    - Version controlled
    - Tested data models
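
The ELT trait deserves a concrete illustration: raw data lands in the warehouse untouched, and the transformation is SQL that runs where the data already lives. Below is a minimal, self-contained sketch using SQLite as a stand-in warehouse; in practice the same load-then-transform pattern runs against Snowflake, BigQuery, or Databricks, and the table names here are illustrative.

# Sketch: ELT in miniature - SQLite stands in for a cloud warehouse
import sqlite3
from datetime import datetime, timezone

warehouse = sqlite3.connect(":memory:")

# 1. Load: land source rows as-is, everything as text, plus a load timestamp
warehouse.execute(
    "CREATE TABLE raw_orders (id TEXT, status TEXT, total_amount TEXT, _loaded_at TEXT)"
)
source_rows = [("1", "shipped", "42.50"), ("2", "pending", "17.00")]
loaded_at = datetime.now(timezone.utc).isoformat()
warehouse.executemany(
    "INSERT INTO raw_orders VALUES (?, ?, ?, ?)",
    [row + (loaded_at,) for row in source_rows],
)

# 2. Transform: build the typed, analytics-ready model with SQL inside the warehouse
warehouse.execute("""
    CREATE TABLE stg_orders AS
    SELECT
        CAST(id AS INTEGER)        AS order_id,
        status,
        CAST(total_amount AS REAL) AS total_amount
    FROM raw_orders
""")

print(warehouse.execute("SELECT * FROM stg_orders").fetchall())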

Data Ingestion

Batch Ingestion

# Airbyte connection
connections:
  - source: postgres
    destination: snowflake
    schedule: "@hourly"
    sync_mode: incremental
    cursor_field: updated_at

  - source: salesforce
    destination: snowflake
    schedule: "@daily"
    sync_mode: full_refresh
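
Scheduled syncs can also be triggered on demand, for example from an orchestrator or a CI job. The sketch below calls the Airbyte OSS API; the host and port, the connection ID, and the shape of the response are assumptions to adapt to your deployment.

# Sketch: trigger an Airbyte sync over its API (host/port are assumptions)
import requests

AIRBYTE_URL = "http://airbyte:8001/api/v1"       # assumed Airbyte server address
CONNECTION_ID = "salesforce_connection_id"       # placeholder connection ID

def trigger_sync(connection_id: str) -> dict:
    """Kick off a manual sync for one connection and return the job info."""
    response = requests.post(
        f"{AIRBYTE_URL}/connections/sync",
        json={"connectionId": connection_id},
        timeout=30,
    )
    response.raise_for_status()
    return response.json().get("job", {})

if __name__ == "__main__":
    job = trigger_sync(CONNECTION_ID)
    print(f"Started Airbyte job {job.get('id')} (status: {job.get('status')})")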

Change Data Capture

# Debezium CDC connector managed by Strimzi
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: orders-cdc
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: io.debezium.connector.postgresql.PostgresConnector
  config:
    database.hostname: postgres
    database.dbname: orders
    database.server.name: orders
    table.include.list: public.orders,public.customers
    plugin.name: pgoutput
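
Each captured change lands on Kafka as an event envelope describing the operation and the row before and after it. Below is a hedged sketch of a consumer that routes on the envelope's op field; the topic name follows Debezium's <server>.<schema>.<table> convention from the connector above, and the handler functions are hypothetical.

# Sketch: consume Debezium change events and route by operation (handlers are hypothetical)
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'orders.public.orders',                     # <database.server.name>.<schema>.<table>
    bootstrap_servers=['kafka:9092'],
    value_deserializer=lambda v: json.loads(v.decode('utf-8')) if v else None
)

for message in consumer:
    if message.value is None:                   # tombstone after a delete
        continue
    # With the JSON converter the envelope may be wrapped in a "payload" field
    change = message.value.get('payload', message.value)
    op = change.get('op')                       # c=create, u=update, d=delete, r=snapshot read
    if op in ('c', 'r'):
        apply_insert(change['after'])           # hypothetical downstream handlers
    elif op == 'u':
        apply_update(change['before'], change['after'])
    elif op == 'd':
        apply_delete(change['before'])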

Event Streaming

# Kafka producer
from kafka import KafkaProducer
from datetime import datetime
import json

producer = KafkaProducer(
    bootstrap_servers=['kafka:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

def emit_event(event_type, data):
    producer.send('events', {
        'type': event_type,
        'timestamp': datetime.utcnow().isoformat(),
        'data': data
    })
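
producer.send() is asynchronous, so pipelines that care about delivery should confirm it. A short usage sketch continuing from the producer above, with an illustrative event payload:

# Sketch: confirm delivery of an event (send() returns a future)
future = producer.send('events', {
    'type': 'order_created',
    'timestamp': datetime.utcnow().isoformat(),
    'data': {'order_id': 123}
})
record_metadata = future.get(timeout=10)    # blocks until acked, raises on failure
print(f"Delivered to {record_metadata.topic}[{record_metadata.partition}] @ {record_metadata.offset}")

producer.flush()                            # drain buffered messages before shutdown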

Transformation with dbt

Model Structure

-- models/staging/stg_orders.sql
{{ config(materialized='view') }}

select
    id as order_id,
    customer_id,
    created_at,
    status,
    total_amount
from {{ source('raw', 'orders') }}
where created_at >= '2020-01-01'

-- models/marts/fct_orders.sql
{{ config(materialized='table') }}

select
    o.order_id,
    o.customer_id,
    c.customer_name,
    c.customer_segment,
    o.created_at,
    o.status,
    o.total_amount,
    count(oi.item_id) as item_count
from {{ ref('stg_orders') }} o
left join {{ ref('dim_customers') }} c on o.customer_id = c.customer_id
left join {{ ref('stg_order_items') }} oi on o.order_id = oi.order_id
group by 1, 2, 3, 4, 5, 6, 7

Testing

# models/marts/schema.yml
version: 2

models:
  - name: fct_orders
    description: Order fact table
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: total_amount
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: ">= 0"
      - name: customer_id
        tests:
          - relationships:
              to: ref('dim_customers')
              field: customer_id

Incremental Models

-- models/marts/fct_events.sql
{{ config(
    materialized='incremental',
    unique_key='event_id',
    incremental_strategy='merge'
) }}

select
    event_id,
    user_id,
    event_type,
    event_timestamp,
    properties
from {{ source('raw', 'events') }}

{% if is_incremental() %}
where event_timestamp > (select max(event_timestamp) from {{ this }})
{% endif %}

Data Quality

Great Expectations

import great_expectations as gx

context = gx.get_context()

# Define expectations
suite = context.add_expectation_suite("orders_suite")

# batch_request is assumed to point at the orders data in a configured datasource
validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name="orders_suite"
)

validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_be_between("total_amount", min_value=0)
validator.expect_column_values_to_be_in_set("status", ["pending", "shipped", "delivered"])

dbt Tests

-- tests/assert_orders_balance.sql
-- Custom test: order totals match line items
select
    o.order_id
from {{ ref('fct_orders') }} o
join (
    select order_id, sum(line_total) as calculated_total
    from {{ ref('fct_order_items') }}
    group by 1
) oi on o.order_id = oi.order_id
where abs(o.total_amount - oi.calculated_total) > 0.01

Orchestration

Airflow DAGs

from airflow import DAG
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator
from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
from datetime import datetime

with DAG(
    'daily_data_pipeline',
    schedule_interval='@daily',
    start_date=datetime(2021, 1, 1),
    catchup=False
) as dag:

    sync_salesforce = AirbyteTriggerSyncOperator(
        task_id='sync_salesforce',
        airbyte_conn_id='airbyte_default',
        connection_id='salesforce_connection_id'
    )

    sync_postgres = AirbyteTriggerSyncOperator(
        task_id='sync_postgres',
        airbyte_conn_id='airbyte_default',
        connection_id='postgres_connection_id'
    )

    run_dbt = DbtCloudRunJobOperator(
        task_id='run_dbt',
        dbt_cloud_conn_id='dbt_cloud',
        job_id=12345,
        check_interval=60
    )

    [sync_salesforce, sync_postgres] >> run_dbt

Dagster

from dagster import ScheduleDefinition, asset, define_asset_job

@asset
def raw_orders():
    """Load raw orders from source"""
    return extract_orders()

@asset
def cleaned_orders(raw_orders):
    """Clean and validate orders"""
    return clean_orders(raw_orders)

@asset
def order_metrics(cleaned_orders):
    """Calculate order metrics"""
    return calculate_metrics(cleaned_orders)

# Assets are selected into a job rather than called directly
daily_pipeline = define_asset_job(
    "daily_pipeline",
    selection=["raw_orders", "cleaned_orders", "order_metrics"]
)

daily_schedule = ScheduleDefinition(job=daily_pipeline, cron_schedule="0 6 * * *")

Real-Time Patterns

Stream Processing

# Spark Structured Streaming (Flink SQL is a common alternative)
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, from_json, window
from pyspark.sql.types import StringType, StructType, TimestampType

spark = SparkSession.builder.getOrCreate()

# Schema of the JSON payload produced by the event emitter above
event_schema = StructType() \
    .add("type", StringType()) \
    .add("timestamp", TimestampType())

events = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "events") \
    .load() \
    .select(from_json(col("value").cast("string"), event_schema).alias("event")) \
    .select(
        col("event.type").alias("event_type"),
        col("event.timestamp").alias("event_timestamp")
    )

windowed_counts = events \
    .withWatermark("event_timestamp", "10 minutes") \
    .groupBy(
        window("event_timestamp", "5 minutes"),
        "event_type"
    ) \
    .agg(count("*").alias("event_count"))

query = windowed_counts \
    .writeStream \
    .outputMode("append") \
    .format("delta") \
    .option("checkpointLocation", "/checkpoints/events") \
    .start("/data/event_metrics")
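
Once the stream is running, the sink is an ordinary Delta table that downstream jobs can query in batch. A short usage sketch, reusing the path from above:

# Sketch: query the streaming output as a batch Delta table
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

event_metrics = spark.read.format("delta").load("/data/event_metrics")
event_metrics.orderBy("window", "event_type").show(truncate=False)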

Lambda Architecture (Simplified)

architecture:
  batch_layer:
    - Process historical data
    - dbt models in warehouse
    - Updated daily/hourly

  speed_layer:
    - Process real-time events
    - Stream processing
    - Low latency

  serving_layer:
    - Combine batch and speed
    - Query from unified view
    - Materialized views or CQRS
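
The serving layer is where this pattern becomes concrete: recent data is read from the speed layer and history from the batch layer, behind a single view. Below is a hedged sketch in Spark SQL, assuming the streaming Delta output from the previous section and a hypothetical dbt-built batch table named analytics.fct_event_metrics; all names are illustrative.

# Sketch: one unified view over the batch and speed layers (table names are illustrative)
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Expose the streaming Delta output as a queryable table
spark.read.format("delta").load("/data/event_metrics") \
    .createOrReplaceTempView("speed_event_metrics")

spark.sql("""
    CREATE OR REPLACE TEMP VIEW unified_event_metrics AS
    SELECT window.start AS window_start, event_type, event_count, 'speed' AS layer
    FROM speed_event_metrics
    WHERE window.start >= current_date()        -- today comes from the speed layer
    UNION ALL
    SELECT window_start, event_type, event_count, 'batch' AS layer
    FROM analytics.fct_event_metrics            -- hypothetical dbt-built batch table
    WHERE window_start < current_date()
""")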

Reverse ETL

Push data back to operational systems:

# Census or Hightouch
syncs:
  - name: customer_health_to_salesforce
    source:
      warehouse: snowflake
      model: customer_health_score
    destination:
      service: salesforce
      object: Account
    mapping:
      customer_id: Id
      health_score: Health_Score__c
      last_order_date: Last_Order_Date__c
    schedule: "@hourly"
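
Under the hood, reverse ETL is a warehouse query followed by API writes to the operational tool. For context, here is a hedged sketch of the same sync done by hand against the Salesforce REST API; the instance URL, access token, field names, and the warehouse query helper mirror the mapping above and are assumptions.

# Sketch: a hand-rolled version of the sync above (Census/Hightouch handle this for you)
import requests

SALESFORCE_URL = "https://example.my.salesforce.com/services/data/v52.0"  # assumed instance
ACCESS_TOKEN = "..."                                                      # assumed OAuth token

def push_health_scores(rows):
    """Push warehouse-computed health scores onto Salesforce Accounts."""
    headers = {"Authorization": f"Bearer {ACCESS_TOKEN}", "Content-Type": "application/json"}
    for row in rows:
        response = requests.patch(
            f"{SALESFORCE_URL}/sobjects/Account/{row['customer_id']}",
            headers=headers,
            json={
                "Health_Score__c": row["health_score"],
                "Last_Order_Date__c": row["last_order_date"],
            },
            timeout=30,
        )
        response.raise_for_status()

# rows = run_warehouse_query("select * from customer_health_score")  # hypothetical helper
# push_health_scores(rows)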

Key Takeaways

Data engineering has become a sophisticated discipline. The modern stack provides powerful capabilities, but requires thoughtful architecture and operational practices.