Starting a Batch Job on an Auto-Scaled Spark Cluster
This example demonstrates how to set up and run an auto-scaled batch job in Practicus AI. Instead of launching a fixed-size environment, we will create a batch job with the ability to automatically scale its compute resources based on demand.
Important note on worker container image
- Unlike a standard Spark cluster, the executors of an auto-scaled Spark cluster use a separate type of container image:
ghcr.io/practicusai/practicus-spark
- This means packages accessible to the coordinator worker might not be accessible to the executors.
- To install additional packages, create custom container images for both the coordinator and the executors.
- While creating the Spark client, you can then pass arguments to specify which executor image to use, as sketched below.
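For illustration only, the sketch below shows how custom images might be wired into the job and worker configuration. The `custom_image` and `worker_image` parameter names are assumptions made for this sketch, not confirmed Practicus AI SDK arguments; check the SDK reference for the exact names your version supports.
import practicuscore as prt
# Minimal sketch, assuming hypothetical image parameters (verify against the SDK docs).
distributed_config = prt.distributed.JobConfig(
    job_type=prt.distributed.JobType.spark,
    auto_distributed=True,
    initial_count=1,
    max_count=4,
    custom_image="my-registry/practicus-spark-custom:latest",  # assumed parameter: executor image
)
worker_config = prt.WorkerConfig(
    worker_size="my-privileged-worker-size",  # placeholder; see the next section for how to find one
    distributed_config=distributed_config,
    worker_image="my-registry/practicus-coordinator-custom:latest",  # assumed parameter: coordinator image
)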
Important note on privileged access
- For auto-scaled Spark to work, you will need additional privileges on the Kubernetes cluster.
- Please ask your admin to grant you access to worker size definitions with privileged access before you continue with this example.
Finding an Auto-Scaled (Privileged) Worker Size
Let's identify a worker size that supports auto-scaling and includes the required privileged capabilities for running batch jobs.
import practicuscore as prt
region = prt.get_default_region()
auto_dist_worker_size = None
for worker_size in region.worker_size_list:
    if worker_size.auto_distributed:
        auto_dist_worker_size = worker_size.name
        break
assert auto_dist_worker_size, "You do not have access to any auto-distributed (privileged) worker sizes"
print("Located an auto-distributed (privileged) worker size:", auto_dist_worker_size)
# Configure distributed job settings
# This example uses Spark with auto-scaling capabilities.
distributed_config = prt.distributed.JobConfig(
    job_type=prt.distributed.JobType.spark,
    auto_distributed=True,  # Enables automatic scaling of the Spark cluster
    initial_count=1,  # Start with 1 executor plus the coordinator (2 workers total)
    max_count=4,  # Allow the cluster to scale up to 4 additional executors if needed
)
# Define the worker configuration
# Ensure that the chosen worker size includes privileged access
# to support auto-scaling.
worker_config = prt.WorkerConfig(
    worker_size=auto_dist_worker_size,
    distributed_config=distributed_config,
)
# Note: We are not creating a coordinator worker interactively here
# since this setup is intended for batch tasks rather than interactive sessions.
# Example: coordinator_worker = prt.create_worker(worker_config)
# Running the batch job:
# - Starts a worker
# - Submits 'job.py' to run on the cluster
# - 'job.py' creates a Spark session and triggers cluster creation
# with multiple executors as defined above.
# - Monitors execution and prints progress
# - On completion, terminates the Spark session and executors
worker, success = prt.run_task(
    file_name="job.py",
    worker_config=worker_config,
    terminate_on_completion=False,  # Leave the cluster running until we decide to terminate
)
if success:
    print("Job is successful, terminating cluster.")
    worker.terminate()
else:
    print("Job failed, opening notebook on coordinator to analyze.")
    worker.open_notebook()
Supplementary Files
job.py
import practicuscore as prt
print("Requesting a Spark session...")
spark = prt.distributed.get_client()
# Create a sample DataFrame
data = [("Alice", 29), ("Bob", 34), ("Cathy", 23)]
columns = ["Name", "Age"]
print("Creating DataFrame...")
df = spark.createDataFrame(data, columns)
print("Applying filter: Age > 30")
df_filtered = df.filter(df.Age > 30)
print("Filtered results:")
df_filtered.show()
# Note:
# Auto-scaled Spark executors are different from standard Practicus AI workers.
# They use a specialized container image and do not have direct access to
# `~/my` or `~/shared` directories.
# For saving results, consider using a data lake or object storage.
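For instance, the filtered DataFrame above could be persisted with Spark's standard DataFrame writer. This is a minimal sketch: the s3a:// path is a placeholder, and it assumes your object storage bucket and credentials are already configured for the cluster.
# Sketch: write results to object storage instead of local worker folders.
# The path below is a placeholder for illustration.
df_filtered.write.mode("overwrite").parquet("s3a://my-bucket/results/filtered_ages")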