mirror of https://github.com/datawhalechina/llms-from-scratch-cn.git
commit 9c08e88d13 (parent e0678d43bf): Add files via upload
appendix-A/03_main-chapter-code/DDP-script.py (new file, 179 lines)
# Appendix A: Introduction to PyTorch (Part 3)

import torch
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

# new imports
import os
import torch.multiprocessing as mp
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group


# new function to initialize a distributed process group (one process per GPU)
# this allows the processes to communicate with each other
def ddp_setup(rank, world_size):
    """
    Arguments:
        rank: a unique process number (process ID)
        world_size: total number of processes in the group
    """
    # address of the machine running the rank-0 process
    # this assumes all GPUs are on the same machine
    os.environ["MASTER_ADDR"] = "localhost"
    # any free port on the machine
    os.environ["MASTER_PORT"] = "12345"

    # initialize the process group
    # Windows users should use "gloo" instead of "nccl" below
    # nccl: NVIDIA Collective Communication Library
    init_process_group(backend="nccl", rank=rank, world_size=world_size)
    torch.cuda.set_device(rank)
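
# Note: a minimal sketch of an alternative setup, assuming the script were
# launched with `torchrun --nproc_per_node=NUM_GPUS DDP-script.py` instead of
# mp.spawn below. torchrun exports MASTER_ADDR, MASTER_PORT, RANK, WORLD_SIZE,
# and LOCAL_RANK, so the setup reduces to (ddp_setup_torchrun is a
# hypothetical helper name):
#
#     def ddp_setup_torchrun():
#         init_process_group(backend="nccl")
#         torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))
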
class ToyDataset(Dataset):
    def __init__(self, X, y):
        self.features = X
        self.labels = y

    def __getitem__(self, index):
        one_x = self.features[index]
        one_y = self.labels[index]
        return one_x, one_y

    def __len__(self):
        return self.labels.shape[0]
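
# Note: ToyDataset simply wraps two pre-built tensors; indexing it,
# e.g. train_ds[0], returns one (features, label) pair, and the DataLoader
# assembles these pairs into batches.
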
class NeuralNetwork(torch.nn.Module):
    def __init__(self, num_inputs, num_outputs):
        super().__init__()

        self.layers = torch.nn.Sequential(
            # 1st hidden layer
            torch.nn.Linear(num_inputs, 30),
            torch.nn.ReLU(),

            # 2nd hidden layer
            torch.nn.Linear(30, 20),
            torch.nn.ReLU(),

            # output layer
            torch.nn.Linear(20, num_outputs),
        )

    def forward(self, x):
        logits = self.layers(x)
        return logits
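
# Note: with num_inputs=2 and num_outputs=2 as used below, forward() maps a
# (batch_size, 2) input through 2 -> 30 -> 20 -> 2 linear layers and returns
# raw logits of shape (batch_size, 2); F.cross_entropy applies the softmax
# internally.
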
def prepare_dataset():
    X_train = torch.tensor([
        [-1.2, 3.1],
        [-0.9, 2.9],
        [-0.5, 2.6],
        [2.3, -1.1],
        [2.7, -1.5]
    ])
    y_train = torch.tensor([0, 0, 0, 1, 1])

    X_test = torch.tensor([
        [-0.8, 2.8],
        [2.6, -1.6],
    ])
    y_test = torch.tensor([0, 1])

    train_ds = ToyDataset(X_train, y_train)
    test_ds = ToyDataset(X_test, y_test)

    train_loader = DataLoader(
        dataset=train_ds,
        batch_size=2,
        shuffle=False,  # False because of the DistributedSampler below
        pin_memory=True,
        drop_last=True,
        # splits the batches across GPUs without overlapping samples
        sampler=DistributedSampler(train_ds)
    )
    test_loader = DataLoader(
        dataset=test_ds,
        batch_size=2,
        shuffle=False,
    )
    return train_loader, test_loader
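
# Note: DistributedSampler partitions the training set into non-overlapping
# shards, one per rank, so each GPU sees a different subset of the data.
# To reshuffle the shards from epoch to epoch, call the sampler's set_epoch
# method at the start of every epoch, e.g.:
#
#     train_loader.sampler.set_epoch(epoch)
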
# wrapper around the training logic
def main(rank, world_size, num_epochs):

    ddp_setup(rank, world_size)  # initialize the process group

    train_loader, test_loader = prepare_dataset()
    model = NeuralNetwork(num_inputs=2, num_outputs=2)
    model.to(rank)
    optimizer = torch.optim.SGD(model.parameters(), lr=0.5)

    model = DDP(model, device_ids=[rank])  # wrap the model with DistributedDataParallel (DDP)
    # the core model is now accessible as model.module

    for epoch in range(num_epochs):

        model.train()
        for features, labels in train_loader:

            features, labels = features.to(rank), labels.to(rank)
            logits = model(features)
            loss = F.cross_entropy(logits, labels)  # loss function

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # logging
            print(f"[GPU{rank}] Epoch: {epoch+1:03d}/{num_epochs:03d}"
                  f" | Batchsize {labels.shape[0]:03d}"
                  f" | Train/Val Loss: {loss:.2f}")

    model.eval()
    train_acc = compute_accuracy(model, train_loader, device=rank)
    print(f"[GPU{rank}] Training accuracy", train_acc)
    test_acc = compute_accuracy(model, test_loader, device=rank)
    print(f"[GPU{rank}] Test accuracy", test_acc)

    destroy_process_group()  # cleanly exit distributed mode
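
# Note: the DDP wrapper hooks into loss.backward() and averages the gradients
# across all ranks before optimizer.step(), so every replica applies the same
# update and the model weights stay identical on all GPUs.
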
def compute_accuracy(model, dataloader, device):
    model = model.eval()
    correct = 0.0
    total_examples = 0

    for idx, (features, labels) in enumerate(dataloader):
        features, labels = features.to(device), labels.to(device)

        with torch.no_grad():
            logits = model(features)
        predictions = torch.argmax(logits, dim=1)
        compare = labels == predictions
        correct += torch.sum(compare)
        total_examples += len(compare)
    return (correct / total_examples).item()
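
# Note: compute_accuracy runs independently on each rank, over that rank's
# shard of the data, so the printed numbers are per-GPU. A sketch of how the
# counts could be combined across ranks with torch.distributed.all_reduce,
# assuming the process group is initialized:
#
#     import torch.distributed as dist
#     counts = torch.tensor([float(correct), float(total_examples)], device=device)
#     dist.all_reduce(counts, op=dist.ReduceOp.SUM)
#     global_accuracy = (counts[0] / counts[1]).item()
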
if __name__ == "__main__":
    print("PyTorch version:", torch.__version__)
    print("CUDA available:", torch.cuda.is_available())
    print("Number of GPUs available:", torch.cuda.device_count())

    torch.manual_seed(123)

    # spawn new processes
    # note that spawn passes the rank automatically
    num_epochs = 3
    world_size = torch.cuda.device_count()
    mp.spawn(main, args=(world_size, num_epochs), nprocs=world_size)
    # nprocs=world_size spawns one process per GPU
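
# Note: run the script directly, e.g. `python DDP-script.py`, on a machine
# with one or more CUDA GPUs; mp.spawn then launches world_size processes,
# passing each one its rank as the first argument of main.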