Presegmentation pass to force segment breaks when inplace update can cause RW race (#2999)

Issue #2664

A RW race can occur when an intermediate tensorview is aliased to a
fusion input and the intermediate tensorview or the aliased input is a
producer/consumer of a broadcast op. This presegmentation pass traverses
the fusion to find such inplace updates and inserts `segment_set + set`
to force the inplace update into a separate copy kernel. This ensures
that the write to the fusion input occurs only after all reads of
that fusion input have concluded.
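
A minimal sketch of the racy pattern, adapted from the Issue #2664 repro
test added in this PR (names, shapes, and the stack-allocated Fusion are
illustrative only):

  Fusion fusion;
  FusionGuard fg(&fusion);

  TensorView* tv1 = makeContigTensor(1); // fusion input, 1D
  TensorView* tv2 = makeContigTensor(0); // fusion input, scalar tensor
  fusion.addInput(tv1);
  fusion.addInput(tv2);

  // tv4 both feeds a broadcast and is written back in place into tv2.
  TensorView* tv4 = add(tv2, IrBuilder::create<Val>(1.0));
  TensorView* tv5 = broadcast(tv4, {true});
  TensorView* tv8 = mul(tv1, expand(tv5, {tv1->axis(0)->extent()}));
  fusion.aliasOutputToInput(tv4, tv2, AllocationType::ReuseBuffer);
  fusion.addOutput(tv8);

  // Without a forced segment break, the write-back to tv2 can race with
  // the reads of tv2 along the broadcast path. The pass rewrites the
  // aliased output, roughly:
  //   TensorView* seg = segment_set(tv4); // hard segmentation boundary
  //   TensorView* copy = set(seg);        // standalone copy kernel
  //   fusion.replaceOutput(tv4, copy);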

---------

Co-authored-by: jjsjann123 <[email protected]>
Priya2698 and jjsjann123 authored Oct 28, 2024
1 parent 628a47e commit f097e58
Showing 6 changed files with 403 additions and 1 deletion.
1 change: 1 addition & 0 deletions CMakeLists.txt
@@ -195,6 +195,7 @@ list(APPEND NVFUSER_SRCS
${NVFUSER_SRCS_DIR}/preseg_passes/remove_bcast_squeeze.cpp
${NVFUSER_SRCS_DIR}/preseg_passes/remove_empty.cpp
${NVFUSER_SRCS_DIR}/preseg_passes/reorder_sharded_axis.cpp
${NVFUSER_SRCS_DIR}/preseg_passes/segment_inplace_update.cpp
${NVFUSER_SRCS_DIR}/rng.cpp
${NVFUSER_SRCS_DIR}/runtime/allocations.cpp
${NVFUSER_SRCS_DIR}/runtime/executor.cpp
2 changes: 2 additions & 0 deletions csrc/preseg_passes/pre_segmenter.cpp
@@ -24,6 +24,7 @@
#include <preseg_passes/remove_bcast_squeeze.h>
#include <preseg_passes/remove_empty.h>
#include <preseg_passes/reorder_sharded_axis.h>
#include <preseg_passes/segment_inplace_update.h>

namespace nvfuser::preseg_passes {

@@ -65,6 +66,7 @@ namespace nvfuser::preseg_passes {
OptimizationPass<ExactMappedExtentSubstitutionPass>::runPass(fusion);
OptimizationPass<AllocationDomainPass>::runPass(fusion);
OptimizationPass<RemoveBcastSqueeze>::runPass(fusion);
OptimizationPass<SegmentInplaceUpdatePass>::runPass(fusion);
}

} // namespace nvfuser::preseg_passes
156 changes: 156 additions & 0 deletions csrc/preseg_passes/segment_inplace_update.cpp
@@ -0,0 +1,156 @@
// clang-format off
/*
* SPDX-FileCopyrightText: Copyright (c) 2024-present NVIDIA CORPORATION & AFFILIATES.
* All rights reserved.
* SPDX-License-Identifier: BSD-3-Clause
*/
// clang-format on
#include <deque>
#include <vector>

#include <fusion.h>
#include <id_model/id_model.h>
#include <id_model/to_string.h>
#include <ir/utils.h>
#include <ops/alias.h>
#include <preseg_passes/segment_inplace_update.h>

namespace nvfuser::preseg_passes {
// When an intermediate tensorview is aliased to a fusion input,
// an RW race occurs when the intermediate tensorview or
// the aliased input is in the path of a broadcast op.
// This preseg pass:
// 1. Finds any tensorviews used in inplace updates
// (AllocationType::ReuseBuffer) in the fusion.
// 2. Traverses the fusion graph starting from broadcast ops and stores all
// direct/indirect producer/consumer tensorviews.
// 3. For all aliased tensorviews, if the aliased tensorview or the aliased
// input is present in the set of visited tensorviews from step 2, inserts a
// segment_set + set to force a separate copy kernel. Additionally, we
// check for implicit broadcasts: whether any aliased input has a
// broadcast dimension that is concretized later in the fusion. This ensures
// that all write operations to the fusion inputs occur after the read
// operations have completed. See Issue #2664:
// https://github.com/NVIDIA/Fuser/issues/2664
namespace {
void insertSegmentSet(Fusion* fusion) {
std::vector<TensorView*> aliased_tvs;

// Find all tensorviews which are used in inplace updates.
// Aliases will always be fusion outputs.
for (Val* out : fusion->outputs()) {
if (fusion->getOutputAlias(out->as<TensorView>()).type ==
AllocationType::ReuseBuffer) {
aliased_tvs.push_back(out->as<TensorView>());
}
}

// Return early if there is no inplace update
if (aliased_tvs.empty()) {
return;
}

// fusion->exprs() returns a topologically sorted list. Select the broadcast
// ops from that list.
auto all_exprs = fusion->exprs();
auto all_bcast_ops = ir_utils::filterByType<BroadcastOp>(all_exprs);

// Traverse and store all direct/indirect consumer tensorviews of these
// broadcast nodes. If a tensorview has already been visited, skip it -->
// that branch has already been traversed.
std::unordered_set<TensorView*> visited_tvs;
for (auto bcast_op : all_bcast_ops) {
std::deque<TensorView*> tvs_to_visit;
tvs_to_visit.push_back(bcast_op->output(0)->as<TensorView>());
while (!tvs_to_visit.empty()) {
TensorView* current_tv = tvs_to_visit.front();
tvs_to_visit.pop_front();
if (visited_tvs.count(current_tv)) {
continue;
}
visited_tvs.insert(current_tv);
std::vector<Expr*> current_tv_uses = current_tv->uses();
for (Expr* use : current_tv_uses) {
for (auto output_tv :
ir_utils::filterByType<TensorView>(use->outputs())) {
tvs_to_visit.push_back(output_tv->as<TensorView>());
}
}
}
}

// Traverse and store the direct/indirect producer tensorviews of these
// broadcast nodes. If a tensorview has already been visited, skip it.
for (auto bcast_op : all_bcast_ops) {
std::deque<TensorView*> tvs_to_visit;
tvs_to_visit.push_back(bcast_op->input(0)->as<TensorView>());
while (!tvs_to_visit.empty()) {
TensorView* current_tv = tvs_to_visit.front();
tvs_to_visit.pop_front();
if (visited_tvs.count(current_tv)) {
continue;
}
visited_tvs.insert(current_tv);
auto definition = current_tv->definition();
if (definition != nullptr) {
for (auto input_tv :
ir_utils::filterByType<TensorView>(definition->inputs())) {
tvs_to_visit.push_back(input_tv->as<TensorView>());
}
}
}
}

// Use the permissive IdModel graph to identify any concretized broadcast
// iterdomain in the aliased inputs.
auto id_model = IdModel(fusion, /*build_graphs=*/false);
id_model.buildPermissiveGraph();
const ValGraph& permissive_graph =
id_model.idGraph(IdMappingMode::PERMISSIVE);

auto hasConcretizedBroadcast = [&](TensorView* tv) -> bool {
if (!tv->hasBroadcast()) {
return false;
}
for (IterDomain* id : tv->getLogicalDomain()) {
if (!id->isBroadcast()) {
continue;
}
if (!permissive_graph.hasGroup(id)) {
continue;
}
const ValGroup& val_group = permissive_graph.toGroup(id);
for (auto other_id : val_group.get()->vector()) {
if (!other_id->as<IterDomain>()->isBroadcast()) {
return true;
}
}
}
return false;
};
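// Illustration (hypothetical shapes, not from the diff): for an aliased
// input with logical domain [bS0{1}], if bS0 is permissively mapped to a
// non-broadcast iterdomain such as iS1{n} (e.g. through a later expand or
// binary op), the broadcast counts as concretized and the lambda above
// returns true.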

// For all aliased tensorviews:
// 1) if that tv or the corresponding aliased input is a producer/consumer of
// a broadcast op, or 2) if the aliased input has a concretized broadcast,
// insert a (segment_set + set) to force the inplace update into a separate
// copy kernel. NOTE: We cannot use a segment_set alone: since there is no
// data flow across this segment_set (its output is a fusion output with no
// uses), it would be merged with other segments. See:
// https://github.com/NVIDIA/Fuser/blob/92b635125ae509cc6b2ccbe29e957586a9cbb059/csrc/fusion_segmenter.cpp#L2331-L2346
for (auto aliased_tv : aliased_tvs) {
TensorView* aliased_input =
fusion->getOutputAlias(aliased_tv).aliased_io->as<TensorView>();
if (visited_tvs.count(aliased_tv) || visited_tvs.count(aliased_input) ||
hasConcretizedBroadcast(aliased_input)) {
TensorView* alias_seg = segment_set(aliased_tv);
TensorView* alias_copy = set(alias_seg);
fusion->replaceOutput(aliased_tv, alias_copy);
}
}
}
} // namespace

void SegmentInplaceUpdatePass::runPass(Fusion* fusion) {
insertSegmentSet(fusion);
}
} // namespace nvfuser::preseg_passes
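
For context, a sketch (not part of the diff) of how the pass's effect can be
observed at runtime, mirroring the assertion updated in
tests/cpp/test_alias.cpp below; `fusion`, `t1`, and `t2` are assumed to be set
up as in the Issue2664 repro test:

  FusionExecutorCache fec(std::move(fusion));
  auto out_tensors = fec.runFusionWithInputs({t1, t2});
  // The forced inplace-update copy now runs in its own kernel, so the
  // runtime reports one extra segment compared to a fusion without the pass.
  size_t num_segments =
      fec.getMostRecentKernelRuntime()->fusionSegments()->groups().size();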
27 changes: 27 additions & 0 deletions csrc/preseg_passes/segment_inplace_update.h
@@ -0,0 +1,27 @@
// clang-format off
/*
* SPDX-FileCopyrightText: Copyright (c) 2023-present NVIDIA CORPORATION & AFFILIATES.
* All rights reserved.
* SPDX-License-Identifier: BSD-3-Clause
*/
// clang-format on
#pragma once

#include <preseg_passes/optimization_pass.h>

namespace nvfuser::preseg_passes {

//! SegmentInplaceUpdatePass inserts a segment_set + set on outputs that are
//! aliased to fusion inputs (inplace updates) and lie in the path of a
//! broadcast, forcing the write-back into a separate copy kernel to avoid a
//! RW race.
class SegmentInplaceUpdatePass
: public OptimizationPass<SegmentInplaceUpdatePass> {
friend class OptimizationPass<SegmentInplaceUpdatePass>;

protected:
static void runPass(Fusion* fusion);
static std::string name() {
return "SegmentInplaceUpdate";
}
};

} // namespace nvfuser::preseg_passes
40 changes: 39 additions & 1 deletion tests/cpp/test_alias.cpp
@@ -17,6 +17,7 @@
#include <ir/utils.h>
#include <ops/alias.h>
#include <ops/arith.h>
#include <preseg_passes/segment_inplace_update.h>
#include <sys_utils.h>
#include <tests/cpp/utils.h>
#include <tests/cpp/validator.h>
@@ -1016,8 +1017,11 @@ TEST_F(AliasTest, ReuseBuffer_AliasAcrossSegments) {
testValidate(
fec.fusion(), outputs, {original_t0, t1, t2}, __LINE__, __FILE__);

// https://github.com/NVIDIA/Fuser/pull/2999 will cause 3 segments instead of
// the optimal 2 segments. Change back to 2 segments once
// https://github.com/NVIDIA/Fuser/issues/3251 is resolved.
EXPECT_EQ(
fec.getMostRecentKernelRuntime()->fusionSegments()->groups().size(), 2)
fec.getMostRecentKernelRuntime()->fusionSegments()->groups().size(), 3)
<< "segmentation didn't happen as expected";

auto t3 = original_t0.add(1.0);
@@ -1426,4 +1430,38 @@ TEST_F(AliasTest, Bookend_Issue2375) {
HeuristicIs(SchedulerType::InnerPersistent)));
}

// Repro for https://github.com/NVIDIA/Fuser/issues/2664
TEST_F(AliasTest, Issue2664) {
auto fusion = std::make_unique<Fusion>();
FusionGuard fg(fusion.get());

constexpr int64_t n = 4194304;
const DataType dtype = DataType::Float;
const std::vector<int64_t> input_shape = {n};

auto tv1 = makeContigTensor(1, dtype);
auto tv2 = makeContigTensor(0, dtype);
fusion->addInput(tv1);
fusion->addInput(tv2);

auto s3 = IrBuilder::create<Val>(1.0);
auto tv4 = add(tv2, s3);
auto tv5 = broadcast(tv4, {true});
auto tv7 = expand(tv5, {tv1->axis(0)->extent()});
auto tv8 = mul(tv1, tv7);
fusion->aliasOutputToInput(tv4, tv2, AllocationType::ReuseBuffer);
fusion->addOutput(tv8);

auto options =
at::TensorOptions().dtype(data_type_to_aten(dtype)).device(at::kCUDA, 0);
auto t1 = at::randn(input_shape, options);
auto t2 = at::randn({}, options);
auto aten_out = (t2 + 1.0) * t1;

FusionExecutorCache fec(std::move(fusion));
auto out_tensors = fec.runFusionWithInputs({t1, t2});
testValidate(
fec.fusion(), out_tensors, {t1, t2}, {aten_out}, __LINE__, __FILE__);
}

} // namespace nvfuser