Skip to content

Commit

Permalink
add example/use-execution-queue
Browse files Browse the repository at this point in the history
  • Loading branch information
oathdruid committed Aug 2, 2024
1 parent 44ec4ec commit 6b3379d
Show file tree
Hide file tree
Showing 10 changed files with 325 additions and 1 deletion.
5 changes: 5 additions & 0 deletions example/use-execution-queue/.bazelrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
common --registry=https://bcr.bazel.build
common --registry=https://baidu.github.io/babylon/registry
common --registry=https://raw.githubusercontent.com/bazelboost/registry/main

build --compilation_mode opt --copt=-O3 --cxxopt=-std=c++17
1 change: 1 addition & 0 deletions example/use-execution-queue/.bazelversion
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
7.2.1
30 changes: 30 additions & 0 deletions example/use-execution-queue/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
cc_library(
name = 'butex_interface',
hdrs = ['butex_interface.h'],
deps = [
'@babylon//:concurrent_sched_interface',
'@brpc//:bthread',
],
)

cc_library(
name = 'bthread_executor',
hdrs = ['bthread_executor.h'],
deps = [
':butex_interface',
'@babylon//:executor',
'@babylon//:logging',
'@brpc//:bthread',
],
)

cc_binary(
name = 'example',
srcs = ['example.cpp'],
deps = [
':bthread_executor',
'@babylon//:concurrent',
'@brpc',
'@tcmalloc//tcmalloc',
],
)
4 changes: 4 additions & 0 deletions example/use-execution-queue/MODULE.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
bazel_dep(name = 'babylon', version = '1.3.1')
bazel_dep(name = 'brpc', version = '1.10.0')
bazel_dep(name = 'tcmalloc', version = '0.0.0-20240411-5ed309d')
single_version_override(module_name = 'protobuf', version = '3.19.6')
36 changes: 36 additions & 0 deletions example/use-execution-queue/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Use ConcurrentExecutionQueue

## 示例构成

- `:example`: 典型的MPSC场景,对比bthread::ExecutionQueue的性能演示

## 性能演示

### 多线程低频

- qps: 100K
- concurrency: 7

| AMD EPYC 7W83 64-Core Processor | latency | cpu |
|---------------------------------|---------------------------|-------|
| bthread | [11058,12682,19824,25916] | 0.031 |
| babylon | [4658,5320,9819,13989] | 0.019 |

### 多线程高频

- qps: 5M
- concurrency: 7

| AMD EPYC 7W83 64-Core Processor | latency | cpu |
|---------------------------------|---------------------------------|-------|
| bthread | [727340,915718,1259166,1461166] | 1.104 |
| babylon | [272753,338788,598074,703571] | 0.618 |

### 多线程吞吐

- concurrency: 7

| AMD EPYC 7W83 64-Core Processor | max qps | latency | cpu |
|---------------------------------|---------|------------------------------------|-------|
| bthread | 6.26M | [1099557,1321548,1539623,1882140] | 1.449 |
| babylon | 14.49M | [1674166,1957155,3212593,3918438] | 1.886 |
54 changes: 54 additions & 0 deletions example/use-execution-queue/bthread_executor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#pragma once

#include "babylon/executor.h"
#include "babylon/logging/logger.h"

#include "bthread/bthread.h"
#include "butex_interface.h"

BABYLON_NAMESPACE_BEGIN

// 使用bthread线程池执行的executor
class BthreadExecutor : public Executor {
public:
inline static BthreadExecutor& instance() noexcept {
static BthreadExecutor bthread_executor;
return bthread_executor;
}

protected:
// 启动bthread,并运行function
virtual int invoke(
MoveOnlyFunction<void(void)>&& function) noexcept override {
auto args = new MoveOnlyFunction<void(void)>(::std::move(function));
::bthread_t th;
if (0 != ::bthread_start_background(&th, NULL, run_function, args)) {
BABYLON_LOG(WARNING) << "start bthread to execute failed";
function = ::std::move(*args);
delete args;
return -1;
}
return 0;
}

private:
// 线程函数
inline static void* run_function(void* args) noexcept {
auto function = reinterpret_cast<MoveOnlyFunction<void(void)>*>(args);
(*function)();
delete function;
return nullptr;
}
};

// 使用bthread线程池的async
template <typename S = ::babylon::ButexInterface, typename C, typename... Args>
inline Future<typename ::std::result_of<typename ::std::decay<C>::type(
typename ::std::decay<Args>::type...)>::type,
S>
bthread_async(C&& callable, Args&&... args) noexcept {
return BthreadExecutor::instance().execute<S>(::std::forward<C>(callable),
::std::forward<Args>(args)...);
}

BABYLON_NAMESPACE_END
4 changes: 4 additions & 0 deletions example/use-execution-queue/build.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#!/bin/sh
set -ex

bazel build example
73 changes: 73 additions & 0 deletions example/use-execution-queue/butex_interface.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
#pragma once

#include "babylon/concurrent/sched_interface.h"

#include "absl/time/clock.h"
#include "bthread/bthread.h"
#include "bthread/butex.h"

BABYLON_NAMESPACE_BEGIN

struct ButexInterface : public SchedInterface {
inline constexpr static bool futex_need_create() noexcept;
inline static uint32_t* create_futex() noexcept;
inline static void destroy_futex(uint32_t* futex) noexcept;

inline static int futex_wait(uint32_t* futex, uint32_t val,
const struct ::timespec* timeout) noexcept;
inline static int futex_wake_one(uint32_t* futex) noexcept;
inline static int futex_wake_all(uint32_t* futex) noexcept;

inline static void usleep(int64_t us) noexcept;
inline static void yield() noexcept;
};

struct BsleepInterface : public SchedInterface {
inline static void usleep(int64_t us) noexcept;
};

////////////////////////////////////////////////////////////////////////////////
inline constexpr bool ButexInterface::futex_need_create() noexcept {
return true;
}

inline uint32_t* ButexInterface::create_futex() noexcept {
return ::bthread::butex_create_checked<uint32_t>();
}

inline void ButexInterface::destroy_futex(uint32_t* butex) noexcept {
return ::bthread::butex_destroy(butex);
}

inline int ButexInterface::futex_wait(
uint32_t* butex, uint32_t val, const struct ::timespec* timeout) noexcept {
if (timeout == nullptr) {
return ::bthread::butex_wait(butex, val, nullptr);
} else {
auto abstime = ::absl::ToTimespec(::absl::Now() +
::absl::DurationFromTimespec(*timeout));
return ::bthread::butex_wait(butex, val, &abstime);
}
}

inline int ButexInterface::futex_wake_one(uint32_t* butex) noexcept {
return ::bthread::butex_wake(butex);
}

inline int ButexInterface::futex_wake_all(uint32_t* butex) noexcept {
return ::bthread::butex_wake_all(butex);
}

inline void ButexInterface::usleep(int64_t us) noexcept {
::bthread_usleep(us);
}

inline void ButexInterface::yield() noexcept {
::bthread_yield();
}

inline void BsleepInterface::usleep(int64_t us) noexcept {
::bthread_usleep(us);
}

BABYLON_NAMESPACE_END
116 changes: 116 additions & 0 deletions example/use-execution-queue/example.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
#include "babylon/concurrent/execution_queue.h"

#include "brpc/server.h"
#include "bthread_executor.h"
#include "bthread/execution_queue.h"
#include "gflags/gflags.h"

DEFINE_int32(dummy_port, 8000, "TCP Port of this dummy server");
DEFINE_uint64(concurrency, 7, "Concurrent logging thread num");
DEFINE_string(mode, "babylon", "");
DEFINE_uint64(qps, 100000, "");

namespace brpc {
bool IsAskedToQuit();
}

struct Task {
int64_t begin;
};

::bthread::ExecutionQueueId<Task> bthread_queue_id;
::babylon::ConcurrentExecutionQueue<Task> babylon_queue;

::bvar::Adder<ssize_t> pending;
::bvar::LatencyRecorder latency;

void run_once_bthread(int64_t begin) {
pending << 1;
::bthread::TaskOptions options;
::bthread::execution_queue_execute(bthread_queue_id, {begin}, &options);
}

void run_once_babylon(int64_t begin) {
pending << 1;
babylon_queue.execute({begin});
}

int bthread_queue_consume(void*, ::bthread::TaskIterator<Task>& iter) {
if (iter.is_queue_stopped()) {
return 0;
}
for (; iter; ++iter) {
latency << ::butil::monotonic_time_ns() - iter->begin;
pending << -1;
}
return 0;
}

void babylon_queue_consume(::babylon::ConcurrentExecutionQueue<Task>::Iterator iter, ::babylon::ConcurrentExecutionQueue<Task>::Iterator end) {
for (; iter != end; ++iter) {
latency << ::butil::monotonic_time_ns() - iter->begin;
pending << -1;
}
}

void run_loop() {
size_t times = 0;
int64_t expect_us = 0;
while (expect_us < 10000) {
++times;
expect_us = 1000000L * times * FLAGS_concurrency / FLAGS_qps;
}
::std::cerr << "expect_us " << expect_us << " times " << times << ::std::endl;

void (*run_once)(int64_t);
if (FLAGS_mode == "babylon") {
run_once = run_once_babylon;
} else if (FLAGS_mode == "bthread") {
run_once = run_once_bthread;
} else {
return;
}

::std::vector<::babylon::Future<void>> futures;
for (size_t i = 0; i < FLAGS_concurrency; ++i) {
futures.emplace_back(::babylon::BthreadExecutor::instance().execute([&] {
while (!::brpc::IsAskedToQuit()) {
auto begin = ::butil::monotonic_time_ns();
for (size_t j = 0; j < times; ++j) {
run_once(::butil::monotonic_time_ns());
}
auto end = ::butil::monotonic_time_ns();
auto use_us = (end - begin) / 1000;
if (use_us < expect_us) {
::usleep(expect_us - use_us);
}
}
}));
}

for (auto& future : futures) {
future.get();
}
}

int main(int argc, char* argv[]) {
::gflags::ParseCommandLineFlags(&argc, &argv, true);

::brpc::StartDummyServerAt(FLAGS_dummy_port);

pending.expose("test_" + FLAGS_mode + "_pending");
latency.expose("test_" + FLAGS_mode);

if (FLAGS_mode == "babylon") {
babylon_queue.initialize(1L << 18, ::babylon::BthreadExecutor::instance(), babylon_queue_consume);
} else if (FLAGS_mode == "bthread") {
::bthread::ExecutionQueueOptions options;
::bthread::execution_queue_start(&bthread_queue_id, &options, bthread_queue_consume, nullptr);
} else {
return 0;
}

run_loop();

return 0;
}
3 changes: 2 additions & 1 deletion src/babylon/logging/async_file_appender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ void AsyncFileAppender::write_use_plain_writev(Destination& dest,
for (size_t i = 0; i < size; ++i) {
pages.emplace_back(piov[i].iov_base);
}
::writev(fd, piov, size);
auto written = ::writev(fd, piov, size);
(void) written;
iter += size;
}

Expand Down

0 comments on commit 6b3379d

Please sign in to comment.