Distributed DeepSpeed Training
This example demonstrates how to set up distributed workers for DeepSpeed training with Practicus AI.
Focuses:
- Configuring and launching distributed workers using Practicus AI.
- Monitoring and logging distributed job performance and resource usage.
- Terminating the distributed job after completion.
The train.py script and the ds_config.json configuration file define the model fine-tuning process.
Importing Libraries and Configuring Distributed Job
This step imports the required libraries, including practicuscore, and sets up the configuration for a distributed job. The key elements are:
- job_dir: Directory containing the DeepSpeed configuration file and the training script.
- DistJobConfig: Defines distributed job parameters such as worker count and termination policy.
- WorkerConfig: Specifies worker parameters, including the Docker image, worker size, and startup script.
The configuration prepares a coordinator worker that initializes a distributed job.
Before you begin
- Create a "deepspeed" folder under your "~/my" folder.
- Copy train.py and ds_config.json into this folder, for example with the snippet below.
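A minimal sketch of this preparation step, assuming train.py and ds_config.json sit next to the notebook you are running (the source paths are an assumption; adjust them to wherever you keep these files):

import shutil
from pathlib import Path

# Target directory expected by the DeepSpeed job (matches job_dir below)
target_dir = Path("~/my/deepspeed").expanduser()
target_dir.mkdir(parents=True, exist_ok=True)

# The source paths are assumptions; point them at your own copies of the files.
for file_name in ("train.py", "ds_config.json"):
    shutil.copy(file_name, target_dir / file_name)

With the files in place, create the coordinator worker and launch the distributed job: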
import practicuscore as prt

# DeepSpeed job directory must have default files ds_config.json and train.py (can be renamed)
job_dir = "~/my/deepspeed"
worker_count = 2

distributed_config = prt.DistJobConfig(
    job_type=prt.DistJobType.deepspeed,
    job_dir=job_dir,
    worker_count=worker_count,
    terminate_on_completion=False,
)

worker_config = prt.WorkerConfig(
    worker_image="ghcr.io/practicusai/practicus-gpu-deepspeed",
    worker_size="L-GPU",
    log_level="DEBUG",
    distributed_config=distributed_config,
)

coordinator_worker = prt.create_worker(worker_config)

job_id = coordinator_worker.job_id
assert job_id, "Could not create distributed job"
Monitoring Distributed Job
The live_view and view_log utilities from the Practicus SDK are used to monitor the progress of the distributed job. These provide details such as:
- Job ID, start time, worker states, and GPU utilization.
- Resource allocation for each worker in the distributed cluster.
They help track the real-time status of the distributed job.
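A minimal sketch of the live monitoring call, assuming live_view accepts the same job_dir and job_id arguments as view_log shown below; check the SDK reference if your version differs:

# Follow the cluster in real time: job ID, start time, worker states, GPU utilization.
# The argument list mirrors view_log and is an assumption.
prt.distributed.live_view(job_dir, job_id)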
# To view the logs of an individual worker, specify its rank, e.g. 1:
prt.distributed.view_log(job_dir, job_id, rank=1)
Terminating Distributed Job Cluster
The distributed job cluster and all associated workers are terminated using the terminate method of the coordinator worker.
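A minimal sketch of the shutdown step; because terminate_on_completion was set to False when the job was configured, the cluster keeps running until it is terminated explicitly:

# Terminate the coordinator worker; this tears down the distributed job cluster
# and all workers associated with it.
coordinator_worker.terminate()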
Supplementary Files
ds_config.json
{
    "train_batch_size": 4,
    "gradient_accumulation_steps": 2,
    "optimizer": {
        "type": "Adam",
        "params": {
            "lr": 0.001
        }
    },
    "fp16": {
        "enabled": true
    },
    "zero_optimization": {
        "stage": 1
    }
}
train.py
import torch
import torch.nn as nn
import deepspeed
import torch.distributed as dist
import os
import json


# Define a dummy large neural network
class LargeModel(nn.Module):
    def __init__(self):
        super(LargeModel, self).__init__()
        self.layers = nn.Sequential(
            nn.Linear(8192, 4096), nn.ReLU(),
            nn.Linear(4096, 2048), nn.ReLU(),
            nn.Linear(2048, 1024), nn.ReLU(),
            nn.Linear(1024, 512), nn.ReLU(),
            nn.Linear(512, 256), nn.ReLU(),
            nn.Linear(256, 128), nn.ReLU(),
            nn.Linear(128, 1),
        )

    def forward(self, x):
        return self.layers(x)


# Check whether FP16 is enabled in the DeepSpeed config
def is_fp16_enabled(config_path="ds_config.json"):
    with open(config_path) as f:
        config = json.load(f)
    return config.get("fp16", {}).get("enabled", False)


# Main training function
def train():
    # Distributed setup
    dist.init_process_group(
        backend="nccl",
        init_method=f"tcp://{os.environ['MASTER_ADDR']}:{os.environ['MASTER_PORT']}",
        rank=int(os.environ["RANK"]),
        world_size=int(os.environ["WORLD_SIZE"]),
    )

    device = torch.device("cuda:0")
    model = LargeModel().to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

    # Initialize DeepSpeed
    model, optimizer, _, _ = deepspeed.initialize(
        model=model,
        optimizer=optimizer,
        config="ds_config.json",
    )

    # Training loop
    for epoch in range(5):
        optimizer.zero_grad()

        # Create dummy training data
        data = torch.randn(32768, 8192).to(device)
        target = torch.randn(32768, 1).to(device)

        # Convert data and target to FP16 if enabled
        if is_fp16_enabled("ds_config.json"):
            data, target = data.half(), target.half()

        # Compute the loss and run the DeepSpeed backward pass and optimizer step
        loss = nn.MSELoss()(model(data), target)
        model.backward(loss)
        model.step()

        # Log loss and memory usage for this rank
        print(f"[GPU {os.environ['RANK']}] Epoch {epoch}, Loss: {loss.item()}, "
              f"Allocated VRAM: {torch.cuda.memory_allocated(device) / 1e9:.2f} GB")

    # Clean up the process group
    dist.destroy_process_group()


if __name__ == "__main__":
    train()