Skip to content

Commit

Permalink
issue=1251, use clock_gettime(CLOCK_MONOTONIC) to get current time (#…
Browse files Browse the repository at this point in the history
…1252)

bugfix for system time rollback
  • Loading branch information
ajie authored and taocp committed May 11, 2017
1 parent b65f535 commit 6032e59
Show file tree
Hide file tree
Showing 12 changed files with 166 additions and 54 deletions.
9 changes: 7 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ MONITOR_SRC := src/monitor/teramo_main.cc
MARK_SRC := src/benchmark/mark.cc src/benchmark/mark_main.cc
TEST_SRC := src/utils/test/prop_tree_test.cc src/utils/test/tprinter_test.cc \
src/io/test/tablet_io_test.cc src/io/test/tablet_scanner_test.cc \
src/master/test/master_impl_test.cc src/io/test/load_test.cc
src/master/test/master_impl_test.cc src/io/test/load_test.cc \
src/common/test/thread_pool_test.cc

TEST_OUTPUT := test_output
UNITTEST_OUTPUT := $(TEST_OUTPUT)/unittest
Expand Down Expand Up @@ -82,7 +83,8 @@ TERA_C_SO = libtera_c.so
JNILIBRARY = libjni_tera.so
BENCHMARK = tera_bench tera_mark
TESTS = prop_tree_test tprinter_test string_util_test tablet_io_test \
tablet_scanner_test fragment_test progress_bar_test master_impl_test load_test
tablet_scanner_test fragment_test progress_bar_test master_impl_test load_test \
thread_pool_test

.PHONY: all clean cleanall test

Expand Down Expand Up @@ -161,6 +163,9 @@ src/leveldb/libleveldb.a: FORCE
tera_bench:

# unit test
thread_pool_test: src/common/test/thread_pool_test.o $(LIBRARY)
$(CXX) -o $@ $^ $(LDFLAGS)

prop_tree_test: src/utils/test/prop_tree_test.o $(LIBRARY)
$(CXX) -o $@ $^ $(LDFLAGS)

Expand Down
22 changes: 14 additions & 8 deletions src/common/mutex.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,14 @@ class MutexLock {
class CondVar {
public:
explicit CondVar(Mutex* mu) : mu_(mu) {
PthreadCall("init condvar", pthread_cond_init(&cond_, NULL));
// use monotonic clock
PthreadCall("condattr init ", pthread_condattr_init(&attr_));
PthreadCall("condattr setclock ", pthread_condattr_setclock(&attr_, CLOCK_MONOTONIC));
PthreadCall("condvar init with attr", pthread_cond_init(&cond_, &attr_));
}
~CondVar() {
PthreadCall("destroy condvar", pthread_cond_destroy(&cond_));
PthreadCall("condvar destroy", pthread_cond_destroy(&cond_));
PthreadCall("condattr destroy", pthread_condattr_destroy(&attr_));
}
void Wait(const char* msg = NULL) {
int64_t msg_threshold = mu_->msg_threshold_;
Expand All @@ -134,12 +138,13 @@ class CondVar {
// Time wait in us
// timeout < 0 would cause ETIMEOUT and return false immediately
bool TimeWaitInUs(int timeout, const char* msg = NULL) {
timespec ts;
struct timeval tv;
gettimeofday(&tv, NULL);
int64_t usec = tv.tv_usec + timeout;
ts.tv_sec = tv.tv_sec + usec / 1000000;
ts.tv_nsec = (usec % 1000000) * 1000;
// ref: http://www.qnx.com/developers/docs/6.5.0SP1.update/com.qnx.doc.neutrino_lib_ref/p/pthread_cond_timedwait.html
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
int64_t nsec = ((int64_t)timeout) * 1000 + ts.tv_nsec;
ts.tv_sec += nsec / 1000000000;
ts.tv_nsec = nsec % 1000000000;

int64_t msg_threshold = mu_->msg_threshold_;
mu_->BeforeUnlock();
int ret = pthread_cond_timedwait(&cond_, &mu_->mu_, &ts);
Expand All @@ -163,6 +168,7 @@ class CondVar {
void operator=(const CondVar&);
Mutex* mu_;
pthread_cond_t cond_;
pthread_condattr_t attr_;
};
} // namespace common

Expand Down
94 changes: 94 additions & 0 deletions src/common/test/thread_pool_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright (c) 2015, Baidu.com, Inc. All Rights Reserved
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <time.h>

#include <functional>
#include <iostream>

#include "gtest/gtest.h"

#include "common/mutex.h"
#include "common/thread_pool.h"
#include "common/timer.h"

namespace tera {

TEST(TimerTest, Basic) {
struct timespec ts1, ts2, ts3;
struct timeval tv;

clock_gettime(CLOCK_MONOTONIC, &ts1);
clock_gettime(CLOCK_MONOTONIC_RAW, &ts3);
clock_gettime(CLOCK_REALTIME, &ts2);
gettimeofday(&tv, NULL);

std::cout << "ts1.tv_sec " << ts1.tv_sec
<< ", ts1.tv_nsec " << ts1.tv_nsec
<< std::endl;
std::cout << "ts2.tv_sec " << ts2.tv_sec
<< ", ts2.tv_nsec " << ts2.tv_nsec
<< std::endl;
std::cout << "ts3.tv_sec " << ts3.tv_sec
<< ", ts3.tv_nsec " << ts3.tv_nsec
<< std::endl;
std::cout << "tv.tv_sec " << tv.tv_sec
<< ", tv.tv_usec " << tv.tv_usec
<< std::endl;

int delta = 0;
delta = ts2.tv_sec - tv.tv_sec;
ASSERT_TRUE(-1 <= delta && delta <= 1);
ASSERT_TRUE(ts1.tv_sec < ts2.tv_sec);
ASSERT_TRUE(ts1.tv_sec < tv.tv_sec);
}

TEST(TimerTest, test1) {
struct timespec ts1;
struct timeval tv;

clock_gettime(CLOCK_REALTIME, &ts1);
gettimeofday(&tv, NULL);
int64_t ts = common::timer::get_micros();

int delta = 0;
delta = ts1.tv_sec - tv.tv_sec;
ASSERT_TRUE(-1 <= delta && delta <= 1);
delta = ts / 1000000 - tv.tv_sec;
ASSERT_TRUE(-1 <= delta && delta <= 1);
}

common::Mutex mu;
common::CondVar cv(&mu);

void DelayTask_issue1(int32_t time, int32_t time_ms) {
struct timespec ts1;
clock_gettime(CLOCK_MONOTONIC, &ts1);
int delta = ts1.tv_sec - (time + time_ms / 1000);
ASSERT_TRUE(-1 <= delta && delta <= 1);
cv.Signal();
return;
}

TEST(ThreadPoolTest, Basic) {
mu.Lock();
common::ThreadPool* pool = new common::ThreadPool(1000);
struct timespec ts1;
clock_gettime(CLOCK_MONOTONIC, &ts1);
ThreadPool::Task task =
std::bind(&DelayTask_issue1, ts1.tv_sec, 5000);
pool->DelayTask(5000, task);

cv.Wait();
mu.Unlock();
delete pool;
}

} // namespace tera
21 changes: 13 additions & 8 deletions src/common/thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
#include <vector>

#include "mutex.h"
#include "timer.h"

namespace common {

Expand Down Expand Up @@ -84,19 +83,19 @@ class ThreadPool {
// Add a task to the thread pool.
void AddTask(const Task& task) {
MutexLock lock(&mutex_, "AddTask");
queue_.push_back(BGItem(0, timer::get_micros(), task));
queue_.push_back(BGItem(0, get_micros(), task));
++pending_num_;
work_cv_.Signal();
}
void AddPriorityTask(const Task& task) {
MutexLock lock(&mutex_);
queue_.push_front(BGItem(0, timer::get_micros(), task));
queue_.push_front(BGItem(0, get_micros(), task));
++pending_num_;
work_cv_.Signal();
}
int64_t DelayTask(int64_t delay, const Task& task) {
MutexLock lock(&mutex_);
int64_t now_time = timer::get_micros();
int64_t now_time = get_micros();
int64_t exe_time = now_time + delay * 1000;
BGItem bg_item(++last_task_id_, exe_time, task);
time_queue_.push(bg_item);
Expand Down Expand Up @@ -172,6 +171,12 @@ class ThreadPool {
ThreadPool(const ThreadPool&);
void operator=(const ThreadPool&);

int64_t get_micros() { // get us before machine reboot
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
return static_cast<int64_t>(ts.tv_sec) * 1000000 + static_cast<int64_t>(ts.tv_nsec) / 1000;
}

static void* ThreadWrapper(void* arg) {
reinterpret_cast<ThreadPool*>(arg)->ThreadProc();
return NULL;
Expand All @@ -188,7 +193,7 @@ class ThreadPool {
}
// Timer task
if (!time_queue_.empty()) {
int64_t now_time = timer::get_micros();
int64_t now_time = get_micros();
BGItem bg_item = time_queue_.top();
int64_t wait_time = bg_item.exe_time - now_time; // in us
if (wait_time <= 0) {
Expand All @@ -203,7 +208,7 @@ class ThreadPool {
mutex_.Unlock();
task(bg_item.id);
mutex_.Lock("ThreadProcRelock");
task_cost_sum_ += timer::get_micros() - now_time;
task_cost_sum_ += get_micros() - now_time;
task_count_++;
running_task_ids_.erase(bg_item.id);
}
Expand All @@ -219,13 +224,13 @@ class ThreadPool {
int64_t exe_time = queue_.front().exe_time;
queue_.pop_front();
--pending_num_;
int64_t start_time = timer::get_micros();
int64_t start_time = get_micros();
schedule_cost_sum_ += start_time - exe_time;
schedule_count_++;
mutex_.Unlock();
task(0);
mutex_.Lock("ThreadProcRelock2");
task_cost_sum_ += timer::get_micros() - start_time;
task_cost_sum_ += get_micros() - start_time;
task_count_++;
}
}
Expand Down
10 changes: 4 additions & 6 deletions src/common/timer.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,15 @@ static inline std::string get_curtime_str() {
}

static inline int64_t get_micros() {
struct timeval tv;
gettimeofday(&tv, NULL);
return static_cast<int64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
return static_cast<int64_t>(ts.tv_sec) * 1000000 + static_cast<int64_t>(ts.tv_nsec) / 1000;
}

static inline int64_t get_unique_micros(int64_t ref) {
struct timeval tv;
int64_t now;
do {
gettimeofday(&tv, NULL);
now = static_cast<int64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
now = get_micros();
} while (now == ref);
return now;
}
Expand Down
2 changes: 1 addition & 1 deletion src/leveldb/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ include build_config.mk
CFLAGS += -I. -I./include $(PLATFORM_CCFLAGS) $(OPT)
CXXFLAGS += -I. -I./include $(PLATFORM_CXXFLAGS) $(OPT)

LDFLAGS += $(PLATFORM_LDFLAGS) -L$(SNAPPY_LIBDIR) -ldl -lsnappy
LDFLAGS += $(PLATFORM_LDFLAGS) -L$(SNAPPY_LIBDIR) -lrt -ldl -lsnappy
LIBS += $(PLATFORM_LIBS)

LIBOBJECTS = $(SOURCES:.cc=.o)
Expand Down
4 changes: 2 additions & 2 deletions src/leveldb/bench/tera_bench.cc
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,8 @@ class Benchmark {
snprintf(msg, sizeof(msg), "(%d ops)", num_entries);
message_ = msg;
}
char ts[10];
snprintf(ts, sizeof(ts), "%d", FLAGS_value_seed);
char ts[20];
snprintf(ts, sizeof(ts), "%lld", ((long long int)FLAGS_value_seed) * 1000000);

// Write to database
int i = FLAGS_start_key;
Expand Down
29 changes: 18 additions & 11 deletions src/leveldb/port/port_posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,32 +50,39 @@ void Mutex::Unlock() { PthreadCall("unlock", pthread_mutex_unlock(&mu_)); }

CondVar::CondVar(Mutex* mu)
: mu_(mu) {
PthreadCall("init cv", pthread_cond_init(&cv_, NULL));
// use monotonic clock
PthreadCall("condattr init ", pthread_condattr_init(&attr_));
PthreadCall("condattr setclock ", pthread_condattr_setclock(&attr_, CLOCK_MONOTONIC));
PthreadCall("condvar init with attr", pthread_cond_init(&cond_, &attr_));
}

CondVar::~CondVar() { PthreadCall("destroy cv", pthread_cond_destroy(&cv_)); }
CondVar::~CondVar() {
PthreadCall("condvar destroy", pthread_cond_destroy(&cond_));
PthreadCall("condattr destroy", pthread_condattr_destroy(&attr_));
}

void CondVar::Wait() {
PthreadCall("wait", pthread_cond_wait(&cv_, &mu_->mu_));
PthreadCall("condvar wait", pthread_cond_wait(&cond_, &mu_->mu_));
}

// wait in ms
bool CondVar::Wait(int64_t wait_millisec) {
assert(wait_millisec >= 0);
// ref: http://www.qnx.com/developers/docs/6.5.0SP1.update/com.qnx.doc.neutrino_lib_ref/p/pthread_cond_timedwait.html
struct timespec ts;
struct timeval tp;
gettimeofday(&tp, NULL);
uint64_t usec = tp.tv_usec + wait_millisec * 1000;
ts.tv_sec = tp.tv_sec + usec / 1000000;
ts.tv_nsec = (usec % 1000000) * 1000;
return (0 == pthread_cond_timedwait(&cv_, &mu_->mu_, &ts));
clock_gettime(CLOCK_MONOTONIC, &ts);
int64_t nsec = ((int64_t)wait_millisec) * 1000000 + ts.tv_nsec;
ts.tv_sec += nsec / 1000000000;
ts.tv_nsec = nsec % 1000000000;
return (0 == pthread_cond_timedwait(&cond_, &mu_->mu_, &ts));
}

void CondVar::Signal() {
PthreadCall("signal", pthread_cond_signal(&cv_));
PthreadCall("signal", pthread_cond_signal(&cond_));
}

void CondVar::SignalAll() {
PthreadCall("broadcast", pthread_cond_broadcast(&cv_));
PthreadCall("broadcast", pthread_cond_broadcast(&cond_));
}

void InitOnce(OnceType* once, void (*initializer)()) {
Expand Down
3 changes: 2 additions & 1 deletion src/leveldb/port/port_posix.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ class CondVar {
void Signal();
void SignalAll();
private:
pthread_cond_t cv_;
pthread_cond_t cond_;
pthread_condattr_t attr_;
Mutex* mu_;
};

Expand Down
6 changes: 3 additions & 3 deletions src/leveldb/util/env_posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -809,9 +809,9 @@ class PosixEnv : public Env {
}

virtual uint64_t NowMicros() {
struct timeval tv;
gettimeofday(&tv, NULL);
return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
return static_cast<int64_t>(ts.tv_sec) * 1000000 + static_cast<int64_t>(ts.tv_nsec) / 1000;
}

virtual void SleepForMicroseconds(int micros) {
Expand Down
6 changes: 3 additions & 3 deletions src/leveldb/util/raw_key_operator_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ void print_bytes(const char* str, int len) {
}

int64_t get_micros() {
struct timeval tv;
gettimeofday(&tv, NULL);
return static_cast<int64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
return static_cast<int64_t>(ts.tv_sec) * 1000000 + static_cast<int64_t>(ts.tv_nsec) / 1000;
}

class RawKeyOperatorTest {};
Expand Down
Loading

0 comments on commit 6032e59

Please sign in to comment.