
Predicting Insurance Charges with Automated Machine Learning (AutoML)

In the insurance sector, accurately forecasting the costs associated with policyholders is crucial for pricing policies competitively while remaining profitable. Reliable cost predictions help insurers tailor individual policies, identify the key drivers of charges, and ultimately improve customer satisfaction by offering policies that reflect each customer's specific risk profile and needs.

Objective

The primary goal of this notebook is to showcase how Practicus AI users can leverage AutoML to quickly develop a predictive model, minimizing the need for extensive manual modeling work. By working through this notebook, you will learn how to:

  • Load the insurance cost dataset

  • Use AutoML to train and tune a predictive model for insurance charges

  • Assess the model's performance

Let's embark on this journey!

Step 1: Setting Up the Environment

First, we need to set up our Python environment with the necessary libraries. PyCaret, an AutoML library, simplifies the machine learning workflow, enabling us to efficiently develop predictive models.
# Standard libraries for data manipulation and numerical operations
import pandas as pd
import numpy as np
from pycaret.regression import *  # Importing PyCaret's regression module
# Extras
import warnings
warnings.filterwarnings("ignore")

Step 2: Loading the Dataset

The dataset consists of 1,337 observations and 7 variables, with 'charges' being the target variable we aim to predict. This dataset is a common benchmark for insurance cost prediction.
data_set_conn = {
    "connection_type": "WORKER_FILE",
    "file_path": "/home/ubuntu/samples/insurance.csv"
}
import practicuscore as prt

worker = prt.get_local_worker()

proc = worker.load(data_set_conn, engine='AUTO') 

df = proc.get_df_copy()
display(df)
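Before modeling, it can help to quickly confirm what was loaded. The checks below are plain pandas and assume only that the dataframe contains a 'charges' column, as described above.
# Optional sanity checks on the loaded data
print(df.shape)                   # number of observations and variables
print(df.dtypes)                  # mix of numeric and categorical columns
print(df["charges"].describe())   # distribution of the target variable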

Step 3: Initializing the AutoML Experiment

PyCaret's regression module is utilized here for predicting a continuous target variable, i.e., insurance costs. We begin by initializing our AutoML experiment.
from pycaret.regression import RegressionExperiment, load_model, predict_model

exp = RegressionExperiment()
This creates the experiment object within PyCaret that will manage automated feature engineering, model selection, and more.

Step 4: Configuring the Experiment

We'll configure our experiment with a specific name, making it easier to manage and reference.
# Configure using the service's unique key; you can find your key in the "Practicus AI Admin Console"
service_key = 'mlflow-primary'
# Optionally, provide an experiment name to create a new experiment while configuring
experiment_name = 'insuranceCharges'

prt.experiments.configure(service_key=service_key, experiment_name=experiment_name)
# If no experiment service is selected, MLflow inside the Worker is used. To configure manually:
# configure_experiment(experiment_name=experiment_name, service_name='Experiment service name')
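If you want to verify where runs will be logged, you can query MLflow directly. This is an optional check and assumes MLflow is available in the Worker, as the comment above indicates.
# Optional: confirm the active MLflow tracking location
import mlflow
print(mlflow.get_tracking_uri())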

Step 5: Preparing Data with PyCaret's Setup

This is a critical step where we specify the experiment's details, such as the target variable, a session ID for reproducibility, and whether to log the experiment for tracking purposes.
setup_params = {'normalize': True, 'normalize_method': 'minmax'}
exp.setup(data=df, target='charges', session_id=42, 
          log_experiment=True, feature_selection=True, experiment_name=experiment_name, **setup_params)
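Once setup() completes, the experiment object holds the train/test split and the preprocessing pipeline. The optional inspection below uses standard PyCaret calls; the exact set of available configuration variables depends on your PyCaret version.
# Optional: review what setup() produced
setup_summary = exp.pull()           # the setup configuration grid as a DataFrame
X_train = exp.get_config("X_train")  # the training split used internally by PyCaret
print(X_train.shape)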

Step 6: Model Selection and Tuning

This command leverages AutoML to compare different models automatically, selecting the one that performs best according to a default or specified metric. It's a quick way to identify a strong baseline model without manual experimentation.
best_model = exp.compare_models()
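The full comparison leaderboard can be pulled as a DataFrame, and the search can be narrowed or re-sorted if needed. The model IDs below are illustrative examples of PyCaret's built-in regressors.
# Optional: the full comparison leaderboard as a DataFrame
leaderboard = exp.pull()
display(leaderboard)

# Optional: restrict the candidates and sort by a specific metric
# best_model = exp.compare_models(include=["lr", "ridge", "rf", "lightgbm"], sort="MAE")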
Once a baseline model is selected, this step fine-tunes its hyperparameters to improve performance. With an empty tune_params dictionary, PyCaret applies its default search strategy; more advanced backends such as tune-sklearn with hyperopt can explore the hyperparameter space more thoroughly, which can significantly enhance model accuracy.
tune_params = {}
tuned_model = exp.tune_model(best_model, **tune_params)
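For a more thorough search than the default, tune_model accepts an explicit budget and an alternative search backend. The sketch below shows one possible configuration; it assumes the optional tune-sklearn and hyperopt packages are installed in the Worker.
# Optional: a more exhaustive hyperparameter search (commented out by default)
# tuned_model = exp.tune_model(
#     best_model,
#     n_iter=50,                      # number of candidate parameter sets to evaluate
#     optimize="MAE",                 # metric to optimize
#     search_library="tune-sklearn",  # alternative backends include "optuna"
#     search_algorithm="hyperopt",
# )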

Step 7: Finalizing the Model

After tuning, the model is finalized, meaning it is retrained on the entire dataset, including the holdout set. This ensures the model makes use of all available data before deployment.
final_model = exp.finalize_model(tuned_model)
proc.kill()
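Printing the finalized object is a quick way to confirm what will be saved; depending on the PyCaret version, this is either the bare estimator or the full preprocessing-plus-estimator pipeline.
# Optional: inspect the finalized model before saving
print(final_model)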

Step 8: Predictions and Saving the Model

We generate predictions on the dataset with the finalized model to confirm it behaves as expected.
predictions = exp.predict_model(final_model, data=df)
display(predictions)
The last step saves the trained model for future use, such as deployment in a production environment or further evaluation, and then reloads it to confirm it remains usable beyond the current session.
exp.save_model(final_model, 'model')
loaded_model = load_model('model')

predictions = predict_model(loaded_model, data=df)
display(predictions)

Summary

By following these steps, insurance companies can develop a predictive model for insurance costs using Practicus AI's AutoML capabilities. This approach reduces the need for extensive manual modeling, enabling insurers to efficiently adapt to changing market conditions and customer profiles.

Supplementary Files

bank_marketing/snippets/label_encoder.py

def label_encoder(df, text_col_list: list[str] | None = None):
    """
    Applies label encoding to specified categorical columns or all categorical columns in the dataframe if none are specified.
    :param text_col_list: Optional list of column names to apply label encoding. If None, applies to all categorical columns.
    """
    from sklearn.preprocessing import LabelEncoder

    le = LabelEncoder()

    # If text_col_list is provided, use it; otherwise, select all categorical columns
    if text_col_list is not None:
        categorical_cols = text_col_list
    else:
        categorical_cols = [col for col in df.columns if df[col].dtype == 'O']

    # Apply Label Encoding to each specified (or detected) categorical column
    for col in categorical_cols:
        # Check if the column exists in the DataFrame to avoid KeyError
        if col in df.columns:
            df[col] = le.fit_transform(df[col])
        else:
            print(f"Warning: Column '{col}' not found in DataFrame.")

    return df


label_encoder.worker_required = True
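A minimal usage sketch for the snippet above; the sample dataframe is made up for illustration.
import pandas as pd

sample = pd.DataFrame({"job": ["admin", "technician", "admin"], "age": [34, 45, 29]})
encoded_all = label_encoder(sample.copy())           # encodes every object-dtype column
encoded_job = label_encoder(sample.copy(), ["job"])  # encodes only the listed columns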

bank_marketing/snippets/one_hot.py

from enum import Enum


class DummyOption(str, Enum):
    DROP_FIRST = "Drop First Dummy"
    KEEP_ALL = "Keep All Dummies"


def one_hot(df, text_col_list: list[str] | None,
            max_categories: int = 25, dummy_option: DummyOption = DummyOption.KEEP_ALL,
            result_col_suffix: list[str] | None = None, result_col_prefix: list[str] | None = None):
    """
    Applies one-hot encoding to specified columns in the DataFrame. If no columns are specified,
    one-hot encoding is applied to all categorical columns that have a number of unique categories
    less than or equal to the specified max_categories. It provides an option to either drop the
    first dummy column to avoid multicollinearity or keep all dummy columns.

    :param text_col_list: List of column names to apply one-hot encoding. If None, applies to all
                          suitable categorical columns.
    :param max_categories: Maximum number of unique categories in a column to be included for encoding.
    :param dummy_option: Specifies whether to drop the first dummy column (DROP_FIRST) or keep all
                         (KEEP_ALL).
    :param result_col_suffix: Suffix appended to the names of the generated dummy columns.
    :param result_col_prefix: Prefix for the generated dummy columns; defaults to the source column name.
    """
    import pandas as pd

    if text_col_list is None:
        text_col_list = [col for col in df.columns if df[col].dtype == 'object' and df[col].nunique() <= max_categories]

    for col in text_col_list:
        dummies = pd.get_dummies(df[col], prefix=(result_col_prefix if result_col_prefix else col),
                                 drop_first=(dummy_option == DummyOption.DROP_FIRST))
        dummies = dummies.rename(columns=lambda x: f'{x}_{result_col_suffix}' if result_col_suffix else x)

        df = pd.concat([df, dummies], axis=1)

    return df
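A minimal usage sketch for the snippet above; the sample dataframe is made up for illustration.
import pandas as pd

sample = pd.DataFrame({"marital": ["single", "married", "single"], "balance": [100, 250, 80]})
encoded = one_hot(sample, ["marital"], dummy_option=DummyOption.DROP_FIRST)
print(encoded.columns.tolist())  # original columns plus the generated dummy columns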

model_observability/model.py

import os
from typing import Optional
import pandas as pd
import numpy as np
from starlette.exceptions import HTTPException
import joblib

model_pipeline = None

def add_features(df):
    for column in df.select_dtypes(include='object'):
        mode_value = df[column].mode()[0]
        df[column] = df[column].fillna(mode_value)

    for column in df.select_dtypes(include='int64'):
        mean_value = df[column].mean()
        df[column] = df[column].fillna(mean_value)

    for column in df.select_dtypes(include='float64'):
        mean_value = df[column].mean()
        df[column] = df[column].fillna(mean_value)
    return df

async def init(model_meta=None, *args, **kwargs):
    global model_pipeline

    current_dir = os.path.dirname(__file__)
    model_file = os.path.join(current_dir, 'model.pkl')
    if not os.path.exists(model_file):
        raise HTTPException(status_code=404, detail=f"Could not locate model file: {model_file}")

    model_pipeline = joblib.load(model_file)


async def predict(http_request, df: Optional[pd.DataFrame] = None, *args, **kwargs) -> pd.DataFrame:
    if df is None:
        raise HTTPException(status_code=500, detail="No dataframe received")

    # Making predictions
    predictions = model_pipeline.predict(df)

    # Converting predictions to a DataFrame
    predictions_df = pd.DataFrame(predictions, columns=['income >50K'])

    return predictions_df

model_tracking/model_drift/model.py

import os
from typing import Optional
import pandas as pd
from starlette.exceptions import HTTPException
import joblib 


model_pipeline = None


async def init(model_meta=None, *args, **kwargs):
    global model_pipeline

    current_dir = os.path.dirname(__file__)
    model_file = os.path.join(current_dir, 'model.pkl')
    if not os.path.exists(model_file):
        raise HTTPException(status_code=404, detail=f"Could not locate model file: {model_file}")

    model_pipeline = joblib.load(model_file)


async def predict(http_request, df: Optional[pd.DataFrame] = None, *args, **kwargs) -> pd.DataFrame:
    if df is None:
        raise HTTPException(status_code=500, detail="No dataframe received")

    if 'charges' in df.columns:
        # Dropping 'charges' since it is the target
        df = df.drop('charges', axis=1)  

    # Making predictions
    predictions = model_pipeline.predict(df)

    # Converting predictions to a DataFrame
    predictions_df = pd.DataFrame(predictions, columns=['Predictions'])

    return predictions_df

sparkml/model.json

{
    "download_files_from": "cache/ice_cream_sparkml_model/",
    "_comment": "you can also define download_files_to otherwise, /var/practicus/cache is used"
}

sparkml/model.py

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import LinearRegressionModel

spark = None
model = None

# Make sure you downloaded the SparkML model files to the correct cache folder
MODEL_PATH = "/var/practicus/cache/ice_cream_sparkml_model"


async def init(*args, **kwargs):
    global spark, model
    if spark is None:
        spark = SparkSession.builder.appName("IceCreamRevenuePrediction").getOrCreate()
    if model is None:
        model = LinearRegressionModel.load(MODEL_PATH)


async def predict(df: pd.DataFrame | None = None, *args, **kwargs) -> pd.DataFrame:
    # Convert the input Pandas DataFrame to a Spark DataFrame with a single vector column
    spark_data = spark.createDataFrame(
        df.apply(lambda row: (Vectors.dense(float(row['Temperature'])),), axis=1),
        schema=["features"]
    )

    # Make predictions using the Spark model
    predictions = model.transform(spark_data)

    # Select the relevant columns and convert to Pandas DataFrame
    predictions_pd = predictions.select("features", "prediction").toPandas()

    # Keep only the predicted revenue for the response
    predictions_pd = predictions_pd.rename(columns={"prediction": "predicted_Revenue"})
    predictions_pd = predictions_pd[["predicted_Revenue"]]

    return predictions_pd

xgboost/model.py

import os
import pandas as pd
import joblib

model_pipeline = None


async def init(model_meta=None, *args, **kwargs):
    global model_pipeline

    current_dir = os.path.dirname(__file__)
    model_file = os.path.join(current_dir, 'model.pkl')
    if not os.path.exists(model_file):
        raise FileNotFoundError(f"Could not locate model file: {model_file}")

    model_pipeline = joblib.load(model_file)


async def predict(http_request, df: pd.DataFrame | None = None, *args, **kwargs) -> pd.DataFrame:
    if df is None:
        raise ValueError("No dataframe received")

    if 'charges' in df.columns:
        # Dropping 'charges' since it is the target
        df = df.drop('charges', axis=1)

    # Making predictions
    predictions = model_pipeline.predict(df)

    # Converting predictions to a DataFrame
    predictions_df = pd.DataFrame(predictions, columns=['Predictions'])

    return predictions_df

xgboost/model_custom_df.py

import os
import pandas as pd
import joblib

model_pipeline = None


async def init(model_meta=None, *args, **kwargs):
    global model_pipeline

    current_dir = os.path.dirname(__file__)
    model_file = os.path.join(current_dir, 'model.pkl')
    if not os.path.exists(model_file):
        raise FileNotFoundError(f"Could not locate model file: {model_file}")

    model_pipeline = joblib.load(model_file)


async def predict(http_request, *args, **kwargs) -> pd.DataFrame:
    # Add the code that creates a dataframe using Starlette Request object http_request
    # E.g. read bytes using http_request.stream(), decode and pass to Pandas.
    raise NotImplementedError("DataFrame generation code not implemented")

    if 'charges' in df.columns:
        # Dropping 'charges' since it is the target
        df = df.drop('charges', axis=1)

    # Making predictions
    predictions = model_pipeline.predict(df)

    # Converting predictions to a DataFrame
    predictions_df = pd.DataFrame(predictions, columns=['Predictions'])

    return predictions_df
