
Executing batch jobs in a Spark cluster

In this example we will:

  • Create a Spark cluster
  • Submit a Python job file
  • Terminate the cluster after the job is completed

Before you begin

  • Create a "spark_with_job" folder under your "~/my" folder
  • Copy job.py into this (spark_with_job) folder (see the sketch below for doing both steps from a notebook cell)
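
If you prefer to do this from a notebook cell, here is a minimal sketch using only the standard library; it assumes job.py sits in your current working directory:

import os
import shutil

# Create ~/my/spark_with_job and copy job.py into it.
# Assumes job.py is located in the current working directory.
job_dir = os.path.expanduser("~/my/spark_with_job")
os.makedirs(job_dir, exist_ok=True)
shutil.copy("job.py", job_dir)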
import practicuscore as prt

job_dir = "~/my/spark_with_job/"


distributed_config = prt.DistJobConfig(
    # Run a Spark distributed job
    job_type=prt.DistJobType.spark,
    # Folder that contains the job file
    job_dir=job_dir,
    # Python file to execute on the cluster
    py_file="job.py",
    # Number of workers in the cluster
    worker_count=2,
    # Keep the cluster alive after the job completes so logs can be inspected
    terminate_on_completion=False,
)

worker_config = prt.WorkerConfig(
    worker_size="Small",
    distributed_config=distributed_config,
    log_level="DEBUG",
)

# Creating the coordinator worker starts the Spark cluster and submits job.py
coordinator_worker = prt.create_worker(
    worker_config=worker_config,
)
prt.distributed.live_view(
    job_dir=job_dir,
    job_id=coordinator_worker.job_id,
)
# You can view the logs during or after the job run.
# Rank 0 is the coordinator (master); to view other workers set rank = 1, 2, ...
rank = 0

prt.distributed.view_log(
    job_dir=job_dir,
    job_id=coordinator_worker.job_id,
    rank=rank
)
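
Since terminate_on_completion is False in the configuration above, the cluster stays up after the job finishes so its logs remain accessible. When you are done reviewing them, terminate it manually. A minimal sketch, assuming the worker object returned by prt.create_worker() provides a terminate() method:

# Tear down the coordinator worker (and the Spark cluster it manages)
# once the logs have been reviewed. terminate() is assumed to be
# available on the worker object returned by prt.create_worker().
coordinator_worker.terminate()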

Supplementary Files

job.py

import practicuscore as prt
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, when, lit
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml import Pipeline

# Create (or attach to) the Spark session provided by the cluster
spark = SparkSession.builder \
    .appName("Advanced Data Processing") \
    .getOrCreate()

# Load the sample insurance dataset
file_path = "/home/ubuntu/samples/insurance.csv"
data = spark.read.csv(file_path, header=True, inferSchema=True)

# Per-column count of missing values
missing_data = data.select([count(when(col(c).isNull(), c)).alias(c) for c in data.columns])

# Index and one-hot encode the categorical columns
categorical_columns = ['sex', 'smoker', 'region']
indexers = [StringIndexer(inputCol=c, outputCol=c + "_index") for c in categorical_columns]
encoders = [OneHotEncoder(inputCol=c + "_index", outputCol=c + "_encoded") for c in categorical_columns]

# Derive a categorical BMI feature
data = data.withColumn("bmi_category",
                       when(col("bmi") < 18.5, lit("underweight"))
                       .when((col("bmi") >= 18.5) & (col("bmi") < 25), lit("normal"))
                       .when((col("bmi") >= 25) & (col("bmi") < 30), lit("overweight"))
                       .otherwise(lit("obese")))

# Assemble and scale the feature vector
feature_columns = ['age', 'bmi', 'children', 'sex_encoded', 'smoker_encoded', 'region_encoded']
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True, withMean=False)

# Run the preprocessing pipeline and write the result as Parquet
pipeline = Pipeline(stages=indexers + encoders + [assembler, scaler])
data = pipeline.fit(data).transform(data)

output_path = "/home/ubuntu/my/processed_insurance_data.parquet/"
data.write.parquet(output_path, mode="overwrite")

spark.stop()
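
Once the job has finished, the Parquet output written by job.py can be read back to confirm the result, for example from a notebook on a worker. A minimal sketch, reusing the output_path from job.py:

from pyspark.sql import SparkSession

# Read the processed dataset back to verify the job output.
# The path matches output_path in job.py above.
spark = SparkSession.builder.appName("Verify Processed Output").getOrCreate()

processed = spark.read.parquet("/home/ubuntu/my/processed_insurance_data.parquet/")
processed.printSchema()
processed.select("bmi_category", "scaled_features").show(5, truncate=False)

spark.stop()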

run/2c741e/prt_dist_job.json

{"job_type":"spark","job_dir":"~/my/02_batch_job/","initial_count":2,"coordinator_port":7077,"additional_ports":[4040,7078,7079],"terminate_on_completion":false,"py_file":"job.py","executors":[{"rank":0,"instance_id":"5cf16b71"},{"rank":1,"instance_id":"63e80dc8"}]}

run/2c741e/rank_0.json

{"rank":0,"instance_id":"5cf16b71","state":"completed","used_ram":1187,"peak_ram":1187,"total_ram":3200,"gpus":0,"used_vram":0,"peak_vram":0,"reserved_vram":0,"total_vram":0}

run/2c741e/rank_1.json

{"rank":1,"instance_id":"63e80dc8","state":"running","used_ram":284,"peak_ram":293,"total_ram":3200,"gpus":0,"used_vram":0,"peak_vram":0,"reserved_vram":0,"total_vram":0}
