MLOps Fundamentals: Operationalizing Machine Learning

March 22, 2021

Machine learning has moved from research to production. But deploying models is just the beginning—they need monitoring, retraining, and all the operational discipline we apply to traditional software. MLOps brings DevOps practices to machine learning.

Here’s how to operationalize ML systems.

The MLOps Challenge

Why ML Is Different

traditional_software:
  - Code changes → New behavior
  - Deterministic outputs
  - Test with assertions
  - Deploy and done

machine_learning:
  - Code + Data + Model → Behavior
  - Probabilistic outputs
  - Test with metrics
  - Deploy and monitor continuously
  - Model degrades over time

What Can Go Wrong

data_problems:
  - Training/serving skew (a simple check is sketched after these lists)
  - Feature drift
  - Label drift
  - Data quality issues

model_problems:
  - Model decay
  - Concept drift
  - Adversarial inputs
  - Fairness issues

operational_problems:
  - Latency requirements
  - Scalability
  - Version management
  - Reproducibility
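
Many of these data problems can be caught before they reach the model by comparing serving data against statistics captured at training time. Below is a minimal sketch of such a check; the stats format, thresholds, and helper name are illustrative, not a standard API:

import pandas as pd

def check_serving_batch(batch: pd.DataFrame, training_stats: dict) -> list:
    """Compare a serving batch against per-feature stats saved at training time.

    training_stats maps numeric feature name -> {"mean": float, "std": float}.
    Returns a list of human-readable issues (empty list means the batch looks fine).
    """
    issues = []
    for feature, stats in training_stats.items():
        if feature not in batch.columns:
            issues.append(f"missing feature: {feature}")
            continue
        null_rate = batch[feature].isnull().mean()
        if null_rate > 0.05:
            issues.append(f"{feature}: {null_rate:.1%} nulls")
        # Crude skew signal: serving mean far from the training mean
        shift = abs(batch[feature].mean() - stats["mean"]) / (stats["std"] + 1e-9)
        if shift > 0.5:
            issues.append(f"{feature}: mean shifted {shift:.2f} training std devs")
    return issues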

MLOps Lifecycle

The Full Picture

┌─────────────────────────────────────────────────────────────────┐
│                       MLOps Lifecycle                            │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  Data ──► Features ──► Training ──► Validation ──► Deployment   │
│    │         │            │             │              │         │
│    ▼         ▼            ▼             ▼              ▼         │
│  Version   Feature     Experiment    Model         Model        │
│  Control   Store       Tracking      Registry      Serving      │
│                                                        │         │
│                        Monitoring ◄────────────────────┘         │
│                            │                                     │
│                            ▼                                     │
│                        Retraining                                │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

Data Management

Data Versioning

Track data like code:

# dvc.yaml: pipeline stages tracked by DVC (Data Version Control)
stages:
  prepare:
    cmd: python src/prepare.py data/raw data/prepared
    deps:
      - src/prepare.py
      - data/raw
    outs:
      - data/prepared

  train:
    cmd: python src/train.py data/prepared models/model.pkl
    deps:
      - src/train.py
      - data/prepared
    outs:
      - models/model.pkl
    metrics:
      - metrics.json

# Version data with DVC
dvc add data/training.csv
git add data/training.csv.dvc
git commit -m "Add training data v1"
dvc push

# Reproduce any version
git checkout v1.0
dvc checkout
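
DVC also has a small Python API, so training code can pin itself to an exact data version. A brief sketch, assuming the file and the v1.0 tag from the commands above:

# Read a specific revision of the training data straight from the DVC remote
import dvc.api
import pandas as pd

with dvc.api.open("data/training.csv", rev="v1.0") as f:
    training_df = pd.read_csv(f)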

Feature Store

Centralized feature management, sketched here with Feast (exact signatures vary across Feast versions):

# Feast feature store
from datetime import timedelta

from feast import BigQuerySource, Entity, Feature, FeatureStore, FeatureView, ValueType

store = FeatureStore(repo_path=".")

# Define an entity and a feature view over the offline source
user = Entity(name="user_id", value_type=ValueType.INT64)

user_features = FeatureView(
    name="user_features",
    entities=["user_id"],
    ttl=timedelta(days=90),
    batch_source=BigQuerySource(...),
    features=[
        Feature(name="age", dtype=ValueType.FLOAT),
        Feature(name="account_age_days", dtype=ValueType.INT64),
        Feature(name="purchase_count_30d", dtype=ValueType.INT64),
    ],
)

# Get features for training
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=["user_features:age", "user_features:purchase_count_30d"]
).to_df()

# Get features for serving
online_features = store.get_online_features(
    features=["user_features:age", "user_features:purchase_count_30d"],
    entity_rows=[{"user_id": 123}]
).to_dict()

Experiment Tracking

MLflow Example

import mlflow

mlflow.set_experiment("fraud-detection")

with mlflow.start_run():
    # Log parameters
    mlflow.log_param("learning_rate", 0.01)
    mlflow.log_param("n_estimators", 100)

    # Train model
    model = train_model(X_train, y_train, lr=0.01, n_estimators=100)

    # Evaluate and log metrics
    accuracy, f1 = evaluate(model, X_test, y_test)
    mlflow.log_metric("accuracy", accuracy)
    mlflow.log_metric("f1_score", f1)

    # Log model
    mlflow.sklearn.log_model(model, "model")

    # Log artifacts
    mlflow.log_artifact("feature_importance.png")

Comparing Experiments

# Query experiments
exp_id = mlflow.get_experiment_by_name("fraud-detection").experiment_id
runs = mlflow.search_runs(
    experiment_ids=[exp_id],
    filter_string="metrics.accuracy > 0.9",
    order_by=["metrics.f1_score DESC"]
)

# Get best run
best_run = runs.iloc[0]
best_model_uri = f"runs:/{best_run.run_id}/model"

Model Registry

Versioning and Staging

# Register model
mlflow.register_model(
    model_uri=f"runs:/{run_id}/model",
    name="fraud-detection"
)

# Transition stages
client = mlflow.tracking.MlflowClient()
client.transition_model_version_stage(
    name="fraud-detection",
    version=1,
    stage="Staging"
)

# After validation
client.transition_model_version_stage(
    name="fraud-detection",
    version=1,
    stage="Production"
)

Model Metadata

model_info:
  name: fraud-detection
  version: 3
  stage: Production
  metrics:
    accuracy: 0.95
    f1_score: 0.89
    auc_roc: 0.97
  training_data:
    source: s3://data/training_v5.parquet
    rows: 1000000
    date_range: 2021-01-01 to 2021-02-28
  features:
    - transaction_amount
    - merchant_category
    - user_age_days
    - transaction_count_24h
  training_run: runs:/abc123
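
Much of this metadata can live on the registered model version itself so it travels with the model. A sketch using MLflow model-version descriptions and tags (the tag keys are just one possible convention):

import mlflow

client = mlflow.tracking.MlflowClient()

# Attach provenance to the registered version
client.update_model_version(
    name="fraud-detection",
    version=3,
    description="Trained on training_v5.parquet, 2021-01-01 to 2021-02-28",
)
client.set_model_version_tag("fraud-detection", "3", "training_data", "s3://data/training_v5.parquet")
client.set_model_version_tag("fraud-detection", "3", "auc_roc", "0.97")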

Model Serving

REST API Serving

# FastAPI model server
from fastapi import FastAPI
from pydantic import BaseModel
import mlflow
import pandas as pd

app = FastAPI()
model = mlflow.pyfunc.load_model("models:/fraud-detection/Production")

class PredictionRequest(BaseModel):
    transaction_amount: float
    merchant_category: str
    user_age_days: int
    transaction_count_24h: int

@app.post("/predict")
async def predict(request: PredictionRequest):
    # Models logged as pyfunc accept a DataFrame of feature columns
    features = pd.DataFrame([request.dict()])
    prediction = model.predict(features)
    return {"prediction": float(prediction[0])}

@app.get("/health")
async def health():
    return {"status": "healthy", "model_run_id": model.metadata.run_id}

Kubernetes Deployment

apiVersion: apps/v1
kind: Deployment
metadata:
  name: fraud-detection-model
spec:
  replicas: 3
  selector:
    matchLabels:
      app: fraud-detection
  template:
    metadata:
      labels:
        app: fraud-detection
    spec:
      containers:
        - name: model
          image: myregistry/fraud-model:v3
          resources:
            requests:
              cpu: "500m"
              memory: "1Gi"
            limits:
              cpu: "2"
              memory: "4Gi"
          ports:
            - containerPort: 8080
          readinessProbe:
            httpGet:
              path: /health
              port: 8080

Batch Inference

# Spark batch inference
from pyspark.sql import SparkSession
from pyspark.sql.functions import struct
import mlflow

spark = SparkSession.builder.getOrCreate()
model = mlflow.pyfunc.spark_udf(spark, "models:/fraud-detection/Production")

feature_columns = ["transaction_amount", "merchant_category",
                   "user_age_days", "transaction_count_24h"]

predictions = (
    spark.read.parquet("s3://data/transactions")
    .withColumn("prediction", model(struct(*feature_columns)))
)
predictions.write.parquet("s3://data/predictions")

Monitoring

Model Performance

# Track prediction metrics
from prometheus_client import Counter, Histogram

predictions_total = Counter(
    'model_predictions_total',
    'Total predictions',
    ['model', 'version']
)

prediction_latency = Histogram(
    'model_prediction_latency_seconds',
    'Prediction latency',
    ['model']
)

@app.post("/predict")
async def predict(request: PredictionRequest):
    # Reuses the app, model, and PredictionRequest from the serving example above
    features = pd.DataFrame([request.dict()])
    with prediction_latency.labels(model='fraud').time():
        prediction = model.predict(features)

    predictions_total.labels(model='fraud', version='v3').inc()
    return {"prediction": float(prediction[0])}

Data Drift Detection

from evidently.dashboard import Dashboard
from evidently.tabs import DataDriftTab

# Compare reference and current data
dashboard = Dashboard(tabs=[DataDriftTab()])
dashboard.calculate(reference_data, current_data)
dashboard.save("drift_report.html")

# Programmatic drift detection
import json

from evidently.model_profile import Profile
from evidently.model_profile.sections import DataDriftProfileSection

profile = Profile(sections=[DataDriftProfileSection()])
profile.calculate(reference_data, current_data)

# profile.json() returns a JSON string, so parse it before indexing
report = json.loads(profile.json())
drift_detected = report["data_drift"]["data"]["metrics"]["dataset_drift"]
if drift_detected:
    trigger_retraining()

Model Decay Monitoring

alerts:
  - name: ModelAccuracyDrop
    condition: model_accuracy < 0.90
    action: alert_data_science

  - name: PredictionDistributionShift
    condition: prediction_distribution_divergence > 0.1
    action: investigate

  - name: FeatureDrift
    condition: feature_drift_score > threshold
    action: trigger_retraining
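
The prediction_distribution_divergence above can be any distance between the live prediction distribution and a reference one; a common choice is the population stability index (PSI). A minimal sketch (the bin count is an assumption; a frequent rule of thumb treats PSI above ~0.1 as worth investigating and above ~0.25 as major shift):

import numpy as np

def population_stability_index(reference: np.ndarray, current: np.ndarray, bins: int = 10) -> float:
    """PSI between two score distributions; higher means more shift."""
    # Bin edges come from the reference distribution
    edges = np.histogram_bin_edges(reference, bins=bins)
    ref_pct = np.histogram(reference, bins=edges)[0] / len(reference)
    cur_pct = np.histogram(current, bins=edges)[0] / len(current)
    # Clip empty bins to avoid division by zero and log(0)
    ref_pct = np.clip(ref_pct, 1e-6, None)
    cur_pct = np.clip(cur_pct, 1e-6, None)
    return float(np.sum((cur_pct - ref_pct) * np.log(cur_pct / ref_pct)))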

CI/CD for ML

Training Pipeline

# GitHub Actions ML pipeline
name: ML Pipeline

on:
  push:
    paths:
      - 'data/**'
      - 'src/**'

jobs:
  train:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: Setup Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.9'

      # Assumes project dependencies (including dvc and mlflow) are pinned in requirements.txt
      - name: Install dependencies
        run: pip install -r requirements.txt

      - name: Pull data
        run: dvc pull

      - name: Train model
        run: python src/train.py

      - name: Evaluate
        run: python src/evaluate.py

      - name: Register model
        if: success()
        run: python src/register_model.py
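
The last step assumes a small helper script; a sketch of what src/register_model.py might contain (the run selection logic is illustrative):

# src/register_model.py: register the best qualifying run and stage it
import mlflow

client = mlflow.tracking.MlflowClient()

exp_id = mlflow.get_experiment_by_name("fraud-detection").experiment_id
runs = mlflow.search_runs(
    experiment_ids=[exp_id],
    filter_string="metrics.accuracy > 0.9",
    order_by=["metrics.f1_score DESC"],
    max_results=1,
)
run_id = runs.iloc[0].run_id

model_version = mlflow.register_model(f"runs:/{run_id}/model", "fraud-detection")
client.transition_model_version_stage(
    name="fraud-detection",
    version=model_version.version,
    stage="Staging",
)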

Model Validation

def validate_model(model_uri, test_data, production_metrics=None):
    """Gate for production deployment"""
    model = mlflow.pyfunc.load_model(model_uri)

    # Performance check (p99 latency in milliseconds)
    metrics = evaluate(model, test_data)
    assert metrics['accuracy'] > 0.90, "Accuracy too low"
    assert metrics['latency_p99'] < 100, "Latency too high"

    # Fairness check
    fairness = check_fairness(model, test_data)
    assert fairness['demographic_parity'] < 0.05, "Fairness violation"

    # Compare against the current production model's offline metrics
    if production_metrics is not None:
        improvement = metrics['accuracy'] - production_metrics['accuracy']
        assert improvement > -0.01, "Regression detected"

    return True

Key Takeaways

ML systems need the same operational discipline as any other production software, plus attention to the ways models are different: they depend on data as much as code, and they degrade as that data shifts. Version data alongside code, track experiments, promote models through a registry, monitor predictions and features for drift, and automate retraining. These MLOps practices keep models healthy and delivering value long after the first deployment.