MLOps and ML Pipelines

MLOps applies DevOps principles to machine learning: version control for data/models, automated training pipelines, monitoring in production, and reproducible experiments. The gap between notebook prototype and production system is where most ML projects fail.

Experiment Tracking

MLflow

import mlflow
import mlflow.sklearn
from sklearn.metrics import average_precision_score, f1_score
from xgboost import XGBClassifier

mlflow.set_tracking_uri("http://localhost:5000")
mlflow.set_experiment("fraud-detection-v2")

params = {"n_estimators": 100, "max_depth": 6, "learning_rate": 0.1}

with mlflow.start_run(run_name="xgboost-baseline"):
    # Log parameters
    mlflow.log_params(params)

    model = XGBClassifier(**params)
    model.fit(X_train, y_train)

    # Log metrics (AUPRC needs probability scores, not hard labels)
    y_pred = model.predict(X_test)
    y_score = model.predict_proba(X_test)[:, 1]
    mlflow.log_metric("f1", f1_score(y_test, y_pred))
    mlflow.log_metric("auprc", average_precision_score(y_test, y_score))

    # Log model artifact
    mlflow.sklearn.log_model(model, "model")

    # Log data artifact
    mlflow.log_artifact("feature_config.yaml")

Weights & Biases (wandb)

import wandb

wandb.init(project="fraud-detection", config={"lr": 0.01, "epochs": 50})

for epoch in range(50):
    train_loss = train_one_epoch(model, train_loader)
    val_loss = evaluate(model, val_loader)
    wandb.log({"train_loss": train_loss, "val_loss": val_loss, "epoch": epoch})

wandb.finish()

Feature Store

Centralized repository for feature definitions and computed values. Prevents feature skew between training and serving.

# Feast example
from feast import BigQuerySource, Entity, FeatureStore, FeatureView, Field, ValueType
from feast.types import Float32, Int64

store = FeatureStore(repo_path="feature_repo/")

# Define features
entity = Entity(name="customer_id", value_type=ValueType.INT64)

customer_features = FeatureView(
    name="customer_features",
    entities=[entity],
    schema=[
        Field(name="total_transactions_30d", dtype=Float32),
        Field(name="avg_transaction_amount", dtype=Float32),
        Field(name="account_age_days", dtype=Int64),
    ],
    source=BigQuerySource(table="project.dataset.customer_features"),
)

# Retrieve features for training
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=["customer_features:total_transactions_30d",
              "customer_features:avg_transaction_amount"],
).to_df()

# Online serving (low-latency)
feature_vector = store.get_online_features(
    features=["customer_features:total_transactions_30d"],
    entity_rows=[{"customer_id": 12345}],
).to_dict()

Model Registry

Version models, track lineage, manage promotion stages.

# MLflow Model Registry
from mlflow.tracking import MlflowClient

client = MlflowClient()

# Register model
model_uri = f"runs:/{run_id}/model"
mlflow.register_model(model_uri, "fraud-detector")

# Promote to production
client.transition_model_version_stage(
    name="fraud-detector",
    version=3,
    stage="Production"
)

# Load production model
model = mlflow.pyfunc.load_model("models:/fraud-detector/Production")

Model Serving

FastAPI Serving

from fastapi import FastAPI
import mlflow.pyfunc
import pandas as pd

app = FastAPI()
model = mlflow.pyfunc.load_model("models:/fraud-detector/Production")

@app.post("/predict")
async def predict(features: dict):
    input_df = pd.DataFrame([features])
    prediction = model.predict(input_df)
    return {"prediction": int(prediction[0]),
            "model_version": "3"}

BentoML (Batching + Serving)

import bentoml
import numpy as np

# Save model to the local BentoML model store
saved_model = bentoml.sklearn.save_model("fraud_model", model)

# Service definition
@bentoml.service
class FraudDetector:
    def __init__(self):
        # Load the latest saved model from the store
        self.model = bentoml.sklearn.load_model("fraud_model:latest")

    @bentoml.api(batchable=True, max_batch_size=100)
    def predict(self, inputs: np.ndarray) -> np.ndarray:
        return self.model.predict(inputs)

Model Monitoring

Data Drift Detection

from evidently.metrics import ColumnDriftMetric
from evidently.report import Report

# Compare training vs production distributions
drift_report = Report(metrics=[
    ColumnDriftMetric(column_name="transaction_amount"),
    ColumnDriftMetric(column_name="merchant_category"),
])
drift_report.run(reference_data=train_df, current_data=production_df)
drift_report.save_html("drift_report.html")

Key metrics to monitor:

  • Data drift: input feature distributions shift (KS-test, PSI)
  • Concept drift: relationship between features and target changes
  • Prediction drift: model output distribution changes
  • Performance degradation: actual metrics drop (requires ground truth)
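The PSI mentioned above can be computed directly from binned frequencies. A minimal pure-Python sketch (the bin count, epsilon, and the 0.2 alert threshold are common conventions, not fixed rules):

```python
import math

def psi(expected, actual, bins=10):
    """Population Stability Index between two 1-D samples.

    Bin edges come from the expected (reference) sample; a small
    epsilon avoids log(0) for empty bins.
    """
    lo, hi = min(expected), max(expected)
    edges = [lo + (hi - lo) * i / bins for i in range(bins + 1)]
    edges[-1] = float("inf")  # catch values above the reference max

    def frac(sample):
        counts = [0] * bins
        for x in sample:
            for i in range(bins):
                if edges[i] <= x < edges[i + 1]:
                    counts[i] += 1
                    break
        n = max(len(sample), 1)
        return [c / n for c in counts]

    eps = 1e-6
    e, a = frac(expected), frac(actual)
    return sum((ai - ei) * math.log((ai + eps) / (ei + eps))
               for ei, ai in zip(e, a))

# Identical distributions give PSI 0; a common rule of thumb
# flags PSI > 0.2 as significant drift.
ref = [i / 100 for i in range(100)]
print(psi(ref, ref))                            # 0.0
print(psi(ref, [x + 0.5 for x in ref]) > 0.2)   # True
```

In production you would run this per feature on a schedule, comparing the serving window against the training reference.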

Pipeline Orchestration

Prefect / Airflow Pattern

# Prefect pipeline example
import mlflow.sklearn
import pandas as pd
from prefect import flow, task
from sklearn.model_selection import train_test_split
from xgboost import XGBClassifier

@task
def load_data():
    return pd.read_parquet("s3://data/transactions.parquet")

@task
def preprocess(df):
    return feature_pipeline.transform(df)

@task
def train_model(X, y):
    model = XGBClassifier()
    model.fit(X, y)
    return model

@task
def evaluate_and_register(model, X_test, y_test):
    score = model.score(X_test, y_test)
    if score > 0.85:  # quality gate
        mlflow.sklearn.log_model(model, "model")
        return True
    return False

@flow(name="training-pipeline")
def training_pipeline():
    df = load_data()
    X, y = preprocess(df)
    # Hold out a test split so the quality gate is not scored on training data
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
    model = train_model(X_train, y_train)
    evaluate_and_register(model, X_test, y_test)

CI/CD for ML

  • Data validation: Great Expectations or Pandera checks on input data
  • Model validation: automated tests on holdout set, A/B test framework
  • Shadow deployment: run new model alongside production, compare outputs
  • Canary deployment: route small % of traffic to new model, monitor metrics
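One way to realize the shadow pattern above is a thin router that always serves the production model's answer while recording where the candidate disagrees. A sketch under illustrative assumptions (the model callables and in-memory disagreement log stand in for real serving and metrics infrastructure):

```python
class ShadowRouter:
    """Serve the production model; run the shadow model on the same
    input and record disagreements for offline comparison."""

    def __init__(self, production_model, shadow_model):
        self.production_model = production_model
        self.shadow_model = shadow_model
        self.disagreements = []  # in practice: ship to a metrics store

    def predict(self, features):
        prod_pred = self.production_model(features)
        shadow_pred = self.shadow_model(features)  # never returned to caller
        if shadow_pred != prod_pred:
            self.disagreements.append((features, prod_pred, shadow_pred))
        return prod_pred  # the caller only ever sees production output

# Toy models: flag transactions above a threshold as fraud
router = ShadowRouter(
    production_model=lambda f: int(f["amount"] > 1000),
    shadow_model=lambda f: int(f["amount"] > 800),
)
print(router.predict({"amount": 900}))   # 0 -- production answer wins
print(len(router.disagreements))         # 1 -- shadow disagreed
```

The disagreement log is what you analyze before promoting the shadow model; a canary deployment differs only in that the new model's output is actually returned for a small slice of traffic.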

Gotchas

  • Training-serving skew: features computed differently in training (batch SQL) vs serving (real-time). Feature stores solve this but add complexity. At minimum, share feature engineering code between training and serving codepaths
  • Model staleness: models degrade silently. Set up automated retraining triggers based on drift metrics or calendar schedule. Without monitoring, a model can serve bad predictions for months before anyone notices
  • Reproducibility requires more than code versioning: pin random seeds, log library versions, version the training data (DVC or similar), log preprocessing parameters. A model you cannot reproduce is a model you cannot debug
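A lightweight version of the reproducibility checklist above: pin the seed and capture the environment in one place at the start of every run. The snapshot keys are illustrative; in practice you would log this dict to MLflow or wandb and extend it with the git commit and data version:

```python
import json
import platform
import random
import sys

def reproducibility_snapshot(seed=42):
    """Pin the RNG seed and record the environment the run executed in."""
    random.seed(seed)  # also seed numpy/torch/etc. if the run uses them
    return {
        "seed": seed,
        "python_version": sys.version.split()[0],
        "platform": platform.platform(),
        # In a real pipeline, also record: git commit, data version
        # (e.g. a DVC hash), and `pip freeze` output.
    }

snap = reproducibility_snapshot(seed=42)
print(json.dumps(snap, indent=2))
```

Calling this before training means two runs with the same snapshot should draw the same random numbers, which is the minimum needed to debug a model by re-running it.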

See Also