Add a background server for RequestManager #1223

Merged: 34 commits merged into inference from background_worker on Jan 14, 2024

Commits (34)

c33ec8d
add a background server for RequestManager
jiazhihao Nov 2, 2023
9ec4cdb
.
jiazhihao Nov 4, 2023
8260fd8
make incr_decoding work
jiazhihao Nov 4, 2023
9bbc806
make spec_infer work
jiazhihao Nov 5, 2023
3b6f7a9
format
jiazhihao Nov 5, 2023
5ebc914
update python inference
jiazhihao Nov 5, 2023
e1d606f
resolve merge conflict
jiazhihao Nov 5, 2023
be42e20
fix python issues
jiazhihao Nov 5, 2023
400d5bd
bug fix
jiazhihao Nov 5, 2023
2a17173
Merge branch 'inference' into background_worker
goliaro Nov 6, 2023
56f9f2b
Merge branch 'inference' into background_worker
jiazhihao Nov 10, 2023
0713433
add a Legion future to capture the termination of the background server
jiazhihao Nov 10, 2023
499fab8
Merge branch 'inference' into background_worker
jiazhihao Nov 15, 2023
d908b1a
Merge branch 'inference' into background_worker
zwang86 Nov 17, 2023
938a2d6
Merge branch 'inference' into background_worker
zwang86 Nov 28, 2023
7125f95
Merge branch 'inference' into background_worker
zwang86 Dec 1, 2023
91c7e94
Merge branch 'inference' into background_worker
zwang86 Dec 11, 2023
6cdd948
Merge branch 'inference' into background_worker
zwang86 Dec 13, 2023
8485edd
Merge branch 'inference' into background_worker
zwang86 Jan 5, 2024
c497ec2
Add thread safety for background server.
zwang86 Jan 5, 2024
99cc9ac
Simplify backend server design.
zwang86 Jan 5, 2024
4b4d1a9
resolve conflict.
zwang86 Jan 5, 2024
70212f6
Merge branch 'inference' into background_worker
zwang86 Jan 12, 2024
a58aa6d
Add server task timeout.
zwang86 Jan 12, 2024
1725c81
Merge branch 'inference' of https://github.com/flexflow/FlexFlow into…
jiazhihao Jan 12, 2024
4dd98bb
Merge branch 'inference' of https://github.com/flexflow/FlexFlow into…
jiazhihao Jan 12, 2024
0bce49a
register callbacks to terminate background worker at exit or termination
jiazhihao Jan 12, 2024
058308c
[Python] enable decoding multiple requests
jiazhihao Jan 13, 2024
37feea4
update README.md and default configuration
jiazhihao Jan 13, 2024
240c532
[Python] no need to use the llm context environment to start/stop the…
jiazhihao Jan 13, 2024
0b3289c
require at least four cpu cores
jiazhihao Jan 13, 2024
1f1bffc
[Python] add back explict start_server()/stop_server().
zwang86 Jan 13, 2024
8bfaf6a
fix
xinhaoc Jan 13, 2024
8db2650
fix python chatgpt.json
xinhaoc Jan 14, 2024

Changes from all commits

24 changes: 19 additions & 5 deletions .github/README.md
@@ -79,7 +79,12 @@ ssms=[]
ssm = ff.SSM("JackFram/llama-68m")
ssms.append(ssm)
```
Next, we declare the generation configuration and compile both the LLM and SSMs. Note that all SSMs should run in the **beam search** mode, and the LLM should run in the **tree verification** mode to verify the speculated tokens from SSMs.
Next, we declare the generation configuration and compile both the LLM and SSMs. Note that all SSMs should run in the **beam search** mode, and the LLM should run in the **tree verification** mode to verify the speculated tokens from SSMs. You can also use the following arguments to specify serving configuration when compiling LLMs and SSMs:

* max\_requests\_per\_batch: the maximum number of requests to serve in a batch (default: 16)
* max\_seq\_length: the maximum number of tokens in a request (default: 256)
* max\_tokens\_per\_batch: the maximum number of tokens to process in a batch (default: 128)

```python
# Create the sampling configs
generation_config = ff.GenerationConfig(
@@ -91,11 +96,16 @@ for ssm in ssms:
ssm.compile(generation_config)

# Compile the LLM for inference and load the weights into memory
llm.compile(generation_config, ssms=ssms)
llm.compile(generation_config,
max_requests_per_batch = 16,
max_seq_length = 256,
max_tokens_per_batch = 128,
ssms=ssms)
```
Finally, we call `llm.generate` to generate the output, which is returned as a list of `GenerationResult` objects that include the output tokens and text.
```python
result = llm.generate("Here are some travel tips for Tokyo:\n")
with llm:
result = llm.generate("Here are some travel tips for Tokyo:\n")
```

### Incremental decoding
@@ -124,10 +134,14 @@ generation_config = ff.GenerationConfig(
)

# Compile the LLM for inference and load the weights into memory
llm.compile(generation_config)
llm.compile(generation_config,
max_requests_per_batch = 16,
max_seq_length = 256,
max_tokens_per_batch = 128)

# Generation begins!
result = llm.generate("Here are some travel tips for Tokyo:\n")
with llm:
result = llm.generate("Here are some travel tips for Tokyo:\n")
```

</details>
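The README hunks above wrap `llm.generate` in a `with llm:` block, which starts the RequestManager's background server on entry and shuts it down on exit; commit 1f1bffc also adds back explicit `start_server()`/`stop_server()` calls. Below is a minimal Python sketch of the two (presumably equivalent) styles, reusing the `ff` names from the README; the import path, the `ff.LLM` constructor, and the model path are assumptions, not taken from the diff:

```python
import flexflow.serve as ff  # import path assumed

# Build and compile the LLM as in the README above; the model path is only a placeholder
llm = ff.LLM("JackFram/llama-68m")
llm.compile(ff.GenerationConfig(),
            max_requests_per_batch=16,
            max_seq_length=256,
            max_tokens_per_batch=128)

# Style 1: context manager -- the background server starts on enter and stops on exit
with llm:
    result = llm.generate("Here are some travel tips for Tokyo:\n")

# Style 2 (use one style per script): explicit lifecycle, as re-added in commit 1f1bffc
llm.start_server()
result = llm.generate("Here are some travel tips for Tokyo:\n")
llm.stop_server()
```
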
4 changes: 2 additions & 2 deletions CMakeLists.txt
@@ -264,14 +264,14 @@ if(NOT BUILD_LEGION_ONLY)
LIST_DIRECTORIES False
${FLEXFLOW_ROOT}/include/*.h)

list(APPEND FLEXFLOW_HDR ${FLEXFLOW_ROOT}/inference/file_loader.h)
#list(APPEND FLEXFLOW_HDR ${FLEXFLOW_ROOT}/inference/file_loader.h)

file(GLOB_RECURSE FLEXFLOW_SRC
LIST_DIRECTORIES False
${FLEXFLOW_ROOT}/src/*.cc)

list(REMOVE_ITEM FLEXFLOW_SRC "${FLEXFLOW_ROOT}/src/runtime/cpp_driver.cc")
list(APPEND FLEXFLOW_SRC ${FLEXFLOW_ROOT}/inference/file_loader.cc)
#list(APPEND FLEXFLOW_SRC ${FLEXFLOW_ROOT}/inference/file_loader.cc)

set(FLEXFLOW_CPP_DRV_SRC
${FLEXFLOW_ROOT}/src/runtime/cpp_driver.cc)
31 changes: 21 additions & 10 deletions include/flexflow/flexflow_c.h
@@ -611,13 +611,13 @@ flexflow_perf_metrics_t

void flexflow_model_set_transformer_layer_id(flexflow_model_t handle, int id);

flexflow_generation_result_t
flexflow_model_generate(flexflow_model_t handle_,
char const *input_text,
int max_num_chars,
char *output_text,
int max_seq_length,
int *output_length_and_tokens);
void flexflow_model_generate(flexflow_model_t handle_,
int num_requests,
char const **input_text,
int max_num_chars,
char **output_text,
int max_seq_length,
int **output_length_and_tokens);

void flexflow_model_set_position_offset(flexflow_model_t handle, int offset);

@@ -988,6 +988,12 @@ void flexflow_request_manager_register_output_filepath(
int flexflow_request_manager_register_ssm_model(
flexflow_request_manager_t handle_, flexflow_model_t model_handle_);

void flexflow_request_manager_start_background_server(
flexflow_request_manager_t handle_, flexflow_model_t model_handle_);

void flexflow_request_manager_terminate_background_server(
flexflow_request_manager_t handle_);

// -----------------------------------------------------------------------
// InferenceManager
// -----------------------------------------------------------------------
@@ -1004,6 +1010,11 @@ void flexflow_inference_manager_compile_model_and_allocate_buffer(
void flexflow_inference_manager_init_operators_inference(
flexflow_inference_manager_t handle_, flexflow_model_t model_handle);

void flexflow_inference_manager_register_model_weights_loader(
flexflow_inference_manager_t handle_,
flexflow_model_t model_handle,
flexflow_file_data_loader_t loader_handle);

// -----------------------------------------------------------------------
// FileDataLoader
// -----------------------------------------------------------------------
@@ -1014,13 +1025,13 @@ flexflow_file_data_loader_t
int num_kv_heads,
int hidden_dim,
int qkv_inner_dim,
int tensor_parallelism_degree);
int tensor_parallelism_degree,
bool use_full_precision);

void flexflow_file_data_loader_destroy(flexflow_file_data_loader_t handle_);

void flexflow_file_data_loader_load_weights(flexflow_file_data_loader_t handle_,
flexflow_model_t model_handle_,
bool use_full_precision);
flexflow_model_t model_handle_);

#ifdef __cplusplus
}
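The binding change above replaces the single-request `flexflow_model_generate` with a batched form taking `num_requests` and arrays of input/output buffers, and adds C entry points for starting and terminating the RequestManager's background server. At the Python layer this is what enables decoding several prompts in one call (commit 058308c). A rough usage sketch follows; the result field names are assumed rather than taken from the diff:

```python
# Batched generation sketch; `llm` is a compiled ff.LLM as in the example above.
prompts = [
    "Here are some travel tips for Tokyo:\n",
    "Here are some travel tips for Kyoto:\n",
]

with llm:  # the background server runs for the duration of the block
    results = llm.generate(prompts)  # expected: one GenerationResult per prompt

for r in results:
    print(r.output_text)  # field name assumed; each result also carries the output tokens
```
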
5 changes: 3 additions & 2 deletions include/flexflow/model.h
@@ -247,6 +247,7 @@ enum TaskIDs {
RM_PREPARE_NEXT_BATCH_INIT_TASK_ID,
RM_PREPARE_NEXT_BATCH_BEAM_TASK_ID,
RM_PREPARE_NEXT_BATCH_VERIFY_TASK_ID,
RM_BACKGROUND_SERVING_TASK_ID,
// Custom tasks
CUSTOM_GPU_TASK_ID_FIRST,
CUSTOM_GPU_TASK_ID_1,
@@ -806,8 +807,8 @@ class FFModel {
// ========================================
// Inference APIs
// ========================================
GenerationResult generate(std::vector<std::string> &prompts,
int max_seq_length);
std::vector<GenerationResult> generate(std::vector<std::string> &prompts,
int max_seq_length);

Tensor create_tensor_legion_ordering(int num_dim,
int const dims[],
76 changes: 53 additions & 23 deletions include/flexflow/request_manager.h
@@ -18,6 +18,8 @@
#include "flexflow/batch_config.h"
#include "flexflow/inference.h"
#include "flexflow/model.h"
#include "flexflow/utils/file_loader.h"
#include <future>
#include <mutex>
#include <tokenizers_cpp.h>

@@ -30,25 +32,29 @@ using tokenizers::Tokenizer;

class InferenceManager {
public:
InferenceManager(FFConfig const &config);
InferenceManager();
static InferenceManager *get_inference_manager();
void compile_model_and_allocate_buffer(FFModel *model);
void init_operators_inference(FFModel *model);
Legion::FutureMap inference(FFModel *model, int index, BatchConfig const &bc);
Legion::FutureMap
inference(FFModel *model, int index, BatchConfigFuture const &bc);
void load_input_tokens_from_batch_config(BatchConfigFuture const &bc,
void load_input_tokens_from_batch_config(FFModel *model,
BatchConfigFuture const &bc,
ParallelTensor const input,
FFHandler *handlers);
void load_positions(BatchConfigFuture const &bc,
void load_positions(FFModel *model,
BatchConfigFuture const &bc,
ParallelTensor position_input,
int offset);
void load_inference_metadata_batch_config(BatchConfigFuture const &bc,
void register_model_weights_loader(FFModel *, FileDataLoader *);
void load_inference_metadata_batch_config(FFModel *model,
BatchConfigFuture const &bc,
FFHandler *handlers);

public:
FFConfig ff_config;
std::unordered_map<ParallelTensor, std::vector<ParallelTensor>> tensor_buffer;
std::unordered_map<FFModel *, FileDataLoader *> model_weights_loaders;
int num_devices;
};

@@ -91,9 +97,15 @@ struct BeamTree {

class RequestManager {
public:
enum Status {
INITIALIZED = 1001,
SERVING = 1002,
TERMINATED = 1003,
};
using RequestGuid = BatchConfig::RequestGuid;
using TokenId = BatchConfig::TokenId;

static const RequestGuid INVALID_GUID = 0;
RequestManager();
static RequestManager *get_request_manager();
size_t get_num_processed_requests();
@@ -125,42 +137,54 @@
int initLength,
int non_tree_size);

FFModel *get_model(int model_id);
FFModel *get_ssm_model(int model_id);

GenerationResult generate_incr_decoding(FFModel *model,
std::vector<std::string> &prompts,
int max_seq_length);
GenerationResult generate_spec_infer(FFModel *model,
std::vector<std::string> &prompts,
int max_seq_length);
void serve_incr_decoding(FFModel *model);
void serve_spec_infer(FFModel *model);
GenerationResult get_generation_result(RequestGuid const &guid);
RequestGuid register_new_request(std::string const &prompt,
int max_sequence_length);
RequestGuid register_new_request(std::vector<TokenId> const &prompt,
int max_sequence_length);
// Methods to start and terminate request manager's background task
void start_background_server(FFModel *model);
bool is_background_server_terminated();
void terminate_background_server();
static void terminate_background_server_at_exit();
// Methods to check and mark request completion
bool is_request_completed(RequestGuid const &guid);
void trigger_request_completion_future(RequestGuid const &guid);
// Methods for preparing next batches
BatchConfig prepare_next_batch(BatchConfig const &bc,
InferenceResult const &result);
BatchConfigFuture prepare_next_batch(BatchConfigFuture const &bc,
InferenceResultFuture const &result);
InferenceResultFuture const &result,
Legion::Context ctx,
Legion::Runtime *runtime);
BeamSearchBatchConfig
prepare_next_batch_beam(BeamSearchBatchConfig const &old_bc,
BeamInferenceResult const &result);
BeamSearchBatchConfigFuture
prepare_next_batch_beam(BeamSearchBatchConfigFuture const &old_bc,
BeamInferenceResultFuture const &result);
BeamInferenceResultFuture const &result,
Legion::Context ctx,
Legion::Runtime *runtime);
BeamSearchBatchConfig
prepare_next_batch_init(TreeVerifyBatchConfig const &old_bc,
InferenceResult const &result,
int model_id);
BeamSearchBatchConfigFuture
prepare_next_batch_init(TreeVerifyBatchConfigFuture const &old_bc,
InferenceResultFuture const &result,
int model_id);
int model_id,
Legion::Context ctx,
Legion::Runtime *runtime);
TreeVerifyBatchConfig prepare_next_batch_verify(
std::vector<BeamSearchBatchConfig> const &old_batches);
TreeVerifyBatchConfigFuture prepare_next_batch_verify(
std::vector<BeamSearchBatchConfigFuture> const &old_batches);
std::vector<BeamSearchBatchConfigFuture> const &old_batches,
Legion::Context ctx,
Legion::Runtime *runtime);

void store_beam_metadata(BeamSearchBatchConfig const &old_bc,
BeamInferenceResult const &result);
@@ -187,7 +211,11 @@
&inputSerializedTree,
std::vector<std::pair<BatchConfig::TokenId, int>> const
&outputSerializedTree);

static void background_serving_task(
Legion::Task const *task,
std::vector<Legion::PhysicalRegion> const &regions,
Legion::Context ctx,
Legion::Runtime *runtime);
static void
load_tokens_task(Legion::Task const *task,
std::vector<Legion::PhysicalRegion> const &regions,
@@ -233,9 +261,11 @@
int max_requests_per_batch;
int max_tokens_per_batch;
int max_sequence_length;
Status request_manager_status;

// tree width in each speculative step, if not specified 1
std::vector<int> spec_infer_tree_width;

// private fields
std::unique_ptr<Tokenizer> tokenizer_;
bool verbose;
@@ -247,12 +277,9 @@
std::unordered_map<RequestGuid, Request> all_requests;
std::unordered_map<RequestGuid, GenerationResult> request_generation_results;
std::mutex request_queue_mutex;
std::unordered_map<RequestGuid, std::promise<void> *> request_to_promise;
std::mutex request_to_promise_mutex;
RequestGuid next_available_guid;
// Legion futures for inc_decoding and spec_infer
BatchConfigFuture last_bcf;
InferenceResultFuture last_irf;
TreeVerifyBatchConfigFuture last_tree_bcf;
InferenceResultFuture last_tree_irf;

// TODO: Move this two vector to request struct
std::unordered_map<RequestGuid,
@@ -262,11 +289,14 @@
committed_tokens;

// Multi-model support
std::vector<FFModel *> models;
std::vector<FFModel *> ssm_models;

// Performance profiling
size_t num_processed_requests;

// Background server handler
Legion::Future background_server_handler;

private:
struct ProfileInfo {
int llm_decoding_steps;
11 changes: 5 additions & 6 deletions inference/file_loader.h → include/flexflow/utils/file_loader.h
@@ -30,18 +30,16 @@ class FileDataLoader {
int _num_kv_heads,
size_t _hidden_dim,
size_t _qkv_inner_dim,
int _tensor_parallelism_degree);
int _tensor_parallelism_degree,
bool _use_full_precision);

BatchConfig::TokenId *generate_requests(int num, int length);

template <typename DT>
void load_single_weight_tensor(FFModel *ff, Layer *l, int weight_idx);

void load_quantization_weight(FFModel *ff,
Layer *l,
int weight_idx,
bool use_full_precision);
void load_weights(FFModel *ff, bool use_full_precision);
void load_quantization_weight(FFModel *ff, Layer *l, int weight_idx);
void load_weights(FFModel *ff);

void load_positions(FFModel *ff,
Tensor pt,
@@ -54,4 +52,5 @@
size_t hidden_dim, qkv_inner_dim;
std::string prompts_filepath;
std::string weights_folder;
bool use_full_precision;
};
1 change: 0 additions & 1 deletion inference/incr_decoding/CMakeLists.txt
@@ -7,7 +7,6 @@ set(project_target incr_decoding)
set(CPU_SRC
${FLEXFLOW_CPP_DRV_SRC}
incr_decoding.cc
../file_loader.cc
../models/llama.cc
../models/opt.cc
../models/falcon.cc
8 changes: 7 additions & 1 deletion inference/incr_decoding/incr_decoding.cc
@@ -24,6 +24,7 @@

#include <nlohmann/json.hpp>

using namespace FlexFlow;
using namespace Legion;
using json = nlohmann::json;

@@ -250,6 +251,8 @@ void FlexFlow::top_level_task(Task const *task,
assert(false && "unknow model type");
}

rm->start_background_server(&model);

int total_num_requests = 0;
{
using json = nlohmann::json;
@@ -266,10 +269,13 @@
total_num_requests++;
prompts.push_back(text);
}
GenerationResult result =
std::vector<GenerationResult> result =
model.generate(prompts, 128 /*max_sequence_length*/);
}

// terminate the request manager by stopping the background thread
rm->terminate_background_server();

// Execution fence
{
Future future = runtime->issue_execution_fence(ctx);