Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Build Segments for User Schedule Segmentation #3334

Merged
merged 2 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ if(BUILD_PYTHON)
${NVFUSER_SRCS_DIR}/python_frontend/fusion_cache.cpp
${NVFUSER_SRCS_DIR}/python_frontend/fusion_definition.cpp
${NVFUSER_SRCS_DIR}/python_frontend/fusion_state.cpp
${NVFUSER_SRCS_DIR}/python_frontend/segmentation.cpp
${NVFUSER_SRCS_DIR}/python_frontend/translation.cpp
${NVFUSER_SRCS_DIR}/python_frontend/translation_utils.cpp
${NVFUSER_SRCS_DIR}/serde/fusion_record.cpp
Expand Down
16 changes: 16 additions & 0 deletions csrc/python_frontend/fusion_definition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <fusion_profiler.h>
#include <instrumentation.h>
#include <options.h>
#include <preseg_passes/pre_segmenter.h>
#include <python_frontend/fusion_cache.h>
#include <python_frontend/fusion_definition.h>
#include <python_frontend/translation.h>
Expand Down Expand Up @@ -673,4 +674,19 @@ std::vector<std::pair<double, double>> FusionDefinition::getValTolerances(
return get_val_constants(preschedFusion(), inputs);
}

int64_t FusionDefinition::setupSegmentation(
const at::ArrayRef<c10::IValue>& inputs) {
NVF_CHECK(id().has_value(), "FusionDefinition definition does not exist!");
NVF_ERROR(
segmentation_state_ == nullptr, "SegmentationState already exists!");
segmentation_state_ = std::make_unique<SegmentationState>();
return segmentation_state_->setupSegmentation(
preschedFusion(), map_value_to_fid_, inputs);
}

void FusionDefinition::finalizeSegmentation() {
// Destroy SegmentedState
segmentation_state_.reset();
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a design question. What's the point of separating this into two steps?

Doesn't look like anything else is happening between setupSegmentation() to finalizeSegmentation().

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#3335 adds the buildSegmentation() to translate the CPP segments to their corresponding python definition.

We still need to destroy the segmentation_state_ in this PR.


} // namespace nvfuser::python_frontend
16 changes: 14 additions & 2 deletions csrc/python_frontend/fusion_definition.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@
// clang-format on
#pragma once
#include <exceptions.h>
#include <functional>
#include <iostream>

#include <python_frontend/fusion_state.h>
#include <runtime/fusion_executor_cache.h>
#include <python_frontend/segmentation.h>
#include <visibility.h>
#include <unordered_map>

namespace nvfuser::python_frontend {

Expand All @@ -20,8 +22,9 @@ class FusionDefinition;
class FusionInterface;
class FusionState;
struct RecordFunctor;
struct UserSchedule;
class SegmentationState;
struct TrieNode;
struct UserSchedule;

//! This is helper function used to print a python formated
//! Fusion IR DataType when printing a fusion definition.
Expand Down Expand Up @@ -254,6 +257,12 @@ class NVF_API FusionDefinition : public FusionState {
//! Get all Tensors in FusionState.
NVF_API std::vector<Tensor> tensors();

//! Run segmentation algorithm on FusionDefinition. Returns the number of
//! segments.
NVF_API int64_t setupSegmentation(const at::ArrayRef<c10::IValue>& inputs);
//! After creating segments, destroy SegmentationState.
NVF_API void finalizeSegmentation();

private:
//! Returns the FusionCache Ptr that holds the cache of Fusions
FusionCache* fusionCache() const;
Expand Down Expand Up @@ -288,6 +297,9 @@ class NVF_API FusionDefinition : public FusionState {
UserSchedule* user_sched_;
//! Number of recording_states_ before applying user schedule
int64_t num_recording_states_presched_ = 0;
//! Data member that creates SegmentedFusion from cloned, prescheduled Fusion
//! then translates the segments to python FusionDefinitions.
std::unique_ptr<SegmentationState> segmentation_state_;

public:
//! The Operators are not directly defined in this header. They are defined
Expand Down
42 changes: 21 additions & 21 deletions csrc/python_frontend/fusion_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,27 @@ std::ostream& operator<<(std::ostream& os, const State& state) {
return os;
}

std::vector<Val*> getExtents(Fusion* fusion) {
rdspring1 marked this conversation as resolved.
Show resolved Hide resolved
NVF_CHECK(fusion != nullptr, "Fusion is undefined.");

std::vector<Val*> extents;
for (Val* v : fusion->inputs()) {
// short-circuit: skip if not TensorView
if (!v->isA<TensorView>()) {
continue;
}
TensorView* tv = v->as<TensorView>();
std::vector<IterDomain*> logical_dom =
TensorDomain::noReductions(tv->getLogicalDomain());
std::transform(
logical_dom.begin(),
logical_dom.end(),
std::back_inserter(extents),
[](IterDomain* id) { return id->getMaybeExpandedExtent(); });
}
return extents;
}

FusionState::FusionState()
: end_record_(new EndRecord()),
recording_(),
Expand Down Expand Up @@ -249,27 +270,6 @@ const std::vector<int64_t>& FusionState::extents() const {
return extents_fid_;
}

std::vector<Val*> FusionState::getExtents(Fusion* fusion) {
NVF_CHECK(fusion != nullptr, "Fusion is undefined.");

std::vector<Val*> extents;
for (Val* v : fusion->inputs()) {
// short-circuit: skip if not TensorView
if (!v->isA<TensorView>()) {
continue;
}
TensorView* tv = v->as<TensorView>();
std::vector<IterDomain*> logical_dom =
TensorDomain::noReductions(tv->getLogicalDomain());
std::transform(
logical_dom.begin(),
logical_dom.end(),
std::back_inserter(extents),
[](IterDomain* id) { return id->getMaybeExpandedExtent(); });
}
return extents;
}

void FusionState::addExtents() {
NVF_CHECK(fusion_ != nullptr, "Fusion is undefined.");

Expand Down
5 changes: 3 additions & 2 deletions csrc/python_frontend/fusion_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ struct State {

NVF_API std::ostream& operator<<(std::ostream& os, const State& state);

//! Get extents for TensorView inputs in Fusion
std::vector<Val*> getExtents(Fusion* fusion);

//! FusionState contains the information used to build a new cpp Fusion object.
//! Unlike FusionDefinition, it does not modify the FusionCache Trie structure.
class FusionState {
Expand Down Expand Up @@ -103,8 +106,6 @@ class FusionState {
std::unique_ptr<FusionState> clone();

private:
//! Get extents for TensorView inputs in Fusion
std::vector<Val*> getExtents(Fusion* fusion);
//! Add extents of TensorView inputs to FusionState
void addExtents();
//! Change the fusion ptr and reset its state
Expand Down
29 changes: 29 additions & 0 deletions csrc/python_frontend/python_bindings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1002,6 +1002,35 @@ void initNvFuserPythonBindings(PyObject* module) {
.def("inputs", [](FusionDefinition& self) { return self.inputs(); })
.def("outputs", [](FusionDefinition& self) { return self.outputs(); })
.def("extents", [](FusionDefinition& self) { return self.extents(); })
.def(
"_setup_segmentation",
[](FusionDefinition& self, const py::iterable& iter) {
// Instrumentation to mark the beginning of segmentation
inst::Trace::instance()->beginEvent(
"FusionDefinition Segmentation");
std::vector<c10::IValue> inputs;
for (py::handle obj : iter) {
// Allows for a Vector of Sizes to be inputed as a list/tuple
if (py::isinstance<py::list>(obj) ||
py::isinstance<py::tuple>(obj)) {
for (py::handle item : obj) {
inputs.push_back(
torch::jit::toIValue(item, c10::AnyType::get()));
}
} else {
inputs.push_back(
torch::jit::toIValue(obj, c10::AnyType::get()));
}
}
return self.setupSegmentation(inputs);
})
.def(
"_finalize_segmentation",
[](FusionDefinition& self) {
self.finalizeSegmentation();
// Mark the end of segmentation
inst::Trace::instance()->endEvent(nullptr);
})
.def(
"__repr__",
[](FusionDefinition& self) {
Expand Down
143 changes: 143 additions & 0 deletions csrc/python_frontend/segmentation.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
// clang-format off
/*
* SPDX-FileCopyrightText: Copyright (c) 2023-present NVIDIA CORPORATION & AFFILIATES.
* All rights reserved.
* SPDX-License-Identifier: BSD-3-Clause
*/
// clang-format on
#include <python_frontend/fusion_definition.h>
#include <python_frontend/translation.h>

namespace nvfuser::python_frontend {

int64_t SegmentationState::setupSegmentation(
Fusion* fusion,
const std::unordered_map<const Val*, int64_t>& map_value_to_original_fid,
const at::ArrayRef<c10::IValue>& inputs) {
// Check state
NVF_ERROR(fusion != nullptr);
NVF_ERROR(cloned_fusion_ == nullptr);
NVF_ERROR(segmented_fusion_ == nullptr);
NVF_ERROR(group_run_order_.empty());
NVF_ERROR(map_cloned_value_to_fid_.empty());
NVF_ERROR(cloned_extents_.empty());

int8_t device = getCommonDeviceCUDA(inputs);
NVF_CHECK(
inputs.empty() || device > -1, "Inputs are not all on the same device!");

// Step 1) Clone preschedFusion CPP Fusion.
cloned_fusion_ = std::make_unique<Fusion>();

// The IRCloner returned by Fusion::copy acts as map from the original fusion
// to the cloned fusion.
IrCloner original_to_cloned_map = Fusion::copy(fusion, cloned_fusion_.get());

KernelArgumentHolder args =
KernelArgumentHolder::createKernelArgumentHolder(inputs, device);

// Step 2) Concretize fusion with input arguments.
std::unordered_map<Val*, Val*> symbolic_to_concrete_map =
DynamicTransform::concretizeFusion(cloned_fusion_.get(), args);

// Step 3) Given the map_value_to_original_fid, the IRCloner returned by
// Fusion::copy, AND the symbolic_to_concrete map returned by
// concretization pass, create a mapping from cloned Vals to original fusion
// state indices.
std::transform(
map_value_to_original_fid.begin(),
map_value_to_original_fid.end(),
std::inserter(map_cloned_value_to_fid_, map_cloned_value_to_fid_.end()),
[&](const auto& item) {
const Val* original_value = item.first;
int64_t fid = item.second;
Val* cloned_val = original_to_cloned_map.clone(original_value);
if (symbolic_to_concrete_map.count(cloned_val)) {
cloned_val = symbolic_to_concrete_map.at(cloned_val);
}
return std::make_pair(cloned_val, fid);
});

// Track the extents for input TensorViews in cloned CPP Fusion.
cloned_extents_ = getExtents(cloned_fusion_.get());

// Create runtime infomation
SchedulerRuntimeInfo runtime_info(
cloned_fusion_.get(),
args,
/*precomputed_values=*/nullptr,
cloned_fusion_->allTvs());

// Run segmentation algorithm
segmented_fusion_ = SegmentCandidateFinder::segment(
std::move(cloned_fusion_), &args, runtime_info);

// Get the order for fusion segments
prepareGroupOrder();

// Return the number of segments created by segmentation algorithm.
return (int64_t)segmented_fusion_->groups().size();
}

void SegmentationState::prepareGroupOrder() {
Priya2698 marked this conversation as resolved.
Show resolved Hide resolved
NVF_ERROR(segmented_fusion_ != nullptr);

// Gather initial inputs for SegmentedFusion.
std::unordered_set<Val*> available_input(
segmented_fusion_->inputs().begin(), segmented_fusion_->inputs().end());

// The size of the tensor dimensions can be used as an input of the segments.
// NvFuser does not support returning scalar values. Segmentation must pass
// those sizes as segment arguments manually.
std::vector<Val*> extents = getExtents(segmented_fusion_->completeFusion());
std::copy(
extents.begin(),
extents.end(),
std::inserter(available_input, available_input.end()));

// Track the run status of all SegmentedGroups in SegmentedFusion
std::vector<bool> group_ran(segmented_fusion_->groups().size(), false);

// While not all the SegmentedGroups are run:
while (!std::all_of(
group_ran.begin(), group_ran.end(), [](bool b) { return b; })) {
bool ran_any_group = false;

// Find the first segment with all inputs available to run
for (size_t group_i : c10::irange(segmented_fusion_->groups().size())) {
SegmentedGroup* group = segmented_fusion_->groups().at(group_i);

// short-circuit: Already ran this segmented group.
if (group_ran.at(group_i)) {
continue;
}

const std::vector<Val*>& group_inputs = group->inputs();
bool ready_to_run = std::all_of(
group_inputs.begin(),
group_inputs.end(),
[&available_input](Val* val) { return available_input.count(val); });

// short-circuit: This segmented group is not ready to run.
if (!ready_to_run) {
continue;
}

// Add SegmentedGroup to group_run_order_.
group_run_order_.push_back(group);

// Mark all outputs of SegmentedGroup as ready.
const std::vector<Val*>& group_outputs = group->outputs();
for (size_t group_out_i : c10::irange(group_outputs.size())) {
available_input.insert(group_outputs.at(group_out_i));
}
group_ran[group_i] = true;
ran_any_group = true;
}
NVF_ERROR(
ran_any_group,
"Failed to run any group; An error must have occured in segmentation.");
}
}

} // namespace nvfuser::python_frontend
Loading
Loading