Skip to content

Commit

Permalink
fix bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
goliaro committed Nov 26, 2024
1 parent 784c8d9 commit ee48def
Show file tree
Hide file tree
Showing 7 changed files with 225 additions and 60 deletions.
26 changes: 26 additions & 0 deletions benchmarking/debug.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#! /usr/bin/env bash
# Debug launcher: runs FlexFlow incremental decoding on a small model with
# command echoing and Legion backtraces enabled.
set -x
set -e

# Cd into the build directory that sits next to this script's parent
# directory, so the script works no matter where it is invoked from.
cd "${BASH_SOURCE[0]%/*}/../build"

# MODEL_NAME="meta-llama/Llama-3.1-8B-Instruct"
MODEL_NAME="JackFram/llama-160m"
NGPUS=1

# Download the model weights from HuggingFace before launching inference.
# Expansions are quoted so model ids with unusual characters stay intact.
python ../inference/utils/download_hf_model.py "$MODEL_NAME"

# Make Legion print a backtrace when the runtime crashes.
export LEGION_BACKTRACE=1

./inference/incr_decoding/incr_decoding \
    -ll:cpu 16 -ll:gpu "$NGPUS" -ll:util 16 \
    -ll:fsize 20000 -ll:zsize 10000 \
    --fusion \
    -llm-model "$MODEL_NAME" \
    -prompt ../benchmarking/test.json \
    -tensor-parallelism-degree "$NGPUS" \
    -log-file ../inference/output/test.out \
    -output-file ../inference/output/test.json \
    --max-requests-per-batch 1 --max-tokens-per-batch 3000 --max-sequence-length 3000

2 changes: 1 addition & 1 deletion include/flexflow/batch_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ using BeamSearchBatchConfigFuture = Legion::Future;
using TreeVerifyBatchConfigFuture = Legion::Future;
using BeamInferenceResultFuture = Legion::Future;
using FinetuningBwdFuture = Legion::Future;
using BatchConfigPairFuture = Legion::Future;
using ProcessWorkFromOldBatchesFuture = Legion::Future;

struct OptimizerTasks {
bool compute_gradients = true;
Expand Down
4 changes: 3 additions & 1 deletion include/flexflow/model.h
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,9 @@ enum TaskIDs {
RM_LOAD_TOKENS_TASK_ID,
RM_LOAD_POSITION_TASK_ID,
RM_LOAD_BATCH_CONFIG_TASK_ID,
RM_PREPARE_NEXT_BATCH_TASK_ID,
RM_PROCESS_WORK_FROM_OLD_BATCHES_TASK_ID,
RM_PREPARE_NEXT_FWD_BATCH_TASK_ID,
RM_PREPARE_NEXT_BWD_BATCH_TASK_ID,
RM_PREPARE_NEXT_BATCH_INIT_TASK_ID,
RM_PREPARE_NEXT_BATCH_BEAM_TASK_ID,
RM_PREPARE_NEXT_BATCH_VERIFY_TASK_ID,
Expand Down
39 changes: 32 additions & 7 deletions include/flexflow/request_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -245,12 +245,20 @@ class RequestManager {
BatchConfig prepare_next_bwd_batch();
BatchConfig prepare_next_fwd_batch(BatchConfig const &old_fwd_bc,
InferenceResult const &result);
BatchConfigPairFuture
prepare_next_batch(std::tuple<BatchConfigPairFuture,
InferenceResultFuture,
FinetuningBwdFuture> &batch_pipeline_entry,
Context ctx,
Runtime *runtime);
std::pair<BatchConfigFuture, BatchConfigFuture> prepare_next_batches(
std::tuple<BatchConfigFuture,
BatchConfigFuture,
InferenceResultFuture,
FinetuningBwdFuture> &batch_pipeline_entry,
Legion::Context ctx,
Legion::Runtime *runtime);
// BatchConfigPairFuture
// prepare_next_batch(std::tuple<BatchConfigPairFuture,
// InferenceResultFuture,
// FinetuningBwdFuture>
// &batch_pipeline_entry,
// Context ctx,
// Runtime *runtime);
// BatchConfig prepare_next_batch(BatchConfig const &bc,
// InferenceResult const &result);
// BatchConfigFuture prepare_next_batch(BatchConfigFuture const &bc,
Expand Down Expand Up @@ -328,7 +336,24 @@ class RequestManager {
std::vector<Legion::PhysicalRegion> const &regions,
Legion::Context ctx,
Legion::Runtime *runtime);
static std::pair<BatchConfig, BatchConfig> prepare_next_batch_task(
// static std::pair<BatchConfig, BatchConfig> prepare_next_batch_task(
// Legion::Task const *task,
// std::vector<Legion::PhysicalRegion> const &regions,
// Legion::Context ctx,
// Legion::Runtime *runtime);
static void process_work_from_old_batches_task(
Legion::Task const *task,
std::vector<Legion::PhysicalRegion> const &regions,
Legion::Context ctx,
Legion::Runtime *runtime);

static BatchConfig prepare_next_fwd_batch_task(
Legion::Task const *task,
std::vector<Legion::PhysicalRegion> const &regions,
Legion::Context ctx,
Legion::Runtime *runtime);

static BatchConfig prepare_next_bwd_batch_task(
Legion::Task const *task,
std::vector<Legion::PhysicalRegion> const &regions,
Legion::Context ctx,
Expand Down
4 changes: 3 additions & 1 deletion src/mapper/mapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,9 @@ void FFMapper::select_task_options(const MapperContext ctx,
output.initial_proc = all_cpus[0];
return;
}
if ((task.task_id == RM_PREPARE_NEXT_BATCH_TASK_ID) ||
if ((task.task_id == RM_PROCESS_WORK_FROM_OLD_BATCHES_TASK_ID) ||
(task.task_id == RM_PREPARE_NEXT_FWD_BATCH_TASK_ID) ||
(task.task_id == RM_PREPARE_NEXT_BWD_BATCH_TASK_ID) ||
(task.task_id == RM_PREPARE_NEXT_BATCH_INIT_TASK_ID) ||
(task.task_id == RM_PREPARE_NEXT_BATCH_BEAM_TASK_ID) ||
(task.task_id == RM_PREPARE_NEXT_BATCH_VERIFY_TASK_ID) ||
Expand Down
59 changes: 50 additions & 9 deletions src/runtime/model.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4682,24 +4682,65 @@ void register_flexflow_internal_tasks(Runtime *runtime,
registrar);
}
}
// RequestManager prepare_next_batch
// RequestManager process_work_from_old_batches_task
{
  // Register the task that absorbs results produced by the previous
  // fwd/bwd batches, on a CPU (LOC_PROC) processor, as a leaf variant.
  TaskVariantRegistrar registrar(
      RM_PROCESS_WORK_FROM_OLD_BATCHES_TASK_ID,
      "RequestManager Process Work from Old Batches");
  registrar.add_constraint(ProcessorConstraint(Processor::LOC_PROC));
  registrar.set_leaf();
  if (pre_register) {
    // Static registration path, used before the Legion runtime starts.
    Runtime::preregister_task_variant<
        RequestManager::process_work_from_old_batches_task>(
        registrar, "RequestManager Process Work from Old Batches Task");
  } else {
    // Dynamic registration; with control replication every shard
    // registers the variant locally instead of broadcasting it.
    if (enable_control_replication) {
      registrar.global_registration = false;
    }
    runtime->register_task_variant<
        RequestManager::process_work_from_old_batches_task>(registrar);
  }
}
// RequestManager prepare_next_fwd_batch_task
{
  // Register the task that builds the next forward (inference) batch.
  // Display name fixed: was "FWB", inconsistent with the preregister
  // string below and with the BWD sibling registration.
  TaskVariantRegistrar registrar(RM_PREPARE_NEXT_FWD_BATCH_TASK_ID,
                                 "RequestManager Prepare Next FWD Batch");
  registrar.add_constraint(ProcessorConstraint(Processor::LOC_PROC));
  registrar.set_leaf();
  if (pre_register) {
    // Static registration path, used before the Legion runtime starts.
    Runtime::preregister_task_variant<
        BatchConfig,
        RequestManager::prepare_next_fwd_batch_task>(
        registrar, "RequestManager Prepare Next FWD Batch Task");
  } else {
    // Dynamic registration; with control replication every shard
    // registers the variant locally instead of broadcasting it.
    if (enable_control_replication) {
      registrar.global_registration = false;
    }
    runtime
        ->register_task_variant<BatchConfig,
                                RequestManager::prepare_next_fwd_batch_task>(
            registrar);
  }
}
// RequestManager prepare_next_bwd_batch_task
{
  // Register the task that builds the next backward (finetuning) batch,
  // on a CPU (LOC_PROC) processor, as a leaf variant returning BatchConfig.
  TaskVariantRegistrar registrar(RM_PREPARE_NEXT_BWD_BATCH_TASK_ID,
                                 "RequestManager Prepare Next BWD Batch");
  registrar.add_constraint(ProcessorConstraint(Processor::LOC_PROC));
  registrar.set_leaf();
  if (pre_register) {
    // Static registration path, used before the Legion runtime starts.
    Runtime::preregister_task_variant<
        BatchConfig,
        RequestManager::prepare_next_bwd_batch_task>(
        registrar, "RequestManager Prepare Next BWD Batch Task");
  } else {
    // Dynamic registration; with control replication the variant is
    // registered locally on each shard rather than globally.
    if (enable_control_replication) {
      registrar.global_registration = false;
    }
    runtime
        ->register_task_variant<BatchConfig,
                                RequestManager::prepare_next_bwd_batch_task>(
            registrar);
  }
}
// RequestManager prepare_next_batch_beam
Expand Down
151 changes: 110 additions & 41 deletions src/runtime/request_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -619,23 +619,71 @@ size_t RequestManager::get_num_processed_requests() {
return num_processed_requests;
}

BatchConfigPairFuture RequestManager::prepare_next_batch(
std::tuple<BatchConfigPairFuture,
InferenceResultFuture,
FinetuningBwdFuture> &batch_pipeline_entry,
std::pair<BatchConfigFuture, BatchConfigFuture>
RequestManager::prepare_next_batches(
std::tuple<BatchConfigFuture,
BatchConfigFuture,
InferenceResultFuture,
FinetuningBwdFuture> &batch_pipeline_entry,
Context ctx,
Runtime *runtime) {
RequestManager *rm = this;
// Process work from old batchs
TaskLauncher launcher1(RM_PROCESS_WORK_FROM_OLD_BATCHES_TASK_ID,
TaskArgument(&rm, sizeof(RequestManager *)));
launcher1.add_future(std::get<0>(batch_pipeline_entry));
launcher1.add_future(std::get<1>(batch_pipeline_entry));
launcher1.add_future(std::get<2>(batch_pipeline_entry));
launcher1.add_future(std::get<3>(batch_pipeline_entry));
ProcessWorkFromOldBatchesFuture pwfobf =
runtime->execute_task(ctx, launcher1);
// Build new FWD batch
TaskLauncher launcher2(RM_PREPARE_NEXT_FWD_BATCH_TASK_ID,
TaskArgument(&rm, sizeof(RequestManager *)));
launcher2.add_future(std::get<0>(batch_pipeline_entry));
launcher2.add_future(std::get<1>(batch_pipeline_entry));
launcher2.add_future(std::get<2>(batch_pipeline_entry));
launcher2.add_future(std::get<3>(batch_pipeline_entry));
launcher2.add_future(pwfobf);
BatchConfigFuture bcff = runtime->execute_task(ctx, launcher2);
// Build new BWD batch
TaskLauncher launcher3(RM_PREPARE_NEXT_BWD_BATCH_TASK_ID,
TaskArgument(&rm, sizeof(RequestManager *)));
launcher3.add_future(std::get<0>(batch_pipeline_entry));
launcher3.add_future(std::get<1>(batch_pipeline_entry));
launcher3.add_future(std::get<2>(batch_pipeline_entry));
launcher3.add_future(std::get<3>(batch_pipeline_entry));
launcher3.add_future(pwfobf);
BatchConfigFuture bcbf = runtime->execute_task(ctx, launcher3);
// return pair of batch futures
return std::make_pair(bcff, bcbf);
}

// future[0]: old_fwd_bc
// future[1]: old_bwd_bc
// future[2]: inference result
// future[3]: wait for bwd to finish
// Legion task body: waits on the previous batches' results and feeds them
// back into the RequestManager.
// future[0]: old fwd batch, future[1]: old bwd batch,
// future[2]: inference result, future[3]: bwd-completion marker.
void RequestManager::process_work_from_old_batches_task(
    Task const *task,
    std::vector<PhysicalRegion> const &regions,
    Context ctx,
    Runtime *runtime) {
  // The RequestManager pointer is passed by value in the task argument
  // buffer; unpack it.
  RequestManager *rm = *((RequestManager **)task->args);
  BatchConfig const *old_fwd_bc = BatchConfig::from_future(task->futures[0]);
  BatchConfig const *old_bwd_bc = BatchConfig::from_future(task->futures[1]);
  InferenceResult const &result =
      Future(task->futures[2]).get_result<InferenceResult>();
  Future(task->futures[3]).get_void_result(); // wait until bwd is done
  rm->process_work_from_old_batches(*old_fwd_bc, *old_bwd_bc, result);
}

std::pair<BatchConfig, BatchConfig> RequestManager::prepare_next_batch_task(
// future[0]: old_fwd_bc
// future[1]: old_bwd_bc
// future[2]: inference result
// future[3]: wait for bwd to finish
// future[4]: wait for process_work_from_old_batches to finish
BatchConfig RequestManager::prepare_next_fwd_batch_task(
Task const *task,
std::vector<PhysicalRegion> const &regions,
Context ctx,
Expand All @@ -645,11 +693,31 @@ std::pair<BatchConfig, BatchConfig> RequestManager::prepare_next_batch_task(
BatchConfig const *old_bwd_bc = BatchConfig::from_future(task->futures[1]);
InferenceResult const &result =
Future(task->futures[2]).get_result<InferenceResult>();
Future(task->futures[3]).get_void_result();
rm->process_work_from_old_batches(*old_fwd_bc, *old_bwd_bc, result);
BatchConfig new_fwd_bc = rm->prepare_next_fwd_batch(*old_fwd_bc, result);
BatchConfig new_bwd_bc = rm->prepare_next_bwd_batch();
return std::make_pair(new_fwd_bc, new_bwd_bc);
Future(task->futures[3]).get_void_result(); // wait until bwd is done
Future(task->futures[4])
.get_void_result(); // wait until process_work_from_old_batches is done
return rm->prepare_next_fwd_batch(*old_fwd_bc, result);
}

// future[0]: old_fwd_bc
// future[1]: old_bwd_bc
// future[2]: inference result
// future[3]: wait for bwd to finish
// future[4]: wait for process_work_from_old_batches to finish
// Legion task body: builds the next backward (finetuning) batch.
// NOTE(review): the old batches and inference result are fetched from the
// futures but not consumed here — presumably retained for symmetry with
// prepare_next_fwd_batch_task; confirm before removing.
BatchConfig RequestManager::prepare_next_bwd_batch_task(
    Task const *task,
    std::vector<PhysicalRegion> const &regions,
    Context ctx,
    Runtime *runtime) {
  // The RequestManager pointer is passed by value in the task argument
  // buffer; unpack it.
  RequestManager *rm = *((RequestManager **)task->args);
  // Deserialize inputs from the attached futures (indices documented in
  // the comment block above this function).
  BatchConfig const *old_fwd_bc = BatchConfig::from_future(task->futures[0]);
  BatchConfig const *old_bwd_bc = BatchConfig::from_future(task->futures[1]);
  InferenceResult const &result =
      Future(task->futures[2]).get_result<InferenceResult>();
  Future(task->futures[3]).get_void_result(); // wait until bwd is done
  Future(task->futures[4])
      .get_void_result(); // wait until process_work_from_old_batches is done
  // Only the RequestManager's internal state determines the bwd batch.
  return rm->prepare_next_bwd_batch();
}

bool RequestManager::is_eos_token(int token_id) {
Expand Down Expand Up @@ -3256,43 +3324,45 @@ void RequestManager::serve_incr_decoding(FFModel *llm) {
// init operators
im->init_operators_inference(llm);
// Legion futures for inc_decoding and spec_infer
// BatchConfigFuture last_bcf_fwd, last_bcf_bwd;
BatchConfigPairFuture last_bcf;
BatchConfigFuture last_bcf_fwd, last_bcf_bwd;
InferenceResultFuture last_irf;
FinetuningBwdFuture last_bwd_f;
{
// Initialize futures for incr decoding
BatchConfig bc_fwd, bc_bwd;
InferenceResult ir;
// last_bcf_fwd = Future::from_value<BatchConfig>(bc_fwd);
// last_bcf_bwd = Future::from_value<BatchConfig>(bc_bwd);
last_bcf = Future::from_value<std::pair<BatchConfig, BatchConfig>>(
std::make_pair(bc_fwd, bc_bwd));
last_bcf_fwd = Future::from_value<BatchConfig>(bc_fwd);
last_bcf_bwd = Future::from_value<BatchConfig>(bc_bwd);
last_irf = Future::from_value<InferenceResult>(ir);
last_bwd_f = Future::from_value<bool>(true);
}

std::queue<std::tuple<BatchConfigPairFuture,
std::queue<std::tuple<BatchConfigFuture,
BatchConfigFuture,
InferenceResultFuture,
FinetuningBwdFuture>>
batch_pipeline;
// tuple[0]: std::pair<fwd batch, bwd batch>
// tuple[1]: inference result
// tuple[2]: bwd future
{ batch_pipeline.push(std::make_tuple(last_bcf, last_irf, last_bwd_f)); }
// tuple[0]: fwd batch
// tuple[1]: bwd batch
// tuple[2]: inference result
// tuple[3]: bwd future
{
batch_pipeline.push(
std::make_tuple(last_bcf_fwd, last_bcf_bwd, last_irf, last_bwd_f));
}

while (!is_background_server_terminated()) {

if (batch_pipeline.size() >= 4) {
// Block here to avoid launching too many batches
auto const &batch = batch_pipeline.front();
std::get<1>(batch).get_void_result();
std::get<2>(batch).get_void_result();
std::get<3>(batch).get_void_result();
}
// deque finished batches
while (batch_pipeline.size() > 1) {
auto const &batch = batch_pipeline.front();
if (std::get<1>(batch).is_ready() && std::get<2>(batch).is_ready()) {
if (std::get<2>(batch).is_ready() && std::get<3>(batch).is_ready()) {
batch_pipeline.pop();
} else {
break;
Expand All @@ -3301,16 +3371,15 @@ void RequestManager::serve_incr_decoding(FFModel *llm) {

runtime->begin_trace(ctx, 12346 /*trace_id*/);
auto &batch_pipeline_entry = batch_pipeline.back();
BatchConfigPairFuture bcf =
prepare_next_batch(batch_pipeline_entry, ctx, runtime);
// BatchConfigFuture bcf_fwd = next_batches.first;
// BatchConfigFuture bcf_bwd = next_batches.second;
InferenceResultFuture irf = im->inference(llm, 0, bcf);
FinetuningBwdFuture bwd_f = im->peft_bwd(llm, 0, bcf);
batch_pipeline.push(std::make_tuple(bcf, irf, bwd_f));
// last_bcf_fwd = bcf_fwd;
// last_bcf_bwd = bcf_bwd;
last_bcf = bcf;
std::pair<BatchConfigFuture, BatchConfigFuture> next_batches =
prepare_next_batches(batch_pipeline_entry, ctx, runtime);
BatchConfigFuture bcf_fwd = next_batches.first;
BatchConfigFuture bcf_bwd = next_batches.second;
InferenceResultFuture irf = im->inference(llm, 0, bcf_fwd);
FinetuningBwdFuture bwd_f = im->peft_bwd(llm, 0, bcf_bwd);
batch_pipeline.push(std::make_tuple(bcf_fwd, bcf_bwd, irf, bwd_f));
last_bcf_fwd = bcf_fwd;
last_bcf_bwd = bcf_bwd;
last_irf = irf;
last_bwd_f = bwd_f;
runtime->end_trace(ctx, 12346 /*trace_id*/);
Expand Down

0 comments on commit ee48def

Please sign in to comment.