Model Observability and Monitoring
Scenario: Model Drift
In this example, we'll train and deploy a model on an insurance dataset and make two rounds of predictions. By introducing intentional drift in the BMI and Age columns before the second round, we can observe its impact on the model's features and predictions.
- Open a Jupyter notebook
- Train and deploy a model on the insurance dataset
- Make predictions with the deployed model
- Multiply the BMI and Age columns to create drift in the features and predictions
- Observe the model drift plots
Defining Parameters
This section defines key parameters for the notebook. Parameters control the behavior of the code, making it easy to customize without altering the logic. By centralizing parameters at the start, we ensure better readability, maintainability, and adaptability for different use cases.
deployment_key = None
prefix = None
model_name = None
practicus_url = None  # Example: company.practicus.com (host only; https:// is prepended when building the API URL)
assert deployment_key, "Please select a deployment key"
assert prefix, "Please select a prefix"
assert model_name, "Please enter a model_name"
assert practicus_url, "Please enter practicus_url"
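For reference, a filled-in set of parameters might look like the following. All values below are placeholders; substitute your own deployment key, prefix, model name, and host.

deployment_key = "depl-key-123"          # placeholder
prefix = "models"                        # placeholder
model_name = "insurance-drift-demo"      # placeholder
practicus_url = "company.practicus.com"  # placeholder host, no scheme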
Model Development
Loading and preparing the dataset
data_set_conn = {"connection_type": "WORKER_FILE", "file_path": "/home/ubuntu/samples/data/insurance.csv"}
# Loading the dataset to worker
import practicuscore as prt
region = prt.current_region()
worker = region.get_or_create_worker()
proc = worker.load(data_set_conn)
proc.show_head()
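Before preprocessing, it can help to confirm the dataset has the expected schema. A minimal sketch, assuming the standard insurance dataset columns (charges is the target):

# Quick structural check on the loaded data
df_check = proc.get_df_copy()
expected = {"age", "sex", "bmi", "children", "smoker", "region", "charges"}
missing = expected - set(df_check.columns)
assert not missing, f"Missing expected columns: {missing}"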
# Pre-process
proc.categorical_map(column_name="sex", column_suffix="category")
proc.categorical_map(column_name="smoker", column_suffix="category")
proc.categorical_map(column_name="region", column_suffix="category")
proc.delete_columns(["region", "smoker", "sex"])
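For readers unfamiliar with categorical_map, the intent is to encode each categorical column as numeric codes in a new suffixed column, after which the original columns are dropped. A rough pure-pandas equivalent (a sketch; the SDK's exact behavior may differ):

import pandas as pd

def encode_categoricals(df: pd.DataFrame) -> pd.DataFrame:
    # Encode each categorical column as integer codes in a new *_category column
    out = df.copy()
    for col in ["sex", "smoker", "region"]:
        out[f"{col}_category"] = out[col].astype("category").cat.codes
    return out.drop(columns=["sex", "smoker", "region"])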
Model Training
from sklearn.model_selection import train_test_split

# Build the feature matrix and target from the preprocessed data
df = proc.get_df_copy()
X = df.drop("charges", axis=1)
y = df["charges"]

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

import xgboost as xgb
from sklearn.pipeline import Pipeline

pipeline = Pipeline([("model", xgb.XGBRegressor(objective="reg:squarederror", n_estimators=100))])
pipeline.fit(X_train, y_train)
# Exporting the model
import cloudpickle
with open("model.pkl", "wb") as f:
    cloudpickle.dump(pipeline, f)
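Before deploying, a quick sanity check on the held-out split can confirm the pipeline learned something reasonable. A minimal sketch using RMSE (not part of the original flow):

from sklearn.metrics import mean_squared_error
import numpy as np

# Evaluate the trained pipeline on the held-out test split
y_pred = pipeline.predict(X_test)
rmse = np.sqrt(mean_squared_error(y_test, y_pred))
print(f"Test RMSE: {rmse:,.2f}")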
Model Deployment
prt.models.deploy(
    deployment_key=deployment_key,
    prefix=prefix,
    model_name=model_name,
    model_dir=None,  # Current dir
)
Prediction
# Loading the dataset to worker
import practicuscore as prt
region = prt.current_region()
worker = region.get_or_create_worker()
proc = worker.load(data_set_conn)
proc.categorical_map(column_name="sex", column_suffix="category")
proc.categorical_map(column_name="smoker", column_suffix="category")
proc.categorical_map(column_name="region", column_suffix="category")
proc.delete_columns(["region", "smoker", "sex"])
df = proc.get_df_copy()
df.head()
# Construct the REST API URL.
# All Practicus AI model APIs follow the convention below.
api_url = f"https://{practicus_url}/{prefix}/{model_name}/"
# Important: for effective traffic routing, always terminate the URL with a trailing /.
print("Model REST API Url:", api_url)
# We will be using the SDK to get a session token.
# To learn how to get a token without the SDK, please view 05_others/tokens sample notebook
token = prt.models.get_session_token(api_url)
print("API session token:", token)
import requests
import pandas as pd
headers = {"authorization": f"Bearer {token}", "content-type": "text/csv"}
data_csv = df.to_csv(index=False)
r = requests.post(api_url, headers=headers, data=data_csv)
if not r.ok:
    raise ConnectionError(f"{r.status_code} - {r.text}")
from io import BytesIO
pred_df = pd.read_csv(BytesIO(r.content))
print("Prediction Result:")
pred_df.head()
- After you make the first prediction, please wait about 5 minutes so the drift plot has enough data to show a clear picture.
- When we look at the Model Drifts dashboard in Grafana, we will see plots with the model drift visible.
Hand-Made Model Drift
# Intentionally drift the input features by multiplying the bmi and age columns
# (a factor of 2 is used here for illustration)
df["bmi"] = df["bmi"] * 2
df["age"] = df["age"] * 2

headers = {"authorization": f"Bearer {token}", "content-type": "text/csv"}
data_csv = df.to_csv(index=False)
r = requests.post(api_url, headers=headers, data=data_csv)
if not r.ok:
    raise ConnectionError(f"{r.status_code} - {r.text}")
pred_df = pd.read_csv(BytesIO(r.content))
print("Prediction Result:")
pred_df.head()
- After you make the second prediction, please wait about 2 minutes so the drift plot shows a clearer picture.
- When we look at the Model Drifts dashboard in Grafana, we will see plots with the model drift visible.
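To quantify the effect locally before checking Grafana, you can compare the two prediction batches. A minimal sketch, assuming you saved the first batch as pred_df_baseline before introducing the drift (pred_df_baseline is hypothetical, not part of the flow above):

# Compare average predictions before and after the hand-made drift
baseline_mean = pred_df_baseline["Predictions"].mean()  # hypothetical: first batch, saved earlier
drifted_mean = pred_df["Predictions"].mean()
print(f"Baseline mean prediction: {baseline_mean:,.2f}")
print(f"Drifted mean prediction:  {drifted_mean:,.2f}")
print(f"Relative change: {(drifted_mean - baseline_mean) / baseline_mean:+.1%}")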
Supplementary Files
model.py
import os
from typing import Optional

import pandas as pd
from starlette.exceptions import HTTPException
import joblib

model_pipeline = None


async def init(model_meta=None, *args, **kwargs):
    global model_pipeline
    current_dir = os.path.dirname(__file__)
    model_file = os.path.join(current_dir, "model.pkl")
    if not os.path.exists(model_file):
        raise HTTPException(status_code=404, detail=f"Could not locate model file: {model_file}")
    model_pipeline = joblib.load(model_file)


async def predict(http_request, df: Optional[pd.DataFrame] = None, *args, **kwargs) -> pd.DataFrame:
    if df is None:
        raise HTTPException(status_code=500, detail="No dataframe received")
    if "charges" in df.columns:
        # Dropping 'charges' since it is the target
        df = df.drop("charges", axis=1)
    # Making predictions
    predictions = model_pipeline.predict(df)
    # Converting predictions to a DataFrame
    predictions_df = pd.DataFrame(predictions, columns=["Predictions"])
    return predictions_df
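To sanity-check model.py locally before deploying, you can call init and predict directly. A minimal sketch; the sample row and the *_category column names are assumptions based on the preprocessing above:

import asyncio
import pandas as pd
import model  # the model.py module above

async def smoke_test():
    await model.init()
    # A single illustrative row; column names must match the training features
    sample = pd.DataFrame(
        [{"age": 40, "bmi": 28.5, "children": 2,
          "sex_category": 1, "smoker_category": 0, "region_category": 3}]
    )
    result = await model.predict(None, df=sample)
    print(result)

asyncio.run(smoke_test())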