Skip to content

Commit

Permalink
Fix/better progress when many jobs (#177)
Browse files Browse the repository at this point in the history
* Add Longtail_GetCurrentThreadId()
* Changed Longtail_Job_CreateJobsFunc to take progress and cancel token
* Update bikeshed wrapper to handle progress and cancel token
* Adapt blockstorestorage and fsblockstore to new jobs api changes
* release notes
* master -> main branch fixup
  • Loading branch information
DanEngelbrecht authored Jan 16, 2022
1 parent 7029cdc commit bacfad3
Show file tree
Hide file tree
Showing 11 changed files with 94 additions and 137 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/create-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,9 @@ jobs:
release_name: Release ${{ env.RELEASE_VERSION }}
body: |
# Changes in this Release
- **FIX** Windows platform now handles non-ascii file names
- **CHANGED API** JobAPI.CreateJobsFunc() now takes progress and cancel options. progress callback will only be called if it is same thread that made ReserveJobs
- **ADDED** Longtail_GetCurrentThreadId()
- **FIX** Smoother progress when indexing folders with many files
draft: false
prerelease: false
files: "*-x64.zip"
2 changes: 1 addition & 1 deletion .github/workflows/master-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name: Build Master

on:
push:
branches: [ master ]
branches: [ main ]

jobs:
linux:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/pr-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name: Validate PR

on:
pull_request:
branches: [ master ]
branches: [ main ]

jobs:
linux:
Expand Down
25 changes: 23 additions & 2 deletions lib/bikeshed/longtail_bikeshed.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@

#include <errno.h>

#define BIKESHED_MAX_TASK_COUNT 131072
#define BIKESHED_MAX_DEPENDENCY_COUNT 458752
#define BIKESHED_MAX_TASK_COUNT 65536
#define BIKESHED_MAX_DEPENDENCY_COUNT 262144

struct ReadyCallback
{
Expand Down Expand Up @@ -172,6 +172,7 @@ struct Bikeshed_JobAPI_Group
struct BikeshedJobAPI* m_API;
struct JobWrapper* m_ReservedJobs;
Bikeshed_TaskID* m_ReservedTasksIDs;
uint64_t m_ReservingThreadId;
uint32_t m_ReservedJobCount;
int32_t volatile m_Cancelled;
int32_t volatile m_SubmittedJobCount;
Expand Down Expand Up @@ -206,6 +207,7 @@ struct Bikeshed_JobAPI_Group* CreateJobGroup(struct BikeshedJobAPI* job_api, uin
job_group->m_ReservedTasksIDs = (Bikeshed_TaskID*)p;
p += sizeof(Bikeshed_TaskID) * job_count;
job_group->m_API = job_api;
job_group->m_ReservingThreadId = Longtail_GetCurrentThreadId();
job_group->m_ReservedJobCount = job_count;
job_group->m_Cancelled = 0;
job_group->m_PendingJobCount = 0;
Expand Down Expand Up @@ -291,6 +293,9 @@ static int Bikeshed_ReserveJobs(struct Longtail_JobAPI* job_api, uint32_t job_co
static int Bikeshed_CreateJobs(
struct Longtail_JobAPI* job_api,
Longtail_JobAPI_Group job_group,
struct Longtail_ProgressAPI* progressAPI,
struct Longtail_CancelAPI* optional_cancel_api,
Longtail_CancelAPI_HCancelToken optional_cancel_token,
uint32_t job_count,
Longtail_JobAPI_JobFunc job_funcs[],
void* job_contexts[],
Expand Down Expand Up @@ -325,6 +330,8 @@ static int Bikeshed_CreateJobs(
sizeof(BikeShed_TaskFunc) * job_count +
sizeof(void*) * job_count;

int is_reserve_thread = bikeshed_job_group->m_ReservingThreadId == Longtail_GetCurrentThreadId();

int32_t new_job_count = Longtail_AtomicAdd32(&bikeshed_job_group->m_SubmittedJobCount, (int32_t)job_count);
LONGTAIL_FATAL_ASSERT(ctx, new_job_count > 0, return EINVAL);
if (new_job_count > (int32_t)bikeshed_job_group->m_ReservedJobCount)
Expand Down Expand Up @@ -359,6 +366,20 @@ static int Bikeshed_CreateJobs(

while (!Bikeshed_CreateTasks(bikeshed_job_api->m_Shed, job_count, funcs, ctxs, task_ids))
{
if (bikeshed_job_group->m_Cancelled == 0)
{
if (progressAPI && is_reserve_thread)
{
progressAPI->OnProgress(progressAPI,(uint32_t)bikeshed_job_group->m_ReservedJobCount, (uint32_t)bikeshed_job_group->m_JobsCompleted);
}
if (optional_cancel_api && optional_cancel_token)
{
if (optional_cancel_api->IsCancelled(optional_cancel_api, optional_cancel_token) == ECANCELED)
{
Longtail_AtomicAdd32(&bikeshed_job_group->m_Cancelled, 1);
}
}
}
Bikeshed_ExecuteOne(bikeshed_job_api->m_Shed, 0);
}

Expand Down
2 changes: 1 addition & 1 deletion lib/blockstorestorage/longtail_blockstorestorage.c
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,7 @@ static int BlockStoreStorageAPI_ReadFile(
ctxs[b] = &job_datas[b];
}
Longtail_JobAPI_Jobs jobs;
err = job_api->CreateJobs(job_api, job_group, block_count, funcs, ctxs, &jobs);
err = job_api->CreateJobs(job_api, job_group, 0, 0, 0, block_count, funcs, ctxs, &jobs);
LONGTAIL_FATAL_ASSERT(ctx, err == 0, return err)
err = job_api->ReadyJobs(job_api, block_count, jobs);
LONGTAIL_FATAL_ASSERT(ctx, err == 0, return err)
Expand Down
2 changes: 1 addition & 1 deletion lib/fsblockstore/longtail_fsblockstore.c
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ static int ReadContent(
Longtail_JobAPI_JobFunc job_func[] = {ScanBlock};
void* ctxs[] = {job};
Longtail_JobAPI_Jobs jobs;
err = job_api->CreateJobs(job_api, job_group, 1, job_func, ctxs, &jobs);
err = job_api->CreateJobs(job_api, job_group, 0, 0, 0, 1, job_func, ctxs, &jobs);
LONGTAIL_FATAL_ASSERT(ctx, !err, return err)
err = job_api->ReadyJobs(job_api, 1, jobs);
LONGTAIL_FATAL_ASSERT(ctx, !err, return err)
Expand Down
12 changes: 12 additions & 0 deletions lib/longtail_platform.c
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,12 @@ void Longtail_DeleteThread(HLongtail_Thread thread)
thread->m_Handle = INVALID_HANDLE_VALUE;
}

uint64_t Longtail_GetCurrentThreadId()
{
return (uint64_t)GetCurrentThreadId();
}


struct Longtail_Sema
{
HANDLE m_Handle;
Expand Down Expand Up @@ -1213,6 +1219,12 @@ void Longtail_DeleteThread(HLongtail_Thread thread)
pthread_mutex_destroy(&thread->m_ExitLock);
thread->m_Handle = 0;
}

uint64_t Longtail_GetCurrentThreadId()
{
return (uint64_t)pthread_self();
}

/*
struct stat path_stat;
int err = stat(path, &path_stat);
Expand Down
9 changes: 5 additions & 4 deletions lib/longtail_platform.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ typedef struct Longtail_Thread* HLongtail_Thread;

typedef int (*Longtail_ThreadFunc)(void* context_data);

size_t Longtail_GetThreadSize();
int Longtail_CreateThread(void* mem, Longtail_ThreadFunc thread_func, size_t stack_size, void* context_data, int priority, HLongtail_Thread* out_thread);
int Longtail_JoinThread(HLongtail_Thread thread, uint64_t timeout_us);
void Longtail_DeleteThread(HLongtail_Thread thread);
size_t Longtail_GetThreadSize();
int Longtail_CreateThread(void* mem, Longtail_ThreadFunc thread_func, size_t stack_size, void* context_data, int priority, HLongtail_Thread* out_thread);
int Longtail_JoinThread(HLongtail_Thread thread, uint64_t timeout_us);
void Longtail_DeleteThread(HLongtail_Thread thread);
uint64_t Longtail_GetCurrentThreadId();

typedef struct Longtail_Sema* HLongtail_Sema;
size_t Longtail_GetSemaSize();
Expand Down
Loading

0 comments on commit bacfad3

Please sign in to comment.