[源码解析] PyTorch 分布式(3) ----- DataParallel(下)
[源码解析] PyTorch 分布式(3) ----- DataParallel(下)
[toc]
0x00 摘要
本文是 PyTorch 分布式的第三篇,继续上文,介绍 DataPrallel 的并行操作和反向传播。
本系列其他文章如下:
[源码解析]PyTorch如何实现前向传播(1) --- 基础类(上)
[源码解析]PyTorch如何实现前向传播(2) --- 基础类(下)
[源码解析] PyTorch如何实现前向传播(3) --- 具体实现
[源码解析] Pytorch 如何实现后向传播 (1)---- 调用引擎
[源码解析] Pytorch 如何实现后向传播 (2)---- 引擎静态结构
[源码解析] Pytorch 如何实现后向传播 (3)---- 引擎动态逻辑
[源码解析] PyTorch 如何实现后向传播 (4)---- 具体算法
[源码解析] PyTorch 分布式(1)------历史和概述
[源码解析] PyTorch 分布式(2) ----- DataParallel(上)
0x01 前向操作
我们先回忆一下目前的前向图,replicate 调用了Broadcast.forward,同时往其context 存储了input_device和num_inputs。
+----------------------------------------------------------------------------------------+ | DataParallel.forward | | | | | | replicate +---------------> parallel_apply gather | | | +----------------------------------------------------------------------------------------+ +---------------------------+ | Broadcast | | | | | | | | forward() +-----------> | | | | | +---------------------+ | | | ctx | | | | input_device | | | | | | | | num_inputs | | | | | | | +---------------------+ | | | | | | | | | | | | | +---------------------------+
1.1 并行
目前,我们已经使用 Scatter 函数将数据从 device[0] 分配并复制到不同的卡,用 Replicate 函数将模型从 device[0] 复制到不同的卡,这样各个卡都有了同样的模型和不同的数据,现在就要分别调用 forward 计算损失和梯度。也就是 parallel_apply 部分。
# 分发数据 inputs, kwargs = self.scatter(inputs, kwargs, self.device_ids) # 分发模型 replicas = self.replicate(self.module, self.device_ids[:len(inputs)]) # 并行训练 outputs = self.parallel_apply(replicas, inputs, kwargs)
对应我们传播图是:
parallel_apply 是基于threading 实现,用前面准备好的 replica 和输入数据,然后for 循环启动多线程进行前向传播,最后输出传播结果。
def parallel_apply(modules, inputs, kwargs_tup=None, devices=None): # 确保模型和输入大小一致 assert len(modules) == len(inputs) # 确保每个 GPU 都有相应的元数据,如没有就空白补全 if kwargs_tup is not None: # 在前面已经补全 assert len(modules) == len(kwargs_tup) else: kwargs_tup = ({},) * len(modules) # 确保模型数目和CPU数目一致 if devices is not None: assert len(modules) == len(devices) else: devices = [None] * len(modules) devices = [_get_device_index(x, True) for x in devices] # 基于threading多线程实现 lock = threading.Lock() results = {} grad_enabled, autocast_enabled = torch.is_grad_enabled(), torch.is_autocast_enabled() # 定义 worker def _worker(i, module, input, kwargs, device=None): torch.set_grad_enabled(grad_enabled) if device is None: device = get_a_var(input).get_device() try: # 设置当前的设备 with torch.cuda.device(device), autocast(enabled=autocast_enabled): # this also avoids accidental slicing of `input` if it is a Tensor if not isinstance(input, (list, tuple)): input = (input,) output = module(*input, **kwargs) # 前向操作 with lock: # 并行计算得到输出 results[i] = output except Exception: with lock: results[i] = ExceptionWrapper( where="in replica {} on device {}".format(i, device)) if len(modules) > 1: # 如有一个进程控制多个 GPU ,起多个线程 # 注意,这里就是每个 worker 调用了 modules 数组中的一个模型copy threads = [threading.Thread(target=_worker, args=(i, module, input, kwargs, device)) for i, (module, input, kwargs, device) in enumerate(zip(modules, inputs, kwargs_tup, devices))] for thread in threads: thread.start() for thread in threads: thread.join() else: # 一个GPU对应一个进程 _worker(0, modules[0], inputs[0], kwargs_tup[0], devices[0]) outputs = [] for i in range(len(inputs)): output = results[i] # error handle if isinstance(output, ExceptionWrapper): output.reraise() outputs.append(output) # 输出 n 个计算结果 return outputs
此时前向传播具体对应如下图,现在并行操作调用了 module 的forward方法。
+----------------------------------------------------------------------------------------+ | DataParallel.forward | | | | 1 2 3 | | replicate +---------------> parallel_apply gather | | | +----------------------------------------------------------------------------------------+ +---------------------------+ +-------------------+ | Broadcast | | module | | | | | | | | | | 1 | | 2 | | forward() +-----------> | forward() +---------> | | | | | | | | | +---------------------+ | | | | | ctx | | | | | | input_device | | | | | | | | | | | | num_inputs | | | | | | | | | | | +---------------------+ | | | | | | | | | | | | | | | | | | | | | | | | | | | +---------------------------+ +-------------------+
1.2 Gather
目前,我们已经使用 Scatter 函数将数据从 device[0] 分配并复制到不同的卡,用 Replicate 函数将模型从 device[0] 复制到不同的卡,这样各个卡都有了同样的模型和不同的数据,然后分别调用 forward 计算损失和梯度。也就是 parallel_apply 部分。
现在要做的就是把分布式计算的梯度合并到 device[0],就是 self.output_device。
# 分发数据 inputs, kwargs = self.scatter(inputs, kwargs, self.device_ids) # 分发模型 replicas = self.replicate(self.module, self.device_ids[:len(inputs)]) # 并行训练 outputs = self.parallel_apply(replicas, inputs, kwargs) # 收集到 devices[0] return self.gather(outputs, self.output_device)
对应我们传播图是:
我们看看如何把结果收集到 device[0],以及device[0]如何作为参数服务器。
1.2.1 Python世界
gather 主要是调用 Gather.apply(target_device, dim, *outputs) 完成收集工作。
def gather(outputs, target_device, dim=0): # target_device 就是 device[0] r""" Gathers tensors from different GPUs on a specified device (-1 means the CPU). """ def gather_map(outputs): out = outputs[0] if isinstance(out, torch.Tensor): return Gather.apply(target_device, dim, *outputs) # 调用下面的 Gather if out is None: return None if isinstance(out, dict): return type(out)(((k, gather_map([d[k] for d in outputs])) for k in out)) return type(out)(map(gather_map, zip(*outputs))) # Recursive function calls like this create reference cycles. # Setting the function to None clears the refcycle. try: res = gather_map(outputs) finally: gather_map = None return res
Gather 则调用了 comm.gather 完成工作,而 comm.gather 则会带领我们进入到 C++世界。
我们省略一些校验代码。
# Gather 源码 class Gather(Function): @staticmethod def forward(ctx, target_device, dim, *inputs): # target_device 就是 device[0] # 下面会往 context 内部存放几个变量,后续会用到 target_device = _get_device_index(target_device, True) ctx.target_device = target_device ctx.dim = dim ctx.input_gpus = tuple(i.get_device() for i in inputs) if all(t.dim() == 0 for t in inputs) and dim == 0: inputs = tuple(t.view(1) for t in inputs) ctx.unsqueezed_scalar = True else: ctx.unsqueezed_scalar = False ctx.input_sizes = tuple(i.size(ctx.dim) for i in inputs) return comm.gather(inputs, ctx.dim, ctx.target_device) # 这里会进入C++世界 @staticmethod def backward(ctx, grad_output): # 注意,这里后续会用到 scattered_grads = Scatter.apply(ctx.input_gpus, ctx.input_sizes, ctx.dim, grad_output) if ctx.unsqueezed_scalar: scattered_grads = tuple(g[0] for g in scattered_grads) return (None, None) + scattered_grads
现在前向计算如图:
gather 调用到了Gather的forward 函数,forward 方法在 ctx 存储了 input_gpus, input_sizes, dim 这三个变量,这些变量后续会用到。
+-----------------------------------------------------------------------------------------+ | DataParallel.forward | | | | 1 2 3 | | replicate +---------------> parallel_apply +--------------> gather | | | +-----------------------------------------------------------------------------------------+ +---------------------------+ +-------------------+ +--------------------+ | Broadcast | | module | |Gather | | | | | | | | | | | | | | 1 | | 2 | | 3 | | forward() +-----------> | forward() +--------> | forward() | | | | | | | | | | | | | | +---------------------+ | | | | +----------------+ | | | ctx | | | | | |ctx | | | | input_device | | | | | | input_gpus | | | | | | | | | | | | | | num_inputs | | | | | | input_sizes| | | | | | | | | | | | | +---------------------+ | | | | | dim | | | | | | | +----------------+ | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | +---------------------------+ +-------------------+ +--------------------+
1.2.2 C++世界
gather 函数调用了 _gather_out_impl 来完成拷贝操作。
at::Tensor gather( at::TensorList tensors, int64_t dim, c10::optional<int32_t> destination_index) { // destination_index 就是 device[0] 的index int64_t total_size = 0; auto& first = tensors.front(); const auto first_size = first.sizes(); dim = at::maybe_wrap_dim(dim, first); std::vector<int64_t> expected_size(first_size.begin(), first_size.end()); auto memory_format = first.suggest_memory_format(); for (size_t i = 0; i < tensors.size(); i++) { const auto& tensor = tensors[i]; expected_size[dim] = tensor.size(dim); total_size += tensor.size(dim); if (memory_format != MemoryFormat::Contiguous && tensor.suggest_memory_format() != memory_format) { memory_format = MemoryFormat::Contiguous; } } expected_size[dim] = total_size; at::Device device(DeviceType::CPU); // 根据 index 得到输出的目标设备 if (!destination_index || *destination_index != -1) { // device 就是 GPU 0 这个设备 device = at::Device( DeviceType::CUDA, destination_index ? *destination_index : -1); } //首先,构建一个空的目标tensor建立在目标设备之上,命名为result at::Tensor result = at::empty(expected_size, first.options().device(device), memory_format); return _gather_out_impl(tensors, result, dim); // 然后对result进行gather }
_gather_out_impl 执行了具体的gather 操作,就是把 输入的tensors 拷贝到 目标 tensor 之上,即拷贝到 GPU0 之上。
// ***************** Gather ******************* // // Gather a list of CUDA tensors on one or more devices to a target tensor or // device, either CPU or CUDA. // no checks static inline at::Tensor& _gather_out_impl( at::TensorList tensors, at::Tensor& out_tensor, int64_t dim) { std::vector<int64_t> chunk_sizes; chunk_sizes.reserve(tensors.size()); for (auto& tensor : tensors) { chunk_sizes.push_back(tensor.size(dim)); } auto chunks = out_tensor.split_with_sizes(/*split_sizes=*/chunk_sizes, /*dim=*/dim); for (size_t i = 0; i < tensors.size(); i++) { // 拷贝到GPU 0 之上 chunks[i].copy_(tensors[i], /*non_blocking=*/out_tensor.is_cuda()); } return out_tensor; }
0x02 计算损失
现在,我们已经把梯度收集到 device[0] 之上,现在我们开始进行反向传播,其整体逻辑如上图所示。首先是在 device[0] 计算损失。其实这步计算损失算是前向计算和后向传播的中间环节,这里把它算成是反向传播的开端,如下图。
我们找出来示例代码看看,里面关键的几点:
- 数据已经放到了默认GPU,即GPU 0上。
- prediction 是gather到 GPU 0 的前向计算输出。
- 使用
loss = criterion(prediction,target_var)
在默认GPU之上计算loss。 - 使用 loss.backward() 开始反向传播。
for batch_idx, (data, label) in pbar: if args.cuda: data,label= data.cuda(),label.cuda(); # 1. 数据已经放到了默认GPU上 data_v = Variable(data) target_var = Variable(label) prediction= model(data_v,target_var,args) # 2. prediction 是gather到 GPU 0 的前向计算输出 # 到目前为止,我们完成了DataParallel.forward() #这里的prediction 预测结果是由两个gpu合并过的,并行计算只存在于前向传播里 #前向传播每个gpu计算量为 batch_size/len(device_ids),等前向传播完了将结果聚合到主gpu里 criterion = nn.CrossEntropyLoss() loss = criterion(prediction,target_var) # 3. 在默认GPU之上计算loss optimizer.zero_grad() loss.backward() # 4. 开始反向传播 optimizer.step()
0x03 后向传播
我们前面运行的是上面的 Forward 部分,计算损失,接下来就运行上面代码中
loss.backward()部分。
3.1 分发梯度
我们首先来到分发梯度部分,这部分作用是:把损失在 GPUs 之间 scatter,这样后续才可以在每个GPU之上独立进行后向传播。对应下图:
3.1.1 Gather.backward
前面有提到,prediction 是gather到 GPU 0 的前向计算输出。而 loss 又是根据 prediction 计算出来,所以从
loss.backward()开始反向传播,从后向前的第一个步骤就来到了 gather 的传播操作,对应的就是 Gather 的 backward 函数,其中的核心代码是 Scatter.apply。
class Gather(Function): # 这里前向传播用到了,为了对照,我们依然贴出来 @staticmethod def forward(ctx, target_device, dim, *inputs): # target_device 就是 device[0] # 下面会往 context 内部存放几个变量,后续会用到 target_device = _get_device_index(target_device, True) ctx.target_device = target_device ctx.dim = dim ctx.input_gpus = tuple(i.get_device() for i in inputs) if all(t.dim() == 0 for t in inputs) and dim == 0: inputs = tuple(t.view(1) for t in inputs) ctx.unsqueezed_scalar = True else: ctx.unsqueezed_scalar = False ctx.input_sizes = tuple(i.size(ctx.dim) for i in inputs) # 这里会进入C++世界,把输出聚集到 GPU 0。 return comm.gather(inputs, ctx.dim, ctx.target_device) @staticmethod def backward(ctx, grad_output): # 这里现在后向传播用到了! # 把前向传播在 context 之中存放的变量取出,作为 Scatter 的输入 scattered_grads = Scatter.apply(ctx.input_gpus, ctx.input_sizes, ctx.dim, grad_output) if ctx.unsqueezed_scalar: scattered_grads = tuple(g[0] for g in scattered_grads) return (None, None) + scattered_grads
具体如下,可以看到,backward 使用了之前前向传播时候存储的 ctx.input_gpus, ctx.input_sizes, ctx.dim, grad_output,以此调用 Scatter.apply。
图中,最上面是前向传播过程,最下面是反向传播过程,中间是某些在前后传播中都用到的代码模块。
+--------------------------------------------------------------------------------------+ | DataParallel.forward | | | | 1 2 3 | | replicate +---------------> parallel_apply +--------------> gather | | | +--------------------------------------------------------------------------------------+ +---------------------------+ +-------------------+ +--------------------+ | Broadcast | | module | |Gather | | | | | | | | | | | | | | 1 | | 2 | | 3 | | forward() +-----------> | forward() +--------> | forward() | | | | | | | | | | | | | | +---------------------+ | | | | +----------------+ | | | ctx | | | | | |ctx | | | | input_device | | | | | | input_gpus | | | | | | | | | | | | | | num_inputs | | | | | | input_sizes| | | | | | | | | | | | | +---------------------+ | | | | | dim | | | | | | | +----------------+ | | | | | | | | | | | | | | | | | <---------+ backward() | | | | | | 3 | | | | | | | +---------------------------+ +-------------------+ +--------------------+ +--------------------------------------------------------------------------------------+ | loss.backward() | | 3 | | <--------------------+ | | | | | +--------------------------------------------------------------------------------------+
3.1.2 Scatter
Scatter.apply 实际上调用到了其 forward 方法。
- 首先从上下文之中提取之前存储的变量,这里主要是输入设备 input_device(源设备)和 target_gpus(目标设备)。
- 获取到目标设备的流。
- 调用 comm.scatter 把梯度分发到目标设备。
class Scatter(Function): @staticmethod def forward(ctx, target_gpus, chunk_sizes, dim, input): target_gpus = [_get_device_index(x, True) for x in target_gpus] ctx.dim = dim ctx.input_device = input.get_device() if input.device.type != "cpu" else -1 streams = None if torch.cuda.is_available() and ctx.input_device == -1: # Perform CPU to GPU copies in a background stream streams = [_get_stream(device) for device in target_gpus] # 分发到其他GPU outputs = comm.scatter(input, target_gpus, chunk_sizes, ctx.dim, streams) # Synchronize with the copy stream if streams is not None: for i, output in enumerate(outputs): with torch.cuda.device(target_gpus[i]): main_stream = torch.cuda.current_stream() main_stream.wait_stream(streams[i]) output.record_stream(main_stream) return outputs @staticmethod def backward(ctx, *grad_output): return None, None, None, Gather.apply(ctx.input_device, ctx.dim, *grad_output)
3.1.3 C++
上面python代码
outputs = comm.scatter(input, target_gpus, chunk_sizes, ctx.dim, streams)会直接进入到C++世界。具体代码位于 torch/csrc/cuda/comm.cpp。
scatter 的作用就是把tensor进行split,然后分发给各个设备的流。
std::vector<at::Tensor> scatter( const at::Tensor& tensor, at::IntArrayRef devices, const c10::optional<std::vector<int64_t>>& chunk_sizes, int64_t dim, const c10::optional<std::vector<c10::optional<at::cuda::CUDAStream>>>& streams) { dim = at::maybe_wrap_dim(dim, tensor); // 把tensor进行split std::vector<at::Tensor> chunks = chunk_sizes ? tensor.split_with_sizes(/*split_sizes=*/*chunk_sizes, /*dim=*/dim) : tensor.chunk(/*chunks=*/devices.size(), /*dim=*/dim); at::cuda::OptionalCUDAStreamGuard cuda_guard; for (size_t i = 0; i < chunks.size(); ++i) { const auto device_index = static_cast<int16_t>(devices[i]); if (device_index != tensor.get_device()) { if (i < (streams ? streams->size() : 0U) && (*streams)[i]) { cuda_guard.reset_stream(*(*streams)[i]); } // 发送给各个设备的流 chunks[i] = chunks[i].to( {DeviceType::CUDA, device_index}, /*non_blocking=*/true, /*copy=*/false, /*memory_format=*/at::MemoryFormat::Preserve); } } return chunks; }
3.2 并行后向传播
现在梯度已经分发到各个 GPU,接下来正式进入并行后向传播,这部分作用是:在各个GPU之上并行运行后向传播,计算参数梯度。对应下图:
这部分调用到了原始模型的 backward,具体如下图中的数值 4:
+--------------------------------------------------------------------------------------+ | DataParallel.forward | | | | 1 2 3 | | replicate +---------------> parallel_apply +--------------> gather | | | +--------------------------------------------------------------------------------------+ +---------------------------+ +-------------------+ +--------------------+ | Broadcast | | module | |Gather | | | | | | | | | | | | | | 1 | | 2 | | 3 | | forward() +-----------> | forward() +--------> | forward() | | | | | | | | | | | | | | +---------------------+ | | | | +----------------+ | | | ctx | | | | | |ctx | | | | input_device | | | | | | input_gpus | | | | | | | | | | | | | | num_inputs | | | | | | input_sizes| | | | | | | | | | | | | +---------------------+ | | | | | dim | | | | | | | +----------------+ | | | | | | | | | | | | | | | <---------+ backward() | <---------+ backward() | | | | 4 | | 3 | | | | | | | +---------------------------+ +-------------------+ +--------------------+ +--------------------------------------------------------------------------------------+ | loss.backward() | | 4 3 | | <------------------+ <--------------------+ | | | | | +--------------------------------------------------------------------------------------+
3.3 归并梯度
这部分作用是 在 GPU 0 之上归并梯度,总体流程拓展对应下图:
3.3.1 Broadcast.backward
这部分对应了 Broadcast 的 反向传播。
class Broadcast(Function): @staticmethod def forward(ctx, target_gpus, *inputs): target_gpus = [_get_device_index(x, True) for x in target_gpus] # 前向传播时候,向上下文存入了一些变量 ctx.target_gpus = target_gpus if len(inputs) == 0: return tuple() ctx.num_inputs = len(inputs) # input 放在 device[0],所以 input_device 就是 GPU 0 ctx.input_device = inputs[0].get_device() # 和 detach 的情形一样 outputs = comm.broadcast_coalesced(inputs, ctx.target_gpus) non_differentiables = [] # 在上下文中设置哪些不需要梯度 for idx, input_requires_grad in enumerate(ctx.needs_input_grad[1:]): if not input_requires_grad: for output in outputs: non_differentiables.append(output[idx]) ctx.mark_non_differentiable(*non_differentiables) return tuple([t for tensors in outputs for t in tensors]) @staticmethod def backward(ctx, *grad_outputs): # 反向传播来到这里,取出之前在上下文存放的变量作为输入。ctx.input_device 就是之前存储的 GPU 0。 return (None,) + ReduceAddCoalesced.apply(ctx.input_device, ctx.num_inputs, *grad_outputs)
因此,我们可以拓展流程图:
+--------------------------------------------------------------------------------------+ | DataParallel.forward | | | | 1 2 3 | | replicate +---------------> parallel_apply +--------------> gather | | | +--------------------------------------------------------------------------------------+ +---------------------------+ +-------------------+ +--------------------+ | Broadcast | | module | |Gather | | | | | | | | | | | | | | 1 | | 2 | | 3 | | forward() +-----------> | forward() +--------> | forward() | | | | | | | | | | | | | | +---------------------+ | | | | +----------------+ | | | ctx | | | | | |ctx | | | | input_device | | | | | | input_gpus | | | | | | | | | | | | | | num_inputs | | | | | | input_sizes| | | | | | | | | | | | | +---------------------+ | | | | | dim | | | | | | | +----------------+ | | | | | | | | | | | | | | backward() | <---------+ backward() | <---------+ backward() | | 5 | | 4 | | 3 | | | | | | | +---------------------------+ +-------------------+ +--------------------+ +--------------------------------------------------------------------------------------+ | loss.backward() | | 5 4 3 | | <------------------------+ <------------------+ <--------------------+ | | | | | +--------------------------------------------------------------------------------------+
3.3.2 ReduceAddCoalesced
Broadcast.backward 调用了 ReduceAddCoalesced.apply,其对应了 ReduceAddCoalesced 的 forward 方法,目的是把梯度归并到目标设备 destination,就是GPU 0。
class ReduceAddCoalesced(Function): @staticmethod # 会调用到这里,destination 是GPU 0 def forward(ctx, destination, num_inputs, *grads): # 从梯度之中提取所在的设备 ctx.target_gpus = [grads[i].get_device() for i in range(0, len(grads), num_inputs)] grads_ = [grads[i:i + num_inputs] for i in range(0, len(grads), num_inputs)] # 把梯度归并到目标设备 destination,就是GPU 0 return comm.reduce_add_coalesced(grads_, destination) @staticmethod def backward(ctx, *grad_outputs): return (None, None,) + Broadcast.apply(ctx.target_gpus, *grad_outputs)
3.3.3 c++
看注释就是:从多个 GPU 来相加梯度,代码之中就是归并相加。
def reduce_add_coalesced(inputs, destination=None, buffer_size=10485760): """Sums tensors from multiple GPUs. Small tensors are first coalesced into a buffer to reduce the number of synchronizations. Args: inputs (Iterable[Iterable[Tensor]]): iterable of iterables that contain tensors from a single device. destination (int, optional): a device on which the output will be placed (default: current device). buffer_size (int): maximum size of the buffer used for coalescing Returns: A tuple of tensors containing an elementwise sum of each group of inputs, placed on the ``destination`` device. """ dense_tensors: List[List] = [[] for _ in inputs] # shape (num_gpus, num_tensors) output = [] ref_order = [] # process sparse ones first since they may have different sizes on different gpus for tensor_at_gpus in zip(*inputs): if all(t.is_sparse for t in tensor_at_gpus): # 进行归并 result = reduce_add(tensor_at_gpus, destination) # this will be sparse too output.append(result) ref_order.append(tensor_at_gpus[0]) else: for coll, t in zip(dense_tensors, tensor_at_gpus): coll.append(t.to_dense() if t.is_sparse else t) ref_order.append(dense_tensors[0][-1]) itrs = [_take_tensors(tensors, buffer_size) for tensors in dense_tensors] # now the dense ones, which have consistent sizes for chunks in zip(*itrs): flat_tensors = [_flatten_dense_tensors(chunk) for chunk in chunks] # (num_gpus,) # 进行归并 flat_result = reduce_add(flat_tensors, destination) for t in _unflatten_dense_tensors(flat_result, chunks[0]): # The unflattened tensors do not share storage, and we don't expose # base flat tensor anyways, so give them different version counters. # See NOTE [ Version Counter in comm.*_coalesced ] output.append(t.data) return tuple(_reorder_tensors_as(output, ref_order))
3.4 更新模型参数
这部分功能是:更新梯度参数。进行梯度下降,并更新主GPU上的模型参数。
另外,由于模型参数仅在主GPU上更新,而其他从属GPU此时并不是同步更新的,所以需要将更新后的模型参数复制到剩余的从属 GPU 中,以此来实现并行。这就是在下一次for循环之中进行,以此循环反复。
对应示例代码是:
for batch_idx, (data, label) in pbar: # 6. 下一次迭代会继续从分发开始 if args.cuda: data,label= data.cuda(),label.cuda(); # 1. 数据已经放到了默认GPU上 data_v = Variable(data) target_var = Variable(label) prediction= model(data_v,target_var,args) # 2. prediction 是gather到 GPU 0 的前向计算输出 # 到目前为止,我们完成了DataParallel.forward() #这里的prediction 预测结果是由两个gpu合并过的,并行计算只存在在前向传播里 #前向传播每个gpu计算量为 batch_size/len(device_ids),等前向传播完了将结果和到主gpu里 criterion = nn.CrossEntropyLoss() loss = criterion(prediction,target_var) # 3. 在默认GPU之上计算loss optimizer.zero_grad() loss.backward() # 4. 开始反向传播 optimizer.step() # 5. 更新模型
0x04 总结
我们总结一下流程,起初数据和模型被放入到默认GPU,就是 GPU 0,然后迭代如下:
- scatter 会把数据分发到其他 GPU。
- replicate 会把模型分发到其他 GPU。
- parallel_apply 会启动多个线程进行前向计算。
- gather 会把计算输出收集到 GPU 0。
- GPU 0 会计算损失。
- 把梯度 scatter 到其他 GPU。
- 模型调用 backward 计算。
- 把梯度归并到 GPU 0。
- optimizer.step 更新模型。
具体对应下图之中的数字。
+-----+ +-------+ |GPU1 | | GPU1 | main thread +-----+ +-------+ +-----> Forward----> scatter +--------------> replicate-------> parallel_apply +--------> gather +---------+ + + + | 1 | 2 | 3 | | | | | | | +---------+----------+---+ | | | | | | | | +---------+----------+ | +--------------------+ | | | | | | | | | | | | | 2 | | 2 | | 2 thread|1 thread 2 thread 3 | 1 | | 1 | | 1 | | | | | | | v | v | v | | | | v v v v v v | +--+---+ +--+---+ +--+---+ +--+---+ +--+---+ +--+---+ +-------+ | | GPU1 | | GPU2 | | GPU3 | | GPU1 | | GPU2 | | GPU3 | | GPU1 | | +------+ +------+ +------+ +--+---+ +-+----+ +---+--+ +-+-+--++ | | | | ^ ^ ^ | | | | 4 | | | | | | ----------^ | | | | | 4 | | | | +------------------------+ | | | | | +------------------------------------+ | +------------------------------------------------------------------------------------------------------+ | +------+ | | GPU1 | | +------+ main thread +-> loss = criterion(...)+-----> scatter +--------------> model.backward() +----------> reduce gradient +-------> optimizer.step + + + +------+ 9 | 5 | 6 | 7 | GPU1 | | | | +--+---+ | v---------------v +--------------------+ ^ | | | | | | | | 8 | | | | thread 1 thread 2 thread 3 | | | | | + | | +-------------+ | | | | | | | | | | v v v v v v v | | | +--+---+ +---+-+ +--+--+ +-+---+ +--+--+ +---+--+ +--+--+ +--+--+ +-+--+ +-+---+ | GPU1 | | GPU1| | GPU2| |GPU3 | | GPU1| | GPU2 | |GPU3 | | GPU1| |GPU2| | GPU3| +------+ +-----+ +-----+ +-----+ +-----+ +------+ +-----+ +-----+ +----+ +-----+
手机如下:
至此,DP 分析完毕,我们下一篇要介绍 DDP 的一些相关知识。
0xFF 参考
PyTorch 源码解读之 torch.optim:优化算法接口详解
pytorch(分布式)数据并行个人实践总结——DataParallel/DistributedDataParallel
https://discuss.pytorch.org/t/dataparallel-imbalanced-memory-usage/22551/20
[[原创]深度][PyTorch] DDP系列第二篇:实现原理与源代码解析
Pytorch踩坑记:赋值、浅拷贝、深拷贝三者的区别以及model.state_dict()和model.load_state_dict()的坑点
- [源码解析] PyTorch 分布式(8) -------- DistributedDataParallel之论文篇
- [源码解析] PyTorch 分布式(4)------分布式应用基础概念
- [源码解析] PyTorch 分布式(7) ----- DistributedDataParallel 之进程组
- MIT 2012分布式课程基础源码解析-事件管理封装
- 分布式事务 TCC-Transaction 源码解析 —— 调试环境搭建
- 分布式_事务_02_2PC框架raincat源码解析
- 分布式消息队列RocketMQ源码分析之1 -- Topic路由数据结构解析 -- topicRoute与topicPublishInfo与queueId
- 分布式事务 TCC-Transaction 源码解析 —— 事务存储器
- [源码解析] 并行分布式框架 Celery 之架构 (1)
- 分布式事务 TCC-Transaction 源码解析 —— 事务存储器
- Cassandra 源码解析 3: 分布式hashtable(DHT) 和 Locator
- 分布式事务 TCC-Transaction 源码解析 —— 事务存储器
- [源码解析] 并行分布式框架 Celery 之架构 (2)
- [源码解析]PyTorch如何实现前向传播(1) --- 基础类(上)
- 分布式事物 seata框架的学习 第二章 (源码解析(server端))
- 分布式事务 TCC-Transaction 源码解析 —— 事务存储器
- 学习笔记---分布式调度之xxlJob调度中心的启动源码解析
- 分布式事物 seata框架的学习 第一章 (GlobalTransactional注解源码解析(tm端))
- 分布式事务_03_2PC框架raincat源码解析-事务提交过程
- 分布式事务 TCC-Transaction 源码解析 —— 事务存储器