Machine learning has moved from research to production. But deploying models is just the beginning—they need monitoring, retraining, and all the operational discipline we apply to traditional software. MLOps brings DevOps practices to machine learning.
Here’s how to operationalize ML systems.
The MLOps Challenge
Why ML Is Different
traditional_software:
- Code changes → New behavior
- Deterministic outputs
- Test with assertions
- Deploy and done
machine_learning:
- Code + Data + Model → Behavior
- Probabilistic outputs
- Test with metrics
- Deploy and monitor continuously
- Model degrades over time
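The testing difference is worth making concrete: a traditional unit test asserts an exact output, while an ML test asserts that an aggregate metric over a held-out dataset clears a threshold. A minimal sketch (all function names here are hypothetical):
def test_tax_calculation():
    # Traditional software: deterministic output, exact assertion
    assert calculate_tax(amount=100, rate=0.2) == 20

def test_model_quality():
    # ML: probabilistic outputs, so assert on a metric computed over
    # a held-out dataset rather than on any single prediction
    model = load_model("models/model.pkl")
    X_test, y_test = load_holdout()
    accuracy = (model.predict(X_test) == y_test).mean()
    assert accuracy >= 0.90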
What Can Go Wrong
data_problems:
- Training/serving skew
- Feature drift
- Label drift
- Data quality issues
model_problems:
- Model decay
- Concept drift
- Adversarial inputs
- Fairness issues
operational_problems:
- Latency requirements
- Scalability
- Version management
- Reproducibility
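Several of these problems, training/serving skew and feature drift in particular, can be caught by comparing feature distributions between the training data and recent serving traffic. A minimal sketch, assuming both are available as pandas DataFrames:
import pandas as pd
from scipy.stats import ks_2samp

def drifted_features(train_df: pd.DataFrame, serving_df: pd.DataFrame, p_threshold: float = 0.01):
    """Flag numeric features whose serving distribution has shifted from training."""
    drifted = []
    for col in train_df.select_dtypes("number").columns:
        if col in serving_df.columns:
            # Two-sample Kolmogorov-Smirnov test: small p-value => distributions differ
            stat, p_value = ks_2samp(train_df[col].dropna(), serving_df[col].dropna())
            if p_value < p_threshold:
                drifted.append((col, stat))
    return drifted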
MLOps Lifecycle
The Full Picture
┌─────────────────────────────────────────────────────────────────┐
│ MLOps Lifecycle │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Data ──► Features ──► Training ──► Validation ──► Deployment │
│ │ │ │ │ │ │
│ ▼ ▼ ▼ ▼ ▼ │
│ Version Feature Experiment Model Model │
│ Control Store Tracking Registry Serving │
│ │ │
│ Monitoring ◄────────────────────┘ │
│ │ │
│ ▼ │
│ Retraining │
│ │
└─────────────────────────────────────────────────────────────────┘
Data Management
Data Versioning
Track data like code:
# DVC (Data Version Control)
stages:
  prepare:
    cmd: python src/prepare.py data/raw data/prepared
    deps:
      - src/prepare.py
      - data/raw
    outs:
      - data/prepared
  train:
    cmd: python src/train.py data/prepared models/model.pkl
    deps:
      - src/train.py
      - data/prepared
    outs:
      - models/model.pkl
    metrics:
      - metrics.json
# Version data with DVC
dvc add data/training.csv
git add data/training.csv.dvc
git commit -m "Add training data v1"
dvc push
# Reproduce any version
git checkout v1.0
dvc checkout
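Beyond the CLI, DVC exposes a Python API, so training code can read an exact data version by Git revision. A brief sketch (path and revision are illustrative):
import dvc.api

# Read the contents of a tracked file as it existed at tag v1.0
with dvc.api.open("data/training.csv", rev="v1.0") as f:
    training_csv = f.read()

# Or just resolve the remote storage URL for that version
url = dvc.api.get_url("data/training.csv", rev="v1.0")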
Feature Store
Centralized feature management:
# Feast feature store
# (Older Feast API shown; constructor and import names vary across Feast releases)
from datetime import timedelta
from feast import BigQuerySource, Entity, Feature, FeatureStore, FeatureView, ValueType

store = FeatureStore(repo_path=".")

# Define an entity and a feature view
user = Entity(name="user_id", value_type=ValueType.INT64)

user_features = FeatureView(
    name="user_features",
    entities=["user_id"],
    ttl=timedelta(days=1),
    features=[
        Feature(name="age", dtype=ValueType.FLOAT),
        Feature(name="account_age_days", dtype=ValueType.INT64),
        Feature(name="purchase_count_30d", dtype=ValueType.INT64),
    ],
    batch_source=BigQuerySource(...),
)

# Get features for training
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=["user_features:age", "user_features:purchase_count_30d"],
).to_df()

# Get features for serving
online_features = store.get_online_features(
    features=["user_features:age", "user_features:purchase_count_30d"],
    entity_rows=[{"user_id": 123}],
).to_dict()
Experiment Tracking
MLflow Example
import mlflow
mlflow.set_experiment("fraud-detection")
with mlflow.start_run():
    # Log parameters
    mlflow.log_param("learning_rate", 0.01)
    mlflow.log_param("n_estimators", 100)

    # Train model
    model = train_model(X_train, y_train, lr=0.01, n_estimators=100)

    # Log metrics (evaluate() is assumed to return accuracy and F1)
    accuracy, f1 = evaluate(model, X_test, y_test)
    mlflow.log_metric("accuracy", accuracy)
    mlflow.log_metric("f1_score", f1)

    # Log model
    mlflow.sklearn.log_model(model, "model")

    # Log artifacts
    mlflow.log_artifact("feature_importance.png")
Comparing Experiments
# Query experiments
runs = mlflow.search_runs(
experiment_ids=[exp_id],
filter_string="metrics.accuracy > 0.9",
order_by=["metrics.f1_score DESC"]
)
# Get best run
best_run = runs.iloc[0]
best_model_uri = f"runs:/{best_run.run_id}/model"
Model Registry
Versioning and Staging
# Register model
mlflow.register_model(
model_uri=f"runs:/{run_id}/model",
name="fraud-detection"
)
# Transition stages
client = mlflow.tracking.MlflowClient()
client.transition_model_version_stage(
name="fraud-detection",
version=1,
stage="Staging"
)
# After validation
client.transition_model_version_stage(
name="fraud-detection",
version=1,
stage="Production"
)
Model Metadata
model_info:
  name: fraud-detection
  version: 3
  stage: Production
  metrics:
    accuracy: 0.95
    f1_score: 0.89
    auc_roc: 0.97
  training_data:
    source: s3://data/training_v5.parquet
    size: 1_000_000
    date_range: 2021-01-01 to 2021-02-28
  features:
    - transaction_amount
    - merchant_category
    - user_age_days
    - transaction_count_24h
  training_run: runs:/abc123
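Much of this metadata can live in the registry itself. A sketch using MlflowClient; the tag keys here are illustrative, not a standard schema:
from mlflow.tracking import MlflowClient

client = MlflowClient()

# Attach a description and custom tags to the registered version
client.update_model_version(
    name="fraud-detection",
    version=3,
    description="Trained on training_v5.parquet; passed validation gate",
)
client.set_model_version_tag("fraud-detection", "3", "training_data", "s3://data/training_v5.parquet")
client.set_model_version_tag("fraud-detection", "3", "auc_roc", "0.97")

# Read it back later
mv = client.get_model_version(name="fraud-detection", version="3")
print(mv.current_stage, mv.run_id, mv.tags)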
Model Serving
REST API Serving
# FastAPI model server
from fastapi import FastAPI
from pydantic import BaseModel
import mlflow

app = FastAPI()
model = mlflow.pyfunc.load_model("models:/fraud-detection/Production")

class PredictionRequest(BaseModel):
    features: dict  # raw feature values keyed by feature name

@app.post("/predict")
async def predict(request: PredictionRequest):
    features = prepare_features(request)  # application-specific preprocessing
    prediction = model.predict(features)
    # Add a confidence score here if the underlying model exposes probabilities
    return {"prediction": float(prediction[0])}

@app.get("/health")
async def health():
    # run_id comes from the MLmodel metadata of the loaded model
    return {"status": "healthy", "model_run_id": model.metadata.run_id}
Kubernetes Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: fraud-detection-model
spec:
  replicas: 3
  selector:
    matchLabels:
      app: fraud-detection
  template:
    metadata:
      labels:
        app: fraud-detection
    spec:
      containers:
        - name: model
          image: myregistry/fraud-model:v3
          resources:
            requests:
              cpu: "500m"
              memory: "1Gi"
            limits:
              cpu: "2"
              memory: "4Gi"
          ports:
            - containerPort: 8080
          readinessProbe:
            httpGet:
              path: /health
              port: 8080
Batch Inference
# Spark batch inference
from pyspark.sql import SparkSession
from pyspark.sql.functions import struct
import mlflow

spark = SparkSession.builder.getOrCreate()
model = mlflow.pyfunc.spark_udf(spark, "models:/fraud-detection/Production")

# feature_columns: list of the model's input column names
predictions = (
    spark.read.parquet("s3://data/transactions")
    .withColumn("prediction", model(struct(*feature_columns)))
)
predictions.write.parquet("s3://data/predictions")
Monitoring
Model Performance
# Track prediction metrics
from prometheus_client import Counter, Histogram
predictions_total = Counter(
'model_predictions_total',
'Total predictions',
['model', 'version']
)
prediction_latency = Histogram(
'model_prediction_latency_seconds',
'Prediction latency',
['model']
)
@app.post("/predict")
async def predict(request: PredictionRequest):
with prediction_latency.labels(model='fraud').time():
prediction = model.predict(features)
predictions_total.labels(model='fraud', version='v3').inc()
return prediction
Data Drift Detection
from evidently.dashboard import Dashboard
from evidently.tabs import DataDriftTab
# Compare reference and current data
dashboard = Dashboard(tabs=[DataDriftTab()])
dashboard.calculate(reference_data, current_data)
dashboard.save("drift_report.html")
# Programmatic drift detection
import json
from evidently.model_profile import Profile
from evidently.model_profile.sections import DataDriftProfileSection

profile = Profile(sections=[DataDriftProfileSection()])
profile.calculate(reference_data, current_data)

# profile.json() returns a JSON string, so parse it before indexing
report = json.loads(profile.json())
drift_detected = report["data_drift"]["data"]["metrics"]["dataset_drift"]
if drift_detected:
    trigger_retraining()
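trigger_retraining is left abstract above. One possible implementation, assuming the training workflow also accepts a workflow_dispatch trigger (repository and workflow names are placeholders), calls the GitHub Actions API:
import os
import requests

def trigger_retraining():
    # Dispatch the training workflow; OWNER/REPO and ml-pipeline.yml are placeholders
    response = requests.post(
        "https://api.github.com/repos/OWNER/REPO/actions/workflows/ml-pipeline.yml/dispatches",
        headers={
            "Authorization": f"Bearer {os.environ['GITHUB_TOKEN']}",
            "Accept": "application/vnd.github+json",
        },
        json={"ref": "main"},
    )
    response.raise_for_status()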
Model Decay Monitoring
alerts:
  - name: ModelAccuracyDrop
    condition: model_accuracy < 0.90
    action: alert_data_science
  - name: PredictionDistributionShift
    condition: prediction_distribution_divergence > 0.1
    action: investigate
  - name: FeatureDrift
    condition: feature_drift_score > threshold
    action: trigger_retraining
CI/CD for ML
Training Pipeline
# GitHub Actions ML pipeline
name: ML Pipeline
on:
  push:
    paths:
      - 'data/**'
      - 'src/**'
jobs:
  train:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Setup Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.10'
      - name: Install dependencies
        # assumes project dependencies (including dvc) are listed in requirements.txt
        run: pip install -r requirements.txt
      - name: Pull data
        run: dvc pull
      - name: Train model
        run: python src/train.py
      - name: Evaluate
        run: python src/evaluate.py
      - name: Register model
        if: success()
        run: python src/register_model.py
Model Validation
def validate_model(model_uri, test_data, production_metrics=None):
    """Gate for production deployment."""
    model = mlflow.pyfunc.load_model(model_uri)

    # Performance check
    metrics = evaluate(model, test_data)
    assert metrics['accuracy'] > 0.90, "Accuracy too low"
    assert metrics['latency_p99'] < 100, "Latency too high"

    # Fairness check
    fairness = check_fairness(model, test_data)
    assert fairness['demographic_parity'] < 0.05, "Fairness violation"

    # Compare against the current production model, if there is one
    if production_metrics is not None:
        improvement = metrics['accuracy'] - production_metrics['accuracy']
        assert improvement > -0.01, "Regression detected"

    return True
Key Takeaways
- MLOps brings DevOps practices to ML: versioning, CI/CD, monitoring
- Data versioning is as important as code versioning; use DVC or similar
- Feature stores ensure consistency between training and serving
- Experiment tracking enables reproducibility and comparison
- Model registry manages versions and staging (dev → staging → prod)
- Monitor model performance and data drift continuously
- Models decay over time; plan for retraining
- Automate training pipelines with CI/CD
- Validate models for performance, fairness, and regression before production
- Start simple and add complexity as needed
ML systems require ongoing operational attention. MLOps practices ensure models stay healthy and continue delivering value.