Skip to content

使用 Cpp 扩展自定义进程组后端

译者:片刻小哥哥

项目地址:https://pytorch.apachecn.org/2.0/tutorials/intermediate/process_group_cpp_extension_tutorial

原始地址:https://pytorch.org/tutorials/intermediate/process_group_cpp_extension_tutorial.html

作者 :

Howard Huang https://github.com/H-Huang

, 冯天 , 沉力 , 敏斯

没有10

editgithub .

先决条件:

本教程演示如何实现自定义 后端 并将其插入 PyTorch 分布式包 使用 cpp 扩展 。当您需要针对硬件 的专用软件堆栈,或者当您想要尝试新的 集体通信算法时,这会很有帮助。

基础知识

PyTorch 集体通信支持多种广泛采用的分布式 训练功能,包括 DistributedDataParallel , ZeroRedundancyOptimizer , FullyShardedDataParallel 。 为了使同一个集体通信 API 能够与 不同的通信后端一起工作,分布式包将集体 通信操作抽象为 [后端] (https://github.com/pytorch/pytorch/blob/main/torch/csrc/distributed/c10d/Backend.hpp) 类。然后可以使用首选第三方库将不同的后端实现为“后端”的子类。 PyTorch 分布式附带三个默认后端: ProcessGroupNCCLProcessGroupGlooProcessGroupMPI 。然而,除了这三个后端之外,还有其他通信库(例如,UCCOneCCL ),不同类型的硬件 (例如, TPU , Trainum ),以及新兴的 通信算法(例如, Herring , 缩减服务器 )。 因此,分布式包公开扩展 API 以允许自定义 集体通信后端。

下面的 4 个步骤展示了如何实现虚拟 后端 后端 并在 Python 应用程序代码中使用它。请注意,本教程的重点 不是演示扩展 API,而不是开发 正常运行的通信后端。因此, dummy 后端仅覆盖 API 的一个子集( all_reduceall_gather ),并简单地设置tensor的值 到 0。

第 1 步:实现

Backend的子类

第一步是实现一个 Backend 子类,该子类覆盖 目标集体通信 API 并运行自定义通信算法。 该扩展还需要实现

Work 子类,该子类 作为通信结果的未来,并允许在应用程序代码中异步执行。如果扩展使用第三方库,则它可以包含标头并从“BackendDummy”子类调用库 API。下面的两个代码片段展示了 dummy.hdummy.cpp 的实现。请参阅 虚拟集体 存储库以了解完整实现。

// file name: dummy.hpp
#include <torch/python.h>

#include <torch/csrc/distributed/c10d/Backend.hpp>
#include <torch/csrc/distributed/c10d/Work.hpp>
#include <torch/csrc/distributed/c10d/Store.hpp>
#include <torch/csrc/distributed/c10d/Types.hpp>
#include <torch/csrc/distributed/c10d/Utils.hpp>

#include <pybind11/chrono.h>

namespace c10d {

class BackendDummy : public Backend {
 public:
 BackendDummy(int rank, int size);

 c10::intrusive_ptr<Work> allgather(
 std::vector<std::vector<at::Tensor>>& outputTensors,
 std::vector<at::Tensor>& inputTensors,
 const AllgatherOptions& opts = AllgatherOptions()) override;

 c10::intrusive_ptr<Work> allreduce(
 std::vector<at::Tensor>& tensors,
 const AllreduceOptions& opts = AllreduceOptions()) override;

 // The collective communication APIs without a custom implementation
 // will error out if invoked by application code.
};

class WorkDummy : public Work {
 public:
 WorkDummy(
 OpType opType,
 c10::intrusive_ptr<c10::ivalue::Future> future) // future of the output
 : Work(
 -1, // rank, only used by recvAnySource, irrelevant in this demo
 opType),
 future_(std::move(future)) {}
 bool isCompleted() override;
 bool isSuccess() const override;
 bool wait(std::chrono::milliseconds timeout = kUnsetTimeout) override;
 virtual c10::intrusive_ptr<c10::ivalue::Future> getFuture() override;

 private:
 c10::intrusive_ptr<c10::ivalue::Future> future_;
};
} // namespace c10d
// file name: dummy.cpp
#include "dummy.hpp"

namespace c10d {

// This is a dummy allgather that sets all output tensors to zero
// Modify the implementation to conduct real communication asynchronously
c10::intrusive_ptr<Work> BackendDummy::allgather(
 std::vector<std::vector<at::Tensor>>& outputTensors,
 std::vector<at::Tensor>& inputTensors,
 const AllgatherOptions& /* unused */) {
 for (auto& outputTensorVec : outputTensors) {
 for (auto& outputTensor : outputTensorVec) {
 outputTensor.zero_();
 }
 }

 auto future = c10::make_intrusive<c10::ivalue::Future>(
 c10::ListType::create(c10::ListType::create(c10::TensorType::get())));
 future->markCompleted(c10::IValue(outputTensors));
 return c10::make_intrusive<WorkDummy>(OpType::ALLGATHER, std::move(future));
}

// This is a dummy allreduce that sets all output tensors to zero
// Modify the implementation to conduct real communication asynchronously
c10::intrusive_ptr<Work> BackendDummy::allreduce(
 std::vector<at::Tensor>& tensors,
 const AllreduceOptions& opts) {
 for (auto& tensor : tensors) {
 tensor.zero_();
 }

 auto future = c10::make_intrusive<c10::ivalue::Future>(
 c10::ListType::create(c10::TensorType::get()));
 future->markCompleted(c10::IValue(tensors));
 return c10::make_intrusive<WorkDummy>(OpType::ALLGATHER, std::move(future));
}
} // namespace c10d

步骤 2:公开扩展 Python API

后端构造函数被调用 从 Python 端 , so该扩展还需要向 Python 公开构造函数 API。这可以通过添加以下方法来完成。在此示例中, storetimeoutBackendDummy 实例化方法忽略,因为 在此虚拟实现中未使用它们。但是,实际扩展 应考虑使用 store 来执行交会并支持 timeout 参数。

// file name: dummy.hpp
class BackendDummy : public Backend {
 ...
 <Step 1 code>
 ...

 static c10::intrusive_ptr<Backend> createBackendDummy(
 const c10::intrusive_ptr<::c10d::Store>& store,
 int rank,
 int size,
 const std::chrono::duration<float>& timeout);

 static void BackendDummyConstructor() __attribute__((constructor)) {
 py::object module = py::module::import("torch.distributed");
 py::object register_backend =
 module.attr("Backend").attr("register_backend");
 // torch.distributed.Backend.register_backend will add `dummy` as a
 // new valid backend.
 register_backend("dummy", py::cpp_function(createBackendDummy));
 }
}
// file name: dummy.cpp
c10::intrusive_ptr<Backend> BackendDummy::createBackendDummy(
 const c10::intrusive_ptr<::c10d::Store>& /* unused */,
 int rank,
 int size,
 const std::chrono::duration<float>& /* unused */) {
 return c10::make_intrusive<BackendDummy>(rank, size);
}

PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) {
 m.def("createBackendDummy", &BackendDummy::createBackendDummy);
}

步骤 3:构建自定义扩展

现在,扩展源代码文件已准备就绪。然后我们可以使用 cpp 扩展 来构建它。为此,请创建一个 setup.py 文件来准备路径和 命令。然后调用 `python

setup.py

develop` 来安装扩展。

如果扩展依赖于第三方库,您还可以为 cpp 扩展 API 指定 libraries_dirslibraries 。请参阅 torch ucc 项目作为实际示例。

# file name: setup.py
import os
import sys
import torch
from setuptools import setup
from torch.utils import cpp_extension

sources = ["src/dummy.cpp"]
include_dirs = [f"{os.path.dirname(os.path.abspath(__file__))}/include/"]

if torch.cuda.is_available():
    module = cpp_extension.CUDAExtension(
        name = "dummy_collectives",
        sources = sources,
        include_dirs = include_dirs,
    )
else:
    module = cpp_extension.CppExtension(
        name = "dummy_collectives",
        sources = sources,
        include_dirs = include_dirs,
    )

setup(
    name = "Dummy-Collectives",
    version = "0.0.1",
    ext_modules = [module],
    cmdclass={'build_ext': cpp_extension.BuildExtension}
)

步骤 4:在应用程序中使用扩展

安装后,您可以在调用时方便地使用 dummy 后端 init_process_group 就好像它是内置后端一样。

我们可以通过更改 init_process_groupbackend 参数来指定基于后端的调度。我们可以通过 指定 cpu:gloo,cuda:dummy 作为后端参数。

要将所有tensor发送到 dummy 后端,我们只需指定 dummy 作为后端参数。

import os

import torch
# importing dummy_collectives makes torch.distributed recognize `dummy`
# as a valid backend.
import dummy_collectives

import torch.distributed as dist

os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '29500'

# Alternatively:
# dist.init_process_group("dummy", rank=0, world_size=1)
dist.init_process_group("cpu:gloo,cuda:dummy", rank=0, world_size=1)

# this goes through gloo
x = torch.ones(6)
dist.all_reduce(x)
print(f"cpu allreduce: {x}")

# this goes through dummy
if torch.cuda.is_available():
    y = x.cuda()
    dist.all_reduce(y)
    print(f"cuda allreduce: {y}")

    try:
        dist.broadcast(y, 0)
    except RuntimeError:
        print("got RuntimeError when calling broadcast")


回到顶部