Skip to content

Commit

Permalink
Enable fullyconnect parallel for per node core allocation (openvinoto…
Browse files Browse the repository at this point in the history
…olkit#23593)

### Details:
- *integrated
[PR19801](openvinotoolkit#19801),
[PR23007](openvinotoolkit#23007) and
[PR23127](openvinotoolkit#23127
 - enable sub streams for per node core allocation
 - update class ModelDistributionPolicy and class SubStreamsMode
- refactor get_model_prefer_threads() with class ModelDistributionPolicy
 - remove get_default_latency_streams() since it is always 1 now
 - add sub streams to executor for per node core allocation
- Improve the performance of Fully connect layer on 2-socket Xeon
systems.

### Tickets:
 - *123078, 129972, 132954*

---------

Co-authored-by: Shen, Wanglei <[email protected]>
Co-authored-by: Xiuchuan Zhai <[email protected]>
Co-authored-by: Vladislav Golubev <[email protected]>
  • Loading branch information
4 people authored Apr 1, 2024
1 parent d384662 commit e2c6ae9
Show file tree
Hide file tree
Showing 38 changed files with 2,134 additions and 470 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ class OPENVINO_RUNTIME_API CPUStreamsExecutor : public IStreamsExecutor {

int get_socket_id() override;

void run_sub_stream(Task task, int id) override;

private:
struct Impl;
std::unique_ptr<Impl> _impl;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,17 @@ class OPENVINO_RUNTIME_API IStreamsExecutor : virtual public ITaskExecutor {
// (for large #streams)
};

/**
* @enum StreamsMode
* @brief This enum contains definition of each sub streams mode, indicating the main stream situation.
*/
enum class StreamsMode {
SUB_STREAMS_NULL, //!< Do not create sub streams
SUB_STREAMS_FOR_SOCKET, //!< Create sub streams for multiple sockets in main stream
LATENCY, //!< latency mode
THROUGHPUT, //!< throughput mode
};

private:
std::string _name; //!< Used by `ITT` to name executor threads
int _streams = 1; //!< Number of streams.
Expand All @@ -82,6 +93,7 @@ class OPENVINO_RUNTIME_API IStreamsExecutor : virtual public ITaskExecutor {
std::vector<std::vector<int>> _streams_info_table = {};
std::vector<std::vector<int>> _stream_processor_ids;
bool _cpu_reservation = false;
int _sub_streams = 0;

/**
* @brief Get and reserve cpu ids based on configuration and hardware information,
Expand Down Expand Up @@ -190,6 +202,15 @@ class OPENVINO_RUNTIME_API IStreamsExecutor : virtual public ITaskExecutor {
int get_thread_binding_offset() const {
return _threadBindingOffset;
}
int get_sub_streams() const {
return _sub_streams;
}
StreamsMode get_sub_stream_mode() const {
const auto proc_type_table = get_proc_type_table();
int sockets = proc_type_table.size() > 1 ? static_cast<int>(proc_type_table.size()) - 1 : 1;
return _sub_streams > 0 ? StreamsMode::SUB_STREAMS_FOR_SOCKET
: (_streams <= sockets ? StreamsMode::LATENCY : StreamsMode::THROUGHPUT);
}
bool operator==(const Config& config) {
if (_name == config._name && _streams == config._streams &&
_threads_per_stream == config._threads_per_stream && _threadBindingType == config._threadBindingType &&
Expand Down Expand Up @@ -250,6 +271,27 @@ class OPENVINO_RUNTIME_API IStreamsExecutor : virtual public ITaskExecutor {
* @param task A task to start
*/
virtual void execute(Task task) = 0;

/**
* @brief Execute ov::Task inside sub stream of task executor context
* @param task A task to start
* @param id Sub stream id
*/
virtual void run_sub_stream(Task task, int id) = 0;

/**
* @brief Execute all of the tasks and waits for its completion.
* Default run_sub_stream_and_wait() method implementation uses run_sub_stream() pure virtual method
* and higher level synchronization primitives from STL.
* The task is wrapped into std::packaged_task which returns std::future.
* std::packaged_task will call the task and signal to std::future that the task is finished
* or the exception is thrown from task
* Than std::future is used to wait for task execution completion and
* task exception extraction
* @note run_sub_stream_and_wait() does not copy or capture tasks!
* @param tasks A vector of tasks to execute
*/
void run_sub_stream_and_wait(const std::vector<Task>& tasks);
};

} // namespace threading
Expand Down
98 changes: 94 additions & 4 deletions src/inference/src/dev/threading/cpu_streams_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "openvino/itt.hpp"
#include "openvino/runtime/system_conf.hpp"
#include "openvino/runtime/threading/cpu_streams_executor_internal.hpp"
#include "openvino/runtime/threading/cpu_streams_info.hpp"
#include "openvino/runtime/threading/executor_manager.hpp"
#include "openvino/runtime/threading/thread_local.hpp"

Expand Down Expand Up @@ -58,6 +59,11 @@ struct CPUStreamsExecutor::Impl {
_streamId = _impl->_streamIdQueue.front();
_impl->_streamIdQueue.pop();
}
if (!_impl->_subStreamIdQueue.empty() && _impl->_subStreamsNum < _impl->_config.get_sub_streams()) {
_sub_stream_id = _impl->_subStreamIdQueue.front();
_impl->_subStreamIdQueue.pop();
_impl->_subStreamsNum++;
}
}
_numaNodeId =
_impl->_config.get_streams()
Expand Down Expand Up @@ -144,9 +150,8 @@ struct CPUStreamsExecutor::Impl {
.set_max_threads_per_core(max_threads_per_core)});
} else {
_taskArena.reset(new custom::task_arena{concurrency});
_cpu_ids = static_cast<int>(stream_processors.size()) == _impl->_config.get_streams()
? stream_processors[stream_id]
: _cpu_ids;
_cpu_ids =
stream_id < static_cast<int>(stream_processors.size()) ? stream_processors[stream_id] : _cpu_ids;
if (_cpu_ids.size() > 0) {
CpuSet processMask;
int ncpus = 0;
Expand All @@ -166,7 +171,8 @@ struct CPUStreamsExecutor::Impl {
StreamCreateType stream_type;
const auto org_proc_type_table = get_org_proc_type_table();
int streams_num = _impl->_config.get_streams();
const auto stream_id = streams_num == 0 ? 0 : _streamId % streams_num;
const auto stream_id =
streams_num == 0 ? 0 : (_sub_stream_id >= 0 ? streams_num + _sub_stream_id : _streamId % streams_num);
get_cur_stream_info(stream_id,
_impl->_config.get_cpu_reservation(),
org_proc_type_table,
Expand All @@ -193,6 +199,7 @@ struct CPUStreamsExecutor::Impl {
int _numaNodeId = 0;
int _socketId = 0;
bool _execute = false;
int _sub_stream_id = -1;
std::queue<Task> _taskQueue;
#if OV_THREAD == OV_THREAD_TBB || OV_THREAD == OV_THREAD_TBB_AUTO
std::unique_ptr<custom::task_arena> _taskArena;
Expand Down Expand Up @@ -314,13 +321,17 @@ struct CPUStreamsExecutor::Impl {
_exectorMgr = executor_manager();
auto numaNodes = get_available_numa_nodes();
int streams_num = _config.get_streams();
int sub_streams_num = _config.get_sub_streams();
if (streams_num != 0) {
std::copy_n(std::begin(numaNodes),
std::min<std::size_t>(streams_num, numaNodes.size()),
std::back_inserter(_usedNumaNodes));
} else {
_usedNumaNodes = numaNodes;
}
if (sub_streams_num > 0) {
_subTaskThread.assign(sub_streams_num, std::make_shared<SubQueue>());
}
for (auto streamId = 0; streamId < streams_num; ++streamId) {
_threads.emplace_back([this, streamId] {
openvino::itt::threadName(_config.get_name() + "_" + std::to_string(streamId));
Expand All @@ -343,6 +354,31 @@ struct CPUStreamsExecutor::Impl {
});
}
_streams.set_thread_ids_map(_threads);

for (auto subId = 0; subId < sub_streams_num; ++subId) {
_subThreads.emplace_back([this, subId, sub_streams_num] {
openvino::itt::threadName(_config.get_name() + "_subthreads" + "_" + std::to_string(subId));
for (bool stopped = false; !stopped;) {
Task task;
{ _subTaskThread[subId]->que_pop(task, stopped); }
if (task) {
{
std::lock_guard<std::mutex> lock{_streamIdMutex};
if (_subStreamsNum < sub_streams_num) {
_subStreamIdQueue.push(subId);
} else {
std::queue<int> empty;
std::swap(_subStreamIdQueue, empty);
}
}
Execute(task, *(_streams.local()));
}
}
});
}
if (_subThreads.size() > 0) {
_streams.set_thread_ids_map(_subThreads);
}
}

void Enqueue(Task task) {
Expand All @@ -353,6 +389,10 @@ struct CPUStreamsExecutor::Impl {
_queueCondVar.notify_one();
}

void Enqueue_sub(Task task, int id) {
_subTaskThread[id]->que_push(std::move(task));
}

void Execute(const Task& task, Stream& stream) {
#if OV_THREAD == OV_THREAD_TBB || OV_THREAD == OV_THREAD_TBB_AUTO
auto& arena = stream._taskArena;
Expand Down Expand Up @@ -382,15 +422,49 @@ struct CPUStreamsExecutor::Impl {
}
}

struct SubQueue {
std::mutex _subMutex;
std::condition_variable _subQueueCondVar;
bool _isSubStopped = false;
std::queue<Task> _subTaskQueue;

SubQueue() {}

void que_push(Task task) {
{
std::lock_guard<std::mutex> lock(_subMutex);
_subTaskQueue.emplace(std::move(task));
}
_subQueueCondVar.notify_one();
}

void que_pop(Task& task, bool& stopped) {
std::unique_lock<std::mutex> lock(_subMutex);
_subQueueCondVar.wait(lock, [&] {
return !_subTaskQueue.empty() || (stopped = _isSubStopped);
});
if (!_subTaskQueue.empty()) {
task = std::move(_subTaskQueue.front());
_subTaskQueue.pop();
}
}

~SubQueue() {}
};

Config _config;
std::mutex _streamIdMutex;
int _streamId = 0;
std::queue<int> _streamIdQueue;
std::queue<int> _subStreamIdQueue;
int _subStreamsNum = 0;
std::vector<std::thread> _threads;
std::vector<std::thread> _subThreads;
std::mutex _mutex;
std::condition_variable _queueCondVar;
std::queue<Task> _taskQueue;
bool _isStopped = false;
std::vector<std::shared_ptr<SubQueue>> _subTaskThread;
std::vector<int> _usedNumaNodes;
CustomThreadLocal _streams;
std::shared_ptr<ExecutorManager> _exectorMgr;
Expand Down Expand Up @@ -424,6 +498,18 @@ CPUStreamsExecutor::~CPUStreamsExecutor() {
thread.join();
}
}
for (size_t i = 0; i < _impl->_subTaskThread.size(); i++) {
{
std::lock_guard<std::mutex> lock(_impl->_subTaskThread[i]->_subMutex);
_impl->_subTaskThread[i]->_isSubStopped = true;
}
_impl->_subTaskThread[i]->_subQueueCondVar.notify_all();
}
for (auto& thread : _impl->_subThreads) {
if (thread.joinable()) {
thread.join();
}
}
}

void CPUStreamsExecutor::execute(Task task) {
Expand All @@ -438,5 +524,9 @@ void CPUStreamsExecutor::run(Task task) {
}
}

void CPUStreamsExecutor::run_sub_stream(Task task, int id) {
_impl->Enqueue_sub(std::move(task), id);
}

} // namespace threading
} // namespace ov
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ void get_cur_stream_info(const int stream_id,
bool cpu_reserve = cpu_reservation;
bool ecore_used = false;
for (size_t i = 0; i < streams_info_table.size(); i++) {
stream_total += streams_info_table[i][NUMBER_OF_STREAMS];
stream_total += std::abs(streams_info_table[i][NUMBER_OF_STREAMS]);
if (stream_id < stream_total) {
stream_info_id = i;
break;
Expand Down Expand Up @@ -93,10 +93,10 @@ void reserve_cpu_by_streams_info(const std::vector<std::vector<int>> _streams_in
bool last_all_proc = false;

for (size_t i = 0; i < _streams_info_table.size(); i++) {
if (_streams_info_table[i][NUMBER_OF_STREAMS] > 0) {
if (_streams_info_table[i][NUMBER_OF_STREAMS] != 0) {
stream_pos.push_back(num_streams);
}
num_streams += _streams_info_table[i][NUMBER_OF_STREAMS];
num_streams += std::abs(_streams_info_table[i][NUMBER_OF_STREAMS]);
}
num_conditions = static_cast<int>(stream_pos.size());
_stream_processors.assign(num_streams, std::vector<int>());
Expand All @@ -107,10 +107,13 @@ void reserve_cpu_by_streams_info(const std::vector<std::vector<int>> _streams_in
std::vector<std::string> proc_types;
std::vector<std::string> numa_nodes;
std::vector<std::string> sockets;
if (_streams_info_table[i][NUMBER_OF_STREAMS] > 0) {
if (_streams_info_table[i][NUMBER_OF_STREAMS] != 0) {
streams_table.push_back(_streams_info_table[i]);
if (_streams_info_table[i][NUMBER_OF_STREAMS] < 0) {
streams_table[streams_table.size() - 1][NUMBER_OF_STREAMS] = 1;
}
}
if (last_all_proc && _streams_info_table[i][NUMBER_OF_STREAMS] > 0) {
if (last_all_proc && _streams_info_table[i][NUMBER_OF_STREAMS] != 0) {
last_all_proc = false;
condition_idx++;
}
Expand Down
31 changes: 31 additions & 0 deletions src/inference/src/dev/threading/istreams_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "openvino/runtime/threading/istreams_executor.hpp"

#include <algorithm>
#include <future>
#include <string>
#include <thread>
#include <vector>
Expand Down Expand Up @@ -303,10 +304,13 @@ void IStreamsExecutor::Config::update_executor_config() {
// Recaculate _streams, _threads and _threads_per_stream by _streams_info_table
int num_streams = 0;
_threads = 0;
_sub_streams = 0;
for (size_t i = 0; i < _streams_info_table.size(); i++) {
if (_streams_info_table[i][NUMBER_OF_STREAMS] > 0) {
num_streams += _streams_info_table[i][NUMBER_OF_STREAMS];
_threads += _streams_info_table[i][NUMBER_OF_STREAMS] * _streams_info_table[i][THREADS_PER_STREAM];
} else if (_streams_info_table[i][NUMBER_OF_STREAMS] == -1) {
_sub_streams += 1;
}
}
_threads_per_stream = _streams_info_table[0][THREADS_PER_STREAM];
Expand Down Expand Up @@ -346,5 +350,32 @@ void IStreamsExecutor::Config::set_config_zero_stream() {
_cpu_reservation = false;
}

void IStreamsExecutor::run_sub_stream_and_wait(const std::vector<Task>& tasks) {
std::vector<std::packaged_task<void()>> packagedTasks;
std::vector<std::future<void>> futures;
for (std::size_t i = 0; i < tasks.size(); ++i) {
packagedTasks.emplace_back([&tasks, i] {
tasks[i]();
});
futures.emplace_back(packagedTasks.back().get_future());
}
for (std::size_t i = 0; i < tasks.size(); ++i) {
run_sub_stream(
[&packagedTasks, i] {
packagedTasks[i]();
},
static_cast<int>(i));
}
// std::future::get will rethrow exception from task.
// We should wait all tasks before any exception is thrown.
// So wait() and get() for each future moved to separate loops
for (auto&& future : futures) {
future.wait();
}
for (auto&& future : futures) {
future.get();
}
}

} // namespace threading
} // namespace ov
Loading

0 comments on commit e2c6ae9

Please sign in to comment.