diff --git a/.bazelrc b/.bazelrc index 378515d..3b46387 100644 --- a/.bazelrc +++ b/.bazelrc @@ -2,10 +2,10 @@ common --registry=https://bcr.bazel.build common --registry=file://%workspace%/registry common --registry=https://raw.githubusercontent.com/bazelboost/registry/main -build --cxxopt=-std=c++17 +build --cxxopt=-std=c++20 test --//:werror --features external_include_paths -test --@rules_cuda//cuda:enable=False + # SwissMemoryResource's patch violate odr rule. But still keep same struct with same defination and same size # change detect_odr_violation to level 1 to check that test --test_env=ASAN_OPTIONS=detect_odr_violation=1 diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8901e40..59bd0e2 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -9,9 +9,10 @@ on: jobs: basic: strategy: + fail-fast: false matrix: - compiler: [{name: gcc, flag: --action_env=CC=gcc-12}, {name: clang, flag: --action_env=CC=clang-14}] - std: [{name: c++17}, {name: c++14, flag: --cxxopt=-std=c++14 --cxxopt=-faligned-new}] + compiler: [{name: gcc, flag: --action_env=CC=gcc-14}, {name: clang, flag: --action_env=CC=clang-18}] + std: [{name: c++20}, {name: c++14, flag: --cxxopt=-std=c++14 --cxxopt=-faligned-new}] stdlib: [{name: stdlibc++}, {name: libc++, flag: --cxxopt=-stdlib=libc++ --linkopt=-stdlib=libc++}] feature: [{name: asan, flag: --config=asan}, {name: tsan, flag: --features=tsan}] exclude: @@ -19,7 +20,7 @@ jobs: stdlib: {name: libc++} - std: {name: c++14} feature: {name: tsan} - runs-on: ubuntu-latest + runs-on: ubuntu-24.04 name: basic-${{matrix.compiler.name}}-${{matrix.std.name}}-${{matrix.stdlib.name}}-${{matrix.feature.name}} steps: - uses: actions/checkout@v4 @@ -29,12 +30,14 @@ jobs: key: bazel-disk-basic-${{matrix.compiler.name}}-${{matrix.std.name}}-${{matrix.stdlib.name}}-${{matrix.feature.name}}-${{github.sha}} restore-keys: bazel-disk-basic-${{matrix.compiler.name}}-${{matrix.std.name}}-${{matrix.stdlib.name}}-${{matrix.feature.name}}- save-always: true - - run: bazel test --compilation_mode=opt --disk_cache=bazel-disk --verbose_failures --test_output=errors ${{matrix.compiler.flag}} ${{matrix.std.flag}} ${{matrix.stdlib.flag}} ${{matrix.feature.flag}} test/... + - run: sudo apt install libc++-18-dev libc++abi-18-dev + - run: bazel test --compilation_mode=dbg --disk_cache=bazel-disk --verbose_failures --test_output=errors ${{matrix.compiler.flag}} ${{matrix.std.flag}} ${{matrix.stdlib.flag}} ${{matrix.feature.flag}} test/... arenastring: strategy: + fail-fast: false matrix: - compiler: [{name: gcc, flag: --action_env=CC=gcc-12}, {name: clang, flag: --action_env=CC=clang-14}] + compiler: [{name: gcc, flag: --action_env=CC=gcc-14}, {name: clang, flag: --action_env=CC=clang-18}] stdlib: [{name: stdlibc++}, {name: libc++, flag: --cxxopt=-stdlib=libc++ --linkopt=-stdlib=libc++}] mutable: [{name: default}, {name: half, flag: --config=arenastring}, {name: full, flag: --config=mutable-donated-string}] exclude: @@ -44,7 +47,7 @@ jobs: mutable: {name: half} - compiler: {name: clang} mutable: {name: full} - runs-on: ubuntu-latest + runs-on: ubuntu-24.04 name: arenastring-${{matrix.compiler.name}}-${{matrix.stdlib.name}}-${{matrix.mutable.name}} steps: - uses: actions/checkout@v4 @@ -54,11 +57,12 @@ jobs: key: bazel-disk-arenastring-${{matrix.compiler.name}}-${{matrix.stdlib.name}}-${{matrix.mutable.name}}-${{github.sha}} restore-keys: bazel-disk-arenastring-${{matrix.compiler.name}}-${{matrix.stdlib.name}}-${{matrix.mutable.name}}- save-always: true + - run: sudo apt install libc++-18-dev libc++abi-18-dev - run: sed -i "/single_version_override.*protobuf/s/version = '[^']*'/version = '27.3.arenastring'/" MODULE.bazel - run: bazel test --compilation_mode=opt --disk_cache=bazel-disk --verbose_failures --test_output=errors --config=asan ${{matrix.compiler.flag}} ${{matrix.stdlib.flag}} ${{matrix.mutable.flag}} test/... aarch64: - runs-on: ubuntu-latest + runs-on: ubuntu-24.04 steps: - uses: actions/checkout@v4 - uses: actions/cache@v4 @@ -74,7 +78,7 @@ jobs: - run: bazel test --disk_cache=bazel-disk --verbose_failures --test_output=errors --platforms='@cross_config_toolchain//:cross' --action_env=CROSS_CC=/usr/bin/aarch64-linux-gnu-gcc-12 --features=-default_link_flags --test_env=LD_LIBRARY_PATH=/usr/aarch64-linux-gnu/lib -- test/... -test/logging:test_log_statically workspace: - runs-on: ubuntu-latest + runs-on: ubuntu-24.04 steps: - uses: actions/checkout@v4 - uses: actions/cache@v4 @@ -86,7 +90,7 @@ jobs: - run: bazel test --disk_cache=bazel-disk --verbose_failures --test_output=errors --enable_bzlmod=false test/... coverage: - runs-on: ubuntu-latest + runs-on: ubuntu-24.04 steps: - uses: actions/checkout@v4 - uses: actions/cache@v4 @@ -103,7 +107,7 @@ jobs: file: bazel-out/_coverage/_coverage_report.lcov cmake: - runs-on: ubuntu-latest + runs-on: ubuntu-24.04 steps: - uses: actions/checkout@v4 - uses: hendrikmuhs/ccache-action@v1.2 diff --git a/BUILD b/BUILD index b0cb564..403838d 100644 --- a/BUILD +++ b/BUILD @@ -125,6 +125,11 @@ alias( actual = '//src/babylon/concurrent:vector', ) +alias( + name = 'coroutine', + actual = '//src/babylon:coroutine', +) + alias( name = 'executor', actual = '//src/babylon:executor', diff --git a/CMakeLists.txt b/CMakeLists.txt index 724f1ea..b7e2981 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -7,6 +7,11 @@ include(CMakePackageConfigHelpers) # for write_basic_package_version_file option(BUILD_DEPS "Use FetchContent download and build dependencies" OFF) +if(CMAKE_SOURCE_DIR STREQUAL CMAKE_CURRENT_SOURCE_DIR) + set(CMAKE_CXX_STANDARD 20) + set(BUILD_TESTING ON) +endif() + if(BUILD_DEPS) include(FetchContent) FetchContent_Declare( @@ -30,8 +35,16 @@ if(BUILD_DEPS) URL "https://github.com/google/googletest/archive/refs/tags/v1.14.0.tar.gz" URL_HASH SHA256=8ad598c73ad796e0d8280b082cebd82a630d73e73cd3c70057938a6501bba5d7 ) + FetchContent_Declare( + libcoro + URL "https://github.com/jbaldwin/libcoro/archive/refs/tags/v0.12.1.tar.gz" + URL_HASH SHA256=2cb6f45fc73dad6008cc930d92939785684835e03b12df422b98fcab9e393add + ) if(BUILD_TESTING AND CMAKE_SOURCE_DIR STREQUAL CMAKE_CURRENT_SOURCE_DIR) - FetchContent_MakeAvailable(googletest) + set(LIBCORO_FEATURE_NETWORKING OFF) + set(LIBCORO_BUILD_TESTS OFF) + set(LIBCORO_BUILD_EXAMPLES OFF) + FetchContent_MakeAvailable(googletest libcoro) endif() set(protobuf_BUILD_TESTS OFF) set(ABSL_ENABLE_INSTALL ON) @@ -113,6 +126,7 @@ if(BUILD_TESTING AND CMAKE_SOURCE_DIR STREQUAL CMAKE_CURRENT_SOURCE_DIR) target_link_libraries("${TARGET_NAME}" babylon_test_proto) target_link_libraries("${TARGET_NAME}" babylon) target_link_libraries("${TARGET_NAME}" GTest::gtest_main) + target_link_libraries("${TARGET_NAME}" libcoro) gtest_discover_tests("${TARGET_NAME}") endforeach() diff --git a/MODULE.bazel b/MODULE.bazel index 555d0a3..07b40be 100644 --- a/MODULE.bazel +++ b/MODULE.bazel @@ -29,16 +29,12 @@ single_version_override(module_name = 'protobuf', version = '27.3') bazel_dep(name = 'googletest', version = '1.15.2', repo_name = 'com_google_googletest', dev_dependency = True) bazel_dep(name = 'platforms', version = '0.0.10', dev_dependency = True) bazel_dep(name = 'rules_cc', version = '0.0.9', dev_dependency = True) +bazel_dep(name = 'rules_cuda', version = '0.2.3', dev_dependency = True) -# rules_cuda latest release 0.2.1 is too old and do not have auto detect feature -bazel_dep(name = 'rules_cuda', version = '0.2.1', dev_dependency = True) -archive_override( - module_name = 'rules_cuda', - urls = ['https://github.com/bazel-contrib/rules_cuda/archive/3482c70dc60d9ab1ad26b768c117fcd61ee12494.tar.gz'], - strip_prefix = 'rules_cuda-3482c70dc60d9ab1ad26b768c117fcd61ee12494', - integrity = 'sha256-x78dpBtaMUgKBHf0ztSe7QirHLOv93xwTjc8+cUmlPU=', -) +# --registry=https://baidu.github.io/babylon/registry +bazel_dep(name = 'libcoro', version = '0.12.1', dev_dependency = True) +# cuda toolchain cuda = use_extension('@rules_cuda//cuda:extensions.bzl', 'toolchain', dev_dependency = True) cuda.local_toolchain() diff --git a/WORKSPACE b/WORKSPACE index 75c9e7d..12dbbff 100644 --- a/WORKSPACE +++ b/WORKSPACE @@ -4,18 +4,18 @@ load('@bazel_tools//tools/build_defs/repo:http.bzl', 'http_archive') http_archive( name = 'com_google_absl', - urls = ['https://github.com/abseil/abseil-cpp/archive/refs/tags/20211102.0.tar.gz'], - strip_prefix = 'abseil-cpp-20211102.0', - sha256 = 'dcf71b9cba8dc0ca9940c4b316a0c796be8fab42b070bb6b7cab62b48f0e66c4', + urls = ['https://github.com/abseil/abseil-cpp/archive/refs/tags/20220623.1.tar.gz'], + strip_prefix = 'abseil-cpp-20220623.1', + sha256 = '91ac87d30cc6d79f9ab974c51874a704de9c2647c40f6932597329a282217ba8', ) http_archive( name = 'com_google_protobuf', - urls = ['https://github.com/protocolbuffers/protobuf/archive/refs/tags/v3.19.6.tar.gz'], + urls = ['https://github.com/protocolbuffers/protobuf/archive/refs/tags/v3.19.6.zip'], strip_prefix = 'protobuf-3.19.6', - sha256 = '9a301cf94a8ddcb380b901e7aac852780b826595075577bb967004050c835056', + sha256 = '387e2c559bb2c7c1bc3798c4e6cff015381a79b2758696afcbf8e88730b47389', ) -load("@com_google_protobuf//:protobuf_deps.bzl", "protobuf_deps") +load('@com_google_protobuf//:protobuf_deps.bzl', 'protobuf_deps') protobuf_deps() http_archive( @@ -24,25 +24,44 @@ http_archive( strip_prefix = 'rules_boost-4ab574f9a84b42b1809978114a4664184716f4bf', sha256 = '2215e6910eb763a971b1f63f53c45c0f2b7607df38c96287666d94d954da8cdc', ) -load("@com_github_nelhage_rules_boost//:boost/boost.bzl", "boost_deps") +load('@com_github_nelhage_rules_boost//:boost/boost.bzl', 'boost_deps') boost_deps() ################################################################################ # test only dependency http_archive( name = 'com_google_googletest', - urls = ['https://github.com/google/googletest/archive/refs/tags/v1.14.0.tar.gz'], - strip_prefix = 'googletest-1.14.0', - sha256 = '8ad598c73ad796e0d8280b082cebd82a630d73e73cd3c70057938a6501bba5d7', + urls = ['https://github.com/google/googletest/releases/download/v1.15.2/googletest-1.15.2.tar.gz'], + strip_prefix = 'googletest-1.15.2', + sha256 = '7b42b4d6ed48810c5362c265a17faebe90dc2373c885e5216439d37927f02926', ) http_archive( - name = "rules_cuda", - urls = ["https://github.com/bazel-contrib/rules_cuda/archive/3482c70dc60d9ab1ad26b768c117fcd61ee12494.tar.gz"], - strip_prefix = "rules_cuda-3482c70dc60d9ab1ad26b768c117fcd61ee12494", - sha256 = 'c7bf1da41b5a31480a0477f4ced49eed08ab1cb3aff77c704e373cf9c52694f5', + name = 'rules_foreign_cc', + urls = ['https://github.com/bazelbuild/rules_foreign_cc/releases/download/0.12.0/rules_foreign_cc-0.12.0.tar.gz'], + strip_prefix = 'rules_foreign_cc-0.12.0', + sha256 = 'a2e6fb56e649c1ee79703e99aa0c9d13c6cc53c8d7a0cbb8797ab2888bbc99a3', ) -load("@rules_cuda//cuda:repositories.bzl", "register_detected_cuda_toolchains", "rules_cuda_dependencies") +load('@rules_foreign_cc//foreign_cc:repositories.bzl', 'rules_foreign_cc_dependencies') +rules_foreign_cc_dependencies() +load("@bazel_features//:deps.bzl", "bazel_features_deps") +bazel_features_deps() + +http_archive( + name = 'libcoro', + urls = ['https://github.com/jbaldwin/libcoro/archive/refs/tags/v0.12.1.tar.gz'], + strip_prefix = 'libcoro-0.12.1', + sha256 = '2cb6f45fc73dad6008cc930d92939785684835e03b12df422b98fcab9e393add', + build_file = '@//:registry/modules/libcoro/0.12.1/overlay/BUILD', +) + +http_archive( + name = 'rules_cuda', + urls = ['https://github.com/bazel-contrib/rules_cuda/releases/download/v0.2.3/rules_cuda-v0.2.3.tar.gz'], + strip_prefix = 'rules_cuda-v0.2.3', + sha256 = 'c92b334d769a07cd991b7675b2f6076b8b95cd3b28b14268a2f379f8baae58e0', +) +load('@rules_cuda//cuda:repositories.bzl', 'register_detected_cuda_toolchains', 'rules_cuda_dependencies') rules_cuda_dependencies() register_detected_cuda_toolchains() ################################################################################ diff --git a/copts.bzl b/copts.bzl index 92a43f4..13b2832 100644 --- a/copts.bzl +++ b/copts.bzl @@ -19,11 +19,13 @@ BABYLON_CLANG_COPTS = ['-faligned-new', '-Weverything', '-Wno-unknown-warning-op # BABYLON_SERIALIZABLE宏内部定义的一些辅助成员在继承场景下会发生同名隐藏 # 但是这个场景下的隐藏本身是无害的,目前也还没有很好的方法做命名区分 '-Wno-shadow-field', + # 采用和gcc更一致的-Wswitch-default风格 + '-Wno-covered-switch-default', # TODO(lijiang01): 逐步梳理清除 - '-Wno-weak-vtables', '-Wno-float-conversion', '-Wno-switch-enum', + '-Wno-weak-vtables', '-Wno-float-conversion', '-Wno-switch-enum', '-Wno-c++17-extensions', '-Wno-gnu-anonymous-struct', '-Wno-nested-anon-types', '-Wno-array-bounds-pointer-arithmetic', '-Wno-cast-align', '-Wno-vla-extension', - '-Wno-unneeded-member-function', '-Wno-deprecated-declarations'] + '-Wno-unneeded-member-function', '-Wno-deprecated-declarations', '-Wno-unsafe-buffer-usage'] BABYLON_COPTS = select({ '//:compiler_gcc': BABYLON_GCC_COPTS, diff --git a/registry/modules/libcoro/0.12.1/MODULE.bazel b/registry/modules/libcoro/0.12.1/MODULE.bazel new file mode 100644 index 0000000..f383089 --- /dev/null +++ b/registry/modules/libcoro/0.12.1/MODULE.bazel @@ -0,0 +1,7 @@ +module( + name = 'libcoro', + version = '0.12.1', + compatibility_level = 1, +) + +bazel_dep(name = 'rules_foreign_cc', version = '0.8.0') diff --git a/registry/modules/libcoro/0.12.1/overlay/BUILD b/registry/modules/libcoro/0.12.1/overlay/BUILD new file mode 100644 index 0000000..d85ccde --- /dev/null +++ b/registry/modules/libcoro/0.12.1/overlay/BUILD @@ -0,0 +1,22 @@ +package( + default_visibility = ['//:__pkg__'], +) + +load('@rules_foreign_cc//foreign_cc:defs.bzl', 'cmake') + +filegroup( + name = 'all_srcs', + srcs = glob(['**'], exclude = ['bazel-*/**']), +) + +cmake( + name = 'libcoro', + lib_source = ':all_srcs', + cache_entries = { + 'LIBCORO_BUILD_TESTS': 'OFF', + 'LIBCORO_BUILD_EXAMPLES': 'OFF', + 'LIBCORO_FEATURE_NETWORKING': 'OFF', + }, + lib_name = 'libcoro', + visibility = ['//visibility:public'], +) diff --git a/registry/modules/libcoro/0.12.1/overlay/MODULE.bazel b/registry/modules/libcoro/0.12.1/overlay/MODULE.bazel new file mode 120000 index 0000000..9b599e3 --- /dev/null +++ b/registry/modules/libcoro/0.12.1/overlay/MODULE.bazel @@ -0,0 +1 @@ +../MODULE.bazel \ No newline at end of file diff --git a/registry/modules/libcoro/0.12.1/source.json b/registry/modules/libcoro/0.12.1/source.json new file mode 100644 index 0000000..e947f3a --- /dev/null +++ b/registry/modules/libcoro/0.12.1/source.json @@ -0,0 +1,9 @@ +{ + "url": "https://github.com/jbaldwin/libcoro/archive/refs/tags/v0.12.1.tar.gz", + "strip_prefix": "libcoro-0.12.1", + "integrity": "sha256-LLb0X8c9rWAIzJMNkpOXhWhINeA7Et9CK5j8q545Ot0=", + "overlay": { + "MODULE.bazel": "sha256-qMTe85CeeBmQXe2lUdrFXVu1K6oMPJ7CiwGGPbd1+A0=", + "BUILD": "sha256-et8EWyLNFl1uG3Zq+wsCOQyGkTFir9XXQEfwSki+ocI=" + } +} diff --git a/src/babylon/BUILD b/src/babylon/BUILD index 40fa8d5..655c961 100644 --- a/src/babylon/BUILD +++ b/src/babylon/BUILD @@ -68,29 +68,42 @@ cc_library( ) cc_library( - name = 'environment', - hdrs = ['environment.h', 'protect.h', 'unprotect.h'], + name = 'coroutine_and_executor', + srcs = ['coroutine.cpp', 'executor.cpp'], + hdrs = ['coroutine.h', 'current_executor.h', 'executor.h', 'executor.hpp'], copts = BABYLON_COPTS, includes = ['//src'], strip_include_prefix = '//src', deps = [ - '@com_google_absl//absl/base:core_headers', + ':future', + '//src/babylon/logging', + '//src/babylon/concurrent:bounded_queue', + '@libcoro', ], ) +alias( + name = 'coroutine', + actual = '//src/babylon:coroutine_and_executor', +) + + cc_library( - name = 'executor', - srcs = ['executor.cpp'], - hdrs = ['executor.h', 'executor.hpp'], + name = 'environment', + hdrs = ['environment.h', 'protect.h', 'unprotect.h'], copts = BABYLON_COPTS, includes = ['//src'], strip_include_prefix = '//src', deps = [ - ':future', - '//src/babylon/concurrent:bounded_queue', + '@com_google_absl//absl/base:core_headers', ], ) +alias( + name = 'executor', + actual = '//src/babylon:coroutine_and_executor', +) + cc_library( name = 'future', hdrs = ['future.h', 'future.hpp'], @@ -220,5 +233,6 @@ cc_library( deps = [ ':absl_base_internal_invoke', ':string_view', + '@com_google_absl//absl/utility', ], ) diff --git a/src/babylon/anyflow/builder.h b/src/babylon/anyflow/builder.h index a875ecd..3d0405a 100644 --- a/src/babylon/anyflow/builder.h +++ b/src/babylon/anyflow/builder.h @@ -1,6 +1,7 @@ #pragma once #include "babylon/any.h" +#include "babylon/anyflow/dependency.h" #include "babylon/anyflow/executor.h" #include "babylon/reusable/memory_resource.h" diff --git a/src/babylon/anyflow/builtin/alias.cpp b/src/babylon/anyflow/builtin/alias.cpp index 92fa939..19591d2 100644 --- a/src/babylon/anyflow/builtin/alias.cpp +++ b/src/babylon/anyflow/builtin/alias.cpp @@ -45,7 +45,7 @@ void AliasProcessor::apply(GraphBuilder& builder, const ::std::string& alias, vertex.anonymous_emit().to(alias); } -::std::atomic AliasProcessor::_s_idx = ATOMIC_VAR_INIT(0); +::std::atomic AliasProcessor::_s_idx {0}; } // namespace builtin } // namespace anyflow diff --git a/src/babylon/anyflow/builtin/const.cpp b/src/babylon/anyflow/builtin/const.cpp index 25f6d70..b8c7397 100644 --- a/src/babylon/anyflow/builtin/const.cpp +++ b/src/babylon/anyflow/builtin/const.cpp @@ -32,7 +32,7 @@ GraphVertexBuilder& ConstProcessor::apply_without_value( return vertex; } -::std::atomic ConstProcessor::_s_idx = ATOMIC_VAR_INIT(0); +::std::atomic ConstProcessor::_s_idx {0}; } // namespace builtin } // namespace anyflow diff --git a/src/babylon/anyflow/builtin/expression.cpp b/src/babylon/anyflow/builtin/expression.cpp index 44dfb70..2a9dd8c 100644 --- a/src/babylon/anyflow/builtin/expression.cpp +++ b/src/babylon/anyflow/builtin/expression.cpp @@ -1007,7 +1007,7 @@ int32_t ExpressionProcessor::expend_non_conditional_expression( return 0; } -::std::atomic ExpressionProcessor::_s_idx = ATOMIC_VAR_INIT(0); +::std::atomic ExpressionProcessor::_s_idx {0}; // ExpressionProcessor end /////////////////////////////////////////////////////////////////////////////// diff --git a/src/babylon/anyflow/builtin/select.cpp b/src/babylon/anyflow/builtin/select.cpp index b3a0265..2b7c5e3 100644 --- a/src/babylon/anyflow/builtin/select.cpp +++ b/src/babylon/anyflow/builtin/select.cpp @@ -56,7 +56,7 @@ void SelectProcessor::apply(GraphBuilder& builder, const ::std::string& dest, vertex.anonymous_emit().to(dest); } -::std::atomic SelectProcessor::_s_idx = ATOMIC_VAR_INIT(0); +::std::atomic SelectProcessor::_s_idx {0}; // SelectProcessor end /////////////////////////////////////////////////////////////////////////////// diff --git a/src/babylon/anyflow/closure.cpp b/src/babylon/anyflow/closure.cpp index 29e2c8e..a435c8e 100644 --- a/src/babylon/anyflow/closure.cpp +++ b/src/babylon/anyflow/closure.cpp @@ -1,6 +1,7 @@ #include "babylon/anyflow/closure.h" #include "babylon/anyflow/data.h" +#include "babylon/anyflow/vertex.h" BABYLON_NAMESPACE_BEGIN namespace anyflow { diff --git a/src/babylon/anyflow/data.cpp b/src/babylon/anyflow/data.cpp index fbec773..572a65f 100644 --- a/src/babylon/anyflow/data.cpp +++ b/src/babylon/anyflow/data.cpp @@ -71,6 +71,26 @@ int GraphData::recursive_activate(VertexStack& runnable_vertexes, return 0; } +int GraphData::activate(DataStack& activating_data, + VertexStack& runnable_vertexes, + ClosureContext* closure) noexcept { + // trigger时检测过一次ready,送给activate的如果没有producer直接报错 + if (ABSL_PREDICT_FALSE(_producers.empty())) { + BABYLON_LOG(WARNING) << "can not activate " << *this << " with no producer"; + return -1; + } + for (auto& producer : _producers) { + if (ABSL_PREDICT_FALSE(0 != producer->activate(activating_data, + runnable_vertexes, + closure))) { + BABYLON_LOG(WARNING) << "activate producer " << producer << " of " + << *this << " failed"; + return -1; + } + } + return 0; +} + bool GraphData::check_safe_mutable() const noexcept { // 依赖不超过1个,是安全的 if (_successors.size() <= 1) { diff --git a/src/babylon/anyflow/data.h b/src/babylon/anyflow/data.h index ecea4e1..7ba8195 100644 --- a/src/babylon/anyflow/data.h +++ b/src/babylon/anyflow/data.h @@ -375,9 +375,8 @@ class GraphData { // 激活当前data,如果producer依赖就绪,则加入runnable_vertexes // 如果producer还有依赖,则进入激活状态,并将进一步需要激活的节点 // 加入activating_data - inline int32_t activate(DataStack& activating_data, - VertexStack& runnable_vertexes, - ClosureContext* closure) noexcept; + int activate(DataStack& activating_data, VertexStack& runnable_vertexes, + ClosureContext* closure) noexcept; bool check_safe_mutable() const noexcept; diff --git a/src/babylon/anyflow/data.hpp b/src/babylon/anyflow/data.hpp index 7248942..f63199b 100644 --- a/src/babylon/anyflow/data.hpp +++ b/src/babylon/anyflow/data.hpp @@ -3,7 +3,10 @@ #include "babylon/anyflow/closure.h" #include "babylon/anyflow/data.h" #include "babylon/anyflow/dependency.h" -#include "babylon/anyflow/vertex.h" + +// clang-format off +#include "babylon/protect.h" +// clang-format on BABYLON_NAMESPACE_BEGIN namespace anyflow { @@ -338,16 +341,16 @@ void GraphData::default_on_reset(Any& data) noexcept { template inline void GraphData::set_on_reset(C&& callback) noexcept { -#if __cpp_init_captures - _on_reset = [callback = ::std::forward(callback)](Any& data) { -#else // !__cpp_init_captures - _on_reset = [callback](Any& data) { -#endif // !__cpp_init_captures - auto value = switch_to(data); - if (value != nullptr) { - callback(*value); + struct S { + void operator()(Any& data) { + auto value = switch_to(data); + if (value != nullptr) { + callback(*value); + } } + C callback; }; + _on_reset = S {.callback {::std::forward(callback)}}; } template @@ -597,26 +600,6 @@ inline void GraphData::trigger(DataStack& activating_data) noexcept { } } -inline int32_t GraphData::activate(DataStack& activating_data, - VertexStack& runnable_vertexes, - ClosureContext* closure) noexcept { - // trigger时检测过一次ready,送给activate的如果没有producer直接报错 - if (ABSL_PREDICT_FALSE(_producers.empty())) { - BABYLON_LOG(WARNING) << "can not activate " << *this << " with no producer"; - return -1; - } - for (auto& producer : _producers) { - if (ABSL_PREDICT_FALSE(0 != producer->activate(activating_data, - runnable_vertexes, - closure))) { - BABYLON_LOG(WARNING) << "activate producer " << producer << " of " - << *this << " failed"; - return -1; - } - } - return 0; -} - inline ::std::ostream& operator<<(::std::ostream& os, const GraphData& data) { os << "data[" << data._name << "]"; return os; @@ -626,3 +609,5 @@ inline ::std::ostream& operator<<(::std::ostream& os, const GraphData& data) { } // namespace anyflow BABYLON_NAMESPACE_END + +#include "babylon/unprotect.h" diff --git a/src/babylon/anyflow/dependency.cpp b/src/babylon/anyflow/dependency.cpp new file mode 100644 index 0000000..0acaa46 --- /dev/null +++ b/src/babylon/anyflow/dependency.cpp @@ -0,0 +1,171 @@ +#include "babylon/anyflow/dependency.h" + +#include "babylon/anyflow/vertex.h" + +BABYLON_NAMESPACE_BEGIN +namespace anyflow { + +int GraphDependency::activated_vertex_name( + ::std::vector& vertex_names) const noexcept { + int err = 0; + if (_ready) { + auto* producers = _target->producers(); + if (producers != nullptr && producers->empty() == false) { + for (auto& producer : *producers) { + vertex_names.emplace_back(producer->name()); + } + } else { + err = 1; + } + } else { + err = -1; + } + return err; +} + +int GraphDependency::activated_vertex_name( + std::string& vertex_name) const noexcept { + int err = 0; + if (_ready) { + auto* producers = _target->producers(); + if (producers != nullptr && producers->empty() == false) { + vertex_name = producers->at(0)->name(); + } else { + err = 1; + } + } else { + err = -1; + } + return err; +} + +int GraphDependency::activate(DataStack& activating_data) noexcept { + // 根据是否是条件依赖,对waiting_num进行+1或者+2 + int64_t waiting_num = _condition == nullptr ? 1 : 2; + waiting_num = + _waiting_num.fetch_add(waiting_num, ::std::memory_order_acq_rel) + + waiting_num; + // waiting_num终值域[-1, 0, 1, 2] + // [-1, 0]均表达激活前已经就绪,其余终值等待后续data可用来出发就绪 + // 包括condition不满足[-2 0|-1]和condition和target都就绪[-1 -1] + // [1]需要检测condition,如果成立(条件不存在也是成立)则尝试激活target + switch (waiting_num) { + // 激活时已经就绪,且条件不成立 + case -1: { + return 1; + } + // 激活时已经就绪,且条件可能成立 + case 0: { + if (check_established()) { + auto acquired_depend = !_mutable ? _target->acquire_immutable_depend() + : _target->acquire_mutable_depend(); + if (ABSL_PREDICT_FALSE(!acquired_depend)) { + BABYLON_LOG(WARNING) + << "dependency " << _source << " to " << *_target + << " can not be mutable for other already depend it"; + return -1; + } + _ready = _target->ready(); + } + return 1; + } + case 1: { + // 无condition,激活target + if (_condition == nullptr) { + _established = true; + auto acquired_depend = !_mutable ? _target->acquire_immutable_depend() + : _target->acquire_mutable_depend(); + if (ABSL_PREDICT_FALSE(!acquired_depend)) { + BABYLON_LOG(WARNING) + << "dependency " << _source << " to " << *_target + << " can not be mutable for other already depend it"; + return -1; + } + _target->trigger(activating_data); + // condition未就绪,激活condition + } else if (!_condition->ready()) { + _condition->trigger(activating_data); + // condition成立,激活target + } else if (check_established()) { + auto acquired_depend = !_mutable ? _target->acquire_immutable_depend() + : _target->acquire_mutable_depend(); + if (ABSL_PREDICT_FALSE(!acquired_depend)) { + BABYLON_LOG(WARNING) + << "dependency " << *_source << " to " << *_target << " on " + << *_condition + << " can not be mutable for other already mutate it"; + return -1; + } + _target->trigger(activating_data); + } + // else condition不成立,但是waiting_num == 1 + // 说明condition正在【失败过程中】,即两次-1之间 + // 等待另一次-1到来即可,这里不做处理 + break; + } + // condition未就绪,激活condition + case 2: { + _condition->trigger(activating_data); + break; + } + default: { + break; + } + } + return 0; +} + +void GraphDependency::ready(GraphData* data, + VertexStack& runnable_vertexes) noexcept { + int64_t waiting_num = + _waiting_num.fetch_sub(1, ::std::memory_order_acq_rel) - 1; + // condition完成时检测条件是否成立 + if (data == _condition) { + // 成立时 + if (check_established()) { + // 如果waiting num是1,,则激活target + if (waiting_num == 1) { + auto acquired_depend = !_mutable ? _target->acquire_immutable_depend() + : _target->acquire_mutable_depend(); + if (ABSL_PREDICT_FALSE(!acquired_depend)) { + BABYLON_LOG(WARNING) + << "dependency " << _source << " to " << *_target + << " can not be mutable for other already depend it"; + _source->closure()->finish(-1); + return; + } + + int32_t rec_ret = + _target->recursive_activate(runnable_vertexes, _source->closure()); + if (0 != rec_ret) { + BABYLON_LOG(WARNING) + << "recursive_activate from " << *_target << " failed"; + _source->closure()->finish(rec_ret); + return; + } + } + // 不成立,waiting num再减1,由于target可以从别的渠道完成,有击穿的可能 + // 通过边沿触发和激活时[-1, 0]双终态解决 + } else if (waiting_num != 0) { + waiting_num = _waiting_num.fetch_sub(1, ::std::memory_order_acq_rel) - 1; + } + } + // 无condition的target就绪 + // 或者condition满足时target已经就绪 + // 或者condition不满足时 + // 当condition和target的就绪以及激活操作并发时 + // 就绪的终态[0]和激活的终态[-1, 0]确保不重不漏 + if (waiting_num == 0 && nullptr != _source) { + if (data == _target) { + _ready = check_established(); + } else { + _ready = established() && _target->ready(); + } + if (_source->ready(this)) { + runnable_vertexes.emplace_back(_source); + } + } +} + +} // namespace anyflow +BABYLON_NAMESPACE_END diff --git a/src/babylon/anyflow/dependency.h b/src/babylon/anyflow/dependency.h index 82d92b5..68140f0 100644 --- a/src/babylon/anyflow/dependency.h +++ b/src/babylon/anyflow/dependency.h @@ -3,6 +3,8 @@ #include "babylon/any.h" #include "babylon/concurrent/transient_topic.h" +#include "absl/container/inlined_vector.h" + #include BABYLON_NAMESPACE_BEGIN @@ -74,9 +76,9 @@ class GraphDependency { // T == Any 特化:返回目标底层Any容器的指针,用于多类型支持等高级场景 template inline T* mutable_value() noexcept; - inline int activated_vertex_name( + int activated_vertex_name( ::std::vector& vertex_name) const noexcept; - inline int activated_vertex_name(std::string& vertex_name) const noexcept; + int activated_vertex_name(std::string& vertex_name) const noexcept; /////////////////////////////////////////////////////////////////////////// template @@ -96,8 +98,8 @@ class GraphDependency { // return 0: 正常激活,进入等待状态 // 1: 正常激活,进入完成状态 // <0: 激活失败 - inline int activate(DataStack& activating_data) noexcept; - inline void ready(GraphData* data, VertexStack& runnable_vertexes) noexcept; + int activate(DataStack& activating_data) noexcept; + void ready(GraphData* data, VertexStack& runnable_vertexes) noexcept; // 检测依赖是否成立,实际读取原子变量 // 如果依赖成立,设置_established供后续使用 inline bool check_established() noexcept; diff --git a/src/babylon/anyflow/dependency.hpp b/src/babylon/anyflow/dependency.hpp index 8e75f0f..ce16d48 100644 --- a/src/babylon/anyflow/dependency.hpp +++ b/src/babylon/anyflow/dependency.hpp @@ -2,7 +2,7 @@ #include "babylon/anyflow/data.h" #include "babylon/anyflow/dependency.h" -#include "babylon/anyflow/vertex.h" +// #include "babylon/anyflow/vertex.h" BABYLON_NAMESPACE_BEGIN namespace anyflow { @@ -118,167 +118,6 @@ bool GraphDependency::check_established() noexcept { return _established; } -int32_t GraphDependency::activate(DataStack& activating_data) noexcept { - // 根据是否是条件依赖,对waiting_num进行+1或者+2 - int64_t waiting_num = _condition == nullptr ? 1 : 2; - waiting_num = - _waiting_num.fetch_add(waiting_num, ::std::memory_order_acq_rel) + - waiting_num; - // waiting_num终值域[-1, 0, 1, 2] - // [-1, 0]均表达激活前已经就绪,其余终值等待后续data可用来出发就绪 - // 包括condition不满足[-2 0|-1]和condition和target都就绪[-1 -1] - // [1]需要检测condition,如果成立(条件不存在也是成立)则尝试激活target - switch (waiting_num) { - // 激活时已经就绪,且条件不成立 - case -1: { - return 1; - } - // 激活时已经就绪,且条件可能成立 - case 0: { - if (check_established()) { - auto acquired_depend = !_mutable ? _target->acquire_immutable_depend() - : _target->acquire_mutable_depend(); - if (ABSL_PREDICT_FALSE(!acquired_depend)) { - BABYLON_LOG(WARNING) - << "dependency " << _source << " to " << *_target - << " can not be mutable for other already depend it"; - return -1; - } - _ready = _target->ready(); - } - return 1; - } - case 1: { - // 无condition,激活target - if (_condition == nullptr) { - _established = true; - auto acquired_depend = !_mutable ? _target->acquire_immutable_depend() - : _target->acquire_mutable_depend(); - if (ABSL_PREDICT_FALSE(!acquired_depend)) { - BABYLON_LOG(WARNING) - << "dependency " << _source << " to " << *_target - << " can not be mutable for other already depend it"; - return -1; - } - _target->trigger(activating_data); - // condition未就绪,激活condition - } else if (!_condition->ready()) { - _condition->trigger(activating_data); - // condition成立,激活target - } else if (check_established()) { - auto acquired_depend = !_mutable ? _target->acquire_immutable_depend() - : _target->acquire_mutable_depend(); - if (ABSL_PREDICT_FALSE(!acquired_depend)) { - BABYLON_LOG(WARNING) - << "dependency " << *_source << " to " << *_target << " on " - << *_condition - << " can not be mutable for other already mutate it"; - return -1; - } - _target->trigger(activating_data); - } - // else condition不成立,但是waiting_num == 1 - // 说明condition正在【失败过程中】,即两次-1之间 - // 等待另一次-1到来即可,这里不做处理 - break; - } - // condition未就绪,激活condition - case 2: { - _condition->trigger(activating_data); - break; - } - default: { - break; - } - } - return 0; -} - -void GraphDependency::ready(GraphData* data, - VertexStack& runnable_vertexes) noexcept { - int64_t waiting_num = - _waiting_num.fetch_sub(1, ::std::memory_order_acq_rel) - 1; - // condition完成时检测条件是否成立 - if (data == _condition) { - // 成立时 - if (check_established()) { - // 如果waiting num是1,,则激活target - if (waiting_num == 1) { - auto acquired_depend = !_mutable ? _target->acquire_immutable_depend() - : _target->acquire_mutable_depend(); - if (ABSL_PREDICT_FALSE(!acquired_depend)) { - BABYLON_LOG(WARNING) - << "dependency " << _source << " to " << *_target - << " can not be mutable for other already depend it"; - _source->closure()->finish(-1); - return; - } - - int32_t rec_ret = - _target->recursive_activate(runnable_vertexes, _source->closure()); - if (0 != rec_ret) { - BABYLON_LOG(WARNING) - << "recursive_activate from " << *_target << " failed"; - _source->closure()->finish(rec_ret); - return; - } - } - // 不成立,waiting num再减1,由于target可以从别的渠道完成,有击穿的可能 - // 通过边沿触发和激活时[-1, 0]双终态解决 - } else if (waiting_num != 0) { - waiting_num = _waiting_num.fetch_sub(1, ::std::memory_order_acq_rel) - 1; - } - } - // 无condition的target就绪 - // 或者condition满足时target已经就绪 - // 或者condition不满足时 - // 当condition和target的就绪以及激活操作并发时 - // 就绪的终态[0]和激活的终态[-1, 0]确保不重不漏 - if (waiting_num == 0 && nullptr != _source) { - if (data == _target) { - _ready = check_established(); - } else { - _ready = established() && _target->ready(); - } - if (_source->ready(this)) { - runnable_vertexes.emplace_back(_source); - } - } -} -int GraphDependency::activated_vertex_name( - ::std::vector& vertex_names) const noexcept { - int err = 0; - if (_ready) { - auto* producers = _target->producers(); - if (producers != nullptr && producers->empty() == false) { - for (auto& producer : *producers) { - vertex_names.emplace_back(producer->name()); - } - } else { - err = 1; - } - } else { - err = -1; - } - return err; -} - -int GraphDependency::activated_vertex_name( - std::string& vertex_name) const noexcept { - int err = 0; - if (_ready) { - auto* producers = _target->producers(); - if (producers != nullptr && producers->empty() == false) { - vertex_name = producers->at(0)->name(); - } else { - err = 1; - } - } else { - err = -1; - } - return err; -} - template inline InputChannel GraphDependency::channel() noexcept { return InputChannel {*this}; diff --git a/src/babylon/anyflow/executor.cpp b/src/babylon/anyflow/executor.cpp index cbdb7dc..b560b33 100644 --- a/src/babylon/anyflow/executor.cpp +++ b/src/babylon/anyflow/executor.cpp @@ -37,7 +37,9 @@ InplaceGraphExecutor& InplaceGraphExecutor::instance() noexcept { int ThreadPoolGraphExecutor::initialize(size_t worker_num, size_t queue_capacity) noexcept { - return _executor.initialize(worker_num, queue_capacity); + _executor.set_worker_number(worker_num); + _executor.set_global_capacity(queue_capacity); + return _executor.start(); } void ThreadPoolGraphExecutor::stop() noexcept { @@ -50,8 +52,8 @@ Closure ThreadPoolGraphExecutor::create_closure() noexcept { int ThreadPoolGraphExecutor::run(GraphVertex* vertex, GraphVertexClosure&& closure) noexcept { - _executor.submit([closure = ::std::move(closure), vertex]() mutable { - vertex->run(::std::move(closure)); + _executor.submit([captured_closure = ::std::move(closure), vertex]() mutable { + vertex->run(::std::move(captured_closure)); }); return 0; } diff --git a/src/babylon/concurrent/epoch.h b/src/babylon/concurrent/epoch.h index 53fbbaf..a8e3d0a 100644 --- a/src/babylon/concurrent/epoch.h +++ b/src/babylon/concurrent/epoch.h @@ -61,7 +61,7 @@ class Epoch { Slot& operator=(const Slot&) = delete; ~Slot() noexcept = default; - ::std::atomic version = ATOMIC_VAR_INIT(UINT64_MAX); + ::std::atomic version {UINT64_MAX}; size_t lock_times {0}; }; @@ -72,7 +72,7 @@ class Epoch { IdAllocator _id_allocator; ConcurrentVector _slots; - ::std::atomic _version = ATOMIC_VAR_INIT(0); + ::std::atomic _version {0}; friend Accessor; }; diff --git a/src/babylon/concurrent/execution_queue.h b/src/babylon/concurrent/execution_queue.h index 232f89c..124aba3 100644 --- a/src/babylon/concurrent/execution_queue.h +++ b/src/babylon/concurrent/execution_queue.h @@ -68,7 +68,7 @@ class ConcurrentExecutionQueue { void consume_until_empty() noexcept; Queue _queue; - ::std::atomic _events = ATOMIC_VAR_INIT(0); + ::std::atomic _events {0}; Executor* _executor {nullptr}; ConsumeFunction _consume_function; }; diff --git a/src/babylon/concurrent/garbage_collector.h b/src/babylon/concurrent/garbage_collector.h index caa6143..dd480b3 100644 --- a/src/babylon/concurrent/garbage_collector.h +++ b/src/babylon/concurrent/garbage_collector.h @@ -66,6 +66,9 @@ class GarbageCollector { ReclaimTask& operator=(const ReclaimTask&) = delete; ~ReclaimTask() noexcept = default; + ReclaimTask(R&& reclaimer, uint64_t lowest_epoch) noexcept + : reclaimer {::std::move(reclaimer)}, lowest_epoch {lowest_epoch} {} + R reclaimer; uint64_t lowest_epoch {UINT64_MAX}; }; @@ -117,8 +120,8 @@ ABSL_ATTRIBUTE_ALWAYS_INLINE inline void GarbageCollector::retire( template ABSL_ATTRIBUTE_ALWAYS_INLINE inline void GarbageCollector::retire( R&& reclaimer, uint64_t lowest_epoch) noexcept { - _queue.template push(ReclaimTask { - .reclaimer = ::std::move(reclaimer), .lowest_epoch = lowest_epoch}); + _queue.template push( + ReclaimTask {::std::forward(reclaimer), lowest_epoch}); } template diff --git a/src/babylon/coroutine.cpp b/src/babylon/coroutine.cpp new file mode 100644 index 0000000..61da6ee --- /dev/null +++ b/src/babylon/coroutine.cpp @@ -0,0 +1,21 @@ +#include "babylon/coroutine.h" + +#include "babylon/executor.h" + +#if __cpp_concepts && __cpp_lib_coroutine + +BABYLON_NAMESPACE_BEGIN + +// We cannot call function of executor in .h because it incomplete there. So +// functions need interact with executor are placed in .cpp here. +void BasicCoroutinePromise::resume_awaiter() noexcept { + _awaiter_executor->resume(_awaiter); +} + +void BasicCoroutinePromise::resume(::std::coroutine_handle<> handle) noexcept { + _executor->resume(handle); +} + +BABYLON_NAMESPACE_END + +#endif // __cpp_concepts && __cpp_lib_coroutine diff --git a/src/babylon/coroutine.h b/src/babylon/coroutine.h new file mode 100644 index 0000000..b475290 --- /dev/null +++ b/src/babylon/coroutine.h @@ -0,0 +1,627 @@ +#pragma once + +#include "babylon/current_executor.h" +#include "babylon/future.h" +#include "babylon/logging/logger.h" + +#include "absl/types/optional.h" + +#if __cpp_concepts && __cpp_lib_coroutine + +#include + +BABYLON_NAMESPACE_BEGIN + +class Executor; +template +class CoroutineTask; +template +class FutureAwaitable; +template +class SharedFutureAwaitable; + +//////////////////////////////////////////////////////////////////////////////// +// Get awaitable type +// P stand for promise type of awaiter coroutine +// A stand for raw awaitee object +// +// follow the first step in co_await: convert expression to awaitable +template +struct CoroutineAwaitableFrom { + using type = A; +}; +template + requires requires { + ::std::declval

().await_transform(::std::declval()); + } +struct CoroutineAwaitableFrom { + using type = + decltype(::std::declval

().await_transform(::std::declval())); +}; +template +using CoroutineAwaitableType = typename CoroutineAwaitableFrom::type; +//////////////////////////////////////////////////////////////////////////////// + +//////////////////////////////////////////////////////////////////////////////// +// Get awaiter type +// P stand for promise type of awaiter coroutine +// A stand for raw awaitee object +// +// follow the second step in co_await: convert awaitable to awaiter +template +struct CoroutineAwaiterFrom { + using type = CoroutineAwaitableType; +}; +template + requires requires { + ::std::declval>().operator co_await(); + } +struct CoroutineAwaiterFrom { + using type = decltype(::std::declval>(). + operator co_await()); +}; +template + requires requires { + operator co_await(::std::declval>()); + } +struct CoroutineAwaiterFrom { + using type = decltype(operator co_await( + ::std::declval>())); +}; +template +using CoroutineAwaiterType = typename CoroutineAwaiterFrom::type; +//////////////////////////////////////////////////////////////////////////////// + +//////////////////////////////////////////////////////////////////////////////// +// Get result type of a co_await expression +// P stand for promise type of awaiter coroutine +// A stand for raw awaitee object +// +// function as if: decltype(co_await expression) +// +// Given: +// SomeTaskType awaiter_coroutine() { +// auto x = co_await awaitee_object; +// } +// +// Then decltype(x) is: +// CoroutineAwaitableType +template +using CoroutineAwaitResultType = + decltype(::std::declval>().await_resume()); +//////////////////////////////////////////////////////////////////////////////// + +// Check if a callable invocation C(Args...) is a coroutine +template +concept CoroutineInvocable = + requires { + typename ::std::coroutine_traits<::std::invoke_result_t, + Args...>::promise_type; + }; + +// Check if a callable invocation C(Args...) is a babylon coroutine, thus, +// return CoroutineTask +template +concept CoroutineTaskInvocable = + CoroutineInvocable && + IsSpecialization<::std::invoke_result_t, CoroutineTask>::value; + +// Common part of CoroutineTask::promise_type +class BasicCoroutinePromise { + public: + class FinalAwaitable; + + ////////////////////////////////////////////////////////////////////////////// + // Protocol to be a coroutine promise type, except result type-related + // functions. + template + inline CoroutineTask get_return_object() noexcept; + inline constexpr ::std::suspend_always initial_suspend() const noexcept; + inline FinalAwaitable final_suspend() noexcept; + inline void unhandled_exception() noexcept; + // void return_void() noexcept; + // void return_value(T) noexcept; + ////////////////////////////////////////////////////////////////////////////// + + // Bind to specific executor. Both role of co_await, awaiter or awaitee, will + // be send back to this executor in resumption if they are not there already. + inline void set_executor(Executor& executor) noexcept; + inline Executor* executor() const noexcept; + + ////////////////////////////////////////////////////////////////////////////// + // Register an awaiter, coroutine which issue a co_await to this coroutine, + // along with it's binding executor if it's a babylon coroutine. + // + // Awaiter registered will be resumed after this coroutine finished. The + // resumption will move awaiter coroutine back to it's binding executor. If we + // already there or the awaiter has no binding executor, the resumption will + // happen in-place. + inline void set_awaiter(::std::coroutine_handle<> awaiter, + Executor* awaiter_executor) noexcept; + inline bool awaiter_inplace_resumable() const noexcept; + inline ::std::coroutine_handle<> awaiter() const noexcept; + void resume_awaiter() noexcept; + ////////////////////////////////////////////////////////////////////////////// + + // After awaiter suspend and finish registration, this newly created coroutine + // will send back to it's binding executor and resume. If we already there or + // has no binding executor, the resumption will happen in-place. + inline bool inplace_resumable() const noexcept; + void resume(::std::coroutine_handle<> handle) noexcept; + + // Propagate executor to awaitee automatically if not specified + template + requires(IsSpecialization::type, + CoroutineTask>::value) + inline T&& await_transform(T&& task) noexcept; + + // Wrap future to a awaitable to support co_await future. Reference type of + // future will propagate to result type of co_await expression, which means: + // + // // r1 and r2 will both copy from value that keep inside future + // T r1 = co_await future; + // T r2 = co_await future; + // + // // r1 will moved out from value that keep inside future + // // r2 will get an empty value + // T r1 = co_await ::std::move(future); + // T r2 = co_await ::std::move(future); + template + inline FutureAwaitable await_transform(Future&& future) noexcept; + template + inline SharedFutureAwaitable await_transform( + const Future& future) noexcept; + + ////////////////////////////////////////////////////////////////////////////// + // When co_await other awaitable, it will be their responsibility to resume + // current coroutine in a callback manner. As they may also have some async + // schedule mechanism, the resumption can happen outside the binding executor. + // + // To get back, a proxy task is inserted to construct a co_await chain like: + // this co_await proxy co_await awaitable + // + // After awaitable finish they directly resume the empty proxy task. The proxy + // task will properly check and send origin awaiter back to it's executor. + private: + template + requires requires { typename CoroutineAwaitResultType; } + struct WrapperTask; + template + using WrapperTaskType = typename WrapperTask::type; + template + class ReferenceWrapper; + + public: + template + requires(!IsSpecialization::type, + CoroutineTask>::value) + inline WrapperTaskType await_transform(A&& awaitable) noexcept; + template + inline A await_transform(ReferenceWrapper awaitable) noexcept; + ////////////////////////////////////////////////////////////////////////////// + + private: + Executor* _executor {nullptr}; + ::std::coroutine_handle<> _awaiter; + Executor* _awaiter_executor {nullptr}; +}; + +// Switch to awaiter if there is one. +class BasicCoroutinePromise::FinalAwaitable { + public: + inline FinalAwaitable(BasicCoroutinePromise* promise) noexcept; + + inline constexpr bool await_ready() const noexcept; + inline ::std::coroutine_handle<> await_suspend( + ::std::coroutine_handle<>) const noexcept; + inline void await_resume() const noexcept; + + private: + BasicCoroutinePromise* _promise {nullptr}; +}; + +// template +// struct BasicCoroutinePromise::WrapperTask { +// }; +// Awaitable may return rvalue reference. Remove that reference to make it fit +// T in a CoroutineTask. +template + requires requires { typename CoroutineAwaitResultType; } +struct BasicCoroutinePromise::WrapperTask { + using ResultType = CoroutineAwaitResultType; + using ForwardType = typename ::std::conditional< + ::std::is_rvalue_reference::value, + typename ::std::remove_reference::type, ResultType>::type; + using type = CoroutineTask; +}; + +// When building a co_await chain like `this co_await proxy co_await awaitable`, +// we need to know whether the current awaiter is already proxy, to avoid +// chaining infinitely. Use this wrapper to identify this. +template +class BasicCoroutinePromise::ReferenceWrapper { + public: + inline ReferenceWrapper(A awaitable) noexcept; + inline operator A() noexcept; + + private: + A _awaitable; +}; + +// CoroutineTask::promise_type for non-void T +template +class CoroutinePromise : public BasicCoroutinePromise { + private: + using RemoveReferenceType = typename ::std::remove_reference::type; + using PromiseValueType = + typename ::std::conditional<::std::is_lvalue_reference::value, + ::std::reference_wrapper, + RemoveReferenceType>::type; + + public: + inline CoroutineTask get_return_object() noexcept; + template + inline void return_value(U&& value) noexcept; + inline void return_value(T& value) noexcept; + inline T& value() noexcept; + + private: + ::absl::optional _value; +}; + +// CoroutineTask::promise_type for void T +template <> +class CoroutinePromise : public BasicCoroutinePromise { + public: + inline CoroutineTask<> get_return_object() noexcept; + inline static constexpr void return_void() noexcept; + inline static constexpr void value() noexcept; +}; + +template +class CoroutineTask { + public: + using promise_type = CoroutinePromise; + + inline CoroutineTask() noexcept = default; + inline CoroutineTask(CoroutineTask&& other) noexcept; + CoroutineTask(const CoroutineTask&) = delete; + inline CoroutineTask& operator=(CoroutineTask&& other) noexcept; + CoroutineTask& operator=(const CoroutineTask&) = delete; + inline ~CoroutineTask() noexcept; + + inline CoroutineTask& set_executor(Executor& executor) noexcept; + inline Executor* executor() const noexcept; + + ////////////////////////////////////////////////////////////////////////////// + // Protocol to be an awaitable + inline static constexpr bool await_ready() noexcept; + template + inline ::std::coroutine_handle<> await_suspend( + std::coroutine_handle> awaiter) noexcept; + inline ::std::coroutine_handle<> await_suspend( + std::coroutine_handle<> awaiter) noexcept; + inline T await_resume() noexcept; + ////////////////////////////////////////////////////////////////////////////// + + private: + inline CoroutineTask(::std::coroutine_handle handle) noexcept; + + inline ::std::coroutine_handle<> release() noexcept; + + inline ::std::coroutine_handle<> await_suspend( + std::coroutine_handle<> awaiter, Executor* awaiter_executor) noexcept; + + ::std::coroutine_handle _handle; + + friend BasicCoroutinePromise; + friend Executor; +}; + +template +class BasicFutureAwaitable { + public: + inline BasicFutureAwaitable(Future&& future) noexcept + : _future {::std::move(future)} {} + inline BasicFutureAwaitable(const Future& future) noexcept + : _future {future} {} + + inline bool await_ready() const noexcept { + return _future.ready(); + } + + template + inline void await_suspend( + ::std::coroutine_handle> handle) noexcept { + _future.on_finish([handle] { + handle.promise().resume(handle); + }); + } + + inline void await_suspend(::std::coroutine_handle<> handle) noexcept { + _future.on_finish([handle] { + handle.resume(); + }); + } + + inline T& await_resume() noexcept { + return _future.get(); + } + + private: + Future _future; +}; + +template +class FutureAwaitable : public BasicFutureAwaitable { + private: + using Base = BasicFutureAwaitable; + + public: + using Base::Base; + inline T&& await_resume() noexcept { + return ::std::move(Base::await_resume()); + } +}; + +template +class SharedFutureAwaitable : public BasicFutureAwaitable { + public: + using BasicFutureAwaitable::BasicFutureAwaitable; +}; + +//////////////////////////////////////////////////////////////////////////////// +// BasicCoroutinePromise::FinalAwaitable begin +inline BasicCoroutinePromise::FinalAwaitable::FinalAwaitable( + BasicCoroutinePromise* promise) noexcept + : _promise {promise} {} + +inline constexpr bool BasicCoroutinePromise::FinalAwaitable::await_ready() + const noexcept { + return false; +} + +inline ::std::coroutine_handle<> +BasicCoroutinePromise::FinalAwaitable::await_suspend( + ::std::coroutine_handle<> handle) const noexcept { + auto awaiter = _promise->awaiter(); + if (awaiter) { + if (_promise->awaiter_inplace_resumable()) { + return awaiter; + } + _promise->resume_awaiter(); + } else { + handle.destroy(); + } + return ::std::noop_coroutine(); +} + +inline void BasicCoroutinePromise::FinalAwaitable::await_resume() + const noexcept {} +// BasicCoroutinePromise::FinalAwaitable end +//////////////////////////////////////////////////////////////////////////////// + +//////////////////////////////////////////////////////////////////////////////// +// BasicCoroutinePromise::ReferenceWrapper begin +template +inline BasicCoroutinePromise::ReferenceWrapper::ReferenceWrapper( + A awaitable) noexcept + : _awaitable {::std::forward(awaitable)} {} + +template +inline BasicCoroutinePromise::ReferenceWrapper::operator A() noexcept { + return ::std::forward(_awaitable); +} +// BasicCoroutinePromise::ReferenceWrapper end +//////////////////////////////////////////////////////////////////////////////// + +//////////////////////////////////////////////////////////////////////////////// +// BasicCoroutinePromise begin +template +inline CoroutineTask BasicCoroutinePromise::get_return_object() noexcept { + return {::std::coroutine_handle>::from_promise( + static_cast&>(*this))}; +} + +inline constexpr ::std::suspend_always BasicCoroutinePromise::initial_suspend() + const noexcept { + return {}; +} + +inline BasicCoroutinePromise::FinalAwaitable +BasicCoroutinePromise::final_suspend() noexcept { + return {this}; +} + +ABSL_ATTRIBUTE_NORETURN +inline void BasicCoroutinePromise::unhandled_exception() noexcept { + abort(); +} + +inline void BasicCoroutinePromise::set_executor(Executor& executor) noexcept { + _executor = &executor; +} + +inline Executor* BasicCoroutinePromise::executor() const noexcept { + return _executor; +} + +inline void BasicCoroutinePromise::set_awaiter( + ::std::coroutine_handle<> awaiter, Executor* awaiter_executor) noexcept { + _awaiter = awaiter; + _awaiter_executor = awaiter_executor; +} + +inline bool BasicCoroutinePromise::awaiter_inplace_resumable() const noexcept { + return _awaiter_executor == nullptr || + _awaiter_executor == CurrentExecutor::get(); +} + +inline ::std::coroutine_handle<> BasicCoroutinePromise::awaiter() + const noexcept { + return _awaiter; +} + +inline bool BasicCoroutinePromise::inplace_resumable() const noexcept { + return _executor == nullptr || _executor == CurrentExecutor::get(); +} + +template + requires(IsSpecialization::type, + CoroutineTask>::value) +inline T&& BasicCoroutinePromise::await_transform(T&& task) noexcept { + if (!task.executor()) { + task.set_executor(*_executor); + } + return ::std::forward(task); +} + +template +inline FutureAwaitable BasicCoroutinePromise::await_transform( + Future&& future) noexcept { + return ::std::move(future); +} + +template +inline SharedFutureAwaitable BasicCoroutinePromise::await_transform( + const Future& future) noexcept { + return future; +} + +template + requires(!IsSpecialization::type, + CoroutineTask>::value) +inline BasicCoroutinePromise::WrapperTaskType< + A&&> BasicCoroutinePromise::await_transform(A&& awaitable) noexcept { + return [](A&& inner_awaitable) -> WrapperTaskType { + co_return co_await ReferenceWrapper {::std::forward(inner_awaitable)}; + }(::std::forward(awaitable)); +} + +template +inline A BasicCoroutinePromise::await_transform( + ReferenceWrapper awaitable) noexcept { + return awaitable; +} +// BasicCoroutinePromise end +//////////////////////////////////////////////////////////////////////////////// + +//////////////////////////////////////////////////////////////////////////////// +// CoroutinePromise begin +template +inline CoroutineTask CoroutinePromise::get_return_object() noexcept { + return BasicCoroutinePromise::get_return_object(); +} + +inline CoroutineTask<> CoroutinePromise::get_return_object() noexcept { + return BasicCoroutinePromise::get_return_object(); +} + +template +template +inline void CoroutinePromise::return_value(U&& value) noexcept { + _value.emplace(::std::forward(value)); +} + +template +inline void CoroutinePromise::return_value(T& value) noexcept { + _value.emplace(value); +} + +inline constexpr void CoroutinePromise::return_void() noexcept {} + +template +inline T& CoroutinePromise::value() noexcept { + return *_value; +} + +inline constexpr void CoroutinePromise::value() noexcept {} +// BasicCoroutinePromise end +//////////////////////////////////////////////////////////////////////////////// + +//////////////////////////////////////////////////////////////////////////////// +// CoroutineTask begin +template +inline CoroutineTask::CoroutineTask(CoroutineTask&& other) noexcept { + ::std::swap(_handle, other._handle); +} + +template +inline CoroutineTask& CoroutineTask::operator=( + CoroutineTask&& other) noexcept { + ::std::swap(_handle, other._handle); + return *this; +} + +template +inline CoroutineTask::~CoroutineTask() noexcept { + if (_handle) { + _handle.destroy(); + } +} + +template +inline CoroutineTask& CoroutineTask::set_executor( + Executor& executor) noexcept { + _handle.promise().set_executor(executor); + return *this; +} + +template +inline Executor* CoroutineTask::executor() const noexcept { + return _handle.promise().executor(); +} + +template +inline constexpr bool CoroutineTask::await_ready() noexcept { + return false; +} + +template +template +inline ::std::coroutine_handle<> CoroutineTask::await_suspend( + ::std::coroutine_handle> awaiter) noexcept { + return await_suspend(awaiter, awaiter.promise().executor()); +} + +template +inline ::std::coroutine_handle<> CoroutineTask::await_suspend( + ::std::coroutine_handle<> awaiter) noexcept { + return await_suspend(awaiter, nullptr); +} + +template +inline T CoroutineTask::await_resume() noexcept { + return _handle.promise().value(); +} + +template <> +inline void CoroutineTask::await_resume() noexcept {} + +template +inline CoroutineTask::CoroutineTask( + ::std::coroutine_handle handle) noexcept + : _handle(handle) {} + +template +inline ::std::coroutine_handle<> CoroutineTask::release() noexcept { + return ::std::exchange(_handle, nullptr); +} + +template +inline ::std::coroutine_handle<> CoroutineTask::await_suspend( + ::std::coroutine_handle<> awaiter, Executor* awaiter_executor) noexcept { + auto& promise = _handle.promise(); + promise.set_awaiter(awaiter, awaiter_executor); + if (promise.inplace_resumable()) { + return _handle; + } + promise.resume(_handle); + return ::std::noop_coroutine(); +} +// CoroutineTask end +//////////////////////////////////////////////////////////////////////////////// + +BABYLON_NAMESPACE_END + +#endif // __cpp_concepts && __cpp_lib_coroutine diff --git a/src/babylon/current_executor.h b/src/babylon/current_executor.h new file mode 100644 index 0000000..0a1b279 --- /dev/null +++ b/src/babylon/current_executor.h @@ -0,0 +1,27 @@ +#pragma once + +#include "babylon/environment.h" + +BABYLON_NAMESPACE_BEGIN + +class Executor; +class CurrentExecutor { + public: + inline static Executor* get() noexcept { + return storage(); + } + + private: + inline static void set(Executor* executor) noexcept { + storage() = executor; + } + + inline static Executor*& storage() noexcept { + static thread_local Executor* executor {nullptr}; + return executor; + } + + friend Executor; +}; + +BABYLON_NAMESPACE_END diff --git a/src/babylon/environment.h b/src/babylon/environment.h index 00f7e86..fe04a3b 100644 --- a/src/babylon/environment.h +++ b/src/babylon/environment.h @@ -105,7 +105,6 @@ // 通过『任意』引入一个c++头文件来引入一些基础环境信息,例如 // - GLIBCXX/LIBCPP标准库版本信息 // - _GLIBCXX_USE_CXX11_ABI开关信息 -// - __cpp_系列特性判断信息 // 选择cstddef是因为其本体只包含少量宏定义,相对比较轻量 #include @@ -149,8 +148,8 @@ #endif #if BABYLON_HAS_INCLUDE() -#include -#endif // BABYLON_HAS_INCLUDE() +#include // __cpp_xxxx +#endif // BABYLON_HAS_INCLUDE() // clang-format off // 在bazel环境启用treat_warnings_as_errors diff --git a/src/babylon/executor.cpp b/src/babylon/executor.cpp index d80e182..9982682 100644 --- a/src/babylon/executor.cpp +++ b/src/babylon/executor.cpp @@ -1,5 +1,9 @@ #include "babylon/executor.h" +// clang-foramt off +#include "babylon/protect.h" +// clang-foramt on + BABYLON_NAMESPACE_BEGIN //////////////////////////////////////////////////////////////////////////////// @@ -9,6 +13,19 @@ Executor::~Executor() noexcept {} int Executor::invoke(MoveOnlyFunction&&) noexcept { return -1; } + +#if __cpp_lib_coroutine +int Executor::resume(CoroutineHandle&& handle) noexcept { + return invoke([handle]() { + handle.resume(); + }); +} +#else // !__cpp_lib_coroutine +int Executor::resume(CoroutineHandle&&) noexcept { + // Fail if babylon itself is not compiled with -fcoroutines + return -1; +} +#endif // !__cpp_lib_coroutine // Executor end //////////////////////////////////////////////////////////////////////////////// @@ -24,30 +41,8 @@ InplaceExecutor& InplaceExecutor::instance() noexcept { return executor; } -InplaceExecutor::InplaceExecutor(bool flatten) noexcept : _flatten(flatten) {} - int InplaceExecutor::invoke(MoveOnlyFunction&& function) noexcept { - if (!_flatten) { - function(); - return 0; - } - - if (_in_execution) { - _pending_functions.emplace_back(::std::move(function)); - return 0; - } - - MoveOnlyFunction next_function = ::std::move(function); - while (true) { - _in_execution = true; - next_function(); - _in_execution = false; - if (_pending_functions.empty()) { - break; - } - next_function = ::std::move(_pending_functions.back()); - _pending_functions.pop_back(); - } + function(); return 0; } // InplaceExecutor end @@ -65,14 +60,19 @@ AlwaysUseNewThreadExecutor& AlwaysUseNewThreadExecutor::instance() noexcept { return executor; } +AlwaysUseNewThreadExecutor::~AlwaysUseNewThreadExecutor() noexcept { + while (_running.load(::std::memory_order_acquire) > 0) { + ::usleep(1000); + } +} + int AlwaysUseNewThreadExecutor::invoke( MoveOnlyFunction&& function) noexcept { - ::std::thread(::std::bind( - [](MoveOnlyFunction& captured_function) { - captured_function(); - }, - ::std::move(function))) - .detach(); + _running.fetch_add(1, ::std::memory_order_acq_rel); + ::std::thread([this, captured_function = ::std::move(function)] { + captured_function(); + _running.fetch_sub(1, ::std::memory_order_acq_rel); + }).detach(); return 0; } // AlwaysUseNewThreadExecutor end @@ -84,22 +84,60 @@ ThreadPoolExecutor::~ThreadPoolExecutor() noexcept { stop(); } -int ThreadPoolExecutor::initialize(size_t worker_num, - size_t queue_capacity) noexcept { - if (!_threads.empty()) { +void ThreadPoolExecutor::set_worker_number(size_t worker_number) noexcept { + _worker_number = worker_number; +} + +void ThreadPoolExecutor::set_local_capacity(size_t local_capacity) noexcept { + _local_capacity = local_capacity; +} + +void ThreadPoolExecutor::set_global_capacity(size_t global_capacity) noexcept { + _global_capacity = global_capacity; +} + +void ThreadPoolExecutor::set_enable_work_stealing( + bool enable_work_stealing) noexcept { + _enable_work_stealing = enable_work_stealing; +} + +int ThreadPoolExecutor::start() noexcept { + if (_running.load(::std::memory_order_acquire)) { return -1; } - _queue.reserve_and_clear(queue_capacity); - _threads.reserve(worker_num); - for (size_t i = 0; i < worker_num; ++i) { + _running.store(true, ::std::memory_order_release); + _global_task_queue.reserve_and_clear(_global_capacity * 2); + _local_task_queues.set_constructor( + [this](ConcurrentBoundedQueue* queue) { + new (queue) ConcurrentBoundedQueue; + queue->reserve_and_clear(_local_capacity * 2); + }); + _threads.reserve(_worker_number); + for (size_t i = 0; i < _worker_number; ++i) { _threads.emplace_back(&ThreadPoolExecutor::keep_execute, this); } + if (_balance_interval.count() >= 0) { + _balance_thread = ::std::thread(&ThreadPoolExecutor::keep_balance, this); + } return 0; } +void ThreadPoolExecutor::wakeup_one_worker() noexcept { + _global_task_queue.push( + Task {.type = TaskType::WAKEUP, .handle {}, .function {}}); +} + void ThreadPoolExecutor::stop() noexcept { + if (!_running.load(::std::memory_order_acquire)) { + return; + } + _running.store(false, ::std::memory_order_release); + if (_balance_thread.joinable()) { + _balance_thread.join(); + } for (size_t i = 0; i < _threads.size(); ++i) { - _queue.push(MoveOnlyFunction {}); + _global_task_queue.push( + Task {.type = TaskType::STOP, .handle {}, .function {}}); } for (auto& thread : _threads) { thread.join(); @@ -107,22 +145,109 @@ void ThreadPoolExecutor::stop() noexcept { _threads.clear(); } +int ThreadPoolExecutor::initialize(size_t worker_number, + size_t global_capacity) noexcept { + if (!_threads.empty()) { + return -1; + } + _worker_number = worker_number; + _global_capacity = global_capacity; + return start(); +} + int ThreadPoolExecutor::invoke( MoveOnlyFunction&& function) noexcept { - _queue.push(::std::move(function)); - return 0; + return enqueue_task({.type = TaskType::FUNCTION, + .handle {}, + .function {::std::move(function)}}); +} + +#if __cpp_lib_coroutine +int ThreadPoolExecutor::resume(CoroutineHandle&& handle) noexcept { + return enqueue_task( + {.type = TaskType::COROUTINE, .handle {handle}, .function {}}); +} +#else // !__cpp_lib_coroutine +int ThreadPoolExecutor::resume(CoroutineHandle&&) noexcept { + // Fail if babylon itself is not compiled with -fcoroutines + return -1; } +#endif // !__cpp_lib_coroutine void ThreadPoolExecutor::keep_execute() noexcept { + auto& local_queue = _local_task_queues.local(); while (true) { - MoveOnlyFunction function; - _queue.pop(function); - if (function) { - function(); - } else { - break; + Task task; + if (!local_queue.try_pop(task)) { + bool steal_success = false; + if (_enable_work_stealing) { + _local_task_queues.for_each([&](TaskQueue* iter, TaskQueue* end) { + if (steal_success) { + return; + } + while (iter != end) { + auto& queue = *iter++; + steal_success = queue.try_pop(task); + if (steal_success) { + return; + } + } + }); + } + if (!steal_success) { + _global_task_queue.pop(task); + } + } + switch (task.type) { +#if __cpp_lib_coroutine + case TaskType::COROUTINE: { + task.handle.resume(); + } break; +#endif // __cpp_lib_coroutine + case TaskType::FUNCTION: { + task.function(); + } break; + case TaskType::STOP: { + return; + } + case TaskType::WAKEUP: { + } break; + default: + assert(false); + } + } +} + +void ThreadPoolExecutor::keep_balance() noexcept { + while (_running.load(::std::memory_order_acquire)) { + ::std::this_thread::sleep_for(_balance_interval); + + _local_task_queues.for_each([&](TaskQueue* iter, TaskQueue* end) { + while (iter != end) { + auto& queue = *iter++; + bool success = true; + while (success) { + success = queue.try_pop([&](Task& task) { + enqueue_task(::std::move(task)); + }); + } + } + }); + } +} + +int ThreadPoolExecutor::enqueue_task(Task&& task) noexcept { + if (this == CurrentExecutor::get()) { + if (_local_capacity > 0) { + auto& local_queue = _local_task_queues.local(); + if (local_queue.size() < _local_capacity) { + local_queue.push(::std::move(task)); + return 0; + } } } + _global_task_queue.push(::std::move(task)); + return 0; } // ThreadPoolExecutor end //////////////////////////////////////////////////////////////////////////////// diff --git a/src/babylon/executor.h b/src/babylon/executor.h index 81ce2a5..a122347 100644 --- a/src/babylon/executor.h +++ b/src/babylon/executor.h @@ -1,118 +1,395 @@ #pragma once -#include "babylon/concurrent/bounded_queue.h" -#include "babylon/future.h" -#include "babylon/move_only_function.h" +#include "babylon/concurrent/bounded_queue.h" // ConcurrentBoundedQueue +#include "babylon/coroutine.h" // CoroutineTask +#include "babylon/future.h" // Future +#include "babylon/move_only_function.h" // MoveOnlyFunction -#include -#include +#if __cpp_lib_coroutine +#include // std::coroutine_handle +#endif + +#include // std::thread +#include // std::vector BABYLON_NAMESPACE_BEGIN -// 抽象执行器基类 +// Unified interface to run a task asynchronously. A task can be a closure of +// - normal function +// - member function +// - functor object, class or lambda +// - coroutine +// +// The closure packing is done in base Executor, and actual async mechanism is +// implemented by subclasses. class Executor { - public: - virtual ~Executor() noexcept = 0; + private: +#if __cpp_concepts && __cpp_lib_coroutine + // Coroutine only copy or move args... to internal state but ignore functor + // object itself, lambda with capture for example. Distinguish functor object + // out from others to properly handle them. + template + static constexpr bool IsPlainFunction = + ::std::is_function< + ::std::remove_pointer_t<::std::remove_reference_t>>::value || + ::std::is_member_function_pointer::value; +#endif // __cpp_concepts && __cpp_lib_coroutine - // 使用执行器执行一个任务 + public: + // The **effective** result type of C(Args...), generally + // std::invoke_result_t. + // + // Coroutine is a special case, for that, in specification C(Args...) need to + // return a task handle which more a future-like object than a meaningful + // value co_return-ed by the coroutine task. So instead of the handle, the + // **effective** result for a coroutine task is considered to be type of + // object as if `co_await C(Args...)` in a coroutine context. template - inline int submit(C&& callable, Args&&... args) noexcept; + struct Result; + template + using ResultType = typename Result::type; + +#if __cpp_concepts && __cpp_lib_coroutine + template + struct AwaitResult; + template + using AwaitResultType = typename AwaitResult::type; +#endif // __cpp_concepts && __cpp_lib_coroutine - // 使用执行器执行一个任务 - // 仿std::async接口,通过Future交互 + public: + // Thin wrapper of std::coroutine_handle<>. Implementor can resume it just + // like a std::coroutine_handle<>. The difference is CurrentExecutor will be + // set during execution. + class CoroutineHandle; + + Executor() noexcept = default; + Executor(Executor&&) noexcept = default; + Executor(const Executor&) noexcept = default; + Executor& operator=(Executor&&) noexcept = default; + Executor& operator=(const Executor&) noexcept = default; + virtual ~Executor() noexcept; + + ////////////////////////////////////////////////////////////////////////////// + // Execute a callable with executor, return a future object associate with + // that execution. + // + // When enable -fcoroutines and -fconcepts or use -std=c++20, execute a + // coroutine task is also supported. The returned future will associate with + // that coroutine, and can be used to wait and get the co_return value just + // like use co_await inside another coroutine. template - inline Future::type, F> execute( - C&& callable, Args&&... args) noexcept; +#if __cpp_concepts && __cpp_lib_coroutine + requires(::std::is_invocable::value && + !CoroutineInvocable) +#endif // __cpp_concepts && __cpp_lib_coroutine + inline Future, F> execute(C&& callable, + Args&&... args) noexcept; +#if __cpp_concepts && __cpp_lib_coroutine + template + requires CoroutineInvocable && Executor::IsPlainFunction + inline Future, F> execute(C&& callable, + Args&&... args) noexcept; + template + requires CoroutineInvocable && + (!Executor::IsPlainFunction) + inline Future, F> execute(C&& callable, + Args&&... args) noexcept; +#endif // __cpp_concepts && __cpp_lib_coroutine + ////////////////////////////////////////////////////////////////////////////// + +#if __cpp_concepts && __cpp_lib_coroutine + // Await a awaitable object, just like co_await it inside a coroutine context. + // Return a future object to wait and get that result. + template + inline Future, F> execute(A&& awaitable) noexcept; +#endif // __cpp_concepts && __cpp_lib_coroutine + + ////////////////////////////////////////////////////////////////////////////// + // Execute a callable with executor, return 0 if success scheduled. + // + // When enable -fcoroutines and -fconcepts or use -std=c++20, execute a + // coroutine task is also supported. + template +#if __cpp_concepts && __cpp_lib_coroutine + requires ::std::is_invocable::value && + (!CoroutineInvocable) +#endif // __cpp_concepts && __cpp_lib_coroutine + inline int submit(C&& callable, Args&&... args) noexcept; +#if __cpp_concepts && __cpp_lib_coroutine + template + requires CoroutineTaskInvocable && + Executor::IsPlainFunction + inline int submit(C&& callable, Args&&... args) noexcept; + template + requires CoroutineInvocable && + (!CoroutineTaskInvocable) && + Executor::IsPlainFunction + inline int submit(C&& callable, Args&&... args) noexcept; + template + requires CoroutineInvocable && + (!Executor::IsPlainFunction) + inline int submit(C&& callable, Args&&... args) noexcept; +#endif // __cpp_concepts && __cpp_lib_coroutine + ////////////////////////////////////////////////////////////////////////////// protected: - // 实际线程池实现者,只要在自己的【线程】中执行function即可 + // The actual async mechanism implementation. Every execute/submit interface + // above only do properly type-erased closure packing them self, and call this + // unified interface finally. + // + // A reasonable implementation will move function to destination thread and + // run it there. + // + // return ==0: Front-end transfer is success. The function will be called + // later always. + // !=0: Front-end transfer is fail. The function is not moved away, and + // never be called inside. virtual int invoke(MoveOnlyFunction&& function) noexcept; + // Execute/submit interface for coroutine will packing them to a + // coroutine_handle first, and then call this interface. Default + // implementation will forward to the invoke interface for convenience. A + // coroutine dedicated executor may override this to reduce forward + // overhead. + // + // return ==0: Front-end transfer is success. The coroutine will be resumed + // later always. + // !=0: Front-end transfer is fail. The coroutine state will keep + // as-is. + virtual int resume(CoroutineHandle&& handle) noexcept; + private: - // 成员函数使用std::mem_fn包装后才能和其他C等价处理 - template ::value, int>::type = 0> - inline static auto normalize(C&& callable) noexcept - -> decltype(::std::mem_fn(callable)); - - // 其他C不需要包装 - template ::value, int>::type = 0> - inline static C&& normalize(C&& callable) noexcept; - - // 执行function并对Promise进行set_value - template < - typename R, typename F, typename C, typename... Args, - typename ::std::enable_if::value, int>::type = 0> - inline static void run_and_set(Promise& promise, C&& callable, - Args&&... args) noexcept; - - // void特化 +#if __cpp_concepts && __cpp_lib_coroutine + template + inline int submit(CoroutineTask&& task) noexcept; + inline int resume(::std::coroutine_handle<> handle) noexcept; +#endif // __cpp_concepts && __cpp_lib_coroutine + + template + inline static void apply_and_set_value( + P& promise, C&& callable, ::std::tuple&& args_tuple) noexcept; template - inline static void run_and_set(Promise& promise, C&& callable, - Args&&... args) noexcept; + inline static void apply_and_set_value( + Promise& promise, C&& callable, + ::std::tuple&& args_tuple) noexcept; + +#if __cpp_concepts && __cpp_lib_coroutine + template + CoroutineTask<> await_and_set_value(P promise, A awaitable) noexcept; + template + CoroutineTask<> await_and_set_value(Promise promise, + A awaitable) noexcept; + + template + CoroutineTask<> await_apply_and_set_value(P promise, C callable, + T args_tuple) noexcept; + template + CoroutineTask<> await_apply_and_set_value(Promise promise, + C callable, T args_tuple) noexcept; +#endif // __cpp_concepts && __cpp_lib_coroutine + +#if __cpp_concepts && __cpp_lib_coroutine + friend BasicCoroutinePromise; +#endif // __cpp_concepts && __cpp_lib_coroutine +}; + +template +struct Executor::Result : public ::std::invoke_result {}; +#if __cpp_concepts && __cpp_lib_coroutine +template + requires CoroutineInvocable +struct Executor::Result { + using AwaitResultType = + CoroutineAwaitResultType::promise_type, + ::std::invoke_result_t>; + using type = typename ::std::conditional< + ::std::is_rvalue_reference::value, + typename ::std::remove_reference::type, + AwaitResultType>::type; }; -// 仿照std::async的函数版本,executor替代了内置的policy机制,可用户扩展 +template + requires requires { + typename CoroutineAwaitResultType::promise_type, + A>; + } +struct Executor::AwaitResult { + using AwaitReturnType = + CoroutineAwaitResultType::promise_type, A>; + using type = typename ::std::conditional< + ::std::is_rvalue_reference::value, + typename ::std::remove_reference::type, + AwaitReturnType>::type; +}; +#endif // __cpp_concepts && __cpp_lib_coroutine + +class Executor::CoroutineHandle { + public: + inline CoroutineHandle() noexcept = default; + inline CoroutineHandle(CoroutineHandle&&) noexcept = default; + inline CoroutineHandle(const CoroutineHandle&) noexcept = default; + inline CoroutineHandle& operator=(CoroutineHandle&&) noexcept = default; + inline CoroutineHandle& operator=(const CoroutineHandle&) noexcept = default; + inline ~CoroutineHandle() noexcept = default; + +#if __cpp_lib_coroutine + inline void resume() const noexcept; +#endif // __cpp_lib_coroutine + + private: +#if __cpp_lib_coroutine + inline CoroutineHandle(Executor* executor, + ::std::coroutine_handle<> handle) noexcept; +#endif // __cpp_lib_coroutine + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wpragmas" +#pragma GCC diagnostic ignored "-Wunknown-warning-option" +#pragma GCC diagnostic ignored "-Wunused-private-field" + Executor* _executor {nullptr}; +#if __cpp_lib_coroutine + ::std::coroutine_handle<> _handle; + static_assert(sizeof(_handle) == sizeof(void*), "Check ABI consistentcy"); +#else // !__cpp_lib_coroutine + void* _handle {nullptr}; +#endif // !__cpp_lib_coroutine +#pragma GCC diagnostic pop + + friend Executor; +}; + +// Just like std::async, but replace policy to a more flexible executor. template -inline Future::type, F> async( - Executor& executor, C&& callable, Args&&... args) noexcept { +inline auto async(Executor& executor, C&& callable, Args&&... args) noexcept { return executor.execute(::std::forward(callable), ::std::forward(args)...); } -// 在当前线程原地运行的Executor -struct InplaceExecutor : public Executor { +// Sync executor run task just inside invocation of execute/submit. +class InplaceExecutor : public Executor { public: - // 获取全局单例 static InplaceExecutor& instance() noexcept; - InplaceExecutor() = default; - explicit InplaceExecutor(bool flatten) noexcept; - protected: virtual int invoke(MoveOnlyFunction&& function) noexcept override; private: - bool _flatten {false}; - bool _in_execution {false}; - ::std::vector> _pending_functions; + InplaceExecutor() noexcept = default; + InplaceExecutor(InplaceExecutor&&) = delete; + InplaceExecutor(const InplaceExecutor&) = delete; + InplaceExecutor& operator=(InplaceExecutor&&) = delete; + InplaceExecutor& operator=(const InplaceExecutor&) = delete; + virtual ~InplaceExecutor() noexcept override = default; }; -// 永远新起线程的Executor,对齐std::async的默认行为 +// Async executor launch new thread for every task execute/submit. struct AlwaysUseNewThreadExecutor : public Executor { public: - // 获取全局单例 static AlwaysUseNewThreadExecutor& instance() noexcept; protected: virtual int invoke(MoveOnlyFunction&& function) noexcept override; + + private: + AlwaysUseNewThreadExecutor() noexcept = default; + AlwaysUseNewThreadExecutor(AlwaysUseNewThreadExecutor&&) = delete; + AlwaysUseNewThreadExecutor(const AlwaysUseNewThreadExecutor&) = delete; + AlwaysUseNewThreadExecutor& operator=(AlwaysUseNewThreadExecutor&&) = delete; + AlwaysUseNewThreadExecutor& operator=(const AlwaysUseNewThreadExecutor&) = + delete; + virtual ~AlwaysUseNewThreadExecutor() noexcept override; + + ::std::atomic _running {0}; }; -// 转交给一组线程池运行的Executor +// Async executor use a thread pool as backend class ThreadPoolExecutor : public Executor { public: - // 析构自动停止运行 + // Use **this** in worker thread, so no copy nor move + ThreadPoolExecutor() noexcept = default; + ThreadPoolExecutor(ThreadPoolExecutor&&) noexcept = delete; + ThreadPoolExecutor(const ThreadPoolExecutor&) noexcept = delete; + ThreadPoolExecutor& operator=(ThreadPoolExecutor&&) noexcept = delete; + ThreadPoolExecutor& operator=(const ThreadPoolExecutor&) noexcept = delete; virtual ~ThreadPoolExecutor() noexcept override; - // 初始化 - // worker_num: 线程数 - // queue_capacity: 中转队列容量 - int initialize(size_t worker_num, size_t queue_capacity) noexcept; + // Parameters + // worker_number: Threads for running task. + // + // local_capacity: Task execute/submit inside another task will add to local + // first, if not exceed this capacity. + // + // global_capacity: Task execute/submit to + // global will wait in queue first. After waiting task exceed capacity, new + // task execute/submit will block. + // + // enable_work_stealing: When enable, worker + // thread will check other worker's local waiting task before trying wait and + // get task from global queue. + // + // balance_interval: When set to positive, a + // background thread will be used to **steal** all worker's local waiting task + // periodically. + void set_worker_number(size_t worker_number) noexcept; + void set_local_capacity(size_t local_capacity) noexcept; + void set_global_capacity(size_t global_capacity) noexcept; + void set_enable_work_stealing(bool enable_work_stealing) noexcept; + template + void set_balance_interval( + ::std::chrono::duration balance_interval) noexcept; + + int start() noexcept; + + // Even when work stealing is enabled, pending local task will not be consumed + // if the idle worker is block waiting for global task. If schedule latency is + // important, user can wakeup some worker actively to make the steal happen + // eagerly. + // + // Just like backup-requesting mechanism, work stealing and proactive wakeup + // is a trade-off of cpu usage for latency. So it is up to user to decide when + // and how many idle workers need to wakeup. + void wakeup_one_worker() noexcept; - // 停止运行 void stop() noexcept; + int ABSL_DEPRECATED("Use start instead") + initialize(size_t worker_num, size_t queue_capacity) noexcept; + protected: virtual int invoke(MoveOnlyFunction&& function) noexcept override; + virtual int resume(CoroutineHandle&& handle) noexcept override; private: + enum class TaskType { + COROUTINE, + FUNCTION, + WAKEUP, + STOP, + }; + + struct Task { + TaskType type; + CoroutineHandle handle; + MoveOnlyFunction function; + }; + + using TaskQueue = ConcurrentBoundedQueue; + void keep_execute() noexcept; + void keep_balance() noexcept; + int enqueue_task(Task&& task) noexcept; + + size_t _worker_number {1}; + size_t _local_capacity {0}; + size_t _global_capacity {1}; + bool _enable_work_stealing {false}; + ::std::chrono::microseconds _balance_interval {-1}; - ConcurrentBoundedQueue> _queue; + ::std::atomic _running {false}; + ::babylon::EnumerableThreadLocal _local_task_queues; + TaskQueue _global_task_queue; ::std::vector<::std::thread> _threads; + ::std::thread _balance_thread; }; BABYLON_NAMESPACE_END diff --git a/src/babylon/executor.hpp b/src/babylon/executor.hpp index 8e7d0ba..b9c34f1 100644 --- a/src/babylon/executor.hpp +++ b/src/babylon/executor.hpp @@ -1,29 +1,57 @@ #pragma once +#include "babylon/current_executor.h" // CurrentExecutor #include "babylon/executor.h" -#include +// clang-foramt off +#include "babylon/protect.h" +// clang-foramt on BABYLON_NAMESPACE_BEGIN //////////////////////////////////////////////////////////////////////////////// -// Executor::begin +// Executor::CoroutineHandle begin +#if __cpp_lib_coroutine +inline void Executor::CoroutineHandle::resume() const noexcept { + CurrentExecutor::set(_executor); + _handle.resume(); + CurrentExecutor::set(nullptr); +} + +inline Executor::CoroutineHandle::CoroutineHandle( + Executor* executor, ::std::coroutine_handle<> handle) noexcept + : _executor {executor}, _handle {handle} {} +#endif // __cpp_lib_coroutine +// Executor::CoroutineHandle end +//////////////////////////////////////////////////////////////////////////////// + +//////////////////////////////////////////////////////////////////////////////// +// Executor begin template -inline Future::type, F> Executor::execute( +#if __cpp_concepts && __cpp_lib_coroutine + requires(::std::is_invocable::value && + !CoroutineInvocable) +#endif // __cpp_concepts && __cpp_lib_coroutine +inline Future, F> Executor::execute( C&& callable, Args&&... args) noexcept { - using DC = typename ::std::decay::type; - using R = typename InvokeResult::type; - Promise promise; - auto future = promise.get_future(); - MoveOnlyFunction function {::std::bind( - [](Promise& captured_promise, DC& captured_callable, - typename ::std::decay::type&... captured_args) { - run_and_set(captured_promise, normalize(::std::move(captured_callable)), - ::std::move(captured_args)...); - }, - ::std::move(promise), - uncomposable_bind_argument(::std::forward(callable)), - uncomposable_bind_argument(::std::forward(args))...)}; + using R = ResultType; + struct S { + Executor* executor; + Promise promise; + typename ::std::decay::type callable; + ::std::tuple::type...> args_tuple; + void operator()() noexcept { + CurrentExecutor::set(executor); + apply_and_set_value(promise, ::std::move(callable), + ::std::move(args_tuple)); + CurrentExecutor::set(nullptr); + } + } s {.executor {this}, + .promise {}, + .callable {::std::forward(callable)}, + .args_tuple {::std::forward(args)...}}; + auto future = s.promise.get_future(); + MoveOnlyFunction function {::std::move(s)}; auto ret = invoke(::std::move(function)); if (ABSL_PREDICT_FALSE(ret != 0)) { future = Future(); @@ -31,49 +59,184 @@ inline Future::type, F> Executor::execute( return future; } +#if __cpp_concepts && __cpp_lib_coroutine +template + requires CoroutineInvocable && Executor::IsPlainFunction +inline Future, F> Executor::execute( + C&& callable, Args&&... args) noexcept { +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wpragmas" +#pragma GCC diagnostic ignored "-Wunknown-warning-option" +#if ABSL_HAVE_ADDRESS_SANITIZER +#pragma GCC diagnostic ignored "-Warray-bounds" +#endif // ABSL_HAVE_ADDRESS_SANITIZER + auto task = + ::std::invoke(::std::forward(callable), ::std::forward(args)...); +#pragma GCC diagnostic pop + return execute(::std::move(task)); +} + +template + requires CoroutineInvocable && (!Executor::IsPlainFunction) +inline Future, F> Executor::execute( + C&& callable, Args&&... args) noexcept { + using R = ResultType; + using InnerArgsTuple = typename CallableArgs::type; + Promise promise; + auto future = promise.get_future(); + submit(await_apply_and_set_value( + ::std::move(promise), ::std::forward(callable), + ::std::forward_as_tuple(::std::forward(args)...))); + return future; +} + +template +inline Future, F> Executor::execute( + A&& awaitable) noexcept { + using R = AwaitResultType; + Promise promise; + auto future = promise.get_future(); + submit( + await_and_set_value(::std::move(promise), ::std::forward(awaitable))); + return future; +} +#endif // __cpp_concepts && __cpp_lib_coroutine + template +#if __cpp_concepts && __cpp_lib_coroutine + requires ::std::is_invocable::value && + (!CoroutineInvocable) +#endif // __cpp_concepts && __cpp_lib_coroutine inline int Executor::submit(C&& callable, Args&&... args) noexcept { - typedef typename ::std::decay::type DC; - MoveOnlyFunction function {::std::bind( - [](DC& captured_callable, - typename ::std::decay::type&... captured_args) { - normalize(::std::move(captured_callable))( - ::std::move(captured_args)...); - }, - uncomposable_bind_argument(::std::forward(callable)), - uncomposable_bind_argument(::std::forward(args))...)}; + struct S { + Executor* executor; + typename ::std::decay::type callable; + ::std::tuple::type...> args_tuple; + void operator()() { + CurrentExecutor::set(executor); + ::std::apply(::std::move(callable), ::std::move(args_tuple)); + CurrentExecutor::set(nullptr); + } + }; + S s {.executor {this}, + .callable {::std::forward(callable)}, + .args_tuple {::std::forward(args)...}}; + MoveOnlyFunction function {::std::move(s)}; return invoke(::std::move(function)); } -template ::value, int>::type> -inline auto Executor::normalize(C&& callable) noexcept - -> decltype(::std::mem_fn(callable)) { - return ::std::mem_fn(callable); +#if __cpp_concepts && __cpp_lib_coroutine +template + requires CoroutineTaskInvocable && + Executor::IsPlainFunction +inline int Executor::submit(C&& callable, Args&&... args) noexcept { + auto task = + ::std::invoke(::std::forward(callable), ::std::forward(args)...); + return submit(::std::move(task)); +} + +template + requires CoroutineInvocable && + (!CoroutineTaskInvocable) && + Executor::IsPlainFunction +inline int Executor::submit(C&& callable, Args&&... args) noexcept { + using TaskType = ::std::invoke_result_t; + struct S { + static CoroutineTask<> wrapper(TaskType task) { + co_await task; + } + }; + auto task = + ::std::invoke(::std::forward(callable), ::std::forward(args)...); + return submit(S::wrapper(::std::move(task))); +} + +template + requires CoroutineInvocable && (!Executor::IsPlainFunction) +inline int Executor::submit(C&& callable, Args&&... args) noexcept { + using InnerArgsTuple = typename CallableArgs::type; + struct S { + static CoroutineTask<> wrapper(C callable, InnerArgsTuple args_tuple) { + co_await ::std::apply(callable, ::std::move(args_tuple)); + } + }; + auto wrapper_task = + S::wrapper(::std::forward(callable), + ::std::forward_as_tuple(::std::forward(args)...)); + return submit(::std::move(wrapper_task)); +} + +template +inline int Executor::submit(CoroutineTask&& task) noexcept { + task.set_executor(*this); + return resume(task.release()); +} + +inline int Executor::resume(::std::coroutine_handle<> handle) noexcept { + auto ret = resume({this, handle}); + if (ABSL_PREDICT_FALSE(ret != 0)) { + handle.destroy(); + } + return ret; } +#endif // __cpp_concepts && __cpp_lib_coroutine -template ::value, int>::type> -inline C&& Executor::normalize(C&& callable) noexcept { - return ::std::forward(callable); +template +inline void Executor::apply_and_set_value( + P& promise, C&& callable, ::std::tuple&& args_tuple) noexcept { + promise.set_value( + ::std::apply(::std::forward(callable), ::std::move(args_tuple))); } template -inline void Executor::run_and_set(Promise& promise, C&& callable, - Args&&... args) noexcept { - callable(::std::forward(args)...); +inline void Executor::apply_and_set_value( + Promise& promise, C&& callable, + ::std::tuple&& args_tuple) noexcept { + ::std::apply(::std::forward(callable), ::std::move(args_tuple)); promise.set_value(); } -template ::value, int>::type> -inline void Executor::run_and_set(Promise& promise, C&& callable, - Args&&... args) noexcept { - promise.set_value(callable(::std::forward(args)...)); +#if __cpp_concepts && __cpp_lib_coroutine +template +CoroutineTask<> Executor::await_and_set_value(P promise, A awaitable) noexcept { + promise.set_value(co_await ::std::move(awaitable)); } -// Executor::end + +template +CoroutineTask<> Executor::await_and_set_value(Promise promise, + A awaitable) noexcept { + co_await ::std::move(awaitable); + promise.set_value(); +} + +template +CoroutineTask<> Executor::await_apply_and_set_value(P promise, C callable, + T args_tuple) noexcept { + promise.set_value(co_await ::std::apply(::std::forward(callable), + ::std::move(args_tuple))); +} + +template +CoroutineTask<> Executor::await_apply_and_set_value(Promise promise, + C callable, + T args_tuple) noexcept { + co_await ::std::apply(::std::forward(callable), ::std::move(args_tuple)); + promise.set_value(); +} +#endif // __cpp_concepts && __cpp_lib_coroutine +// Executor end +//////////////////////////////////////////////////////////////////////////////// + +//////////////////////////////////////////////////////////////////////////////// +// ThreadPoolExecutor begin +template +void ThreadPoolExecutor::set_balance_interval( + ::std::chrono::duration balance_interval) noexcept { + _balance_interval = balance_interval; +} +// ThreadPoolExecutor end //////////////////////////////////////////////////////////////////////////////// BABYLON_NAMESPACE_END + +#include "babylon/unprotect.h" diff --git a/src/babylon/future.h b/src/babylon/future.h index f2ee9df..564d84d 100644 --- a/src/babylon/future.h +++ b/src/babylon/future.h @@ -25,7 +25,7 @@ class Promise; template class Future { public: - typedef typename internal::future::NonVoid::type ResultType; + using ResultType = typename internal::future::NonVoid::type; template using IsCompatibleCallback = typename internal::future::IsCompatibleCallback; diff --git a/src/babylon/future.hpp b/src/babylon/future.hpp index 8ceaa55..6bb15e4 100644 --- a/src/babylon/future.hpp +++ b/src/babylon/future.hpp @@ -51,7 +51,7 @@ template struct ResultOfCallback { using U = typename NonVoid::type; using F = decltype(&run_callback); - using type = typename InvokeResult::type; + using type = ::std::invoke_result_t; }; // 可以支持使用run_callback(const C&, T&&)来执行的回调函数 @@ -68,8 +68,8 @@ struct IsCompatibleCallback { // 执行回调并设置到另一个Promise中,适配void类型 template ), C&, U&>::type>::value, + !::std::is_void<::std::invoke_result_t< + decltype(&run_callback), C&, U&>>::value, int>::type = 0> inline void run_callback(Promise& promise, C& callback, U& value) noexcept { @@ -77,8 +77,8 @@ inline void run_callback(Promise& promise, C& callback, } template ), C&, U&>::type>::value, + ::std::is_void<::std::invoke_result_t< + decltype(&run_callback), C&, U&>>::value, int>::type = 0> inline void run_callback(Promise& promise, C& callback, U& value) noexcept { @@ -93,12 +93,16 @@ inline void run_callback(Promise& promise, C& callback, template class FutureContext { public: - // void类型比较特殊,无法完成许多处理,定义为零大小类型方便统一书写 - typedef typename internal::future::NonVoid::type ValueType; + using ResultType = typename Future::ResultType; + using RemoveReferenceType = + typename ::std::remove_reference::type; + using ValueType = + typename ::std::conditional<::std::is_lvalue_reference::value, + ::std::reference_wrapper, + RemoveReferenceType>::type; + // Promise的构造和赋值是分离的,为了支持无法默认构造的类型 // 使用未初始化块替代类型本身作为成员,后续显式调用构造和析构 - typedef typename ::std::aligned_storage::type StorageType; // 回调链节点 struct CallbackNode { template @@ -149,13 +153,14 @@ class FutureContext { inline CallbackNode* seal() noexcept; inline ValueType& value() noexcept; + inline ValueType* pointer() noexcept; void wait_slow() noexcept; bool wait_for_slow(int64_t timeout_ns) noexcept; Futex _futex {0}; ::std::atomic _head {nullptr}; - StorageType _storage; + alignas(ValueType) uint8_t _storage[sizeof(ValueType)]; }; /////////////////////////////////////////////////////////////////////////////// @@ -192,7 +197,7 @@ template template inline void FutureContext::set_value(Args&&... args) { // 构造并设置value - new (&value()) ValueType(::std::forward(args)...); + new (pointer()) ValueType(::std::forward(args)...); // 原子发布数据: // 1、获取当前注册的回调链 // 2、标记后续不再接受回调注册 @@ -237,22 +242,22 @@ inline bool FutureContext::wait_for( template template inline void FutureContext::on_finish(C&& callback) noexcept { - auto head = _head.load(::std::memory_order_relaxed); + auto head = _head.load(::std::memory_order_acquire); if (is_sealed(head)) { internal::future::run_callback(callback, value()); return; } #if __cplusplus >= 201402L - auto* node = new CallbackNode( - [this, callback = ::std::forward(callback)]() mutable { - internal::future::run_callback(callback, value()); + auto node = new CallbackNode( + [this, captured_callback = ::std::forward(callback)]() mutable { + internal::future::run_callback(captured_callback, value()); }); #else // __cplusplus < 201402L - auto* node = new CallbackNode( + auto node = new CallbackNode( ::std::bind(internal::future::run_callback, uncomposable_bind_argument(::std::forward(callback)), - ::std::ref(value()))); + ::std::ref(*pointer()))); #endif // __cplusplus while (true) { @@ -280,7 +285,7 @@ inline void FutureContext::clear() noexcept { _futex.value().store(0, ::std::memory_order_relaxed); auto* head = _head.exchange(nullptr, ::std::memory_order_relaxed); if (is_sealed(head)) { - value().~ValueType(); + pointer()->~ValueType(); } else { while (head != nullptr) { auto* next_head = head->next; @@ -306,7 +311,15 @@ FutureContext::seal() noexcept { template inline typename FutureContext::ValueType& FutureContext::value() noexcept { - return reinterpret_cast(_storage); + assert(ready(::std::memory_order_acquire) && + "cannot read value before ready"); + return *pointer(); +} + +template +inline typename FutureContext::ValueType* +FutureContext::pointer() noexcept { + return reinterpret_cast(_storage); } template @@ -394,13 +407,8 @@ inline bool Future::wait_for( template template inline void Future::on_finish(C&& callback) noexcept { - if (ABSL_PREDICT_TRUE(valid())) { - _context->on_finish(::std::forward(callback)); - _context.reset(); - return; - } - - assert(false && "try watch invalid future"); + assert(valid() && "try watch invalid future"); + _context->on_finish(::std::forward(callback)); } template @@ -410,11 +418,11 @@ inline typename Future::template ThenFuture Future::then( ThenPromise promise; auto future = promise.get_future(); #if __cplusplus >= 201402L - on_finish( - [promise = ::std::move(promise), - callback = ::std::forward(callback)](ResultType& value) mutable { - internal::future::run_callback(promise, callback, value); - }); + on_finish([captured_promise = ::std::move(promise), + captured_callback = + ::std::forward(callback)](ResultType& value) mutable { + internal::future::run_callback(captured_promise, captured_callback, value); + }); #else // __cplusplus < 201402L on_finish( ::std::bind(internal::future::run_callback, M, C, ResultType>, diff --git a/src/babylon/logging/log_severity.h b/src/babylon/logging/log_severity.h index c9f5be4..dfcc142 100644 --- a/src/babylon/logging/log_severity.h +++ b/src/babylon/logging/log_severity.h @@ -6,6 +6,8 @@ #include "babylon/protect.h" // clang-format on +#include // uint8_t + BABYLON_NAMESPACE_BEGIN class LogSeverity { @@ -37,12 +39,17 @@ class LogSeverity { private: uint8_t _value {DEBUG}; +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wpragmas" +#pragma GCC diagnostic ignored "-Wunknown-warning-option" +#pragma GCC diagnostic ignored "-Wunsafe-buffer-usage" static constexpr StringView names[NUM] = { [DEBUG] = "DEBUG", [INFO] = "INFO", [WARNING] = "WARNING", [FATAL] = "FATAL", }; +#pragma GCC diagnostic pop }; inline constexpr LogSeverity::LogSeverity(uint8_t value) noexcept diff --git a/src/babylon/logging/rolling_file_object.cpp b/src/babylon/logging/rolling_file_object.cpp index 93c8116..cbffa07 100644 --- a/src/babylon/logging/rolling_file_object.cpp +++ b/src/babylon/logging/rolling_file_object.cpp @@ -45,7 +45,7 @@ void RollingFileObject::delete_expire_files() noexcept { ::std::vector<::std::string> to_be_deleted_files; { - ::std::lock_guard<::std::mutex> {_tracking_files_mutex}; + ::std::lock_guard<::std::mutex> lock {_tracking_files_mutex}; while (_tracking_files.size() > _max_file_number) { to_be_deleted_files.emplace_back(::std::move(_tracking_files.front())); _tracking_files.pop_front(); @@ -165,7 +165,7 @@ int RollingFileObject::open() noexcept { } if (_max_file_number != SIZE_MAX) { - ::std::lock_guard<::std::mutex> {_tracking_files_mutex}; + ::std::lock_guard<::std::mutex> lock {_tracking_files_mutex}; _tracking_files.push_back(full_file_name); } auto old_fd = _fd; diff --git a/src/babylon/mlock.cpp b/src/babylon/mlock.cpp index 8620f02..248da0f 100644 --- a/src/babylon/mlock.cpp +++ b/src/babylon/mlock.cpp @@ -32,8 +32,8 @@ int MemoryLocker::start() noexcept { return -1; } _stoped = ::std::promise(); - auto stoped = _stoped.get_future(); - _thread = ::std::thread([&, stoped = ::std::move(stoped)] { + _thread = ::std::thread([&] { + auto stoped = _stoped.get_future(); do { check_and_lock(); _round++; diff --git a/src/babylon/move_only_function.h b/src/babylon/move_only_function.h index 5947451..a24dffe 100644 --- a/src/babylon/move_only_function.h +++ b/src/babylon/move_only_function.h @@ -50,27 +50,6 @@ class MoveOnlyFunction { ::std::function _function; }; -// ::std::bind在遇到参数也是bind_expression时,会进行特化处理,做多级组合 -// 对需要把函数作为参数传递的场景,会导致bind_expression和普通函数表现不一致 -// 这里专门制作一个包装以便统一处理,例如 -// ::std::bind(function, normal_args, args_maybe_bind_result); -// 改为 -// ::std::bind(function, normal_args, -// uncomposable_bind_argument(arg_maybe_bind_result)); -template -struct UncomposableBindArgument { - inline UncomposableBindArgument(T&& value); - inline UncomposableBindArgument(const T& value); - inline operator T&() noexcept; - inline operator const T&() const noexcept; - - T value; -}; - -template -inline UncomposableBindArgument::type> -uncomposable_bind_argument(T&& value); - BABYLON_NAMESPACE_END #include "babylon/move_only_function.hpp" diff --git a/src/babylon/move_only_function.hpp b/src/babylon/move_only_function.hpp index 66093e0..6a9ecbf 100644 --- a/src/babylon/move_only_function.hpp +++ b/src/babylon/move_only_function.hpp @@ -77,34 +77,4 @@ inline R MoveOnlyFunction::operator()(Args... args) const { return _function(::std::forward(args)...); } -//////////////////////////////////////////////////////////////////////////////// -// UncomposableBindArgument begin -template -inline UncomposableBindArgument::UncomposableBindArgument(T&& input_value) - : value(::std::move(input_value)) {} - -template -inline UncomposableBindArgument::UncomposableBindArgument( - const T& input_value) - : value(input_value) {} - -template -inline UncomposableBindArgument::operator T&() noexcept { - return value; -} - -template -inline UncomposableBindArgument::operator const T&() const noexcept { - return value; -} - -template -inline UncomposableBindArgument::type> -uncomposable_bind_argument(T&& value) { - return UncomposableBindArgument::type>( - ::std::forward(value)); -} -// UncomposableBindArgument end -//////////////////////////////////////////////////////////////////////////////// - BABYLON_NAMESPACE_END diff --git a/src/babylon/reusable/manager.h b/src/babylon/reusable/manager.h index 9e1bc1e..7fa70f3 100644 --- a/src/babylon/reusable/manager.h +++ b/src/babylon/reusable/manager.h @@ -69,8 +69,8 @@ class ReusableManager { // 但是子类创建完成后,就可以将反射信息记录到ReusableTraits机制的AllocationMetadata中 // 后续的重建就可以依照协议完成了 template ::type>::value>::type> + typename = typename ::std::enable_if< + ::std::is_same>::value>::type> ReusableAccessor create_object(C&& creator) noexcept; // 对所有create_object创建的实例统计执行逻辑清空 diff --git a/src/babylon/reusable/memory_resource.cpp b/src/babylon/reusable/memory_resource.cpp index 0ba42d6..6cf14df 100644 --- a/src/babylon/reusable/memory_resource.cpp +++ b/src/babylon/reusable/memory_resource.cpp @@ -248,8 +248,8 @@ void ExclusiveMonotonicBufferResource::release() noexcept { PAGE_ARRAY_CAPACITY - _last_page_pointer); _last_page_array = _last_page_array->next; - //将pages的地址提前拷一份存起来, - //避免当page_rray持有自己时,page_rray所在的page被提前释放后_last_page_pointer失效 + // 将pages的地址提前拷一份存起来, + // 避免当page_rray持有自己时,page_rray所在的page被提前释放后_last_page_pointer失效 for (size_t i = 0; i < size; ++i) { tmp_pages[i] = SanitizerHelper::unpoison(_last_page_pointer[i], _page_allocator->page_size()); diff --git a/src/babylon/reusable/message.cpp b/src/babylon/reusable/message.cpp index d7cd374..6f00146 100644 --- a/src/babylon/reusable/message.cpp +++ b/src/babylon/reusable/message.cpp @@ -165,6 +165,9 @@ void MessageAllocationMetadata::FieldAllocationMetadata::update_repeated_field( __BABYLON_TMP_CASE(CPPTYPE_STRING, ::std::string) __BABYLON_TMP_CASE(CPPTYPE_MESSAGE, ::google::protobuf::Message) #undef __BABYLON_TMP_CASE + default: + assert(false); + break; } } @@ -223,6 +226,9 @@ void MessageAllocationMetadata::FieldAllocationMetadata::reserve_repeated_field( repeated_field->Clear(); break; } + default: + assert(false); + break; } } #pragma GCC diagnostic pop diff --git a/src/babylon/time.cpp b/src/babylon/time.cpp index 0b14cbf..941245c 100644 --- a/src/babylon/time.cpp +++ b/src/babylon/time.cpp @@ -98,8 +98,9 @@ int FastLocalTimeConverter::to_weekday(::absl::CivilSecond civil) noexcept { return 5; case ::absl::Weekday::saturday: return 6; + default: + ::abort(); } - return -1; } ABSL_ATTRIBUTE_NOINLINE diff --git a/src/babylon/type_traits.h b/src/babylon/type_traits.h index 1d6a1a7..1f027f1 100644 --- a/src/babylon/type_traits.h +++ b/src/babylon/type_traits.h @@ -1,22 +1,53 @@ #pragma once #include "babylon/absl_base_internal_invoke.h" // ::absl::base_internal::is_invocable_r -#include "babylon/environment.h" -#include "babylon/string_view.h" // StringView +#include "babylon/string_view.h" // StringView // clang-format off #include "babylon/protect.h" // clang-format on +#include "absl/utility/utility.h" // absl::apply + #include // std::invoke_result -#if __cpp_lib_is_invocable < 201703L namespace std { +#if !__cpp_lib_is_invocable template using is_invocable = ::absl::base_internal::is_invocable_r; +using ::absl::base_internal::invoke_result_t; using ::absl::base_internal::is_invocable_r; -}; // namespace std -#endif // __cpp_lib_is_invocable < 201703L +#endif // !__cpp_lib_is_invocable + +#if !__cpp_lib_apply +using ::absl::apply; +#endif // !__cpp_lib_apply + +#if !__cpp_lib_invoke +using ::absl::base_internal::invoke; +#endif // !__cpp_lib_invoke +} // namespace std + +#if !__cpp_lib_is_invocable +BABYLON_NAMESPACE_BEGIN +namespace internal { +template +struct InvokeResult {}; +template +struct InvokeResult< + typename ::std::enable_if<::std::is_invocable::value>::type, F, + Args...> { + using type = ::std::invoke_result_t; +}; +} // namespace internal +BABYLON_NAMESPACE_END + +namespace std { +template +struct invoke_result + : public ::babylon::internal::InvokeResult {}; +} // namespace std +#endif // !__cpp_lib_is_invocable BABYLON_NAMESPACE_BEGIN @@ -56,7 +87,7 @@ struct TypeId { inline static const StringView get_type_name() noexcept; static const Id ID; #else // __clang__ || GLIBCXX_VERSION >= 920200312 - inline static constexpr StringView get_type_name() noexcept; + inline static constexpr babylon::StringView get_type_name() noexcept; static constexpr Id ID {TypeId::get_type_name()}; #endif // __clang__ || GLIBCXX_VERSION >= 920200312 }; @@ -233,13 +264,108 @@ class ParameterPack { // ParameterPack end //////////////////////////////////////////////////////////////////////////////// -#if __cpp_lib_is_invocable >= 201703L -template -struct InvokeResult : public ::std::invoke_result {}; -#else // !__cpp_lib_is_invocable -template -struct InvokeResult : public ::std::result_of {}; -#endif // !__cpp_lib_is_invocable +template typename> +struct IsSpecialization : public ::std::false_type {}; +template