
Commit

manually update worker.cpp and worker.h
swjz committed Nov 13, 2018
1 parent 584dc5a commit 9b2457f
Showing 2 changed files with 64 additions and 32 deletions.
94 changes: 63 additions & 31 deletions scanner/engine/worker.cpp
@@ -854,8 +854,12 @@ void WorkerImpl::stop_job_processor() {

bool WorkerImpl::process_job(const proto::BulkJobParameters* job_params,
proto::Result* job_result) {
timepoint_t base_time(std::chrono::nanoseconds(job_params->base_time()));

Profiler profiler(base_time);
job_result->set_success(true);

auto setup_ops_start = now();
// Load Ops, register Ops, and register python kernels before running jobs
{
proto::Empty empty;
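
The `Profiler profiler(base_time)` added above, together with the `profiler.add_interval(...)` calls threaded through the rest of this function, brackets each setup phase against a base time handed out by the master, so intervals recorded by different workers land on one shared timeline. A minimal sketch of that pattern, using a hypothetical `SimpleProfiler` type (Scanner's real `Profiler` class is not shown in this diff):

```cpp
// Minimal sketch, not Scanner's actual Profiler: intervals are stored as
// nanosecond offsets from a base time supplied by the master, so traces from
// every worker can be merged onto a single timeline.
#include <chrono>
#include <string>
#include <vector>

using timepoint_t = std::chrono::steady_clock::time_point;

inline timepoint_t now() { return std::chrono::steady_clock::now(); }

struct SimpleProfiler {
  struct Interval {
    std::string name;
    long long start_ns;
    long long end_ns;
  };

  explicit SimpleProfiler(timepoint_t base) : base_(base) {}

  void add_interval(const std::string& name, timepoint_t start, timepoint_t end) {
    auto ns = [&](timepoint_t t) {
      return std::chrono::duration_cast<std::chrono::nanoseconds>(t - base_).count();
    };
    intervals_.push_back({name, ns(start), ns(end)});
  }

  timepoint_t base_;
  std::vector<Interval> intervals_;
};

// Usage mirroring the pattern in this commit:
//   SimpleProfiler profiler(base_time);
//   auto setup_ops_start = now();
//   /* ... load and register ops ... */
//   profiler.add_interval("process_job:ops_setup", setup_ops_start, now());
```
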
@@ -913,6 +917,7 @@ bool WorkerImpl::process_job(const proto::BulkJobParameters* job_params,
}
}
}
profiler.add_interval("process_job:ops_setup", setup_ops_start, now());

auto finished_fn = [&]() {
if (!trigger_shutdown_.raised()) {
@@ -952,7 +957,6 @@ bool WorkerImpl::process_job(const proto::BulkJobParameters* job_params,
// Controls if work should be distributed roundrobin or dynamically
bool distribute_work_dynamically = true;

timepoint_t base_time(std::chrono::nanoseconds(job_params->base_time()));
const i32 work_packet_size = job_params->work_packet_size();
const i32 io_packet_size = job_params->io_packet_size() != -1
? job_params->io_packet_size()
@@ -966,10 +970,11 @@ bool WorkerImpl::process_job(const proto::BulkJobParameters* job_params,
job_params->ops().end());


auto meta_cache_start = now();
// Setup table metadata cache for use in other operations
DatabaseMetadata meta(job_params->db_meta());
TableMetaCache table_meta(storage_, meta);

profiler.add_interval("process_job:cache_table_metadata", meta_cache_start, now());
// Perform analysis on the graph to determine:
//
// - populate_analysis_info: Analyze the graph to build lookup structures to
@@ -985,6 +990,7 @@ bool WorkerImpl::process_job(const proto::BulkJobParameters* job_params,
// - remap_input_op_edges: Remap multiple inputs to a single input op
//
// - perform_liveness_analysis: When to retire elements (liveness analysis)
auto dag_analysis_start = now();
DAGAnalysisInfo analysis_results;
populate_analysis_info(ops, analysis_results);
// Need slice input rows to know which slice we are in
@@ -1023,9 +1029,11 @@ bool WorkerImpl::process_job(const proto::BulkJobParameters* job_params,
final_compression_options.push_back(o);
}
assert(final_output_columns.size() == final_compression_options.size());
profiler.add_interval("process_job:dag_analysis", dag_analysis_start, now());

// Setup kernel factories and the kernel configs that will be used
// to instantiate instances of the op pipeline
auto pipeline_setup_start = now();
KernelRegistry* kernel_registry = get_kernel_registry();
std::vector<KernelFactory*> kernel_factories;
std::vector<KernelConfig> kernel_configs;
@@ -1302,31 +1310,6 @@ bool WorkerImpl::process_job(const proto::BulkJobParameters* job_params,
return false;
}

// Set up memory pool if different than previous memory pool
if (!memory_pool_initialized_ ||
job_params->memory_pool_config() != cached_memory_pool_config_) {
if (db_params_.num_cpus < pipeline_instances_per_node &&
job_params->memory_pool_config().cpu().use_pool()) {
RESULT_ERROR(job_result,
"Cannot oversubscribe CPUs and also use CPU memory pool");
finished_fn();
return false;
}
if (db_params_.gpu_ids.size() < pipeline_instances_per_node &&
job_params->memory_pool_config().gpu().use_pool()) {
RESULT_ERROR(job_result,
"Cannot oversubscribe GPUs and also use GPU memory pool");
finished_fn();
return false;
}
if (memory_pool_initialized_) {
destroy_memory_allocators();
}
init_memory_allocators(job_params->memory_pool_config(), gpu_ids);
cached_memory_pool_config_ = job_params->memory_pool_config();
memory_pool_initialized_ = true;
}

// Setup source factories and source configs that will be used
// to instantiate load worker instances
std::vector<SourceFactory*> source_factories;
@@ -1403,11 +1386,41 @@ bool WorkerImpl::process_job(const proto::BulkJobParameters* job_params,
std::vector<u8>(so.args().begin(), so.args().end());
}
}
profiler.add_interval("process_job:pipeline_setup", pipeline_setup_start, now());

auto memory_pool_start = now();
// Set up memory pool if different than previous memory pool
if (!memory_pool_initialized_ ||
job_params->memory_pool_config() != cached_memory_pool_config_) {
if (db_params_.num_cpus < pipeline_instances_per_node &&
job_params->memory_pool_config().cpu().use_pool()) {
RESULT_ERROR(job_result,
"Cannot oversubscribe CPUs and also use CPU memory pool");
finished_fn();
return false;
}
if (db_params_.gpu_ids.size() < pipeline_instances_per_node &&
job_params->memory_pool_config().gpu().use_pool()) {
RESULT_ERROR(job_result,
"Cannot oversubscribe GPUs and also use GPU memory pool");
finished_fn();
return false;
}
if (memory_pool_initialized_) {
destroy_memory_allocators();
}
init_memory_allocators(job_params->memory_pool_config(), gpu_ids);
cached_memory_pool_config_ = job_params->memory_pool_config();
memory_pool_initialized_ = true;
}
profiler.add_interval("process_job:create_memory_pool", memory_pool_start, now());


#ifdef __linux__
omp_set_num_threads(std::thread::hardware_concurrency());
#endif

auto pipeline_create_start = now();
// Setup shared resources for distributing work to processing threads
i64 accepted_tasks = 0;
LoadInputQueue load_work;
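
The memory-pool block was moved after pipeline setup and wrapped in the new `create_memory_pool` profiling interval; its logic rebuilds allocators only when the requested pool configuration differs from the cached one, and rejects a pool when more pipeline instances than devices exist. A compact sketch of that gating pattern, with hypothetical `PoolConfig`/`PoolManager` types standing in for Scanner's own allocator API:

```cpp
// Hypothetical sketch of the gating logic above (types and names are
// illustrative, not Scanner's API): refuse a pool when devices are
// oversubscribed, and only tear down and rebuild allocators when the
// requested configuration differs from the cached one.
#include <cstddef>
#include <optional>
#include <string>

struct PoolConfig {
  bool use_cpu_pool = false;
  bool use_gpu_pool = false;
  size_t cpu_pool_bytes = 0;
  size_t gpu_pool_bytes = 0;
  bool operator==(const PoolConfig& o) const {
    return use_cpu_pool == o.use_cpu_pool && use_gpu_pool == o.use_gpu_pool &&
           cpu_pool_bytes == o.cpu_pool_bytes && gpu_pool_bytes == o.gpu_pool_bytes;
  }
};

class PoolManager {
 public:
  // Returns an error message on failure, std::nullopt on success.
  std::optional<std::string> configure(const PoolConfig& cfg, size_t num_cpus,
                                       size_t num_gpus, size_t pipelines_per_node) {
    if (cfg.use_cpu_pool && num_cpus < pipelines_per_node)
      return "Cannot oversubscribe CPUs and also use CPU memory pool";
    if (cfg.use_gpu_pool && num_gpus < pipelines_per_node)
      return "Cannot oversubscribe GPUs and also use GPU memory pool";
    if (initialized_ && cfg == cached_) return std::nullopt;  // nothing to do
    if (initialized_) destroy_pools();
    create_pools(cfg);
    cached_ = cfg;
    initialized_ = true;
    return std::nullopt;
  }

 private:
  void create_pools(const PoolConfig&) { /* allocate slabs up front */ }
  void destroy_pools() { /* release slabs */ }
  bool initialized_ = false;
  PoolConfig cached_;
};
```
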
@@ -1649,12 +1662,16 @@ bool WorkerImpl::process_job(const proto::BulkJobParameters* job_params,
std::ref(retired_tasks), args);
}

profiler.add_interval("process_job:create_pipelines", pipeline_create_start, now());

if (job_params->profiling()) {
auto wait_for_others_start = now();
// Wait until all evaluate workers have started up
std::unique_lock<std::mutex> lk(startup_lock);
startup_cv.wait(lk, [&] {
return eval_total == startup_count;
});
profiler.add_interval("process_job:wait_for_pipelines", wait_for_others_start, now());
}

timepoint_t start_time = now();
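
The new `wait_for_pipelines` interval only exists when profiling is enabled: the coordinator blocks on `startup_cv` until every evaluate worker has checked in, so steady-state timing does not absorb per-pipeline setup cost. A self-contained sketch of that condition-variable startup barrier (names are illustrative, not Scanner's):

```cpp
// Sketch of a startup barrier: each worker increments a shared counter and
// notifies; the coordinator waits until the counter reaches the expected
// total before starting the job timer.
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

int main() {
  const int eval_total = 4;
  int startup_count = 0;
  std::mutex startup_lock;
  std::condition_variable startup_cv;

  std::vector<std::thread> workers;
  for (int i = 0; i < eval_total; ++i) {
    workers.emplace_back([&] {
      // ... expensive per-pipeline setup would happen here ...
      {
        std::lock_guard<std::mutex> lk(startup_lock);
        ++startup_count;
      }
      startup_cv.notify_one();
      // ... worker main loop ...
    });
  }

  // Coordinator: start measuring only once every pipeline is ready, so
  // profiling does not attribute setup cost to steady-state processing.
  {
    std::unique_lock<std::mutex> lk(startup_lock);
    startup_cv.wait(lk, [&] { return startup_count == eval_total; });
  }

  for (auto& t : workers) t.join();
}
```
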
@@ -1667,8 +1684,10 @@ bool WorkerImpl::process_job(const proto::BulkJobParameters* job_params,
// This keeps track of the last time we received a "wait_for_work" message
// from the master. If less than the current wait interval has passed since that
// message, we shouldn't ask the master for more work, to avoid overloading it.
const int SECONDS_TO_WAIT = 1;
auto last_wait_for_work_time = now() - std::chrono::seconds(SECONDS_TO_WAIT);
const int MILLISECONDS_TO_WAIT_ALPHA = 20;
const int MILLISECONDS_TO_WAIT_BETA = 1000;
const int MILLISECONDS_TO_WAIT_RAMP = 15000;
auto last_wait_for_work_time = now() - std::chrono::milliseconds(MILLISECONDS_TO_WAIT_ALPHA);
while (true) {
if (trigger_shutdown_.raised()) {
// Abandon ship!
@@ -1740,10 +1759,18 @@ bool WorkerImpl::process_job(const proto::BulkJobParameters* job_params,
}
// If local amount of work is less than the amount of work we want
// queued up, then ask the master for more work.
i32 milliseconds_since_start = ms_since(start_time);
i32 milliseconds_to_wait = std::max(
MILLISECONDS_TO_WAIT_ALPHA,
std::min(
MILLISECONDS_TO_WAIT_BETA,
(MILLISECONDS_TO_WAIT_BETA - MILLISECONDS_TO_WAIT_ALPHA) *
(milliseconds_since_start / MILLISECONDS_TO_WAIT_RAMP) +
MILLISECONDS_TO_WAIT_ALPHA));
i32 local_work = accepted_tasks - total_tasks_processed;
if (local_work <
pipeline_instances_per_node * job_params->tasks_in_queue_per_pu() &&
seconds_since(last_wait_for_work_time) > SECONDS_TO_WAIT) {
ms_since(last_wait_for_work_time) > milliseconds_to_wait) {
proto::NextWorkRequest node_info;
node_info.set_node_id(node_id_);
node_info.set_bulk_job_id(active_bulk_job_id_);
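
The fixed one-second throttle (`SECONDS_TO_WAIT`) is replaced here by a wait that starts at 20 ms and is capped at 1000 ms, keyed to how long the job has been running. The standalone sketch below simply replays that arithmetic with the same constants to make the behavior visible; note that, as written, the division by `MILLISECONDS_TO_WAIT_RAMP` is integer division, so the computed wait stays at 20 ms for the first 15 s and then jumps directly to the 1000 ms cap:

```cpp
// Reproduction of the wait-time expression from this commit, printed for a
// few elapsed times. Integer division by MILLISECONDS_TO_WAIT_RAMP makes the
// value step from ALPHA to the BETA cap at the 15 s mark rather than ramping
// smoothly.
#include <algorithm>
#include <cstdio>
#include <initializer_list>

int main() {
  const int MILLISECONDS_TO_WAIT_ALPHA = 20;
  const int MILLISECONDS_TO_WAIT_BETA = 1000;
  const int MILLISECONDS_TO_WAIT_RAMP = 15000;

  for (int ms_since_start : {0, 5000, 14999, 15000, 30000, 60000}) {
    int wait = std::max(
        MILLISECONDS_TO_WAIT_ALPHA,
        std::min(MILLISECONDS_TO_WAIT_BETA,
                 (MILLISECONDS_TO_WAIT_BETA - MILLISECONDS_TO_WAIT_ALPHA) *
                         (ms_since_start / MILLISECONDS_TO_WAIT_RAMP) +
                     MILLISECONDS_TO_WAIT_ALPHA));
    std::printf("elapsed %6d ms -> wait at least %4d ms before asking for work\n",
                ms_since_start, wait);
  }
}
```
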
@@ -1995,6 +2022,11 @@ bool WorkerImpl::process_job(const proto::BulkJobParameters* job_params,
s_write(profiler_output.get(), end_time_ns);

i64 out_rank = node_id_;

// Write process job profiler
write_profiler_to_file(profiler_output.get(), out_rank, "process_job", "", 0,
profiler);

// Load worker profilers
u8 load_worker_count = num_load_workers;
s_write(profiler_output.get(), load_worker_count);
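
The commit also flushes the new top-level `process_job` profiler to the same output that already holds the per-worker profilers. The on-disk layout of `write_profiler_to_file`/`s_write` is not visible in this diff, so the sketch below shows one plausible way to tag and serialize such interval records; the format here is purely illustrative:

```cpp
// Hypothetical sketch of dumping one profiler's intervals to a binary stream,
// tagged with the worker rank and a group name ("process_job", "load", ...).
// Not Scanner's actual serialization format.
#include <cstdint>
#include <fstream>
#include <string>
#include <vector>

struct Interval { std::string name; int64_t start_ns; int64_t end_ns; };

void write_profile(std::ofstream& out, int64_t rank, const std::string& group,
                   const std::vector<Interval>& intervals) {
  auto write_i64 = [&](int64_t v) {
    out.write(reinterpret_cast<const char*>(&v), sizeof(v));
  };
  auto write_str = [&](const std::string& s) {
    write_i64(static_cast<int64_t>(s.size()));
    out.write(s.data(), static_cast<std::streamsize>(s.size()));
  };
  write_i64(rank);
  write_str(group);
  write_i64(static_cast<int64_t>(intervals.size()));
  for (const auto& iv : intervals) {
    write_str(iv.name);
    write_i64(iv.start_ns);
    write_i64(iv.end_ns);
  }
}
```
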
@@ -2049,4 +2081,4 @@ bool WorkerImpl::process_job(const proto::BulkJobParameters* job_params,
}

}
}
}
2 changes: 1 addition & 1 deletion scanner/engine/worker.h
@@ -108,4 +108,4 @@ class WorkerImpl final : public proto::Worker::Service {
std::mutex work_mutex_;
};
}
}
}
