Deploying Machine Learning Models to Production
A practical guide to taking your ML models from Jupyter notebooks to production APIs.
Training a model is only half the battle. The real challenge begins when you deploy it to production, where it has to serve real users reliably. This guide covers everything from model serialization to cloud deployment, monitoring, and scaling.
Table of Contents
- Model Serialization
- Building REST APIs
- Containerization with Docker
- Model Versioning and Registry
- Inference Patterns
- Cloud Deployment
- Monitoring and Observability
- Scaling and Performance
- CI/CD for ML
- Security Best Practices
Model Serialization
Before deploying, you need to save your trained model in a format that can be loaded efficiently in production.
Scikit-learn Models
import joblib
import pickle
from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
# Train model
pipeline = Pipeline([
('scaler', StandardScaler()),
('classifier', RandomForestClassifier(n_estimators=100))
])
pipeline.fit(X_train, y_train)
# Method 1: Joblib (recommended for sklearn)
joblib.dump(pipeline, 'model.joblib')
# Method 2: Pickle
with open('model.pkl', 'wb') as f:
pickle.dump(pipeline, f)
# Method 3: Compressed joblib
joblib.dump(pipeline, 'model.joblib.gz', compress=3)
# Loading
model = joblib.load('model.joblib')
predictions = model.predict(X_new)
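Because pickle and joblib artifacts are tied to the library versions used at training time, it helps to store version metadata alongside the model so mismatches surface at load time rather than as silent behavior changes. A minimal sketch (the metadata filename is illustrative):
import json
import sklearn
import joblib
# Record the versions used at training time so a load-time mismatch is easy to detect
metadata = {
    "sklearn_version": sklearn.__version__,
    "joblib_version": joblib.__version__,
    "model_file": "model.joblib",
}
with open("model_metadata.json", "w") as f:
    json.dump(metadata, f, indent=2)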
PyTorch Models
import torch
import torch.nn as nn
class NeuralNetwork(nn.Module):
def __init__(self, input_size, hidden_size, output_size):
super().__init__()
self.fc1 = nn.Linear(input_size, hidden_size)
self.relu = nn.ReLU()
self.fc2 = nn.Linear(hidden_size, output_size)
def forward(self, x):
x = self.fc1(x)
x = self.relu(x)
x = self.fc2(x)
return x
model = NeuralNetwork(10, 64, 2)
# Training loop omitted -- unlike sklearn, nn.Module has no .fit();
# optimize with a loss function and optimizer over DataLoader batches.
# Method 1: Save state dict only (recommended)
torch.save(model.state_dict(), 'model_state.pth')
# Load
model = NeuralNetwork(10, 64, 2)
model.load_state_dict(torch.load('model_state.pth'))
model.eval()
# Method 2: Save entire model
torch.save(model, 'model_full.pth')
model = torch.load('model_full.pth')
# Method 3: TorchScript for production (optimized)
scripted_model = torch.jit.script(model)
scripted_model.save('model_scripted.pt')
# Load scripted model
loaded_script = torch.jit.load('model_scripted.pt')
loaded_script.eval()
# Method 4: ONNX export for interoperability
import torch.onnx
dummy_input = torch.randn(1, 10)
torch.onnx.export(
model,
dummy_input,
'model.onnx',
input_names=['input'],
output_names=['output'],
dynamic_axes={'input': {0: 'batch_size'}}
)
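Whichever export path you use, it's worth confirming that the serialized artifact reproduces the original model's outputs before shipping it. A quick sanity check against the TorchScript export above (reusing model, dummy_input, and loaded_script from the snippet):
# Compare original and scripted model outputs on the same input
model.eval()
with torch.no_grad():
    original_out = model(dummy_input)
    scripted_out = loaded_script(dummy_input)
assert torch.allclose(original_out, scripted_out, atol=1e-6), "Exported model diverges from original"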
TensorFlow/Keras Models
from tensorflow import keras
import tensorflow as tf
model = keras.Sequential([
keras.layers.Dense(64, activation='relu', input_shape=(10,)),
keras.layers.Dense(2, activation='softmax')
])
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy')
model.fit(X_train, y_train, epochs=10)
# Method 1: HDF5 format
model.save('model.h5')
loaded_model = keras.models.load_model('model.h5')
# Method 2: SavedModel format (recommended for TF Serving)
model.save('saved_model/')
loaded_model = keras.models.load_model('saved_model/')
# Method 3: TensorFlow Lite for mobile/edge
converter = tf.lite.TFLiteConverter.from_keras_model(model)
tflite_model = converter.convert()
with open('model.tflite', 'wb') as f:
f.write(tflite_model)
# Method 4: Weights only
model.save_weights('model_weights.h5')
new_model = create_model() # Same architecture
new_model.load_weights('model_weights.h5')
ONNX for Framework Interoperability
import onnx
import onnxruntime as ort
import numpy as np
# Convert sklearn to ONNX
from skl2onnx import convert_sklearn
from skl2onnx.common.data_types import FloatTensorType
initial_type = [('float_input', FloatTensorType([None, X_train.shape[1]]))]
onnx_model = convert_sklearn(pipeline, initial_types=initial_type)
with open('model.onnx', 'wb') as f:
f.write(onnx_model.SerializeToString())
# Run inference with ONNX Runtime
session = ort.InferenceSession('model.onnx')
input_name = session.get_inputs()[0].name
output_name = session.get_outputs()[0].name
predictions = session.run(
[output_name],
{input_name: X_test.astype(np.float32)}
)[0]
Building REST APIs
FastAPI (Recommended)
FastAPI is modern, fast, and includes automatic OpenAPI documentation.
# app.py
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field, field_validator
from typing import List, Optional
import joblib
import numpy as np
import logging
from datetime import datetime
import asyncio
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Initialize FastAPI
app = FastAPI(
title="ML Model API",
description="Production API for machine learning predictions",
version="1.0.0"
)
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],  # tighten to an explicit origin list in production
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Load model on startup
model = None
@app.on_event("startup")
async def load_model():
global model
model = joblib.load('model.joblib')
logger.info("Model loaded successfully")
@app.on_event("shutdown")
async def shutdown():
logger.info("Shutting down API")
# Request/Response schemas
class PredictionRequest(BaseModel):
    features: List[float] = Field(..., min_length=1, description="Input features")
    request_id: Optional[str] = None

    @field_validator('features')
    @classmethod
    def validate_features(cls, v):
        if len(v) != 10:  # Expected feature count
            raise ValueError(f'Expected 10 features, got {len(v)}')
        if any(np.isnan(x) or np.isinf(x) for x in v):
            raise ValueError('Features contain NaN or Inf values')
        return v

    model_config = {
        "json_schema_extra": {
            "example": {
                "features": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0],
                "request_id": "req-123"
            }
        }
    }
class PredictionResponse(BaseModel):
prediction: int
probability: float
probabilities: List[float]
model_version: str
latency_ms: float
request_id: Optional[str]
class BatchPredictionRequest(BaseModel):
instances: List[List[float]]
class BatchPredictionResponse(BaseModel):
predictions: List[int]
probabilities: List[List[float]]
count: int
class HealthResponse(BaseModel):
status: str
model_loaded: bool
timestamp: str
# Endpoints
@app.get("/health", response_model=HealthResponse)
async def health_check():
"""Check API health and model status"""
return HealthResponse(
status="healthy",
model_loaded=model is not None,
timestamp=datetime.utcnow().isoformat()
)
@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
"""Make a single prediction"""
if model is None:
raise HTTPException(status_code=503, detail="Model not loaded")
start_time = datetime.now()
try:
features = np.array(request.features).reshape(1, -1)
prediction = model.predict(features)[0]
probabilities = model.predict_proba(features)[0].tolist()
latency = (datetime.now() - start_time).total_seconds() * 1000
response = PredictionResponse(
prediction=int(prediction),
probability=float(max(probabilities)),
probabilities=probabilities,
model_version="1.0.0",
latency_ms=latency,
request_id=request.request_id
)
# Log prediction
logger.info(f"Prediction: {prediction}, Latency: {latency:.2f}ms")
return response
except Exception as e:
logger.error(f"Prediction error: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/predict/batch", response_model=BatchPredictionResponse)
async def predict_batch(request: BatchPredictionRequest):
"""Make batch predictions"""
if model is None:
raise HTTPException(status_code=503, detail="Model not loaded")
try:
features = np.array(request.instances)
predictions = model.predict(features).tolist()
probabilities = model.predict_proba(features).tolist()
return BatchPredictionResponse(
predictions=predictions,
probabilities=probabilities,
count=len(predictions)
)
except Exception as e:
logger.error(f"Batch prediction error: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/model/info")
async def model_info():
"""Get model metadata"""
if model is None:
raise HTTPException(status_code=503, detail="Model not loaded")
return {
"model_type": type(model).__name__,
"n_features": model.n_features_in_ if hasattr(model, 'n_features_in_') else "unknown",
"classes": model.classes_.tolist() if hasattr(model, 'classes_') else "unknown"
}
# Run: uvicorn app:app --host 0.0.0.0 --port 8000 --workers 4
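With the server running (assumed here on localhost:8000, with the requests library installed), a quick smoke test from Python looks like this:
# client.py -- simple smoke test against the running API
import requests
BASE_URL = "http://localhost:8000"
print(requests.get(f"{BASE_URL}/health").json())
payload = {
    "features": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0],
    "request_id": "req-123",
}
response = requests.post(f"{BASE_URL}/predict", json=payload)
response.raise_for_status()
print(response.json())  # prediction, probability, latency_ms, ...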
Flask Alternative
from flask import Flask, request, jsonify
import joblib
import numpy as np
app = Flask(__name__)
model = joblib.load('model.joblib')
@app.route('/health', methods=['GET'])
def health():
return jsonify({'status': 'healthy'})
@app.route('/predict', methods=['POST'])
def predict():
try:
data = request.get_json()
features = np.array(data['features']).reshape(1, -1)
prediction = model.predict(features)[0]
probability = model.predict_proba(features).max()
return jsonify({
'prediction': int(prediction),
'probability': float(probability)
})
except Exception as e:
return jsonify({'error': str(e)}), 400
if __name__ == '__main__':
    # Development server only -- use a WSGI server such as gunicorn in production
    app.run(host='0.0.0.0', port=8000)
gRPC for High Performance
# prediction.proto
syntax = "proto3";
service PredictionService {
rpc Predict(PredictionRequest) returns (PredictionResponse);
rpc PredictBatch(BatchRequest) returns (BatchResponse);
}
message PredictionRequest {
repeated float features = 1;
}
message PredictionResponse {
int32 prediction = 1;
float probability = 2;
}
message BatchRequest {
repeated PredictionRequest instances = 1;
}
message BatchResponse {
repeated PredictionResponse predictions = 1;
}
# server.py
import grpc
from concurrent import futures
import prediction_pb2
import prediction_pb2_grpc
import joblib
import numpy as np
class PredictionServicer(prediction_pb2_grpc.PredictionServiceServicer):
def __init__(self):
self.model = joblib.load('model.joblib')
def Predict(self, request, context):
features = np.array(request.features).reshape(1, -1)
prediction = self.model.predict(features)[0]
probability = float(self.model.predict_proba(features).max())
return prediction_pb2.PredictionResponse(
prediction=int(prediction),
probability=probability
)
def PredictBatch(self, request, context):
instances = [np.array(inst.features) for inst in request.instances]
features = np.array(instances)
predictions = self.model.predict(features)
probabilities = self.model.predict_proba(features).max(axis=1)
responses = [
prediction_pb2.PredictionResponse(prediction=int(p), probability=float(prob))
for p, prob in zip(predictions, probabilities)
]
return prediction_pb2.BatchResponse(predictions=responses)
def serve():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
prediction_pb2_grpc.add_PredictionServiceServicer_to_server(
PredictionServicer(), server
)
server.add_insecure_port('[::]:50051')
server.start()
server.wait_for_termination()
if __name__ == '__main__':
serve()
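A minimal client sketch, assuming the Python stubs were generated from prediction.proto with grpc_tools.protoc and the server is listening on localhost:50051:
# client.py
import grpc
import prediction_pb2
import prediction_pb2_grpc
channel = grpc.insecure_channel('localhost:50051')
stub = prediction_pb2_grpc.PredictionServiceStub(channel)
# Single prediction
request = prediction_pb2.PredictionRequest(features=[1.0, 2.0, 3.0, 4.0, 5.0])
response = stub.Predict(request)
print(response.prediction, response.probability)
# Batch prediction
batch = prediction_pb2.BatchRequest(instances=[request, request])
batch_response = stub.PredictBatch(batch)
print(len(batch_response.predictions))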
Containerization with Docker
Basic Dockerfile
# Dockerfile
FROM python:3.10-slim
# Set working directory
WORKDIR /app
# Install system dependencies
RUN apt-get update && apt-get install -y \
gcc \
&& rm -rf /var/lib/apt/lists/*
# Copy requirements first for caching
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY app.py .
COPY model.joblib .
# Create non-root user
RUN useradd -m appuser && chown -R appuser:appuser /app
USER appuser
# Expose port
EXPOSE 8000
# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD curl -f http://localhost:8000/health || exit 1
# Run application
CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]
Optimized Multi-Stage Build
# Dockerfile.multistage
# Build stage
FROM python:3.10-slim as builder
WORKDIR /app
RUN apt-get update && apt-get install -y --no-install-recommends \
gcc \
&& rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip wheel --no-cache-dir --no-deps --wheel-dir /app/wheels -r requirements.txt
# Production stage
FROM python:3.10-slim
WORKDIR /app
# Copy only wheels from builder
COPY --from=builder /app/wheels /wheels
RUN pip install --no-cache /wheels/*
# Copy application
COPY app.py .
COPY model.joblib .
# Security
RUN useradd -m appuser && chown -R appuser:appuser /app
USER appuser
EXPOSE 8000
CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
Docker Compose for Full Stack
# docker-compose.yml
version: '3.8'
services:
ml-api:
build: .
ports:
- "8000:8000"
volumes:
- ./models:/app/models
environment:
- MODEL_PATH=/app/models/model.joblib
- LOG_LEVEL=INFO
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
interval: 30s
timeout: 10s
retries: 3
deploy:
resources:
limits:
cpus: '2'
memory: 2G
reservations:
cpus: '0.5'
memory: 512M
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
- redis_data:/data
prometheus:
image: prom/prometheus
ports:
- "9090:9090"
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
grafana:
image: grafana/grafana
ports:
- "3000:3000"
volumes:
- grafana_data:/var/lib/grafana
volumes:
redis_data:
grafana_data:
Requirements File
# requirements.txt
fastapi==0.104.1
uvicorn[standard]==0.24.0
joblib==1.3.2
numpy==1.26.2
scikit-learn==1.3.2
pydantic==2.5.2
python-multipart==0.0.6
prometheus-client==0.19.0
Model Versioning and Registry
MLflow
import mlflow
import mlflow.sklearn
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score
# Set tracking URI
mlflow.set_tracking_uri("http://localhost:5000")
mlflow.set_experiment("model-training")
# Start run
with mlflow.start_run(run_name="random-forest-v1"):
# Log parameters
params = {
"n_estimators": 100,
"max_depth": 10,
"min_samples_split": 2,
"random_state": 42
}
mlflow.log_params(params)
# Train model
model = RandomForestClassifier(**params)
model.fit(X_train, y_train)
# Evaluate
y_pred = model.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred, average='weighted')
# Log metrics
mlflow.log_metrics({
"accuracy": accuracy,
"f1_score": f1
})
# Log model
mlflow.sklearn.log_model(
model,
"model",
registered_model_name="production-classifier",
signature=mlflow.models.infer_signature(X_train, y_pred)
)
# Log artifacts
mlflow.log_artifact("requirements.txt")
mlflow.log_artifact("config.yaml")
# Load model from registry
model_uri = "models:/production-classifier/Production"
loaded_model = mlflow.sklearn.load_model(model_uri)
# Serve model
# mlflow models serve -m "models:/production-classifier/1" -p 5001
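The models:/production-classifier/Production URI only resolves once a model version has been promoted to that stage. One way to do the promotion programmatically (note that recent MLflow releases favor model aliases over stages):
from mlflow.tracking import MlflowClient
client = MlflowClient(tracking_uri="http://localhost:5000")
# Promote version 1 of the registered model to the Production stage
client.transition_model_version_stage(
    name="production-classifier",
    version="1",
    stage="Production",
    archive_existing_versions=True
)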
DVC (Data Version Control)
# Initialize DVC
dvc init
# Track model file
dvc add model.joblib
# Commit to git
git add model.joblib.dvc .gitignore
git commit -m "Add model v1"
# Push to remote storage
dvc remote add -d myremote s3://my-bucket/dvc-storage
dvc push
# Pull specific version
git checkout v1.0.0
dvc pull
# dvc.yaml
stages:
train:
cmd: python train.py
deps:
- train.py
- data/train.csv
params:
- train.n_estimators
- train.max_depth
outs:
- model.joblib
metrics:
- metrics.json:
cache: false
evaluate:
cmd: python evaluate.py
deps:
- evaluate.py
- model.joblib
- data/test.csv
metrics:
- evaluation.json:
cache: false
Weights & Biases
import wandb
from sklearn.ensemble import RandomForestClassifier
# Initialize
wandb.init(project="ml-deployment", name="training-run-1")
# Config
config = wandb.config
config.n_estimators = 100
config.max_depth = 10
# Train
model = RandomForestClassifier(
n_estimators=config.n_estimators,
max_depth=config.max_depth
)
model.fit(X_train, y_train)
# Log metrics
accuracy = model.score(X_test, y_test)
wandb.log({"accuracy": accuracy})
# Save model as artifact
artifact = wandb.Artifact('model', type='model')
joblib.dump(model, 'model.joblib')
artifact.add_file('model.joblib')
wandb.log_artifact(artifact)
wandb.finish()
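On the serving side, the same artifact can be pulled back down by name; a sketch assuming the project and artifact names used above:
import os
import joblib
import wandb
run = wandb.init(project="ml-deployment", job_type="download-model")
artifact = run.use_artifact('model:latest')
model_dir = artifact.download()
model = joblib.load(os.path.join(model_dir, 'model.joblib'))
run.finish()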
Inference Patterns
Real-time (Online) Inference
from fastapi import FastAPI
from cachetools import TTLCache
import hashlib
import joblib
app = FastAPI()
model = joblib.load('model.joblib')
# Cache for repeated predictions
prediction_cache = TTLCache(maxsize=1000, ttl=300)
def get_cache_key(features):
return hashlib.md5(str(features).encode()).hexdigest()
@app.post("/predict")
async def predict(request: PredictionRequest):
# Check cache
cache_key = get_cache_key(request.features)
if cache_key in prediction_cache:
return prediction_cache[cache_key]
# Make prediction
prediction = model.predict([request.features])[0]
response = {"prediction": int(prediction)}
# Cache result
prediction_cache[cache_key] = response
return response
Batch Inference
import joblib
import pandas as pd
from datetime import datetime
import logging
logger = logging.getLogger(__name__)
class BatchPredictor:
def __init__(self, model_path, batch_size=1000):
self.model = joblib.load(model_path)
self.batch_size = batch_size
def predict_file(self, input_path, output_path):
"""Process file in batches"""
logger.info(f"Starting batch prediction: {input_path}")
start_time = datetime.now()
# Read in chunks
chunks = pd.read_csv(input_path, chunksize=self.batch_size)
results = []
for i, chunk in enumerate(chunks):
predictions = self.model.predict(chunk)
probabilities = self.model.predict_proba(chunk).max(axis=1)
chunk['prediction'] = predictions
chunk['probability'] = probabilities
results.append(chunk)
logger.info(f"Processed batch {i+1}")
# Combine and save
output_df = pd.concat(results)
output_df.to_csv(output_path, index=False)
duration = (datetime.now() - start_time).total_seconds()
logger.info(f"Completed in {duration:.2f}s, {len(output_df)} rows")
return output_path
# Schedule with Airflow
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'data-team',
'depends_on_past': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'batch_predictions',
default_args=default_args,
schedule_interval='0 2 * * *', # Daily at 2 AM
start_date=datetime(2024, 1, 1),
)
def run_batch_prediction():
predictor = BatchPredictor('model.joblib')
predictor.predict_file(
'/data/input/daily_data.csv',
'/data/output/predictions.csv'
)
batch_task = PythonOperator(
task_id='run_predictions',
python_callable=run_batch_prediction,
dag=dag,
)
Streaming Inference
from kafka import KafkaConsumer, KafkaProducer
from datetime import datetime
import json
import joblib
import logging
import threading
logger = logging.getLogger(__name__)
class StreamingPredictor:
def __init__(self, model_path, input_topic, output_topic,
bootstrap_servers='localhost:9092'):
self.model = joblib.load(model_path)
self.input_topic = input_topic
self.output_topic = output_topic
self.consumer = KafkaConsumer(
input_topic,
bootstrap_servers=bootstrap_servers,
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
auto_offset_reset='latest',
group_id='prediction-service'
)
self.producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
def process_messages(self):
"""Process messages from Kafka"""
for message in self.consumer:
try:
data = message.value
features = data['features']
prediction = self.model.predict([features])[0]
probability = float(self.model.predict_proba([features]).max())
result = {
'id': data.get('id'),
'prediction': int(prediction),
'probability': probability,
'timestamp': datetime.utcnow().isoformat()
}
self.producer.send(self.output_topic, value=result)
except Exception as e:
logger.error(f"Error processing message: {e}")
def run(self, num_workers=4):
"""Run with multiple workers"""
threads = []
for _ in range(num_workers):
t = threading.Thread(target=self.process_messages)
t.start()
threads.append(t)
for t in threads:
t.join()
# Run
predictor = StreamingPredictor(
'model.joblib',
'prediction-requests',
'prediction-results'
)
predictor.run()
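For local testing, requests can be pushed onto the input topic with a plain producer (broker address and topic name match the example above):
# send_requests.py
import json
from kafka import KafkaProducer
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
producer.send('prediction-requests', value={
    'id': 'req-001',
    'features': [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]
})
producer.flush()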
Asynchronous Inference with Celery
# tasks.py
from celery import Celery
import joblib
# A result backend is required so AsyncResult can fetch task results later
app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/1')
model = None
@app.task
def predict_async(features, request_id):
global model
if model is None:
model = joblib.load('model.joblib')
prediction = model.predict([features])[0]
probability = float(model.predict_proba([features]).max())
return {
'request_id': request_id,
'prediction': int(prediction),
'probability': probability
}
# API endpoints (these belong in the FastAPI app from app.py, not the Celery `app` above)
@app.post("/predict/async")
async def predict_async_endpoint(request: PredictionRequest):
task = predict_async.delay(request.features, request.request_id)
return {"task_id": task.id, "status": "processing"}
@app.get("/predict/result/{task_id}")
async def get_result(task_id: str):
task = predict_async.AsyncResult(task_id)
if task.ready():
return {"status": "completed", "result": task.result}
return {"status": "processing"}
Cloud Deployment
AWS SageMaker
import boto3
import sagemaker
from sagemaker.sklearn import SKLearnModel
from sagemaker.serverless import ServerlessInferenceConfig
# Create SageMaker session
session = sagemaker.Session()
role = sagemaker.get_execution_role()
# Package model
import tarfile
with tarfile.open('model.tar.gz', 'w:gz') as tar:
tar.add('model.joblib')
# Upload to S3
s3_model_path = session.upload_data('model.tar.gz', key_prefix='models')
# Create model
sklearn_model = SKLearnModel(
model_data=s3_model_path,
role=role,
entry_point='inference.py',
framework_version='1.0-1',
py_version='py3'
)
# Deploy to endpoint
predictor = sklearn_model.deploy(
instance_type='ml.m5.large',
initial_instance_count=1,
endpoint_name='my-model-endpoint'
)
# Or use serverless inference
serverless_config = ServerlessInferenceConfig(
memory_size_in_mb=2048,
max_concurrency=10
)
predictor = sklearn_model.deploy(
serverless_inference_config=serverless_config,
endpoint_name='my-serverless-endpoint'
)
# Make predictions
result = predictor.predict([[1.0, 2.0, 3.0, 4.0, 5.0]])
print(result)
# Clean up
predictor.delete_endpoint()
# inference.py (SageMaker entry point)
import joblib
import os
def model_fn(model_dir):
"""Load model"""
model_path = os.path.join(model_dir, 'model.joblib')
return joblib.load(model_path)
def input_fn(request_body, request_content_type):
"""Parse input"""
import json
import numpy as np
if request_content_type == 'application/json':
data = json.loads(request_body)
return np.array(data['instances'])
raise ValueError(f"Unsupported content type: {request_content_type}")
def predict_fn(input_data, model):
"""Make predictions"""
return model.predict(input_data)
def output_fn(prediction, accept):
"""Format output"""
import json
return json.dumps({'predictions': prediction.tolist()})
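Downstream services usually call the endpoint through the SageMaker runtime API rather than the SDK predictor object; a sketch using boto3, with the endpoint name from the deployment above:
import json
import boto3
runtime = boto3.client('sagemaker-runtime')
response = runtime.invoke_endpoint(
    EndpointName='my-model-endpoint',
    ContentType='application/json',
    Body=json.dumps({'instances': [[1.0, 2.0, 3.0, 4.0, 5.0]]})
)
result = json.loads(response['Body'].read())
print(result)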
Google Cloud AI Platform (Vertex AI)
from google.cloud import aiplatform
# Initialize
aiplatform.init(project='my-project', location='us-central1')
# Upload model
model = aiplatform.Model.upload(
display_name='my-sklearn-model',
artifact_uri='gs://my-bucket/model/',
serving_container_image_uri='us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-0:latest'
)
# Deploy to endpoint
endpoint = model.deploy(
machine_type='n1-standard-2',
min_replica_count=1,
max_replica_count=3,
accelerator_type=None
)
# Make predictions
predictions = endpoint.predict(instances=[[1.0, 2.0, 3.0, 4.0, 5.0]])
print(predictions)
# Clean up
endpoint.undeploy_all()
endpoint.delete()
Azure Machine Learning
from azureml.core import Workspace, Model, Environment
from azureml.core.webservice import AciWebservice
from azureml.core.model import InferenceConfig
# Connect to workspace
ws = Workspace.from_config()
# Register model
model = Model.register(
workspace=ws,
model_path='model.joblib',
model_name='my-model',
tags={'framework': 'sklearn', 'version': '1.0'}
)
# Create environment
env = Environment.from_pip_requirements(
name='sklearn-env',
file_path='requirements.txt'
)
# Inference config
inference_config = InferenceConfig(
entry_script='score.py',
environment=env
)
# Deployment config
deployment_config = AciWebservice.deploy_configuration(
cpu_cores=1,
memory_gb=1,
auth_enabled=True
)
# Deploy
service = Model.deploy(
workspace=ws,
name='my-model-service',
models=[model],
inference_config=inference_config,
deployment_config=deployment_config
)
service.wait_for_deployment(show_output=True)
print(service.scoring_uri)
Kubernetes with KServe
# inferenceservice.yaml
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
name: sklearn-model
namespace: ml-serving
spec:
predictor:
minReplicas: 1
maxReplicas: 5
sklearn:
storageUri: "gs://my-bucket/models/sklearn"
resources:
requests:
cpu: 100m
memory: 256Mi
limits:
cpu: 1
memory: 1Gi
# Deploy
kubectl apply -f inferenceservice.yaml
# Check status
kubectl get inferenceservice sklearn-model
# Test
curl -v -H "Content-Type: application/json" \
http://sklearn-model.ml-serving.example.com/v1/models/sklearn-model:predict \
-d '{"instances": [[1.0, 2.0, 3.0, 4.0, 5.0]]}'
Monitoring and Observability
Prometheus Metrics
from prometheus_client import Counter, Histogram, Gauge, start_http_server
import time
# Define metrics
PREDICTION_COUNTER = Counter(
'ml_predictions_total',
'Total number of predictions',
['model_version', 'prediction_class']
)
PREDICTION_LATENCY = Histogram(
'ml_prediction_latency_seconds',
'Prediction latency in seconds',
buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0]
)
MODEL_LOAD_TIME = Gauge(
'ml_model_load_time_seconds',
'Time to load model'
)
PREDICTION_PROBABILITY = Histogram(
'ml_prediction_probability',
'Prediction probability distribution',
buckets=[0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]
)
# Start metrics server
start_http_server(9090)
# Use in prediction endpoint
@app.post("/predict")
async def predict(request: PredictionRequest):
with PREDICTION_LATENCY.time():
prediction = model.predict([request.features])[0]
probability = model.predict_proba([request.features]).max()
PREDICTION_COUNTER.labels(
model_version='1.0.0',
prediction_class=str(prediction)
).inc()
PREDICTION_PROBABILITY.observe(probability)
return {"prediction": int(prediction)}
Data Drift Detection
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, DataQualityPreset, TargetDriftPreset
import pandas as pd
class DriftMonitor:
def __init__(self, reference_data):
self.reference_data = reference_data
def check_drift(self, current_data, threshold=0.1):
"""Check for data drift"""
report = Report(metrics=[
DataDriftPreset(),
TargetDriftPreset()
])
report.run(
reference_data=self.reference_data,
current_data=current_data
)
# Extract drift score
drift_results = report.as_dict()
drift_detected = drift_results['metrics'][0]['result']['dataset_drift']
return {
'drift_detected': drift_detected,
'drift_score': drift_results['metrics'][0]['result']['drift_share'],
'drifted_features': [
col for col, data in drift_results['metrics'][0]['result']['drift_by_columns'].items()
if data['drift_detected']
]
}
def generate_report(self, current_data, output_path):
"""Generate HTML drift report"""
report = Report(metrics=[
DataDriftPreset(),
DataQualityPreset(),
TargetDriftPreset()
])
report.run(
reference_data=self.reference_data,
current_data=current_data
)
report.save_html(output_path)
# Usage
monitor = DriftMonitor(training_data)
drift_info = monitor.check_drift(production_data)
if drift_info['drift_detected']:
logger.warning(f"Data drift detected! Drifted features: {drift_info['drifted_features']}")
# Trigger retraining pipeline or alert
Logging with Structured Logs
import structlog
import time
import uuid
# Configure structured logging
structlog.configure(
processors=[
structlog.stdlib.filter_by_level,
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.JSONRenderer()
],
wrapper_class=structlog.stdlib.BoundLogger,
context_class=dict,
logger_factory=structlog.stdlib.LoggerFactory(),
)
logger = structlog.get_logger()
@app.post("/predict")
async def predict(request: PredictionRequest):
request_id = str(uuid.uuid4())
start_time = time.time()
logger.info(
"prediction_request",
request_id=request_id,
features_count=len(request.features)
)
try:
prediction = model.predict([request.features])[0]
probability = float(model.predict_proba([request.features]).max())
latency = time.time() - start_time
logger.info(
"prediction_complete",
request_id=request_id,
prediction=int(prediction),
probability=probability,
latency_ms=latency * 1000
)
return {"prediction": int(prediction), "request_id": request_id}
except Exception as e:
logger.error(
"prediction_error",
request_id=request_id,
error=str(e)
)
raise
Scaling and Performance
Load Balancing with Nginx
# nginx.conf
upstream ml_api {
least_conn;
server ml-api-1:8000;
server ml-api-2:8000;
server ml-api-3:8000;
}
server {
listen 80;
location / {
proxy_pass http://ml_api;
proxy_http_version 1.1;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_connect_timeout 60s;
proxy_read_timeout 60s;
}
location /health {
proxy_pass http://ml_api;
proxy_connect_timeout 5s;
proxy_read_timeout 5s;
}
}
Model Optimization
# Quantization for faster inference
import torch
# Dynamic quantization (CPU)
quantized_model = torch.quantization.quantize_dynamic(
model,
{torch.nn.Linear},
dtype=torch.qint8
)
# Static quantization
model.qconfig = torch.quantization.get_default_qconfig('fbgemm')
torch.quantization.prepare(model, inplace=True)
# Calibrate with representative data
for batch in calibration_data:
model(batch)
torch.quantization.convert(model, inplace=True)
# ONNX Runtime optimization
import onnxruntime as ort
session_options = ort.SessionOptions()
session_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
session_options.intra_op_num_threads = 4
session = ort.InferenceSession(
'model.onnx',
sess_options=session_options,
providers=['CPUExecutionProvider']
)
Batching for Throughput
import asyncio
from collections import defaultdict
import time
class BatchingPredictor:
def __init__(self, model, max_batch_size=32, max_wait_time=0.1):
self.model = model
self.max_batch_size = max_batch_size
self.max_wait_time = max_wait_time
self.pending_requests = []
self.lock = asyncio.Lock()
    async def predict(self, features):
        """Add request to batch and wait for result"""
        future = asyncio.Future()
        async with self.lock:
            self.pending_requests.append({
                'features': features,
                'future': future
            })
            flush_now = len(self.pending_requests) >= self.max_batch_size
        # Flush outside the lock -- asyncio.Lock is not reentrant,
        # so calling _process_batch() while holding it would deadlock
        if flush_now:
            await self._process_batch()
        # Wait for result with timeout; shield() keeps the future alive if we time out
        try:
            return await asyncio.wait_for(asyncio.shield(future), timeout=self.max_wait_time * 2)
        except asyncio.TimeoutError:
            await self._process_batch()
            return await future
    async def _process_batch(self):
        """Process accumulated requests as a batch"""
        async with self.lock:
            if not self.pending_requests:
                return
            batch = self.pending_requests[:self.max_batch_size]
            self.pending_requests = self.pending_requests[self.max_batch_size:]
        # Batch inference (outside the lock so new requests can keep queueing)
        features = [r['features'] for r in batch]
        predictions = self.model.predict(features)
        # Set results
        for request, prediction in zip(batch, predictions):
            if not request['future'].done():
                request['future'].set_result(int(prediction))
# Usage in FastAPI
batcher = BatchingPredictor(model)
@app.post("/predict")
async def predict(request: PredictionRequest):
prediction = await batcher.predict(request.features)
return {"prediction": prediction}
CI/CD for ML
GitHub Actions Pipeline
# .github/workflows/ml-pipeline.yml
name: ML Pipeline
on:
push:
branches: [main]
pull_request:
branches: [main]
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.10'
- name: Install dependencies
run: |
pip install -r requirements.txt
pip install pytest pytest-cov
- name: Run tests
run: pytest tests/ --cov=src --cov-report=xml
- name: Upload coverage
uses: codecov/codecov-action@v3
build:
needs: test
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Build Docker image
run: docker build -t ml-api:${{ github.sha }} .
- name: Run container tests
run: |
docker run -d -p 8000:8000 ml-api:${{ github.sha }}
sleep 10
curl -f http://localhost:8000/health
- name: Push to registry
if: github.ref == 'refs/heads/main'
run: |
docker tag ml-api:${{ github.sha }} ${{ secrets.REGISTRY }}/ml-api:latest
docker push ${{ secrets.REGISTRY }}/ml-api:latest
deploy:
needs: build
runs-on: ubuntu-latest
if: github.ref == 'refs/heads/main'
steps:
- name: Deploy to Kubernetes
run: |
kubectl set image deployment/ml-api \
ml-api=${{ secrets.REGISTRY }}/ml-api:latest
Model Testing
# tests/test_model.py
import pytest
import numpy as np
import joblib
@pytest.fixture
def model():
return joblib.load('model.joblib')
@pytest.fixture
def sample_input():
return np.array([[1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]])
def test_model_loads(model):
"""Test model can be loaded"""
assert model is not None
def test_model_predicts(model, sample_input):
"""Test model can make predictions"""
prediction = model.predict(sample_input)
assert prediction is not None
assert len(prediction) == 1
def test_prediction_shape(model, sample_input):
"""Test prediction has correct shape"""
prediction = model.predict(sample_input)
assert prediction.shape == (1,)
def test_probability_sum(model, sample_input):
"""Test probabilities sum to 1"""
proba = model.predict_proba(sample_input)
assert np.isclose(proba.sum(), 1.0, atol=1e-6)
def test_prediction_deterministic(model, sample_input):
"""Test predictions are deterministic"""
pred1 = model.predict(sample_input)
pred2 = model.predict(sample_input)
assert np.array_equal(pred1, pred2)
def test_invalid_input_handling(model):
"""Test model handles invalid input"""
with pytest.raises(Exception):
model.predict(np.array([['invalid']]))
# tests/test_api.py
from fastapi.testclient import TestClient
from app import app
client = TestClient(app)
def test_health_endpoint():
response = client.get("/health")
assert response.status_code == 200
assert response.json()["status"] == "healthy"
def test_predict_endpoint():
response = client.post("/predict", json={
"features": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]
})
assert response.status_code == 200
assert "prediction" in response.json()
def test_predict_invalid_features():
response = client.post("/predict", json={
"features": [1.0, 2.0] # Wrong number of features
})
assert response.status_code == 422
Security Best Practices
API Authentication
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import jwt
from datetime import datetime, timedelta
app = FastAPI()
security = HTTPBearer()
SECRET_KEY = "your-secret-key"  # in production, load from an env var or secrets manager -- never hardcode
ALGORITHM = "HS256"
def create_token(data: dict, expires_delta: timedelta = timedelta(hours=1)):
to_encode = data.copy()
expire = datetime.utcnow() + expires_delta
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
try:
payload = jwt.decode(credentials.credentials, SECRET_KEY, algorithms=[ALGORITHM])
return payload
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail="Token expired")
except jwt.InvalidTokenError:
raise HTTPException(status_code=401, detail="Invalid token")
@app.post("/predict")
async def predict(request: PredictionRequest, token: dict = Depends(verify_token)):
# Token is valid, proceed with prediction
prediction = model.predict([request.features])[0]
return {"prediction": int(prediction)}
Input Validation
from pydantic import BaseModel, ConfigDict, Field, field_validator
import numpy as np
class PredictionRequest(BaseModel):
    # Reject requests that contain unexpected fields
    model_config = ConfigDict(extra='forbid')

    features: list[float] = Field(..., min_length=10, max_length=10)

    @field_validator('features')
    @classmethod
    def validate_features(cls, v):
        # Check for NaN/Inf
        if any(np.isnan(x) or np.isinf(x) for x in v):
            raise ValueError('Features contain NaN or Inf values')
        # Check value ranges
        if any(x < -1000 or x > 1000 for x in v):
            raise ValueError('Feature values out of expected range')
        return v
Rate Limiting
from fastapi import FastAPI, Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
limiter = Limiter(key_func=get_remote_address)
app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
@app.post("/predict")
@limiter.limit("100/minute")
async def predict(request: Request, data: PredictionRequest):
prediction = model.predict([data.features])[0]
return {"prediction": int(prediction)}
Deployment Checklist
Pre-Deployment
- Model serialized and versioned
- Unit tests passing
- Integration tests passing
- Performance benchmarks met
- Security review completed
- Documentation updated
Infrastructure
- Container built and tested
- Health check endpoint working
- Logging configured
- Metrics collection setup
- Alerting configured
- Secrets management in place
Monitoring
- Prediction latency tracking
- Error rate monitoring
- Data drift detection
- Model performance tracking
- Resource utilization monitoring
Operations
- Rollback procedure documented
- Scaling policies configured
- Backup strategy defined
- Incident response plan
- On-call rotation setup
Summary
Deploying ML models to production requires careful attention to:
- Serialization: Choose the right format for your use case
- APIs: Build robust, well-documented endpoints
- Containerization: Use Docker for reproducibility
- Versioning: Track models with MLflow or similar tools
- Monitoring: Watch for drift and performance degradation
- Security: Authenticate, validate, and rate limit
- Scaling: Design for your throughput requirements
Start simple, iterate, and add complexity only as needed. The best deployment is one that reliably serves users while being maintainable by your team.
Further Reading
- “Machine Learning Engineering” - Andriy Burkov
- “Building Machine Learning Powered Applications” - Emmanuel Ameisen
- MLOps Community - mlops.community
- Kubeflow Documentation - kubeflow.org
- MLflow Documentation - mlflow.org