diff --git a/benchmarking/debug.sh b/benchmarking/debug.sh new file mode 100755 index 0000000000..300bf80df2 --- /dev/null +++ b/benchmarking/debug.sh @@ -0,0 +1,26 @@ +#! /usr/bin/env bash +set -x +set -e + +# Cd into directory holding this script +cd "${BASH_SOURCE[0]%/*}/../build" + +# MODEL_NAME="meta-llama/Llama-3.1-8B-Instruct" +MODEL_NAME="JackFram/llama-160m" +NGPUS=1 + +python ../inference/utils/download_hf_model.py $MODEL_NAME + +export LEGION_BACKTRACE=1 + +./inference/incr_decoding/incr_decoding \ + -ll:cpu 16 -ll:gpu $NGPUS -ll:util 16 \ + -ll:fsize 20000 -ll:zsize 10000 \ + --fusion \ + -llm-model $MODEL_NAME \ + -prompt ../benchmarking/test.json \ + -tensor-parallelism-degree $NGPUS \ + -log-file ../inference/output/test.out \ + -output-file ../inference/output/test.json \ + --max-requests-per-batch 1 --max-tokens-per-batch 3000 --max-sequence-length 3000 + diff --git a/include/flexflow/batch_config.h b/include/flexflow/batch_config.h index fbf7ca751e..e3c0e50396 100644 --- a/include/flexflow/batch_config.h +++ b/include/flexflow/batch_config.h @@ -38,7 +38,7 @@ using BeamSearchBatchConfigFuture = Legion::Future; using TreeVerifyBatchConfigFuture = Legion::Future; using BeamInferenceResultFuture = Legion::Future; using FinetuningBwdFuture = Legion::Future; -using BatchConfigPairFuture = Legion::Future; +using ProcessWorkFromOldBatchesFuture = Legion::Future; struct OptimizerTasks { bool compute_gradients = true; diff --git a/include/flexflow/model.h b/include/flexflow/model.h index e352159af0..3bc2d03896 100644 --- a/include/flexflow/model.h +++ b/include/flexflow/model.h @@ -273,7 +273,9 @@ enum TaskIDs { RM_LOAD_TOKENS_TASK_ID, RM_LOAD_POSITION_TASK_ID, RM_LOAD_BATCH_CONFIG_TASK_ID, - RM_PREPARE_NEXT_BATCH_TASK_ID, + RM_PROCESS_WORK_FROM_OLD_BATCHES_TASK_ID, + RM_PREPARE_NEXT_FWD_BATCH_TASK_ID, + RM_PREPARE_NEXT_BWD_BATCH_TASK_ID, RM_PREPARE_NEXT_BATCH_INIT_TASK_ID, RM_PREPARE_NEXT_BATCH_BEAM_TASK_ID, RM_PREPARE_NEXT_BATCH_VERIFY_TASK_ID, diff 
--git a/include/flexflow/request_manager.h b/include/flexflow/request_manager.h index 9baac66869..f2d97af89f 100644 --- a/include/flexflow/request_manager.h +++ b/include/flexflow/request_manager.h @@ -245,12 +245,20 @@ class RequestManager { BatchConfig prepare_next_bwd_batch(); BatchConfig prepare_next_fwd_batch(BatchConfig const &old_fwd_bc, InferenceResult const &result); - BatchConfigPairFuture - prepare_next_batch(std::tuple &batch_pipeline_entry, - Context ctx, - Runtime *runtime); + std::pair prepare_next_batches( + std::tuple &batch_pipeline_entry, + Legion::Context ctx, + Legion::Runtime *runtime); + // BatchConfigPairFuture + // prepare_next_batch(std::tuple + // &batch_pipeline_entry, + // Context ctx, + // Runtime *runtime); // BatchConfig prepare_next_batch(BatchConfig const &bc, // InferenceResult const &result); // BatchConfigFuture prepare_next_batch(BatchConfigFuture const &bc, @@ -328,7 +336,24 @@ class RequestManager { std::vector const ®ions, Legion::Context ctx, Legion::Runtime *runtime); - static std::pair prepare_next_batch_task( + // static std::pair prepare_next_batch_task( + // Legion::Task const *task, + // std::vector const ®ions, + // Legion::Context ctx, + // Legion::Runtime *runtime); + static void process_work_from_old_batches_task( + Legion::Task const *task, + std::vector const ®ions, + Legion::Context ctx, + Legion::Runtime *runtime); + + static BatchConfig prepare_next_fwd_batch_task( + Legion::Task const *task, + std::vector const ®ions, + Legion::Context ctx, + Legion::Runtime *runtime); + + static BatchConfig prepare_next_bwd_batch_task( Legion::Task const *task, std::vector const ®ions, Legion::Context ctx, diff --git a/src/mapper/mapper.cc b/src/mapper/mapper.cc index c02f70f752..e684f1a9ae 100644 --- a/src/mapper/mapper.cc +++ b/src/mapper/mapper.cc @@ -280,7 +280,9 @@ void FFMapper::select_task_options(const MapperContext ctx, output.initial_proc = all_cpus[0]; return; } - if ((task.task_id == 
RM_PREPARE_NEXT_BATCH_TASK_ID) || + if ((task.task_id == RM_PROCESS_WORK_FROM_OLD_BATCHES_TASK_ID) || + (task.task_id == RM_PREPARE_NEXT_FWD_BATCH_TASK_ID) || + (task.task_id == RM_PREPARE_NEXT_BWD_BATCH_TASK_ID) || (task.task_id == RM_PREPARE_NEXT_BATCH_INIT_TASK_ID) || (task.task_id == RM_PREPARE_NEXT_BATCH_BEAM_TASK_ID) || (task.task_id == RM_PREPARE_NEXT_BATCH_VERIFY_TASK_ID) || diff --git a/src/runtime/model.cc b/src/runtime/model.cc index 292ed6cab1..5e2a188410 100644 --- a/src/runtime/model.cc +++ b/src/runtime/model.cc @@ -4682,24 +4682,65 @@ void register_flexflow_internal_tasks(Runtime *runtime, registrar); } } - // RequestManager prepare_next_batch + // RequestManager process_work_from_old_batches_task { - TaskVariantRegistrar registrar(RM_PREPARE_NEXT_BATCH_TASK_ID, - "RequestManager Prepare Next Batch"); + TaskVariantRegistrar registrar( + RM_PROCESS_WORK_FROM_OLD_BATCHES_TASK_ID, + "RequestManager Process Work from Old Batches"); registrar.add_constraint(ProcessorConstraint(Processor::LOC_PROC)); registrar.set_leaf(); if (pre_register) { Runtime::preregister_task_variant< - std::pair, - RequestManager::prepare_next_batch_task>( - registrar, "RequestManager Prepare Next Batch Task"); + RequestManager::process_work_from_old_batches_task>( + registrar, "RequestManager Process Work from Old Batches Task"); } else { if (enable_control_replication) { registrar.global_registration = false; } - runtime->register_task_variant, - RequestManager::prepare_next_batch_task>( - registrar); + runtime->register_task_variant< + RequestManager::process_work_from_old_batches_task>(registrar); + } + } + // RequestManager prepare_next_fwd_batch_task + { + TaskVariantRegistrar registrar(RM_PREPARE_NEXT_FWD_BATCH_TASK_ID, + "RequestManager Prepare Next FWD Batch"); + registrar.add_constraint(ProcessorConstraint(Processor::LOC_PROC)); + registrar.set_leaf(); + if (pre_register) { + Runtime::preregister_task_variant< + BatchConfig, + 
RequestManager::prepare_next_fwd_batch_task>( + registrar, "RequestManager Prepare Next FWD Batch Task"); + } else { + if (enable_control_replication) { + registrar.global_registration = false; + } + runtime + ->register_task_variant( + registrar); + } + } + // RequestManager prepare_next_bwd_batch_task + { + TaskVariantRegistrar registrar(RM_PREPARE_NEXT_BWD_BATCH_TASK_ID, + "RequestManager Prepare Next BWD Batch"); + registrar.add_constraint(ProcessorConstraint(Processor::LOC_PROC)); + registrar.set_leaf(); + if (pre_register) { + Runtime::preregister_task_variant< + BatchConfig, + RequestManager::prepare_next_bwd_batch_task>( + registrar, "RequestManager Prepare Next BWD Batch Task"); + } else { + if (enable_control_replication) { + registrar.global_registration = false; + } + runtime + ->register_task_variant( + registrar); } } // RequestManager prepare_next_batch_beam diff --git a/src/runtime/request_manager.cc b/src/runtime/request_manager.cc index 143a23aba3..c33c0c6e5e 100644 --- a/src/runtime/request_manager.cc +++ b/src/runtime/request_manager.cc @@ -619,23 +619,71 @@ size_t RequestManager::get_num_processed_requests() { return num_processed_requests; } -BatchConfigPairFuture RequestManager::prepare_next_batch( - std::tuple &batch_pipeline_entry, +std::pair + RequestManager::prepare_next_batches( + std::tuple &batch_pipeline_entry, + Context ctx, + Runtime *runtime) { + RequestManager *rm = this; + // Process work from old batches + TaskLauncher launcher1(RM_PROCESS_WORK_FROM_OLD_BATCHES_TASK_ID, + TaskArgument(&rm, sizeof(RequestManager *))); + launcher1.add_future(std::get<0>(batch_pipeline_entry)); + launcher1.add_future(std::get<1>(batch_pipeline_entry)); + launcher1.add_future(std::get<2>(batch_pipeline_entry)); + launcher1.add_future(std::get<3>(batch_pipeline_entry)); + ProcessWorkFromOldBatchesFuture pwfobf = + runtime->execute_task(ctx, launcher1); + // Build new FWD batch + TaskLauncher launcher2(RM_PREPARE_NEXT_FWD_BATCH_TASK_ID, + 
TaskArgument(&rm, sizeof(RequestManager *))); + launcher2.add_future(std::get<0>(batch_pipeline_entry)); + launcher2.add_future(std::get<1>(batch_pipeline_entry)); + launcher2.add_future(std::get<2>(batch_pipeline_entry)); + launcher2.add_future(std::get<3>(batch_pipeline_entry)); + launcher2.add_future(pwfobf); + BatchConfigFuture bcff = runtime->execute_task(ctx, launcher2); + // Build new BWD batch + TaskLauncher launcher3(RM_PREPARE_NEXT_BWD_BATCH_TASK_ID, + TaskArgument(&rm, sizeof(RequestManager *))); + launcher3.add_future(std::get<0>(batch_pipeline_entry)); + launcher3.add_future(std::get<1>(batch_pipeline_entry)); + launcher3.add_future(std::get<2>(batch_pipeline_entry)); + launcher3.add_future(std::get<3>(batch_pipeline_entry)); + launcher3.add_future(pwfobf); + BatchConfigFuture bcbf = runtime->execute_task(ctx, launcher3); + // return pair of batch futures + return std::make_pair(bcff, bcbf); +} + +// future[0]: old_fwd_bc +// future[1]: old_bwd_bc +// future[2]: inference result +// future[3]: wait for bwd to finish +void RequestManager::process_work_from_old_batches_task( + Task const *task, + std::vector const ®ions, Context ctx, Runtime *runtime) { - RequestManager *rm = this; - TaskLauncher launcher(RM_PREPARE_NEXT_BATCH_TASK_ID, - TaskArgument(&rm, sizeof(RequestManager *))); - launcher.add_future(std::get<0>(batch_pipeline_entry)); - launcher.add_future(std::get<1>(batch_pipeline_entry)); - launcher.add_future(std::get<2>(batch_pipeline_entry)); - // launcher.add_future(std::get<3>(batch_pipeline_entry)); - return runtime->execute_task(ctx, launcher); + + RequestManager *rm = *((RequestManager **)task->args); + BatchConfig const *old_fwd_bc = BatchConfig::from_future(task->futures[0]); + BatchConfig const *old_bwd_bc = BatchConfig::from_future(task->futures[1]); + InferenceResult const &result = + Future(task->futures[2]).get_result(); + Future(task->futures[3]).get_void_result(); // wait until bwd is done + 
rm->process_work_from_old_batches(*old_fwd_bc, *old_bwd_bc, result); } -std::pair RequestManager::prepare_next_batch_task( +// future[0]: old_fwd_bc +// future[1]: old_bwd_bc +// future[2]: inference result +// future[3]: wait for bwd to finish +// future[4]: wait for process_work_from_old_batches to finish +BatchConfig RequestManager::prepare_next_fwd_batch_task( Task const *task, std::vector const ®ions, Context ctx, @@ -645,11 +693,31 @@ std::pair RequestManager::prepare_next_batch_task( BatchConfig const *old_bwd_bc = BatchConfig::from_future(task->futures[1]); InferenceResult const &result = Future(task->futures[2]).get_result(); - Future(task->futures[3]).get_void_result(); - rm->process_work_from_old_batches(*old_fwd_bc, *old_bwd_bc, result); - BatchConfig new_fwd_bc = rm->prepare_next_fwd_batch(*old_fwd_bc, result); - BatchConfig new_bwd_bc = rm->prepare_next_bwd_batch(); - return std::make_pair(new_fwd_bc, new_bwd_bc); + Future(task->futures[3]).get_void_result(); // wait until bwd is done + Future(task->futures[4]) + .get_void_result(); // wait until process_work_from_old_batches is done + return rm->prepare_next_fwd_batch(*old_fwd_bc, result); +} + +// future[0]: old_fwd_bc +// future[1]: old_bwd_bc +// future[2]: inference result +// future[3]: wait for bwd to finish +// future[4]: wait for process_work_from_old_batches to finish +BatchConfig RequestManager::prepare_next_bwd_batch_task( + Task const *task, + std::vector const ®ions, + Context ctx, + Runtime *runtime) { + RequestManager *rm = *((RequestManager **)task->args); + BatchConfig const *old_fwd_bc = BatchConfig::from_future(task->futures[0]); + BatchConfig const *old_bwd_bc = BatchConfig::from_future(task->futures[1]); + InferenceResult const &result = + Future(task->futures[2]).get_result(); + Future(task->futures[3]).get_void_result(); // wait until bwd is done + Future(task->futures[4]) + .get_void_result(); // wait until process_work_from_old_batches is done + return 
rm->prepare_next_bwd_batch(); } bool RequestManager::is_eos_token(int token_id) { @@ -3256,43 +3324,45 @@ void RequestManager::serve_incr_decoding(FFModel *llm) { // init operators im->init_operators_inference(llm); // Legion futures for inc_decoding and spec_infer - // BatchConfigFuture last_bcf_fwd, last_bcf_bwd; - BatchConfigPairFuture last_bcf; + BatchConfigFuture last_bcf_fwd, last_bcf_bwd; InferenceResultFuture last_irf; FinetuningBwdFuture last_bwd_f; { // Initialize futures for incr decoding BatchConfig bc_fwd, bc_bwd; InferenceResult ir; - // last_bcf_fwd = Future::from_value(bc_fwd); - // last_bcf_bwd = Future::from_value(bc_bwd); - last_bcf = Future::from_value>( - std::make_pair(bc_fwd, bc_bwd)); + last_bcf_fwd = Future::from_value(bc_fwd); + last_bcf_bwd = Future::from_value(bc_bwd); last_irf = Future::from_value(ir); last_bwd_f = Future::from_value(true); } - std::queue> batch_pipeline; - // tuple[0]: std::pair - // tuple[1]: inference result - // tuple[2]: bwd future - { batch_pipeline.push(std::make_tuple(last_bcf, last_irf, last_bwd_f)); } + // tuple[0]: fwd batch + // tuple[1]: bwd batch + // tuple[2]: inference result + // tuple[3]: bwd future + { + batch_pipeline.push( + std::make_tuple(last_bcf_fwd, last_bcf_bwd, last_irf, last_bwd_f)); + } while (!is_background_server_terminated()) { if (batch_pipeline.size() >= 4) { // Block here to avoid launching too many batches auto const &batch = batch_pipeline.front(); - std::get<1>(batch).get_void_result(); std::get<2>(batch).get_void_result(); + std::get<3>(batch).get_void_result(); } // deque finished batches while (batch_pipeline.size() > 1) { auto const &batch = batch_pipeline.front(); - if (std::get<1>(batch).is_ready() && std::get<2>(batch).is_ready()) { + if (std::get<2>(batch).is_ready() && std::get<3>(batch).is_ready()) { batch_pipeline.pop(); } else { break; @@ -3301,16 +3371,15 @@ void RequestManager::serve_incr_decoding(FFModel *llm) { runtime->begin_trace(ctx, 12346 /*trace_id*/); auto 
&batch_pipeline_entry = batch_pipeline.back(); - BatchConfigPairFuture bcf = - prepare_next_batch(batch_pipeline_entry, ctx, runtime); - // BatchConfigFuture bcf_fwd = next_batches.first; - // BatchConfigFuture bcf_bwd = next_batches.second; - InferenceResultFuture irf = im->inference(llm, 0, bcf); - FinetuningBwdFuture bwd_f = im->peft_bwd(llm, 0, bcf); - batch_pipeline.push(std::make_tuple(bcf, irf, bwd_f)); - // last_bcf_fwd = bcf_fwd; - // last_bcf_bwd = bcf_bwd; - last_bcf = bcf; + std::pair next_batches = + prepare_next_batches(batch_pipeline_entry, ctx, runtime); + BatchConfigFuture bcf_fwd = next_batches.first; + BatchConfigFuture bcf_bwd = next_batches.second; + InferenceResultFuture irf = im->inference(llm, 0, bcf_fwd); + FinetuningBwdFuture bwd_f = im->peft_bwd(llm, 0, bcf_bwd); + batch_pipeline.push(std::make_tuple(bcf_fwd, bcf_bwd, irf, bwd_f)); + last_bcf_fwd = bcf_fwd; + last_bcf_bwd = bcf_bwd; last_irf = irf; last_bwd_f = bwd_f; runtime->end_trace(ctx, 12346 /*trace_id*/);