Skip to content

Commit

Permalink
batch jobs in Longtail_ChangeVersion2 to handle jobs count higher tha…
Browse files Browse the repository at this point in the history
…n jobapi can handle in one go
  • Loading branch information
DanEngelbrecht committed Jan 9, 2024
1 parent d4eb67b commit 7c6cf67
Showing 1 changed file with 79 additions and 24 deletions.
103 changes: 79 additions & 24 deletions src/longtail.c
Original file line number Diff line number Diff line change
Expand Up @@ -8344,6 +8344,20 @@ static int CreateBlockWriteInfos(
return 0;
}

struct BatchProgressAPI
{
struct Longtail_ProgressAPI m_API;
struct Longtail_ProgressAPI* m_UnbatchedProgressAPI;
uint32_t m_TotalCount;
uint32_t m_CompletedCount;
};

void BatchProgressAPI_OnProgressFunc(struct Longtail_ProgressAPI* progressAPI, uint32_t total_count, uint32_t done_count)
{
struct BatchProgressAPI* api = (struct BatchProgressAPI*)progressAPI;
api->m_UnbatchedProgressAPI->OnProgress(api->m_UnbatchedProgressAPI, api->m_TotalCount, api->m_CompletedCount + done_count);
}

int Longtail_ChangeVersion2(
struct Longtail_BlockStoreAPI* block_store_api,
struct Longtail_StorageAPI* version_storage_api,
Expand Down Expand Up @@ -8510,41 +8524,82 @@ int Longtail_ChangeVersion2(
job_ctxs[i] = job;
}

Longtail_JobAPI_Group job_group = 0;
err = job_api->ReserveJobs(job_api, (uint32_t)block_write_info_count, &job_group);
uint32_t max_job_batch_count = 0;
int err = job_api->GetMaxBatchCountFunc(job_api, &max_job_batch_count, 0);
if (err)
{
LONGTAIL_LOG(ctx, LONGTAIL_LOG_LEVEL_ERROR, "job_api->ReserveJobs() failed with %d", err)
LONGTAIL_LOG(ctx, LONGTAIL_LOG_LEVEL_ERROR, "job_api->GetMaxBatchCountFunc() failed with %d", err)
Longtail_Free(job_mem);
SAVE_FREE_BLOCK_WRITE_INFOS(block_write_infos);
return err;
}

Longtail_JobAPI_Jobs write_job;
err = job_api->CreateJobs(job_api, job_group, progress_api, optional_cancel_api, optional_cancel_token, (uint32_t)block_write_info_count, job_funcs, job_ctxs, 0, &write_job);
if (err)
struct BatchProgressAPI batch_progress;
struct Longtail_ProgressAPI* batch_progress_api = 0;
if (progress_api)
{
LONGTAIL_LOG(ctx, LONGTAIL_LOG_LEVEL_ERROR, "job_api->CreateJobs() failed with %d", err)
Longtail_Free(job_mem);
SAVE_FREE_BLOCK_WRITE_INFOS(block_write_infos);
return err;
}
err = job_api->ReadyJobs(job_api, (uint32_t)block_write_info_count, write_job);
if (err)
{
LONGTAIL_LOG(ctx, err == ECANCELED ? LONGTAIL_LOG_LEVEL_DEBUG : LONGTAIL_LOG_LEVEL_ERROR, "job_api->ReadyJobs() failed with %d", err)
Longtail_Free(job_mem);
SAVE_FREE_BLOCK_WRITE_INFOS(block_write_infos);
return err;
batch_progress_api = Longtail_MakeProgressAPI(&batch_progress.m_API, 0, BatchProgressAPI_OnProgressFunc);
batch_progress.m_UnbatchedProgressAPI = progress_api;
batch_progress.m_CompletedCount = 0;
batch_progress.m_TotalCount = (uint32_t)block_write_info_count;
}

err = job_api->WaitForAllJobs(job_api, job_group, progress_api, optional_cancel_api, optional_cancel_token);
if (err)
ptrdiff_t submitted_count = 0;
while (submitted_count < block_write_info_count)
{
LONGTAIL_LOG(ctx, err == ECANCELED ? LONGTAIL_LOG_LEVEL_DEBUG : LONGTAIL_LOG_LEVEL_ERROR, "job_api->WaitForAllJobs() failed with %d", err)
Longtail_Free(job_mem);
SAVE_FREE_BLOCK_WRITE_INFOS(block_write_infos);
return err;
if (optional_cancel_api)
{
if (optional_cancel_api->IsCancelled(optional_cancel_api, optional_cancel_token) == ECANCELED)
{
Longtail_Free(job_mem);
SAVE_FREE_BLOCK_WRITE_INFOS(block_write_infos);
return ECANCELED;
}
}
ptrdiff_t submit_count = block_write_info_count - submitted_count;
if (submit_count > max_job_batch_count)
{
submit_count = max_job_batch_count;
}

Longtail_JobAPI_Group job_group = 0;
err = job_api->ReserveJobs(job_api, (uint32_t)submit_count, &job_group);
if (err)
{
LONGTAIL_LOG(ctx, LONGTAIL_LOG_LEVEL_ERROR, "job_api->ReserveJobs() failed with %d", err)
Longtail_Free(job_mem);
SAVE_FREE_BLOCK_WRITE_INFOS(block_write_infos);
return err;
}

Longtail_JobAPI_Jobs write_job;
err = job_api->CreateJobs(job_api, job_group, batch_progress_api, optional_cancel_api, optional_cancel_token, (uint32_t)submit_count, &job_funcs[submitted_count], &job_ctxs[submitted_count], 0, &write_job);
if (err)
{
LONGTAIL_LOG(ctx, LONGTAIL_LOG_LEVEL_ERROR, "job_api->CreateJobs() failed with %d", err)
Longtail_Free(job_mem);
SAVE_FREE_BLOCK_WRITE_INFOS(block_write_infos);
return err;
}
err = job_api->ReadyJobs(job_api, (uint32_t)submit_count, write_job);
if (err)
{
LONGTAIL_LOG(ctx, err == ECANCELED ? LONGTAIL_LOG_LEVEL_DEBUG : LONGTAIL_LOG_LEVEL_ERROR, "job_api->ReadyJobs() failed with %d", err)
Longtail_Free(job_mem);
SAVE_FREE_BLOCK_WRITE_INFOS(block_write_infos);
return err;
}

err = job_api->WaitForAllJobs(job_api, job_group, batch_progress_api, optional_cancel_api, optional_cancel_token);
if (err)
{
LONGTAIL_LOG(ctx, err == ECANCELED ? LONGTAIL_LOG_LEVEL_DEBUG : LONGTAIL_LOG_LEVEL_ERROR, "job_api->WaitForAllJobs() failed with %d", err)
Longtail_Free(job_mem);
SAVE_FREE_BLOCK_WRITE_INFOS(block_write_infos);
return err;
}
submitted_count += submit_count;
batch_progress.m_CompletedCount = (uint32_t)submitted_count;
}

SAVE_FREE_BLOCK_WRITE_INFOS(block_write_infos);
Expand Down

0 comments on commit 7c6cf67

Please sign in to comment.