Skip to content

Commit

Permalink
Writing content to disk now has higher priority than reading blocks f…
Browse files Browse the repository at this point in the history
…rom store (#206)

* - **CHANGED API** `Longtail_Job_CreateJobsFunc` now takes a `channel` parameter, can be either 0 or 1, 0 has higher priority than 1
- **FIXED** Writing content to disk now has higher priority than reading blocks from store
* **FIXED** reworked max_parallell_block_read_jobs for CreatePartialAssetWriteJob
changelog
  • Loading branch information
DanEngelbrecht authored Sep 2, 2022
1 parent f0a9685 commit ee53715
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 21 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
##
- **CHANGED API** `Longtail_Job_CreateJobsFunc` now takes a `channel` parameter, which can be either 0 or 1; channel 0 has higher priority than channel 1
- **FIXED** Writing content to disk now has higher priority than reading blocks from store
- **CHANGED** Reworked logic for calculating the number of blocks to read per write operation when writing multi-block assets

## 0.3.5
- **CHANGED** Function entry logging change to DEBUG
- **CHANGED** Added info-debugging of read/written files (with size)
- **CHANGED** Add log context for Longtail_ReadStoredBlock and Longtail_WriteStoredBlock in non-debug build
Expand Down
17 changes: 14 additions & 3 deletions lib/bikeshed/longtail_bikeshed.c
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,15 @@ static int32_t ThreadWorker_Execute(void* context)
LONGTAIL_FATAL_ASSERT(ctx, thread_worker->stop, return 0)
while (*thread_worker->stop == 0)
{
if (!Bikeshed_ExecuteOne(thread_worker->shed, 0))
if (Bikeshed_ExecuteOne(thread_worker->shed, 0))
{
Longtail_WaitSema(thread_worker->semaphore, LONGTAIL_TIMEOUT_INFINITE);
continue;
}
if (Bikeshed_ExecuteOne(thread_worker->shed, 1))
{
continue;
}
Longtail_WaitSema(thread_worker->semaphore, LONGTAIL_TIMEOUT_INFINITE);
}
return 0;
}
Expand Down Expand Up @@ -299,6 +304,7 @@ static int Bikeshed_CreateJobs(
uint32_t job_count,
Longtail_JobAPI_JobFunc job_funcs[],
void* job_contexts[],
uint8_t job_channel,
Longtail_JobAPI_Jobs* out_jobs)
{
#if defined(LONGTAIL_ASSERTS)
Expand Down Expand Up @@ -384,6 +390,7 @@ static int Bikeshed_CreateJobs(
}
Bikeshed_ExecuteOne(bikeshed_job_api->m_Shed, 0);
}
Bikeshed_SetTasksChannel(bikeshed_job_api->m_Shed, job_count, task_ids, job_channel);

*out_jobs = task_ids;
err = 0;
Expand Down Expand Up @@ -484,6 +491,10 @@ static int Bikeshed_WaitForAllJobs(struct Longtail_JobAPI* job_api, Longtail_Job
{
continue;
}
if (Bikeshed_ExecuteOne(bikeshed_job_api->m_Shed, 1))
{
continue;
}
if (old_pending_count != bikeshed_job_group->m_PendingJobCount)
{
old_pending_count = bikeshed_job_group->m_PendingJobCount;
Expand Down Expand Up @@ -607,7 +618,7 @@ static int Bikeshed_Init(
return err;
}

job_api->m_Shed = Bikeshed_Create(Longtail_Alloc("Bikeshed", BIKESHED_SIZE(BIKESHED_MAX_TASK_COUNT, BIKESHED_MAX_DEPENDENCY_COUNT, 1)), BIKESHED_MAX_TASK_COUNT, BIKESHED_MAX_DEPENDENCY_COUNT, 1, &job_api->m_ReadyCallback.cb);
job_api->m_Shed = Bikeshed_Create(Longtail_Alloc("Bikeshed", BIKESHED_SIZE(BIKESHED_MAX_TASK_COUNT, BIKESHED_MAX_DEPENDENCY_COUNT, 1)), BIKESHED_MAX_TASK_COUNT, BIKESHED_MAX_DEPENDENCY_COUNT, 2, &job_api->m_ReadyCallback.cb);
if (!job_api->m_Shed)
{
LONGTAIL_LOG(ctx, LONGTAIL_LOG_LEVEL_ERROR, "Bikeshed_Create() failed with %d", ENOMEM)
Expand Down
2 changes: 1 addition & 1 deletion lib/blockstorestorage/longtail_blockstorestorage.c
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,7 @@ static int BlockStoreStorageAPI_ReadFile(
ctxs[b] = &job_datas[b];
}
Longtail_JobAPI_Jobs jobs;
err = job_api->CreateJobs(job_api, job_group, 0, 0, 0, block_count, funcs, ctxs, &jobs);
err = job_api->CreateJobs(job_api, job_group, 0, 0, 0, block_count, funcs, ctxs, 0, &jobs);
LONGTAIL_FATAL_ASSERT(ctx, err == 0, return err)
err = job_api->ReadyJobs(job_api, block_count, jobs);
LONGTAIL_FATAL_ASSERT(ctx, err == 0, return err)
Expand Down
2 changes: 1 addition & 1 deletion lib/fsblockstore/longtail_fsblockstore.c
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ static int ReadContent(
Longtail_JobAPI_JobFunc job_func[] = {ScanBlock};
void* ctxs[] = {job};
Longtail_JobAPI_Jobs jobs;
err = job_api->CreateJobs(job_api, job_group, 0, 0, 0, 1, job_func, ctxs, &jobs);
err = job_api->CreateJobs(job_api, job_group, 0, 0, 0, 1, job_func, ctxs, 0, &jobs);
LONGTAIL_FATAL_ASSERT(ctx, !err, return err)
err = job_api->ReadyJobs(job_api, 1, jobs);
LONGTAIL_FATAL_ASSERT(ctx, !err, return err)
Expand Down
39 changes: 26 additions & 13 deletions src/longtail.c
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ struct Longtail_JobAPI* Longtail_MakeJobAPI(

uint32_t Longtail_Job_GetWorkerCount(struct Longtail_JobAPI* job_api) { return job_api->GetWorkerCount(job_api); }
int Longtail_Job_ReserveJobs(struct Longtail_JobAPI* job_api, uint32_t job_count, Longtail_JobAPI_Group* out_job_group) { return job_api->ReserveJobs(job_api, job_count, out_job_group); }
int Longtail_Job_CreateJobs(struct Longtail_JobAPI* job_api, Longtail_JobAPI_Group job_group, struct Longtail_ProgressAPI* progressAPI, struct Longtail_CancelAPI* optional_cancel_api, Longtail_CancelAPI_HCancelToken optional_cancel_token, uint32_t job_count, Longtail_JobAPI_JobFunc job_funcs[], void* job_contexts[], Longtail_JobAPI_Jobs* out_jobs) { return job_api->CreateJobs(job_api, job_group, progressAPI, optional_cancel_api, optional_cancel_token, job_count, job_funcs, job_contexts, out_jobs); }
int Longtail_Job_CreateJobs(struct Longtail_JobAPI* job_api, Longtail_JobAPI_Group job_group, struct Longtail_ProgressAPI* progressAPI, struct Longtail_CancelAPI* optional_cancel_api, Longtail_CancelAPI_HCancelToken optional_cancel_token, uint32_t job_count, Longtail_JobAPI_JobFunc job_funcs[], void* job_contexts[], uint8_t job_channel, Longtail_JobAPI_Jobs* out_jobs) { return job_api->CreateJobs(job_api, job_group, progressAPI, optional_cancel_api, optional_cancel_token, job_count, job_funcs, job_contexts, job_channel, out_jobs); }
int Longtail_Job_AddDependecies(struct Longtail_JobAPI* job_api, uint32_t job_count, Longtail_JobAPI_Jobs jobs, uint32_t dependency_job_count, Longtail_JobAPI_Jobs dependency_jobs) { return job_api->AddDependecies(job_api, job_count, jobs, dependency_job_count, dependency_jobs); }
int Longtail_Job_ReadyJobs(struct Longtail_JobAPI* job_api, uint32_t job_count, Longtail_JobAPI_Jobs jobs) { return job_api->ReadyJobs(job_api, job_count, jobs); }
int Longtail_Job_WaitForAllJobs(struct Longtail_JobAPI* job_api, Longtail_JobAPI_Group job_group, struct Longtail_ProgressAPI* progressAPI, struct Longtail_CancelAPI* optional_cancel_api, Longtail_CancelAPI_HCancelToken optional_cancel_token) { return job_api->WaitForAllJobs(job_api, job_group, progressAPI, optional_cancel_api, optional_cancel_token); }
Expand Down Expand Up @@ -2141,7 +2141,7 @@ static int ChunkAssets(
if (jobs_prepared + asset_part_count > max_job_batch_count)
{
Longtail_JobAPI_Jobs jobs;
err = job_api->CreateJobs(job_api, job_group, progress_api, optional_cancel_api, optional_cancel_token, (uint32_t)jobs_prepared, &funcs[jobs_submitted], &ctxs[jobs_submitted], &jobs);
err = job_api->CreateJobs(job_api, job_group, progress_api, optional_cancel_api, optional_cancel_token, (uint32_t)jobs_prepared, &funcs[jobs_submitted], &ctxs[jobs_submitted], 0, &jobs);
LONGTAIL_FATAL_ASSERT(ctx, !err, return err)
err = job_api->ReadyJobs(job_api, (uint32_t)jobs_prepared, jobs);
LONGTAIL_FATAL_ASSERT(ctx, !err, return err)
Expand Down Expand Up @@ -2182,7 +2182,7 @@ static int ChunkAssets(
if (jobs_prepared > 0)
{
Longtail_JobAPI_Jobs jobs;
err = job_api->CreateJobs(job_api, job_group, progress_api, optional_cancel_api, optional_cancel_token, (uint32_t)jobs_prepared, &funcs[jobs_submitted], &ctxs[jobs_submitted], &jobs);
err = job_api->CreateJobs(job_api, job_group, progress_api, optional_cancel_api, optional_cancel_token, (uint32_t)jobs_prepared, &funcs[jobs_submitted], &ctxs[jobs_submitted], 0, &jobs);
LONGTAIL_FATAL_ASSERT(ctx, !err, return err)
err = job_api->ReadyJobs(job_api, (uint32_t)jobs_prepared, jobs);
LONGTAIL_FATAL_ASSERT(ctx, !err, return err)
Expand Down Expand Up @@ -4231,7 +4231,7 @@ int Longtail_WriteContent(
uint32_t jobs_left = job_count - jobs_submitted;
uint32_t jobs_batch = jobs_left > max_job_batch_count ? max_job_batch_count : jobs_left;
Longtail_JobAPI_Jobs jobs;
err = job_api->CreateJobs(job_api, job_group, progress_api, optional_cancel_api, optional_cancel_token, jobs_batch, &funcs[jobs_submitted], &ctxs[jobs_submitted], &jobs);
err = job_api->CreateJobs(job_api, job_group, progress_api, optional_cancel_api, optional_cancel_token, jobs_batch, &funcs[jobs_submitted], &ctxs[jobs_submitted], 0, &jobs);
LONGTAIL_FATAL_ASSERT(ctx, err == 0, return err)
err = job_api->ReadyJobs(job_api, jobs_batch, jobs);
LONGTAIL_FATAL_ASSERT(ctx, err == 0, return err)
Expand Down Expand Up @@ -4392,6 +4392,21 @@ struct WritePartialAssetFromBlocksJob

int WritePartialAssetFromBlocks(void* context, uint32_t job_id, int is_cancelled);

// Determine how many block-read jobs may run in parallel for a single
// partial-asset write.
// Starts from the job API's worker count, keeps one worker free for the
// write job when more than two workers exist, never drops below one, and
// caps the result at MAX_BLOCKS_PER_PARTIAL_ASSET_WRITE.
static uint32_t GetMaxParallelBlockReadJobs(struct Longtail_JobAPI* job_api)
{
    uint32_t available_workers = job_api->GetWorkerCount(job_api);
    if (available_workers == 0)
    {
        // No dedicated workers: still allow a single read job.
        available_workers = 1;
    }
    else if (available_workers > 2)
    {
        // Leave one worker free so the write job is not starved by reads.
        available_workers -= 1;
    }
    if (available_workers > MAX_BLOCKS_PER_PARTIAL_ASSET_WRITE)
    {
        return MAX_BLOCKS_PER_PARTIAL_ASSET_WRITE;
    }
    return available_workers;
}

// Returns the write sync task, or the write task if there is no need for reading new blocks
static int CreatePartialAssetWriteJob(
struct Longtail_BlockStoreAPI* block_store_api,
Expand Down Expand Up @@ -4467,8 +4482,7 @@ static int CreatePartialAssetWriteJob(
Longtail_JobAPI_JobFunc block_read_funcs[MAX_BLOCKS_PER_PARTIAL_ASSET_WRITE];
void* block_read_ctx[MAX_BLOCKS_PER_PARTIAL_ASSET_WRITE];

const uint32_t worker_count = job_api->GetWorkerCount(job_api) + 1;
const uint32_t max_parallell_block_read_jobs = worker_count < MAX_BLOCKS_PER_PARTIAL_ASSET_WRITE ? worker_count : MAX_BLOCKS_PER_PARTIAL_ASSET_WRITE;
const uint32_t max_parallell_block_read_jobs = GetMaxParallelBlockReadJobs(job_api);

while (chunk_index_offset != chunk_index_end && job->m_BlockReaderJobCount <= max_parallell_block_read_jobs)
{
Expand Down Expand Up @@ -4513,7 +4527,7 @@ static int CreatePartialAssetWriteJob(
Longtail_JobAPI_JobFunc write_funcs[1] = { WritePartialAssetFromBlocks };
void* write_ctx[1] = { job };
Longtail_JobAPI_Jobs write_job;
int err = job_api->CreateJobs(job_api, job_group, progress_api, optional_cancel_api, optional_cancel_token, 1, write_funcs, write_ctx, &write_job);
int err = job_api->CreateJobs(job_api, job_group, progress_api, optional_cancel_api, optional_cancel_token, 1, write_funcs, write_ctx, 0, &write_job);
if (err)
{
LONGTAIL_LOG(ctx, LONGTAIL_LOG_LEVEL_ERROR, "job_api->CreateJobs() failed with %d", err)
Expand All @@ -4523,12 +4537,12 @@ static int CreatePartialAssetWriteJob(
if (job->m_BlockReaderJobCount > 0)
{
Longtail_JobAPI_Jobs block_read_jobs;
err = job_api->CreateJobs(job_api, job_group, progress_api, optional_cancel_api, optional_cancel_token, job->m_BlockReaderJobCount, block_read_funcs, block_read_ctx, &block_read_jobs);
err = job_api->CreateJobs(job_api, job_group, progress_api, optional_cancel_api, optional_cancel_token, job->m_BlockReaderJobCount, block_read_funcs, block_read_ctx, 1, &block_read_jobs);
LONGTAIL_FATAL_ASSERT(ctx, err == 0, return err)
Longtail_JobAPI_JobFunc sync_write_funcs[1] = { WriteReady };
void* sync_write_ctx[1] = { 0 };
Longtail_JobAPI_Jobs write_sync_job;
err = job_api->CreateJobs(job_api, job_group, progress_api, optional_cancel_api, optional_cancel_token, 1, sync_write_funcs, sync_write_ctx, &write_sync_job);
err = job_api->CreateJobs(job_api, job_group, progress_api, optional_cancel_api, optional_cancel_token, 1, sync_write_funcs, sync_write_ctx, 0, &write_sync_job);
LONGTAIL_FATAL_ASSERT(ctx, err == 0, return err)

err = job_api->AddDependecies(job_api, 1, write_job, 1, write_sync_job);
Expand Down Expand Up @@ -5606,8 +5620,7 @@ static int WriteAssets(
}
#endif // defined(LONGTAIL_ASSERTS)

const uint32_t worker_count = job_api->GetWorkerCount(job_api) + 1;
const uint32_t max_parallell_block_read_jobs = worker_count < MAX_BLOCKS_PER_PARTIAL_ASSET_WRITE ? worker_count : MAX_BLOCKS_PER_PARTIAL_ASSET_WRITE;
const uint32_t max_parallell_block_read_jobs = GetMaxParallelBlockReadJobs(job_api);

uint32_t asset_job_count = 0;
for (uint32_t a = 0; a < awl->m_AssetJobCount; ++a)
Expand Down Expand Up @@ -5742,7 +5755,7 @@ static int WriteAssets(
Longtail_JobAPI_JobFunc block_read_funcs[1] = { BlockReader };
void* block_read_ctxs[1] = {block_job};
Longtail_JobAPI_Jobs block_read_job;
err = job_api->CreateJobs(job_api, job_group, progress_api, optional_cancel_api, optional_cancel_token, 1, block_read_funcs, block_read_ctxs, &block_read_job);
err = job_api->CreateJobs(job_api, job_group, progress_api, optional_cancel_api, optional_cancel_token, 1, block_read_funcs, block_read_ctxs, 1, &block_read_job);
LONGTAIL_FATAL_ASSERT(ctx, err == 0, return err)

job->m_VersionStorageAPI = version_storage_api;
Expand Down Expand Up @@ -5774,7 +5787,7 @@ static int WriteAssets(
void* ctxs[1] = { job };

Longtail_JobAPI_Jobs block_write_job;
err = job_api->CreateJobs(job_api, job_group, progress_api, optional_cancel_api, optional_cancel_token, 1, funcs, ctxs, &block_write_job);
err = job_api->CreateJobs(job_api, job_group, progress_api, optional_cancel_api, optional_cancel_token, 1, funcs, ctxs, 0, &block_write_job);
LONGTAIL_FATAL_ASSERT(ctx, err == 0, return err)
err = job_api->AddDependecies(job_api, 1, block_write_job, 1, block_read_job);
LONGTAIL_FATAL_ASSERT(ctx, err == 0, return err)
Expand Down
4 changes: 2 additions & 2 deletions src/longtail.h
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ typedef void* Longtail_JobAPI_Group;

typedef uint32_t (*Longtail_Job_GetWorkerCountFunc)(struct Longtail_JobAPI* job_api);
typedef int (*Longtail_Job_ReserveJobsFunc)(struct Longtail_JobAPI* job_api, uint32_t job_count, Longtail_JobAPI_Group* out_job_group);
typedef int (*Longtail_Job_CreateJobsFunc)(struct Longtail_JobAPI* job_api, Longtail_JobAPI_Group job_group, struct Longtail_ProgressAPI* progressAPI, struct Longtail_CancelAPI* optional_cancel_api, Longtail_CancelAPI_HCancelToken optional_cancel_token, uint32_t job_count, Longtail_JobAPI_JobFunc job_funcs[], void* job_contexts[], Longtail_JobAPI_Jobs* out_jobs);
typedef int (*Longtail_Job_CreateJobsFunc)(struct Longtail_JobAPI* job_api, Longtail_JobAPI_Group job_group, struct Longtail_ProgressAPI* progressAPI, struct Longtail_CancelAPI* optional_cancel_api, Longtail_CancelAPI_HCancelToken optional_cancel_token, uint32_t job_count, Longtail_JobAPI_JobFunc job_funcs[], void* job_contexts[], uint8_t job_channel, Longtail_JobAPI_Jobs* out_jobs);
typedef int (*Longtail_Job_AddDependeciesFunc)(struct Longtail_JobAPI* job_api, uint32_t job_count, Longtail_JobAPI_Jobs jobs, uint32_t dependency_job_count, Longtail_JobAPI_Jobs dependency_jobs);
typedef int (*Longtail_Job_ReadyJobsFunc)(struct Longtail_JobAPI* job_api, uint32_t job_count, Longtail_JobAPI_Jobs jobs);
typedef int (*Longtail_Job_WaitForAllJobsFunc)(struct Longtail_JobAPI* job_api, Longtail_JobAPI_Group job_group, struct Longtail_ProgressAPI* progressAPI, struct Longtail_CancelAPI* optional_cancel_api, Longtail_CancelAPI_HCancelToken optional_cancel_token);
Expand Down Expand Up @@ -513,7 +513,7 @@ struct Longtail_JobAPI* Longtail_MakeJobAPI(

LONGTAIL_EXPORT uint32_t Longtail_Job_GetWorkerCount(struct Longtail_JobAPI* job_api);
LONGTAIL_EXPORT int Longtail_Job_ReserveJobs(struct Longtail_JobAPI* job_api, uint32_t job_count, Longtail_JobAPI_Group* out_job_group);
LONGTAIL_EXPORT int Longtail_Job_CreateJobs(struct Longtail_JobAPI* job_api, Longtail_JobAPI_Group job_group, struct Longtail_ProgressAPI* progressAPI, struct Longtail_CancelAPI* optional_cancel_api, Longtail_CancelAPI_HCancelToken optional_cancel_token, uint32_t job_count, Longtail_JobAPI_JobFunc job_funcs[], void* job_contexts[], Longtail_JobAPI_Jobs* out_jobs);
LONGTAIL_EXPORT int Longtail_Job_CreateJobs(struct Longtail_JobAPI* job_api, Longtail_JobAPI_Group job_group, struct Longtail_ProgressAPI* progressAPI, struct Longtail_CancelAPI* optional_cancel_api, Longtail_CancelAPI_HCancelToken optional_cancel_token, uint32_t job_count, Longtail_JobAPI_JobFunc job_funcs[], void* job_contexts[], uint8_t job_channel, Longtail_JobAPI_Jobs* out_jobs);
LONGTAIL_EXPORT int Longtail_Job_AddDependecies(struct Longtail_JobAPI* job_api, uint32_t job_count, Longtail_JobAPI_Jobs jobs, uint32_t dependency_job_count, Longtail_JobAPI_Jobs dependency_jobs);
LONGTAIL_EXPORT int Longtail_Job_ReadyJobs(struct Longtail_JobAPI* job_api, uint32_t job_count, Longtail_JobAPI_Jobs jobs);
LONGTAIL_EXPORT int Longtail_Job_WaitForAllJobs(struct Longtail_JobAPI* job_api, Longtail_JobAPI_Group job_group, struct Longtail_ProgressAPI* progressAPI, struct Longtail_CancelAPI* optional_cancel_api, Longtail_CancelAPI_HCancelToken optional_cancel_token);
Expand Down
2 changes: 1 addition & 1 deletion test/test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4661,7 +4661,7 @@ TEST(Longtail, TestCreateVersionCancelOperation)
void* job_ctxs[1] = {&job_context};
Longtail_JobAPI_Jobs jobs;

ASSERT_EQ(0, job_api->CreateJobs(job_api, job_group, 0, 0, 0, 1, job_funcs, job_ctxs, &jobs));
ASSERT_EQ(0, job_api->CreateJobs(job_api, job_group, 0, 0, 0, 1, job_funcs, job_ctxs, 0, &jobs));
ASSERT_EQ(0, cancel_api->Cancel(cancel_api, cancel_token));
ASSERT_EQ(0, job_api->ReadyJobs(job_api, 1, jobs));
ASSERT_EQ(0, Longtail_PostSema(sema, 1));
Expand Down

0 comments on commit ee53715

Please sign in to comment.