ML Pipeline, Model Training, Experiment Tracking (MLflow), Model Serving, Docker, Kubernetes, Monitoring & Drift — production ML mastery.
An ML pipeline is a sequence of automated steps for data ingestion, preprocessing, feature engineering, model training, evaluation, and deployment. Orchestrating pipelines ensures reproducibility, scalability, and reliability.
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split, cross_val_score
# ── Define Column Groups ──
numeric_features = ['age', 'salary', 'tenure']
categorical_features = ['department', 'education', 'location']
# ── Preprocessing Pipelines ──
numeric_transformer = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler()),
])
categorical_transformer = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('onehot', OneHotEncoder(handle_unknown='ignore', sparse_output=False)),
])
# ── Column Transformer (apply different transforms to different columns) ──
preprocessor = ColumnTransformer(transformers=[
    ('num', numeric_transformer, numeric_features),
    ('cat', categorical_transformer, categorical_features),
])
# ── Full Pipeline (preprocessing + model) ──
full_pipeline = Pipeline(steps=[
    ('preprocessor', preprocessor),
    ('classifier', RandomForestClassifier(n_estimators=200, random_state=42)),
])
# ── Train & Evaluate ──
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)
scores = cross_val_score(full_pipeline, X_train, y_train, cv=5, scoring='f1')
full_pipeline.fit(X_train, y_train)
print(f"CV F1: {scores.mean():.4f} (+/- {scores.std():.4f})")
# ── Save Pipeline (model + preprocessing in one object) ──
import joblib
joblib.dump(full_pipeline, 'model_pipeline.joblib')

| Stage | Components | Tools | Key Considerations |
|---|---|---|---|
| Data Ingestion | Data sources, connectors, APIs | Airflow, Prefect, Dagster | Data freshness, schema validation, idempotency |
| Data Validation | Schema checks, drift detection, stats | Great Expectations, Evidently, Pandera | Catch data quality issues early |
| Preprocessing | Cleaning, encoding, scaling, imputation | sklearn Pipeline, Pandas | Must be reproducible, no data leakage |
| Feature Store | Centralized feature management | Feast, Tecton, Hopsworks | Feature reuse, point-in-time correctness |
| Training | Model fitting, hyperparameter tuning | MLflow, Weights & Biases, ClearML | Track experiments, version models |
| Evaluation | Metrics, bias detection, A/B testing | Evidently, SHAP, Fairlearn | Cross-validation, statistical significance |
| Deployment | Model serving, scaling, rollback | BentoML, Seldon, KServe | Canary deployments, shadow mode |
| Monitoring | Performance, drift, alerts | Evidently, Arize, Fiddler | Data drift, concept drift, latency |
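Because the preprocessing and the model travel as one joblib artifact, the saved pipeline can be reloaded anywhere and applied directly to raw feature rows. A minimal round-trip sketch with toy data (column names and values here are illustrative stand-ins, not the dataset above):

```python
import os
import tempfile

import joblib
import pandas as pd
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

# Tiny stand-in for full_pipeline: scaler + classifier in one object
pipe = Pipeline([("scaler", StandardScaler()), ("clf", LogisticRegression())])
X = pd.DataFrame({"age": [25, 40, 60, 35], "salary": [30, 80, 120, 55]})
y = [0, 1, 1, 0]
pipe.fit(X, y)

# Dump and reload: the loaded object scales inputs exactly as at training time
path = os.path.join(tempfile.gettempdir(), "toy_pipeline.joblib")
joblib.dump(pipe, path)
loaded = joblib.load(path)
assert (loaded.predict(X) == pipe.predict(X)).all()
```

The key point is that there is no separate scaler file to version or forget: serving code only needs the one artifact.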
Model training encompasses data splitting, cross-validation, hyperparameter tuning, and selecting the right algorithm. Proper training methodology separates production models from prototypes.
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import (
    train_test_split, cross_val_score, GridSearchCV,
    RandomizedSearchCV, StratifiedKFold, validation_curve
)
from sklearn.metrics import make_scorer, f1_score
import numpy as np
# ── Proper Data Splitting ──
# For production: maintain a hold-out test set that is NEVER touched during development
X_temp, X_test, y_temp, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)
X_train, X_val, y_train, y_val = train_test_split(
    X_temp, y_temp, test_size=0.25, random_state=42, stratify=y_temp
)
# Result: 60% train, 20% val, 20% test
# ── Cross-Validation Strategy ──
# Stratified K-Fold for classification (preserves class balance)
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
# ── Hyperparameter Tuning ──
param_grid = {
    'n_estimators': [100, 200, 300, 500],
    'max_depth': [5, 10, 15, 20, None],
    'min_samples_split': [2, 5, 10, 20],
    'max_features': ['sqrt', 'log2', 0.5],
    'class_weight': ['balanced', 'balanced_subsample', None],
}
# Grid Search (exhaustive, slow but thorough)
grid_search = GridSearchCV(
    RandomForestClassifier(random_state=42),
    param_grid, cv=cv, scoring='f1_weighted',
    n_jobs=-1, verbose=2, refit=True
)
grid_search.fit(X_train, y_train)
print(f"Best params: {grid_search.best_params_}")
print(f"Best CV F1: {grid_search.best_score_:.4f}")
# Randomized Search (faster, good for large search spaces)
from scipy.stats import randint, uniform
param_dist = {
    'n_estimators': randint(100, 1000),
    'max_depth': randint(3, 30),
    'min_samples_split': randint(2, 20),
    'max_features': uniform(0.1, 0.9),
}
random_search = RandomizedSearchCV(
    RandomForestClassifier(random_state=42),
    param_dist, n_iter=100, cv=cv, scoring='f1_weighted',
    n_jobs=-1, random_state=42, verbose=1
)
random_search.fit(X_train, y_train)
print(f"Best params: {random_search.best_params_}")

from sklearn.model_selection import learning_curve, validation_curve
import matplotlib.pyplot as plt
import numpy as np
# ── Learning Curve (diagnose bias vs variance) ──
train_sizes, train_scores, val_scores = learning_curve(
    estimator=full_pipeline,
    X=X_train, y=y_train,
    train_sizes=np.linspace(0.1, 1.0, 10),
    cv=cv, scoring='f1', n_jobs=-1
)
train_mean = np.mean(train_scores, axis=1)
val_mean = np.mean(val_scores, axis=1)
plt.plot(train_sizes, train_mean, label='Training Score')
plt.plot(train_sizes, val_mean, label='Validation Score')
plt.xlabel('Training Set Size')
plt.ylabel('F1 Score')
plt.legend()
# High gap = high variance (overfitting)
# Both low = high bias (underfitting)
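Those two rules of thumb can be wrapped in a small helper. A sketch with illustrative thresholds (the 0.10 gap and 0.70 floor are not canonical values, tune them per problem):

```python
def diagnose_learning_curve(train_score, val_score, gap_tol=0.10, low_tol=0.70):
    """Rough bias/variance read-out from the final learning-curve points."""
    gap = train_score - val_score
    if gap > gap_tol:
        return "high variance: consider more data, regularization, or a simpler model"
    if val_score < low_tol:
        return "high bias: consider richer features or a more flexible model"
    return "reasonable fit"

print(diagnose_learning_curve(0.98, 0.72))  # large train/val gap -> high variance
```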
# ── Validation Curve (find optimal hyperparameter) ──
param_range = [10, 50, 100, 200, 500, 1000]
train_scores, val_scores = validation_curve(
    estimator=full_pipeline,
    X=X_train, y=y_train,
    param_name='classifier__n_estimators',
    param_range=param_range,
    cv=cv, scoring='f1', n_jobs=-1
)

Experiment tracking records all parameters, metrics, artifacts, and environment details for every training run. This enables reproducibility, comparison, and collaboration in ML projects.
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score
# ── Start MLflow Tracking Server ──
# Terminal: mlflow server --host 0.0.0.0 --port 5000
mlflow.set_tracking_uri("http://localhost:5000")
mlflow.set_experiment("customer-churn-v2")
# ── Log an Experiment Run ──
with mlflow.start_run(run_name="rf-tuning-v3"):
    # Log parameters
    mlflow.log_params({
        "model": "RandomForest",
        "n_estimators": 300,
        "max_depth": 15,
        "min_samples_split": 5,
        "random_state": 42,
    })
    # Train model
    model = RandomForestClassifier(
        n_estimators=300, max_depth=15,
        min_samples_split=5, random_state=42
    )
    model.fit(X_train, y_train)
    # Evaluate
    y_pred = model.predict(X_test)
    y_prob = model.predict_proba(X_test)[:, 1]
    # Log metrics
    mlflow.log_metrics({
        "accuracy": accuracy_score(y_test, y_pred),
        "f1_score": f1_score(y_test, y_pred, average='weighted'),
        "roc_auc": roc_auc_score(y_test, y_prob),
    })
    # Log model (for deployment later)
    mlflow.sklearn.log_model(model, "model", registered_model_name="churn-model")
    # Log artifacts (any file)
    import json
    with open("feature_importance.json", "w") as f:
        json.dump(dict(zip(feature_names, model.feature_importances_)), f)
    mlflow.log_artifact("feature_importance.json")
    # Log a plot
    import matplotlib.pyplot as plt
    from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
    cm = confusion_matrix(y_test, y_pred)
    disp = ConfusionMatrixDisplay(confusion_matrix=cm)
    disp.plot()
    plt.savefig("confusion_matrix.png")
    mlflow.log_artifact("confusion_matrix.png")

# ── Model Registry (Promote models through stages) ──
from mlflow.tracking import MlflowClient
client = MlflowClient()
# List registered models
models = client.search_registered_models()
# Transition model stage
client.transition_model_version_stage(
    name="churn-model",
    version=3,
    stage="Staging",  # None -> Staging -> Production -> Archived
    archive_existing_versions=False
)
# Load a registered model for inference
import mlflow.sklearn
model_uri = "models:/churn-model/Production"
loaded_model = mlflow.sklearn.load_model(model_uri)
predictions = loaded_model.predict(new_data)

| Tool | Open Source | Strengths | Cloud Hosted | Best For |
|---|---|---|---|---|
| MLflow | Yes | Model registry, tracking, serving, simple API | Databricks | General-purpose, sklearn/XGBoost |
| Weights & Biases | Yes (core) | Experiment comparison, sweeps, reports, teams | wandb.ai | Deep learning, collaborative research |
| ClearML | Yes | Auto-logging, MLOps orchestration, pipelines | clear.ml | End-to-end MLOps, auto-tracking |
| Neptune | Yes (SDK) | Team collaboration, custom metrics, dashboards | neptune.ai | Large teams, experiment management |
| Comet | Yes (core) | Code diffs, hyperparameter optimization, production | comet.com | Production ML, debugging |
| Aim | Yes | Fast querying, efficient storage, dashboards | Self-hosted | Large-scale experiment tracking |
| DVC | Yes | Data & model versioning (Git for data) | Iterative Studio | Data versioning, pipeline orchestration |
Model serving makes trained models available for inference via APIs, batch jobs, or streaming. The serving infrastructure must handle scalability, latency, versioning, and monitoring.
# ── REST API with FastAPI ──
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import joblib
import pandas as pd
app = FastAPI(title="Churn Prediction API", version="1.0")
# Load model pipeline
model = joblib.load("model_pipeline.joblib")
class PredictionRequest(BaseModel):
    age: float
    salary: float
    tenure: float
    department: str
    education: str
    location: str

class PredictionResponse(BaseModel):
    churn_probability: float
    churn_prediction: int  # 0 or 1
    model_version: str

@app.get("/health")
async def health_check():
    return {"status": "healthy", "model_version": "1.2.0"}

@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    try:
        input_data = pd.DataFrame([request.model_dump()])
        probability = model.predict_proba(input_data)[0, 1]
        prediction = int(probability >= 0.5)
        return PredictionResponse(
            churn_probability=round(float(probability), 4),
            churn_prediction=prediction,
            model_version="1.2.0"
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/predict/batch")
async def predict_batch(requests: list[PredictionRequest]):
    df = pd.DataFrame([r.model_dump() for r in requests])
    probabilities = model.predict_proba(df)[:, 1]
    return [
        {"probability": round(float(p), 4), "prediction": int(p >= 0.5)}
        for p in probabilities
    ]

# ── BentoML (Production-grade model serving) ──
import bentoml
from bentoml.io import JSON
# Save model to BentoML model store
bentoml.sklearn.save_model("churn_classifier", full_pipeline)
# ── Create a Service (service.py) ──
# import bentoml
# import pandas as pd
# from bentoml.io import JSON
#
# classifier_ref = bentoml.sklearn.get("churn_classifier:latest")
# runner = classifier_ref.to_runner()
#
# svc = bentoml.Service("churn-prediction", runners=[runner])
#
# @svc.api(input=JSON(), output=JSON())
# async def predict(input_data):
# df = pd.DataFrame([input_data])
# result = await runner.predict_proba.async_run(df)
# return {"probability": float(result[0][1])}
# ── Serve locally ──
# bentoml serve service.py:svc
# ── Containerize & Deploy ──
# bentoml containerize churn-prediction:latest
# bentoml deploy aws-lambda churn-prediction:latest

| Strategy | Latency | Throughput | Use Case | Tools |
|---|---|---|---|---|
| REST API | 10-500ms | Low-Medium | Synchronous requests | FastAPI, Flask, BentoML |
| gRPC | 1-50ms | High | Inter-service communication | gRPC, Triton Inference Server |
| Batch Inference | Minutes-Hours | Very High | Nightly scoring, recommendations | Spark, Airflow, Ray |
| Streaming | 1-10ms | High | Real-time features, fraud detection | Kafka Streams, Flink |
| Edge / On-Device | < 10ms | Device-dependent | Mobile apps, IoT, offline | ONNX, TF Lite, CoreML |
| Serverless | Cold: 1-10s | Variable | Bursty, low average traffic | AWS Lambda, GCP Cloud Run |
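The batch-inference row in code form: a memory-friendly chunked scoring loop. This is a sketch on synthetic data with a stand-in model; a real job would read input from and write scores to storage (Parquet, a warehouse table) and be scheduled by Airflow or similar:

```python
import numpy as np
import pandas as pd
from sklearn.linear_model import LogisticRegression

# Synthetic feature table and a stand-in model
rng = np.random.default_rng(42)
X = pd.DataFrame({
    "age": rng.normal(40, 10, 500),
    "tenure": rng.integers(0, 20, 500).astype(float),
})
y = (X["age"] + rng.normal(0, 5, 500) > 45).astype(int)
model = LogisticRegression().fit(X, y)

def score_in_batches(df, model, batch_size=128):
    """Yield probabilities chunk by chunk so huge tables never sit in memory at once."""
    for start in range(0, len(df), batch_size):
        chunk = df.iloc[start:start + batch_size]
        yield pd.Series(model.predict_proba(chunk)[:, 1], index=chunk.index)

scores = pd.concat(score_in_batches(X, model))
assert len(scores) == len(X)
```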
Docker ensures reproducibility across development, training, and deployment environments. Containerizing ML workloads eliminates "works on my machine" issues and enables seamless deployment.
# ── Multi-stage Dockerfile for ML API ──
# Stage 1: Base environment
FROM python:3.11-slim AS base
WORKDIR /app
RUN apt-get update && apt-get install -y --no-install-recommends \
    gcc g++ libgomp1 curl && \
    rm -rf /var/lib/apt/lists/*
# Stage 2: Dependencies
FROM base AS deps
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Stage 3: Production
FROM base AS production
COPY --from=deps /usr/local/lib/python3.11/site-packages /usr/local/lib/python3.11/site-packages
COPY --from=deps /usr/local/bin /usr/local/bin
# Copy application code and model
COPY app/ ./app/
COPY models/ ./models/
# Non-root user for security
RUN useradd -m -u 1000 mluser
USER mluser
EXPOSE 8000
HEALTHCHECK --interval=30s --timeout=5s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]

# ── Docker Compose for ML Stack ──
version: '3.8'
services:
  # MLflow Tracking Server
  mlflow:
    image: ghcr.io/mlflow/mlflow:v2.12.1
    ports:
      - "5000:5000"
    environment:
      - MLFLOW_BACKEND_STORE_URI=postgresql://mlflow:mlflow@db:5432/mlflow
      - MLFLOW_ARTIFACT_ROOT=s3://mlflow-artifacts/
    command: >
      mlflow server
      --host 0.0.0.0
      --port 5000
      --backend-store-uri postgresql://mlflow:mlflow@db:5432/mlflow
      --default-artifact-root s3://mlflow-artifacts/
    depends_on:
      db:
        condition: service_healthy
  # Model API
  api:
    build:
      context: .
      dockerfile: Dockerfile
    ports:
      - "8000:8000"
    environment:
      - MODEL_PATH=/app/models/churn_model.joblib
      - MLFLOW_TRACKING_URI=http://mlflow:5000
    volumes:
      - ./models:/app/models:ro
    depends_on:
      - mlflow
  # PostgreSQL for MLflow
  db:
    image: postgres:16-alpine
    environment:
      POSTGRES_USER: mlflow
      POSTGRES_PASSWORD: mlflow
      POSTGRES_DB: mlflow
    volumes:
      - pgdata:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U mlflow"]
      interval: 5s
      timeout: 5s
      retries: 5
  # MinIO (S3-compatible artifact storage)
  minio:
    image: minio/minio:latest
    ports:
      - "9000:9000"
      - "9001:9001"
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin
    command: server /data --console-address ":9001"
    volumes:
      - miniodata:/data
volumes:
  pgdata:
  miniodata:

Kubernetes provides auto-scaling, self-healing, and resource management for ML workloads. From model serving to distributed training, K8s is the standard for production ML infrastructure.
# ── Model Serving Deployment ──
apiVersion: apps/v1
kind: Deployment
metadata:
  name: churn-model
  labels:
    app: churn-model
    version: v1.2.0
spec:
  replicas: 3
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge: 1
      maxUnavailable: 0
  selector:
    matchLabels:
      app: churn-model
  template:
    metadata:
      labels:
        app: churn-model
    spec:
      containers:
        - name: model-api
          image: registry.example.com/ml/churn-model:v1.2.0
          ports:
            - containerPort: 8000
          resources:
            requests:
              memory: "512Mi"
              cpu: "500m"
            limits:
              memory: "2Gi"
              cpu: "2000m"
          env:
            - name: MODEL_PATH
              value: "/app/models/churn_model.joblib"
            - name: LOG_LEVEL
              value: "info"
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 10
            periodSeconds: 15
          readinessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 5
          volumeMounts:
            - name: model-volume
              mountPath: /app/models
              readOnly: true
      volumes:
        - name: model-volume
          persistentVolumeClaim:
            claimName: model-pvc
---
apiVersion: v1
kind: Service
metadata:
  name: churn-model-service
spec:
  type: ClusterIP
  ports:
    - port: 80
      targetPort: 8000
  selector:
    app: churn-model
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: churn-model-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: churn-model
  minReplicas: 2
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70

| Tool | Purpose | Key Features | Best For |
|---|---|---|---|
| KServe | Model serving | Multi-framework, canary, GPU scheduling | Serving multiple ML models |
| Kubeflow | ML platform | Pipelines, Notebooks, Katib (AutoML) | End-to-end ML on K8s |
| TorchElastic / TorchRun | Distributed PyTorch | Fault-tolerant distributed training | Large-scale model training |
| Ray on K8s | Distributed computing | Ray Serve, Ray Train, Ray Tune | Scalable training & serving |
| TFJob (Kubeflow) | Distributed TensorFlow | Fault-tolerant TF training | TensorFlow training at scale |
| NVIDIA GPU Operator | GPU management | Driver, CUDA, device plugin automation | GPU-enabled workloads |
| Argo Workflows | Workflow orchestration | DAG-based pipeline orchestration | ML pipeline orchestration |
ML models degrade over time as data distributions shift. Monitoring model performance and detecting drift early is critical for maintaining reliable predictions in production.
import numpy as np
from scipy.stats import ks_2samp
from sklearn.metrics import accuracy_score
# ── Data Drift Detection ──
# Kolmogorov-Smirnov test (numeric features)
def detect_data_drift(reference_data, current_data, threshold=0.05):
    drift_report = {}
    for col in reference_data.columns:
        if reference_data[col].dtype in ['float64', 'int64']:
            stat, p_value = ks_2samp(reference_data[col], current_data[col])
            is_drifted = p_value < threshold
            drift_report[col] = {
                'statistic': round(stat, 4),
                'p_value': round(p_value, 4),
                'drifted': is_drifted,
            }
    return drift_report
# ── Population Stability Index (PSI) ──
def calculate_psi(expected, actual, bins=10):
    """PSI > 0.2 indicates significant drift"""
    breakpoints = np.percentile(expected, np.linspace(0, 100, bins + 1))
    breakpoints[0] = -np.inf
    breakpoints[-1] = np.inf
    expected_pct = np.histogram(expected, bins=breakpoints)[0] / len(expected)
    actual_pct = np.histogram(actual, bins=breakpoints)[0] / len(actual)
    # Avoid division by zero
    expected_pct = np.clip(expected_pct, 1e-4, None)
    actual_pct = np.clip(actual_pct, 1e-4, None)
    psi = np.sum((actual_pct - expected_pct) * np.log(actual_pct / expected_pct))
    return round(psi, 4)
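A quick sanity check of the PSI helper on synthetic data (the function is repeated here so the snippet runs standalone; a half-standard-deviation mean shift should clear the drift threshold, while a fresh sample from the same distribution should not):

```python
import numpy as np

def calculate_psi(expected, actual, bins=10):
    breakpoints = np.percentile(expected, np.linspace(0, 100, bins + 1))
    breakpoints[0], breakpoints[-1] = -np.inf, np.inf
    expected_pct = np.histogram(expected, bins=breakpoints)[0] / len(expected)
    actual_pct = np.histogram(actual, bins=breakpoints)[0] / len(actual)
    expected_pct = np.clip(expected_pct, 1e-4, None)
    actual_pct = np.clip(actual_pct, 1e-4, None)
    return float(np.sum((actual_pct - expected_pct) * np.log(actual_pct / expected_pct)))

rng = np.random.default_rng(0)
reference = rng.normal(0, 1, 10_000)
stable = rng.normal(0, 1, 10_000)     # same distribution -> small PSI
shifted = rng.normal(0.5, 1, 10_000)  # mean shifted by 0.5 sigma -> large PSI

assert calculate_psi(reference, stable) < 0.1
assert calculate_psi(reference, shifted) > 0.1
```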
# PSI interpretation: < 0.1 (no change), 0.1-0.2 (moderate), > 0.2 (significant)

# ── Evidently AI (Comprehensive ML Monitoring) ──
from evidently import ColumnMapping
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, ClassificationPreset
# Column mapping
column_mapping = ColumnMapping(
    target='churn',
    prediction='prediction',
    numerical_features=['age', 'salary', 'tenure'],
    categorical_features=['department', 'education'],
)
# Data Drift Report
drift_report = Report(metrics=[DataDriftPreset()])
drift_report.run(reference_data=train_df, current_data=production_df)
drift_report.save_html("drift_report.html")
# Model Performance Report
perf_report = Report(metrics=[ClassificationPreset()])
perf_report.run(reference_data=test_df, current_data=production_df,
                column_mapping=column_mapping)

| Drift Type | Description | Detection Method | Response |
|---|---|---|---|
| Data Drift (Covariate) | Input feature distribution changes | KS test, PSI, Jensen-Shannon | Retrain with recent data, feature adaptation |
| Concept Drift | Relationship between X and y changes | Performance monitoring, error rate tracking | Full retrain, model architecture change |
| Prediction Drift | Model output distribution changes | KL divergence on predictions, tracking | Investigate data and concept drift |
| Label Drift | Distribution of target variable changes | Compare label distributions over time | Update class weights, rebalance data |
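The prediction-drift row mentions KL divergence on model outputs; a minimal sketch comparing histograms of predicted probabilities (bin count and the smoothing constant are illustrative choices):

```python
import numpy as np
from scipy.stats import entropy

def prediction_drift_kl(ref_probs, cur_probs, bins=10):
    """KL divergence between histograms of predicted probabilities on [0, 1]."""
    edges = np.linspace(0.0, 1.0, bins + 1)
    ref_hist = np.histogram(ref_probs, bins=edges)[0] / len(ref_probs)
    cur_hist = np.histogram(cur_probs, bins=edges)[0] / len(cur_probs)
    # Smooth empty bins so the log is defined
    ref_hist = np.clip(ref_hist, 1e-6, None)
    cur_hist = np.clip(cur_hist, 1e-6, None)
    return float(entropy(cur_hist, ref_hist))

rng = np.random.default_rng(0)
same = prediction_drift_kl(rng.uniform(0, 1, 5000), rng.uniform(0, 1, 5000))
shifted = prediction_drift_kl(rng.uniform(0, 1, 5000), rng.beta(5, 2, 5000))
# shifted should come out clearly larger than same
```

An alert on this value catches output-distribution changes even before ground-truth labels arrive.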
Key MLOps interview questions with detailed answers.
Answer: The training set is used to fit model parameters (weights). The validation set is used to tune hyperparameters and select the best model configuration. The test set is used only once at the end for final unbiased performance evaluation.
A common split is 60/20/20 or 70/15/15. The test set should never be used during model development — not for feature selection, not for hyperparameter tuning, not for model selection. It simulates real-world unseen data.
Answer: My deployment workflow: validate the candidate model offline against the held-out test set, register it in a model registry, containerize the serving code, deploy to staging, roll it out as a canary on a small slice of live traffic while comparing metrics against the current model, then promote to full production with monitoring and an automated rollback path.
Answer: Data drift occurs when the statistical distribution of input features changes between training and production data. This can happen due to seasonal changes, market shifts, or user behavior evolution.
Detection: KS test for numeric features, chi-square for categorical, PSI (Population Stability Index) for overall distribution comparison. Set up automated monitoring with Evidently or custom dashboards.
Response: (1) Short-term: retrain model with recent data. (2) Medium-term: implement online learning or periodic retraining. (3) Long-term: design adaptive features that are robust to distribution shifts.
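For the categorical side of the detection step, a chi-square test of homogeneity on per-category counts is a common sketch (the counts below are made up for illustration):

```python
import numpy as np
from scipy.stats import chi2_contingency

def categorical_drift(ref_counts, cur_counts, threshold=0.05):
    """Chi-square test on category frequency tables; True means drift detected."""
    table = np.array([ref_counts, cur_counts])
    _, p_value, _, _ = chi2_contingency(table)
    return bool(p_value < threshold), float(p_value)

# Category frequencies: training window vs. recent production (illustrative counts)
drifted, p = categorical_drift([500, 300, 200], [350, 250, 400])
assert drifted  # the category mix shifted noticeably
```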
Answer: Blue-green deployment maintains two identical environments (blue = current, green = new). Traffic is switched entirely from blue to green. Rollback is instant by switching back.
Canary deployment gradually shifts traffic: 5% to new version, 25%, 50%, then 100%. This provides statistically meaningful comparison with real traffic. If metrics degrade, you can stop before affecting all users.
Recommendation: For ML models, canary is preferred because you need to compare prediction quality on real data, not just infrastructure health. Use A/B testing frameworks to measure business impact.
Answer: Reproducibility requires versioning every component of the ML pipeline: code (Git), data (DVC or similar), environment (pinned dependencies and Docker images), configuration and hyperparameters (logged per run with the experiment tracker), and the trained model itself (a model registry). Pinning random seeds and recording library versions alongside each run closes the remaining gaps.
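One concrete component of reproducibility, pinning random seeds, can be sketched as follows (extend with torch/TensorFlow seeding where relevant):

```python
import random

import numpy as np

def set_seeds(seed=42):
    """Pin the RNGs a typical sklearn workflow touches."""
    random.seed(seed)
    np.random.seed(seed)

set_seeds(42)
first = np.random.rand(3)
set_seeds(42)
second = np.random.rand(3)
assert (first == second).all()  # identical draws -> reproducible run
```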