Skip to content

Commit

Permalink
reworked pool logic
Browse files Browse the repository at this point in the history
  • Loading branch information
mdevaev committed Apr 5, 2024
1 parent fab4c47 commit 1803879
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 64 deletions.
89 changes: 31 additions & 58 deletions src/ustreamer/workers.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "../libs/tools.h"
#include "../libs/threading.h"
#include "../libs/logging.h"
#include "../libs/list.h"


static void *_worker_thread(void *v_worker);
Expand All @@ -53,13 +54,13 @@ us_workers_pool_s *us_workers_pool_init(
atomic_init(&pool->stop, false);

pool->n_workers = n_workers;
US_CALLOC(pool->workers, pool->n_workers);

US_MUTEX_INIT(pool->free_workers_mutex);
US_COND_INIT(pool->free_workers_cond);

for (uint index = 0; index < pool->n_workers; ++index) {
us_worker_s *const wr = &pool->workers[index];
us_worker_s *wr;
US_CALLOC(wr, 1);

wr->number = index;
US_ASPRINTF(wr->name, "%s-%u", wr_prefix, index);
Expand All @@ -73,6 +74,8 @@ us_workers_pool_s *us_workers_pool_init(

US_THREAD_CREATE(wr->tid, _worker_thread, (void*)wr);
pool->free_workers += 1;

US_LIST_APPEND(pool->workers, wr);
}
return pool;
}
Expand All @@ -81,9 +84,7 @@ void us_workers_pool_destroy(us_workers_pool_s *pool) {
US_LOG_INFO("Destroying workers pool %s ...", pool->name);

atomic_store(&pool->stop, true);
for (uint index = 0; index < pool->n_workers; ++index) {
us_worker_s *const wr = &pool->workers[index];

US_LIST_ITERATE(pool->workers, wr, { // cppcheck-suppress constStatement
US_MUTEX_LOCK(wr->has_job_mutex);
atomic_store(&wr->has_job, true); // Final job: die
US_MUTEX_UNLOCK(wr->has_job_mutex);
Expand All @@ -93,83 +94,56 @@ void us_workers_pool_destroy(us_workers_pool_s *pool) {
US_MUTEX_DESTROY(wr->has_job_mutex);
US_COND_DESTROY(wr->has_job_cond);

free(wr->name);

pool->job_destroy(wr->job);
}

free(wr->name);
free(wr);
});

US_MUTEX_DESTROY(pool->free_workers_mutex);
US_COND_DESTROY(pool->free_workers_cond);

free(pool->workers);
free(pool);
}

us_worker_s *us_workers_pool_wait(us_workers_pool_s *pool) {
us_worker_s *ready_wr = NULL;

US_MUTEX_LOCK(pool->free_workers_mutex);
US_COND_WAIT_FOR(pool->free_workers, pool->free_workers_cond, pool->free_workers_mutex);
US_MUTEX_UNLOCK(pool->free_workers_mutex);

if (pool->oldest_wr && !atomic_load(&pool->oldest_wr->has_job)) {
ready_wr = pool->oldest_wr;
ready_wr->job_timely = true;
} else {
for (uint index = 0; index < pool->n_workers; ++index) {
us_worker_s *const wr = &pool->workers[index];
if (
!atomic_load(&wr->has_job) && (
ready_wr == NULL
|| ready_wr->job_start_ts < wr->job_start_ts
)
) {
ready_wr = wr;
break;
}
us_worker_s *found = NULL;
US_LIST_ITERATE(pool->workers, wr, { // cppcheck-suppress constStatement
if (!atomic_load(&wr->has_job) && (found == NULL || found->job_start_ts <= wr->job_start_ts)) {
found = wr;
}
assert(ready_wr != NULL);
ready_wr->job_timely = false; // Освободился воркер, получивший задание позже (или самый первый при самом первом захвате)
});
assert(found != NULL);
US_LIST_REMOVE(pool->workers, found);
US_LIST_APPEND(pool->workers, found); // Перемещаем в конец списка

found->job_timely = (found->job_start_ts > pool->job_timely_ts);
if (found->job_timely) {
pool->job_timely_ts = found->job_start_ts;
}
return ready_wr;
return found;
}

void us_workers_pool_assign(us_workers_pool_s *pool, us_worker_s *ready_wr/*, void *job*/) {
if (pool->oldest_wr == ready_wr) {
pool->oldest_wr = pool->oldest_wr->next_wr;
}
if (pool->oldest_wr == NULL) {
pool->oldest_wr = ready_wr;
pool->latest_wr = pool->oldest_wr;
} else {
if (ready_wr->next_wr != NULL) {
ready_wr->next_wr->prev_wr = ready_wr->prev_wr;
}
if (ready_wr->prev_wr != NULL) {
ready_wr->prev_wr->next_wr = ready_wr->next_wr;
}
ready_wr->prev_wr = pool->latest_wr;
pool->latest_wr->next_wr = ready_wr;
pool->latest_wr = ready_wr;
}
pool->latest_wr->next_wr = NULL;

US_MUTEX_LOCK(ready_wr->has_job_mutex);
//ready_wr->job = job;
atomic_store(&ready_wr->has_job, true);
US_MUTEX_UNLOCK(ready_wr->has_job_mutex);
US_COND_SIGNAL(ready_wr->has_job_cond);
void us_workers_pool_assign(us_workers_pool_s *pool, us_worker_s *wr) {
US_MUTEX_LOCK(wr->has_job_mutex);
atomic_store(&wr->has_job, true);
US_MUTEX_UNLOCK(wr->has_job_mutex);
US_COND_SIGNAL(wr->has_job_cond);

US_MUTEX_LOCK(pool->free_workers_mutex);
pool->free_workers -= 1;
US_MUTEX_UNLOCK(pool->free_workers_mutex);
}

ldf us_workers_pool_get_fluency_delay(us_workers_pool_s *pool, const us_worker_s *ready_wr) {
const ldf approx_job_time = pool->approx_job_time * 0.9 + ready_wr->last_job_time * 0.1;
ldf us_workers_pool_get_fluency_delay(us_workers_pool_s *pool, const us_worker_s *wr) {
const ldf approx_job_time = pool->approx_job_time * 0.9 + wr->last_job_time * 0.1;

US_LOG_VERBOSE("Correcting pool's %s approx_job_time: %.3Lf -> %.3Lf (last_job_time=%.3Lf)",
pool->name, pool->approx_job_time, approx_job_time, ready_wr->last_job_time);
pool->name, pool->approx_job_time, approx_job_time, wr->last_job_time);

pool->approx_job_time = approx_job_time;

Expand Down Expand Up @@ -203,7 +177,6 @@ static void *_worker_thread(void *v_worker) {
wr->job_start_ts = job_start_ts;
wr->last_job_time = us_get_now_monotonic() - wr->job_start_ts;
}
//wr->job = NULL;
atomic_store(&wr->has_job, false);
}

Expand Down
11 changes: 5 additions & 6 deletions src/ustreamer/workers.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <pthread.h>

#include "../libs/types.h"
#include "../libs/list.h"


typedef struct us_worker_sx {
Expand All @@ -44,10 +45,9 @@ typedef struct us_worker_sx {
ldf job_start_ts;
pthread_cond_t has_job_cond;

struct us_worker_sx *prev_wr;
struct us_worker_sx *next_wr;

struct us_workers_pool_sx *pool;

US_LIST_DECLARE;
} us_worker_s;

typedef void *(*us_workers_pool_job_init_f)(void *arg);
Expand All @@ -63,8 +63,7 @@ typedef struct us_workers_pool_sx {

uint n_workers;
us_worker_s *workers;
us_worker_s *oldest_wr;
us_worker_s *latest_wr;
ldf job_timely_ts;

ldf approx_job_time;

Expand All @@ -85,6 +84,6 @@ us_workers_pool_s *us_workers_pool_init(
void us_workers_pool_destroy(us_workers_pool_s *pool);

us_worker_s *us_workers_pool_wait(us_workers_pool_s *pool);
void us_workers_pool_assign(us_workers_pool_s *pool, us_worker_s *ready_wr/*, void *job*/);
void us_workers_pool_assign(us_workers_pool_s *pool, us_worker_s *ready_wr);

ldf us_workers_pool_get_fluency_delay(us_workers_pool_s *pool, const us_worker_s *ready_wr);

0 comments on commit 1803879

Please sign in to comment.