From ab04fc02de99bdd48a8eaf4aeea27c3cd12ae274 Mon Sep 17 00:00:00 2001
From: hanchao
Date: Thu, 17 Oct 2024 08:17:49 +0000
Subject: [PATCH] rm header and refine profilehead

Move the CCL_ENABLE_ZE/CCL_ENABLE_SYCL compile definitions out of
caffe2/CMakeLists.txt and define them directly in ProcessGroupXCCL.hpp,
drop unused includes, and refine the profiling path: WorkXCCL now
receives a real profilingTitle and a collective sequence number, and
allreduce records its parameters via RECORD_PARAM_COMMS_DATA.
---
 caffe2/CMakeLists.txt                         |  3 -
 .../distributed/c10d/ProcessGroupXCCL.cpp     | 58 +++++++++++++------
 .../distributed/c10d/ProcessGroupXCCL.hpp     | 43 ++++++++------
 3 files changed, 65 insertions(+), 39 deletions(-)

diff --git a/caffe2/CMakeLists.txt b/caffe2/CMakeLists.txt
index b4ec018019f165..25bd7f700f68a2 100644
--- a/caffe2/CMakeLists.txt
+++ b/caffe2/CMakeLists.txt
@@ -1376,9 +1376,6 @@ if(USE_DISTRIBUTED)
   endif()
   if(USE_XPU AND USE_C10D_XCCL)
     target_compile_definitions(torch_xpu PUBLIC USE_C10D_XCCL)
-    set_source_files_properties(
-      ${TORCH_SRC_DIR}/csrc/distributed/c10d/ProcessGroupXCCL.cpp
-      PROPERTIES COMPILE_DEFINITIONS "CCL_ENABLE_ZE;CCL_ENABLE_SYCL")
   endif()
   if(USE_MPI AND USE_C10D_MPI)
     if(CMAKE_CXX_COMPILER_ID MATCHES "Clang" OR CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
diff --git a/torch/csrc/distributed/c10d/ProcessGroupXCCL.cpp b/torch/csrc/distributed/c10d/ProcessGroupXCCL.cpp
index 90fb4c3f9cbd75..76d265ca5de289 100644
--- a/torch/csrc/distributed/c10d/ProcessGroupXCCL.cpp
+++ b/torch/csrc/distributed/c10d/ProcessGroupXCCL.cpp
@@ -1,17 +1,8 @@
 #ifdef USE_C10D_XCCL
 
 #include <...>
+#include <torch/csrc/distributed/c10d/ParamCommsUtils.hpp>
 #include <torch/csrc/distributed/c10d/ProcessGroupXCCL.hpp>
-#include <...>
-#include <...>
-#include <...>
-#include <...>
-#include <...>
-#include <...>
-#include <...>
-
-#include <...>
-#include <...>
 
 namespace c10d {
 
@@ -89,10 +80,13 @@ ProcessGroupXCCL::WorkXCCL::WorkXCCL(
     at::Device& device,
     int rank,
     OpType opType,
+    uint64_t seq,
+    const char* profilingTitle,
     const std::optional<std::vector<at::Tensor>>& inputs)
-    : Work(rank, opType, "profilingTitle", inputs),
+    : Work(rank, opType, profilingTitle, inputs),
       device_(device),
-      workStartTime_(std::chrono::steady_clock::now()) {
+      workStartTime_(std::chrono::steady_clock::now()),
+      seq_(seq) {
   xcclEndEvent_ = std::make_shared<at::xpu::XPUEvent>();
 }
 
@@ -101,7 +95,8 @@ ProcessGroupXCCL::WorkXCCL::WorkXCCL(const WorkXCCL& w)
       device_(w.device_),
       xcclEndEvent_(w.xcclEndEvent_),
       blockingWait_(w.blockingWait_),
-      workStartTime_(w.workStartTime_) {}
+      workStartTime_(w.workStartTime_),
+      seq_(w.seq_) {}
 
 ProcessGroupXCCL::WorkXCCL::~WorkXCCL() = default;
 
@@ -156,10 +151,16 @@ c10::intrusive_ptr<ProcessGroupXCCL::WorkXCCL> ProcessGroupXCCL::initWork(
     at::Device& device,
     int rank,
     OpType opType,
+    const char* profilingTitle,
    const std::vector<at::Tensor>& inputs,
     const std::vector<at::Tensor>& outputs) {
   auto r = c10::make_intrusive<ProcessGroupXCCL::WorkXCCL>(
-      device, rank, opType, std::optional<std::vector<at::Tensor>>(inputs));
+      device,
+      rank,
+      opType,
+      seqCollective_,
+      profilingTitle,
+      std::optional<std::vector<at::Tensor>>(inputs));
   return r;
 }
 
@@ -212,7 +213,10 @@ c10::intrusive_ptr<Work> ProcessGroupXCCL::collective(
     Fn fn,
     PreProcess pre,
     PostProcess post,
-    OpType opType) {
+    OpType opType,
+    const char* profilingTitle) {
+  seqCollective_++;
+
   auto device = inputs[0].device();
   const auto key = std::to_string(device.index());
   auto comm = getXCCLComm(key, device);
@@ -221,7 +225,7 @@ c10::intrusive_ptr<Work> ProcessGroupXCCL::collective(
   syncStream(device, xcclEventsMap_[key], stream);
 
   c10::intrusive_ptr<ProcessGroupXCCL::WorkXCCL> work;
-  work = initWork(device, rank_, opType);
+  work = initWork(device, rank_, opType, profilingTitle);
   work->outputs_ = std::make_shared<std::vector<at::Tensor>>(outputs);
 
   at::xpu::OptionalXPUGuard gpuGuard(device);
@@ -253,6 +257,25 @@ c10::intrusive_ptr<Work> ProcessGroupXCCL::allreduce(
   auto tensor = tensors.back();
   checkXPUTensor(tensor);
 
+  RECORD_PARAM_COMMS_DATA(
+      // static_cast<int>(
+      //     this->getSequenceNumberForGroup() + 1), // seq + 1 to match
+      //     collective
+      1,
+      std::make_tuple(pg_uid_, pg_desc_), // PG name tuple
+      tensors, // inputTensors
+      tensors, // outputTensors
+      rank_, // rank
+      "allreduce", // collective name
+      tensor.numel(), // inNelems
+      tensor.numel(), // outNelems
+      tensor.scalar_type(), // dType
+      std::vector<int64_t>(), // inSplitSizes
+      std::vector<int64_t>(), // outSplitSizes
+      0, // globalRankStart
+      1, // globalRankStride
+      this->getSize()); // worldSize
+
   return collective(
       tensor,
       tensor,
@@ -273,7 +296,8 @@ c10::intrusive_ptr<Work> ProcessGroupXCCL::allreduce(
           ccl_stream);
         return;
       },
-      OpType::ALLREDUCE);
+      OpType::ALLREDUCE,
+      "xccl:all_reduce");
 }
 
 } // namespace c10d
diff --git a/torch/csrc/distributed/c10d/ProcessGroupXCCL.hpp b/torch/csrc/distributed/c10d/ProcessGroupXCCL.hpp
index 6e6eb16d62d620..f9761c652dc1a0 100644
--- a/torch/csrc/distributed/c10d/ProcessGroupXCCL.hpp
+++ b/torch/csrc/distributed/c10d/ProcessGroupXCCL.hpp
@@ -1,33 +1,24 @@
 #pragma once
 
-#if defined(__linux__)
-#include <fcntl.h>
-#include <sys/stat.h>
-#include <sys/types.h>
-#include <unistd.h>
-#endif
-
 #ifdef USE_C10D_XCCL
-#include <...>
+// We will define those flags in XCCL backend file instead of passing to gcc
+// compiler.
+#define CCL_ENABLE_ZE
+#define CCL_ENABLE_SYCL
+
 #include <oneapi/ccl.hpp>
-#include <...>
 #include <...>
-#include <...>
-#include <...>
-
-#include <...>
-#include <...>
 #include <...>
-#include <...>
 #include <...>
 #include <...>
-#include <...>
 #include <...>
+#include <...>
+#include <...>
 #include <...>
 #include <...>
 #include <...>
-#include <...>
+#include <...>
 
 namespace c10d {
 static std::vector<std::string> TORCH_XCCL_BLOCKING_WAIT = {
     "TORCH_XCCL_BLOCKING_WAIT",
@@ -45,6 +36,8 @@ class TORCH_API ProcessGroupXCCL : public Backend {
         at::Device& device,
         int rank,
         OpType opType,
+        uint64_t seq,
+        const char* profilingTitle = nullptr,
         const std::optional<std::vector<at::Tensor>>& inputs = std::nullopt);
     WorkXCCL(const WorkXCCL& w);
     ~WorkXCCL() override;
@@ -63,6 +56,10 @@ class TORCH_API ProcessGroupXCCL : public Backend {
       return future_;
     }
 
+    uint64_t getSequencenumber() const override {
+      return seq_;
+    }
+
     std::vector<at::Tensor> result() override {
       return *outputs_;
     }
@@ -72,6 +69,7 @@ class TORCH_API ProcessGroupXCCL : public Backend {
     std::shared_ptr<at::xpu::XPUEvent> xcclEndEvent_;
     bool blockingWait_ = false;
     std::chrono::time_point<std::chrono::steady_clock> workStartTime_;
+    uint64_t seq_;
 
    private:
     void synchronizeInternal(std::chrono::milliseconds timeout);
@@ -103,6 +101,7 @@ class TORCH_API ProcessGroupXCCL : public Backend {
       at::Device& device,
       int rank,
       OpType opType,
+      const char* profilingTitle = nullptr,
       const std::vector<at::Tensor>& inputs = {},
       const std::vector<at::Tensor>& outputs = {});
 
@@ -111,7 +110,8 @@ class TORCH_API ProcessGroupXCCL : public Backend {
       at::Tensor& input,
       at::Tensor& output,
       Fn fn,
-      OpType opType) {
+      OpType opType,
+      const char* profilingTitle = nullptr) {
     auto inputs = std::vector<at::Tensor>{input};
     auto outputs = std::vector<at::Tensor>{output};
     return collective(
@@ -132,13 +132,17 @@ class TORCH_API ProcessGroupXCCL : public Backend {
       Fn fn,
       PreProcess pre,
       PostProcess post,
-      OpType opType);
+      OpType opType,
+      const char* profilingTitle = nullptr);
 
   c10::intrusive_ptr<Work> allreduce(
       std::vector<at::Tensor>& tensors,
       const AllreduceOptions& opts = AllreduceOptions()) override;
 
   void setSequenceNumberForGroup() override {}
+  uint64_t getSequenceNumberForGroup() override {
+    return seqCollective_;
+  }
 
  protected:
   std::unordered_map<std::string, at::xpu::XPUStream> xcclStreamsMap_;
@@ -147,6 +151,7 @@ class TORCH_API ProcessGroupXCCL : public Backend {
   c10::intrusive_ptr<Store> store_;
   std::mutex mutex_;
   bool blockingWait_ = false;
+  uint64_t seqCollective_{0};
 
  private:
   std::mutex kvs_mutex;
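
Usage sketch (appended note, not part of the patch): with these changes a
collective issued through ProcessGroupXCCL increments seqCollective_, tags
its kernels with a profiling title such as "xccl:all_reduce", and exposes
the sequence number on the returned Work. A minimal C++ caller, assuming a
build with USE_C10D_XCCL, an already-constructed process group `pg`, and an
XPU tensor `t` (rank/store plumbing omitted); the function name
runOneAllreduce is illustrative only:

    #include <torch/csrc/distributed/c10d/ProcessGroupXCCL.hpp>

    void runOneAllreduce(c10d::ProcessGroupXCCL& pg, at::Tensor& t) {
      std::vector<at::Tensor> tensors{t};
      // allreduce() now records the call via RECORD_PARAM_COMMS_DATA and
      // hands the title "xccl:all_reduce" down to the Work it creates.
      auto work = pg.allreduce(tensors);
      work->wait();
      // New in this patch: the per-work sequence number matches the
      // per-group counter when no other collective ran in between.
      uint64_t seq = work->getSequencenumber();
      TORCH_CHECK(seq == pg.getSequenceNumberForGroup());
    }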