Add a background server for RequestManager (#1223)
* add a background server for RequestManager

* .

* make incr_decoding work

* make spec_infer work

* format

* update python inference

* fix python issues

* bug fix

* add a Legion future to capture the termination of the background server

* Add thread safety for background server.

* Simplify backend server design.

* resolve conflict.

* Add server task timeout.

* register callbacks to terminate background worker at exit or termination

* [Python] enable decoding multiple requests

* update README.md and default configuration

* [Python] no need to use the llm context environment to start/stop the background server

* require at least four cpu cores

* [Python] add back explicit start_server()/stop_server().

* fix

* fix python chatgpt.json

---------

Co-authored-by: Gabriele Oliaro <[email protected]>
Co-authored-by: zwang86 <[email protected]>
Co-authored-by: Zeyu Wang <[email protected]>
Co-authored-by: xinhaoc <[email protected]>
5 people authored Jan 14, 2024
1 parent ed4dbd8 commit 12fdbac
Showing 36 changed files with 681 additions and 331 deletions.
24 changes: 19 additions & 5 deletions .github/README.md
@@ -79,7 +79,12 @@ ssms=[]
ssm = ff.SSM("JackFram/llama-68m")
ssms.append(ssm)
```
Next, we declare the generation configuration and compile both the LLM and SSMs. Note that all SSMs should run in the **beam search** mode, and the LLM should run in the **tree verification** mode to verify the speculated tokens from SSMs.
Next, we declare the generation configuration and compile both the LLM and SSMs. Note that all SSMs should run in the **beam search** mode, and the LLM should run in the **tree verification** mode to verify the speculated tokens from SSMs. You can also use the following arguments to specify serving configuration when compiling LLMs and SSMs:

* max\_requests\_per\_batch: the maximum number of requests to serve in a batch (default: 16)
* max\_seq\_length: the maximum number of tokens in a request (default: 256)
* max\_tokens\_per\_batch: the maximum number of tokens to process in a batch (default: 128)

```python
# Create the sampling configs
generation_config = ff.GenerationConfig(
@@ -91,11 +96,16 @@ for ssm in ssms:
ssm.compile(generation_config)

# Compile the LLM for inference and load the weights into memory
llm.compile(generation_config, ssms=ssms)
llm.compile(generation_config,
max_requests_per_batch = 16,
max_seq_length = 256,
max_tokens_per_batch = 128,
ssms=ssms)
```
Finally, we call `llm.generate` to generate the output, which is organized as a list of `GenerationResult` objects that include the output tokens and text.
```python
result = llm.generate("Here are some travel tips for Tokyo:\n")
with llm:
result = llm.generate("Here are some travel tips for Tokyo:\n")
```
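
The `with llm:` block above starts the request manager's background server before generation and shuts it down afterwards. The commit message also mentions explicit `start_server()`/`stop_server()` calls in the Python API; below is a minimal sketch of that alternative, assuming the two methods exist on the compiled `ff.LLM` object under exactly those names (taken from the commit message, not verified against the bindings):

```python
# Hedged sketch: assumes llm.start_server() / llm.stop_server() control the
# RequestManager's background serving task, as the commit message suggests.
llm.start_server()
result = llm.generate("Here are some travel tips for Tokyo:\n")
llm.stop_server()  # terminate the background server once generation is done
```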

### Incremental decoding
@@ -124,10 +134,14 @@ generation_config = ff.GenerationConfig(
)

# Compile the LLM for inference and load the weights into memory
llm.compile(generation_config)
llm.compile(generation_config,
max_requests_per_batch = 16,
max_seq_length = 256,
max_tokens_per_batch = 128)

# Generation begins!
result = llm.generate("Here are some travel tips for Tokyo:\n")
with llm:
result = llm.generate("Here are some travel tips for Tokyo:\n")
```
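
Since this change also enables decoding multiple requests from Python, a batched call might look like the sketch below; it assumes `llm.generate` accepts a list of prompt strings and returns one `GenerationResult` per prompt, mirroring the new `std::vector<GenerationResult> FFModel::generate(...)` signature:

```python
# Hedged sketch: assumes llm.generate accepts a list of prompts and returns
# a list of GenerationResult objects (one per prompt, in order).
prompts = [
    "Here are some travel tips for Tokyo:\n",
    "Three ways to improve your writing:\n",
]
with llm:
    results = llm.generate(prompts)
```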

</details>
4 changes: 2 additions & 2 deletions CMakeLists.txt
@@ -264,14 +264,14 @@ if(NOT BUILD_LEGION_ONLY)
LIST_DIRECTORIES False
${FLEXFLOW_ROOT}/include/*.h)

list(APPEND FLEXFLOW_HDR ${FLEXFLOW_ROOT}/inference/file_loader.h)
#list(APPEND FLEXFLOW_HDR ${FLEXFLOW_ROOT}/inference/file_loader.h)

file(GLOB_RECURSE FLEXFLOW_SRC
LIST_DIRECTORIES False
${FLEXFLOW_ROOT}/src/*.cc)

list(REMOVE_ITEM FLEXFLOW_SRC "${FLEXFLOW_ROOT}/src/runtime/cpp_driver.cc")
list(APPEND FLEXFLOW_SRC ${FLEXFLOW_ROOT}/inference/file_loader.cc)
#list(APPEND FLEXFLOW_SRC ${FLEXFLOW_ROOT}/inference/file_loader.cc)

set(FLEXFLOW_CPP_DRV_SRC
${FLEXFLOW_ROOT}/src/runtime/cpp_driver.cc)
31 changes: 21 additions & 10 deletions include/flexflow/flexflow_c.h
@@ -611,13 +611,13 @@ flexflow_perf_metrics_t

void flexflow_model_set_transformer_layer_id(flexflow_model_t handle, int id);

flexflow_generation_result_t
flexflow_model_generate(flexflow_model_t handle_,
char const *input_text,
int max_num_chars,
char *output_text,
int max_seq_length,
int *output_length_and_tokens);
void flexflow_model_generate(flexflow_model_t handle_,
int num_requests,
char const **input_text,
int max_num_chars,
char **output_text,
int max_seq_length,
int **output_length_and_tokens);

void flexflow_model_set_position_offset(flexflow_model_t handle, int offset);

@@ -988,6 +988,12 @@ void flexflow_request_manager_register_output_filepath(
int flexflow_request_manager_register_ssm_model(
flexflow_request_manager_t handle_, flexflow_model_t model_handle_);

void flexflow_request_manager_start_background_server(
flexflow_request_manager_t handle_, flexflow_model_t model_handle_);

void flexflow_request_manager_terminate_background_server(
flexflow_request_manager_t handle_);

// -----------------------------------------------------------------------
// InferenceManager
// -----------------------------------------------------------------------
@@ -1004,6 +1010,11 @@ void flexflow_inference_manager_compile_model_and_allocate_buffer(
void flexflow_inference_manager_init_operators_inference(
flexflow_inference_manager_t handle_, flexflow_model_t model_handle);

void flexflow_inference_manager_register_model_weights_loader(
flexflow_inference_manager_t handle_,
flexflow_model_t model_handle,
flexflow_file_data_loader_t loader_handle);

// -----------------------------------------------------------------------
// FileDataLoader
// -----------------------------------------------------------------------
@@ -1014,13 +1025,13 @@ flexflow_file_data_loader_t
int num_kv_heads,
int hidden_dim,
int qkv_inner_dim,
int tensor_parallelism_degree);
int tensor_parallelism_degree,
bool use_full_precision);

void flexflow_file_data_loader_destroy(flexflow_file_data_loader_t handle_);

void flexflow_file_data_loader_load_weights(flexflow_file_data_loader_t handle_,
flexflow_model_t model_handle_,
bool use_full_precision);
flexflow_model_t model_handle_);

#ifdef __cplusplus
}
5 changes: 3 additions & 2 deletions include/flexflow/model.h
@@ -247,6 +247,7 @@ enum TaskIDs {
RM_PREPARE_NEXT_BATCH_INIT_TASK_ID,
RM_PREPARE_NEXT_BATCH_BEAM_TASK_ID,
RM_PREPARE_NEXT_BATCH_VERIFY_TASK_ID,
RM_BACKGROUND_SERVING_TASK_ID,
// Custom tasks
CUSTOM_GPU_TASK_ID_FIRST,
CUSTOM_GPU_TASK_ID_1,
@@ -806,8 +807,8 @@ class FFModel {
// ========================================
// Inference APIs
// ========================================
GenerationResult generate(std::vector<std::string> &prompts,
int max_seq_length);
std::vector<GenerationResult> generate(std::vector<std::string> &prompts,
int max_seq_length);

Tensor create_tensor_legion_ordering(int num_dim,
int const dims[],
76 changes: 53 additions & 23 deletions include/flexflow/request_manager.h
@@ -18,6 +18,8 @@
#include "flexflow/batch_config.h"
#include "flexflow/inference.h"
#include "flexflow/model.h"
#include "flexflow/utils/file_loader.h"
#include <future>
#include <mutex>
#include <tokenizers_cpp.h>

@@ -30,25 +32,29 @@ using tokenizers::Tokenizer;

class InferenceManager {
public:
InferenceManager(FFConfig const &config);
InferenceManager();
static InferenceManager *get_inference_manager();
void compile_model_and_allocate_buffer(FFModel *model);
void init_operators_inference(FFModel *model);
Legion::FutureMap inference(FFModel *model, int index, BatchConfig const &bc);
Legion::FutureMap
inference(FFModel *model, int index, BatchConfigFuture const &bc);
void load_input_tokens_from_batch_config(BatchConfigFuture const &bc,
void load_input_tokens_from_batch_config(FFModel *model,
BatchConfigFuture const &bc,
ParallelTensor const input,
FFHandler *handlers);
void load_positions(BatchConfigFuture const &bc,
void load_positions(FFModel *model,
BatchConfigFuture const &bc,
ParallelTensor position_input,
int offset);
void load_inference_metadata_batch_config(BatchConfigFuture const &bc,
void register_model_weights_loader(FFModel *, FileDataLoader *);
void load_inference_metadata_batch_config(FFModel *model,
BatchConfigFuture const &bc,
FFHandler *handlers);

public:
FFConfig ff_config;
std::unordered_map<ParallelTensor, std::vector<ParallelTensor>> tensor_buffer;
std::unordered_map<FFModel *, FileDataLoader *> model_weights_loaders;
int num_devices;
};

@@ -91,9 +97,15 @@ struct BeamTree {

class RequestManager {
public:
enum Status {
INITIALIZED = 1001,
SERVING = 1002,
TERMINATED = 1003,
};
using RequestGuid = BatchConfig::RequestGuid;
using TokenId = BatchConfig::TokenId;

static const RequestGuid INVALID_GUID = 0;
RequestManager();
static RequestManager *get_request_manager();
size_t get_num_processed_requests();
@@ -125,42 +137,54 @@ class RequestManager {
int initLength,
int non_tree_size);

FFModel *get_model(int model_id);
FFModel *get_ssm_model(int model_id);

GenerationResult generate_incr_decoding(FFModel *model,
std::vector<std::string> &prompts,
int max_seq_length);
GenerationResult generate_spec_infer(FFModel *model,
std::vector<std::string> &prompts,
int max_seq_length);
void serve_incr_decoding(FFModel *model);
void serve_spec_infer(FFModel *model);
GenerationResult get_generation_result(RequestGuid const &guid);
RequestGuid register_new_request(std::string const &prompt,
int max_sequence_length);
RequestGuid register_new_request(std::vector<TokenId> const &prompt,
int max_sequence_length);
// Methods to start and terminate request manager's background task
void start_background_server(FFModel *model);
bool is_background_server_terminated();
void terminate_background_server();
static void terminate_background_server_at_exit();
// Methods to check and mark request completion
bool is_request_completed(RequestGuid const &guid);
void trigger_request_completion_future(RequestGuid const &guid);
// Methods for preparing next batches
BatchConfig prepare_next_batch(BatchConfig const &bc,
InferenceResult const &result);
BatchConfigFuture prepare_next_batch(BatchConfigFuture const &bc,
InferenceResultFuture const &result);
InferenceResultFuture const &result,
Legion::Context ctx,
Legion::Runtime *runtime);
BeamSearchBatchConfig
prepare_next_batch_beam(BeamSearchBatchConfig const &old_bc,
BeamInferenceResult const &result);
BeamSearchBatchConfigFuture
prepare_next_batch_beam(BeamSearchBatchConfigFuture const &old_bc,
BeamInferenceResultFuture const &result);
BeamInferenceResultFuture const &result,
Legion::Context ctx,
Legion::Runtime *runtime);
BeamSearchBatchConfig
prepare_next_batch_init(TreeVerifyBatchConfig const &old_bc,
InferenceResult const &result,
int model_id);
BeamSearchBatchConfigFuture
prepare_next_batch_init(TreeVerifyBatchConfigFuture const &old_bc,
InferenceResultFuture const &result,
int model_id);
int model_id,
Legion::Context ctx,
Legion::Runtime *runtime);
TreeVerifyBatchConfig prepare_next_batch_verify(
std::vector<BeamSearchBatchConfig> const &old_batches);
TreeVerifyBatchConfigFuture prepare_next_batch_verify(
std::vector<BeamSearchBatchConfigFuture> const &old_batches);
std::vector<BeamSearchBatchConfigFuture> const &old_batches,
Legion::Context ctx,
Legion::Runtime *runtime);

void store_beam_metadata(BeamSearchBatchConfig const &old_bc,
BeamInferenceResult const &result);
@@ -187,7 +211,11 @@ class RequestManager {
&inputSerializedTree,
std::vector<std::pair<BatchConfig::TokenId, int>> const
&outputSerializedTree);

static void background_serving_task(
Legion::Task const *task,
std::vector<Legion::PhysicalRegion> const &regions,
Legion::Context ctx,
Legion::Runtime *runtime);
static void
load_tokens_task(Legion::Task const *task,
std::vector<Legion::PhysicalRegion> const &regions,
@@ -233,9 +261,11 @@ class RequestManager {
int max_requests_per_batch;
int max_tokens_per_batch;
int max_sequence_length;
Status request_manager_status;

// tree width in each speculative step, if not specified 1
std::vector<int> spec_infer_tree_width;

// private fields
std::unique_ptr<Tokenizer> tokenizer_;
bool verbose;
@@ -247,12 +277,9 @@ class RequestManager {
std::unordered_map<RequestGuid, Request> all_requests;
std::unordered_map<RequestGuid, GenerationResult> request_generation_results;
std::mutex request_queue_mutex;
std::unordered_map<RequestGuid, std::promise<void> *> request_to_promise;
std::mutex request_to_promise_mutex;
RequestGuid next_available_guid;
// Legion futures for inc_decoding and spec_infer
BatchConfigFuture last_bcf;
InferenceResultFuture last_irf;
TreeVerifyBatchConfigFuture last_tree_bcf;
InferenceResultFuture last_tree_irf;

// TODO: Move this two vector to request struct
std::unordered_map<RequestGuid,
@@ -262,11 +289,14 @@
committed_tokens;

// Multi-model support
std::vector<FFModel *> models;
std::vector<FFModel *> ssm_models;

// Performance profiling
size_t num_processed_requests;

// Background server handler
Legion::Future background_server_handler;

private:
struct ProfileInfo {
int llm_decoding_steps;
11 changes: 5 additions & 6 deletions inference/file_loader.h → include/flexflow/utils/file_loader.h
@@ -30,18 +30,16 @@ class FileDataLoader {
int _num_kv_heads,
size_t _hidden_dim,
size_t _qkv_inner_dim,
int _tensor_parallelism_degree);
int _tensor_parallelism_degree,
bool _use_full_precision);

BatchConfig::TokenId *generate_requests(int num, int length);

template <typename DT>
void load_single_weight_tensor(FFModel *ff, Layer *l, int weight_idx);

void load_quantization_weight(FFModel *ff,
Layer *l,
int weight_idx,
bool use_full_precision);
void load_weights(FFModel *ff, bool use_full_precision);
void load_quantization_weight(FFModel *ff, Layer *l, int weight_idx);
void load_weights(FFModel *ff);

void load_positions(FFModel *ff,
Tensor pt,
Expand All @@ -54,4 +52,5 @@ class FileDataLoader {
size_t hidden_dim, qkv_inner_dim;
std::string prompts_filepath;
std::string weights_folder;
bool use_full_precision;
};
1 change: 0 additions & 1 deletion inference/incr_decoding/CMakeLists.txt
@@ -7,7 +7,6 @@ set(project_target incr_decoding)
set(CPU_SRC
${FLEXFLOW_CPP_DRV_SRC}
incr_decoding.cc
../file_loader.cc
../models/llama.cc
../models/opt.cc
../models/falcon.cc
8 changes: 7 additions & 1 deletion inference/incr_decoding/incr_decoding.cc
@@ -24,6 +24,7 @@

#include <nlohmann/json.hpp>

using namespace FlexFlow;
using namespace Legion;
using json = nlohmann::json;

@@ -250,6 +251,8 @@ void FlexFlow::top_level_task(Task const *task,
assert(false && "unknown model type");
}

rm->start_background_server(&model);

int total_num_requests = 0;
{
using json = nlohmann::json;
@@ -266,10 +269,13 @@
total_num_requests++;
prompts.push_back(text);
}
GenerationResult result =
std::vector<GenerationResult> result =
model.generate(prompts, 128 /*max_sequence_length*/);
}

// terminate the request manager by stopping the background thread
rm->terminate_background_server();

// Execution fence
{
Future future = runtime->issue_execution_fence(ctx);