MLOps

Guidelines for machine learning operations, model deployment, experiment tracking, model lifecycle management, and ML pipeline best practices.

When to Activate

Use this skill when deploying or serving ML models, setting up experiment tracking, building training pipelines, managing feature stores or a model registry, or monitoring models in production.

Experiment Tracking

MLflow setup

import mlflow

# Set tracking URI
mlflow.set_tracking_uri("http://mlflow.internal:5000")
mlflow.set_experiment("recommendation-model")

params = {"learning_rate": 0.01, "max_depth": 6, "n_estimators": 100}

with mlflow.start_run(run_name="xgboost-v2"):
    # Log all hyperparameters at once
    mlflow.log_params(params)

    # Train model (train_model is project-specific)
    model = train_model(params)

    # Log evaluation metrics (values here are illustrative)
    mlflow.log_metric("accuracy", 0.95)
    mlflow.log_metric("f1_score", 0.93)
    mlflow.log_metric("auc_roc", 0.97)
    
    # Log model artifact
    mlflow.sklearn.log_model(model, "model")
    
    # Log additional artifacts
    mlflow.log_artifact("confusion_matrix.png")
    mlflow.log_artifact("feature_importance.csv")

Weights & Biases

import wandb

wandb.init(
    project="recommendation-model",
    config={
        "learning_rate": 0.01,
        "epochs": 100,
        "batch_size": 32,
    }
)

# Read hyperparameters back from the tracked config
for epoch in range(wandb.config.epochs):
    train_loss = train_epoch(model, train_loader)
    val_loss, val_accuracy = evaluate(model, val_loader)
    
    wandb.log({
        "epoch": epoch,
        "train_loss": train_loss,
        "val_loss": val_loss,
        "val_accuracy": val_accuracy,
    })

# Log model (assumes model.pth was written during training)
wandb.save("model.pth")
wandb.finish()
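
Files saved with wandb.save are tied to the run; for versioned, reusable model files, W&B artifacts are the usual mechanism. A sketch, again assuming model.pth exists and a run is active:

artifact = wandb.Artifact("recommendation-model", type="model")
artifact.add_file("model.pth")
wandb.log_artifact(artifact)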

Model Serving

FastAPI model server

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import joblib
import numpy as np

app = FastAPI(title="ML Model API")

# Load model at startup
model = None

@app.on_event("startup")
async def load_model():
    global model
    model = joblib.load("model.joblib")

class PredictionRequest(BaseModel):
    features: list[float]

class PredictionResponse(BaseModel):
    prediction: float
    confidence: float

@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    if model is None:
        raise HTTPException(status_code=503, detail="Model not loaded")
    
    features = np.array(request.features).reshape(1, -1)
    prediction = model.predict(features)[0]
    confidence = model.predict_proba(features).max()
    
    return PredictionResponse(
        prediction=float(prediction),
        confidence=float(confidence)
    )

@app.get("/health")
async def health():
    return {"status": "healthy", "model_loaded": model is not None}

Docker for ML

FROM python:3.11-slim

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy model and code
COPY model.joblib .
COPY app.py .

# Non-root user
RUN useradd -m appuser
USER appuser

EXPOSE 8000
CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]

Feature Store

Feast configuration

# feature_store.yaml
project: recommendation
registry: s3://feature-store/registry.db
provider: aws
online_store:
  type: redis
  connection_string: redis:6379  # Feast expects host:port here, not a redis:// URI
offline_store:
  type: redshift
  cluster_id: feature-store
  region: us-east-1
  database: features
  # A real Redshift offline store also needs user,
  # s3_staging_location, and iam_role

Feature definitions

from feast import Entity, FeatureView, Field, FileSource
from feast.types import Float32, Int64
from datetime import timedelta

# Entity definition
user = Entity(
    name="user_id",
    join_keys=["user_id"],
    description="User identifier"
)

# Feature source
user_features_source = FileSource(
    path="s3://features/user_features.parquet",
    timestamp_field="event_timestamp",
)

# Feature view
user_features = FeatureView(
    name="user_features",
    entities=[user],
    ttl=timedelta(days=1),
    schema=[
        Field(name="avg_purchase_amount", dtype=Float32),
        Field(name="purchase_count", dtype=Int64),
        Field(name="days_since_last_purchase", dtype=Int64),
    ],
    source=user_features_source,
)
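
At serving time, features for a given entity come from the online store. A minimal retrieval sketch, assuming the repo above has been applied and materialized:

from feast import FeatureStore

store = FeatureStore(repo_path=".")

online_features = store.get_online_features(
    features=[
        "user_features:avg_purchase_amount",
        "user_features:purchase_count",
    ],
    entity_rows=[{"user_id": 1001}],
).to_dict()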

ML Pipeline

Kubeflow pipeline

from kfp import dsl
from kfp.components import create_component_from_func

@create_component_from_func
def preprocess_data(input_path: str, output_path: str) -> str:
    import pandas as pd

    df = pd.read_parquet(input_path)
    # Preprocessing logic (dropping missing rows as a stand-in)
    df_processed = df.dropna()
    df_processed.to_parquet(output_path)
    return output_path

@create_component_from_func
def train_model(data_path: str, model_path: str,
                n_estimators: int, max_depth: int) -> str:
    import joblib
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier

    # Components run in isolation, so data loading is inlined;
    # assumes the target column is named "label"
    df = pd.read_parquet(data_path)
    X, y = df.drop(columns=["label"]), df["label"]

    model = RandomForestClassifier(n_estimators=n_estimators, max_depth=max_depth)
    model.fit(X, y)
    joblib.dump(model, model_path)
    return model_path

@create_component_from_func
def evaluate_model(model_path: str, test_data_path: str) -> float:
    import joblib
    import pandas as pd

    model = joblib.load(model_path)
    df = pd.read_parquet(test_data_path)
    X, y = df.drop(columns=["label"]), df["label"]
    return model.score(X, y)

@dsl.pipeline(name="ML Training Pipeline")
def training_pipeline(input_data: str):
    preprocess_task = preprocess_data(input_data, "/tmp/processed.parquet")

    train_task = train_model(
        preprocess_task.output,
        "/tmp/model.joblib",
        n_estimators=100,
        max_depth=10,
    )

    evaluate_task = evaluate_model(
        train_task.output,
        "/tmp/test_data.parquet",
    )
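
To execute the pipeline, compile it to a portable package or submit it directly to a Kubeflow Pipelines endpoint. A sketch, with a hypothetical host and input path:

import kfp
from kfp import compiler

# Compile to a package that can be uploaded through the UI...
compiler.Compiler().compile(training_pipeline, "training_pipeline.yaml")

# ...or submit directly against a running KFP instance
client = kfp.Client(host="http://kubeflow.internal")  # hypothetical endpoint
client.create_run_from_pipeline_func(
    training_pipeline,
    arguments={"input_data": "s3://data/raw.parquet"},  # hypothetical path
)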

Model Monitoring

Drift detection

from evidently.report import Report
from evidently.metrics import (
    DataDriftTable,
    DatasetDriftMetric,
    ColumnDriftMetric,
)

def detect_drift(reference_data, current_data):
    report = Report(metrics=[
        DatasetDriftMetric(),
        DataDriftTable(),
        ColumnDriftMetric(column_name="feature_1"),
    ])
    
    report.run(
        reference_data=reference_data,
        current_data=current_data,
    )
    
    result = report.as_dict()
    
    if result["metrics"][0]["result"]["dataset_drift"]:
        alert_team("Data drift detected!")
        trigger_retraining()
    
    return result
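
Inside detect_drift, the same report can also be written out as a standalone HTML file for human review:

report.save_html("drift_report.html")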

Performance monitoring

import prometheus_client as prom

# Metrics
prediction_latency = prom.Histogram(
    'model_prediction_latency_seconds',
    'Model prediction latency',
    buckets=[.01, .025, .05, .1, .25, .5, 1.0]
)

prediction_count = prom.Counter(
    'model_prediction_total',
    'Total predictions',
    ['model_version', 'outcome']
)

feature_distribution = prom.Histogram(
    'model_feature_value',
    'Feature value distribution',
    ['feature_name'],
    buckets=[0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]
)

@app.post("/predict")
async def predict(request: PredictionRequest):
    with prediction_latency.time():
        result = model.predict(request.features)
    
    prediction_count.labels(
        model_version="v2",
        outcome="success"
    ).inc()
    
    # Track feature distributions
    for i, value in enumerate(request.features):
        feature_distribution.labels(f"feature_{i}").observe(value)
    
    return result
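
For Prometheus to scrape these metrics, the app has to expose them; prometheus_client ships an ASGI app that can be mounted on the same FastAPI server:

from prometheus_client import make_asgi_app

# Serve metrics at /metrics alongside the prediction endpoints
app.mount("/metrics", make_asgi_app())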

Model Registry

MLflow model registry

import mlflow
from mlflow.tracking import MlflowClient

client = MlflowClient()

# Register the model from a completed run (run_id comes from the tracking run)
model_uri = f"runs:/{run_id}/model"
model_version = mlflow.register_model(model_uri, "recommendation-model")

# Transition to staging
client.transition_model_version_stage(
    name="recommendation-model",
    version=model_version.version,
    stage="Staging"
)

# After validation, promote to production
client.transition_model_version_stage(
    name="recommendation-model",
    version=model_version.version,
    stage="Production"
)

# Load production model
model = mlflow.pyfunc.load_model(
    model_uri="models:/recommendation-model/Production"
)
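
Note that recent MLflow releases deprecate stages in favor of model aliases; an equivalent sketch on MLflow 2.x:

# Point the "production" alias at the validated version
client.set_registered_model_alias(
    name="recommendation-model",
    alias="production",
    version=model_version.version,
)

# Load whatever version the alias currently points at
model = mlflow.pyfunc.load_model("models:/recommendation-model@production")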

A/B Testing Models

import hashlib

class ModelRouter:
    def __init__(self):
        self.models = {
            "control": load_model("v1"),
            "treatment": load_model("v2"),
        }
        self.traffic_split = {"control": 0.9, "treatment": 0.1}
    
    def route(self, user_id: str) -> str:
        # Deterministic routing: the built-in hash() is salted per
        # process, so use a stable digest for consistent bucketing
        digest = hashlib.md5(user_id.encode()).hexdigest()
        hash_val = int(digest, 16) % 100
        cumulative = 0
        for variant, weight in self.traffic_split.items():
            cumulative += weight * 100
            if hash_val < cumulative:
                return variant
        return "control"
    
    def predict(self, user_id: str, features):
        variant = self.route(user_id)
        model = self.models[variant]
        
        prediction = model.predict(features)
        
        # Log for analysis
        log_experiment(user_id, variant, prediction)
        
        return prediction
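
Usage, with features standing in for a real feature vector:

router = ModelRouter()

# The digest-based split pins each user to one variant
# for the lifetime of the experiment
prediction = router.predict("user-123", features)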