diff --git a/lib/compiler/include/compiler/cost_estimator.h b/lib/compiler/include/compiler/cost_estimator.h new file mode 100644 index 0000000000..744d463721 --- /dev/null +++ b/lib/compiler/include/compiler/cost_estimator.h @@ -0,0 +1,25 @@ +#ifndef _FLEXFLOW_COMPILER_COST_ESTIMATOR_H +#define _FLEXFLOW_COMPILER_COST_ESTIMATOR_H + +#include "compiler/machine_mapping.h" +#include "cost_estimate.h" +#include "pcg/machine_specification.h" +#include "pcg/machine_view.h" +#include "pcg/parallel_computation_graph.h" +#include "substitutions/sub_parallel_computation_graph.h" + +using SubParallelComputationGraphView = + OutputLabelledOpenMultiDiGraphView; + +namespace FlexFlow { + +float parallel_estimate_cost( + SubParallelComputationGraphView const &g, + CostEstimator const &estimator, + MachineMapping const &device_mapping, + std::unordered_map const + &frontier_machine_views); + +} // namespace FlexFlow + +#endif diff --git a/lib/compiler/src/cost_estimator.cc b/lib/compiler/src/cost_estimator.cc new file mode 100644 index 0000000000..1425f86609 --- /dev/null +++ b/lib/compiler/src/cost_estimator.cc @@ -0,0 +1,130 @@ +#include "compiler/cost_estimator.h" +#include "compiler/cost_estimate.h" +#include "compiler/machine_mapping.h" +#include "pcg/parallel_computation_graph.h" +#include "utils/deduplicated_priority_queue.h" +#include "utils/exception.h" +#include "utils/graph/serialparallel.h" +#include + +namespace FlexFlow { + +// Computes estimated execution cost for a single node +float node_estimate_cost(Node const &node, + SubParallelComputationGraphView const &g, + CostEstimator const &estimator, + MachineMapping const &device_mapping) { + std::unordered_set incoming_edges = + get_incoming_edges(g, node); + + std::vector inputs = transform( + as_vector(incoming_edges), [&](UpwardOpenMultiDiEdge const &input_edge) { + return g.at(input_edge).get_shape(); + }); + float cost = estimator.estimate_cost( + g.at(node).attrs, inputs, device_mapping.machine_views.at(node)); + return cost; +} + +struct TimedNode { // Node and associated finishing time + Node node; + req endtime; +}; +FF_VISITABLE_STRUCT(TimedNode, node, endtime); + +struct TimeComparison { + bool operator()(TimedNode const &lhs, TimedNode const &rhs) const { + return (lhs.endtime < rhs.endtime); + } +}; + +bool predecessors_have_been_processed( + std::unordered_set const &predecessors, + std::unordered_set processed) { + std::unordered_set simple_processed = + transform(processed, [](TimedNode const &tn) { return tn.node; }); + + return all_of(predecessors, [&simple_processed](Node p) { + return simple_processed.find(p) != simple_processed.end(); + }); +} + +std::vector get_devices(Node const &node, + MachineMapping const &device_mapping) { + return device_mapping.machine_views.at(node).device_ids(); +} + +float parallel_estimate_cost( + SubParallelComputationGraphView const &g, + CostEstimator const &estimator, + MachineMapping const &device_mapping, + std::unordered_map const + &frontier_machine_views) { + float current_time = 0; + std::unordered_set + frontier; // nodes whose dependencies (previous nodes) have been met, and + // are waiting to be processed. + DeduplicatedPriorityQueue, TimeComparison> + processing; // nodes currently being processed. + std::unordered_set + processed; // set of nodes that have already been processed + std::unordered_map + occupied; // keeps track of the devices that are currently occupied + + // Filling the frontier + for (auto const &[edge, _] : frontier_machine_views) { + Node node = get_dst_node(edge); + frontier.insert(node); + } + + auto start_node_processing = [&](Node const &node, + std::vector const &devices) { + float cost = node_estimate_cost(node, g, estimator, device_mapping); + processing.push({node, current_time + cost}); + for (device_id_t d : devices) { + occupied[d] = true; + } + frontier.erase(node); + }; + + auto finish_node_processing = [&](TimedNode const &finished) { + std::vector devices = + get_devices(finished.node, device_mapping); + for (device_id_t d : devices) { // free devices + occupied[d] = false; + } + processed.insert(finished); + current_time = finished.endtime; + }; + + while (!frontier.empty() || !processing.empty()) { + // Processing new nodes + std::unordered_set frontier_copy(frontier); + for (Node const &node : frontier_copy) { + std::vector devices = get_devices(node, device_mapping); + if (all_of(devices, + [&occupied](device_id_t d) { return occupied[d] == false; })) { + start_node_processing(node, devices); + } + } + + // Finish processing all nodes + while (!processing.empty()) { + TimedNode finished = processing.top(); + processing.pop(); + finish_node_processing(finished); + + // Adding candidates to the frontier + for (Node const &successor : get_successors(g, finished.node)) { + std::unordered_set predecessors = get_predecessors(g, successor); + + if (predecessors_have_been_processed(predecessors, processed)) { + + frontier.insert(successor); + } + } + } + } + return current_time; +} +} // namespace FlexFlow diff --git a/lib/compiler/src/machine_mapping.cc b/lib/compiler/src/machine_mapping.cc index 2b08e9fe23..92c3037283 100644 --- a/lib/compiler/src/machine_mapping.cc +++ b/lib/compiler/src/machine_mapping.cc @@ -5,6 +5,9 @@ #include "utils/exception.h" #include "utils/graph/serialparallel.h" +#include "utils/deduplicated_priority_queue.h" +#include + namespace FlexFlow { MachineMapping MachineMapping::combine(MachineMapping const &s1, diff --git a/lib/compiler/test/CMakeLists.txt b/lib/compiler/test/CMakeLists.txt index 13b1fd3b83..26ed4259f0 100644 --- a/lib/compiler/test/CMakeLists.txt +++ b/lib/compiler/test/CMakeLists.txt @@ -10,4 +10,5 @@ ff_add_test_executable( compiler doctest utils-test-common + pcg ) diff --git a/lib/compiler/test/src/test_parallel_cost_estimator.cc b/lib/compiler/test/src/test_parallel_cost_estimator.cc new file mode 100644 index 0000000000..24ec098e6b --- /dev/null +++ b/lib/compiler/test/src/test_parallel_cost_estimator.cc @@ -0,0 +1,228 @@ +#include "compiler/cost_estimate.h" +#include "compiler/cost_estimator.h" +#include "doctest/doctest.h" +#include "test_cost_estimator.h" + +using namespace FlexFlow; + +TEST_SUITE(FF_TEST_SUITE) { + + TEST_CASE("parallel_estimate_cost: linear graph") { + + // Straight line example, 3 nodes: ->(n1)->(n2)->(n3) + auto g = OutputLabelledOpenMultiDiGraph:: + template create< + UnorderedOutputLabelledOpenMultiDiGraph>(); + + Node n1 = g.add_node(Operator{InputAttrs{}, "n1"}); + Node n2 = g.add_node(Operator{InputAttrs{}, "n2"}); + Node n3 = g.add_node(Operator{InputAttrs{}, "n3"}); + + NodePort p1 = g.add_node_port(); + NodePort p2 = g.add_node_port(); + NodePort p3 = g.add_node_port(); + + // dst, dstport, uid + InputMultiDiEdge e0{n1, p1, {1, 1}}; + // MultiDiEdge: dst, dstport, src, srcport + MultiDiEdge e1{n2, p2, n1, p1}; + MultiDiEdge e2{n3, p3, n2, p2}; + + g.add_edge(e0); + g.add_edge(e1); + g.add_edge(e2); + + g.add_label(e0, + ParallelTensor(ParallelTensorDims({2, 1}), + DataType::FLOAT, + CreateGrad::YES)); + g.add_label(e1, + ParallelTensor(ParallelTensorDims({2, 1}), + DataType::FLOAT, + CreateGrad::YES)); + + g.add_label(e2, + ParallelTensor(ParallelTensorDims({2, 1}), + DataType::FLOAT, + CreateGrad::YES)); + + CostEstimator estimator = + CostEstimator::create(); // Returns 0.1 regardless + std::unordered_map devices = { + // single device per node + {n1, make_1d_machine_view(gpu_id_t(1), gpu_id_t(2))}, + {n2, make_1d_machine_view(gpu_id_t(1), gpu_id_t(2))}, + {n3, make_1d_machine_view(gpu_id_t(1), gpu_id_t(2))}}; + + MachineMapping device_mapping = {devices}; + auto frontier_machine_views = + std::unordered_map{ + {e0, make_1d_machine_view(gpu_id_t(1), gpu_id_t(2))}, + }; + + float result = parallel_estimate_cost( + g, estimator, device_mapping, frontier_machine_views); + CHECK(std::abs(result - .3) < 1e-7); + } + + TEST_CASE("parallel_estimate_cost: non-linear graph") { + // Non-linear graph example, diamond pattern, 4 nodes + + auto g = OutputLabelledOpenMultiDiGraph:: + template create< + UnorderedOutputLabelledOpenMultiDiGraph>(); + + Node n0 = g.add_node(Operator{InputAttrs{}, "n0"}); + Node n1 = g.add_node(Operator{InputAttrs{}, "n1"}); + Node n2 = g.add_node(Operator{InputAttrs{}, "n2"}); + Node n3 = g.add_node(Operator{InputAttrs{}, "n3"}); + + NodePort p0 = g.add_node_port(); + NodePort p1 = g.add_node_port(); + NodePort p2 = g.add_node_port(); + NodePort p3 = g.add_node_port(); + + // dst, dstport, uid + InputMultiDiEdge e0{n0, p0, {1, 1}}; + // MultiDiEdge: dst, dstport, src, srcport + MultiDiEdge e1{n1, p1, n0, p0}; + MultiDiEdge e2{n2, p2, n0, p0}; + MultiDiEdge e3{n3, p3, n1, p1}; + MultiDiEdge e4{n3, p3, n2, p2}; + + g.add_edge(e0); + g.add_edge(e1); + g.add_edge(e2); + g.add_edge(e3); + g.add_edge(e4); + + g.add_label(e0, + ParallelTensor(ParallelTensorDims({2, 1}), + DataType::FLOAT, + CreateGrad::YES)); + g.add_label(e1, + ParallelTensor(ParallelTensorDims({2, 1}), + DataType::FLOAT, + CreateGrad::YES)); + g.add_label(e2, + ParallelTensor(ParallelTensorDims({2, 1}), + DataType::FLOAT, + CreateGrad::YES)); + g.add_label(e3, + ParallelTensor(ParallelTensorDims({2, 1}), + DataType::FLOAT, + CreateGrad::YES)); + g.add_label(e4, + ParallelTensor(ParallelTensorDims({2, 1}), + DataType::FLOAT, + CreateGrad::YES)); + + CostEstimator estimator = CostEstimator::create(); + std::unordered_map devices = { + {n0, make_1d_machine_view(gpu_id_t(1), gpu_id_t(3))}, + {n1, + make_1d_machine_view(gpu_id_t(1), + gpu_id_t(3))}, // nodes n1, n2 can run in parallel + {n2, make_1d_machine_view(gpu_id_t(5), gpu_id_t(6))}, + {n3, make_1d_machine_view(gpu_id_t(1), gpu_id_t(3))}}; + + MachineMapping device_mapping = {devices}; + auto frontier_machine_views = + std::unordered_map{ + {e0, make_1d_machine_view(gpu_id_t(1), gpu_id_t(3))}, + }; + + float result = parallel_estimate_cost( + g, estimator, device_mapping, frontier_machine_views); + CHECK(std::abs(result - 0.3) < 1e-7); + } + + TEST_CASE("parallel_estimate_cost: more complex non-linear graph") { + /* Non-linear graph example, 7 nodes + graph TD + .(( )) --> |"e0"|n0 + n0 --> |"e1"| n1["n1"] + n0 --> |"e2"| n2["n2"] + n0 --> |"e3"| n3["n3"] + n1 --> |"e4"| n4["n4"] + n2 --> |"e5"| n4["n4"] + n2 --> |"e6"| n5["n5"] + n3 --> |"e7"| n5["n5"] + n4 --> |"e8"| n6["n6"] + n5 --> |"e9"| n6["n6"] + */ + auto g = OutputLabelledOpenMultiDiGraph:: + template create< + UnorderedOutputLabelledOpenMultiDiGraph>(); + + std::vector n; + for (int i = 0; i < 7; ++i) { + n.push_back(g.add_node(Operator{InputAttrs{}, "n" + std::to_string(i)})); + } + + std::vector p; + for (int i = 0; i < 7; ++i) { + p.push_back(g.add_node_port()); + } + // dst, dstport, uid + InputMultiDiEdge e0{n[0], p[0], {1, 1}}; + g.add_edge(e0); + g.add_label(e0, + ParallelTensor(ParallelTensorDims({2, 1}), + DataType::FLOAT, + CreateGrad::YES)); + + // MultiDiEdge: dst, dstport, src, srcport + MultiDiEdge e1{n[1], p[1], n[0], p[0]}; + MultiDiEdge e2{n[2], p[2], n[0], p[0]}; + MultiDiEdge e3{n[3], p[3], n[0], p[0]}; + + MultiDiEdge e4{n[4], p[4], n[1], p[1]}; + MultiDiEdge e5{n[4], p[4], n[2], p[2]}; + MultiDiEdge e6{n[5], p[5], n[2], p[2]}; + MultiDiEdge e7{n[5], p[5], n[3], p[3]}; + + MultiDiEdge e8{n[6], p[6], n[4], p[4]}; + MultiDiEdge e9{n[6], p[6], n[5], p[5]}; + std::vector edges = {e1, e2, e3, e4, e5, e6, e7, e8, e9}; + + for (auto &edge : edges) { + g.add_edge(edge); + g.add_label(edge, + ParallelTensor(ParallelTensorDims({2, 1}), + DataType::FLOAT, + CreateGrad::YES)); + } + + // All machines contain a + CostEstimator estimator = CostEstimator::create(); + std::unordered_map devices = { + {n[0], make_1d_machine_view(gpu_id_t(1), gpu_id_t(2))}, + + {n[1], make_1d_machine_view(gpu_id_t(1), gpu_id_t(2))}, + {n[2], make_1d_machine_view(gpu_id_t(2), gpu_id_t(3))}, + {n[3], make_1d_machine_view(gpu_id_t(3), gpu_id_t(4))}, + + {n[4], + make_1d_machine_view( + gpu_id_t(1), gpu_id_t(3))}, // Note that GPUs overlap in this case, + // so this layer cannot be parallelized + {n[5], make_1d_machine_view(gpu_id_t(2), gpu_id_t(4))}, + + {n[6], make_1d_machine_view(gpu_id_t(1), gpu_id_t(20))}, + }; + + MachineMapping device_mapping = {devices}; + auto frontier_machine_views = + std::unordered_map{ + {e0, make_1d_machine_view(gpu_id_t(1), gpu_id_t(2))}, + }; + + float result = parallel_estimate_cost( + g, estimator, device_mapping, frontier_machine_views); + CHECK(std::abs(result - 0.5) < 1e-7); + } +} diff --git a/lib/pcg/include/pcg/device_id.h b/lib/pcg/include/pcg/device_id.h index b118d69259..db2dfdd5f2 100644 --- a/lib/pcg/include/pcg/device_id.h +++ b/lib/pcg/include/pcg/device_id.h @@ -18,7 +18,7 @@ struct cpu_id_t : strong_typedef { using device_id_t = std::variant; device_id_t operator+(device_id_t, size_t); -DeviceType get_device_type(device_id_t); +DeviceType get_device_type(device_id_t const &id); gpu_id_t unwrap_gpu(device_id_t); cpu_id_t unwrap_cpu(device_id_t); diff --git a/lib/pcg/include/pcg/machine_view.h b/lib/pcg/include/pcg/machine_view.h index 7521cd209a..951768ce09 100644 --- a/lib/pcg/include/pcg/machine_view.h +++ b/lib/pcg/include/pcg/machine_view.h @@ -13,7 +13,7 @@ namespace FlexFlow { struct MachineView { - std::vector device_ids() const; + std::vector device_ids() const; device_id_t at(FFOrdered const &coord) const; StridedRectangleSide at(size_t) const; diff --git a/lib/pcg/src/device_id.cc b/lib/pcg/src/device_id.cc index 2849df7c3c..b204962351 100644 --- a/lib/pcg/src/device_id.cc +++ b/lib/pcg/src/device_id.cc @@ -1,6 +1,7 @@ #include "pcg/device_id.h" #include "utils/exception.h" #include +#include namespace FlexFlow { @@ -13,7 +14,18 @@ DeviceType get_device_type(device_id_t const &id) { } } -device_id_t operator+(device_id_t, size_t) { - NOT_IMPLEMENTED(); +// Most likely not the best way to do it. +device_id_t operator+(device_id_t device, size_t increment) { + if (std::holds_alternative(device)) { + gpu_id_t gpu_id = std::get(device); + int new_value = static_cast(gpu_id) + static_cast(increment); + return gpu_id_t(new_value); + } else { + assert((std::holds_alternative(device))); + cpu_id_t cpu_id = std::get(device); + int new_value = static_cast(cpu_id) + static_cast(increment); + return cpu_id_t(new_value); + } } + } // namespace FlexFlow diff --git a/lib/pcg/src/machine_view.cc b/lib/pcg/src/machine_view.cc index 46f87833f0..01df91c2ba 100644 --- a/lib/pcg/src/machine_view.cc +++ b/lib/pcg/src/machine_view.cc @@ -1,5 +1,6 @@ #include "pcg/machine_view.h" #include "utils/utils.h" +#include namespace FlexFlow { @@ -21,6 +22,20 @@ MachineView make_1d_machine_view(cpu_id_t start, cpu_id_t stop, int stride) { return {start, rect}; } +std::vector MachineView::device_ids() const { + std::vector ids; + if (rect.num_dims() == 1) { + StridedRectangleSide side = this->rect.at(ff_dim_t{0}); + for (device_id_t id = this->start; id < this->start + side.get_num_points(); + id = id + 1) { + ids.push_back(id); + } + return ids; + } else { + NOT_IMPLEMENTED(); + } +} + device_id_t MachineView::at(FFOrdered const &coord) const { size_t offset = this->rect.at(coord); return this->start + offset; diff --git a/lib/pcg/src/strided_rectangle.cc b/lib/pcg/src/strided_rectangle.cc index 27ef9a7f5b..99146d6144 100644 --- a/lib/pcg/src/strided_rectangle.cc +++ b/lib/pcg/src/strided_rectangle.cc @@ -15,6 +15,11 @@ size_t StridedRectangle::at(FFOrdered const &coord) const { return idx; } +StridedRectangleSide StridedRectangle::at(ff_dim_t const &dim) const { + StridedRectangleSide side = this->sides.at(dim); + return side; +} + StridedRectangleSide::StridedRectangleSide(side_size_t const &num, int stride) : num_points(num.value()), stride(stride) {} @@ -30,8 +35,12 @@ side_size_t StridedRectangleSide::get_size() const { NOT_IMPLEMENTED(); } +num_points_t StridedRectangleSide::get_num_points() const { + return num_points; +} + size_t StridedRectangle::num_dims() const { - NOT_IMPLEMENTED(); + return this->sides.size(); } } // namespace FlexFlow diff --git a/lib/utils/include/utils/graph/algorithms.h b/lib/utils/include/utils/graph/algorithms.h index 87b42a90d2..fe7d38557e 100644 --- a/lib/utils/include/utils/graph/algorithms.h +++ b/lib/utils/include/utils/graph/algorithms.h @@ -160,6 +160,10 @@ std::unordered_set get_predecessors(DiGraphView const &, Node const &); std::unordered_map> get_predecessors(DiGraphView const &, std::unordered_set const &); +std::unordered_set get_successors(DiGraphView const &, Node const &); +std::unordered_map> + get_successors(DiGraphView const &, std::unordered_set const &); + Node get_src_node(MultiDiEdge const &); Node get_dst_node(MultiDiEdge const &); Node get_dst_node(InputMultiDiEdge const &); diff --git a/lib/utils/src/graph/algorithms.cc b/lib/utils/src/graph/algorithms.cc index 2223b120a7..d5ea57568f 100644 --- a/lib/utils/src/graph/algorithms.cc +++ b/lib/utils/src/graph/algorithms.cc @@ -376,6 +376,16 @@ std::unordered_set get_predecessors(DiGraphView const &g, Node const &n) { return get_predecessors(g, std::unordered_set{n}).at(n); } +std::unordered_map> + get_successors(DiGraphView const &g, + std::unordered_set const &nodes) { + return get_predecessors(flipped(g), nodes); +} + +std::unordered_set get_successors(DiGraphView const &g, Node const &n) { + return get_successors(g, std::unordered_set{n}).at(n); +} + std::vector get_unchecked_dfs_ordering( DiGraphView const &g, std::unordered_set const &starting_points) { UncheckedDFSView dfs_view = unchecked_dfs(g, starting_points);