Executing batch jobs in Spark Cluster
In this example we will:
- Create a Spark cluster
- Submit a Python job file
- Terminate the cluster after the job is completed
Before you begin
- Create a "spark_with_job" folder under your "~/my" folder
- Copy job.py into this (spark_with_job) folder
import practicuscore as prt
# Folder that holds job.py; the same path is later passed to view_log.
job_dir = "~/my/spark_with_job/"

# Describe the distributed Spark job: which script to run, from which
# folder, and with how many workers.
distributed_config = prt.DistJobConfig(
    job_type=prt.DistJobType.spark,
    job_dir=job_dir,
    py_file="job.py",
    worker_count=2,
    # NOTE(review): the introduction says the cluster is terminated after the
    # job completes, but this flag is False - confirm which one is intended.
    terminate_on_completion=False,
)

# Worker settings; the distributed job description is attached here.
worker_config = prt.WorkerConfig(
    worker_size="Small",
    distributed_config=distributed_config,
    log_level="DEBUG",
)

# Create the worker with the configuration above. The returned object's
# job_id is used further below to look up the job logs.
coordinator_worker = prt.create_worker(worker_config=worker_config)
# Logs can be viewed while the job is running or after it has completed.
# rank = 0 selects the coordinator (master); use rank = 1, 2, ... for the
# other workers.
rank = 0

prt.distributed.view_log(
    job_dir=job_dir,
    job_id=coordinator_worker.job_id,
    rank=rank,
)
Supplementary Files
job.py
import practicuscore as prt
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, when, lit, max, min, stddev, corr
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml import Pipeline
# Batch job: load the insurance sample, report missing values, engineer
# features, and write the processed data set as Parquet.
spark = (
    SparkSession.builder
    .appName("Advanced Data Processing")
    .getOrCreate()
)

try:
    # Read the CSV with a header row and let Spark infer the column types.
    file_path = "/home/ubuntu/samples/insurance.csv"
    data = spark.read.csv(file_path, header=True, inferSchema=True)

    # Count the null values in every column. DataFrames are lazy, so the
    # result has to be materialized (here: printed) for the check to run.
    missing_data = data.select(
        [count(when(col(c).isNull(), c)).alias(c) for c in data.columns]
    )
    missing_data.show()

    # Index each categorical column, then one-hot encode the index.
    # The loop variable is deliberately not called `col`, to avoid shadowing
    # the imported pyspark.sql.functions.col.
    categorical_columns = ['sex', 'smoker', 'region']
    indexers = [
        StringIndexer(inputCol=name, outputCol=name + "_index")
        for name in categorical_columns
    ]
    encoders = [
        OneHotEncoder(inputCol=name + "_index", outputCol=name + "_encoded")
        for name in categorical_columns
    ]

    # Bucket BMI into a descriptive category. This column is written to the
    # output but is not part of the assembled feature vector below.
    data = data.withColumn(
        "bmi_category",
        when(col("bmi") < 18.5, lit("underweight"))
        .when((col("bmi") >= 18.5) & (col("bmi") < 25), lit("normal"))
        .when((col("bmi") >= 25) & (col("bmi") < 30), lit("overweight"))
        .otherwise(lit("obese")),
    )

    # Assemble numeric and encoded columns into one vector, then scale it
    # by standard deviation without centering (withMean=False).
    feature_columns = ['age', 'bmi', 'children', 'sex_encoded', 'smoker_encoded', 'region_encoded']
    assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
    scaler = StandardScaler(
        inputCol="features",
        outputCol="scaled_features",
        withStd=True,
        withMean=False,
    )

    # Run all stages as a single pipeline, fitted and applied on the same data.
    pipeline = Pipeline(stages=indexers + encoders + [assembler, scaler])
    data = pipeline.fit(data).transform(data)

    # Persist the result, replacing any output from a previous run.
    output_path = "/home/ubuntu/my/processed_insurance_data.parquet/"
    data.write.parquet(output_path, mode="overwrite")
finally:
    # Always release the Spark session, even if a step above failed.
    spark.stop()
run/2c741e/prt_dist_job.json
{"job_type":"spark","job_dir":"~/my/02_batch_job/","initial_count":2,"coordinator_port":7077,"additional_ports":[4040,7078,7079],"terminate_on_completion":false,"py_file":"job.py","executors":[{"rank":0,"instance_id":"5cf16b71"},{"rank":1,"instance_id":"63e80dc8"}]}
run/2c741e/rank_0.json
{"rank":0,"instance_id":"5cf16b71","state":"completed","used_ram":1187,"peak_ram":1187,"total_ram":3200,"gpus":0,"used_vram":0,"peak_vram":0,"reserved_vram":0,"total_vram":0}
run/2c741e/rank_1.json
{"rank":1,"instance_id":"63e80dc8","state":"running","used_ram":284,"peak_ram":293,"total_ram":3200,"gpus":0,"used_vram":0,"peak_vram":0,"reserved_vram":0,"total_vram":0}
Previous: SparkML Ice Cream | Next: Spark For Ds > Spark Tutorial