Model Observability and Monitoring
Scenario: Model Drift
In this example, we'll deploy a model trained on an insurance dataset and make two predictions. By introducing intentional drift into the BMI and Age columns, we aim to observe its impact on the model's predictions.
- Open a Jupyter notebook
- Train and deploy a model on the insurance dataset
- Make predictions with the deployed model
- Multiply the BMI and Age columns to create drift in the features and predictions
- Observe the model drift plots
Data Loading and Pre-Processing with Practicus Core
This code demonstrates how to load, preprocess, and prepare a dataset for machine learning:
1. Dataset Connection:
- Specifies the connection type (WORKER_FILE) and the path to the dataset file (insurance.csv).
2. Load Dataset:
- Imports the Practicus Core library.
- Retrieves or creates a worker in the current region.
- Loads the dataset into the worker.
- Displays the first few rows of the dataset using proc.show_head().
3. Pre-processing:
- Applies categorical mapping to columns (sex, smoker, region) to create new categorical columns.
- Deletes the original columns (region, smoker, sex) after mapping.
4. Export Dataset:
- Retrieves a copy of the preprocessed dataset as a DataFrame.
- Displays the first few rows of the modified dataset.
5. Feature-Target Separation:
- Defines X as the feature set by dropping the charges column.
- Defines y as the target variable (charges).
data_set_conn = {
"connection_type": "WORKER_FILE",
"file_path": "/home/ubuntu/samples/insurance.csv"
}
# Loading the dataset to worker
import practicuscore as prt
region = prt.current_region()
worker = region.get_or_create_worker()
proc = worker.load(data_set_conn)
proc.show_head()
# Pre-process
proc.categorical_map(column_name='sex', column_suffix='category')
proc.categorical_map(column_name='smoker', column_suffix='category')
proc.categorical_map(column_name='region', column_suffix='category')
proc.delete_columns(['region', 'smoker', 'sex'])
# Export the preprocessed data and separate features from the target
df = proc.get_df_copy()
df.head()
X = df.drop('charges', axis=1)
y = df['charges']
Train-Test Split, Model Training, and Export
This code handles the following tasks:
1. Train-Test Split:
- The dataset is divided into training and testing subsets using an 80-20 split.
2. Pipeline Creation:
- Creates a machine learning pipeline using Pipeline from sklearn.
- Includes an XGBRegressor model configured for regression with the reg:squarederror objective and 100 estimators.
3. Model Training:
- Fits the pipeline to the training data (X_train, y_train).
4. Model Export:
- Serializes and saves the trained pipeline using cloudpickle to a file named model.pkl, enabling reuse of the trained model for future predictions.
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
import xgboost as xgb
from sklearn.pipeline import Pipeline
pipeline = Pipeline([
('model', xgb.XGBRegressor(objective='reg:squarederror', n_estimators=100))
])
pipeline.fit(X_train, y_train)
# Exporting the model
import cloudpickle
with open('model.pkl', 'wb') as f:
cloudpickle.dump(pipeline, f)
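Before deploying, it can help to reload the exported model.pkl and score it on the held-out test split as a quick sanity check. The sketch below is not part of the original notebook, and the metric choice is our assumption:
# Sanity-check the exported artifact on the held-out test split (sketch; metrics are an assumption)
import cloudpickle
from sklearn.metrics import mean_absolute_error, r2_score

with open('model.pkl', 'rb') as f:
    restored_pipeline = cloudpickle.load(f)

y_pred = restored_pipeline.predict(X_test)
print("R2 :", round(r2_score(y_test, y_pred), 3))
print("MAE:", round(mean_absolute_error(y_test, y_pred), 2))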
Model Deployment with Practicus
This code demonstrates how to deploy a trained model using Practicus:
1. Deployment Key:
- A deployment_key is required for authorization. It must be obtained from the administrator.
- An assertion ensures that the deployment_key is set; otherwise, an error is raised.
2. Model Deployment Configuration:
- prefix: Specifies the prefix for organizing the deployed models.
- model_name: Sets a custom name for the deployed model (custom-insurance-test).
- model_dir: Optionally specifies the directory of the model. If None, the current directory is used.
3. Deploy Model:
- The prt.models.deploy() function is used to deploy the model with the provided parameters.
# Please ask for a deployment key from your admin.
deployment_key = ""
assert deployment_key, "Please select a deployment_key"
prefix = "models"
model_name = "custom-insurance-test"
model_dir = None  # Current dir
prt.models.deploy(
deployment_key=deployment_key,
prefix=prefix,
model_name=model_name,
model_dir=model_dir
)
Dataset Preparation, Model API Construction, and Prediction
This code performs the following tasks:
1. Load and Preprocess Dataset:
- Imports the Practicus Core library and initializes the worker.
- Loads the dataset and applies categorical mappings to specific columns (sex, smoker, region).
- Deletes the original columns post-mapping to avoid redundancy.
- Retrieves the preprocessed dataset as a DataFrame.
2. Construct the REST API URL:
- Constructs the REST API URL for the deployed model.
- Requires the practicus_url (base URL of the Practicus AI platform) and other parameters (prefix, model_name).
- Ensures the URL ends with a / for proper routing.
3. Get Session Token:
- Retrieves an API session token using the Practicus AI SDK.
- Prints the token for reference.
4. Send Data for Prediction:
- Sends the preprocessed dataset to the model REST API in CSV format using a POST request.
- Includes necessary headers, such as authorization (Bearer token) and content type (text/csv).
- Handles any connection errors that may occur.
5. Receive and Display Predictions:
- Reads the response from the API and converts it into a DataFrame.
- Prints the first few rows of the prediction results for review.
# Loading the dataset to worker
import practicuscore as prt
region = prt.current_region()
worker = region.get_or_create_worker()
proc = worker.load(data_set_conn)
proc.categorical_map(column_name='sex', column_suffix='category')
proc.categorical_map(column_name='smoker', column_suffix='category')
proc.categorical_map(column_name='region', column_suffix='category')
proc.delete_columns(['region', 'smoker', 'sex'])
df = proc.get_df_copy()
df.head()
# Let's construct the REST API url.
# Please replace the below address with your Practicus AI host
# e.g. practicus.company.com (without a scheme, since https:// is added when building the URL below)
practicus_url = ""
assert practicus_url, "Please select practicus_url"
# *All* Practicus AI model APIs follow the below url convention
api_url = f"https://{practicus_url}/{prefix}/{model_name}/"
# Important: For effective traffic routing, always terminate the url with a trailing /
print("Model REST API Url:", api_url)
# We will be using the SDK to get a session token.
# To learn how to get a token without the SDK, please view 05_others/tokens sample notebook
token = prt.models.get_session_token(api_url)
print("API session token:", token)
import requests
import pandas as pd
headers = {
'authorization': f'Bearer {token}',
'content-type': 'text/csv'
}
data_csv = df.to_csv(index=False)
r = requests.post(api_url, headers=headers, data=data_csv)
if not r.ok:
raise ConnectionError(f"{r.status_code} - {r.text}")
from io import BytesIO
pred_df = pd.read_csv(BytesIO(r.content))
print("Prediction Result:")
pred_df.head()
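If you plan to compare these baseline predictions with the drifted ones later, you may keep a copy first (baseline_pred_df is a hypothetical helper, not part of the original notebook):
# Optional: keep the first run's predictions for a later drift comparison
baseline_pred_df = pred_df.copy()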
- After you make the first prediction, please wait about 5 minutes to see a clearer picture on the drift plot.
- When we look at the Model Drifts Dashboard in Grafana, we will see plots with the model drift visible.
Simulating and Analyzing Model Drift
This section introduces artificial model drift and demonstrates how to evaluate its effects:
1. Hand-Made Model Drift:
- Artificially modifies the dataset by altering the age and bmi columns (multiplying by 2 and 3, respectively).
- Displays the modified dataset for verification.
2. Send Modified Data for Prediction:
- Prepares the modified dataset as a CSV file.
- Sends it to the deployed model API using a POST request with proper authorization headers.
- Checks for errors during the request.
3. Analyze Predictions:
- Reads the API's prediction response into a DataFrame.
- Displays the prediction results to analyze the impact of drift on the model's outputs.
4. Monitor Drift on Grafana:
- After the second prediction, wait for 2 minutes to allow drift detection to update.
- View the Model Drifts Dashboard in Grafana to observe visual plots indicating the extent of model drift.
# Hand-made model drift: scale the age and bmi columns to shift their distributions
df['age'] = df['age'] * 2
df['bmi'] = df['bmi'] * 3
df.head()
headers = {
'authorization': f'Bearer {token}',
'content-type': 'text/csv'
}
data_csv = df.to_csv(index=False)
r = requests.post(api_url, headers=headers, data=data_csv)
if not r.ok:
raise ConnectionError(f"{r.status_code} - {r.text}")
pred_df = pd.read_csv(BytesIO(r.content))
print("Prediction Result:")
pred_df.head()
- After you make the second prediction, please wait about 2 minutes to see a clearer picture on the drift plot.
- When we look at the Model Drifts Dashboard in Grafana, we will see plots with the model drift visible.
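If you also want to quantify the drift outside Grafana, one option (not part of the original notebook) is to compare the baseline and drifted prediction distributions, assuming you kept the hypothetical baseline_pred_df copy from the first run:
# Hypothetical drift summary; assumes baseline_pred_df was saved after the first prediction
import pandas as pd

def summarize_prediction_drift(baseline: pd.DataFrame, drifted: pd.DataFrame,
                               column: str = 'Predictions') -> pd.DataFrame:
    # Side-by-side summary statistics plus the absolute shift between the two runs
    stats = pd.DataFrame({
        'baseline': baseline[column].describe(),
        'drifted': drifted[column].describe(),
    })
    stats['abs_shift'] = (stats['drifted'] - stats['baseline']).abs()
    return stats

# summarize_prediction_drift(baseline_pred_df, pred_df)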
Supplementary Files
model.py
import os
from typing import Optional
import pandas as pd
from starlette.exceptions import HTTPException
import joblib
model_pipeline = None
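# init() runs once when the model API starts and loads the serialized pipeline into memory.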
async def init(model_meta=None, *args, **kwargs):
global model_pipeline
current_dir = os.path.dirname(__file__)
model_file = os.path.join(current_dir, 'model.pkl')
if not os.path.exists(model_file):
raise HTTPException(status_code=404, detail=f"Could not locate model file: {model_file}")
model_pipeline = joblib.load(model_file)
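# predict() handles each scoring request; the incoming payload is provided as a pandas DataFrame.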
async def predict(http_request, df: Optional[pd.DataFrame] = None, *args, **kwargs) -> pd.DataFrame:
if df is None:
raise HTTPException(status_code=500, detail="No dataframe received")
if 'charges' in df.columns:
# Dropping 'charges' since it is the target
df = df.drop('charges', axis=1)
# Making predictions
predictions = model_pipeline.predict(df)
# Converting predictions to a DataFrame
predictions_df = pd.DataFrame(predictions, columns=['Predictions'])
return predictions_df
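To try model.py outside the deployed service, you can call init() and predict() directly. The sketch below is not part of the original notebook; it assumes you run it from the same notebook session (so X from the training steps is available) and that model.py and model.pkl are in the current directory:
# Local smoke test for model.py (sketch; assumes the notebook session and files described above)
import asyncio
import model  # the model.py shown above

async def main():
    await model.init()                                # loads model.pkl from the current directory
    preds = await model.predict(None, df=X.head(3))   # reuse a few preprocessed feature rows
    print(preds)

asyncio.run(main())  # in a notebook cell, use: await main()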