Skip to content

Commit

Permalink
opt
Browse files Browse the repository at this point in the history
  • Loading branch information
xinhaoc committed Dec 21, 2023
1 parent fa1fffc commit 40d830c
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 84 deletions.
12 changes: 6 additions & 6 deletions include/flexflow/optimizer.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class Optimizer {
virtual void init(void) = 0;
virtual void next(void) = 0;
virtual void update(const ParallelTensor p) = 0;
virtual void unified_update(std::vector<ParallelTensor> parameters) = 0;
virtual void unified_update(std::vector<ParallelTensor> const parameters) = 0;
FFModel const *model;
};

Expand All @@ -44,7 +44,7 @@ class SGDOptimizer : public Optimizer {
void init(void);
void next(void);
void update(const ParallelTensor p);
void unified_update(std::vector<ParallelTensor> parameters);
void unified_update(std::vector<ParallelTensor> const parameters);
void set_weight_decay(double _weight_decay);
static void ps_update_task(Legion::Task const *task,
std::vector<Legion::PhysicalRegion> const &regions,
Expand Down Expand Up @@ -92,7 +92,7 @@ class AdamOptimizer : public Optimizer {
void init(void);
void next(void);
void update(const ParallelTensor p);
void unified_update(std::vector<ParallelTensor> parameters);
void unified_update(std::vector<ParallelTensor> const parameters);
void set_weight_decay(double _weight_decay);
static void ps_update_task(Legion::Task const *task,
std::vector<Legion::PhysicalRegion> const &regions,
Expand Down Expand Up @@ -134,9 +134,9 @@ class AdamOptimizer : public Optimizer {
double alpha, beta1, beta2, weight_decay, epsilon;
double alpha_t, beta1_t, beta2_t;
std::map<Legion::LogicalRegion, ParallelTensor> v_values, m_values;
size_t reservedWorkSpaceSize;
int parameters_num;
int processed_parameters_num;
size_t reservedWorkSpaceSize = 0;
int parameters_num = 0;
int processed_parameters_num = 0;
};

}; // namespace FlexFlow
Expand Down
111 changes: 45 additions & 66 deletions src/runtime/optimizer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,6 @@ void AdamOptimizer::update(const ParallelTensor p) {
assert(m_values.find(p->region) != m_values.end());
assert(p->owner_op != NULL);
reservedWorkSpaceSize += p->get_volume() * sizeof(float);
printf("update workspace: %d", reservedWorkSpaceSize);
if (p->sync_type == ParameterSyncType::PS) {
TaskLauncher launcher(ADAM_UPD_PS_TASK_ID,
TaskArgument(this, sizeof(AdamOptimizer)),
Expand Down Expand Up @@ -495,13 +494,13 @@ void AdamOptimizer::update(const ParallelTensor p) {
}
}

void AdamOptimizer::unified_update(std::vector<ParallelTensor> parameters) {
// Unified (batched) parameter update for SGD.
// NOTE(review): intentionally unimplemented — the body is a no-op stub. The
// override exists only to satisfy the pure-virtual
// Optimizer::unified_update interface; the batched-NCCL update path is
// currently implemented for AdamOptimizer only (see
// AdamOptimizer::unified_update). Callers relying on SGD here will silently
// skip the weight update — TODO: implement or assert(false) until done.
void SGDOptimizer::unified_update(std::vector<ParallelTensor> const parameters) {
//todo
}

void AdamOptimizer::unified_update(std::vector<ParallelTensor> const parameters) {
Context ctx = model->config.lg_ctx;
Runtime *runtime = model->config.lg_hlr;


printf("update workspace: %d", reservedWorkSpaceSize);

const ParallelTensor p0 = parameters.at(0);
ArgumentMap argmap;
Domain domain = runtime->get_index_space_domain(ctx, p0->parallel_is);
Expand All @@ -523,8 +522,10 @@ void AdamOptimizer::unified_update(std::vector<ParallelTensor> parameters) {
}

int offset = 0;
printf("param size: %d, %d\n", parameters.size(), parameters_num);

while(parameters_num < parameters.size()){
while(processed_parameters_num < parameters.size()){

for(int i = 0; i < parameters.size(); i++){
const ParallelTensor p = parameters.at(i);
assert(v_values.find(p->region) != v_values.end());
Expand All @@ -538,20 +539,22 @@ void AdamOptimizer::unified_update(std::vector<ParallelTensor> parameters) {
assert (p->sync_type == ParameterSyncType::NCCL);
assert(p->parallel_is != IndexSpace::NO_SPACE);
}

printf("parameters_num: %d %d, %d\n", parameters_num, reservedWorkSpaceSize, model->handlers->workSpaceSize);
assert(parameters_num <= parameters.size());

//launch a unified task
for(int j = 0; j < parameters_num; j++){
this->next();
const ParallelTensor p = parameters.at(processed_parameters_num + j);
IndexLauncher launcher(ADAM_UNIFY_UPD_NCCL_TASK_ID,
IndexLauncher launcher(ADAM_UNIFY_UPD_NCCL_TASK_ID,
p0->parallel_is,
TaskArgument(this, sizeof(AdamOptimizer)),
argmap,
Predicate::TRUE_PRED,
false /*must_epoch*/,
0 /*mapper_id*/,
p0->machine_view.hash());
//launch a unified task
for(int j = 0; j < parameters_num; j++){
const ParallelTensor p = parameters.at(processed_parameters_num + j);

// regions[0]: region_grad
launcher.add_region_requirement(RegionRequirement(p->part_grad,
0 /*projection id*/,
Expand Down Expand Up @@ -580,16 +583,19 @@ void AdamOptimizer::unified_update(std::vector<ParallelTensor> parameters) {
m_values[p->region]->region));
launcher.add_field(offset + 3, FID_DATA);
offset += 4;
launcher.concurrent = true;
FutureMap fm = runtime->execute_index_space(ctx, launcher);
// runtime->execute_must_epoch(ctx, must_epoch_launcher);
runtime->issue_execution_fence(ctx);
reservedWorkSpaceSize = 0;
offset = 0;
processed_parameters_num += parameters_num;
}
printf("offset: %d\n", offset);

//update alpha, beta
for(int i = 0; i < parameters_num; i++){
this->next();
}
launcher.concurrent = true;
FutureMap fm = runtime->execute_index_space(ctx, launcher);
// runtime->execute_must_epoch(ctx, must_epoch_launcher);
runtime->issue_execution_fence(ctx);
reservedWorkSpaceSize = 0;
offset = 0;
processed_parameters_num += parameters_num;
}

}
Expand Down Expand Up @@ -721,55 +727,28 @@ void AdamOptimizer::nccl_unified_update_task(Task const *task,
Domain domain = runtime->get_index_space_domain(
ctx, task->regions[1].region.get_index_space());

printf("parameters_num: %d\n", op->parameters_num);
float const *w_grad_ptr = NULL;
float *w_ptr = NULL, *v_ptr = NULL, *m_ptr = NULL;
size_t size = 0;
float const *w_grad_ptr[op->parameters_num];
float *w_ptr[op->parameters_num], *v_ptr[op->parameters_num], *m_ptr[op->parameters_num];
size_t size[op->parameters_num];
int offset = 0;

printf("parameters_num: %d\n", op->parameters_num);

for(int i = 0; i < op->parameters_num; i++){

switch (domain.get_dim()) {
#define DIMFUNC(DIM) \
case DIM: { \
TensorAccessorR<float, DIM> accWGrad( \
regions[0], task->regions[0], FID_DATA, ctx, runtime); \
TensorAccessorW<float, DIM> accW(regions[1], \
task->regions[1], \
FID_DATA, \
ctx, \
runtime, \
true /*readOutput*/); \
TensorAccessorW<float, DIM> accV(regions[2], \
task->regions[2], \
FID_DATA, \
ctx, \
runtime, \
true /*readOutput*/); \
TensorAccessorW<float, DIM> accM(regions[3], \
task->regions[3], \
FID_DATA, \
ctx, \
runtime, \
true /*readOutput*/); \
size = accW.rect.volume(); \
assert(accWGrad.rect == accW.rect); \
assert(accWGrad.rect == accV.rect); \
assert(accWGrad.rect == accM.rect); \
w_grad_ptr = accWGrad.ptr; \
w_ptr = accW.ptr; \
v_ptr = accV.ptr; \
m_ptr = accM.ptr; \
break; \
GenericTensorAccessorR accWGrad = helperGetGenericTensorAccessorRO(DataType::DT_FLOAT, regions[offset], task->regions[offset], FID_DATA, ctx, runtime);
GenericTensorAccessorW accW = helperGetGenericTensorAccessorWO(DataType::DT_FLOAT, regions[offset+1], task->regions[offset+1], FID_DATA, ctx, runtime);
GenericTensorAccessorW accV = helperGetGenericTensorAccessorWO(DataType::DT_FLOAT, regions[offset+2], task->regions[offset+2], FID_DATA, ctx, runtime);
GenericTensorAccessorW accM = helperGetGenericTensorAccessorWO(DataType::DT_FLOAT, regions[offset+3], task->regions[offset+3], FID_DATA, ctx, runtime);
offset += 4;

size[i] = accW.domain.get_volume();
// assert(accWGrad.rect == accW.rect);
// assert(accWGrad.rect == accV.rect);
// assert(accWGrad.rect == accM.rect);
w_ptr[i] = accW.get_float_ptr();
v_ptr[i] = accV.get_float_ptr();
m_ptr[i] = accM.get_float_ptr();
}
LEGION_FOREACH_N(DIMFUNC)
#undef DIMFUNC
default: {
// Unsupported dims
assert(false);
}
}
}

nccl_unified_update_task_gpu(op, meta, w_grad_ptr, size, w_ptr, v_ptr, m_ptr);
}
#endif
Expand Down
33 changes: 21 additions & 12 deletions src/runtime/optimizer_kernel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -254,19 +254,27 @@ __host__ void AdamOptimizer::nccl_unified_update_task_gpu(AdamOptimizer const *o
float *w_ptr[],
float *v_ptr[],
float *m_ptr[]) {

hipStream_t stream;
checkCUDA(get_legion_stream(&stream));
assert(op->reservedWorkSpaceSize < meta->handle.workSpaceSize);

void *workSpace_ptr = static_cast<char *>(meta->handle.workSpace);
void *workSpace_ptr = meta->handle.workSpace;

for(int i = 0; i < op->parameters_num; i++){
checkCUDA(hipMemcpyAsync(workSpace_ptr,
w_grad_ptr[i],
size[i] * sizeof(float),
hipMemcpyDeviceToDevice,
stream));
workSpace_ptr += size[i] * sizeof(float);
// hipMemcpyAsync(static_cast<float*>(workSpace_ptr),
// w_grad_ptr[i],
// size[i] * sizeof(float),
// hipMemcpyDeviceToDevice,
// stream);
// hipError_t error = hipGetLastError();
// if(error != hipSuccess)
// {
// // print the CUDA error message and exit
// printf("CUDA error: %s\n", hipGetErrorString(error));
// }

workSpace_ptr = static_cast<char *>(workSpace_ptr) + size[i] * sizeof(float);
}

//do allreduce once
Expand All @@ -284,8 +292,9 @@ __host__ void AdamOptimizer::nccl_unified_update_task_gpu(AdamOptimizer const *o
float beta2_t = op->beta2_t;
for(int i = 0; i < op->parameters_num; i++){
// update
std::cout<<"update"<<"\n";
hipLaunchKernelGGL(HIP_KERNEL_NAME(adam_update),
GET_BLOCKS(size),
GET_BLOCKS(size[i]),
CUDA_NUM_THREADS,
0,
stream,
Expand All @@ -299,12 +308,12 @@ __host__ void AdamOptimizer::nccl_unified_update_task_gpu(AdamOptimizer const *o
m_ptr[i],
v_ptr[i],
w_ptr[i]);
workSpace_ptr += size[i] * sizeof(float);
workSpace_ptr = static_cast<char *>(workSpace_ptr) + size[i] * sizeof(float);

//update
beta1_t *= beta1;
beta2_t *= beta2;
alpha_t = alpha * sqrt(1 - beta2_t) / (1 - beta1_t);
beta1_t *= op->beta1;
beta2_t *= op->beta2;
alpha_t = op->alpha * sqrt(1 - beta2_t) / (1 - beta1_t);
}

// checkCUDA(hipDeviceSynchronize());
Expand Down

0 comments on commit 40d830c

Please sign in to comment.