diff --git a/Makefile b/Makefile index 71cd57b9b..f549e3627 100644 --- a/Makefile +++ b/Makefile @@ -46,7 +46,8 @@ MONITOR_SRC := src/monitor/teramo_main.cc MARK_SRC := src/benchmark/mark.cc src/benchmark/mark_main.cc TEST_SRC := src/utils/test/prop_tree_test.cc src/utils/test/tprinter_test.cc \ src/io/test/tablet_io_test.cc src/io/test/tablet_scanner_test.cc \ - src/master/test/master_impl_test.cc src/io/test/load_test.cc + src/master/test/master_impl_test.cc src/io/test/load_test.cc \ + src/common/test/thread_pool_test.cc TEST_OUTPUT := test_output UNITTEST_OUTPUT := $(TEST_OUTPUT)/unittest @@ -82,7 +83,8 @@ TERA_C_SO = libtera_c.so JNILIBRARY = libjni_tera.so BENCHMARK = tera_bench tera_mark TESTS = prop_tree_test tprinter_test string_util_test tablet_io_test \ - tablet_scanner_test fragment_test progress_bar_test master_impl_test load_test + tablet_scanner_test fragment_test progress_bar_test master_impl_test load_test \ + thread_pool_test .PHONY: all clean cleanall test @@ -161,6 +163,9 @@ src/leveldb/libleveldb.a: FORCE tera_bench: # unit test +thread_pool_test: src/common/test/thread_pool_test.o $(LIBRARY) + $(CXX) -o $@ $^ $(LDFLAGS) + prop_tree_test: src/utils/test/prop_tree_test.o $(LIBRARY) $(CXX) -o $@ $^ $(LDFLAGS) diff --git a/src/common/mutex.h b/src/common/mutex.h index b29ba4942..46e89044f 100644 --- a/src/common/mutex.h +++ b/src/common/mutex.h @@ -120,10 +120,14 @@ class MutexLock { class CondVar { public: explicit CondVar(Mutex* mu) : mu_(mu) { - PthreadCall("init condvar", pthread_cond_init(&cond_, NULL)); + // use monotonic clock + PthreadCall("condattr init ", pthread_condattr_init(&attr_)); + PthreadCall("condattr setclock ", pthread_condattr_setclock(&attr_, CLOCK_MONOTONIC)); + PthreadCall("condvar init with attr", pthread_cond_init(&cond_, &attr_)); } ~CondVar() { - PthreadCall("destroy condvar", pthread_cond_destroy(&cond_)); + PthreadCall("condvar destroy", pthread_cond_destroy(&cond_)); + PthreadCall("condattr destroy", pthread_condattr_destroy(&attr_)); } void Wait(const char* msg = NULL) { int64_t msg_threshold = mu_->msg_threshold_; @@ -134,12 +138,13 @@ class CondVar { // Time wait in us // timeout < 0 would cause ETIMEOUT and return false immediately bool TimeWaitInUs(int timeout, const char* msg = NULL) { - timespec ts; - struct timeval tv; - gettimeofday(&tv, NULL); - int64_t usec = tv.tv_usec + timeout; - ts.tv_sec = tv.tv_sec + usec / 1000000; - ts.tv_nsec = (usec % 1000000) * 1000; + // ref: http://www.qnx.com/developers/docs/6.5.0SP1.update/com.qnx.doc.neutrino_lib_ref/p/pthread_cond_timedwait.html + struct timespec ts; + clock_gettime(CLOCK_MONOTONIC, &ts); + int64_t nsec = ((int64_t)timeout) * 1000 + ts.tv_nsec; + ts.tv_sec += nsec / 1000000000; + ts.tv_nsec = nsec % 1000000000; + int64_t msg_threshold = mu_->msg_threshold_; mu_->BeforeUnlock(); int ret = pthread_cond_timedwait(&cond_, &mu_->mu_, &ts); @@ -163,6 +168,7 @@ class CondVar { void operator=(const CondVar&); Mutex* mu_; pthread_cond_t cond_; + pthread_condattr_t attr_; }; } // namespace common diff --git a/src/common/test/thread_pool_test.cc b/src/common/test/thread_pool_test.cc new file mode 100644 index 000000000..6c1e421cb --- /dev/null +++ b/src/common/test/thread_pool_test.cc @@ -0,0 +1,94 @@ +// Copyright (c) 2015, Baidu.com, Inc. All Rights Reserved +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "gtest/gtest.h" + +#include "common/mutex.h" +#include "common/thread_pool.h" +#include "common/timer.h" + +namespace tera { + +TEST(TimerTest, Basic) { + struct timespec ts1, ts2, ts3; + struct timeval tv; + + clock_gettime(CLOCK_MONOTONIC, &ts1); + clock_gettime(CLOCK_MONOTONIC_RAW, &ts3); + clock_gettime(CLOCK_REALTIME, &ts2); + gettimeofday(&tv, NULL); + + std::cout << "ts1.tv_sec " << ts1.tv_sec + << ", ts1.tv_nsec " << ts1.tv_nsec + << std::endl; + std::cout << "ts2.tv_sec " << ts2.tv_sec + << ", ts2.tv_nsec " << ts2.tv_nsec + << std::endl; + std::cout << "ts3.tv_sec " << ts3.tv_sec + << ", ts3.tv_nsec " << ts3.tv_nsec + << std::endl; + std::cout << "tv.tv_sec " << tv.tv_sec + << ", tv.tv_usec " << tv.tv_usec + << std::endl; + + int delta = 0; + delta = ts2.tv_sec - tv.tv_sec; + ASSERT_TRUE(-1 <= delta && delta <= 1); + ASSERT_TRUE(ts1.tv_sec < ts2.tv_sec); + ASSERT_TRUE(ts1.tv_sec < tv.tv_sec); +} + +TEST(TimerTest, test1) { + struct timespec ts1; + struct timeval tv; + + clock_gettime(CLOCK_REALTIME, &ts1); + gettimeofday(&tv, NULL); + int64_t ts = common::timer::get_micros(); + + int delta = 0; + delta = ts1.tv_sec - tv.tv_sec; + ASSERT_TRUE(-1 <= delta && delta <= 1); + delta = ts / 1000000 - tv.tv_sec; + ASSERT_TRUE(-1 <= delta && delta <= 1); +} + +common::Mutex mu; +common::CondVar cv(&mu); + +void DelayTask_issue1(int32_t time, int32_t time_ms) { + struct timespec ts1; + clock_gettime(CLOCK_MONOTONIC, &ts1); + int delta = ts1.tv_sec - (time + time_ms / 1000); + ASSERT_TRUE(-1 <= delta && delta <= 1); + cv.Signal(); + return; +} + +TEST(ThreadPoolTest, Basic) { + mu.Lock(); + common::ThreadPool* pool = new common::ThreadPool(1000); + struct timespec ts1; + clock_gettime(CLOCK_MONOTONIC, &ts1); + ThreadPool::Task task = + std::bind(&DelayTask_issue1, ts1.tv_sec, 5000); + pool->DelayTask(5000, task); + + cv.Wait(); + mu.Unlock(); + delete pool; +} + +} // namespace tera diff --git a/src/common/thread_pool.h b/src/common/thread_pool.h index 934b98fd1..461c54fbe 100644 --- a/src/common/thread_pool.h +++ b/src/common/thread_pool.h @@ -16,7 +16,6 @@ #include #include "mutex.h" -#include "timer.h" namespace common { @@ -84,19 +83,19 @@ class ThreadPool { // Add a task to the thread pool. void AddTask(const Task& task) { MutexLock lock(&mutex_, "AddTask"); - queue_.push_back(BGItem(0, timer::get_micros(), task)); + queue_.push_back(BGItem(0, get_micros(), task)); ++pending_num_; work_cv_.Signal(); } void AddPriorityTask(const Task& task) { MutexLock lock(&mutex_); - queue_.push_front(BGItem(0, timer::get_micros(), task)); + queue_.push_front(BGItem(0, get_micros(), task)); ++pending_num_; work_cv_.Signal(); } int64_t DelayTask(int64_t delay, const Task& task) { MutexLock lock(&mutex_); - int64_t now_time = timer::get_micros(); + int64_t now_time = get_micros(); int64_t exe_time = now_time + delay * 1000; BGItem bg_item(++last_task_id_, exe_time, task); time_queue_.push(bg_item); @@ -172,6 +171,12 @@ class ThreadPool { ThreadPool(const ThreadPool&); void operator=(const ThreadPool&); + int64_t get_micros() { // get us before machine reboot + struct timespec ts; + clock_gettime(CLOCK_MONOTONIC, &ts); + return static_cast(ts.tv_sec) * 1000000 + static_cast(ts.tv_nsec) / 1000; + } + static void* ThreadWrapper(void* arg) { reinterpret_cast(arg)->ThreadProc(); return NULL; @@ -188,7 +193,7 @@ class ThreadPool { } // Timer task if (!time_queue_.empty()) { - int64_t now_time = timer::get_micros(); + int64_t now_time = get_micros(); BGItem bg_item = time_queue_.top(); int64_t wait_time = bg_item.exe_time - now_time; // in us if (wait_time <= 0) { @@ -203,7 +208,7 @@ class ThreadPool { mutex_.Unlock(); task(bg_item.id); mutex_.Lock("ThreadProcRelock"); - task_cost_sum_ += timer::get_micros() - now_time; + task_cost_sum_ += get_micros() - now_time; task_count_++; running_task_ids_.erase(bg_item.id); } @@ -219,13 +224,13 @@ class ThreadPool { int64_t exe_time = queue_.front().exe_time; queue_.pop_front(); --pending_num_; - int64_t start_time = timer::get_micros(); + int64_t start_time = get_micros(); schedule_cost_sum_ += start_time - exe_time; schedule_count_++; mutex_.Unlock(); task(0); mutex_.Lock("ThreadProcRelock2"); - task_cost_sum_ += timer::get_micros() - start_time; + task_cost_sum_ += get_micros() - start_time; task_count_++; } } diff --git a/src/common/timer.h b/src/common/timer.h index 14ec8d62c..1b335bb6b 100644 --- a/src/common/timer.h +++ b/src/common/timer.h @@ -27,17 +27,15 @@ static inline std::string get_curtime_str() { } static inline int64_t get_micros() { - struct timeval tv; - gettimeofday(&tv, NULL); - return static_cast(tv.tv_sec) * 1000000 + tv.tv_usec; + struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); + return static_cast(ts.tv_sec) * 1000000 + static_cast(ts.tv_nsec) / 1000; } static inline int64_t get_unique_micros(int64_t ref) { - struct timeval tv; int64_t now; do { - gettimeofday(&tv, NULL); - now = static_cast(tv.tv_sec) * 1000000 + tv.tv_usec; + now = get_micros(); } while (now == ref); return now; } diff --git a/src/leveldb/Makefile b/src/leveldb/Makefile index c2a70a810..c9162d2eb 100644 --- a/src/leveldb/Makefile +++ b/src/leveldb/Makefile @@ -21,7 +21,7 @@ include build_config.mk CFLAGS += -I. -I./include $(PLATFORM_CCFLAGS) $(OPT) CXXFLAGS += -I. -I./include $(PLATFORM_CXXFLAGS) $(OPT) -LDFLAGS += $(PLATFORM_LDFLAGS) -L$(SNAPPY_LIBDIR) -ldl -lsnappy +LDFLAGS += $(PLATFORM_LDFLAGS) -L$(SNAPPY_LIBDIR) -lrt -ldl -lsnappy LIBS += $(PLATFORM_LIBS) LIBOBJECTS = $(SOURCES:.cc=.o) diff --git a/src/leveldb/bench/tera_bench.cc b/src/leveldb/bench/tera_bench.cc index 53ed6a01a..ea7ffe93b 100644 --- a/src/leveldb/bench/tera_bench.cc +++ b/src/leveldb/bench/tera_bench.cc @@ -222,8 +222,8 @@ class Benchmark { snprintf(msg, sizeof(msg), "(%d ops)", num_entries); message_ = msg; } - char ts[10]; - snprintf(ts, sizeof(ts), "%d", FLAGS_value_seed); + char ts[20]; + snprintf(ts, sizeof(ts), "%lld", ((long long int)FLAGS_value_seed) * 1000000); // Write to database int i = FLAGS_start_key; diff --git a/src/leveldb/port/port_posix.cc b/src/leveldb/port/port_posix.cc index 37ba0c3bb..0ad21d620 100644 --- a/src/leveldb/port/port_posix.cc +++ b/src/leveldb/port/port_posix.cc @@ -50,32 +50,39 @@ void Mutex::Unlock() { PthreadCall("unlock", pthread_mutex_unlock(&mu_)); } CondVar::CondVar(Mutex* mu) : mu_(mu) { - PthreadCall("init cv", pthread_cond_init(&cv_, NULL)); + // use monotonic clock + PthreadCall("condattr init ", pthread_condattr_init(&attr_)); + PthreadCall("condattr setclock ", pthread_condattr_setclock(&attr_, CLOCK_MONOTONIC)); + PthreadCall("condvar init with attr", pthread_cond_init(&cond_, &attr_)); } -CondVar::~CondVar() { PthreadCall("destroy cv", pthread_cond_destroy(&cv_)); } +CondVar::~CondVar() { + PthreadCall("condvar destroy", pthread_cond_destroy(&cond_)); + PthreadCall("condattr destroy", pthread_condattr_destroy(&attr_)); +} void CondVar::Wait() { - PthreadCall("wait", pthread_cond_wait(&cv_, &mu_->mu_)); + PthreadCall("condvar wait", pthread_cond_wait(&cond_, &mu_->mu_)); } +// wait in ms bool CondVar::Wait(int64_t wait_millisec) { assert(wait_millisec >= 0); + // ref: http://www.qnx.com/developers/docs/6.5.0SP1.update/com.qnx.doc.neutrino_lib_ref/p/pthread_cond_timedwait.html struct timespec ts; - struct timeval tp; - gettimeofday(&tp, NULL); - uint64_t usec = tp.tv_usec + wait_millisec * 1000; - ts.tv_sec = tp.tv_sec + usec / 1000000; - ts.tv_nsec = (usec % 1000000) * 1000; - return (0 == pthread_cond_timedwait(&cv_, &mu_->mu_, &ts)); + clock_gettime(CLOCK_MONOTONIC, &ts); + int64_t nsec = ((int64_t)wait_millisec) * 1000000 + ts.tv_nsec; + ts.tv_sec += nsec / 1000000000; + ts.tv_nsec = nsec % 1000000000; + return (0 == pthread_cond_timedwait(&cond_, &mu_->mu_, &ts)); } void CondVar::Signal() { - PthreadCall("signal", pthread_cond_signal(&cv_)); + PthreadCall("signal", pthread_cond_signal(&cond_)); } void CondVar::SignalAll() { - PthreadCall("broadcast", pthread_cond_broadcast(&cv_)); + PthreadCall("broadcast", pthread_cond_broadcast(&cond_)); } void InitOnce(OnceType* once, void (*initializer)()) { diff --git a/src/leveldb/port/port_posix.h b/src/leveldb/port/port_posix.h index bb80c31f9..ed19e222f 100644 --- a/src/leveldb/port/port_posix.h +++ b/src/leveldb/port/port_posix.h @@ -113,7 +113,8 @@ class CondVar { void Signal(); void SignalAll(); private: - pthread_cond_t cv_; + pthread_cond_t cond_; + pthread_condattr_t attr_; Mutex* mu_; }; diff --git a/src/leveldb/util/env_posix.cc b/src/leveldb/util/env_posix.cc index 2918a73f1..f50123141 100644 --- a/src/leveldb/util/env_posix.cc +++ b/src/leveldb/util/env_posix.cc @@ -809,9 +809,9 @@ class PosixEnv : public Env { } virtual uint64_t NowMicros() { - struct timeval tv; - gettimeofday(&tv, NULL); - return static_cast(tv.tv_sec) * 1000000 + tv.tv_usec; + struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); + return static_cast(ts.tv_sec) * 1000000 + static_cast(ts.tv_nsec) / 1000; } virtual void SleepForMicroseconds(int micros) { diff --git a/src/leveldb/util/raw_key_operator_test.cc b/src/leveldb/util/raw_key_operator_test.cc index 1704822eb..4288c3257 100644 --- a/src/leveldb/util/raw_key_operator_test.cc +++ b/src/leveldb/util/raw_key_operator_test.cc @@ -19,9 +19,9 @@ void print_bytes(const char* str, int len) { } int64_t get_micros() { - struct timeval tv; - gettimeofday(&tv, NULL); - return static_cast(tv.tv_sec) * 1000000 + tv.tv_usec; + struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); + return static_cast(ts.tv_sec) * 1000000 + static_cast(ts.tv_nsec) / 1000; } class RawKeyOperatorTest {}; diff --git a/src/utils/timer.h b/src/utils/timer.h index 4cae24137..62428c754 100644 --- a/src/utils/timer.h +++ b/src/utils/timer.h @@ -27,23 +27,19 @@ static inline std::string get_curtime_str_plain() { } static inline int64_t get_micros() { - struct timeval tv; - gettimeofday(&tv, NULL); - return static_cast(tv.tv_sec) * 1000000 + tv.tv_usec; + struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); + return static_cast(ts.tv_sec) * 1000000 + static_cast(ts.tv_nsec) / 1000; } static inline int64_t get_millis() { - struct timeval tv; - gettimeofday(&tv, NULL); - return static_cast(tv.tv_sec) * 1000 + tv.tv_usec / 1000; + return get_micros() / 1000; } static inline int64_t get_unique_micros(int64_t ref) { - struct timeval tv; int64_t now; do { - gettimeofday(&tv, NULL); - now = static_cast(tv.tv_sec) * 1000000 + tv.tv_usec; + now = get_micros(); } while (now == ref); return now; }