Model Observability and Monitoring

Scenario: Model Drift

In this example, we'll train and deploy a model on an insurance dataset and make two rounds of predictions. By introducing intentional drift in the BMI and Age columns before the second round, we can observe its impact on the model's features and predictions.

  1. Open a Jupyter notebook

  2. Train and deploy a model on the insurance dataset

  3. Make predictions with the deployed model

  4. Multiply the BMI and Age columns to create drift in the features and predictions

  5. Observe the model drift plots

Defining Parameters

This section defines key parameters for the notebook. Parameters control the behavior of the code, making it easy to customize without altering the logic. By centralizing parameters at the start, we ensure better readability, maintainability, and adaptability for different use cases.

deployment_key = None
prefix = None
model_name = None
practicus_url = None  # Example: company.practicus.com (host name only; https:// is added later)
assert deployment_key, "Please select a deployment key"
assert prefix, "Please select a prefix"
assert model_name, "Please enter a model_name"
assert practicus_url, "Please enter practicus_url"
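
For reference, a filled-in configuration could look like the following. These values are hypothetical and for illustration only; replace them with the deployment key, prefix, model name, and address of your own Practicus AI setup.

# Hypothetical example values, for illustration only
# deployment_key = "depl-key-123"
# prefix = "models"
# model_name = "insurance-drift-demo"
# practicus_url = "company.practicus.com"  # host name only; https:// is added later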

Model Development

Loading and preparing the dataset

# Loading the dataset to a worker
data_set_conn = {"connection_type": "WORKER_FILE", "file_path": "/home/ubuntu/samples/data/insurance.csv"}

import practicuscore as prt

region = prt.current_region()
worker = region.get_or_create_worker()
proc = worker.load(data_set_conn)

proc.show_head()
# Pre-process

proc.categorical_map(column_name="sex", column_suffix="category")
proc.categorical_map(column_name="smoker", column_suffix="category")
proc.categorical_map(column_name="region", column_suffix="category")
proc.delete_columns(["region", "smoker", "sex"])
# Pulling a copy of the processed data as a pandas DataFrame

df = proc.get_df_copy()
df.head()

Model Training

from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
import xgboost as xgb

X = df.drop("charges", axis=1)
y = df["charges"]

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

pipeline = Pipeline([("model", xgb.XGBRegressor(objective="reg:squarederror", n_estimators=100))])
pipeline.fit(X_train, y_train)
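
The notebook deploys the model without evaluating it first. As a quick sanity check before exporting, a minimal sketch using scikit-learn's standard regression metrics on the hold-out split:

from sklearn.metrics import mean_absolute_error, r2_score

# Evaluate the fitted pipeline on the hold-out split before deploying
y_pred = pipeline.predict(X_test)
print("MAE:", mean_absolute_error(y_test, y_pred))
print("R^2:", r2_score(y_test, y_pred))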
# Exporting the model

import cloudpickle

with open("model.pkl", "wb") as f:
    cloudpickle.dump(pipeline, f)
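
Optionally, you can verify that the exported model round-trips correctly before deploying it. A minimal sketch:

# Reload the pickle and confirm it produces the same predictions
with open("model.pkl", "rb") as f:
    reloaded_pipeline = cloudpickle.load(f)

assert (reloaded_pipeline.predict(X_test) == pipeline.predict(X_test)).all()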

Model Deployment

prt.models.deploy(
    deployment_key=deployment_key,
    prefix=prefix,
    model_name=model_name,
    model_dir=None,  # Current Dir
)

Prediction

# Loading the dataset to the worker again and repeating the pre-processing for scoring

import practicuscore as prt

region = prt.current_region()
worker = region.get_or_create_worker()
proc = worker.load(data_set_conn)

proc.categorical_map(column_name="sex", column_suffix="category")
proc.categorical_map(column_name="smoker", column_suffix="category")
proc.categorical_map(column_name="region", column_suffix="category")
proc.delete_columns(["region", "smoker", "sex"])

df = proc.get_df_copy()
df.head()
# Let's construct the REST API URL from the parameters defined above.
# practicus_url should be your Practicus AI address, e.g. practicus.company.com

# *All* Practicus AI model APIs follow the URL convention below
api_url = f"https://{practicus_url}/{prefix}/{model_name}/"
# Important: for effective traffic routing, always terminate the URL with a trailing /
print("Model REST API Url:", api_url)
# We will be using the SDK to get a session token.
# To learn how to get a token without the SDK, please view the 05_others/tokens sample notebook
token = prt.models.get_session_token(api_url)
print("API session token:", token)
import requests
import pandas as pd

headers = {"authorization": f"Bearer {token}", "content-type": "text/csv"}
data_csv = df.to_csv(index=False)

r = requests.post(api_url, headers=headers, data=data_csv)
if not r.ok:
    raise ConnectionError(f"{r.status_code} - {r.text}")

from io import BytesIO

pred_df = pd.read_csv(BytesIO(r.content))

print("Prediction Result:")
pred_df.head()
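
Because the scoring DataFrame still contains the actual charges column (the model endpoint drops it before predicting, as model.py below shows), we can sanity-check the round trip against the actuals. We also keep a copy of this first round of predictions to compare against the drifted round later. A minimal sketch:

# Keep the baseline predictions for later comparison with the drifted round
baseline_preds = pred_df["Predictions"].copy()

print("Mean actual charges:   ", df["charges"].mean())
print("Mean predicted charges:", baseline_preds.mean())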
  • After you make the first prediction, please wait about 5 minutes for a clearer picture on the drift plot
  • When we look at the Model Drifts dashboard in Grafana, we will see plots where the model drift is visible

Hand-Made Model Drift

df["age"] = df["age"] * 2
df["bmi"] = df["bmi"] * 3
display(df)
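
Before sending the drifted data, we can quantify how far the scaled columns have moved from their original distributions, for example with a two-sample Kolmogorov-Smirnov test from scipy (one common drift statistic; the monitoring stack may compute a different one). Since the drift here is an exact multiplication, the original columns can be reconstructed by dividing back:

from scipy.stats import ks_2samp

# Reconstruct the pre-drift columns (the drift was an exact multiplication)
original_age = df["age"] / 2
original_bmi = df["bmi"] / 3

# A large statistic / tiny p-value confirms the distributions have shifted
print("age KS:", ks_2samp(original_age, df["age"]))
print("bmi KS:", ks_2samp(original_bmi, df["bmi"]))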
headers = {"authorization": f"Bearer {token}", "content-type": "text/csv"}
data_csv = df.to_csv(index=False)

r = requests.post(api_url, headers=headers, data=data_csv)
if not r.ok:
    raise ConnectionError(f"{r.status_code} - {r.text}")

pred_df = pd.read_csv(BytesIO(r.content))

print("Prediction Result:")
pred_df.head()
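
To see the prediction drift directly in the notebook as well, compare this round against the first one (saved as baseline_preds in the earlier sketch):

# Prediction drift: drifted round vs. the baseline round
drifted_preds = pred_df["Predictions"]
print("Baseline mean prediction:", baseline_preds.mean())
print("Drifted mean prediction: ", drifted_preds.mean())
print("Mean shift:              ", drifted_preds.mean() - baseline_preds.mean())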
  • After you make the second prediction, please wait about 2 minutes for a clearer picture on the drift plot
  • When we look at the Model Drifts dashboard in Grafana, we will see plots where the model drift is visible

Supplementary Files

model.py

import os
from typing import Optional
import pandas as pd
from starlette.exceptions import HTTPException
import joblib


# Loaded once at startup by init() and shared across prediction requests
model_pipeline = None


async def init(model_meta=None, *args, **kwargs):
    global model_pipeline

    current_dir = os.path.dirname(__file__)
    model_file = os.path.join(current_dir, "model.pkl")
    if not os.path.exists(model_file):
        raise HTTPException(status_code=404, detail=f"Could not locate model file: {model_file}")

    model_pipeline = joblib.load(model_file)


async def predict(http_request, df: Optional[pd.DataFrame] = None, *args, **kwargs) -> pd.DataFrame:
    if df is None:
        raise HTTPException(status_code=500, detail="No dataframe received")

    if "charges" in df.columns:
        # Dropping 'charges' since it is the target
        df = df.drop("charges", axis=1)

    # Making predictions
    predictions = model_pipeline.predict(df)

    # Converting predictions to a DataFrame
    predictions_df = pd.DataFrame(predictions, columns=["Predictions"])

    return predictions_df
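
model.py can be smoke-tested locally before deployment. A minimal sketch, assuming it sits next to model.pkl and that df is the pre-processed DataFrame from the Prediction section above (passing http_request=None is fine for a local call):

import asyncio
import model  # the model.py module above

async def smoke_test():
    await model.init()
    # predict() drops the charges column itself, so df can be passed as-is
    print(await model.predict(None, df=df.head()))

# In a script; inside Jupyter, where an event loop is already running, use: await smoke_test()
asyncio.run(smoke_test())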
