Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: numa_partitioner for parallel_for. #1461

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
9 changes: 9 additions & 0 deletions include/oneapi/tbb/parallel_for.h
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,15 @@ void parallel_for( const Range& range, const Body& body, affinity_partitioner& p
start_for<Range,Body,affinity_partitioner>::run(range,body,partitioner);
}

//! Parallel iteration over range with numa_partitioner.
/** Forwards the whole loop to numa_partitioner::execute_for.
    The partitioner is taken by const reference: execute_for is a const member, so const
    objects and temporaries are accepted in addition to non-const lvalues.
    @ingroup algorithms **/
template<typename Range, typename Body, typename T>
    __TBB_requires(tbb_range<Range> && parallel_for_body<Body, Range>)
void parallel_for(const Range& range, const Body& body, const numa_partitioner<T>& n_partitioner) {
    n_partitioner.execute_for(range, body);
}


//! Parallel iteration over range with default partitioner and user-supplied context.
/** @ingroup algorithms **/
template<typename Range, typename Body>
Expand Down
102 changes: 92 additions & 10 deletions include/oneapi/tbb/partitioner.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@
#include <algorithm>
#include <atomic>
#include <memory>
#include <type_traits>
#include <vector>
#include <tbb/blocked_range.h>
#include <tbb/blocked_range2d.h>
#include <tbb/blocked_range3d.h>

#if defined(_MSC_VER) && !defined(__INTEL_COMPILER)
// Workaround for overzealous compiler warnings
Expand All @@ -67,7 +70,8 @@ class static_partitioner;
class affinity_partitioner;
class affinity_partition_type;
class affinity_partitioner_base;

template <typename T> class numa_partitioner;

inline std::size_t get_initial_auto_partitioner_divisor() {
const std::size_t factor = 4;
return factor * static_cast<std::size_t>(max_concurrency());
Expand Down Expand Up @@ -567,14 +571,15 @@ class affinity_partition_type : public dynamic_grainsize_mode<linear_affinity_mo
@ingroup algorithms */
class simple_partitioner {
public:
simple_partitioner() {}
simple_partitioner() {}
// new implementation just extends existing interface
typedef simple_partition_type task_partition_type;

private:
template<typename Range, typename Body, typename Partitioner> friend struct start_for;
template<typename Range, typename Body, typename Partitioner> friend struct start_reduce;
template<typename Range, typename Body, typename Partitioner> friend struct start_deterministic_reduce;
template<typename Range, typename Body, typename Partitioner> friend struct start_scan;
// new implementation just extends existing interface
typedef simple_partition_type task_partition_type;
// TODO: consider to make split_type public
typedef simple_partition_type::split_type split_type;

Expand All @@ -594,14 +599,14 @@ class simple_partitioner {
class auto_partitioner {
public:
auto_partitioner() {}
// new implementation just extends existing interface
typedef auto_partition_type task_partition_type;

private:
template<typename Range, typename Body, typename Partitioner> friend struct start_for;
template<typename Range, typename Body, typename Partitioner> friend struct start_reduce;
template<typename Range, typename Body, typename Partitioner> friend struct start_deterministic_reduce;
template<typename Range, typename Body, typename Partitioner> friend struct start_scan;
// new implementation just extends existing interface
typedef auto_partition_type task_partition_type;
// TODO: consider to make split_type public
typedef auto_partition_type::split_type split_type;

Expand All @@ -627,13 +632,15 @@ class auto_partitioner {
//! Partitioner tag type whose splitting policy is static_partition_type.
class static_partitioner {
public:
    static_partitioner() {}
    // new implementation just extends existing interface
    // Public so that wrappers which are not friends (numa_partitioner reads
    // BasePartitioner::task_partition_type) can name it. Declared exactly once:
    // a second declaration of the same member typedef is a redeclaration error.
    typedef static_partition_type task_partition_type;

private:
    template<typename Range, typename Body, typename Partitioner> friend struct start_for;
    template<typename Range, typename Body, typename Partitioner> friend struct start_reduce;
    template<typename Range, typename Body, typename Partitioner> friend struct start_deterministic_reduce;
    template<typename Range, typename Body, typename Partitioner> friend struct start_scan;

    // TODO: consider to make split_type public
    typedef static_partition_type::split_type split_type;
};
Expand All @@ -642,18 +649,92 @@ class static_partitioner {
//! Partitioner tag type whose splitting policy is affinity_partition_type.
/** Inherits privately from affinity_partitioner_base. */
class affinity_partitioner : affinity_partitioner_base {
public:
    affinity_partitioner() {}
    // new implementation just extends existing interface
    // Public so that wrappers which are not friends (numa_partitioner reads
    // BasePartitioner::task_partition_type) can name it. Declared exactly once:
    // a second declaration of the same member typedef is a redeclaration error.
    typedef affinity_partition_type task_partition_type;

private:
    template<typename Range, typename Body, typename Partitioner> friend struct start_for;
    template<typename Range, typename Body, typename Partitioner> friend struct start_reduce;
    template<typename Range, typename Body, typename Partitioner> friend struct start_deterministic_reduce;
    template<typename Range, typename Body, typename Partitioner> friend struct start_scan;

    // TODO: consider to make split_type public
    typedef affinity_partition_type::split_type split_type;
};

template<typename BasePartitioner>
class numa_partitioner {
std::size_t num_numa_nodes;
BasePartitioner& base_partitioner;

public:
numa_partitioner() : num_numa_nodes(get_number_of_numa_nodes()), base_partitioner() { initialize_arena();}
numa_partitioner(BasePartitioner& bp) : num_numa_nodes(get_number_of_numa_nodes()), base_partitioner(bp) { initialize_arena();}

void initialize_arena() const {
for (std::size_t node = 0; node < num_numa_nodes; ++node) {
this->arenas.emplace_back(tbb::task_arena::constraints().set_numa_id(node));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the same instance is used across multiple parallel_fors, won't the arenas vector keep growing? I think initialize would be repeatedly invoked.

}
}
typedef detail::proportional_split split_type;
typedef typename BasePartitioner::task_partition_type task_partition_type;

template<typename Range, typename Body>
void execute_for(const Range& range, const Body& body) const;

template<typename Range, typename Body>
void execute_reduce(const Range& range, Body& body) const;

private:
mutable std::vector<oneapi::tbb::task_arena> arenas;

// Function to get the number of NUMA nodes in the system
std::size_t get_number_of_numa_nodes() {
return oneapi::tbb::info::numa_nodes().size();
}

// Helper function to split a range into multiple subranges
template<typename Range>
void split_range(const Range& range, std::vector<Range>& subranges, std::size_t num_parts) const {
subranges.push_back(range); // Start with the full range
for (std::size_t i = 1; i < num_parts; ++i) {
if (!subranges.back().is_divisible()) break; // If the range is no longer divisible, stop splitting
Range new_range = subranges.back(); // Copy the last range
subranges.back()= Range(new_range, detail::split());
subranges.push_back(new_range); // Add the new subrange
}
}
};

//! Runs body over range, one subrange per NUMA-node arena.
/** If the range cannot be divided or there is at most one NUMA node, this is a plain
    parallel_for with the base partitioner. Otherwise the range is cut by split_range and
    each piece is submitted to a task_group from inside the arena of one node; the call
    returns after every group has been waited for.
    NOTE(review): all pieces are run with the same base_partitioner object at the same
    time; confirm that BasePartitioner (e.g. affinity_partitioner) permits concurrent use. */
template<typename BasePartitioner>
template<typename Range, typename Body>
void numa_partitioner<BasePartitioner>::execute_for(const Range& range, const Body& body) const {
    if (!range.is_divisible() || num_numa_nodes <= 1) {
        parallel_for(range, body, base_partitioner);
        return;
    }

    std::vector<Range> subranges;
    split_range(range, subranges, num_numa_nodes);

    // split_range stops as soon as a piece is no longer divisible, so it may deliver
    // fewer pieces than there are NUMA nodes. Iterate over what was actually produced;
    // indexing up to num_numa_nodes would read past the end of subranges.
    const std::size_t num_parts = subranges.size();
    std::vector<oneapi::tbb::task_group> task_groups(num_parts);

    for (std::size_t i = 0; i < num_parts; ++i) {
        // Submit from inside the arena so the work is picked up by that node's threads.
        arenas[i].execute([&, i]() {
            task_groups[i].run([&, i] {
                parallel_for(subranges[i], body, base_partitioner);
            });
        });
    }
    // Wait only after everything has been submitted, so the pieces can run concurrently.
    for (std::size_t i = 0; i < num_parts; ++i) {
        arenas[i].execute([&task_groups, i]() {
            task_groups[i].wait();
        });
    }
}



} // namespace d1
} // namespace detail

Expand All @@ -663,6 +744,7 @@ using detail::d1::auto_partitioner;
using detail::d1::simple_partitioner;
using detail::d1::static_partitioner;
using detail::d1::affinity_partitioner;
using detail::d1::numa_partitioner;
// Split types
using detail::split;
using detail::proportional_split;
Expand Down
133 changes: 133 additions & 0 deletions test/tbb/test_parallel_for.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -462,3 +462,136 @@ TEST_CASE("parallel_for constraints") {
#if _MSC_VER
#pragma warning (pop)
#endif

// Minimal parallel_for body: walks the subrange it is given and does nothing else.
struct SimpleFunctor {
    void operator()(const tbb::blocked_range<size_t>& r) const {
        size_t index = r.begin();
        while (index != r.end()) {
            // Intentionally empty: visiting every index is all this body does.
            ++index;
        }
    }
};

// Smoke test: parallel_for with a numa_partitioner wrapping an affinity_partitioner
// runs to completion with a body that has no observable effect.
void TestNumaPartitionerSimple() {
    const size_t N = 1000;

    tbb::blocked_range<size_t> range(0, N);
    SimpleFunctor functor;
    tbb::affinity_partitioner ap;
    tbb::numa_partitioner<tbb::affinity_partitioner> n_partitioner(ap);

    // Test parallel_for with numa_partitioner
    parallel_for(range, functor, n_partitioner);

    // Nothing to verify beyond reaching this point (the body produces no result)
    CHECK(true);
}

void TestNumaPartitionerWithBody() {
const size_t N = 1000;
std::vector<int> vec(N, 0);

tbb::blocked_range<size_t> range(0, N);

auto body = [&](const tbb::blocked_range<size_t>& r) {
for (size_t i = r.begin(); i != r.end(); ++i) {
vec[i] = 1; // Set each element to 1
}
};

tbb::affinity_partitioner ap;
tbb::numa_partitioner<tbb::affinity_partitioner> n_partitioner(ap);

// Test parallel_for with numa_partitioner and a lambda body
parallel_for(range, body, n_partitioner);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need a test with more than one parallel_for invocation. I think that would uncover some of the design issues.


// Verify results
for (size_t i = 0; i < N; ++i) {
CHECK(vec[i] == 1);
}
}

// Smoke test: parallel_for with a numa_partitioner accepts an empty range.
void TestNumaPartitionerEmptyRange() {
    const size_t N = 0;

    tbb::blocked_range<size_t> range(0, N);
    SimpleFunctor functor;
    tbb::affinity_partitioner ap;
    tbb::numa_partitioner<tbb::affinity_partitioner> n_partitioner(ap);

    // Test parallel_for with an empty range
    parallel_for(range, functor, n_partitioner);

    // Nothing to verify beyond reaching this point (the body produces no result)
    CHECK(true);
}

//! Testing parallel_for with numa_partitioner
//! Runs the smoke test, the element-coverage test and the empty-range test in that order.
//! \brief \ref requirement
TEST_CASE("NUMA partitioner tests") {
TestNumaPartitionerSimple();
TestNumaPartitionerWithBody();
TestNumaPartitionerEmptyRange();
}

// Checks that every element is processed when the element count (1003) is an odd size
// that is unlikely to be a multiple of the partition sizes.
void TestNumaPartitionerNonDivisibleRange() {
    const size_t N = 1003; // Deliberately not a round number
    std::vector<int> vec(N, 0);

    tbb::affinity_partitioner ap;
    tbb::numa_partitioner<tbb::affinity_partitioner> n_partitioner(ap);

    tbb::blocked_range<size_t> range(0, N);

    // Body marks each index of its subrange as visited
    auto body = [&](const tbb::blocked_range<size_t>& r) {
        size_t idx = r.begin();
        while (idx != r.end()) {
            vec[idx] = 1;
            ++idx;
        }
    };

    // Run the loop through the numa_partitioner
    parallel_for(range, body, n_partitioner);

    // Every slot must have been written exactly to 1
    for (size_t pos = 0; pos < N; ++pos) {
        CHECK(vec[pos] == 1);
    }
}

// Checks that an exception thrown by the loop body reaches the caller of parallel_for
// when a numa_partitioner is used.
void TestNumaPartitionerExceptionHandling() {
    const size_t N = 1000;
    std::vector<int> vec(N, 0);

    tbb::blocked_range<size_t> range(0, N);

    // Throws once, when the middle index is reached; all other indices are marked.
    auto body = [&](const tbb::blocked_range<size_t>& r) {
        for (size_t i = r.begin(); i != r.end(); ++i) {
            if (i == N / 2) throw std::runtime_error("Test exception");
            vec[i] = 1;
        }
    };

    tbb::affinity_partitioner ap;
    tbb::numa_partitioner<tbb::affinity_partitioner> n_partitioner(ap);

    // Test parallel_for with exception handling
    bool exceptionCaught = false;
    try {
        parallel_for(range, body, n_partitioner);
    } catch (const std::runtime_error&) { // unnamed: the exception object itself is not inspected
        exceptionCaught = true;
    }

    // Verify that the exception was caught
    CHECK(exceptionCaught);
}

//! Testing parallel_for with numa_partitioner: odd-sized range and exception propagation
TEST_CASE("NUMA partitioner tests- exceptions") {
TestNumaPartitionerNonDivisibleRange();
TestNumaPartitionerExceptionHandling();
}


Loading