diff --git a/csrc/device_lower/lower2device.cpp b/csrc/device_lower/lower2device.cpp index 79652bc67c5..9d39040f4be 100644 --- a/csrc/device_lower/lower2device.cpp +++ b/csrc/device_lower/lower2device.cpp @@ -348,6 +348,12 @@ IdModelOptions getIdModelOptions(Fusion* fusion) { } else if (expr->isA()) { options.setBuildTensorIndexer(true); continue; + } else if (expr->isOneOf()) { + options.setProducerIndex(true); + options.setConsumerIndex(true); + options.setInlinePredicate(true); + options.setUnswitchPredicate(true); + continue; } else if (auto reshape = dynamic_cast(expr)) { // The legacy indexer has an issue when an expand broadcast is // involved in reshape transformations. Enable both tensor and diff --git a/csrc/id_model/indexing.cpp b/csrc/id_model/indexing.cpp index b505a2b6ef7..8cc0da959e2 100644 --- a/csrc/id_model/indexing.cpp +++ b/csrc/id_model/indexing.cpp @@ -413,10 +413,11 @@ class AllocationDomainSetup : private kir::IrVisitor { } // Reorder non-logical allocation domains to follow the ordering of - // the logical domain. This is necessary when an allocation domain - // includes a vectorized loop iter domain since it must be at the + // the set allocation domain. This is necessary when an allocation + // domain includes a vectorized loop iter domain since it must be at the // innermost position but that may not be the case in the loop - // domain. Not strictly necessary otherwise, but this should also + // domain. It is also necessary when the tensor is a producer of a + // vectorized store. Not strictly necessary otherwise, but this should also // minimize the deviation from the old indexing scheme which always // uses the logical domain to index. // @@ -424,8 +425,17 @@ class AllocationDomainSetup : private kir::IrVisitor { std::optional> reorderAllocationDomains( const TensorView* tv, const std::vector& allocation_domains) const { + // Use getMaybeAllocationDomain instead of getLogicalDomain. When + // this tv is a producer of a vectorized store, the consumer + // tensor shoud be a global memory tensor and this is likely a + // cache tensor created by cacheBefore. The consumer tensor may + // have a reordered allocation domain and that dictates the actual + // allocation ordering of this producer local tensor as well. If + // getLogicalDomain is used, DistributedTransformerTest.Backward + // fails at the result validation. auto exprs = DependencyCheck::getAllExprsBetween( - {tv->getLogicalDomain().begin(), tv->getLogicalDomain().end()}, + {tv->getMaybeAllocationDomain().begin(), + tv->getMaybeAllocationDomain().end()}, {allocation_domains.begin(), allocation_domains.end()}); if (exprs.empty()) { @@ -434,7 +444,7 @@ class AllocationDomainSetup : private kir::IrVisitor { // Replay exprs from the logical domain to get the non-reordered // domains - auto ordered_domains = tv->getLogicalDomain(); + auto ordered_domains = tv->getMaybeAllocationDomain(); for (auto expr : exprs) { // Find the position to insert the outputs. int64_t insertion_pos = -1; @@ -845,14 +855,18 @@ std::vector TensorIndexer::getIndexFor( const auto& replacement_map = getIndexReplacementMap( expr, as_consumer, info.loop_domains, for_loops, info.index_map); - const auto index_groups = traversalGraph().toGroups(index_ids); + // Note that IDs of index_ids may be mapped as the traversal graph + // is the AlmostExact graph. 
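The per-ID lookup introduced in this hunk matters because the AlmostExact traversal graph can place several of the requested IDs in the same group, so a group-based lookup would return fewer indices than IDs. A plain-Python illustration of that difference, with hypothetical ID and group names (this is not the nvfuser API):

# Hypothetical mapping from iter domains to their almost-exact groups.
id_to_group = {"iS3": "g0", "bS4": "g0", "iS5": "g1"}
index_map = {"g0": "i0", "g1": "i1"}  # group -> index expression

index_ids = ["iS3", "bS4", "iS5"]

# Group-based lookup deduplicates, so the result no longer lines up 1:1 with
# the requested IDs.
result_by_group = [index_map[g] for g in {id_to_group[i] for i in index_ids}]
assert len(result_by_group) == 2

# Per-ID lookup, as getIndexFor now does, keeps one result per requested ID.
result_by_id = [index_map[id_to_group[i]] for i in index_ids]
assert len(result_by_id) == len(index_ids)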
std::vector result; - result.reserve(index_groups.size()); - for (const auto& g : index_groups) { - auto it = info.index_map.find(g); + result.reserve(index_ids.size()); + for (IterDomain* index_id : index_ids) { + const auto& index_group = traversalGraph().toGroup(index_id); + auto it = info.index_map.find(index_group); NVF_ERROR( - it != info.index_map.end(), "Index not found for ", g->toString()); + it != info.index_map.end(), + "Index not found for ", + index_id->toString()); result.push_back( ir_utils::replaceValRecursively(it->second, replacement_map)); } diff --git a/csrc/kernel_ir.h b/csrc/kernel_ir.h index f5f062cdbb7..60421db1995 100644 --- a/csrc/kernel_ir.h +++ b/csrc/kernel_ir.h @@ -77,7 +77,7 @@ class Predicate final : public Val { std::string toString(int indent_size = 0) const override; - NVF_API std::string toInlineString(int indent_size = 0) const override; + std::string toInlineString(int indent_size = 0) const override; PredicateType predicate_type() const { return ptype_; @@ -148,7 +148,7 @@ class Predicate final : public Val { Val* value_ = nullptr; }; -class NVF_API TensorIndex final : public Val { +class TensorIndex final : public Val { public: TensorIndex( IrBuilderPasskey, @@ -252,7 +252,7 @@ class Asm final : public Expr { //! is required as an intermediate within a kernel. The extent is the expression //! of the size of the buffer that is generated from the TensorView that //! describes the output of an operation. -class NVF_API Allocate final : public Expr { +class Allocate final : public Expr { public: using Expr::Expr; @@ -385,7 +385,7 @@ class NVF_API Allocate final : public Expr { // // TODO(kir): change name to SyncThreads as we could have other barriers. // -class NVF_API BlockSync final : public Expr { +class BlockSync final : public Expr { public: using Expr::Expr; @@ -408,7 +408,7 @@ class NVF_API BlockSync final : public Expr { // Synchronize all blocks in device, implies cooperative group launch is // required. 
-class NVF_API GridSync final : public Expr { +class GridSync final : public Expr { public: using Expr::Expr; @@ -436,7 +436,7 @@ class NVF_API GridSync final : public Expr { }; // PTX: fence.proxy.async -class NVF_API FenceAsyncProxy final : public Expr { +class FenceAsyncProxy final : public Expr { public: using Expr::Expr; @@ -453,7 +453,7 @@ class NVF_API FenceAsyncProxy final : public Expr { }; // PTX: wgmma.fence.sync.aligned -class NVF_API WgMmaFence final : public Expr { +class WgMmaFence final : public Expr { public: using Expr::Expr; @@ -469,7 +469,7 @@ class NVF_API WgMmaFence final : public Expr { std::string toInlineString(int indent_size = 0) const override; }; -class NVF_API MBarrierInit final : public Expr { +class MBarrierInit final : public Expr { public: using Expr::Expr; explicit MBarrierInit( @@ -495,7 +495,7 @@ class NVF_API MBarrierInit final : public Expr { } }; -class NVF_API MBarrierInvalidate final : public Expr { +class MBarrierInvalidate final : public Expr { public: using Expr::Expr; explicit MBarrierInvalidate(IrBuilderPasskey passkey, Val* mbarrier); @@ -514,7 +514,7 @@ class NVF_API MBarrierInvalidate final : public Expr { } }; -class NVF_API MBarrierArrive final : public Expr { +class MBarrierArrive final : public Expr { public: using Expr::Expr; explicit MBarrierArrive(IrBuilderPasskey passkey, Val* state, Val* mbarrier); @@ -544,7 +544,7 @@ class NVF_API MBarrierArrive final : public Expr { // This is usually used to specify the number of bytes that will be // transferred for cp.async and cp.async.bulk, so that future mbarrier.wait // can wait for the completion of the transfer. -class NVF_API MBarrierArriveExpectTx final : public Expr { +class MBarrierArriveExpectTx final : public Expr { public: using Expr::Expr; explicit MBarrierArriveExpectTx( @@ -578,7 +578,7 @@ class NVF_API MBarrierArriveExpectTx final : public Expr { } }; -class NVF_API MBarrierWait final : public Expr { +class MBarrierWait final : public Expr { public: using Expr::Expr; explicit MBarrierWait(IrBuilderPasskey passkey, Val* mbarrier, Val* state); @@ -601,7 +601,7 @@ class NVF_API MBarrierWait final : public Expr { } }; -class NVF_API MBarrierWaitParity final : public Expr { +class MBarrierWaitParity final : public Expr { public: using Expr::Expr; explicit MBarrierWaitParity( @@ -796,7 +796,7 @@ class UpdateMagicZero final : public Expr { //! //! TODO(kir): this is not a real expression //! -class NVF_API IfThenElse final : public Expr { +class IfThenElse final : public Expr { public: using Expr::Expr; @@ -915,7 +915,7 @@ class GridReduction final : public ReductionOp { } }; -class NVF_API GroupedGridReduction final : public GroupedReductionOp { +class GroupedGridReduction final : public GroupedReductionOp { public: using GroupedReductionOp::GroupedReductionOp; @@ -1006,7 +1006,7 @@ class NVF_API GroupedGridReduction final : public GroupedReductionOp { //! //! This node provides KernelExecutor the information it needs to allocate the //! broadcast and sync buffers. -class NVF_API GridBroadcast final : public Expr { +class GridBroadcast final : public Expr { public: using Expr::Expr; @@ -1117,7 +1117,7 @@ class GridWelford final : public Expr { } }; -class NVF_API GroupedGridWelford final : public GroupedWelfordOp { +class GroupedGridWelford final : public GroupedWelfordOp { public: using GroupedWelfordOp::GroupedWelfordOp; @@ -1211,7 +1211,7 @@ class NVF_API GroupedGridWelford final : public GroupedWelfordOp { //! Represents a WelfordOp with the division by count is hoisted out //! 
of an innermost loop -class NVF_API VectorizedWelfordOp final : public WelfordOp { +class VectorizedWelfordOp final : public WelfordOp { public: using WelfordOp::WelfordOp; diff --git a/csrc/multidevice/communication.cpp b/csrc/multidevice/communication.cpp index af122ee6e3d..edcc40e4d5f 100644 --- a/csrc/multidevice/communication.cpp +++ b/csrc/multidevice/communication.cpp @@ -429,12 +429,22 @@ c10::intrusive_ptr postReduceScatter( scattered_axis >= 0, "scattered_axis is expected to be non-negative: ", scattered_axis); -// reduce_scatter primitive in c10d induces extra buffering time to copy the -// user's input tensors to an internal source buffer. It is therefore always -// preferable to use _reduce_scatter_base (which does not perform any extra -// copy) when the tensors are stored contiguously (i.e., when -// scattered_axis==0). Note however than only nccl supports -// _reduce_scatter_base, not ucc. + + std::vector input_tensors = at::tensor_split( + input_tensor, communication->team_size(), scattered_axis); + // We could have checked the output shape as well if reduction_axis is + // available. It's not always available via + // `communication->out()->getReductionAxis()` for manually constructed host + // IRs like + // https://github.com/NVIDIA/Fuser/blob/89c47f695b296eb4ffd27984bd4c953fc3f3264b/tests/cpp/test_multidevice_overlap.cpp#L347. + assertBuffersHaveSameSize(input_tensors, {}); + + // reduce_scatter primitive in c10d induces extra buffering time to copy the + // user's input tensors to an internal source buffer. It is therefore always + // preferable to use _reduce_scatter_base (which does not perform any extra + // copy) when the tensors are stored contiguously (i.e., when + // scattered_axis==0). Note however than only nccl supports + // _reduce_scatter_base, not ucc. #if defined(NVFUSER_DISTRIBUTED) && defined(USE_C10D_NCCL) if (scattered_axis == 0 && backend->getBackendName() == c10d::NCCL_BACKEND_NAME) { @@ -442,14 +452,13 @@ c10::intrusive_ptr postReduceScatter( output_tensor, input_tensor, {.reduceOp = communication->reduceOp()}); } #endif - std::vector> input_tensors(1); - input_tensors[0] = at::split(input_tensor, /*split_size=*/1, scattered_axis); - - std::vector output_tensors({output_tensor}); - assertBufferCount(input_tensors[0], communication->team().size()); + std::vector> input_tensors_vec({input_tensors}); + std::vector output_tensor_vec({output_tensor}); return backend->reduce_scatter( - output_tensors, input_tensors, {.reduceOp = communication->reduceOp()}); + output_tensor_vec, + input_tensors_vec, + {.reduceOp = communication->reduceOp()}); } c10::intrusive_ptr postSendRecv( diff --git a/csrc/multidevice/utils.cpp b/csrc/multidevice/utils.cpp index 6fefa47d68d..020f746f011 100644 --- a/csrc/multidevice/utils.cpp +++ b/csrc/multidevice/utils.cpp @@ -42,45 +42,39 @@ std::unordered_set getShardedIterDomains(TensorView* tv) { return sharded_ids; } -// Returns whether a IterDomain in a TensorView is the outermost -// allocated IterDomain in the TensorView. -bool isOutermostAllocatedId(TensorView* tv, IterDomain* id) { - for (auto i : tv->getLoopDomain()) { - if (i == id) { - return true; +// Returns the position where an axis is allocated in a tv, skipping trivial +// dimensions (i.e. DID, reduction and broadcast). Returns -1 if id is not in +// tv's loop domain WAR: today we assume that the loop domain match with the +// actual allocation, but this will have to change in the future. 
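A minimal Python sketch of the lookup this comment describes, with a hypothetical stand-in for IterDomain and its parallel-type flags (the real C++ implementation follows below):

from dataclasses import dataclass

@dataclass
class FakeId:
    name: str
    device: bool = False      # DID-parallelized
    reduction: bool = False
    broadcast: bool = False

def allocation_index(loop_domain, target):
    # Count only non-trivial dimensions; return -1 if target is absent.
    index = 0
    for loop_id in loop_domain:
        if loop_id is target:
            return index
        if not (loop_id.device or loop_id.reduction or loop_id.broadcast):
            index += 1
    return -1

didx, i1, i2 = FakeId("DIDx(i0)", device=True), FakeId("i1"), FakeId("i2")
assert allocation_index([didx, i1, i2], didx) == 0  # the DID axis itself
assert allocation_index([didx, i1, i2], i2) == 1    # DID axis is skipped
assert allocation_index([i1, i2], FakeId("i3")) == -1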
+int64_t allocationIndex(TensorView* tv, IterDomain* id) { + int64_t index = 0; + for (auto* loop_id : tv->getLoopDomain()) { + if (loop_id == id) { + return index; } - if (!i->isDeviceDim() && !i->isReduction() && !i->isBroadcast()) { - return false; + if (!loop_id->isDeviceDim() && !loop_id->isReduction() && + !loop_id->isBroadcast()) { + index++; } } - NVF_THROW("Id", id->toString(), " is not in TensorView ", tv->toString()); - return false; + return -1; } } // namespace std::pair, std::vector> getShardingChanges( - Expr* expr) { - NVF_ERROR( - ir_utils::isTvOp(expr), "Expression must be a TvOp ", expr->toString()); - NVF_ERROR( - expr->outputs().size() == 1, - "Resharding expression can only have one output"); - NVF_ERROR( - expr->inputs().size() == 1, - "Resharding expression can have only one input"); - auto output = expr->outputs().at(0)->as(); - auto input = expr->inputs().at(0)->as(); - + TensorView* producer, + TensorView* consumer) { std::vector shard_additions; std::vector shard_deletions; - auto rootmap = PairwiseLogicalDomainMap(input, output).mapBroadcast(false); + auto rootmap = + PairwiseLogicalDomainMap(producer, consumer).mapBroadcast(false); const auto c2p_map = rootmap.mapConsumerToProducer(); - for (IterDomain* out_root : output->getMaybeRootDomain()) { + for (IterDomain* out_root : consumer->getMaybeRootDomain()) { IterDomain* in_root = c2p_map.at(out_root); // Ignore sharded broadcast domains and - // sharded reductions on the output + // sharded reductions on the consumer // ex. DIDx(i0) -> r(i0) or DIDx(i0) -> r(DIDx(i0)) // since they don't affect allocation. if (in_root->isDeviceDim() && !in_root->isBroadcast() && @@ -93,8 +87,7 @@ std::pair, std::vector> getShardingChanges } else if (in_root->isDeviceDim() && out_root->isDeviceDim()) { NVF_ERROR( in_root->getParallelType() == out_root->getParallelType(), - expr->toString(), - " reshards ", + " resharding ", in_root->toString(), " to ", out_root->toString(), @@ -462,23 +455,21 @@ bool isInnerResharding(Expr* expr) { ir_utils::isTvOp(expr), "Non-tv op is not supported : ", expr->toString()); - NVF_ERROR( - expr->outputs().size() == 1, - "Resharding operations can only have one output"); - NVF_ERROR( - expr->inputs().size() == 1, - "Resharding operations can have only one input"); - auto output = expr->outputs().at(0)->as(); - auto input = expr->inputs().at(0)->as(); - auto [shard_additions, shard_deletions] = getShardingChanges(expr); - NVF_ERROR( - shard_additions.size() + shard_deletions.size() <= 1, - "Resharding expr can only support one axis") - if (!shard_deletions.empty()) { - return !isOutermostAllocatedId(input, shard_deletions[0]); - } else if (!shard_additions.empty()) { - return !isOutermostAllocatedId(output, shard_additions[0]); + for (auto input : ir_utils::filterByType(expr->inputs())) { + for (auto output : ir_utils::filterByType(expr->outputs())) { + auto [shard_additions, shard_deletions] = + getShardingChanges(input, output); + NVF_ERROR( + shard_additions.size() + shard_deletions.size() <= 1, + "Resharding expr can only support one axis") + if ((!shard_deletions.empty() && + allocationIndex(input, shard_deletions.at(0)) > 0) || + (!shard_additions.empty() && + allocationIndex(output, shard_additions.at(0)) > 0)) { + return true; + } + } } return false; } diff --git a/csrc/multidevice/utils.h b/csrc/multidevice/utils.h index 50500a7a5fc..14da53df800 100644 --- a/csrc/multidevice/utils.h +++ b/csrc/multidevice/utils.h @@ -30,7 +30,8 @@ NVF_API bool distributedEnabled(); // TODO: Analyze loop 
domain for unsharded/sharded IDs and return their // parent root IDs. std::pair, std::vector> getShardingChanges( - Expr* expr); + TensorView* producer, + TensorView* consumer); // Returns whether a TensorView has a non-reduction axis parallelized Didx // Checks that the other non-reduction axis are not parallelized on Didx diff --git a/csrc/preseg_passes/reorder_sharded_axis.cpp b/csrc/preseg_passes/reorder_sharded_axis.cpp index 0f86d1ad4dc..0a6d3765dd9 100644 --- a/csrc/preseg_passes/reorder_sharded_axis.cpp +++ b/csrc/preseg_passes/reorder_sharded_axis.cpp @@ -40,7 +40,7 @@ void ReorderShardedAxisPass::runPass(Fusion* fusion) { expr->toString()); auto* output = expr->outputs().at(0)->as(); auto* input = expr->inputs().at(0)->as(); - auto [shard_additions, shard_deletions] = getShardingChanges(expr); + auto [shard_additions, shard_deletions] = getShardingChanges(input, output); NVF_ERROR( shard_additions.size() + shard_deletions.size() <= 1, "Resharding expr can only support one axis: ", diff --git a/csrc/scheduler/ampere_multi_matmul.cpp b/csrc/scheduler/ampere_multi_matmul.cpp index ee21e41ce8b..d582e9e9a10 100644 --- a/csrc/scheduler/ampere_multi_matmul.cpp +++ b/csrc/scheduler/ampere_multi_matmul.cpp @@ -1302,11 +1302,17 @@ void AmpereMultipleMatmulScheduler::setUpCircularBuffering() { for (TensorView* acw_smem : acw_smems_) { acw_smem->circularBuffer( - params_->circular_buffer_options.smem_circular_buffer_stage); + params_->circular_buffer_options.smem_circular_buffer_stage, + params_->circular_buffer_options.smem_circular_buffer_stage - + params_->circular_buffer_options + .smem_circular_buffer_prefetch_gap); } for (TensorView* bcw_smem : bcw_smems_) { bcw_smem->circularBuffer( - params_->circular_buffer_options.smem_circular_buffer_stage); + params_->circular_buffer_options.smem_circular_buffer_stage, + params_->circular_buffer_options.smem_circular_buffer_stage - + params_->circular_buffer_options + .smem_circular_buffer_prefetch_gap); } } diff --git a/csrc/scheduler/hopper_multi_matmul.cpp b/csrc/scheduler/hopper_multi_matmul.cpp index fbb95d46df2..b2d8ec705ec 100644 --- a/csrc/scheduler/hopper_multi_matmul.cpp +++ b/csrc/scheduler/hopper_multi_matmul.cpp @@ -53,6 +53,8 @@ void HopperMultipleMatmulScheduler::run() { inspectPrologues(); + setCGADims(); + scheduleOperands(); // schedule mma instruction output (mma_result) @@ -710,18 +712,32 @@ void HopperMultipleMatmulScheduler::setUpCircularBuffering() { params_->async_gmem_load_operands, "Circular buffer only supports async load"); } + NVF_CHECK( + params_->circular_buffer_options.smem_circular_buffer_prefetch_gap > + 0 && + params_->circular_buffer_options + .smem_circular_buffer_prefetch_gap <= + params_->circular_buffer_options.smem_circular_buffer_stage, + "smem_circular_buffer_prefetch_gap is ", + params_->circular_buffer_options.smem_circular_buffer_prefetch_gap, + " but is expected to be positive and not greater than number of stages: ", + params_->circular_buffer_options.smem_circular_buffer_stage); for (TensorView* acw_smem : acw_smems_) { acw_smem->circularBuffer( params_->circular_buffer_options.smem_circular_buffer_stage, /*prefetch_distance=*/ - params_->circular_buffer_options.smem_circular_buffer_stage - 1); + params_->circular_buffer_options.smem_circular_buffer_stage - + params_->circular_buffer_options + .smem_circular_buffer_prefetch_gap); } for (TensorView* bcw_smem : bcw_smems_) { bcw_smem->circularBuffer( params_->circular_buffer_options.smem_circular_buffer_stage, /*prefetch_distance=*/ - 
params_->circular_buffer_options.smem_circular_buffer_stage - 1); + params_->circular_buffer_options.smem_circular_buffer_stage - + params_->circular_buffer_options + .smem_circular_buffer_prefetch_gap); } } diff --git a/csrc/scheduler/hopper_multi_matmul.h b/csrc/scheduler/hopper_multi_matmul.h index bf7bc1df0f5..1d77785cc99 100644 --- a/csrc/scheduler/hopper_multi_matmul.h +++ b/csrc/scheduler/hopper_multi_matmul.h @@ -149,6 +149,14 @@ class HopperMultipleMatmulScheduler : public MultipleMatmulScheduler { std::vector> blockTileTensors( const std::vector& tvs); + //! Specifies the CGA dimensions by setting "cluster_dims" as fusion-managed + //! data + void setCGADims() const { + if (params_->cluster_dims != std::tuple{1, 1, 1}) { + fusion_->manage("cluster_dims", params_->cluster_dims); + } + } + //! Schedule the loads of all operands from global memory to shared memory. //! Starting from the basic tiled schedule, we swizzle the operand memory. //! Note that the cache op and LoadStoreOpType are already set during diff --git a/csrc/scheduler/matmul_heuristic.h b/csrc/scheduler/matmul_heuristic.h index 6a92d31fd2c..7e8ee6dc4d7 100644 --- a/csrc/scheduler/matmul_heuristic.h +++ b/csrc/scheduler/matmul_heuristic.h @@ -41,10 +41,18 @@ class MatmulParams : public HeuristicParams { // greater than one. Otherwise it is ignored. int smem_circular_buffer_stage = 2; + // The circular buffering prefetch distance will be set to + // smem_circular_buffer_stage - smem_circular_buffer_prefetch_gap + // This value must be positive since the prefetch distance must be strictly + // less than the number of stages. + int smem_circular_buffer_prefetch_gap = 1; + bool operator==(const CircularBufferOptions& other) const { return other.circular_buffer_smem_write == circular_buffer_smem_write && other.circular_buffer_smem_read == circular_buffer_smem_read && - other.smem_circular_buffer_stage == smem_circular_buffer_stage; + other.smem_circular_buffer_stage == smem_circular_buffer_stage && + other.smem_circular_buffer_prefetch_gap == + smem_circular_buffer_prefetch_gap; } std::string toString() const { @@ -54,12 +62,16 @@ class MatmulParams : public HeuristicParams { << (circular_buffer_smem_write ? "true" : "false") << "\n" << " circular_buffer_smem_read: " << (circular_buffer_smem_read ? "true" : "false") << "\n" - << " smem_circular_buffer_stage: " << smem_circular_buffer_stage; + << " smem_circular_buffer_stage: " << smem_circular_buffer_stage + << "\n" + << " smem_circular_buffer_prefetch_gap: " + << smem_circular_buffer_prefetch_gap; return ss.str(); } size_t hash() const { return std::hash{}( + (static_cast(smem_circular_buffer_prefetch_gap) << 3) | (static_cast(smem_circular_buffer_stage) << 2) | (static_cast(circular_buffer_smem_write)) << 1) | (static_cast(circular_buffer_smem_read)); @@ -179,6 +191,10 @@ class MatmulParams : public HeuristicParams { //! axis and perform a grid reduction before the epilogue. int splitk_factor = 1; + //! This is the CGA size on Hopper+ devices. This parameter is ignored on + //! Ampere and Turing. 
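The smem_circular_buffer_prefetch_gap parameter above only enters the schedule through the difference stage - gap, and the check added in hopper_multi_matmul.cpp keeps that difference in a valid range. A small arithmetic sketch (Python, purely illustrative):

def prefetch_distance(stages: int, prefetch_gap: int) -> int:
    # Mirrors the NVF_CHECK: the gap must be positive and not greater than
    # the number of stages, so the distance lands in [0, stages - 1].
    assert 0 < prefetch_gap <= stages
    return stages - prefetch_gap

# With the defaults (stages=2, gap=1) this reproduces the previous hard-coded
# "stages - 1" prefetch distance.
assert prefetch_distance(2, 1) == 1
assert prefetch_distance(4, 1) == 3  # prefetch as far ahead as possible
assert prefetch_distance(4, 4) == 0  # no prefetch ahead of the consumer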
+ std::tuple cluster_dims = {2, 1, 1}; + std::string toString() const override { std::stringstream ss; ss << "\n===== Matmul Parameters ========\n" diff --git a/csrc/scheduler/matmul_heuristic_plugin.cpp b/csrc/scheduler/matmul_heuristic_plugin.cpp index b3821787b67..01333727841 100644 --- a/csrc/scheduler/matmul_heuristic_plugin.cpp +++ b/csrc/scheduler/matmul_heuristic_plugin.cpp @@ -135,6 +135,8 @@ void copyParamsToConfig(KernelConfig* config, const MatmulParams* mparams) { }; config->load_stages = mparams->circular_buffer_options.smem_circular_buffer_stage; + config->prefetch_gap = + mparams->circular_buffer_options.smem_circular_buffer_prefetch_gap; config->async_gmem_load_operands = mparams->async_gmem_load_operands; setConfigTile(config->cta_tile, mparams->tile_sizes.cta_tile); setConfigTile(config->warp_tile, mparams->tile_sizes.warp_tile); @@ -163,6 +165,8 @@ void copyConfigToParams(MatmulParams* mparams, const KernelConfig* config) { setGemmTile(mparams->tile_sizes.warp_tile, config->warp_tile); mparams->circular_buffer_options.smem_circular_buffer_stage = config->load_stages; + mparams->circular_buffer_options.smem_circular_buffer_prefetch_gap = + config->prefetch_gap; mparams->async_gmem_load_operands = config->async_gmem_load_operands; // Update mma macro if necessary to match provided instruction tile MmaMacroEncode menc(mparams->mma_macro); // this will record the family diff --git a/csrc/scheduler/matmul_heuristic_plugin_api.h b/csrc/scheduler/matmul_heuristic_plugin_api.h index 224705530e5..207da96e9a8 100644 --- a/csrc/scheduler/matmul_heuristic_plugin_api.h +++ b/csrc/scheduler/matmul_heuristic_plugin_api.h @@ -74,6 +74,9 @@ struct KernelConfig { Tile instruction_tile = {16, 16, 16}; uint16_t splitk_factor = 1; uint8_t load_stages = 2; + // The circular buffering prefetch distance will be set to + // load_stages - prefetch_gap + uint8_t prefetch_gap = 1; uint8_t grid_swizzle_factor = 0; uint8_t cta_order = 0; bool circular_buffer_smem_read = true; diff --git a/csrc/scheduler/tools/inlining.cpp b/csrc/scheduler/tools/inlining.cpp index 6e7b51caba4..6064f4e6e7c 100644 --- a/csrc/scheduler/tools/inlining.cpp +++ b/csrc/scheduler/tools/inlining.cpp @@ -5,12 +5,14 @@ * SPDX-License-Identifier: BSD-3-Clause */ // clang-format on +#include #include #include #include #include #include #include +#include #include @@ -193,6 +195,46 @@ size_t MaxPosCalculator::getMaxProducerPosFromConsumer( } return producer->nDims(); } else { + std::optional> loop_path_groups = std::nullopt; + if (consumer->definition()->isA()) { + // We handle MmaOp specially here since it is currently the only operation + // for which we generate code (i.e. not SdpaFwdOp or SdpaBwdOp) that has + // some output dimensions that do not map to input dimensions. For this + // case, we need to identify potential inlined pairs each ID of which is + // not mapped at all to the other TensorView (see example below). + + // Get ValGroups in loop domains of producer and consumer that are + // connected to _mapped_ IterDomains in the pairwise map. + // + // Note that for MmaOp, it would be sufficient to traverse from the + // producer loop to the consumer loop and identify when _either_ the + // consumer or producer ID is not mapped. Here we are instead traversing + // from mapped domains to both roots so that we can check that _both_ + // consumer and producer ID is not mapped. This is slightly safer and this + // symmetry might be handy in handling new ops that use this feature in + // the future. 
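A simplified Python rendering of the decision the inlining change below makes for MmaOp, using the ID names from the worked example in the comment further down in this hunk. Membership in loop_path_groups is written out by hand here instead of being derived by ValGraph traversal, and the isAllowedID checks are omitted:

def max_producer_pos(producer_loop, consumer_loop, loop_path_groups,
                     broadcasts, exact_map):
    # Stop at the first aligned pair that is neither exactly mapped nor a
    # pair of IDs disconnected from every mapped logical pair (with the
    # producer side being a scheduling-introduced broadcast).
    for pos, (p_id, c_id) in enumerate(zip(producer_loop, consumer_loop)):
        unmapped_pair = (p_id not in loop_path_groups
                         and c_id not in loop_path_groups)
        if unmapped_pair and p_id in broadcasts:
            continue  # safe to inline past this position
        if exact_map.get(p_id) == c_id:
            continue  # ordinary mapped pair
        return pos
    return len(producer_loop)

# tv1 (the operand with loop broadcast bS8) as producer, tv2 as consumer:
assert max_producer_pos(
    ["bS8", "iS2", "iS3"], ["iS4", "iS5", "rS6"],
    loop_path_groups={"iS2", "iS3", "iS5", "rS6"},
    broadcasts={"bS8"},
    exact_map={"iS2": "iS5", "iS3": "rS6"},
) == 3  # fully inlinable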
+ std::vector pairwise_mapped_groups; + for (auto [c_id, p_id] : PairwiseLogicalDomainMap(producer, consumer) + .mapConsumerToProducer()) { + pairwise_mapped_groups.push_back(inliningGraph().toGroup(c_id)); + } + // We propagate toward the loop groups from both consumer and producer + std::vector all_loop_groups; + for (IterDomain* id : producer->getLoopDomain()) { + all_loop_groups.push_back(inliningGraph().toGroup(id)); + } + for (IterDomain* id : consumer->getLoopDomain()) { + all_loop_groups.push_back(inliningGraph().toGroup(id)); + } + // getValsBetween does not require all target groups to be visited. The + // means the result contains the subset of both loop groups that we are + // looking for + std::vector group_path = getValsBetween( + pairwise_mapped_groups, all_loop_groups, inliningGraph()); + loop_path_groups = + std::unordered_set(group_path.begin(), group_path.end()); + } + auto consumer_it = consumer->getLoopDomain().begin(); for (const auto producer_pos : c10::irange(producer->nDims())) { auto p_id = producer->getLoopDomain().at(producer_pos); @@ -211,8 +253,54 @@ size_t MaxPosCalculator::getMaxProducerPosFromConsumer( } IterDomain* c_id = *consumer_it; + + // We can inline past positions in which both producer and consumer are + // not connected to any mapped logical IterDomain pairs. + // + // For example, an MmaOp can be constructed as follows: + // + // tv0: + // root/logical: [ iS0, iS1 ] + // loop: [ iS0, bS7, iS1 ] + // tv1: + // root/logical: [ iS2, iS3 ] + // loop: [ bS8, iS2, iS3 ] + // tv2: + // root/logical/loop: [ iS4, iS5, rS6 ] + // + // iS4 maps to iS0 so when producer==tv0 we can inline past iS0. When + // producer==tv1, iS4 doesn't map to anything in tv1 and bS8 is a loop + // broadcast in that position so we inline past the first ID in that + // case also. Similarly, we inline past iS5, iS2, and bS7. + if (loop_path_groups.has_value()) { + bool p_id_connected = + loop_path_groups->count(inliningGraph().toGroup(p_id)); + bool c_id_connected = + loop_path_groups->count(inliningGraph().toGroup(c_id)); + NVF_ERROR( + p_id_connected || + (consumer->definition()->isA() && p_id->isBroadcast()), + "Expected unmapped producer id to be broadcast domain in MmaOp input but found ", + p_id->toString()); + + if (!p_id_connected && !c_id_connected) { + NVF_ERROR( + p_id->isBroadcast(), + "Unmapped producer ID must be a broadcast created in scheduling but found ", + p_id->toString()); + ++consumer_it; + continue; + } + } + if (!inliningGraph().disjointValSets().strictAreMapped(p_id, c_id) || - !isAllowedID(c_id, consumer, best_effort, true, false, true)) { + !isAllowedID( + c_id, + consumer, + best_effort, + /*allow_reduction=*/true, + /*allow_vectorize=*/false, + /*allow_unmappable=*/true)) { return producer_pos; } diff --git a/doc/dev/python_scheduling/autotune_inner_reduction.py b/doc/dev/python_scheduling/autotune_inner_reduction.py new file mode 100644 index 00000000000..c43a20f5767 --- /dev/null +++ b/doc/dev/python_scheduling/autotune_inner_reduction.py @@ -0,0 +1,405 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024-present NVIDIA CORPORATION & AFFILIATES. +# All rights reserved. 
+# SPDX-License-Identifier: BSD-3-Clause +# Owner(s): ["module: nvfuser"] + +import torch +import itertools +from nvfuser import FusionDefinition, SchedulerType, DataType, ParallelType +from enum import Enum +from dataclasses import dataclass + +from autotune_utils import ( + ScriptConfiguration, + collect_data, + separate_data, + test_model_rmse, + test_model, + ceil_div, + floor_div, +) + + +# ================================ Description ================================ + +# This script defines four inner reduction fusions: +# +# 1. Inner Sum +# y = sum(x, dim=-1) +# +# 2. Add Sum +# z = sum(x1 + x2 + x3 + x4, dim=-1) +# +# 3. Tanh Sum +# y = sum(tanh(x), dim=-1) +# +# 4. Exp Sum +# z = sum(exp(x), dim=-1) +# +# Script Sequence: +# +# 1. Define a nvfuser fusion and its pytorch eager mode reference. +# +# 2. Profile the CUDA kernel performance by iterating over a set of input +# arguments and scheduler configurations. +# +# 3. Train a regression model to predict the desired performance metric given +# some input arguments and a scheduler configuration. +# +# 4. Measure the performance of the regression model. +# - Calculate RMSE of predicted and actual performance on test set. +# - Find the configuration with the best performance using regression model. +# Then, compare against the heuristic configuration selected by nvfuser. +# - For a specific batch size, gather performance across a range of hidden +# sizes. Calculate performance for best predicted and nvfuser +# configurations. Plot a chart comparing performance using matplotlib. +# +# The selected performance metric is effective_bandwidth_gbs. The empirical +# scheduler selects the configuration that has the highest predicted +# effective_bandwidth_gbs. + +# ============================================================================= + + +class AutotuneInnerReduction: + class FUSION(Enum): + INNER_SUM = 1 + ADD_SUM = 2 + TANH_SUM = 3 + EXP_SUM = 4 + + @dataclass(unsafe_hash=True) + class InnerReductionConfiguration: + # The vectorization factor for inner reduction domain. + vectorize_factor: int = 1 + # The unroll factor for the outer iteration domain. + unroll_factor: int = 1 + # The grid size for the outer iteration domain. + # If grdim > 1, then godim corresponds with y axis of the grid. + # Otherwise, it is the x axis of the grid. + godim: int = -1 + # The grid size for the inner reduction domain. It corresponds + # with x axis of the grid when it is >1. + grdim: int = -1 + # The x axis of CTA. It corresponds with inner reduction domain. + bdimx: int = -1 + # The y axis of CTA. It corresponds with outer reduction domain. + # If it is non-zero, then there are multiple reduction per CTA. 
+ bdimy: int = -1 + + def __init__(self, selected_fusion): + self.selected_fusion = selected_fusion + + # gpu device properties are defined globally + assert torch.cuda.is_available() + self.gpu_properties = torch.cuda.get_device_properties(device=0) + + def __repr__(self): + return f"inner_reduction_{self.selected_fusion.name}" + + def convert_to_inner_reduction_params(self, scheduler_config, reduction_params): + warp_size = 32 + max_number_of_threads_cta = 1024 + grid_x_limit = 2147483647 + grid_y_limit = 65535 + + reduction_params.schedule_3D = False + reduction_params.fastest_dim = True + reduction_params.cross_block_inner_reduction = True + reduction_params.block_dim_inner_reduction = ParallelType.block_x + reduction_params.cross_grid_inner_reduction = scheduler_config.grdim > 1 + reduction_params.multiple_reds_per_blk = scheduler_config.bdimy > 1 + reduction_params.pad_inner_reduction_to_warp = ( + scheduler_config.bdimx > warp_size + ) and ( + (scheduler_config.bdimx * scheduler_config.bdimy) + < max_number_of_threads_cta + ) + reduction_params.unroll_factor_inner_reduction = ( + scheduler_config.vectorize_factor + ) + reduction_params.vectorize_inner_reduction = ( + scheduler_config.vectorize_factor > 1 + ) + + if scheduler_config.bdimy > 1: + reduction_params.block_dim_iter_dom = ParallelType.block_y + + reduction_params.unroll_factor_iter_dom = scheduler_config.unroll_factor + + gdimx = -1 + gdimy = -1 + + if scheduler_config.grdim > 1: + reduction_params.grid_dim_inner_reduction = ParallelType.grid_x + reduction_params.grid_dim_iter_dom = ParallelType.grid_y + + reduction_params.split_grid_dim_iter_dom_inner = True + gdimx = min(scheduler_config.grdim, grid_x_limit) + gdimy = min(scheduler_config.godim, grid_y_limit) + if scheduler_config.godim > grid_y_limit: + reduction_params.split_grid_dim_iter_dom_outer = True + else: + reduction_params.grid_dim_iter_dom = ParallelType.grid_x + gdimx = min(scheduler_config.godim, grid_x_limit) + if scheduler_config.godim > grid_x_limit: + reduction_params.split_grid_dim_inner_reduction = True + + reduction_params.lparams.gdimx = gdimx + reduction_params.lparams.gdimy = gdimy + + # Reset CTA dimensions to avoid failing LaunchParams::assertValid + reduction_params.lparams.bdimx = -1 + reduction_params.lparams.bdimy = -1 + reduction_params.lparams.bdimz = -1 + + reduction_params.lparams.bdimx = scheduler_config.bdimx + reduction_params.lparams.bdimy = scheduler_config.bdimy + + # For reduction scheduler, we test the cartesian product of vectorization and + # unroll factors. 
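A worked example of the arithmetic in generate_scheduler_configurations below, for a hypothetical (16384, 4096) input; ceil_div and floor_div match the helpers defined in autotune_utils.py:

import math

def ceil_div(a, b):
    return int(math.ceil(a / b))

def floor_div(a, b):
    return int(math.floor(a / b))

num_iterations, num_reductions = 16384, 4096
threads_per_cta, vectorize_factor, unroll_factor, warp_size = 256, 4, 1, 32

bdimx = min(threads_per_cta,
            max(warp_size, ceil_div(num_reductions, vectorize_factor)))  # 256
bdimy = min(threads_per_cta, max(1, floor_div(threads_per_cta, bdimx)))  # 1
godim = ceil_div(num_iterations, bdimy * unroll_factor)                  # 16384
remaining_reduction = ceil_div(num_reductions,
                               bdimx * vectorize_factor)                 # 4
assert (bdimx, bdimy, godim, remaining_reduction) == (256, 1, 16384, 4)
# Since unroll_factor == 1 and remaining_reduction > 1, one candidate assigns
# the leftover reduction work to the grid (grdim = 4) and a second keeps
# grdim = 1, leaving a serial loop over the remaining elements.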
+ def generate_scheduler_configurations(self, input_shape): + threads_per_cta_options = [128, 256, 512, 1024] + vectorization_factor_options = [1, 2, 4, 8] + unroll_factor_options = list(range(1, 11)) + warp_size = 32 + + num_iterations, num_reductions = input_shape + + for threads_per_cta, vectorize_factor, unroll_factor in itertools.product( + threads_per_cta_options, vectorization_factor_options, unroll_factor_options + ): + scheduler_config = self.InnerReductionConfiguration( + vectorize_factor=vectorize_factor, unroll_factor=unroll_factor + ) + scheduler_config.bdimx = min( + threads_per_cta, + max( + warp_size, + ceil_div(num_reductions, scheduler_config.vectorize_factor), + ), + ) + scheduler_config.bdimy = min( + threads_per_cta, + max(1, floor_div(threads_per_cta, scheduler_config.bdimx)), + ) + scheduler_config.godim = ceil_div( + num_iterations, scheduler_config.bdimy * scheduler_config.unroll_factor + ) + + # number of reduction elements not handled by a CTA + remaining_reduction = ceil_div( + num_reductions, + (scheduler_config.bdimx * scheduler_config.vectorize_factor), + ) + + if unroll_factor == 1 and remaining_reduction > 1: + # all remaining reduction goes to grdim + scheduler_config.grdim = remaining_reduction + yield scheduler_config + + # grid stride across reduction iterDomain is 1 + scheduler_config.grdim = 1 + yield scheduler_config + + def create_inputs(self, shape, tensor_datatype): + def inner_fn(num_inputs): + return [ + torch.randn(*shape, dtype=tensor_datatype, device="cuda") + for _ in range(num_inputs) + ] + + if self.selected_fusion == self.FUSION.ADD_SUM: + return inner_fn(num_inputs=4) + elif self.selected_fusion in [ + self.FUSION.INNER_SUM, + self.FUSION.TANH_SUM, + self.FUSION.EXP_SUM, + ]: + return inner_fn(num_inputs=1) + else: + assert False + + # A decorator to create a reduction fusion given some input arguments. 
+ def create_fusion_func(self, inputs): + def sum_fusion(fd: FusionDefinition) -> None: + T0 = fd.define_tensor( + shape=[-1, -1], + contiguity=[True, True], + dtype=DataType.BFloat16, + is_cpu=False, + stride_order=[1, 0], + ) + T1 = fd.ops.cast(T0, dtype=DataType.Float) + T2 = fd.ops.sum(T1, dims=[1], keepdim=False, dtype=DataType.Null) + T3 = fd.ops.cast(T2, dtype=DataType.BFloat16) + fd.add_output(T3) + + def add_sum(fd: FusionDefinition) -> None: + T0 = fd.define_tensor( + shape=[-1, -1], + contiguity=[True, True], + dtype=DataType.BFloat16, + is_cpu=False, + stride_order=[1, 0], + ) + T1 = fd.define_tensor( + shape=[-1, -1], + contiguity=[True, True], + dtype=DataType.BFloat16, + is_cpu=False, + stride_order=[1, 0], + ) + T2 = fd.define_tensor( + shape=[-1, -1], + contiguity=[True, True], + dtype=DataType.BFloat16, + is_cpu=False, + stride_order=[1, 0], + ) + T3 = fd.define_tensor( + shape=[-1, -1], + contiguity=[True, True], + dtype=DataType.BFloat16, + is_cpu=False, + stride_order=[1, 0], + ) + T4 = fd.ops.cast(T0, dtype=DataType.Float) + T5 = fd.ops.cast(T1, dtype=DataType.Float) + T6 = fd.ops.add(T4, T5) + T7 = fd.ops.cast(T2, dtype=DataType.Float) + T8 = fd.ops.add(T6, T7) + T9 = fd.ops.cast(T3, dtype=DataType.Float) + T10 = fd.ops.add(T8, T9) + T11 = fd.ops.sum(T10, dims=[1], keepdim=False, dtype=DataType.Null) + T12 = fd.ops.cast(T11, dtype=DataType.BFloat16) + fd.add_output(T12) + + def tanh_sum(fd: FusionDefinition) -> None: + T0 = fd.define_tensor( + shape=[-1, -1], + contiguity=[True, True], + dtype=DataType.BFloat16, + is_cpu=False, + stride_order=[1, 0], + ) + T1 = fd.ops.cast(T0, dtype=DataType.Float) + T2 = fd.ops.tanh(T1) + T3 = fd.ops.sum(T2, dims=[1], keepdim=False, dtype=DataType.Null) + T4 = fd.ops.cast(T3, dtype=DataType.BFloat16) + fd.add_output(T4) + + def exp_sum(fd: FusionDefinition) -> None: + T0 = fd.define_tensor( + shape=[-1, -1], + contiguity=[True, True], + dtype=DataType.BFloat16, + is_cpu=False, + stride_order=[1, 0], + ) + T1 = fd.ops.cast(T0, dtype=DataType.Float) + T2 = fd.ops.exp(T1) + T3 = fd.ops.sum(T2, dims=[1], keepdim=False, dtype=DataType.Null) + T4 = fd.ops.cast(T3, dtype=DataType.BFloat16) + fd.add_output(T4) + + if self.selected_fusion == self.FUSION.INNER_SUM: + return sum_fusion + elif self.selected_fusion == self.FUSION.ADD_SUM: + return add_sum + elif self.selected_fusion == self.FUSION.TANH_SUM: + return tanh_sum + elif self.selected_fusion == self.FUSION.EXP_SUM: + return exp_sum + else: + assert False + + # The pytorch eager mode reference used to validating nvfuser kernel. 
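The eager references below act as the correctness baseline during profiling. An illustrative bfloat16-tolerant comparison (this is an assumption about how one might check the outputs, not necessarily the exact check performed in autotune_utils.py):

import torch

def check_against_eager(nvf_out: torch.Tensor, eager_out: torch.Tensor) -> None:
    # bfloat16 carries roughly 3 decimal digits, so compare in float32 with
    # loose tolerances.
    torch.testing.assert_close(
        nvf_out.float(), eager_out.float(), rtol=1e-2, atol=1e-2
    )

ref = torch.randn(8, 16).bfloat16()
check_against_eager(ref.clone(), ref)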
+ def eager_reference(self, inputs): + def sum_fusion(inputs): + return torch.sum(inputs[0], dim=-1) + + def add_sum(inputs): + return torch.sum(inputs[0] + inputs[1] + inputs[2] + inputs[3], dim=-1) + + def tanh_sum(inputs): + return torch.sum(torch.tanh(inputs[0]), dim=-1) + + def exp_sum(inputs): + return torch.sum(torch.exp(inputs[0]), dim=-1) + + if self.selected_fusion == self.FUSION.INNER_SUM: + return sum_fusion(inputs) + elif self.selected_fusion == self.FUSION.ADD_SUM: + return add_sum(inputs) + elif self.selected_fusion == self.FUSION.TANH_SUM: + return tanh_sum(inputs) + elif self.selected_fusion == self.FUSION.EXP_SUM: + return exp_sum(inputs) + else: + assert False + + # Apply scheduler with custom parameters using decorator + def custom_scheduler(self, fd, scheduler_config): + def inner_fn(): + # Check if compatible with reduction scheduler + status, _ = fd.sched.can_schedule(SchedulerType.reduction) + assert status + + reduction_params = fd.sched.compute_reduction_heuristics() + + # Modify original parameters + if scheduler_config is not None: + self.convert_to_inner_reduction_params( + scheduler_config, reduction_params + ) + + # Schedule fusion + fd.sched.schedule() + + fd.schedule = inner_fn + return fd + + +# Run sequence of steps to collect data, train and test model +def main(): + # ====================== Setup Script Configuration ======================= + script_config = ScriptConfiguration( + num_dimensions=2, + outer_shapes=[16384], + inner_shapes=[128, 1024, 4096, 16384], + tensor_datatype=torch.bfloat16, + test_data_percentage=0.1, + empirical_batch_size=16384, + empirical_hidden_sizes=list(range(256, 32768, 256)), + ) + + autotune_config = AutotuneInnerReduction( + selected_fusion=AutotuneInnerReduction.FUSION.INNER_SUM + ) + + # ============================ Run Experiments ============================ + + parameters, performance = collect_data(script_config, autotune_config) + + # ============================ Separate Data ============================== + + train_data, test_data = separate_data(script_config, parameters, performance) + + # ========================= Train Regression Models ======================= + + # Apply machine learning regressor + # Given input shapes and scheduler parameters, predict performance metric. + from sklearn import ensemble + + train_inputs, train_perf = train_data + clf = ensemble.RandomForestRegressor() + clf = clf.fit(train_inputs, train_perf) + + # ========================= Test Regression Models ======================== + test_model_rmse(clf, script_config, autotune_config, test_data) + test_model(clf, script_config, autotune_config) + + +if __name__ == "__main__": + main() diff --git a/doc/dev/python_scheduling/autotune_pointwise.py b/doc/dev/python_scheduling/autotune_pointwise.py index 014ae8197a2..cbda494b3cb 100644 --- a/doc/dev/python_scheduling/autotune_pointwise.py +++ b/doc/dev/python_scheduling/autotune_pointwise.py @@ -1,15 +1,42 @@ -# SPDX-FileCopyrightText: Copyright (c) 2023-present NVIDIA CORPORATION & AFFILIATES. +# SPDX-FileCopyrightText: Copyright (c) 2024-present NVIDIA CORPORATION & AFFILIATES. # All rights reserved. 
# SPDX-License-Identifier: BSD-3-Clause # Owner(s): ["module: nvfuser"] import torch import itertools -import random -from nvfuser import FusionCache, FusionDefinition, SchedulerType +import math +from nvfuser import FusionDefinition, SchedulerType, DataType +from dataclasses import dataclass +from enum import Enum + +from autotune_utils import ( + ScriptConfiguration, + collect_data, + separate_data, + test_model_rmse, + test_model, +) + # ============================ Description ============================ +# This script defines four pointwise fusions: +# +# 1. GELU with Outer-Broadcast Bias Addition +# y = gelu(x + bias[broadcast, i], approximate='tanh') +# +# 2. SILU with Pointwise Multiplication +# z = silu(x) * y +# +# 3. Inner-Broadcast Addition +# y = x + y[i, broadcast] +# +# 4. Pointwise Multiplication +# z = x + y +# +# Script Sequence: +# # 1. Define a nvfuser fusion and its pytorch eager mode reference. # # 2. Profile the CUDA kernel performance by iterating over a set of input @@ -25,287 +52,321 @@ # - For a specific batch size, gather performance across a range of hidden # sizes. Calculate performance for best predicted and nvfuser # configurations. Plot a chart comparing performance using matplotlib. - +# # The selected performance metric is effective_bandwidth_gbs. The empirical # scheduler selects the configuration that has the highest predicted # effective_bandwidth_gbs. # ============================ Configurations ============================ -# Settings for input tensor generation -num_dimensions = 2 -outer_shapes = [512] -inner_shapes = [2**i for i in range(5, 15)] - -# For pointwise scheduler, we test the cartesian product of vectorization and -# unroll factors. -parameter_configurations = [ - vectorize_range := [1, 2, 4], - unroll_range := list(range(1, 10)), -] - -# We profile a range of input shapes with various configurations. -# This argument determines how much of the profiled data to keep as a test set. -test_data_percentage = 0.1 - -# The selected batch size for empirical and nvfuser comparison. -empirical_batch_size = 512 - -# The range of hidden sizes for empirical and nvfuser comparision. -empirical_hidden_sizes = list(range(256, 28672, 256)) - - -# A decorator to create a pointwise fusion given some input arguments. -def create_fusion_func(inputs): - def fusion_func(fd: FusionDefinition): - t0 = fd.from_pytorch(inputs[0]) - t1 = fd.from_pytorch(inputs[1]) - c0 = fd.define_scalar(3.0) - t2 = fd.ops.add(t0, t1) - t3 = fd.ops.mul(t2, c0) - fd.add_output(t3) - - return fusion_func - - -# The pytorch eager mode reference used to validating nvfuser kernel. 
-def eager_reference(inputs): - return (inputs[0] + inputs[1]) * 3 - - -# ============================ Function Definitions ============================ - - -# Apply scheduler with custom parameters using decorator -def custom_pointwise_scheduler(fd, config): - def inner_fn(): - # Check if compatible with pointwise scheduler - status, _ = fd.sched.can_schedule(SchedulerType.pointwise) - assert status - - schedule_params = fd.sched.compute_pointwise_heuristics() - - # Modify original parameters - if config is not None: - vectorization_factor, unroll_factor = config - schedule_params.vectorization_factor = vectorization_factor - schedule_params.unroll_factor_inner = unroll_factor - - # Schedule fusion - fd.sched.schedule() - - fd.schedule = inner_fn - return fd - -# Apply schedule decorator, run fusion, and profile performance -def run_profile(presched_fd, inputs, config=None): - scheduled_fd = custom_pointwise_scheduler(presched_fd, config) - nvf_outputs = scheduled_fd.execute(inputs, profile=True) - - # validate correctness - assert torch.allclose(nvf_outputs[0], eager_reference(inputs)) - - prof = scheduled_fd.profile() - bandwidth = prof.kernel_profiles[0].effective_bandwidth_gbs - time = prof.kernel_profiles[0].time_ms - return bandwidth, time - - -def argmax(map_config_to_perf): - best_perf = -1 - best_config = None - for config, perf in map_config_to_perf.items(): - if perf > best_perf: - best_perf = perf - best_config = config - return best_config - - -# Given a prediction model, input_shape, and set of parameter configurations, -# find the best parameters -def find_best_parameters(predictor, input_shape, parameter_configurations): - map_config_to_performance = { - config: predictor.predict([[*input_shape, *config]]) - for config in itertools.product(*parameter_configurations) - } - return argmax(map_config_to_performance) - - -# ============================ Run Experiments ================================ - -# Collect data for decision tree -parameters = [] -performance = [] - -for shape in itertools.product(outer_shapes, inner_shapes): - print(shape) - inputs = [ - torch.randn(*shape, device="cuda"), - torch.randn(*shape, device="cuda"), - ] - - with FusionDefinition() as presched_fd: - create_fusion_func(inputs)(presched_fd) - - # unroll and vectorization configurations - for config in itertools.product(vectorize_range, unroll_range): - perf_metric, _ = run_profile(presched_fd, inputs, config) - parameters.append((*shape, *config)) - performance.append(perf_metric) - -# ============================ Separate Data ================================== - -# Separate collected data into training and test sets -train_data = [] -test_data = [] -train_perf = [] -test_perf = [] -test_shapes = set() -all_test_config = {} # key: input_shape, value: (config, perf) - -for data, perf in zip(parameters, performance): - shape = data[:num_dimensions] - config = data[num_dimensions:] - - if shape in all_test_config: - all_test_config[shape][config] = perf - else: - all_test_config[shape] = {config: perf} - - if random.random() < test_data_percentage: - test_data.append(data) - test_perf.append(perf) - else: - test_shapes.add(shape) - train_data.append(data) - train_perf.append(perf) - -# key: input_shape, value: best_config -best_test_config = {shape: argmax(all_test_config[shape]) for shape in test_shapes} - -# ========================= Train Regression Models =========================== - -# Apply decision tree regressor -# Given input shapes and scheduler parameters, predict performance metric. 
-from sklearn import tree - -clf = tree.DecisionTreeRegressor() -clf = clf.fit(train_data, train_perf) -test_pred = clf.predict(test_data) - -print("===================== measure performance rmse ========================") - -# Estimate prediction error with RMSE -import numpy as np - -test_perf = np.array(test_perf) -print( - "Test prediction error (RMSE)", - np.sqrt(np.mean(np.power(test_perf - test_pred, 2))), -) -print("Test performance", test_perf) -print("Test prediction", test_pred) +class AutotunePointwise: + class FUSION(Enum): + GELU_BIAS = 1 + SILU_MUL = 2 + BCAST_ADD = 3 + MUL = 4 + + @dataclass(unsafe_hash=True) + class PointwiseConfiguration: + break_point: int + bdim: [int] + vectorize_factor: int + outer_unroll: int + inner_unroll: int + + def __init__(self, selected_fusion): + self.selected_fusion = selected_fusion + + def __repr__(self): + return f"pointwise_{self.selected_fusion.name}" + + # For pointwise scheduler, we test the cartesian product of vectorization and + # unroll factors. + def generate_scheduler_configurations(self, input_shape): + def _named_product(**items): + return itertools.starmap( + self.PointwiseConfiguration, itertools.product(*items.values()) + ) + + num_dimensions = len(input_shape) + warp_size = 32 + warp_group = warp_size * 4 + # limited to a maximum of 128 threads because of pointwise scheduler + max_threads_per_cta = 128 + threads_per_cta = list(range(warp_group, max_threads_per_cta + 1, warp_group)) + + scheduler_configs = [] + for bp in range(num_dimensions): + for num_threads in threads_per_cta: + if bp == 0: + # 1D scheduler configurations + bdim_shapes = [(num_threads, 1)] + outer_unroll_range = [1] + # unroll_factor is between [1, 9] + inner_unroll_range = range(1, 10) + else: + # 2D scheduler configurations + max_bdimy = num_threads // warp_size + log2_max_bdimy = int(math.log2(max_bdimy)) + bdimy_configs = [ + 2**log_bdimy for log_bdimy in range(1, log2_max_bdimy + 1) + ] + + bdim_shapes = [ + (max(warp_size, num_threads // bdimy), bdimy) + for bdimy in bdimy_configs + ] + # total_unroll_factor is between [1, 9] given that outer and + # inner unroll factors are between [1, 3]. + outer_unroll_range = range(1, 4) + inner_unroll_range = range(1, 4) + + scheduler_config = _named_product( + break_point=[bp], + bdim=bdim_shapes, + vectorize_factor=[1, 2, 4, 8], + outer_unroll=outer_unroll_range, + inner_unroll=inner_unroll_range, + ) + scheduler_configs.append(scheduler_config) + return itertools.chain(*scheduler_configs) + + def create_inputs(self, shape, tensor_datatype): + def outer_bcast(): + return [ + torch.randn(1, shape[-1], dtype=tensor_datatype, device="cuda"), + torch.randn(*shape, dtype=tensor_datatype, device="cuda"), + ] + + def inner_bcast(): + return [ + torch.randn(shape[0], 1, dtype=tensor_datatype, device="cuda"), + torch.randn(*shape, dtype=tensor_datatype, device="cuda"), + ] + + def full(): + return [ + torch.randn(*shape, dtype=tensor_datatype, device="cuda"), + torch.randn(*shape, dtype=tensor_datatype, device="cuda"), + ] + + if self.selected_fusion == self.FUSION.GELU_BIAS: + return outer_bcast() + elif self.selected_fusion in [self.FUSION.SILU_MUL, self.FUSION.MUL]: + return full() + elif self.selected_fusion == FUSION.BCAST_ADD: + return inner_bcast() + else: + assert False + + # A decorator to create a pointwise fusion given some input arguments. 
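A worked example of the 2D block-shape enumeration in generate_scheduler_configurations above, for the single thread count allowed here (128 threads, warp size 32):

import math

warp_size, num_threads = 32, 128
max_bdimy = num_threads // warp_size                           # 4
log2_max_bdimy = int(math.log2(max_bdimy))                     # 2
bdimy_configs = [2**k for k in range(1, log2_max_bdimy + 1)]   # [2, 4]
bdim_shapes = [(max(warp_size, num_threads // bdimy), bdimy)
               for bdimy in bdimy_configs]
assert bdim_shapes == [(64, 2), (32, 4)]
# Combined with vectorize factors {1, 2, 4, 8} and outer/inner unroll factors
# in [1, 3], this yields 2 * 4 * 3 * 3 = 72 candidates for break_point == 1.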
+ def create_fusion_func(self, inputs): + def gelu_bias(fd: FusionDefinition): + T0 = fd.define_tensor( + shape=[1, -1], + contiguity=[None, True], + dtype=DataType.BFloat16, + is_cpu=False, + stride_order=[1, 0], + ) + T1 = fd.define_tensor( + shape=[-1, -1], + contiguity=[True, True], + dtype=DataType.BFloat16, + is_cpu=False, + stride_order=[1, 0], + ) + T6 = fd.ops.cast(T1, dtype=DataType.Float) + T7 = fd.ops.cast(T0, dtype=DataType.Float) + T8 = fd.ops.add(T6, T7) + T9 = fd.ops.mul(T8, T8) + T10 = fd.ops.mul(T9, T8) + S11 = fd.define_scalar(0.500000, dtype=DataType.Double) + T12 = fd.ops.mul(S11, T8) + S13 = fd.define_scalar(0.0447150, dtype=DataType.Double) + T14 = fd.ops.mul(S13, T10) + T15 = fd.ops.add(T8, T14) + S16 = fd.define_scalar(0.797885, dtype=DataType.Double) + T17 = fd.ops.mul(S16, T15) + T18 = fd.ops.tanh(T17) + S19 = fd.define_scalar(1.00000, dtype=DataType.Double) + T20 = fd.ops.add(S19, T18) + T21 = fd.ops.mul(T12, T20) + T22 = fd.ops.cast(T21, dtype=DataType.BFloat16) + fd.add_output(T22) + + def silu_mul(fd: FusionDefinition) -> None: + T0 = fd.define_tensor( + shape=[-1, -1], + contiguity=[True, True], + dtype=DataType.BFloat16, + is_cpu=False, + stride_order=[1, 0], + ) + T1 = fd.define_tensor( + shape=[-1, -1], + contiguity=[True, True], + dtype=DataType.BFloat16, + is_cpu=False, + stride_order=[1, 0], + ) + T2 = fd.ops.cast(T0, dtype=DataType.Float) + T3 = fd.ops.neg(T2) + T4 = fd.ops.exp(T3) + S5 = fd.define_scalar(1.00000, dtype=DataType.Double) + T6 = fd.ops.add(S5, T4) + T7 = fd.ops.reciprocal(T6) + T8 = fd.ops.mul(T2, T7) + T9 = fd.ops.cast(T1, dtype=DataType.Float) + T10 = fd.ops.mul(T8, T9) + T11 = fd.ops.cast(T10, dtype=DataType.BFloat16) + fd.add_output(T11) + + def bcast_add(fd: FusionDefinition) -> None: + T0 = fd.define_tensor( + shape=[-1, 1], + contiguity=[True, None], + dtype=DataType.BFloat16, + is_cpu=False, + stride_order=[1, 0], + ) + T1 = fd.define_tensor( + shape=[-1, -1], + contiguity=[True, True], + dtype=DataType.BFloat16, + is_cpu=False, + stride_order=[1, 0], + ) + T2 = fd.ops.cast(T0, dtype=DataType.Float) + T3 = fd.ops.cast(T1, dtype=DataType.Float) + T4 = fd.ops.add(T2, T3) + T5 = fd.ops.cast(T4, dtype=DataType.BFloat16) + fd.add_output(T5) + + def mul(fd: FusionDefinition) -> None: + T0 = fd.define_tensor( + shape=[-1, -1], + contiguity=[True, True], + dtype=DataType.BFloat16, + is_cpu=False, + stride_order=[1, 0], + ) + T1 = fd.define_tensor( + shape=[-1, -1], + contiguity=[True, True], + dtype=DataType.BFloat16, + is_cpu=False, + stride_order=[1, 0], + ) + T2 = fd.ops.cast(T0, dtype=DataType.Float) + T3 = fd.ops.cast(T1, dtype=DataType.Float) + T4 = fd.ops.mul(T2, T3) + T5 = fd.ops.cast(T4, dtype=DataType.BFloat16) + fd.add_output(T5) + + if self.selected_fusion == self.FUSION.GELU_BIAS: + return gelu_bias + elif self.selected_fusion == self.FUSION.SILU_MUL: + return silu_mul + elif self.selected_fusion == self.FUSION.BCAST_ADD: + return bcast_add + elif self.selected_fusion == self.FUSION.MUL: + return mul + else: + assert False + + # The pytorch eager mode reference used to validating nvfuser kernel. 
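The constants hard-coded in gelu_bias above are the tanh approximation of GELU (0.797885 is sqrt(2/pi) and 0.0447150 is the cubic coefficient). A stand-alone check that the hand-written formula agrees with PyTorch's own approximation:

import math
import torch

x = torch.randn(1024, dtype=torch.float32)
manual = 0.5 * x * (1.0 + torch.tanh(0.797885 * (x + 0.0447150 * x * x * x)))
reference = torch.nn.functional.gelu(x, approximate="tanh")
torch.testing.assert_close(manual, reference, rtol=1e-4, atol=1e-4)
assert math.isclose(math.sqrt(2.0 / math.pi), 0.797885, rel_tol=1e-5)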
+ def eager_reference(self, inputs): + def gelu_bias(inputs): + return torch.nn.functional.gelu( + inputs[0] + inputs[1].unsqueeze(0), approximate="tanh" + ) + + def silu_mul(inputs): + return torch.nn.functional.silu(inputs[0]) * inputs[1] + + def bcast_add(inputs): + return inputs[0] + inputs[1] + + def mul(inputs): + return inputs[0] * inputs[1] + + if self.selected_fusion == self.FUSION.GELU_BIAS: + return gelu_bias(inputs) + elif self.selected__fusion == self.FUSION.SILU_MUL: + return silu_mul(inputs) + elif self.selected_fusion == self.FUSION.BCAST_ADD: + return bcast_add(inputs) + elif self.selected_fusion == self.FUSION.MUL: + return mul(inputs) + else: + assert False + + # Apply scheduler with custom parameters using decorator + def custom_scheduler(self, fd, scheduler_config): + def inner_fn(): + # Check if compatible with pointwise scheduler + status, _ = fd.sched.can_schedule(SchedulerType.pointwise) + assert status + + schedule_params = fd.sched.compute_pointwise_heuristics() + + # Modify original parameters + if scheduler_config is not None: + schedule_params.break_point = scheduler_config.break_point + schedule_params.vectorization_factor = scheduler_config.vectorize_factor + schedule_params.unroll_factor_outer = scheduler_config.outer_unroll + schedule_params.unroll_factor_inner = scheduler_config.inner_unroll + schedule_params.lparams.bdimx = scheduler_config.bdim[0] + schedule_params.lparams.bdimy = scheduler_config.bdim[1] + + # Schedule fusion + fd.sched.schedule() + + fd.schedule = inner_fn + return fd + + +# Run sequence of steps to collect data, train and test model +def main(): + # ====================== Setup Script Configuration ======================= + script_config = ScriptConfiguration( + num_dimensions=2, + outer_shapes=[16384], + inner_shapes=[128, 1024, 4096, 16384], + tensor_datatype=torch.bfloat16, + test_data_percentage=0.1, + empirical_batch_size=16384, + empirical_hidden_sizes=list(range(256, 32768, 256)), + ) -print("======================= compare configurations =======================") -# Find best configuration for test_shapes -print( - "input shape, estimate_config:(vectorization, unroll), actual_config:(vectorization, unroll), correct" -) -correctness_count = 0 -mismatch_configs = [] -for shape in test_shapes: - estimate_config = find_best_parameters(clf, shape, parameter_configurations) - - match_config = estimate_config == best_test_config[shape] - if not match_config: - mismatch_configs.append((shape, estimate_config)) - - correctness_count += int(match_config) - print(f"{shape}, {estimate_config}, {best_test_config[shape]}, {match_config}") -print("% of predictions match nvfuser parameters", correctness_count / len(test_shapes)) -print(correctness_count, "out of", len(test_shapes)) - -print("======================= compare performance =========================") - -for shape, estimate_config in mismatch_configs: - inputs = [ - torch.randn(*shape, device="cuda"), - torch.randn(*shape, device="cuda"), - ] - - with FusionDefinition() as presched_fd: - create_fusion_func(inputs)(presched_fd) - - _, est_perf = run_profile(presched_fd, inputs, estimate_config) - _, nvf_perf = run_profile(presched_fd, inputs) - est_perf_faster = est_perf < nvf_perf - print( - f"{shape} \t estimate_perf:{est_perf:.5f} \t nvfuser_perf:{nvf_perf:.5f} \t is_estimated_config_faster:\t{est_perf_faster}" + autotune_config = AutotunePointwise( + selected_fusion=AutotunePointwise.FUSION.GELU_BIAS ) 
-print("=====================================================================") + # ============================ Run Experiments ============================ -# For a specific batch size, gather performance across a range of hidden sizes. -# Calculate performance for best predicted and nvfuser configurations. Plot a -# chart comparing performance using matplotlib. + parameters, performance = collect_data(script_config, autotune_config) -# NOTE: The matplotlib experiment plots the kernel runtime, which could be -# different than the selected performance metric. Currently, the performance -# metric is effective_bandwidth_gbs. + # ============================ Separate Data ============================== -import matplotlib.pyplot as plt -import numpy as np + train_data, test_data = separate_data(script_config, parameters, performance) -FusionCache.reset() -est_perfs = [] -for hidden_shape in empirical_hidden_sizes: - inputs = [ - torch.randn(empirical_batch_size, hidden_shape, device="cuda"), - torch.randn(empirical_batch_size, hidden_shape, device="cuda"), - ] - estimate_config = find_best_parameters( - clf, (empirical_batch_size, hidden_shape), parameter_configurations - ) + # ========================= Train Regression Models ======================= - with FusionDefinition() as presched_fd: - create_fusion_func(inputs)(presched_fd) + # Apply machine learning regressor + # Given input shapes and scheduler parameters, predict performance metric. + from sklearn import ensemble - _, est_time_ms = run_profile(presched_fd, inputs, estimate_config) - est_perfs.append(est_time_ms) - print( - f"{empirical_batch_size}, {hidden_shape}, {estimate_config}, {est_time_ms:.3f}" - ) + train_inputs, train_perf = train_data + clf = ensemble.RandomForestRegressor() + clf = clf.fit(train_inputs, train_perf) + + # ========================= Test Regression Models ======================== + test_model_rmse(clf, script_config, autotune_config, test_data) + test_model(clf, script_config, autotune_config) -FusionCache.reset() -nvf_perfs = [] -for hidden_shape in empirical_hidden_sizes: - inputs = [ - torch.randn(empirical_batch_size, hidden_shape, device="cuda"), - torch.randn(empirical_batch_size, hidden_shape, device="cuda"), - ] - - with FusionDefinition() as presched_fd: - create_fusion_func(inputs)(presched_fd) - - _, nvf_time_ms = run_profile(presched_fd, inputs) - nvf_perfs.append(nvf_time_ms) - print(f"{empirical_batch_size}, {hidden_shape}, {nvf_time_ms:.3f}") - -# Get mean speed-up from nvfuser to empirical configurations across all input shapes. -# Negative value mean empirical configurations are slower than nvfuser. 
-print("Mean speed-up", np.mean(np.array(nvf_perfs) - np.array(est_perfs)))
-
-np_hidden_size = np.array(empirical_hidden_sizes)
-plt.plot(np_hidden_size, np.array(est_perfs))
-plt.plot(np_hidden_size, np.array(nvf_perfs))
-
-plt.xlabel("Hidden Size")
-plt.ylabel("Time(ms)")
-plt.title(
-    f"Batch Size = {empirical_batch_size}, Compare Decision Tree Heuristic vs NvFuser"
-)
-plt.legend(["decision_tree", "nvfuser"], loc="lower right")
-plt.savefig(f"pointwise_empirical_batchsize{empirical_batch_size}.png")
-# =============================================================================
+if __name__ == "__main__":
+    main()
diff --git a/doc/dev/python_scheduling/autotune_utils.py b/doc/dev/python_scheduling/autotune_utils.py
new file mode 100644
index 00000000000..e699f84270e
--- /dev/null
+++ b/doc/dev/python_scheduling/autotune_utils.py
@@ -0,0 +1,311 @@
+# SPDX-FileCopyrightText: Copyright (c) 2024-present NVIDIA CORPORATION & AFFILIATES.
+# All rights reserved.
+# SPDX-License-Identifier: BSD-3-Clause
+# Owner(s): ["module: nvfuser"]
+
+import torch
+import math
+import itertools
+from nvfuser import FusionCache, FusionDefinition
+from dataclasses import dataclass, astuple
+
+# ================================ Description ================================
+# This file contains the utility functions for the autotuning scripts.
+# =============================================================================
+
+
+# Returns the result of a/b rounded to the nearest integer in the direction of
+# positive infinity.
+def ceil_div(a, b):
+    return int(math.ceil(a / b))
+
+
+# Returns the result of a/b rounded to the nearest integer in the direction of
+# negative infinity.
+def floor_div(a, b):
+    return int(math.floor(a / b))
+
+
+@dataclass
+class ScriptConfiguration:
+    # Settings for input tensor generation
+    # number of dimensions in the tensor argument
+    num_dimensions: int
+
+    # the data type for the tensor argument
+    tensor_datatype: torch.dtype
+
+    # During training, the cartesian product of outer_shapes and inner_shapes
+    # is used to define the shape of the input tensor arguments.
+    outer_shapes: list[int]
+    inner_shapes: list[int]
+
+    # We profile a range of input shapes with various configurations.
+    # This argument determines how much of the profiled data to keep as a test
+    # set.
+    test_data_percentage: float
+
+    # The selected batch size for the empirical and nvfuser comparison.
+    empirical_batch_size: int
+
+    # The range of hidden sizes for the empirical and nvfuser comparison.
+    empirical_hidden_sizes: list[int]
+
+
+# Converts a dataclass to a tuple, flattening any nested tuples. The function
+# is used for compatibility with the machine learning model.
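+# For example, assuming a scheduler configuration dataclass with the fields
+# (break_point=1, vectorize_factor=4, outer_unroll=1, inner_unroll=2,
+# bdim=(128, 2)), the flattened tuple is (1, 4, 1, 2, 128, 2).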
+def flatten_configuration(scheduler_config): + new_scheduler_config = [] + for item in astuple(scheduler_config): + if type(item) is tuple: + new_scheduler_config.extend(item) + else: + new_scheduler_config.append(item) + return tuple(new_scheduler_config) + + +# Collect data for machine learning +def collect_data(script_config, autotune_config): + parameters = [] + performance = [] + + for shape in itertools.product( + script_config.outer_shapes, script_config.inner_shapes + ): + print(shape) + inputs = autotune_config.create_inputs(shape, script_config.tensor_datatype) + + with FusionDefinition() as presched_fd: + autotune_config.create_fusion_func(inputs)(presched_fd) + + # unroll and vectorization configurations + for parameter_config in autotune_config.generate_scheduler_configurations( + shape + ): + perf_metric, _ = run_profile( + autotune_config, presched_fd, inputs, parameter_config + ) + parameters.append((*shape, *flatten_configuration(parameter_config))) + performance.append(perf_metric) + return parameters, performance + + +# Separate collected data into training and test sets +def separate_data(script_config, parameters, performance): + import random + + train_inputs = [] + test_inputs = [] + train_perf = [] + test_perf = [] + test_shapes = set() + all_test_scheduler_config = {} # key: input_shape, value: (scheduler_config, perf) + + for data, perf in zip(parameters, performance): + shape = data[: script_config.num_dimensions] + scheduler_config = data[script_config.num_dimensions :] + + if shape in all_test_scheduler_config: + all_test_scheduler_config[shape][scheduler_config] = perf + else: + all_test_scheduler_config[shape] = {scheduler_config: perf} + + if ( + script_config.test_data_percentage > 0 + and random.random() < script_config.test_data_percentage + ): + test_shapes.add(shape) + test_inputs.append(data) + test_perf.append(perf) + else: + train_inputs.append(data) + train_perf.append(perf) + + # key: input_shape, value: best_scheduler_config + best_test_scheduler_config = { + shape: argmax(all_test_scheduler_config[shape]) for shape in test_shapes + } + + return (train_inputs, train_perf), ( + test_inputs, + test_perf, + test_shapes, + best_test_scheduler_config, + ) + + +# Apply schedule decorator, run fusion, and profile performance +def run_profile(autotune_config, presched_fd, inputs, scheduler_config=None): + scheduled_fd = autotune_config.custom_scheduler(presched_fd, scheduler_config) + nvf_outputs = scheduled_fd.execute(inputs, profile=True) + + # validate correctness + assert torch.allclose( + nvf_outputs[0], autotune_config.eager_reference(inputs), atol=1e-2, rtol=1e-2 + ) + + prof = scheduled_fd.profile() + bandwidth = prof.kernel_profiles[0].effective_bandwidth_gbs + time = prof.kernel_profiles[0].time_ms + return bandwidth, time + + +# Given a map from scheduler configuration to predicted performance, find the +# configuration with the maximum predicted performance +def argmax(map_scheduler_config_to_perf): + best_perf = -1 + best_scheduler_config = None + for scheduler_config, perf in map_scheduler_config_to_perf.items(): + if perf > best_perf: + best_perf = perf + best_scheduler_config = scheduler_config + return best_scheduler_config + + +# Given a prediction model, input_shape, and set of parameter configurations, +# find the best parameters +def find_best_parameters(clf, input_shape, scheduler_configurations): + map_scheduler_config_to_performance = { + scheduler_config: clf.predict( + [[*input_shape, *flatten_configuration(scheduler_config)]] + 
) + for scheduler_config in scheduler_configurations + } + return argmax(map_scheduler_config_to_performance) + + +# Measure model performance with RMSE +def test_model_rmse(clf, script_config, autotune_config, test_data): + test_inputs, test_perf, test_shapes, best_test_scheduler_config = test_data + test_pred = clf.predict(test_inputs) + + # Estimate prediction error with RMSE + import numpy as np + + test_perf = np.array(test_perf) + print( + "Test prediction error (RMSE)", + np.sqrt(np.mean(np.power(test_perf - test_pred, 2))), + ) + print("Test performance", test_perf) + print("Test prediction", test_pred) + + print("======================= compare configurations =======================") + # Find best configuration for test_shapes + print("input shape, estimate_config, actual_config, correct") + correctness_count = 0 + mismatch_configs = [] + for shape in test_shapes: + estimate_config = find_best_parameters( + clf, shape, autotune_config.generate_scheduler_configurations(shape) + ) + + match_config = ( + flatten_configuration(estimate_config) == best_test_scheduler_config[shape] + ) + if not match_config: + mismatch_configs.append((shape, estimate_config)) + + correctness_count += int(match_config) + print( + f"{shape}, {estimate_config}, {best_test_scheduler_config[shape]}, {match_config}" + ) + print( + "% of predictions match nvfuser parameters", + correctness_count / len(test_shapes), + ) + print(correctness_count, "out of", len(test_shapes)) + + print("======================= compare performance =========================") + + for shape, estimate_config in mismatch_configs: + inputs = autotune_config.create_inputs(shape, script_config.tensor_datatype) + + with FusionDefinition() as presched_fd: + autotune_config.create_fusion_func(inputs)(presched_fd) + + _, est_perf = run_profile(autotune_config, presched_fd, inputs, estimate_config) + _, nvf_perf = run_profile(autotune_config, presched_fd, inputs) + est_perf_faster = est_perf < nvf_perf + print( + f"{shape} \t estimate_perf: {est_perf: .5f} \t nvfuser_perf: {nvf_perf: .5f} \t is_estimated_config_faster: {est_perf_faster}" + ) + print("=====================================================================") + + +# Given a machine learning model, compare the performance of its predicted configuration +# against nvfuser on a given fusion +def test_model(clf, script_config, autotune_config): + # For a specific batch size, gather performance across a range of hidden sizes. + # Calculate performance for best predicted and nvfuser configurations. Plot a + # chart comparing performance using matplotlib. + + # NOTE: The matplotlib experiment plots the kernel runtime, which could be + # different than the selected performance metric. Currently, the performance + # metric is effective_bandwidth_gbs. 
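+    # Concretely, run_profile returns (effective_bandwidth_gbs, time_ms); the
+    # regression model is trained on the bandwidth, while the plot below
+    # compares the measured kernel times.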
+ + import matplotlib.pyplot as plt + import numpy as np + + FusionCache.reset() + est_perfs = [] + for hidden_shape in script_config.empirical_hidden_sizes: + inputs = autotune_config.create_inputs( + (script_config.empirical_batch_size, hidden_shape), + script_config.tensor_datatype, + ) + + estimate_config = find_best_parameters( + clf, + (script_config.empirical_batch_size, hidden_shape), + autotune_config.generate_scheduler_configurations( + (script_config.empirical_batch_size, hidden_shape) + ), + ) + + with FusionDefinition() as presched_fd: + autotune_config.create_fusion_func(inputs)(presched_fd) + + _, est_time_ms = run_profile( + autotune_config, presched_fd, inputs, estimate_config + ) + est_perfs.append(est_time_ms) + print( + f"{script_config.empirical_batch_size}, {hidden_shape}, {estimate_config}, {est_time_ms: .3f}" + ) + + FusionCache.reset() + nvf_perfs = [] + for hidden_shape in script_config.empirical_hidden_sizes: + inputs = autotune_config.create_inputs( + (script_config.empirical_batch_size, hidden_shape), + script_config.tensor_datatype, + ) + + with FusionDefinition() as presched_fd: + autotune_config.create_fusion_func(inputs)(presched_fd) + + _, nvf_time_ms = run_profile(autotune_config, presched_fd, inputs) + nvf_perfs.append(nvf_time_ms) + print( + f"{script_config.empirical_batch_size}, {hidden_shape}, {nvf_time_ms: .3f}" + ) + + # Get mean speed-up from nvfuser to empirical configurations across all input shapes. + # Negative value mean empirical configurations are slower than nvfuser. + print("Mean speed-up", np.mean(np.array(nvf_perfs) - np.array(est_perfs))) + + np_hidden_size = np.array(script_config.empirical_hidden_sizes) + plt.plot(np_hidden_size, np.array(est_perfs)) + plt.plot(np_hidden_size, np.array(nvf_perfs)) + + plt.xlabel("Hidden Size") + plt.ylabel("Time(ms)") + plt.title( + f"Batch Size = {script_config.empirical_batch_size}, Compare Machine Learning Heuristic vs NvFuser" + ) + plt.legend(["random_forest", "nvfuser"], loc="lower right") + plt.savefig( + f"{autotune_config}_empirical_batch_size_{script_config.empirical_batch_size}.png" + ) + plt.close("all") diff --git a/tests/cpp/test_matmul.cpp b/tests/cpp/test_matmul.cpp index 3e9f7f553c8..cbd51d97cfb 100644 --- a/tests/cpp/test_matmul.cpp +++ b/tests/cpp/test_matmul.cpp @@ -3657,7 +3657,6 @@ TEST_F(HopperMatmulTest, HSH_NT_128BSwizzle) { const auto dtype = DataType::Half; constexpr bool use_smem_epilogue = false; - constexpr bool use_warp_specialization = true; constexpr int64_t stages = 4; constexpr int64_t prefetch = 3; @@ -3801,13 +3800,8 @@ TEST_F(HopperMatmulTest, HSH_NT_128BSwizzle) { inlineMost(); - if (use_warp_specialization) { - tv0c->circularBuffer(stages, prefetch, WarpSpecialized(ParallelType::TIDy)); - tv1c->circularBuffer(stages, prefetch, WarpSpecialized(ParallelType::TIDy)); - } else { - tv0c->circularBuffer(stages, prefetch); - tv1c->circularBuffer(stages, prefetch); - } + tv0c->circularBuffer(stages, prefetch); + tv1c->circularBuffer(stages, prefetch); auto inputs = matmulAtInput3DHopperSS(M, N, K, layout, data_type_to_aten(dtype)); @@ -3825,9 +3819,9 @@ TEST_F(HopperMatmulTest, HSH_NT_128BSwizzle_NoBroadcasts) { Fusion fusion; FusionGuard fg(&fusion); - // constexpr int64_t M = 2048, N = 2048, K = 8192; + constexpr int64_t M = 2048, N = 2048, K = 8192; constexpr auto macro = MmaMacro::Hopper_64_256_16; - // constexpr auto layout = MmaLayout::NT; // [K, M] x [K, N] -> [M, N] + constexpr auto layout = MmaLayout::NT; // [K, M] x [K, N] -> [M, N] constexpr auto swizzle = 
MmaInputSmemSwizzle::B128; const auto dtype = DataType::Half; @@ -3948,8 +3942,31 @@ TEST_F(HopperMatmulTest, HSH_NT_128BSwizzle_NoBroadcasts) { } tv3->axis(-1)->parallelize(ParallelType::Vectorize); + { + // Check using a copy that improperly aligned axis are not inlined + Fusion tmp_fusion; + IrCloner ir_cloner = Fusion::copy(&fusion, &tmp_fusion); + FusionGuard tmp_fg(&tmp_fusion); + // [Mo, No, Ko, Mio, Nio, Mii, Nii, Ki] + // Swap the No and Ko axes, but only in tv2, the mma output + // [Mo, Ko, No, Mio, Nio, Mii, Nii, Ki] + // This should mean the smem operands are now inlined at position 1 instead + // of 3 + ir_cloner.clone(tv2)->reorder({{2, 1}, {1, 2}}); + inlineMost(); + ir_cloner.clone(tv2)->reorder({{2, 1}, {1, 2}}); + EXPECT_EQ(ir_cloner.clone(tv0c)->getComputeAtPosition(), 1); + // The outermost loop dim of tv1c is a broadcast Mo axis, so + // tv1c->inlineAt(1) does not inline past that axis and we wind up with + // compute-at position 0. + EXPECT_EQ(ir_cloner.clone(tv1c)->getComputeAtPosition(), 0); + } + inlineMost(); + EXPECT_EQ(tv0c->getComputeAtPosition(), 3); + EXPECT_EQ(tv1c->getComputeAtPosition(), 3); + if (stages > 1) { tv0c->circularBuffer(stages, prefetch); tv1c->circularBuffer(stages, prefetch); @@ -3963,7 +3980,17 @@ TEST_F(HopperMatmulTest, HSH_NT_128BSwizzle_NoBroadcasts) { pred_checker.handle(kernel->topLevelExprs()); ASSERT_TRUE(pred_checker.found_mma); - // TODO: compile and run kernel once inlining is fixed + auto [A3d, B3d] = + matmulAtInput3DHopperSS(M, N, K, layout, data_type_to_aten(dtype)); + at::Tensor A = A3d.squeeze(); + at::Tensor B = B3d.squeeze(); + std::vector inputs{A, B}; + + KernelExecutor ke; + ke.compile(&fusion, inputs, LaunchParams(), matmul_cparams); + auto cg_outputs = ke.run(inputs); + auto tref = atMatmul(A, B, layout); + EXPECT_TRUE(at::allclose(cg_outputs[0], tref, 1e-5, 1e-5)); } } // namespace nvfuser diff --git a/tests/cpp/test_matmul_scheduler.cpp b/tests/cpp/test_matmul_scheduler.cpp index fa80a096dce..3058ce59ad7 100644 --- a/tests/cpp/test_matmul_scheduler.cpp +++ b/tests/cpp/test_matmul_scheduler.cpp @@ -3197,9 +3197,6 @@ class HopperMatmulSchedulerTest mparams.circular_buffer_options.circular_buffer_smem_write = true; mparams.circular_buffer_options.circular_buffer_smem_read = true; mparams.circular_buffer_options.smem_circular_buffer_stage = 4; - - // TODO Create prefetch parameter - // mparams.circular_buffer_options.smem_circular_buffer_prefetch = 3; } void TearDown() { diff --git a/tests/cpp/test_multidevice_lower_communication.cpp b/tests/cpp/test_multidevice_lower_communication.cpp index d1f06d80e1d..d89f0a3f3a4 100644 --- a/tests/cpp/test_multidevice_lower_communication.cpp +++ b/tests/cpp/test_multidevice_lower_communication.cpp @@ -7,6 +7,7 @@ // clang-format on #include +#include #include #include @@ -16,15 +17,23 @@ namespace nvfuser { +using testing::Each; +using testing::IsTrue; +using testing::Pointer; +using testing::Property; + namespace { void assertIsCompiledToHostIrContainer( const FusionExecutorCache& executor_cache) { FusionKernelRuntime* runtime = executor_cache.getMostRecentKernelRuntime(); - EXPECT_TRUE(runtime->executors().size() == 1); - for (const auto& ea : runtime->executors()) { - EXPECT_TRUE(ea->isA()) - << "failed to compile to a HostIrContainer with Communications"; - } + EXPECT_EQ(runtime->executors().size(), 1); + EXPECT_THAT( + runtime->executors(), + Each(Pointer(Property( + "is a HostIrExecutor", + &ExecutorAbstract::isA, + IsTrue())))) + << "failed to compile to a HostIrContainer 
with Communications"; } } // namespace diff --git a/tests/cpp/test_resize.cpp b/tests/cpp/test_resize.cpp index 7d9e357d18b..40dff5237b1 100644 --- a/tests/cpp/test_resize.cpp +++ b/tests/cpp/test_resize.cpp @@ -55,7 +55,7 @@ void checkLoopDomainEquivalence( } // namespace -using ResizeTest = NVFuserFixtureParamTest; +using ResizeTest = NVFuserTest; using testing::Each; using testing::HasSubstr; @@ -64,14 +64,8 @@ using testing::Property; using testing::ThrowsMessage; using testing::UnorderedElementsAre; -INSTANTIATE_TEST_SUITE_P( - , - ResizeTest, - testing::Bool(), - testing::PrintToStringParamName()); - // Simple pad test -TEST_P(ResizeTest, Pad1) { +TEST_F(ResizeTest, Pad1) { Fusion fusion; FusionGuard fg(&fusion); @@ -89,11 +83,7 @@ TEST_P(ResizeTest, Pad1) { std::vector aten_inputs({t0}); EnableOptionsGuard enable_options_guard; - if (GetParam()) { - EnableOptionsGuard::getCurOptions().set(EnableOption::IdModel, {"all"}); - } else { - EnableOptionsGuard::getCurOptions().unset(EnableOption::IdModel); - } + EnableOptionsGuard::getCurOptions().set(EnableOption::IdModel, {"all"}); KernelExecutor ke; ke.compile(&fusion, aten_inputs); @@ -105,7 +95,7 @@ TEST_P(ResizeTest, Pad1) { } // pad + split -TEST_P(ResizeTest, Pad2) { +TEST_F(ResizeTest, Pad2) { Fusion fusion; FusionGuard fg(&fusion); @@ -125,11 +115,7 @@ TEST_P(ResizeTest, Pad2) { std::vector aten_inputs({t0}); EnableOptionsGuard enable_options_guard; - if (GetParam()) { - EnableOptionsGuard::getCurOptions().set(EnableOption::IdModel, {"all"}); - } else { - EnableOptionsGuard::getCurOptions().unset(EnableOption::IdModel); - } + EnableOptionsGuard::getCurOptions().set(EnableOption::IdModel, {"all"}); KernelExecutor ke; ke.compile(&fusion, aten_inputs); @@ -141,7 +127,7 @@ TEST_P(ResizeTest, Pad2) { } // pad, merge + split, inlineMost -TEST_P(ResizeTest, Pad3) { +TEST_F(ResizeTest, Pad3) { Fusion fusion; FusionGuard fg(&fusion); @@ -178,11 +164,7 @@ TEST_P(ResizeTest, Pad3) { std::vector aten_inputs({t0, t1}); EnableOptionsGuard enable_options_guard; - if (GetParam()) { - EnableOptionsGuard::getCurOptions().set(EnableOption::IdModel, {"all"}); - } else { - EnableOptionsGuard::getCurOptions().unset(EnableOption::IdModel); - } + EnableOptionsGuard::getCurOptions().set(EnableOption::IdModel, {"all"}); KernelExecutor ke; ke.compile(&fusion, aten_inputs); @@ -192,7 +174,7 @@ TEST_P(ResizeTest, Pad3) { } // pad + parallelization -TEST_P(ResizeTest, Pad4) { +TEST_F(ResizeTest, Pad4) { Fusion fusion; FusionGuard fg(&fusion); @@ -212,11 +194,7 @@ TEST_P(ResizeTest, Pad4) { std::vector aten_inputs({t0}); EnableOptionsGuard enable_options_guard; - if (GetParam()) { - EnableOptionsGuard::getCurOptions().set(EnableOption::IdModel, {"all"}); - } else { - EnableOptionsGuard::getCurOptions().unset(EnableOption::IdModel); - } + EnableOptionsGuard::getCurOptions().set(EnableOption::IdModel, {"all"}); KernelExecutor ke; ke.compile(&fusion, aten_inputs); @@ -228,7 +206,7 @@ TEST_P(ResizeTest, Pad4) { } // pad + parallelization + RAW sync -TEST_P(ResizeTest, Pad5) { +TEST_F(ResizeTest, Pad5) { Fusion fusion; FusionGuard fg(&fusion); @@ -267,11 +245,7 @@ TEST_P(ResizeTest, Pad5) { std::vector aten_inputs({t0}); EnableOptionsGuard enable_options_guard; - if (GetParam()) { - EnableOptionsGuard::getCurOptions().set(EnableOption::IdModel, {"all"}); - } else { - EnableOptionsGuard::getCurOptions().unset(EnableOption::IdModel); - } + EnableOptionsGuard::getCurOptions().set(EnableOption::IdModel, {"all"}); KernelExecutor ke; ke.compile(&fusion, aten_inputs); @@ 
-283,7 +257,7 @@ TEST_P(ResizeTest, Pad5) { } // pad + merge + split parallelization -TEST_P(ResizeTest, Pad6) { +TEST_F(ResizeTest, Pad6) { Fusion fusion; FusionGuard fg(&fusion); @@ -318,11 +292,7 @@ TEST_P(ResizeTest, Pad6) { std::vector aten_inputs({t0, t1}); EnableOptionsGuard enable_options_guard; - if (GetParam()) { - EnableOptionsGuard::getCurOptions().set(EnableOption::IdModel, {"all"}); - } else { - EnableOptionsGuard::getCurOptions().unset(EnableOption::IdModel); - } + EnableOptionsGuard::getCurOptions().set(EnableOption::IdModel, {"all"}); KernelExecutor ke; ke.compile(&fusion, aten_inputs); @@ -333,7 +303,7 @@ TEST_P(ResizeTest, Pad6) { // pad + unswitch. Having different extents in an unswitched loop nest // needs a special care (see UnrollPass::canOmitElseClause) -TEST_P(ResizeTest, Pad7) { +TEST_F(ResizeTest, Pad7) { Fusion fusion; FusionGuard fg(&fusion); @@ -369,11 +339,7 @@ TEST_P(ResizeTest, Pad7) { std::vector aten_inputs({t0}); EnableOptionsGuard enable_options_guard; - if (GetParam()) { - EnableOptionsGuard::getCurOptions().set(EnableOption::IdModel, {"all"}); - } else { - EnableOptionsGuard::getCurOptions().unset(EnableOption::IdModel); - } + EnableOptionsGuard::getCurOptions().set(EnableOption::IdModel, {"all"}); KernelExecutor ke; ke.compile(&fusion, aten_inputs); @@ -430,7 +396,7 @@ TEST_F(ResizeTest, Pad8) { } #endif -TEST_P(ResizeTest, PadScheduler1) { +TEST_F(ResizeTest, PadScheduler1) { auto fusion = std::make_unique(); FusionGuard fg(fusion.get()); @@ -448,11 +414,7 @@ TEST_P(ResizeTest, PadScheduler1) { std::vector aten_inputs({t0}); EnableOptionsGuard enable_options_guard; - if (GetParam()) { - EnableOptionsGuard::getCurOptions().set(EnableOption::IdModel, {"all"}); - } else { - EnableOptionsGuard::getCurOptions().unset(EnableOption::IdModel); - } + EnableOptionsGuard::getCurOptions().set(EnableOption::IdModel, {"all"}); FusionExecutorCache executor_cache(std::move(fusion)); auto cg_outputs = executor_cache.runFusionWithInputs(aten_inputs); @@ -462,7 +424,7 @@ TEST_P(ResizeTest, PadScheduler1) { NVF_CHECK(ref.equal(cg_outputs[0])); } -TEST_P(ResizeTest, PadScheduler2) { +TEST_F(ResizeTest, PadScheduler2) { auto fusion_ptr = std::make_unique(); auto& fusion = *fusion_ptr; FusionGuard fg(fusion_ptr.get()); @@ -487,11 +449,7 @@ TEST_P(ResizeTest, PadScheduler2) { std::vector aten_inputs({t0, t1}); EnableOptionsGuard enable_options_guard; - if (GetParam()) { - EnableOptionsGuard::getCurOptions().set(EnableOption::IdModel, {"all"}); - } else { - EnableOptionsGuard::getCurOptions().unset(EnableOption::IdModel); - } + EnableOptionsGuard::getCurOptions().set(EnableOption::IdModel, {"all"}); FusionExecutorCache executor_cache(std::move(fusion_ptr)); auto cg_outputs = executor_cache.runFusionWithInputs(aten_inputs); @@ -540,7 +498,7 @@ TEST_F(ResizeTest, PadScheduler3) { // Two pad exprs, both using the same symbolic pad widths, segmented // into two kernels. Make sure the symbolic inputs are available to // both of the segmented kernels. 
-TEST_P(ResizeTest, PadScheduler4) { +TEST_F(ResizeTest, PadScheduler4) { auto fusion = std::make_unique(); FusionGuard fg(fusion.get()); @@ -569,11 +527,7 @@ TEST_P(ResizeTest, PadScheduler4) { std::vector aten_inputs({t0, 1, 1}); EnableOptionsGuard enable_options_guard; - if (GetParam()) { - EnableOptionsGuard::getCurOptions().set(EnableOption::IdModel, {"all"}); - } else { - EnableOptionsGuard::getCurOptions().unset(EnableOption::IdModel); - } + EnableOptionsGuard::getCurOptions().set(EnableOption::IdModel, {"all"}); FusionExecutorCache executor_cache(std::move(fusion)); auto cg_outputs = executor_cache.runFusionWithInputs(aten_inputs); @@ -584,7 +538,7 @@ TEST_P(ResizeTest, PadScheduler4) { // Pad a broadcast // See https://github.com/NVIDIA/Fuser/issues/798 -TEST_P(ResizeTest, PadBroadcastInput) { +TEST_F(ResizeTest, PadBroadcastInput) { auto fusion = std::make_unique(); FusionGuard fg(fusion.get()); @@ -609,11 +563,7 @@ TEST_P(ResizeTest, PadBroadcastInput) { std::vector aten_inputs({t0}); EnableOptionsGuard enable_options_guard; - if (GetParam()) { - EnableOptionsGuard::getCurOptions().set(EnableOption::IdModel, {"all"}); - } else { - EnableOptionsGuard::getCurOptions().unset(EnableOption::IdModel); - } + EnableOptionsGuard::getCurOptions().set(EnableOption::IdModel, {"all"}); FusionExecutorCache executor_cache(std::move(fusion)); auto cg_outputs = executor_cache.runFusionWithInputs(aten_inputs); @@ -1437,7 +1387,7 @@ TEST_F(ResizeTest, SliceExtentSimplification) { << "Unexpected resize output extent: " << resize_extent->toInlineString(); } -TEST_P(ResizeTest, PadReduceScheduler1) { +TEST_F(ResizeTest, PadReduceScheduler1) { auto fusion_ptr = std::make_unique(); auto& fusion = *fusion_ptr; FusionGuard fg(fusion_ptr.get()); @@ -1472,11 +1422,7 @@ TEST_P(ResizeTest, PadReduceScheduler1) { [](auto pad_extent) { return pad_extent; }); EnableOptionsGuard enable_options_guard; - if (GetParam()) { - EnableOptionsGuard::getCurOptions().set(EnableOption::IdModel, {"all"}); - } else { - EnableOptionsGuard::getCurOptions().unset(EnableOption::IdModel); - } + EnableOptionsGuard::getCurOptions().set(EnableOption::IdModel, {"all"}); FusionExecutorCache executor_cache(std::move(fusion_ptr)); auto cg_outputs = executor_cache.runFusionWithInputs(aten_inputs); @@ -1761,7 +1707,7 @@ TEST_F(ResizeTest, SoftmaxSliceScheduler2) { } // Same as Pad1 but pad by specified value -TEST_P(ResizeTest, PadWithValue) { +TEST_F(ResizeTest, PadWithValue) { Fusion fusion; FusionGuard fg(&fusion); @@ -1782,11 +1728,7 @@ TEST_P(ResizeTest, PadWithValue) { std::vector aten_inputs({t0}); EnableOptionsGuard enable_options_guard; - if (GetParam()) { - EnableOptionsGuard::getCurOptions().set(EnableOption::IdModel, {"all"}); - } else { - EnableOptionsGuard::getCurOptions().unset(EnableOption::IdModel); - } + EnableOptionsGuard::getCurOptions().set(EnableOption::IdModel, {"all"}); KernelExecutor ke; ke.compile(&fusion, aten_inputs); @@ -1798,7 +1740,7 @@ TEST_P(ResizeTest, PadWithValue) { } // Same as Pad1 but pad by negative value to create an empty tensor -TEST_P(ResizeTest, PadToEmptyTensor) { +TEST_F(ResizeTest, PadToEmptyTensor) { auto fusion = std::make_unique(); FusionGuard fg(fusion.get()); @@ -1821,11 +1763,7 @@ TEST_P(ResizeTest, PadToEmptyTensor) { std::vector aten_inputs({t0}); EnableOptionsGuard enable_options_guard; - if (GetParam()) { - EnableOptionsGuard::getCurOptions().set(EnableOption::IdModel, {"all"}); - } else { - EnableOptionsGuard::getCurOptions().unset(EnableOption::IdModel); - } + 
EnableOptionsGuard::getCurOptions().set(EnableOption::IdModel, {"all"}); FusionExecutorCache executor_cache(std::move(fusion)); auto cg_outputs = executor_cache.runFusionWithInputs(aten_inputs); @@ -1836,7 +1774,7 @@ TEST_P(ResizeTest, PadToEmptyTensor) { } // Test that padding Half tensor by Double does not promote output -TEST_P(ResizeTest, PadHalfWithDoubleValue) { +TEST_F(ResizeTest, PadHalfWithDoubleValue) { Fusion fusion; FusionGuard fg(&fusion); @@ -1857,11 +1795,7 @@ TEST_P(ResizeTest, PadHalfWithDoubleValue) { std::vector aten_inputs({t0}); EnableOptionsGuard enable_options_guard; - if (GetParam()) { - EnableOptionsGuard::getCurOptions().set(EnableOption::IdModel, {"all"}); - } else { - EnableOptionsGuard::getCurOptions().unset(EnableOption::IdModel); - } + EnableOptionsGuard::getCurOptions().set(EnableOption::IdModel, {"all"}); KernelExecutor ke; ke.compile(&fusion, aten_inputs); @@ -2421,7 +2355,7 @@ TEST_F(ResizeTest, SliceVectorization) { // Concretize a symbolic pad that results in a broadcast (static pads) // In this test, the sizes and pad widths are static, so there should be nothing // to concretize. -TEST_P(ResizeTest, ResizePadToBroadcastStatic) { +TEST_F(ResizeTest, ResizePadToBroadcastStatic) { std::vector t0_size = {2, 3, 2, 5, 6}; std::vector t1_size = {2, 4, 4, 3, 5}; // Note there are only 8 input scalars for 5D input. Implicit no-pad of dim 0 @@ -2471,11 +2405,7 @@ TEST_P(ResizeTest, ResizePadToBroadcastStatic) { std::vector aten_inputs({t0, t1}); EnableOptionsGuard enable_options_guard; - if (GetParam()) { - EnableOptionsGuard::getCurOptions().set(EnableOption::IdModel, {"all"}); - } else { - EnableOptionsGuard::getCurOptions().unset(EnableOption::IdModel); - } + EnableOptionsGuard::getCurOptions().set(EnableOption::IdModel, {"all"}); FusionExecutorCache executor_cache(std::move(fusion)); auto cg_outputs = executor_cache.runFusionWithInputs(aten_inputs); @@ -2495,7 +2425,7 @@ TEST_P(ResizeTest, ResizePadToBroadcastStatic) { } // Concretize a symbolic pad that results in a broadcast (dynamic pads) -TEST_P(ResizeTest, ResizePadToBroadcastDynamic) { +TEST_F(ResizeTest, ResizePadToBroadcastDynamic) { auto fusion = std::make_unique(); FusionGuard fg(fusion.get()); @@ -2543,11 +2473,7 @@ TEST_P(ResizeTest, ResizePadToBroadcastDynamic) { aten_inputs.insert(aten_inputs.end(), pad_widths.begin(), pad_widths.end()); EnableOptionsGuard enable_options_guard; - if (GetParam()) { - EnableOptionsGuard::getCurOptions().set(EnableOption::IdModel, {"all"}); - } else { - EnableOptionsGuard::getCurOptions().unset(EnableOption::IdModel); - } + EnableOptionsGuard::getCurOptions().set(EnableOption::IdModel, {"all"}); FusionExecutorCache executor_cache(std::move(fusion)); auto cg_outputs = executor_cache.runFusionWithInputs(aten_inputs); @@ -2569,7 +2495,7 @@ TEST_P(ResizeTest, ResizePadToBroadcastDynamic) { } // See https://github.com/NVIDIA/Fuser/issues/596 -TEST_P(ResizeTest, ResizePadToBroadcastIssue596) { +TEST_F(ResizeTest, ResizePadToBroadcastIssue596) { auto fusion = std::make_unique(); FusionGuard fg(fusion.get()); @@ -2592,11 +2518,7 @@ TEST_P(ResizeTest, ResizePadToBroadcastIssue596) { std::vector aten_inputs({t0, t1}); EnableOptionsGuard enable_options_guard; - if (GetParam()) { - EnableOptionsGuard::getCurOptions().set(EnableOption::IdModel, {"all"}); - } else { - EnableOptionsGuard::getCurOptions().unset(EnableOption::IdModel); - } + EnableOptionsGuard::getCurOptions().set(EnableOption::IdModel, {"all"}); auto args = 
KernelArgumentHolder::createKernelArgumentHolder(aten_inputs); FusionKernelRuntime runtime(std::move(fusion), args); @@ -3091,7 +3013,7 @@ TEST_F(ResizeTest, SliceAndReshapeRepro540Manual) { // Test concretizing a pad that follows a reshape. This requires the // ExpressionEvaluator used in concretization to propagate shapes properly // across symbolic reshapes in order to infer the size of the downstream pad. -TEST_P(ResizeTest, ReshapeToPad) { +TEST_F(ResizeTest, ReshapeToPad) { std::unique_ptr fusion_ptr = std::make_unique(); Fusion& fusion = *fusion_ptr.get(); FusionGuard fg(&fusion); @@ -3113,11 +3035,7 @@ TEST_P(ResizeTest, ReshapeToPad) { fusion.addOutput(tv2); EnableOptionsGuard enable_options_guard; - if (GetParam()) { - EnableOptionsGuard::getCurOptions().set(EnableOption::IdModel, {"all"}); - } else { - EnableOptionsGuard::getCurOptions().unset(EnableOption::IdModel); - } + EnableOptionsGuard::getCurOptions().set(EnableOption::IdModel, {"all"}); FusionExecutorCache executor_cache(std::move(fusion_ptr)); @@ -3262,7 +3180,7 @@ TEST_F(ResizeTest, CatOfExpandedBroadcast) { // padded in the empty dim as well as the expanded dims. // This should match test_python_frontend.py::test_pad_expanded_empty // See https://github.com/NVIDIA/Fuser/issues/870 -TEST_P(ResizeTest, PadExpandedEmpty) { +TEST_F(ResizeTest, PadExpandedEmpty) { auto fusion_ptr = std::make_unique(); auto& fusion = *fusion_ptr; FusionGuard fg(&fusion); @@ -3296,11 +3214,7 @@ TEST_P(ResizeTest, PadExpandedEmpty) { std::vector aten_inputs({t0}); EnableOptionsGuard enable_options_guard; - if (GetParam()) { - EnableOptionsGuard::getCurOptions().set(EnableOption::IdModel, {"all"}); - } else { - EnableOptionsGuard::getCurOptions().unset(EnableOption::IdModel); - } + EnableOptionsGuard::getCurOptions().set(EnableOption::IdModel, {"all"}); FusionExecutorCache executor_cache(std::move(fusion_ptr)); auto cg_outputs = executor_cache.runFusionWithInputs(aten_inputs); @@ -3311,7 +3225,7 @@ TEST_P(ResizeTest, PadExpandedEmpty) { // Test that we can pad properly along broadcast dims // See https://github.com/NVIDIA/Fuser/issues/868 -TEST_P(ResizeTest, PadOfBroadcast) { +TEST_F(ResizeTest, PadOfBroadcast) { Fusion fusion; FusionGuard fg(&fusion); @@ -3329,11 +3243,7 @@ TEST_P(ResizeTest, PadOfBroadcast) { std::vector aten_inputs({t0}); EnableOptionsGuard enable_options_guard; - if (GetParam()) { - EnableOptionsGuard::getCurOptions().set(EnableOption::IdModel, {"all"}); - } else { - EnableOptionsGuard::getCurOptions().unset(EnableOption::IdModel); - } + EnableOptionsGuard::getCurOptions().set(EnableOption::IdModel, {"all"}); KernelExecutor ke; ke.compile(&fusion, aten_inputs); @@ -3344,7 +3254,7 @@ TEST_P(ResizeTest, PadOfBroadcast) { // Test that we can cat along broadcast dims that have been expanded // See https://github.com/NVIDIA/Fuser/issues/868 -TEST_P(ResizeTest, PadOfExpandedBroadcast) { +TEST_F(ResizeTest, PadOfExpandedBroadcast) { Fusion fusion; FusionGuard fg(&fusion); @@ -3365,11 +3275,7 @@ TEST_P(ResizeTest, PadOfExpandedBroadcast) { std::vector aten_inputs({t0}); EnableOptionsGuard enable_options_guard; - if (GetParam()) { - EnableOptionsGuard::getCurOptions().set(EnableOption::IdModel, {"all"}); - } else { - EnableOptionsGuard::getCurOptions().unset(EnableOption::IdModel); - } + EnableOptionsGuard::getCurOptions().set(EnableOption::IdModel, {"all"}); KernelExecutor ke; ke.compile(&fusion, aten_inputs); diff --git a/tests/python/test_communication.py b/tests/python/test_communication.py index d0cea846669..75a94f6cec4 100644 
--- a/tests/python/test_communication.py +++ b/tests/python/test_communication.py @@ -15,13 +15,13 @@ @pytest.mark.mpi def test_allgather(mpi_test): - num_devices = mpi_test.size - mesh = nvfuser.DeviceMesh(range(num_devices)) + d = mpi_test.size + mesh = nvfuser.DeviceMesh(range(d)) class Model(FusionDefinition): def definition(self): self.inp = self.define_tensor( - (num_devices * 4,), contiguity=True, dtype=DataType.Float + (d * 4,), contiguity=True, dtype=DataType.Float ) self.out = self.ops.set(self.inp) self.add_output(self.out) @@ -30,16 +30,116 @@ def multidevice_schedule(self): self.sched._set_device_mesh(self.inp, mesh) self.sched._set_device_mesh(self.out, mesh) - self.sched.split(self.inp, 0, num_devices, False) + self.sched.split(self.inp, 0, d, False) self.sched.parallelize(self.inp, 0, nvfuser.ParallelType.mesh_x) self.sched.set_allocation_as_loop(self.inp) - self.sched.split(self.out, 0, num_devices, False) + self.sched.split(self.out, 0, d, False) self.sched.set_allocation_as_loop(self.out) - unsharded = torch.randn(num_devices * 4) + unsharded = torch.randn(d * 4) sharded = mpi_test.shard_tensor(unsharded, 0, mesh) fd = Model() outputs = fd.execute([sharded]) torch.testing.assert_close(outputs[0].cpu(), unsharded) + + +@pytest.mark.mpi +def test_allreduce(mpi_test): + d = mpi_test.size + mesh = nvfuser.DeviceMesh(range(d)) + + class Model(FusionDefinition): + def definition(self): + self.inp = self.define_tensor((d, 4), contiguity=True, dtype=DataType.Float) + self.out = self.ops.sum(self.inp, [0]) + self.add_output(self.out) + + def multidevice_schedule(self): + self.sched._set_device_mesh(self.inp, mesh) + self.sched._set_device_mesh(self.out, mesh) + + self.sched.parallelize(self.inp, 0, nvfuser.ParallelType.mesh_x) + + unsharded = torch.randn(d, 4) + sharded = mpi_test.shard_tensor(unsharded, 0, mesh) + + fd = Model() + outputs = fd.execute([sharded]) + torch.testing.assert_close(outputs[0].cpu(), unsharded.sum(0)) + + +@pytest.mark.mpi +def test_reduce_scatter(mpi_test): + d = mpi_test.size + mesh = nvfuser.DeviceMesh(range(d)) + + class Model(FusionDefinition): + def definition(self): + self.inp = self.define_tensor( + (d, d * 4), contiguity=True, dtype=DataType.Float + ) + self.out = self.ops.sum(self.inp, [0]) + self.add_output(self.out) + + def multidevice_schedule(self): + self.sched._set_device_mesh(self.inp, mesh) + self.sched._set_device_mesh(self.out, mesh) + + self.sched.parallelize(self.inp, 0, nvfuser.ParallelType.mesh_x) + + self.sched.split(self.out, -1, d, False) + self.sched.parallelize(self.out, -2, nvfuser.ParallelType.mesh_x) + self.sched.set_allocation_as_loop(self.out) + + unsharded = torch.randn(d, d * 4) + sharded = mpi_test.shard_tensor(unsharded, 0, mesh) + + fd = Model() + outputs = fd.execute([sharded]) + torch.testing.assert_close( + outputs[0], mpi_test.shard_tensor(unsharded.sum(0), 0, mesh) + ) + + +@pytest.mark.mpi +def test_reduce_scatter_noncontiguous(mpi_test): + d = mpi_test.size + mesh = nvfuser.DeviceMesh(range(d)) + + class Model(FusionDefinition): + def definition(self): + self.inp = self.define_tensor( + (d, 3, d * 4), contiguity=True, dtype=DataType.Float + ) + self.out = self.ops.sum(self.inp, [0]) + self.add_output(self.out) + + def multidevice_schedule(self): + self.sched._set_device_mesh(self.inp, mesh) + self.sched._set_device_mesh(self.out, mesh) + + # inp: [iDID{d}, i{3}, i{d*4}] + # out: [r{d}, i{3}, i{d*4}] + # / \ + # iDID{d} i{4} + # + # Unlike test_reduce_scatter, this leads to extra data copy because + # the 
scattered axis is not outermost in allocation. + # ProcessGroupNCCL::reduce_scatter was able to handle + # non-contiguous scattering in a functional but suboptimal way. + self.sched.parallelize(self.inp, 0, nvfuser.ParallelType.mesh_x) + + self.sched.split(self.out, -1, d, False) + self.sched.parallelize(self.out, -2, nvfuser.ParallelType.mesh_x) + self.sched.set_allocation_as_loop(self.out) + + unsharded = torch.randn(d, 3, d * 4) + sharded = mpi_test.shard_tensor(unsharded, 0, mesh) + + fd = Model() + outputs = fd.execute([sharded]) + torch.testing.assert_close( + outputs[0], mpi_test.shard_tensor(unsharded.sum(0), 1, mesh) + )