Deploying Machine Learning Models to Production

A practical guide to taking your ML models from Jupyter notebooks to production APIs.

By Dery Febriantara, Developer

Training a model is only half the battle. The real challenge begins when you need to deploy it to production where it can serve real users. This comprehensive guide covers everything from model serialization to cloud deployment, monitoring, and scaling.

Table of Contents

  1. Model Serialization
  2. Building REST APIs
  3. Containerization with Docker
  4. Model Versioning and Registry
  5. Inference Patterns
  6. Cloud Deployment
  7. Monitoring and Observability
  8. Scaling and Performance
  9. CI/CD for ML
  10. Security Best Practices

Model Serialization

Before deploying, you need to save your trained model in a format that can be loaded efficiently in production.

Scikit-learn Models

import joblib
import pickle
from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

# Train model
pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', RandomForestClassifier(n_estimators=100))
])
pipeline.fit(X_train, y_train)

# Method 1: Joblib (recommended for sklearn)
joblib.dump(pipeline, 'model.joblib')

# Method 2: Pickle
with open('model.pkl', 'wb') as f:
    pickle.dump(pipeline, f)

# Method 3: Compressed joblib
joblib.dump(pipeline, 'model.joblib.gz', compress=3)

# Loading
model = joblib.load('model.joblib')
predictions = model.predict(X_new)

PyTorch Models

import torch
import torch.nn as nn

class NeuralNetwork(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        return x

model = NeuralNetwork(10, 64, 2)
# ... train with a standard PyTorch loop (optimizer, loss, backward pass) ...

# Method 1: Save state dict only (recommended)
torch.save(model.state_dict(), 'model_state.pth')

# Load
model = NeuralNetwork(10, 64, 2)
model.load_state_dict(torch.load('model_state.pth'))
model.eval()

# Method 2: Save entire model
torch.save(model, 'model_full.pth')
model = torch.load('model_full.pth')

# Method 3: TorchScript for production (optimized)
scripted_model = torch.jit.script(model)
scripted_model.save('model_scripted.pt')

# Load scripted model
loaded_script = torch.jit.load('model_scripted.pt')
loaded_script.eval()

# Method 4: ONNX export for interoperability
import torch.onnx
dummy_input = torch.randn(1, 10)
torch.onnx.export(
    model,
    dummy_input,
    'model.onnx',
    input_names=['input'],
    output_names=['output'],
    dynamic_axes={'input': {0: 'batch_size'}}
)

TensorFlow/Keras Models

from tensorflow import keras
import tensorflow as tf

model = keras.Sequential([
    keras.layers.Dense(64, activation='relu', input_shape=(10,)),
    keras.layers.Dense(2, activation='softmax')
])
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy')
model.fit(X_train, y_train, epochs=10)

# Method 1: HDF5 format
model.save('model.h5')
loaded_model = keras.models.load_model('model.h5')

# Method 2: SavedModel format (recommended for TF Serving)
model.save('saved_model/')
loaded_model = keras.models.load_model('saved_model/')

# Method 3: TensorFlow Lite for mobile/edge
converter = tf.lite.TFLiteConverter.from_keras_model(model)
tflite_model = converter.convert()
with open('model.tflite', 'wb') as f:
    f.write(tflite_model)

# Method 4: Weights only
model.save_weights('model_weights.h5')
new_model = create_model()  # Same architecture
new_model.load_weights('model_weights.h5')
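
A SavedModel can also be served directly by TensorFlow Serving. A sketch using the official Docker image (the model name and the versioned directory layout here are illustrative assumptions):

docker run -p 8501:8501 \
  -v "$(pwd)/saved_model:/models/my_model/1" \
  -e MODEL_NAME=my_model \
  tensorflow/serving

# Query the REST endpoint
curl -X POST http://localhost:8501/v1/models/my_model:predict \
  -d '{"instances": [[1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]]}'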

ONNX for Framework Interoperability

import onnx
import onnxruntime as ort
import numpy as np

# Convert sklearn to ONNX
from skl2onnx import convert_sklearn
from skl2onnx.common.data_types import FloatTensorType

initial_type = [('float_input', FloatTensorType([None, X_train.shape[1]]))]
onnx_model = convert_sklearn(pipeline, initial_types=initial_type)

with open('model.onnx', 'wb') as f:
    f.write(onnx_model.SerializeToString())

# Run inference with ONNX Runtime
session = ort.InferenceSession('model.onnx')
input_name = session.get_inputs()[0].name
output_name = session.get_outputs()[0].name

predictions = session.run(
    [output_name],
    {input_name: X_test.astype(np.float32)}
)[0]

Building REST APIs

FastAPI is modern, fast, and includes automatic OpenAPI documentation.

# app.py
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field, field_validator
from typing import List, Optional
import joblib
import numpy as np
import logging
from datetime import datetime
import asyncio

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Initialize FastAPI
app = FastAPI(
    title="ML Model API",
    description="Production API for machine learning predictions",
    version="1.0.0"
)

# CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # restrict to trusted origins in production
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Load model on startup
model = None

@app.on_event("startup")
async def load_model():
    global model
    model = joblib.load('model.joblib')
    logger.info("Model loaded successfully")

@app.on_event("shutdown")
async def shutdown():
    logger.info("Shutting down API")

# Request/Response schemas
class PredictionRequest(BaseModel):
    features: List[float] = Field(..., min_length=1, description="Input features")
    request_id: Optional[str] = None

    @field_validator('features')
    @classmethod
    def validate_features(cls, v):
        if len(v) != 10:  # Expected feature count
            raise ValueError(f'Expected 10 features, got {len(v)}')
        if any(np.isnan(x) or np.isinf(x) for x in v):
            raise ValueError('Features contain NaN or Inf values')
        return v

    model_config = {
        "json_schema_extra": {
            "example": {
                "features": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0],
                "request_id": "req-123"
            }
        }
    }

class PredictionResponse(BaseModel):
    prediction: int
    probability: float
    probabilities: List[float]
    model_version: str
    latency_ms: float
    request_id: Optional[str]

class BatchPredictionRequest(BaseModel):
    instances: List[List[float]]

class BatchPredictionResponse(BaseModel):
    predictions: List[int]
    probabilities: List[List[float]]
    count: int

class HealthResponse(BaseModel):
    status: str
    model_loaded: bool
    timestamp: str

# Endpoints
@app.get("/health", response_model=HealthResponse)
async def health_check():
    """Check API health and model status"""
    return HealthResponse(
        status="healthy",
        model_loaded=model is not None,
        timestamp=datetime.utcnow().isoformat()
    )

@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    """Make a single prediction"""
    if model is None:
        raise HTTPException(status_code=503, detail="Model not loaded")

    start_time = datetime.now()

    try:
        features = np.array(request.features).reshape(1, -1)
        prediction = model.predict(features)[0]
        probabilities = model.predict_proba(features)[0].tolist()

        latency = (datetime.now() - start_time).total_seconds() * 1000

        response = PredictionResponse(
            prediction=int(prediction),
            probability=float(max(probabilities)),
            probabilities=probabilities,
            model_version="1.0.0",
            latency_ms=latency,
            request_id=request.request_id
        )

        # Log prediction
        logger.info(f"Prediction: {prediction}, Latency: {latency:.2f}ms")

        return response

    except Exception as e:
        logger.error(f"Prediction error: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/predict/batch", response_model=BatchPredictionResponse)
async def predict_batch(request: BatchPredictionRequest):
    """Make batch predictions"""
    if model is None:
        raise HTTPException(status_code=503, detail="Model not loaded")

    try:
        features = np.array(request.instances)
        predictions = model.predict(features).tolist()
        probabilities = model.predict_proba(features).tolist()

        return BatchPredictionResponse(
            predictions=predictions,
            probabilities=probabilities,
            count=len(predictions)
        )

    except Exception as e:
        logger.error(f"Batch prediction error: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/model/info")
async def model_info():
    """Get model metadata"""
    if model is None:
        raise HTTPException(status_code=503, detail="Model not loaded")

    return {
        "model_type": type(model).__name__,
        "n_features": model.n_features_in_ if hasattr(model, 'n_features_in_') else "unknown",
        "classes": model.classes_.tolist() if hasattr(model, 'classes_') else "unknown"
    }

# Run: uvicorn app:app --host 0.0.0.0 --port 8000 --workers 4
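
Once the server is up, a quick smoke test with curl (the payload matches the example schema above):

curl -X POST http://localhost:8000/predict \
  -H "Content-Type: application/json" \
  -d '{"features": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0], "request_id": "req-123"}'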

Flask Alternative

from flask import Flask, request, jsonify
import joblib
import numpy as np

app = Flask(__name__)
model = joblib.load('model.joblib')

@app.route('/health', methods=['GET'])
def health():
    return jsonify({'status': 'healthy'})

@app.route('/predict', methods=['POST'])
def predict():
    try:
        data = request.get_json()
        features = np.array(data['features']).reshape(1, -1)
        prediction = model.predict(features)[0]
        probability = model.predict_proba(features).max()

        return jsonify({
            'prediction': int(prediction),
            'probability': float(probability)
        })

    except Exception as e:
        return jsonify({'error': str(e)}), 400

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8000)
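
Flask's built-in server is meant for development only; in production you would typically run the app behind a WSGI server such as Gunicorn (assuming the file is saved as app.py):

gunicorn -w 4 -b 0.0.0.0:8000 app:app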

gRPC for High Performance

# prediction.proto
syntax = "proto3";

service PredictionService {
    rpc Predict(PredictionRequest) returns (PredictionResponse);
    rpc PredictBatch(BatchRequest) returns (BatchResponse);
}

message PredictionRequest {
    repeated float features = 1;
}

message PredictionResponse {
    int32 prediction = 1;
    float probability = 2;
}

message BatchRequest {
    repeated PredictionRequest instances = 1;
}

message BatchResponse {
    repeated PredictionResponse predictions = 1;
}

# server.py
import grpc
from concurrent import futures
import prediction_pb2
import prediction_pb2_grpc
import joblib
import numpy as np

class PredictionServicer(prediction_pb2_grpc.PredictionServiceServicer):
    def __init__(self):
        self.model = joblib.load('model.joblib')

    def Predict(self, request, context):
        features = np.array(request.features).reshape(1, -1)
        prediction = self.model.predict(features)[0]
        probability = float(self.model.predict_proba(features).max())

        return prediction_pb2.PredictionResponse(
            prediction=int(prediction),
            probability=probability
        )

    def PredictBatch(self, request, context):
        instances = [np.array(inst.features) for inst in request.instances]
        features = np.array(instances)
        predictions = self.model.predict(features)
        probabilities = self.model.predict_proba(features).max(axis=1)

        responses = [
            prediction_pb2.PredictionResponse(prediction=int(p), probability=float(prob))
            for p, prob in zip(predictions, probabilities)
        ]

        return prediction_pb2.BatchResponse(predictions=responses)

def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    prediction_pb2_grpc.add_PredictionServiceServicer_to_server(
        PredictionServicer(), server
    )
    server.add_insecure_port('[::]:50051')
    server.start()
    server.wait_for_termination()

if __name__ == '__main__':
    serve()
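
The prediction_pb2 modules are generated from prediction.proto, and a caller only needs a channel and a stub. A minimal client sketch, assuming grpcio-tools is installed and the server above is running locally:

# client.py
import grpc
import prediction_pb2
import prediction_pb2_grpc

# Stubs are generated with:
#   python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. prediction.proto

channel = grpc.insecure_channel('localhost:50051')
stub = prediction_pb2_grpc.PredictionServiceStub(channel)

response = stub.Predict(prediction_pb2.PredictionRequest(features=[1.0] * 10))
print(response.prediction, response.probability)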

Containerization with Docker

Basic Dockerfile

# Dockerfile
FROM python:3.10-slim

# Set working directory
WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
    gcc \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Copy requirements first for caching
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY app.py .
COPY model.joblib .

# Create non-root user
RUN useradd -m appuser && chown -R appuser:appuser /app
USER appuser

# Expose port
EXPOSE 8000

# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1

# Run application
CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]

Optimized Multi-Stage Build

# Dockerfile.multistage
# Build stage
FROM python:3.10-slim as builder

WORKDIR /app

RUN apt-get update && apt-get install -y --no-install-recommends \
    gcc \
    && rm -rf /var/lib/apt/lists/*

COPY requirements.txt .
RUN pip wheel --no-cache-dir --no-deps --wheel-dir /app/wheels -r requirements.txt

# Production stage
FROM python:3.10-slim

WORKDIR /app

# Copy only wheels from builder
COPY --from=builder /app/wheels /wheels
RUN pip install --no-cache /wheels/*

# Copy application
COPY app.py .
COPY model.joblib .

# Security
RUN useradd -m appuser && chown -R appuser:appuser /app
USER appuser

EXPOSE 8000

CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]

Docker Compose for Full Stack

# docker-compose.yml
version: '3.8'

services:
  ml-api:
    build: .
    ports:
      - "8000:8000"
    volumes:
      - ./models:/app/models
    environment:
      - MODEL_PATH=/app/models/model.joblib
      - LOG_LEVEL=INFO
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3
    deploy:
      resources:
        limits:
          cpus: '2'
          memory: 2G
        reservations:
          cpus: '0.5'
          memory: 512M

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data

  prometheus:
    image: prom/prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml

  grafana:
    image: grafana/grafana
    ports:
      - "3000:3000"
    volumes:
      - grafana_data:/var/lib/grafana

volumes:
  redis_data:
  grafana_data:
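
The prometheus service above mounts ./prometheus.yml. A minimal scrape config sketch, assuming the API container exposes metrics on port 9090 (as in the Prometheus example later) and that the services share the default Compose network:

# prometheus.yml
global:
  scrape_interval: 15s

scrape_configs:
  - job_name: 'ml-api'
    static_configs:
      - targets: ['ml-api:9090']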

Requirements File

# requirements.txt
fastapi==0.104.1
uvicorn[standard]==0.24.0
joblib==1.3.2
numpy==1.26.2
scikit-learn==1.3.2
pydantic==2.5.2
python-multipart==0.0.6
prometheus-client==0.19.0

Model Versioning and Registry

MLflow

import mlflow
import mlflow.sklearn
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score

# Set tracking URI
mlflow.set_tracking_uri("http://localhost:5000")
mlflow.set_experiment("model-training")

# Start run
with mlflow.start_run(run_name="random-forest-v1"):
    # Log parameters
    params = {
        "n_estimators": 100,
        "max_depth": 10,
        "min_samples_split": 2,
        "random_state": 42
    }
    mlflow.log_params(params)

    # Train model
    model = RandomForestClassifier(**params)
    model.fit(X_train, y_train)

    # Evaluate
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred, average='weighted')

    # Log metrics
    mlflow.log_metrics({
        "accuracy": accuracy,
        "f1_score": f1
    })

    # Log model
    mlflow.sklearn.log_model(
        model,
        "model",
        registered_model_name="production-classifier",
        signature=mlflow.models.infer_signature(X_train, y_pred)
    )

    # Log artifacts
    mlflow.log_artifact("requirements.txt")
    mlflow.log_artifact("config.yaml")

# Load model from registry
model_uri = "models:/production-classifier/Production"
loaded_model = mlflow.sklearn.load_model(model_uri)

# Serve model
# mlflow models serve -m "models:/production-classifier/1" -p 5001
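
Loading from the "Production" stage assumes a registered version has been promoted to it. A sketch of one way to do that with the Model Registry client (recent MLflow releases favor model aliases, but the stage API is still available):

from mlflow.tracking import MlflowClient

client = MlflowClient()
client.transition_model_version_stage(
    name="production-classifier",
    version=1,
    stage="Production"
)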

DVC (Data Version Control)

# Initialize DVC
dvc init

# Track model file
dvc add model.joblib

# Commit to git
git add model.joblib.dvc .gitignore
git commit -m "Add model v1"

# Push to remote storage
dvc remote add -d myremote s3://my-bucket/dvc-storage
dvc push

# Pull specific version
git checkout v1.0.0
dvc pull

# dvc.yaml
stages:
  train:
    cmd: python train.py
    deps:
      - train.py
      - data/train.csv
    params:
      - train.n_estimators
      - train.max_depth
    outs:
      - model.joblib
    metrics:
      - metrics.json:
          cache: false

  evaluate:
    cmd: python evaluate.py
    deps:
      - evaluate.py
      - model.joblib
      - data/test.csv
    metrics:
      - evaluation.json:
          cache: false

Weights & Biases

import wandb
from sklearn.ensemble import RandomForestClassifier

# Initialize
wandb.init(project="ml-deployment", name="training-run-1")

# Config
config = wandb.config
config.n_estimators = 100
config.max_depth = 10

# Train
model = RandomForestClassifier(
    n_estimators=config.n_estimators,
    max_depth=config.max_depth
)
model.fit(X_train, y_train)

# Log metrics
accuracy = model.score(X_test, y_test)
wandb.log({"accuracy": accuracy})

# Save model as artifact
artifact = wandb.Artifact('model', type='model')
joblib.dump(model, 'model.joblib')
artifact.add_file('model.joblib')
wandb.log_artifact(artifact)

wandb.finish()

Inference Patterns

Real-time (Online) Inference

from fastapi import FastAPI
from cachetools import TTLCache
import hashlib
import joblib

app = FastAPI()
model = joblib.load('model.joblib')

# Cache for repeated predictions of identical inputs
prediction_cache = TTLCache(maxsize=1000, ttl=300)

def get_cache_key(features):
    return hashlib.md5(str(features).encode()).hexdigest()

@app.post("/predict")
async def predict(request: PredictionRequest):
    # Check cache
    cache_key = get_cache_key(request.features)
    if cache_key in prediction_cache:
        return prediction_cache[cache_key]

    # Make prediction
    prediction = model.predict([request.features])[0]

    response = {"prediction": int(prediction)}

    # Cache result
    prediction_cache[cache_key] = response

    return response

Batch Inference

import pandas as pd
from datetime import datetime
import logging

logger = logging.getLogger(__name__)

class BatchPredictor:
    def __init__(self, model_path, batch_size=1000):
        self.model = joblib.load(model_path)
        self.batch_size = batch_size

    def predict_file(self, input_path, output_path):
        """Process file in batches"""
        logger.info(f"Starting batch prediction: {input_path}")
        start_time = datetime.now()

        # Read in chunks
        chunks = pd.read_csv(input_path, chunksize=self.batch_size)
        results = []

        for i, chunk in enumerate(chunks):
            predictions = self.model.predict(chunk)
            probabilities = self.model.predict_proba(chunk).max(axis=1)

            chunk['prediction'] = predictions
            chunk['probability'] = probabilities
            results.append(chunk)

            logger.info(f"Processed batch {i+1}")

        # Combine and save
        output_df = pd.concat(results)
        output_df.to_csv(output_path, index=False)

        duration = (datetime.now() - start_time).total_seconds()
        logger.info(f"Completed in {duration:.2f}s, {len(output_df)} rows")

        return output_path

# Schedule with Airflow
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'batch_predictions',
    default_args=default_args,
    schedule_interval='0 2 * * *',  # Daily at 2 AM
    start_date=datetime(2024, 1, 1),
)

def run_batch_prediction():
    predictor = BatchPredictor('model.joblib')
    predictor.predict_file(
        '/data/input/daily_data.csv',
        '/data/output/predictions.csv'
    )

batch_task = PythonOperator(
    task_id='run_predictions',
    python_callable=run_batch_prediction,
    dag=dag,
)

Streaming Inference

from kafka import KafkaConsumer, KafkaProducer
from datetime import datetime
import json
import joblib
import logging
import threading

logger = logging.getLogger(__name__)

class StreamingPredictor:
    def __init__(self, model_path, input_topic, output_topic,
                 bootstrap_servers='localhost:9092'):
        self.model = joblib.load(model_path)
        self.input_topic = input_topic
        self.output_topic = output_topic

        self.consumer = KafkaConsumer(
            input_topic,
            bootstrap_servers=bootstrap_servers,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            auto_offset_reset='latest',
            group_id='prediction-service'
        )

        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )

    def process_messages(self):
        """Process messages from Kafka"""
        for message in self.consumer:
            try:
                data = message.value
                features = data['features']

                prediction = self.model.predict([features])[0]
                probability = float(self.model.predict_proba([features]).max())

                result = {
                    'id': data.get('id'),
                    'prediction': int(prediction),
                    'probability': probability,
                    'timestamp': datetime.utcnow().isoformat()
                }

                self.producer.send(self.output_topic, value=result)

            except Exception as e:
                logger.error(f"Error processing message: {e}")

    def run(self, num_workers=4):
        """Run with multiple workers"""
        threads = []
        for _ in range(num_workers):
            t = threading.Thread(target=self.process_messages)
            t.start()
            threads.append(t)

        for t in threads:
            t.join()

# Run
predictor = StreamingPredictor(
    'model.joblib',
    'prediction-requests',
    'prediction-results'
)
predictor.run()
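
For a quick end-to-end test, requests can be pushed onto the input topic with a small producer (topic names match the example above):

from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
producer.send('prediction-requests', value={'id': 'req-1', 'features': [1.0] * 10})
producer.flush()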

Asynchronous Inference with Celery

# tasks.py
from celery import Celery
import joblib

app = Celery('tasks', broker='redis://localhost:6379/0')

model = None

@app.task
def predict_async(features, request_id):
    global model
    if model is None:
        model = joblib.load('model.joblib')

    prediction = model.predict([features])[0]
    probability = float(model.predict_proba([features]).max())

    return {
        'request_id': request_id,
        'prediction': int(prediction),
        'probability': probability
    }

# api.py (FastAPI endpoints that hand work to the Celery task above)
from fastapi import FastAPI
from tasks import predict_async

api = FastAPI()

@api.post("/predict/async")
async def predict_async_endpoint(request: PredictionRequest):
    task = predict_async.delay(request.features, request.request_id)
    return {"task_id": task.id, "status": "processing"}

@api.get("/predict/result/{task_id}")
async def get_result(task_id: str):
    task = predict_async.AsyncResult(task_id)
    if task.ready():
        return {"status": "completed", "result": task.result}
    return {"status": "processing"}

Cloud Deployment

AWS SageMaker

import boto3
import sagemaker
from sagemaker.sklearn import SKLearnModel
from sagemaker.serverless import ServerlessInferenceConfig

# Create SageMaker session
session = sagemaker.Session()
role = sagemaker.get_execution_role()

# Package model
import tarfile
with tarfile.open('model.tar.gz', 'w:gz') as tar:
    tar.add('model.joblib')

# Upload to S3
s3_model_path = session.upload_data('model.tar.gz', key_prefix='models')

# Create model
sklearn_model = SKLearnModel(
    model_data=s3_model_path,
    role=role,
    entry_point='inference.py',
    framework_version='1.0-1',
    py_version='py3'
)

# Deploy to endpoint
predictor = sklearn_model.deploy(
    instance_type='ml.m5.large',
    initial_instance_count=1,
    endpoint_name='my-model-endpoint'
)

# Or use serverless inference
serverless_config = ServerlessInferenceConfig(
    memory_size_in_mb=2048,
    max_concurrency=10
)

predictor = sklearn_model.deploy(
    serverless_inference_config=serverless_config,
    endpoint_name='my-serverless-endpoint'
)

# Make predictions
result = predictor.predict([[1.0, 2.0, 3.0, 4.0, 5.0]])
print(result)

# Clean up
predictor.delete_endpoint()

# inference.py (SageMaker entry point)
import joblib
import os

def model_fn(model_dir):
    """Load model"""
    model_path = os.path.join(model_dir, 'model.joblib')
    return joblib.load(model_path)

def input_fn(request_body, request_content_type):
    """Parse input"""
    import json
    import numpy as np

    if request_content_type == 'application/json':
        data = json.loads(request_body)
        return np.array(data['instances'])
    raise ValueError(f"Unsupported content type: {request_content_type}")

def predict_fn(input_data, model):
    """Make predictions"""
    return model.predict(input_data)

def output_fn(prediction, accept):
    """Format output"""
    import json
    return json.dumps({'predictions': prediction.tolist()})

Google Cloud AI Platform (Vertex AI)

from google.cloud import aiplatform

# Initialize
aiplatform.init(project='my-project', location='us-central1')

# Upload model
model = aiplatform.Model.upload(
    display_name='my-sklearn-model',
    artifact_uri='gs://my-bucket/model/',
    serving_container_image_uri='us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-0:latest'
)

# Deploy to endpoint
endpoint = model.deploy(
    machine_type='n1-standard-2',
    min_replica_count=1,
    max_replica_count=3,
    accelerator_type=None
)

# Make predictions
predictions = endpoint.predict(instances=[[1.0, 2.0, 3.0, 4.0, 5.0]])
print(predictions)

# Clean up
endpoint.undeploy_all()
endpoint.delete()

Azure Machine Learning

from azureml.core import Workspace, Model, Environment
from azureml.core.webservice import AciWebservice
from azureml.core.model import InferenceConfig

# Connect to workspace
ws = Workspace.from_config()

# Register model
model = Model.register(
    workspace=ws,
    model_path='model.joblib',
    model_name='my-model',
    tags={'framework': 'sklearn', 'version': '1.0'}
)

# Create environment
env = Environment.from_pip_requirements(
    name='sklearn-env',
    file_path='requirements.txt'
)

# Inference config
inference_config = InferenceConfig(
    entry_script='score.py',
    environment=env
)

# Deployment config
deployment_config = AciWebservice.deploy_configuration(
    cpu_cores=1,
    memory_gb=1,
    auth_enabled=True
)

# Deploy
service = Model.deploy(
    workspace=ws,
    name='my-model-service',
    models=[model],
    inference_config=inference_config,
    deployment_config=deployment_config
)

service.wait_for_deployment(show_output=True)
print(service.scoring_uri)
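
The entry_script='score.py' above follows Azure ML's init()/run() contract. A minimal sketch for the registered joblib model (the JSON payload shape is an assumption for illustration):

# score.py
import json
import os

import joblib
import numpy as np

def init():
    """Load the registered model when the service starts"""
    global model
    model_path = os.path.join(os.getenv('AZUREML_MODEL_DIR'), 'model.joblib')
    model = joblib.load(model_path)

def run(raw_data):
    """Score a JSON payload of the form {"data": [[...], ...]}"""
    data = np.array(json.loads(raw_data)['data'])
    predictions = model.predict(data)
    return predictions.tolist()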

Kubernetes with KServe

# inferenceservice.yaml
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: sklearn-model
  namespace: ml-serving
spec:
  predictor:
    minReplicas: 1
    maxReplicas: 5
    sklearn:
      storageUri: "gs://my-bucket/models/sklearn"
      resources:
        requests:
          cpu: 100m
          memory: 256Mi
        limits:
          cpu: 1
          memory: 1Gi

# Deploy
kubectl apply -f inferenceservice.yaml

# Check status
kubectl get inferenceservice sklearn-model

# Test
curl -v -H "Content-Type: application/json" \
  http://sklearn-model.ml-serving.example.com/v1/models/sklearn-model:predict \
  -d '{"instances": [[1.0, 2.0, 3.0, 4.0, 5.0]]}'

Monitoring and Observability

Prometheus Metrics

from prometheus_client import Counter, Histogram, Gauge, start_http_server
import time

# Define metrics
PREDICTION_COUNTER = Counter(
    'ml_predictions_total',
    'Total number of predictions',
    ['model_version', 'prediction_class']
)

PREDICTION_LATENCY = Histogram(
    'ml_prediction_latency_seconds',
    'Prediction latency in seconds',
    buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0]
)

MODEL_LOAD_TIME = Gauge(
    'ml_model_load_time_seconds',
    'Time to load model'
)

PREDICTION_PROBABILITY = Histogram(
    'ml_prediction_probability',
    'Prediction probability distribution',
    buckets=[0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]
)

# Start metrics server
start_http_server(9090)

# Use in prediction endpoint
@app.post("/predict")
async def predict(request: PredictionRequest):
    with PREDICTION_LATENCY.time():
        prediction = model.predict([request.features])[0]
        probability = model.predict_proba([request.features]).max()

    PREDICTION_COUNTER.labels(
        model_version='1.0.0',
        prediction_class=str(prediction)
    ).inc()

    PREDICTION_PROBABILITY.observe(probability)

    return {"prediction": int(prediction)}

Data Drift Detection

from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, DataQualityPreset, TargetDriftPreset
import pandas as pd

class DriftMonitor:
    def __init__(self, reference_data):
        self.reference_data = reference_data

    def check_drift(self, current_data, threshold=0.1):
        """Check for data drift"""
        report = Report(metrics=[
            DataDriftPreset(),
            TargetDriftPreset()
        ])

        report.run(
            reference_data=self.reference_data,
            current_data=current_data
        )

        # Extract drift score
        drift_results = report.as_dict()
        drift_detected = drift_results['metrics'][0]['result']['dataset_drift']

        return {
            'drift_detected': drift_detected,
            'drift_score': drift_results['metrics'][0]['result']['drift_share'],
            'drifted_features': [
                col for col, data in drift_results['metrics'][0]['result']['drift_by_columns'].items()
                if data['drift_detected']
            ]
        }

    def generate_report(self, current_data, output_path):
        """Generate HTML drift report"""
        report = Report(metrics=[
            DataDriftPreset(),
            DataQualityPreset(),
            TargetDriftPreset()
        ])

        report.run(
            reference_data=self.reference_data,
            current_data=current_data
        )

        report.save_html(output_path)

# Usage
monitor = DriftMonitor(training_data)
drift_info = monitor.check_drift(production_data)

if drift_info['drift_detected']:
    logger.warning(f"Data drift detected! Drifted features: {drift_info['drifted_features']}")
    # Trigger retraining pipeline or alert

Logging with Structured Logs

import structlog
import time
import uuid

# Configure structured logging
structlog.configure(
    processors=[
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer()
    ],
    wrapper_class=structlog.stdlib.BoundLogger,
    context_class=dict,
    logger_factory=structlog.stdlib.LoggerFactory(),
)

logger = structlog.get_logger()

@app.post("/predict")
async def predict(request: PredictionRequest):
    request_id = str(uuid.uuid4())
    start_time = time.time()

    logger.info(
        "prediction_request",
        request_id=request_id,
        features_count=len(request.features)
    )

    try:
        prediction = model.predict([request.features])[0]
        probability = float(model.predict_proba([request.features]).max())
        latency = time.time() - start_time

        logger.info(
            "prediction_complete",
            request_id=request_id,
            prediction=int(prediction),
            probability=probability,
            latency_ms=latency * 1000
        )

        return {"prediction": int(prediction), "request_id": request_id}

    except Exception as e:
        logger.error(
            "prediction_error",
            request_id=request_id,
            error=str(e)
        )
        raise

Scaling and Performance

Load Balancing with Nginx

# nginx.conf
upstream ml_api {
    least_conn;
    server ml-api-1:8000;
    server ml-api-2:8000;
    server ml-api-3:8000;
}

server {
    listen 80;

    location / {
        proxy_pass http://ml_api;
        proxy_http_version 1.1;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_connect_timeout 60s;
        proxy_read_timeout 60s;
    }

    location /health {
        proxy_pass http://ml_api;
        proxy_connect_timeout 5s;
        proxy_read_timeout 5s;
    }
}
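
Nginx spreads traffic across a fixed set of replicas; on Kubernetes, the replica count itself can scale automatically. A sketch of a CPU-based HorizontalPodAutoscaler, assuming the ml-api Deployment used in the CI/CD section below:

# hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: ml-api
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: ml-api
  minReplicas: 2
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70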

Model Optimization

# Quantization for faster inference
import torch

# Dynamic quantization (CPU)
quantized_model = torch.quantization.quantize_dynamic(
    model,
    {torch.nn.Linear},
    dtype=torch.qint8
)

# Static quantization
model.qconfig = torch.quantization.get_default_qconfig('fbgemm')
torch.quantization.prepare(model, inplace=True)
# Calibrate with representative data
for batch in calibration_data:
    model(batch)
torch.quantization.convert(model, inplace=True)

# ONNX Runtime optimization
import onnxruntime as ort

session_options = ort.SessionOptions()
session_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
session_options.intra_op_num_threads = 4

session = ort.InferenceSession(
    'model.onnx',
    sess_options=session_options,
    providers=['CPUExecutionProvider']
)

Batching for Throughput

import asyncio
from collections import defaultdict
import time

class BatchingPredictor:
    def __init__(self, model, max_batch_size=32, max_wait_time=0.1):
        self.model = model
        self.max_batch_size = max_batch_size
        self.max_wait_time = max_wait_time
        self.pending_requests = []
        self.lock = asyncio.Lock()

    async def predict(self, features):
        """Add the request to the current batch and wait for its result"""
        future = asyncio.get_running_loop().create_future()

        async with self.lock:
            self.pending_requests.append({'features': features, 'future': future})
            # Flush immediately once the batch is full
            if len(self.pending_requests) >= self.max_batch_size:
                self._run_batch(self._drain())

        # Otherwise wait briefly for more requests, then flush whatever is pending
        try:
            return await asyncio.wait_for(asyncio.shield(future), timeout=self.max_wait_time)
        except asyncio.TimeoutError:
            async with self.lock:
                if not future.done():
                    self._run_batch(self._drain())
            return await future

    def _drain(self):
        """Take up to max_batch_size pending requests (caller must hold the lock)"""
        batch = self.pending_requests[:self.max_batch_size]
        self.pending_requests = self.pending_requests[self.max_batch_size:]
        return batch

    def _run_batch(self, batch):
        """Run one batched model call and resolve each request's future"""
        if not batch:
            return
        features = [r['features'] for r in batch]
        predictions = self.model.predict(features)
        for request, prediction in zip(batch, predictions):
            if not request['future'].done():
                request['future'].set_result(int(prediction))

# Usage in FastAPI
batcher = BatchingPredictor(model)

@app.post("/predict")
async def predict(request: PredictionRequest):
    prediction = await batcher.predict(request.features)
    return {"prediction": prediction}

CI/CD for ML

GitHub Actions Pipeline

# .github/workflows/ml-pipeline.yml
name: ML Pipeline

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.10'

      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install pytest pytest-cov

      - name: Run tests
        run: pytest tests/ --cov=src --cov-report=xml

      - name: Upload coverage
        uses: codecov/codecov-action@v3

  build:
    needs: test
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: Build Docker image
        run: docker build -t ml-api:${{ github.sha }} .

      - name: Run container tests
        run: |
          docker run -d -p 8000:8000 ml-api:${{ github.sha }}
          sleep 10
          curl -f http://localhost:8000/health

      - name: Push to registry
        if: github.ref == 'refs/heads/main'
        run: |
          docker tag ml-api:${{ github.sha }} ${{ secrets.REGISTRY }}/ml-api:latest
          docker push ${{ secrets.REGISTRY }}/ml-api:latest

  deploy:
    needs: build
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
      - name: Deploy to Kubernetes
        run: |
          kubectl set image deployment/ml-api \
            ml-api=${{ secrets.REGISTRY }}/ml-api:latest

Model Testing

# tests/test_model.py
import pytest
import numpy as np
import joblib

@pytest.fixture
def model():
    return joblib.load('model.joblib')

@pytest.fixture
def sample_input():
    return np.array([[1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]])

def test_model_loads(model):
    """Test model can be loaded"""
    assert model is not None

def test_model_predicts(model, sample_input):
    """Test model can make predictions"""
    prediction = model.predict(sample_input)
    assert prediction is not None
    assert len(prediction) == 1

def test_prediction_shape(model, sample_input):
    """Test prediction has correct shape"""
    prediction = model.predict(sample_input)
    assert prediction.shape == (1,)

def test_probability_sum(model, sample_input):
    """Test probabilities sum to 1"""
    proba = model.predict_proba(sample_input)
    assert np.isclose(proba.sum(), 1.0, atol=1e-6)

def test_prediction_deterministic(model, sample_input):
    """Test predictions are deterministic"""
    pred1 = model.predict(sample_input)
    pred2 = model.predict(sample_input)
    assert np.array_equal(pred1, pred2)

def test_invalid_input_handling(model):
    """Test model handles invalid input"""
    with pytest.raises(Exception):
        model.predict(np.array([['invalid']]))

# tests/test_api.py
from fastapi.testclient import TestClient
from app import app

client = TestClient(app)

def test_health_endpoint():
    response = client.get("/health")
    assert response.status_code == 200
    assert response.json()["status"] == "healthy"

def test_predict_endpoint():
    response = client.post("/predict", json={
        "features": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]
    })
    assert response.status_code == 200
    assert "prediction" in response.json()

def test_predict_invalid_features():
    response = client.post("/predict", json={
        "features": [1.0, 2.0]  # Wrong number of features
    })
    assert response.status_code == 422

Security Best Practices

API Authentication

from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import jwt
from datetime import datetime, timedelta

app = FastAPI()
security = HTTPBearer()

SECRET_KEY = "your-secret-key"  # load from an environment variable or secrets manager in production
ALGORITHM = "HS256"

def create_token(data: dict, expires_delta: timedelta = timedelta(hours=1)):
    to_encode = data.copy()
    expire = datetime.utcnow() + expires_delta
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
    try:
        payload = jwt.decode(credentials.credentials, SECRET_KEY, algorithms=[ALGORITHM])
        return payload
    except jwt.ExpiredSignatureError:
        raise HTTPException(status_code=401, detail="Token expired")
    except jwt.InvalidTokenError:
        raise HTTPException(status_code=401, detail="Invalid token")

@app.post("/predict")
async def predict(request: PredictionRequest, token: dict = Depends(verify_token)):
    # Token is valid, proceed with prediction
    prediction = model.predict([request.features])[0]
    return {"prediction": int(prediction)}

Input Validation

from pydantic import BaseModel, Field, field_validator
import numpy as np

class PredictionRequest(BaseModel):
    # Reject requests that include unexpected fields
    model_config = {"extra": "forbid"}

    features: list[float] = Field(..., min_length=10, max_length=10)

    @field_validator('features')
    @classmethod
    def validate_features(cls, v):
        # Check for NaN/Inf
        if any(np.isnan(x) or np.isinf(x) for x in v):
            raise ValueError('Features contain NaN or Inf values')

        # Check value ranges
        if any(x < -1000 or x > 1000 for x in v):
            raise ValueError('Feature values out of expected range')

        return v

Rate Limiting

from fastapi import FastAPI, Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

limiter = Limiter(key_func=get_remote_address)
app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.post("/predict")
@limiter.limit("100/minute")
async def predict(request: Request, data: PredictionRequest):
    prediction = model.predict([data.features])[0]
    return {"prediction": int(prediction)}

Deployment Checklist

Pre-Deployment

  • Model serialized and versioned
  • Unit tests passing
  • Integration tests passing
  • Performance benchmarks met
  • Security review completed
  • Documentation updated

Infrastructure

  • Container built and tested
  • Health check endpoint working
  • Logging configured
  • Metrics collection setup
  • Alerting configured
  • Secrets management in place

Monitoring

  • Prediction latency tracking
  • Error rate monitoring
  • Data drift detection
  • Model performance tracking
  • Resource utilization monitoring

Operations

  • Rollback procedure documented (see the sketch after this list)
  • Scaling policies configured
  • Backup strategy defined
  • Incident response plan
  • On-call rotation setup
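
For the Kubernetes deployment used in the CI/CD section, for example, a rollback can be a single command:

kubectl rollout undo deployment/ml-api
kubectl rollout status deployment/ml-api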

Summary

Deploying ML models to production requires careful attention to:

  1. Serialization: Choose the right format for your use case
  2. APIs: Build robust, well-documented endpoints
  3. Containerization: Use Docker for reproducibility
  4. Versioning: Track models with MLflow or similar tools
  5. Monitoring: Watch for drift and performance degradation
  6. Security: Authenticate, validate, and rate limit
  7. Scaling: Design for your throughput requirements

Start simple, iterate, and add complexity only as needed. The best deployment is one that reliably serves users while being maintainable by your team.

Further Reading

  • “Machine Learning Engineering” - Andriy Burkov
  • “Building Machine Learning Powered Applications” - Emmanuel Ameisen
  • MLOps Community - mlops.community
  • Kubeflow Documentation - kubeflow.org
  • MLflow Documentation - mlflow.org