Skip to content

Commit

Permalink
fix multinodes
Browse files (browse the repository at this point in the history)
  • Loading branch information
xinhaoc committed Jan 19, 2024
1 parent c162d4c commit ea79317
Showing 1 changed file with 84 additions and 61 deletions.
145 changes: 84 additions & 61 deletions src/runtime/optimizer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -523,27 +523,29 @@ void AdamOptimizer::unified_update(std::vector<ParallelTensor> const parameters)

int offset = 0;
int processed_parameters_num = 0;
parameters_num = 0;
// printf("param size: %d, %d\n", parameters.size(), parameters_num);

while(processed_parameters_num < parameters.size()){
parameters_num = 0;

for(int i = 0; i < parameters.size(); i++){
const ParallelTensor p = parameters.at(i);
assert(v_values.find(p->region) != v_values.end());
assert(m_values.find(p->region) != m_values.end());
assert(p->owner_op != NULL);
if(reservedWorkSpaceSize + p->get_volume() * sizeof(float) >= model->handlers->workSpaceSize){
break;
if (reservedWorkSpaceSize + p->get_volume() * sizeof(float) >=
model->handlers->workSpaceSize) {
break;
}
reservedWorkSpaceSize += p->get_volume() * sizeof(float);
parameters_num += 1;
assert (p->sync_type == ParameterSyncType::NCCL);
assert(p->sync_type == ParameterSyncType::NCCL);
assert(p->parallel_is != IndexSpace::NO_SPACE);
}

// printf("parameters_num: %d %d, %d\n", parameters_num, reservedWorkSpaceSize, model->handlers->workSpaceSize);
assert(parameters_num <= parameters.size());
// printf("parameters_num: %d %d, %d\n", parameters_num,
// reservedWorkSpaceSize, model->handlers->workSpaceSize);
assert(processed_parameters_num <= parameters.size());

IndexLauncher launcher(ADAM_UNIFY_UPD_NCCL_TASK_ID,
p0->parallel_is,
Expand All @@ -553,42 +555,42 @@ void AdamOptimizer::unified_update(std::vector<ParallelTensor> const parameters)
false /*must_epoch*/,
0 /*mapper_id*/,
p0->machine_view.hash());
//launch a unified task
for(int j = 0; j < parameters_num; j++){
// launch a unified task
for (int j = 0; j < parameters_num; j++) {
const ParallelTensor p = parameters.at(processed_parameters_num + j);

// regions[0]: region_grad
launcher.add_region_requirement(RegionRequirement(p->part_grad,
0 /*projection id*/,
READ_ONLY,
EXCLUSIVE,
p->region_grad));
launcher.add_field(offset, FID_DATA);
// regions[1]: region
launcher.add_region_requirement(RegionRequirement(
p->part, 0 /*projection id*/, READ_WRITE, EXCLUSIVE, p->region));
launcher.add_field(offset + 1, FID_DATA);
// regions[2]: v_region
launcher.add_region_requirement(
RegionRequirement(v_values[p->region]->part,
0 /*projection id*/,
READ_WRITE,
EXCLUSIVE,
v_values[p->region]->region));
launcher.add_field(offset + 2, FID_DATA);
// regions[3]: m_region
launcher.add_region_requirement(
RegionRequirement(m_values[p->region]->part,
0 /*projection id*/,
READ_WRITE,
EXCLUSIVE,
m_values[p->region]->region));
launcher.add_field(offset + 3, FID_DATA);
offset += 4;
// regions[0]: region_grad
launcher.add_region_requirement(RegionRequirement(p->part_grad,
0 /*projection id*/,
READ_ONLY,
EXCLUSIVE,
p->region_grad));
launcher.add_field(offset, FID_DATA);
// regions[1]: region
launcher.add_region_requirement(RegionRequirement(
p->part, 0 /*projection id*/, READ_WRITE, EXCLUSIVE, p->region));
launcher.add_field(offset + 1, FID_DATA);
// regions[2]: v_region
launcher.add_region_requirement(
RegionRequirement(v_values[p->region]->part,
0 /*projection id*/,
READ_WRITE,
EXCLUSIVE,
v_values[p->region]->region));
launcher.add_field(offset + 2, FID_DATA);
// regions[3]: m_region
launcher.add_region_requirement(
RegionRequirement(m_values[p->region]->part,
0 /*projection id*/,
READ_WRITE,
EXCLUSIVE,
m_values[p->region]->region));
launcher.add_field(offset + 3, FID_DATA);
offset += 4;
}
//update alpha, beta
for(int i = 0; i < parameters_num; i++){

// update alpha, beta
for (int i = 0; i < parameters_num; i++) {
this->next();
}
launcher.concurrent = true;
Expand All @@ -597,9 +599,9 @@ void AdamOptimizer::unified_update(std::vector<ParallelTensor> const parameters)
runtime->issue_execution_fence(ctx);
reservedWorkSpaceSize = 0;
offset = 0;
processed_parameters_num += parameters_num;
processed_parameters_num += parameters_num;
}

parameters_num = 0;
}

void AdamOptimizer::ps_update_task(Task const *task,
Expand Down Expand Up @@ -716,11 +718,11 @@ void AdamOptimizer::nccl_update_task(Task const *task,
nccl_update_task_gpu(op, meta, w_grad_ptr, size, w_ptr, v_ptr, m_ptr);
}


void AdamOptimizer::nccl_unified_update_task(Task const *task,
std::vector<PhysicalRegion> const &regions,
Context ctx,
Runtime *runtime) {
void AdamOptimizer::nccl_unified_update_task(
Task const *task,
std::vector<PhysicalRegion> const &regions,
Context ctx,
Runtime *runtime) {
// assert(regions.size() == 4);
// assert(task->regions.size() == 4);
AdamOptimizer const *op = (AdamOptimizer *)task->args;
Expand All @@ -730,7 +732,8 @@ void AdamOptimizer::nccl_unified_update_task(Task const *task,
ctx, task->regions[1].region.get_index_space());

// float const *w_grad_ptr[op->parameters_num];
// float *w_ptr[op->parameters_num], *v_ptr[op->parameters_num], *m_ptr[op->parameters_num];
// float *w_ptr[op->parameters_num], *v_ptr[op->parameters_num],
// *m_ptr[op->parameters_num];

// hipMalloc(w_grad_ptr, sizeof(float*) * op->parameters_num);
// hipMalloc(w_ptr, sizeof(float*) * op->parameters_num);
Expand All @@ -743,20 +746,40 @@ void AdamOptimizer::nccl_unified_update_task(Task const *task,
size_t *size = new size_t[op->parameters_num];
int offset = 0;

// printf("parameters_num: %d\n", op->parameters_num);

for(int i = 0; i < op->parameters_num; i++){
accWGrads[i] = helperGetGenericTensorAccessorRO(DataType::DT_FLOAT, regions[offset], task->regions[offset], FID_DATA, ctx, runtime);
accWs[i] = helperGetGenericTensorAccessorWO(DataType::DT_FLOAT, regions[offset+1], task->regions[offset+1], FID_DATA, ctx, runtime);
accVs[i] = helperGetGenericTensorAccessorWO(DataType::DT_FLOAT, regions[offset+2], task->regions[offset+2], FID_DATA, ctx, runtime);
accMs[i] = helperGetGenericTensorAccessorWO(DataType::DT_FLOAT, regions[offset+3], task->regions[offset+3], FID_DATA, ctx, runtime);
offset += 4;

size[i] = accWGrads[i].domain.get_volume();
// w_grad_ptr[i] = accWGrad.get_float_ptr();
// w_ptr[i] = accW.get_float_ptr();
// v_ptr[i] = accV.get_float_ptr();
// m_ptr[i] = accM.get_float_ptr();
// printf("parameters_num: %d\n", op->parameters_num);

for (int i = 0; i < op->parameters_num; i++) {
accWGrads[i] = helperGetGenericTensorAccessorRO(DataType::DT_FLOAT,
regions[offset],
task->regions[offset],
FID_DATA,
ctx,
runtime);
accWs[i] = helperGetGenericTensorAccessorWO(DataType::DT_FLOAT,
regions[offset + 1],
task->regions[offset + 1],
FID_DATA,
ctx,
runtime);
accVs[i] = helperGetGenericTensorAccessorWO(DataType::DT_FLOAT,
regions[offset + 2],
task->regions[offset + 2],
FID_DATA,
ctx,
runtime);
accMs[i] = helperGetGenericTensorAccessorWO(DataType::DT_FLOAT,
regions[offset + 3],
task->regions[offset + 3],
FID_DATA,
ctx,
runtime);
offset += 4;

size[i] = accWGrads[i].domain.get_volume();
// w_grad_ptr[i] = accWGrad.get_float_ptr();
// w_ptr[i] = accW.get_float_ptr();
// v_ptr[i] = accV.get_float_ptr();
// m_ptr[i] = accM.get_float_ptr();
}
nccl_unified_update_task_gpu(op, meta, accWGrads, size, accWs, accVs, accMs);
}
Expand Down

0 comments on commit ea79317

Please sign in to comment.