diff --git a/CHANGELOG.md b/CHANGELOG.md index f2e6610b..d3442238 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,9 @@ ## +- **CHANGED API** `Longtail_Job_CreateJobsFunc` now takes a `channel` parameter, can be either 0 or 1, 0 has higher priority than 1 +- **FIXED** Writing content to disk now has higher priority than reading blocks from store +- **CHANGED** Reworked logic for calculating the number of blocks to read per write operation when writing multi-block assets + +## 0.3.5 - **CHANGED** Function entry logging change to DEBUG - **CHANGED** Added info-debugging of read/written files (with size) - **CHANGED** Add log context for Longtail_WriteStoredBlock and Longtail_WriteStoredBlock in non-debug build diff --git a/lib/bikeshed/longtail_bikeshed.c b/lib/bikeshed/longtail_bikeshed.c index 06a09318..6f833d82 100644 --- a/lib/bikeshed/longtail_bikeshed.c +++ b/lib/bikeshed/longtail_bikeshed.c @@ -89,10 +89,15 @@ static int32_t ThreadWorker_Execute(void* context) LONGTAIL_FATAL_ASSERT(ctx, thread_worker->stop, return 0) while (*thread_worker->stop == 0) { - if (!Bikeshed_ExecuteOne(thread_worker->shed, 0)) + if (Bikeshed_ExecuteOne(thread_worker->shed, 0)) { - Longtail_WaitSema(thread_worker->semaphore, LONGTAIL_TIMEOUT_INFINITE); + continue; + } + if (Bikeshed_ExecuteOne(thread_worker->shed, 1)) + { + continue; } + Longtail_WaitSema(thread_worker->semaphore, LONGTAIL_TIMEOUT_INFINITE); } return 0; } @@ -299,6 +304,7 @@ static int Bikeshed_CreateJobs( uint32_t job_count, Longtail_JobAPI_JobFunc job_funcs[], void* job_contexts[], + uint8_t job_channel, Longtail_JobAPI_Jobs* out_jobs) { #if defined(LONGTAIL_ASSERTS) @@ -384,6 +390,7 @@ static int Bikeshed_CreateJobs( } Bikeshed_ExecuteOne(bikeshed_job_api->m_Shed, 0); } + Bikeshed_SetTasksChannel(bikeshed_job_api->m_Shed, job_count, task_ids, job_channel); *out_jobs = task_ids; err = 0; @@ -484,6 +491,10 @@ static int Bikeshed_WaitForAllJobs(struct Longtail_JobAPI* job_api, Longtail_Job { continue; } + if (Bikeshed_ExecuteOne(bikeshed_job_api->m_Shed, 1)) + { + continue; + } if (old_pending_count != bikeshed_job_group->m_PendingJobCount) { old_pending_count = bikeshed_job_group->m_PendingJobCount; @@ -607,7 +618,7 @@ static int Bikeshed_Init( return err; } - job_api->m_Shed = Bikeshed_Create(Longtail_Alloc("Bikeshed", BIKESHED_SIZE(BIKESHED_MAX_TASK_COUNT, BIKESHED_MAX_DEPENDENCY_COUNT, 1)), BIKESHED_MAX_TASK_COUNT, BIKESHED_MAX_DEPENDENCY_COUNT, 1, &job_api->m_ReadyCallback.cb); + job_api->m_Shed = Bikeshed_Create(Longtail_Alloc("Bikeshed", BIKESHED_SIZE(BIKESHED_MAX_TASK_COUNT, BIKESHED_MAX_DEPENDENCY_COUNT, 1)), BIKESHED_MAX_TASK_COUNT, BIKESHED_MAX_DEPENDENCY_COUNT, 2, &job_api->m_ReadyCallback.cb); if (!job_api->m_Shed) { LONGTAIL_LOG(ctx, LONGTAIL_LOG_LEVEL_ERROR, "Bikeshed_Create() failed with %d", ENOMEM) diff --git a/lib/blockstorestorage/longtail_blockstorestorage.c b/lib/blockstorestorage/longtail_blockstorestorage.c index a53e6d42..3addcfdd 100644 --- a/lib/blockstorestorage/longtail_blockstorestorage.c +++ b/lib/blockstorestorage/longtail_blockstorestorage.c @@ -629,7 +629,7 @@ static int BlockStoreStorageAPI_ReadFile( ctxs[b] = &job_datas[b]; } Longtail_JobAPI_Jobs jobs; - err = job_api->CreateJobs(job_api, job_group, 0, 0, 0, block_count, funcs, ctxs, &jobs); + err = job_api->CreateJobs(job_api, job_group, 0, 0, 0, block_count, funcs, ctxs, 0, &jobs); LONGTAIL_FATAL_ASSERT(ctx, err == 0, return err) err = job_api->ReadyJobs(job_api, block_count, jobs); LONGTAIL_FATAL_ASSERT(ctx, err == 0, return err) diff --git a/lib/fsblockstore/longtail_fsblockstore.c b/lib/fsblockstore/longtail_fsblockstore.c index 7fa77cc9..de00a7d3 100644 --- a/lib/fsblockstore/longtail_fsblockstore.c +++ b/lib/fsblockstore/longtail_fsblockstore.c @@ -532,7 +532,7 @@ static int ReadContent( Longtail_JobAPI_JobFunc job_func[] = {ScanBlock}; void* ctxs[] = {job}; Longtail_JobAPI_Jobs jobs; - err = job_api->CreateJobs(job_api, job_group, 0, 0, 0, 1, job_func, ctxs, &jobs); + err = job_api->CreateJobs(job_api, job_group, 0, 0, 0, 1, job_func, ctxs, 0, &jobs); LONGTAIL_FATAL_ASSERT(ctx, !err, return err) err = job_api->ReadyJobs(job_api, 1, jobs); LONGTAIL_FATAL_ASSERT(ctx, !err, return err) diff --git a/src/longtail.c b/src/longtail.c index db466e70..82d7d185 100644 --- a/src/longtail.c +++ b/src/longtail.c @@ -457,7 +457,7 @@ struct Longtail_JobAPI* Longtail_MakeJobAPI( uint32_t Longtail_Job_GetWorkerCount(struct Longtail_JobAPI* job_api) { return job_api->GetWorkerCount(job_api); } int Longtail_Job_ReserveJobs(struct Longtail_JobAPI* job_api, uint32_t job_count, Longtail_JobAPI_Group* out_job_group) { return job_api->ReserveJobs(job_api, job_count, out_job_group); } -int Longtail_Job_CreateJobs(struct Longtail_JobAPI* job_api, Longtail_JobAPI_Group job_group, struct Longtail_ProgressAPI* progressAPI, struct Longtail_CancelAPI* optional_cancel_api, Longtail_CancelAPI_HCancelToken optional_cancel_token, uint32_t job_count, Longtail_JobAPI_JobFunc job_funcs[], void* job_contexts[], Longtail_JobAPI_Jobs* out_jobs) { return job_api->CreateJobs(job_api, job_group, progressAPI, optional_cancel_api, optional_cancel_token, job_count, job_funcs, job_contexts, out_jobs); } +int Longtail_Job_CreateJobs(struct Longtail_JobAPI* job_api, Longtail_JobAPI_Group job_group, struct Longtail_ProgressAPI* progressAPI, struct Longtail_CancelAPI* optional_cancel_api, Longtail_CancelAPI_HCancelToken optional_cancel_token, uint32_t job_count, Longtail_JobAPI_JobFunc job_funcs[], void* job_contexts[], uint8_t job_channel, Longtail_JobAPI_Jobs* out_jobs) { return job_api->CreateJobs(job_api, job_group, progressAPI, optional_cancel_api, optional_cancel_token, job_count, job_funcs, job_contexts, job_channel, out_jobs); } int Longtail_Job_AddDependecies(struct Longtail_JobAPI* job_api, uint32_t job_count, Longtail_JobAPI_Jobs jobs, uint32_t dependency_job_count, Longtail_JobAPI_Jobs dependency_jobs) { return job_api->AddDependecies(job_api, job_count, jobs, dependency_job_count, dependency_jobs); } int Longtail_Job_ReadyJobs(struct Longtail_JobAPI* job_api, uint32_t job_count, Longtail_JobAPI_Jobs jobs) { return job_api->ReadyJobs(job_api, job_count, jobs); } int Longtail_Job_WaitForAllJobs(struct Longtail_JobAPI* job_api, Longtail_JobAPI_Group job_group, struct Longtail_ProgressAPI* progressAPI, struct Longtail_CancelAPI* optional_cancel_api, Longtail_CancelAPI_HCancelToken optional_cancel_token) { return job_api->WaitForAllJobs(job_api, job_group, progressAPI, optional_cancel_api, optional_cancel_token); } @@ -2141,7 +2141,7 @@ static int ChunkAssets( if (jobs_prepared + asset_part_count > max_job_batch_count) { Longtail_JobAPI_Jobs jobs; - err = job_api->CreateJobs(job_api, job_group, progress_api, optional_cancel_api, optional_cancel_token, (uint32_t)jobs_prepared, &funcs[jobs_submitted], &ctxs[jobs_submitted], &jobs); + err = job_api->CreateJobs(job_api, job_group, progress_api, optional_cancel_api, optional_cancel_token, (uint32_t)jobs_prepared, &funcs[jobs_submitted], &ctxs[jobs_submitted], 0, &jobs); LONGTAIL_FATAL_ASSERT(ctx, !err, return err) err = job_api->ReadyJobs(job_api, (uint32_t)jobs_prepared, jobs); LONGTAIL_FATAL_ASSERT(ctx, !err, return err) @@ -2182,7 +2182,7 @@ static int ChunkAssets( if (jobs_prepared > 0) { Longtail_JobAPI_Jobs jobs; - err = job_api->CreateJobs(job_api, job_group, progress_api, optional_cancel_api, optional_cancel_token, (uint32_t)jobs_prepared, &funcs[jobs_submitted], &ctxs[jobs_submitted], &jobs); + err = job_api->CreateJobs(job_api, job_group, progress_api, optional_cancel_api, optional_cancel_token, (uint32_t)jobs_prepared, &funcs[jobs_submitted], &ctxs[jobs_submitted], 0, &jobs); LONGTAIL_FATAL_ASSERT(ctx, !err, return err) err = job_api->ReadyJobs(job_api, (uint32_t)jobs_prepared, jobs); LONGTAIL_FATAL_ASSERT(ctx, !err, return err) @@ -4231,7 +4231,7 @@ int Longtail_WriteContent( uint32_t jobs_left = job_count - jobs_submitted; uint32_t jobs_batch = jobs_left > max_job_batch_count ? max_job_batch_count : jobs_left; Longtail_JobAPI_Jobs jobs; - err = job_api->CreateJobs(job_api, job_group, progress_api, optional_cancel_api, optional_cancel_token, jobs_batch, &funcs[jobs_submitted], &ctxs[jobs_submitted], &jobs); + err = job_api->CreateJobs(job_api, job_group, progress_api, optional_cancel_api, optional_cancel_token, jobs_batch, &funcs[jobs_submitted], &ctxs[jobs_submitted], 0, &jobs); LONGTAIL_FATAL_ASSERT(ctx, err == 0, return err) err = job_api->ReadyJobs(job_api, jobs_batch, jobs); LONGTAIL_FATAL_ASSERT(ctx, err == 0, return err) @@ -4392,6 +4392,21 @@ struct WritePartialAssetFromBlocksJob int WritePartialAssetFromBlocks(void* context, uint32_t job_id, int is_cancelled); +static uint32_t GetMaxParallelBlockReadJobs(struct Longtail_JobAPI* job_api) +{ + uint32_t worker_count = job_api->GetWorkerCount(job_api); + if (worker_count > 2) + { + worker_count--; + } + else if (worker_count == 0) + { + worker_count = 1; + } + const uint32_t max_parallell_block_read_jobs = worker_count < MAX_BLOCKS_PER_PARTIAL_ASSET_WRITE ? worker_count : MAX_BLOCKS_PER_PARTIAL_ASSET_WRITE; + return max_parallell_block_read_jobs; +} + // Returns the write sync task, or the write task if there is no need for reading new blocks static int CreatePartialAssetWriteJob( struct Longtail_BlockStoreAPI* block_store_api, @@ -4467,8 +4482,7 @@ static int CreatePartialAssetWriteJob( Longtail_JobAPI_JobFunc block_read_funcs[MAX_BLOCKS_PER_PARTIAL_ASSET_WRITE]; void* block_read_ctx[MAX_BLOCKS_PER_PARTIAL_ASSET_WRITE]; - const uint32_t worker_count = job_api->GetWorkerCount(job_api) + 1; - const uint32_t max_parallell_block_read_jobs = worker_count < MAX_BLOCKS_PER_PARTIAL_ASSET_WRITE ? worker_count : MAX_BLOCKS_PER_PARTIAL_ASSET_WRITE; + const uint32_t max_parallell_block_read_jobs = GetMaxParallelBlockReadJobs(job_api); while (chunk_index_offset != chunk_index_end && job->m_BlockReaderJobCount <= max_parallell_block_read_jobs) { @@ -4513,7 +4527,7 @@ static int CreatePartialAssetWriteJob( Longtail_JobAPI_JobFunc write_funcs[1] = { WritePartialAssetFromBlocks }; void* write_ctx[1] = { job }; Longtail_JobAPI_Jobs write_job; - int err = job_api->CreateJobs(job_api, job_group, progress_api, optional_cancel_api, optional_cancel_token, 1, write_funcs, write_ctx, &write_job); + int err = job_api->CreateJobs(job_api, job_group, progress_api, optional_cancel_api, optional_cancel_token, 1, write_funcs, write_ctx, 0, &write_job); if (err) { LONGTAIL_LOG(ctx, LONGTAIL_LOG_LEVEL_ERROR, "job_api->CreateJobs() failed with %d", err) @@ -4523,12 +4537,12 @@ static int CreatePartialAssetWriteJob( if (job->m_BlockReaderJobCount > 0) { Longtail_JobAPI_Jobs block_read_jobs; - err = job_api->CreateJobs(job_api, job_group, progress_api, optional_cancel_api, optional_cancel_token, job->m_BlockReaderJobCount, block_read_funcs, block_read_ctx, &block_read_jobs); + err = job_api->CreateJobs(job_api, job_group, progress_api, optional_cancel_api, optional_cancel_token, job->m_BlockReaderJobCount, block_read_funcs, block_read_ctx, 1, &block_read_jobs); LONGTAIL_FATAL_ASSERT(ctx, err == 0, return err) Longtail_JobAPI_JobFunc sync_write_funcs[1] = { WriteReady }; void* sync_write_ctx[1] = { 0 }; Longtail_JobAPI_Jobs write_sync_job; - err = job_api->CreateJobs(job_api, job_group, progress_api, optional_cancel_api, optional_cancel_token, 1, sync_write_funcs, sync_write_ctx, &write_sync_job); + err = job_api->CreateJobs(job_api, job_group, progress_api, optional_cancel_api, optional_cancel_token, 1, sync_write_funcs, sync_write_ctx, 0, &write_sync_job); LONGTAIL_FATAL_ASSERT(ctx, err == 0, return err) err = job_api->AddDependecies(job_api, 1, write_job, 1, write_sync_job); @@ -5606,8 +5620,7 @@ static int WriteAssets( } #endif // defined(LONGTAIL_ASSERTS) - const uint32_t worker_count = job_api->GetWorkerCount(job_api) + 1; - const uint32_t max_parallell_block_read_jobs = worker_count < MAX_BLOCKS_PER_PARTIAL_ASSET_WRITE ? worker_count : MAX_BLOCKS_PER_PARTIAL_ASSET_WRITE; + const uint32_t max_parallell_block_read_jobs = GetMaxParallelBlockReadJobs(job_api); uint32_t asset_job_count = 0; for (uint32_t a = 0; a < awl->m_AssetJobCount; ++a) @@ -5742,7 +5755,7 @@ static int WriteAssets( Longtail_JobAPI_JobFunc block_read_funcs[1] = { BlockReader }; void* block_read_ctxs[1] = {block_job}; Longtail_JobAPI_Jobs block_read_job; - err = job_api->CreateJobs(job_api, job_group, progress_api, optional_cancel_api, optional_cancel_token, 1, block_read_funcs, block_read_ctxs, &block_read_job); + err = job_api->CreateJobs(job_api, job_group, progress_api, optional_cancel_api, optional_cancel_token, 1, block_read_funcs, block_read_ctxs, 1, &block_read_job); LONGTAIL_FATAL_ASSERT(ctx, err == 0, return err) job->m_VersionStorageAPI = version_storage_api; @@ -5774,7 +5787,7 @@ static int WriteAssets( void* ctxs[1] = { job }; Longtail_JobAPI_Jobs block_write_job; - err = job_api->CreateJobs(job_api, job_group, progress_api, optional_cancel_api, optional_cancel_token, 1, funcs, ctxs, &block_write_job); + err = job_api->CreateJobs(job_api, job_group, progress_api, optional_cancel_api, optional_cancel_token, 1, funcs, ctxs, 0, &block_write_job); LONGTAIL_FATAL_ASSERT(ctx, err == 0, return err) err = job_api->AddDependecies(job_api, 1, block_write_job, 1, block_read_job); LONGTAIL_FATAL_ASSERT(ctx, err == 0, return err) diff --git a/src/longtail.h b/src/longtail.h index b9f413ac..d9ac5c25 100644 --- a/src/longtail.h +++ b/src/longtail.h @@ -477,7 +477,7 @@ typedef void* Longtail_JobAPI_Group; typedef uint32_t (*Longtail_Job_GetWorkerCountFunc)(struct Longtail_JobAPI* job_api); typedef int (*Longtail_Job_ReserveJobsFunc)(struct Longtail_JobAPI* job_api, uint32_t job_count, Longtail_JobAPI_Group* out_job_group); -typedef int (*Longtail_Job_CreateJobsFunc)(struct Longtail_JobAPI* job_api, Longtail_JobAPI_Group job_group, struct Longtail_ProgressAPI* progressAPI, struct Longtail_CancelAPI* optional_cancel_api, Longtail_CancelAPI_HCancelToken optional_cancel_token, uint32_t job_count, Longtail_JobAPI_JobFunc job_funcs[], void* job_contexts[], Longtail_JobAPI_Jobs* out_jobs); +typedef int (*Longtail_Job_CreateJobsFunc)(struct Longtail_JobAPI* job_api, Longtail_JobAPI_Group job_group, struct Longtail_ProgressAPI* progressAPI, struct Longtail_CancelAPI* optional_cancel_api, Longtail_CancelAPI_HCancelToken optional_cancel_token, uint32_t job_count, Longtail_JobAPI_JobFunc job_funcs[], void* job_contexts[], uint8_t job_channel, Longtail_JobAPI_Jobs* out_jobs); typedef int (*Longtail_Job_AddDependeciesFunc)(struct Longtail_JobAPI* job_api, uint32_t job_count, Longtail_JobAPI_Jobs jobs, uint32_t dependency_job_count, Longtail_JobAPI_Jobs dependency_jobs); typedef int (*Longtail_Job_ReadyJobsFunc)(struct Longtail_JobAPI* job_api, uint32_t job_count, Longtail_JobAPI_Jobs jobs); typedef int (*Longtail_Job_WaitForAllJobsFunc)(struct Longtail_JobAPI* job_api, Longtail_JobAPI_Group job_group, struct Longtail_ProgressAPI* progressAPI, struct Longtail_CancelAPI* optional_cancel_api, Longtail_CancelAPI_HCancelToken optional_cancel_token); @@ -513,7 +513,7 @@ struct Longtail_JobAPI* Longtail_MakeJobAPI( LONGTAIL_EXPORT uint32_t Longtail_Job_GetWorkerCount(struct Longtail_JobAPI* job_api); LONGTAIL_EXPORT int Longtail_Job_ReserveJobs(struct Longtail_JobAPI* job_api, uint32_t job_count, Longtail_JobAPI_Group* out_job_group); -LONGTAIL_EXPORT int Longtail_Job_CreateJobs(struct Longtail_JobAPI* job_api, Longtail_JobAPI_Group job_group, struct Longtail_ProgressAPI* progressAPI, struct Longtail_CancelAPI* optional_cancel_api, Longtail_CancelAPI_HCancelToken optional_cancel_token, uint32_t job_count, Longtail_JobAPI_JobFunc job_funcs[], void* job_contexts[], Longtail_JobAPI_Jobs* out_jobs); +LONGTAIL_EXPORT int Longtail_Job_CreateJobs(struct Longtail_JobAPI* job_api, Longtail_JobAPI_Group job_group, struct Longtail_ProgressAPI* progressAPI, struct Longtail_CancelAPI* optional_cancel_api, Longtail_CancelAPI_HCancelToken optional_cancel_token, uint32_t job_count, Longtail_JobAPI_JobFunc job_funcs[], void* job_contexts[], uint8_t job_channel, Longtail_JobAPI_Jobs* out_jobs); LONGTAIL_EXPORT int Longtail_Job_AddDependecies(struct Longtail_JobAPI* job_api, uint32_t job_count, Longtail_JobAPI_Jobs jobs, uint32_t dependency_job_count, Longtail_JobAPI_Jobs dependency_jobs); LONGTAIL_EXPORT int Longtail_Job_ReadyJobs(struct Longtail_JobAPI* job_api, uint32_t job_count, Longtail_JobAPI_Jobs jobs); LONGTAIL_EXPORT int Longtail_Job_WaitForAllJobs(struct Longtail_JobAPI* job_api, Longtail_JobAPI_Group job_group, struct Longtail_ProgressAPI* progressAPI, struct Longtail_CancelAPI* optional_cancel_api, Longtail_CancelAPI_HCancelToken optional_cancel_token); diff --git a/test/test.cpp b/test/test.cpp index fbded6aa..4f17445b 100644 --- a/test/test.cpp +++ b/test/test.cpp @@ -4661,7 +4661,7 @@ TEST(Longtail, TestCreateVersionCancelOperation) void* job_ctxs[1] = {&job_context}; Longtail_JobAPI_Jobs jobs; - ASSERT_EQ(0, job_api->CreateJobs(job_api, job_group, 0, 0, 0, 1, job_funcs, job_ctxs, &jobs)); + ASSERT_EQ(0, job_api->CreateJobs(job_api, job_group, 0, 0, 0, 1, job_funcs, job_ctxs, 0, &jobs)); ASSERT_EQ(0, cancel_api->Cancel(cancel_api, cancel_token)); ASSERT_EQ(0, job_api->ReadyJobs(job_api, 1, jobs)); ASSERT_EQ(0, Longtail_PostSema(sema, 1));