Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

200 lines of code to rewrite rocksdb into a coroutine program (demo version, only for review) #6

Open
wants to merge 3 commits into
base: 6.1.2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,17 @@ endif()

list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_LIST_DIR}/cmake/modules/")

include(FetchContent)
set(FETCHCONTENT_QUIET false)
FetchContent_Declare(
photon
GIT_REPOSITORY https://github.com/alibaba/PhotonLibOS.git
GIT_TAG v0.6.3
)
set(ENABLE_URING ON CACHE INTERNAL "Enable iouring")
FetchContent_MakeAvailable(photon)
set(PHOTON_INCLUDE_DIR ${photon_SOURCE_DIR}/include/)

option(WITH_JEMALLOC "build with JeMalloc" OFF)
option(WITH_SNAPPY "build with SNAPPY" OFF)
option(WITH_LZ4 "build with lz4" OFF)
Expand Down Expand Up @@ -178,7 +189,7 @@ else()
if(MINGW)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-format")
endif()
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++14")
if(NOT CMAKE_BUILD_TYPE STREQUAL "Debug")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fno-omit-frame-pointer")
include(CheckCXXCompilerFlag)
Expand Down Expand Up @@ -459,6 +470,7 @@ endif()
include_directories(${PROJECT_SOURCE_DIR})
include_directories(${PROJECT_SOURCE_DIR}/include)
include_directories(SYSTEM ${PROJECT_SOURCE_DIR}/third-party/gtest-1.7.0/fused-src)
include_directories(${PHOTON_INCLUDE_DIR})
find_package(Threads REQUIRED)

add_subdirectory(third-party/gtest-1.7.0/fused-src/gtest)
Expand Down Expand Up @@ -742,18 +754,18 @@ else()

add_library(${ROCKSDB_SHARED_LIB} SHARED ${SOURCES})
target_link_libraries(${ROCKSDB_SHARED_LIB}
${THIRDPARTY_LIBS} ${SYSTEM_LIBS})
${THIRDPARTY_LIBS} ${SYSTEM_LIBS} -Wl,--whole-archive $<BUILD_INTERFACE:photon_static> -Wl,--no-whole-archive)
set_target_properties(${ROCKSDB_SHARED_LIB} PROPERTIES
LINKER_LANGUAGE CXX
VERSION ${ROCKSDB_VERSION}
SOVERSION ${ROCKSDB_VERSION_MAJOR}
CXX_STANDARD 11
CXX_STANDARD 14
OUTPUT_NAME "rocksdb")
endif()

add_library(${ROCKSDB_STATIC_LIB} STATIC ${SOURCES})
target_link_libraries(${ROCKSDB_STATIC_LIB}
${THIRDPARTY_LIBS} ${SYSTEM_LIBS})
${THIRDPARTY_LIBS} ${SYSTEM_LIBS} $<BUILD_INTERFACE:photon_static>)

if(WIN32)
add_library(${ROCKSDB_IMPORT_LIB} SHARED ${SOURCES})
Expand Down
55 changes: 31 additions & 24 deletions env/env_posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
#if defined(OS_LINUX)
#include <linux/fs.h>
#endif
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
Expand Down Expand Up @@ -120,8 +119,9 @@ class PosixEnv : public Env {
PosixEnv();

~PosixEnv() override {
for (const auto tid : threads_to_join_) {
pthread_join(tid, nullptr);
LOG_INFO("global PosixEnv destruct: Join thread pools");
for (auto& tid : threads_to_join_) {
tid.join();
}
for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) {
thread_pools_[pool_id].JoinAllThreads();
Expand Down Expand Up @@ -760,18 +760,11 @@ class PosixEnv : public Env {
return thread_status_updater_->GetThreadList(thread_list);
}

static uint64_t gettid(pthread_t tid) {
uint64_t thread_id = 0;
memcpy(&thread_id, &tid, std::min(sizeof(thread_id), sizeof(tid)));
return thread_id;
}

static uint64_t gettid() {
pthread_t tid = pthread_self();
return gettid(tid);
return (uint64_t) photon::CURRENT;
}

uint64_t GetThreadID() const override { return gettid(pthread_self()); }
uint64_t GetThreadID() const override { return gettid(); }

Status GetFreeSpace(const std::string& fname, uint64_t* free_space) override {
struct statvfs sbuf;
Expand Down Expand Up @@ -847,7 +840,7 @@ class PosixEnv : public Env {
return 0;
}

void SleepForMicroseconds(int micros) override { usleep(micros); }
void SleepForMicroseconds(int micros) override { std::this_thread::sleep_for(std::chrono::microseconds(micros)); }

Status GetHostName(char* name, uint64_t len) override {
int ret = gethostname(name, static_cast<size_t>(len));
Expand Down Expand Up @@ -1008,8 +1001,8 @@ class PosixEnv : public Env {
size_t page_size_;

std::vector<ThreadPoolImpl> thread_pools_;
pthread_mutex_t mu_;
std::vector<pthread_t> threads_to_join_;
std::mutex mu_;
std::vector<std::thread> threads_to_join_;
// If true, allow non owner read access for db files. Otherwise, non-owner
// has no access to db files.
bool allow_non_owner_access_;
Expand All @@ -1021,7 +1014,6 @@ PosixEnv::PosixEnv()
page_size_(getpagesize()),
thread_pools_(Priority::TOTAL),
allow_non_owner_access_(true) {
ThreadPoolImpl::PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr));
for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) {
thread_pools_[pool_id].SetThreadPriority(
static_cast<Env::Priority>(pool_id));
Expand Down Expand Up @@ -1059,20 +1051,16 @@ static void* StartThreadWrapper(void* arg) {
}

void PosixEnv::StartThread(void (*function)(void* arg), void* arg) {
pthread_t t;
StartThreadState* state = new StartThreadState;
state->user_function = function;
state->arg = arg;
ThreadPoolImpl::PthreadCall(
"start thread", pthread_create(&t, nullptr, &StartThreadWrapper, state));
ThreadPoolImpl::PthreadCall("lock", pthread_mutex_lock(&mu_));
threads_to_join_.push_back(t);
ThreadPoolImpl::PthreadCall("unlock", pthread_mutex_unlock(&mu_));
std::lock_guard<std::mutex> lock(mu_);
threads_to_join_.emplace_back(std::thread(&StartThreadWrapper, state));
}

void PosixEnv::WaitForJoin() {
for (const auto tid : threads_to_join_) {
pthread_join(tid, nullptr);
for (auto& tid : threads_to_join_) {
tid.join();
}
threads_to_join_.clear();
}
Expand Down Expand Up @@ -1104,6 +1092,24 @@ std::string Env::GenerateUniqueId() {
return uuid2;
}

// Brings up the Photon runtime for the process: first photon::init(), then
// the photon::std work pool. Both calls are passed INIT_EVENT_IOURING and
// INIT_IO_NONE. A non-zero return from either call is reported via LOG_FATAL.
// NOTE(review): whether LOG_FATAL terminates the process is not visible here;
// if it only logs, the work pool init is still attempted after a failed
// photon::init() -- confirm against the Photon logging macros.
PhotonEnv::PhotonEnv() {
  // Max 8 vcpu. Hardcoded for now.
  constexpr int kWorkPoolVcpus = 8;

  if (photon::init(photon::INIT_EVENT_IOURING, photon::INIT_IO_NONE) != 0) {
    LOG_FATAL("photon init failed");
  }

  const int pool_status = photon::std::work_pool_init(
      kWorkPoolVcpus, photon::INIT_EVENT_IOURING, photon::INIT_IO_NONE);
  if (pool_status != 0) {
    LOG_FATAL("work pool init failed");
  }
}

// Shuts the Photon runtime down in the opposite order to the constructor:
// the work pool is finalized before photon itself, and a final info line is
// logged once both calls have returned.
PhotonEnv::~PhotonEnv() {
  // Stop the work pool first; it was the last thing started.
  photon::std::work_pool_fini();

  // Then release the photon environment set up by photon::init().
  photon::fini();

  LOG_INFO("PhotonEnv finished");
}

//
// Default Posix Env
//
Expand All @@ -1118,6 +1124,7 @@ Env* Env::Default() {
// of their construction, having this call here guarantees that
// the destructor of static PosixEnv will go first, then the
// singletons of ThreadLocalPtr.
PhotonEnv::Singleton();
ThreadLocalPtr::InitSingletons();
CompressionContextCache::InitSingleton();
INIT_SYNC_POINT_SINGLETONS();
Expand Down
36 changes: 10 additions & 26 deletions env/io_posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -201,11 +201,8 @@ Status PosixSequentialFile::PositionedRead(uint64_t offset, size_t n,
size_t left = n;
char* ptr = scratch;
while (left > 0) {
r = pread(fd_, ptr, left, static_cast<off_t>(offset));
r = photon::iouring_pread(fd_, ptr, left, offset, 0, -1);
if (r <= 0) {
if (r == -1 && errno == EINTR) {
continue;
}
break;
}
ptr += r;
Expand Down Expand Up @@ -335,11 +332,8 @@ Status PosixRandomAccessFile::Read(uint64_t offset, size_t n, Slice* result,
size_t left = n;
char* ptr = scratch;
while (left > 0) {
r = pread(fd_, ptr, left, static_cast<off_t>(offset));
r = photon::iouring_pread(fd_, ptr, left, offset, 0, -1);
if (r <= 0) {
if (r == -1 && errno == EINTR) {
continue;
}
break;
}
ptr += r;
Expand Down Expand Up @@ -761,6 +755,7 @@ Status PosixWritableFile::Append(const Slice& data) {
size_t left = data.size();
while (left != 0) {
ssize_t done = write(fd_, src, left);
std::this_thread::yield();
if (done < 0) {
if (errno == EINTR) {
continue;
Expand All @@ -784,11 +779,8 @@ Status PosixWritableFile::PositionedAppend(const Slice& data, uint64_t offset) {
const char* src = data.data();
size_t left = data.size();
while (left != 0) {
ssize_t done = pwrite(fd_, src, left, static_cast<off_t>(offset));
ssize_t done = photon::iouring_pwrite(fd_, src, left, offset, -1);;
if (done < 0) {
if (errno == EINTR) {
continue;
}
return IOError("While pwrite to file at offset " + ToString(offset),
filename_, errno);
}
Expand Down Expand Up @@ -870,14 +862,14 @@ Status PosixWritableFile::Close() {
Status PosixWritableFile::Flush() { return Status::OK(); }

Status PosixWritableFile::Sync() {
if (fdatasync(fd_) < 0) {
if (photon::iouring_fdatasync(fd_) < 0) {
return IOError("While fdatasync", filename_, errno);
}
return Status::OK();
}

Status PosixWritableFile::Fsync() {
if (fsync(fd_) < 0) {
if (photon::iouring_fsync(fd_) < 0) {
return IOError("While fsync", filename_, errno);
}
return Status::OK();
Expand Down Expand Up @@ -984,13 +976,9 @@ Status PosixRandomRWFile::Write(uint64_t offset, const Slice& data) {
const char* src = data.data();
size_t left = data.size();
while (left != 0) {
ssize_t done = pwrite(fd_, src, left, offset);
ssize_t done = photon::iouring_pwrite(fd_, src, left, offset, -1);
if (done < 0) {
// error while writing to file
if (errno == EINTR) {
// write was interrupted, try again.
continue;
}
return IOError(
"While write random read/write file at offset " + ToString(offset),
filename_, errno);
Expand All @@ -1010,13 +998,9 @@ Status PosixRandomRWFile::Read(uint64_t offset, size_t n, Slice* result,
size_t left = n;
char* ptr = scratch;
while (left > 0) {
ssize_t done = pread(fd_, ptr, left, offset);
ssize_t done = photon::iouring_pread(fd_, ptr, left, offset, 0, -1);
if (done < 0) {
// error while reading from file
if (errno == EINTR) {
// read was interrupted, try again.
continue;
}
return IOError("While reading random read/write file offset " +
ToString(offset) + " len " + ToString(n),
filename_, errno);
Expand All @@ -1038,14 +1022,14 @@ Status PosixRandomRWFile::Read(uint64_t offset, size_t n, Slice* result,
Status PosixRandomRWFile::Flush() { return Status::OK(); }

Status PosixRandomRWFile::Sync() {
if (fdatasync(fd_) < 0) {
if (photon::iouring_fdatasync(fd_) < 0) {
return IOError("While fdatasync random read/write file", filename_, errno);
}
return Status::OK();
}

Status PosixRandomRWFile::Fsync() {
if (fsync(fd_) < 0) {
if (photon::iouring_fsync(fd_) < 0) {
return IOError("While fsync random read/write file", filename_, errno);
}
return Status::OK();
Expand Down
17 changes: 17 additions & 0 deletions include/rocksdb/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -1454,4 +1454,21 @@ Status NewHdfsEnv(Env** hdfs_env, const std::string& fsname);
// This is a factory method for TimedEnv defined in utilities/env_timed.cc.
Env* NewTimedEnv(Env* base_env);

// Process-wide holder for the Photon runtime's lifetime.
//
// The only way to obtain an instance is Singleton(); the constructor and
// destructor are private and defined out of line, and the type can be neither
// copied nor moved.
class PhotonEnv {
 public:
  // Copying and moving are disabled so that exactly one instance can exist.
  PhotonEnv(const PhotonEnv&) = delete;
  PhotonEnv& operator=(const PhotonEnv&) = delete;
  PhotonEnv(PhotonEnv&&) = delete;
  PhotonEnv& operator=(PhotonEnv&&) = delete;

  // Returns the unique instance. It is a function-local static, so it is
  // constructed the first time this function is called and destroyed during
  // static destruction at program exit.
  static PhotonEnv& Singleton() {
    static PhotonEnv the_instance;
    return the_instance;
  }

 private:
  PhotonEnv();
  ~PhotonEnv();
};

} // namespace rocksdb
4 changes: 2 additions & 2 deletions monitoring/iostats_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@
namespace rocksdb {

#ifdef ROCKSDB_SUPPORT_THREAD_LOCAL
__thread IOStatsContext iostats_context;
photon::thread_local_ptr<IOStatsContext> iostats_context;
#endif

IOStatsContext* get_iostats_context() {
#ifdef ROCKSDB_SUPPORT_THREAD_LOCAL
return &iostats_context;
return iostats_context.operator->();
#else
return nullptr;
#endif
Expand Down
18 changes: 9 additions & 9 deletions monitoring/iostats_context_imp.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,38 +9,38 @@

#ifdef ROCKSDB_SUPPORT_THREAD_LOCAL
namespace rocksdb {
extern __thread IOStatsContext iostats_context;
extern photon::thread_local_ptr<IOStatsContext> iostats_context;
} // namespace rocksdb

// increment a specific counter by the specified value
#define IOSTATS_ADD(metric, value) (iostats_context.metric += value)
#define IOSTATS_ADD(metric, value) (iostats_context->metric += value)

// Increase metric value only when it is positive
#define IOSTATS_ADD_IF_POSITIVE(metric, value) \
if (value > 0) { IOSTATS_ADD(metric, value); }

// reset a specific counter to zero
#define IOSTATS_RESET(metric) (iostats_context.metric = 0)
#define IOSTATS_RESET(metric) (iostats_context->metric = 0)

// reset all counters to zero
#define IOSTATS_RESET_ALL() (iostats_context.Reset())
#define IOSTATS_RESET_ALL() (iostats_context->Reset())

#define IOSTATS_SET_THREAD_POOL_ID(value) \
(iostats_context.thread_pool_id = value)
(iostats_context->thread_pool_id = value)

#define IOSTATS_THREAD_POOL_ID() (iostats_context.thread_pool_id)
#define IOSTATS_THREAD_POOL_ID() (iostats_context->thread_pool_id)

#define IOSTATS(metric) (iostats_context.metric)
#define IOSTATS(metric) (iostats_context->metric)

// Declare and set start time of the timer
#define IOSTATS_TIMER_GUARD(metric) \
PerfStepTimer iostats_step_timer_##metric(&(iostats_context.metric)); \
PerfStepTimer iostats_step_timer_##metric(&(iostats_context->metric)); \
iostats_step_timer_##metric.Start();

// Declare and set start time of the timer
#define IOSTATS_CPU_TIMER_GUARD(metric, env) \
PerfStepTimer iostats_step_timer_##metric( \
&(iostats_context.metric), env, true, \
&(iostats_context->metric), env, true, \
PerfLevel::kEnableTimeAndCPUTimeExceptForMutex); \
iostats_step_timer_##metric.Start();

Expand Down
4 changes: 2 additions & 2 deletions monitoring/perf_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ PerfContext perf_context;
#if defined(OS_SOLARIS)
__thread PerfContext perf_context_;
#else
thread_local PerfContext perf_context;
photon::thread_local_ptr<PerfContext> perf_context;
#endif
#endif

Expand All @@ -26,7 +26,7 @@ PerfContext* get_perf_context() {
#if defined(OS_SOLARIS)
return &perf_context_;
#else
return &perf_context;
return perf_context.operator->();
#endif
#endif
}
Expand Down
Loading