Skip to content

Commit

Permalink
Merge pull request apache#1270 from gydong/master
Browse files Browse the repository at this point in the history
coding style fix in brpc/trackme.cpp
  • Loading branch information
jamesge authored Oct 20, 2020
2 parents a4e28d6 + 123e444 commit b1f7ea0
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 39 deletions.
44 changes: 22 additions & 22 deletions src/brpc/trackme.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ static const int32_t TRACKME_MIN_INTERVAL = 30;
static const int32_t TRACKME_MAX_INTERVAL = 600;
static int32_t s_trackme_interval = TRACKME_MIN_INTERVAL;
// Protecting global vars on trackme
static pthread_mutex_t g_trackme_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t s_trackme_mutex = PTHREAD_MUTEX_INITIALIZER;
// For contacting with trackme_server.
static Channel* g_trackme_chan = NULL;
static Channel* s_trackme_chan = NULL;
// Any server address in this process.
static std::string* g_trackme_addr = NULL;
static std::string* s_trackme_addr = NULL;

// Information of bugs.
// Notice that this structure may be a combination of all affected bugs.
Expand All @@ -65,10 +65,10 @@ struct BugInfo {
// can avoid showing the same bug repeatly.
static BugInfo* g_bug_info = NULL;
// The timestamp(microseconds) that we sent TrackMeRequest.
static int64_t g_trackme_last_time = 0;
static int64_t s_trackme_last_time = 0;

// version of RPC.
// Since the code for getting BRPC_REVISION often fails,
// Since the code for getting BRPC_REVISION often fails,
// BRPC_REVISION must be defined to string and be converted to number
// within our code.
// The code running before main() may see g_rpc_version=0, should be OK.
Expand Down Expand Up @@ -116,8 +116,8 @@ int ReadJPaasHostPort(int container_port) {

// Called in server.cpp
void SetTrackMeAddress(butil::EndPoint pt) {
BAIDU_SCOPED_LOCK(g_trackme_mutex);
if (g_trackme_addr == NULL) {
BAIDU_SCOPED_LOCK(s_trackme_mutex);
if (s_trackme_addr == NULL) {
// JPAAS has NAT capabilities, read its log to figure out the open port
// accessible from outside.
const int jpaas_port = ReadJPaasHostPort(pt.port);
Expand All @@ -126,7 +126,7 @@ void SetTrackMeAddress(butil::EndPoint pt) {
<< " instead of jpaas_container_port=" << pt.port;
pt.port = jpaas_port;
}
g_trackme_addr = new std::string(butil::endpoint2str(pt).c_str());
s_trackme_addr = new std::string(butil::endpoint2str(pt).c_str());
}
}

Expand All @@ -139,7 +139,7 @@ static void HandleTrackMeResponse(Controller* cntl, TrackMeResponse* res) {
cur_info.error_text = res->error_text();
bool already_reported = false;
{
BAIDU_SCOPED_LOCK(g_trackme_mutex);
BAIDU_SCOPED_LOCK(s_trackme_mutex);
if (g_bug_info != NULL && *g_bug_info == cur_info) {
// we've shown the bug.
already_reported = true;
Expand Down Expand Up @@ -187,10 +187,10 @@ static void HandleTrackMeResponse(Controller* cntl, TrackMeResponse* res) {
}

static void TrackMeNow(std::unique_lock<pthread_mutex_t>& mu) {
if (g_trackme_addr == NULL) {
if (s_trackme_addr == NULL) {
return;
}
if (g_trackme_chan == NULL) {
if (s_trackme_chan == NULL) {
Channel* chan = new (std::nothrow) Channel;
if (chan == NULL) {
LOG(FATAL) << "Fail to new trackme channel";
Expand All @@ -204,17 +204,17 @@ static void TrackMeNow(std::unique_lock<pthread_mutex_t>& mu) {
delete chan;
return;
}
g_trackme_chan = chan;
s_trackme_chan = chan;
}
mu.unlock();
TrackMeService_Stub stub(g_trackme_chan);
TrackMeService_Stub stub(s_trackme_chan);
TrackMeRequest req;
req.set_rpc_version(g_rpc_version);
req.set_server_addr(*g_trackme_addr);
req.set_server_addr(*s_trackme_addr);
TrackMeResponse* res = new TrackMeResponse;
Controller* cntl = new Controller;
cntl->set_request_code(policy::MurmurHash32(g_trackme_addr->data(), g_trackme_addr->size()));
google::protobuf::Closure* done =
cntl->set_request_code(policy::MurmurHash32(s_trackme_addr->data(), s_trackme_addr->size()));
google::protobuf::Closure* done =
::brpc::NewCallback(&HandleTrackMeResponse, cntl, res);
stub.TrackMe(cntl, &req, res, done);
}
Expand All @@ -226,15 +226,15 @@ void TrackMe() {
return;
}
int64_t now = butil::gettimeofday_us();
std::unique_lock<pthread_mutex_t> mu(g_trackme_mutex);
if (g_trackme_last_time == 0) {
// Delay the first ping randomly within s_trackme_interval. This
std::unique_lock<pthread_mutex_t> mu(s_trackme_mutex);
if (s_trackme_last_time == 0) {
// Delay the first ping randomly within s_trackme_interval. This
// protects trackme_server from ping storms.
g_trackme_last_time =
s_trackme_last_time =
now + butil::fast_rand_less_than(s_trackme_interval) * 1000000L;
}
if (now > g_trackme_last_time + 1000000L * s_trackme_interval) {
g_trackme_last_time = now;
if (now > s_trackme_last_time + 1000000L * s_trackme_interval) {
s_trackme_last_time = now;
return TrackMeNow(mu);
}
}
Expand Down
33 changes: 16 additions & 17 deletions src/bthread/task_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ void TaskGroup::run_main_task() {
bvar::PassiveStatus<double> cumulated_cputime(
get_cumulated_cputime_from_this, this);
std::unique_ptr<bvar::PerSecond<bvar::PassiveStatus<double> > > usage_bvar;

TaskGroup* dummy = this;
bthread_t tid;
while (wait_task(&tid)) {
Expand All @@ -169,7 +169,6 @@ void TaskGroup::run_main_task() {
(name, &cumulated_cputime, 1));
}
}
// stop_main_task() was called.
// Don't forget to add elapse of last wait_task.
current_task()->stat.cputime_ns += butil::cpuwide_time_ns() - _last_run_ns;
}
Expand All @@ -188,7 +187,7 @@ TaskGroup::TaskGroup(TaskControl* c)
, _nswitch(0)
, _last_context_remained(NULL)
, _last_context_remained_arg(NULL)
, _pl(NULL)
, _pl(NULL)
, _main_stack(NULL)
, _main_tid(0)
, _remote_num_nosignal(0)
Expand Down Expand Up @@ -275,7 +274,7 @@ void TaskGroup::task_runner(intptr_t skip_remained) {
// user function is never called, the variables will be unchanged
// however they'd better reflect failures because the task is stopped
// abnormally.

// Meta and identifier of the task is persistent in this run.
TaskMeta* const m = g->_cur_meta;

Expand All @@ -286,25 +285,25 @@ void TaskGroup::task_runner(intptr_t skip_remained) {
g->_control->exposed_pending_time() <<
(butil::cpuwide_time_ns() - m->cpuwide_start_ns) / 1000L;
}

// Not catch exceptions except ExitException which is for implementing
// bthread_exit(). User code is intended to crash when an exception is
// not caught explicitly. This is consistent with other threading
// bthread_exit(). User code is intended to crash when an exception is
// not caught explicitly. This is consistent with other threading
// libraries.
void* thread_return;
try {
thread_return = m->fn(m->arg);
} catch (ExitException& e) {
thread_return = e.value();
}
}

// Group is probably changed
g = tls_task_group;

// TODO: Save thread_return
(void)thread_return;

// Logging must be done before returning the keytable, since the logging lib
// Logging must be done before returning the keytable, since the logging lib
// use bthread local storage internally, or will cause memory leak.
// FIXME: the time from quiting fn to here is not counted into cputime
if (m->attr.flags & BTHREAD_LOG_START_AND_FINISH) {
Expand All @@ -322,7 +321,7 @@ void TaskGroup::task_runner(intptr_t skip_remained) {
tls_bls.keytable = NULL;
m->local_storage.keytable = NULL; // optional
}

// Increase the version and wake up all joiners, if resulting version
// is 0, change it to 1 to make bthread_t never be 0. Any access
// or join to the bthread after changing version will be rejected.
Expand All @@ -338,9 +337,9 @@ void TaskGroup::task_runner(intptr_t skip_remained) {
g->_control->_nbthreads << -1;
g->set_remained(TaskGroup::_release_last_context, m);
ending_sched(&g);

} while (g->_cur_meta->tid != g->_main_tid);

// Was called from a pthread and we don't have BTHREAD_STACKTYPE_PTHREAD
// tasks to run, quit for more tasks.
}
Expand Down Expand Up @@ -591,7 +590,7 @@ void TaskGroup::sched_to(TaskGroup** pg, TaskMeta* next_meta) {
cur_meta->local_storage = tls_bls;
tls_bls = next_meta->local_storage;

// Logging must be done after switching the local storage, since the logging lib
// Logging must be done after switching the local storage, since the logging lib
// use bthread local storage internally, or will cause memory leak.
if ((cur_meta->attr.flags & BTHREAD_LOG_CONTEXT_SWITCH) ||
(next_meta->attr.flags & BTHREAD_LOG_CONTEXT_SWITCH)) {
Expand Down Expand Up @@ -741,7 +740,7 @@ void TaskGroup::_add_sleep_event(void* void_args) {
// will be gone.
SleepArgs e = *static_cast<SleepArgs*>(void_args);
TaskGroup* g = e.group;

TimerThread::TaskId sleep_id;
sleep_id = get_global_timer_thread()->schedule(
ready_to_run_from_timer_thread, void_args,
Expand All @@ -752,7 +751,7 @@ void TaskGroup::_add_sleep_event(void* void_args) {
g->ready_to_run(e.tid);
return;
}

// Set TaskMeta::current_sleep which is for interruption.
const uint32_t given_ver = get_version(e.tid);
{
Expand Down Expand Up @@ -931,7 +930,7 @@ void print_task(std::ostream& os, bthread_t tid) {
<< "\narg=" << (void*)arg
<< "\nattr={stack_type=" << attr.stack_type
<< " flags=" << attr.flags
<< " keytable_pool=" << attr.keytable_pool
<< " keytable_pool=" << attr.keytable_pool
<< "}\nhas_tls=" << has_tls
<< "\nuptime_ns=" << butil::cpuwide_time_ns() - cpuwide_start_ns
<< "\ncputime_ns=" << stat.cputime_ns
Expand Down

0 comments on commit b1f7ea0

Please sign in to comment.