diff --git a/example/use-execution-queue/.bazelrc b/example/use-execution-queue/.bazelrc new file mode 100644 index 00000000..6cf21e09 --- /dev/null +++ b/example/use-execution-queue/.bazelrc @@ -0,0 +1,5 @@ +common --registry=https://bcr.bazel.build +common --registry=https://baidu.github.io/babylon/registry +common --registry=https://raw.githubusercontent.com/bazelboost/registry/main + +build --compilation_mode opt --copt=-O3 --cxxopt=-std=c++17 diff --git a/example/use-execution-queue/.bazelversion b/example/use-execution-queue/.bazelversion new file mode 100644 index 00000000..b26a34e4 --- /dev/null +++ b/example/use-execution-queue/.bazelversion @@ -0,0 +1 @@ +7.2.1 diff --git a/example/use-execution-queue/BUILD b/example/use-execution-queue/BUILD new file mode 100644 index 00000000..4f326451 --- /dev/null +++ b/example/use-execution-queue/BUILD @@ -0,0 +1,30 @@ +cc_library( + name = 'butex_interface', + hdrs = ['butex_interface.h'], + deps = [ + '@babylon//:concurrent_sched_interface', + '@brpc//:bthread', + ], +) + +cc_library( + name = 'bthread_executor', + hdrs = ['bthread_executor.h'], + deps = [ + ':butex_interface', + '@babylon//:executor', + '@babylon//:logging', + '@brpc//:bthread', + ], +) + +cc_binary( + name = 'example', + srcs = ['example.cpp'], + deps = [ + ':bthread_executor', + '@babylon//:concurrent', + '@brpc', + '@tcmalloc//tcmalloc', + ], +) diff --git a/example/use-execution-queue/MODULE.bazel b/example/use-execution-queue/MODULE.bazel new file mode 100644 index 00000000..0aface42 --- /dev/null +++ b/example/use-execution-queue/MODULE.bazel @@ -0,0 +1,4 @@ +bazel_dep(name = 'babylon', version = '1.3.1') +bazel_dep(name = 'brpc', version = '1.10.0') +bazel_dep(name = 'tcmalloc', version = '0.0.0-20240411-5ed309d') +single_version_override(module_name = 'protobuf', version = '3.19.6') diff --git a/example/use-execution-queue/README.md b/example/use-execution-queue/README.md new file mode 100644 index 00000000..5e6dca8a --- /dev/null +++ b/example/use-execution-queue/README.md @@ -0,0 +1,36 @@ +# Use ConcurrentExecutionQueue + +## 示例构成 + +- `:example`: 典型的MPSC场景,对比bthread::ExecutionQueue的性能演示 + +## 性能演示 + +### 多线程低频 + +- qps: 100K +- concurrency: 7 + +| AMD EPYC 7W83 64-Core Processor | latency | cpu | +|---------------------------------|---------------------------|-------| +| bthread | [11058,12682,19824,25916] | 0.031 | +| babylon | [4658,5320,9819,13989] | 0.019 | + +### 多线程高频 + +- qps: 5M +- concurrency: 7 + +| AMD EPYC 7W83 64-Core Processor | latency | cpu | +|---------------------------------|---------------------------------|-------| +| bthread | [727340,915718,1259166,1461166] | 1.104 | +| babylon | [272753,338788,598074,703571] | 0.618 | + +### 多线程吞吐 + +- concurrency: 7 + +| AMD EPYC 7W83 64-Core Processor | max qps | latency | cpu | +|---------------------------------|---------|------------------------------------|-------| +| bthread | 6.26M | [1099557,1321548,1539623,1882140] | 1.449 | +| babylon | 14.49M | [1674166,1957155,3212593,3918438] | 1.886 | diff --git a/example/use-execution-queue/bthread_executor.h b/example/use-execution-queue/bthread_executor.h new file mode 100644 index 00000000..2ed7b2a5 --- /dev/null +++ b/example/use-execution-queue/bthread_executor.h @@ -0,0 +1,54 @@ +#pragma once + +#include "babylon/executor.h" +#include "babylon/logging/logger.h" + +#include "bthread/bthread.h" +#include "butex_interface.h" + +BABYLON_NAMESPACE_BEGIN + +// 使用bthread线程池执行的executor +class BthreadExecutor : public Executor { + public: + inline static BthreadExecutor& instance() noexcept { + static BthreadExecutor bthread_executor; + return bthread_executor; + } + + protected: + // 启动bthread,并运行function + virtual int invoke( + MoveOnlyFunction&& function) noexcept override { + auto args = new MoveOnlyFunction(::std::move(function)); + ::bthread_t th; + if (0 != ::bthread_start_background(&th, NULL, run_function, args)) { + BABYLON_LOG(WARNING) << "start bthread to execute failed"; + function = ::std::move(*args); + delete args; + return -1; + } + return 0; + } + + private: + // 线程函数 + inline static void* run_function(void* args) noexcept { + auto function = reinterpret_cast*>(args); + (*function)(); + delete function; + return nullptr; + } +}; + +// 使用bthread线程池的async +template +inline Future::type( + typename ::std::decay::type...)>::type, + S> +bthread_async(C&& callable, Args&&... args) noexcept { + return BthreadExecutor::instance().execute(::std::forward(callable), + ::std::forward(args)...); +} + +BABYLON_NAMESPACE_END diff --git a/example/use-execution-queue/build.sh b/example/use-execution-queue/build.sh new file mode 100755 index 00000000..82f05b52 --- /dev/null +++ b/example/use-execution-queue/build.sh @@ -0,0 +1,4 @@ +#!/bin/sh +set -ex + +bazel build example diff --git a/example/use-execution-queue/butex_interface.h b/example/use-execution-queue/butex_interface.h new file mode 100644 index 00000000..c7f66603 --- /dev/null +++ b/example/use-execution-queue/butex_interface.h @@ -0,0 +1,73 @@ +#pragma once + +#include "babylon/concurrent/sched_interface.h" + +#include "absl/time/clock.h" +#include "bthread/bthread.h" +#include "bthread/butex.h" + +BABYLON_NAMESPACE_BEGIN + +struct ButexInterface : public SchedInterface { + inline constexpr static bool futex_need_create() noexcept; + inline static uint32_t* create_futex() noexcept; + inline static void destroy_futex(uint32_t* futex) noexcept; + + inline static int futex_wait(uint32_t* futex, uint32_t val, + const struct ::timespec* timeout) noexcept; + inline static int futex_wake_one(uint32_t* futex) noexcept; + inline static int futex_wake_all(uint32_t* futex) noexcept; + + inline static void usleep(int64_t us) noexcept; + inline static void yield() noexcept; +}; + +struct BsleepInterface : public SchedInterface { + inline static void usleep(int64_t us) noexcept; +}; + +//////////////////////////////////////////////////////////////////////////////// +inline constexpr bool ButexInterface::futex_need_create() noexcept { + return true; +} + +inline uint32_t* ButexInterface::create_futex() noexcept { + return ::bthread::butex_create_checked(); +} + +inline void ButexInterface::destroy_futex(uint32_t* butex) noexcept { + return ::bthread::butex_destroy(butex); +} + +inline int ButexInterface::futex_wait( + uint32_t* butex, uint32_t val, const struct ::timespec* timeout) noexcept { + if (timeout == nullptr) { + return ::bthread::butex_wait(butex, val, nullptr); + } else { + auto abstime = ::absl::ToTimespec(::absl::Now() + + ::absl::DurationFromTimespec(*timeout)); + return ::bthread::butex_wait(butex, val, &abstime); + } +} + +inline int ButexInterface::futex_wake_one(uint32_t* butex) noexcept { + return ::bthread::butex_wake(butex); +} + +inline int ButexInterface::futex_wake_all(uint32_t* butex) noexcept { + return ::bthread::butex_wake_all(butex); +} + +inline void ButexInterface::usleep(int64_t us) noexcept { + ::bthread_usleep(us); +} + +inline void ButexInterface::yield() noexcept { + ::bthread_yield(); +} + +inline void BsleepInterface::usleep(int64_t us) noexcept { + ::bthread_usleep(us); +} + +BABYLON_NAMESPACE_END diff --git a/example/use-execution-queue/example.cpp b/example/use-execution-queue/example.cpp new file mode 100644 index 00000000..5ac5b8f8 --- /dev/null +++ b/example/use-execution-queue/example.cpp @@ -0,0 +1,116 @@ +#include "babylon/concurrent/execution_queue.h" + +#include "brpc/server.h" +#include "bthread_executor.h" +#include "bthread/execution_queue.h" +#include "gflags/gflags.h" + +DEFINE_int32(dummy_port, 8000, "TCP Port of this dummy server"); +DEFINE_uint64(concurrency, 7, "Concurrent logging thread num"); +DEFINE_string(mode, "babylon", ""); +DEFINE_uint64(qps, 100000, ""); + +namespace brpc { +bool IsAskedToQuit(); +} + +struct Task { + int64_t begin; +}; + +::bthread::ExecutionQueueId bthread_queue_id; +::babylon::ConcurrentExecutionQueue babylon_queue; + +::bvar::Adder pending; +::bvar::LatencyRecorder latency; + +void run_once_bthread(int64_t begin) { + pending << 1; + ::bthread::TaskOptions options; + ::bthread::execution_queue_execute(bthread_queue_id, {begin}, &options); +} + +void run_once_babylon(int64_t begin) { + pending << 1; + babylon_queue.execute({begin}); +} + +int bthread_queue_consume(void*, ::bthread::TaskIterator& iter) { + if (iter.is_queue_stopped()) { + return 0; + } + for (; iter; ++iter) { + latency << ::butil::monotonic_time_ns() - iter->begin; + pending << -1; + } + return 0; +} + +void babylon_queue_consume(::babylon::ConcurrentExecutionQueue::Iterator iter, ::babylon::ConcurrentExecutionQueue::Iterator end) { + for (; iter != end; ++iter) { + latency << ::butil::monotonic_time_ns() - iter->begin; + pending << -1; + } +} + +void run_loop() { + size_t times = 0; + int64_t expect_us = 0; + while (expect_us < 10000) { + ++times; + expect_us = 1000000L * times * FLAGS_concurrency / FLAGS_qps; + } + ::std::cerr << "expect_us " << expect_us << " times " << times << ::std::endl; + + void (*run_once)(int64_t); + if (FLAGS_mode == "babylon") { + run_once = run_once_babylon; + } else if (FLAGS_mode == "bthread") { + run_once = run_once_bthread; + } else { + return; + } + + ::std::vector<::babylon::Future> futures; + for (size_t i = 0; i < FLAGS_concurrency; ++i) { + futures.emplace_back(::babylon::BthreadExecutor::instance().execute([&] { + while (!::brpc::IsAskedToQuit()) { + auto begin = ::butil::monotonic_time_ns(); + for (size_t j = 0; j < times; ++j) { + run_once(::butil::monotonic_time_ns()); + } + auto end = ::butil::monotonic_time_ns(); + auto use_us = (end - begin) / 1000; + if (use_us < expect_us) { + ::usleep(expect_us - use_us); + } + } + })); + } + + for (auto& future : futures) { + future.get(); + } +} + +int main(int argc, char* argv[]) { + ::gflags::ParseCommandLineFlags(&argc, &argv, true); + + ::brpc::StartDummyServerAt(FLAGS_dummy_port); + + pending.expose("test_" + FLAGS_mode + "_pending"); + latency.expose("test_" + FLAGS_mode); + + if (FLAGS_mode == "babylon") { + babylon_queue.initialize(1L << 18, ::babylon::BthreadExecutor::instance(), babylon_queue_consume); + } else if (FLAGS_mode == "bthread") { + ::bthread::ExecutionQueueOptions options; + ::bthread::execution_queue_start(&bthread_queue_id, &options, bthread_queue_consume, nullptr); + } else { + return 0; + } + + run_loop(); + + return 0; +} diff --git a/src/babylon/logging/async_file_appender.cpp b/src/babylon/logging/async_file_appender.cpp index ca41bcb5..12e6a33c 100644 --- a/src/babylon/logging/async_file_appender.cpp +++ b/src/babylon/logging/async_file_appender.cpp @@ -130,7 +130,8 @@ void AsyncFileAppender::write_use_plain_writev(Destination& dest, for (size_t i = 0; i < size; ++i) { pages.emplace_back(piov[i].iov_base); } - ::writev(fd, piov, size); + auto written = ::writev(fd, piov, size); + (void) written; iter += size; }