From 0d974a5faf2c42bfffb768deb238f92c98dce386 Mon Sep 17 00:00:00 2001 From: Maxim Devaev Date: Thu, 4 Apr 2024 19:32:45 +0300 Subject: [PATCH] refactoring --- src/ustreamer/workers.c | 76 ++++++++++++++++++++++------------------- src/ustreamer/workers.h | 29 +++++++--------- 2 files changed, 53 insertions(+), 52 deletions(-) diff --git a/src/ustreamer/workers.c b/src/ustreamer/workers.c index d82745d3..a86f081e 100644 --- a/src/ustreamer/workers.c +++ b/src/ustreamer/workers.c @@ -22,12 +22,21 @@ #include "workers.h" +#include + +#include + +#include "../libs/types.h" +#include "../libs/tools.h" +#include "../libs/threading.h" +#include "../libs/logging.h" + static void *_worker_thread(void *v_worker); us_workers_pool_s *us_workers_pool_init( - const char *name, const char *wr_prefix, unsigned n_workers, long double desired_interval, + const char *name, const char *wr_prefix, uint n_workers, ldf desired_interval, us_workers_pool_job_init_f job_init, void *job_init_arg, us_workers_pool_job_destroy_f job_destroy, us_workers_pool_run_job_f run_job) { @@ -49,23 +58,21 @@ us_workers_pool_s *us_workers_pool_init( US_MUTEX_INIT(pool->free_workers_mutex); US_COND_INIT(pool->free_workers_cond); - for (unsigned number = 0; number < pool->n_workers; ++number) { -# define WR(x_next) pool->workers[number].x_next + for (uint index = 0; index < pool->n_workers; ++index) { + us_worker_s *const wr = &pool->workers[index]; - WR(number) = number; - US_ASPRINTF(WR(name), "%s-%u", wr_prefix, number); + wr->number = index; + US_ASPRINTF(wr->name, "%s-%u", wr_prefix, index); - US_MUTEX_INIT(WR(has_job_mutex)); - atomic_init(&WR(has_job), false); - US_COND_INIT(WR(has_job_cond)); + US_MUTEX_INIT(wr->has_job_mutex); + atomic_init(&wr->has_job, false); + US_COND_INIT(wr->has_job_cond); - WR(pool) = pool; - WR(job) = job_init(job_init_arg); + wr->pool = pool; + wr->job = job_init(job_init_arg); - US_THREAD_CREATE(WR(tid), _worker_thread, (void*)&(pool->workers[number])); + US_THREAD_CREATE(wr->tid, _worker_thread, (void*)wr); pool->free_workers += 1; - -# undef WR } return pool; } @@ -74,23 +81,21 @@ void us_workers_pool_destroy(us_workers_pool_s *pool) { US_LOG_INFO("Destroying workers pool %s ...", pool->name); atomic_store(&pool->stop, true); - for (unsigned number = 0; number < pool->n_workers; ++number) { -# define WR(x_next) pool->workers[number].x_next + for (uint index = 0; index < pool->n_workers; ++index) { + us_worker_s *const wr = &pool->workers[index]; - US_MUTEX_LOCK(WR(has_job_mutex)); - atomic_store(&WR(has_job), true); // Final job: die - US_MUTEX_UNLOCK(WR(has_job_mutex)); - US_COND_SIGNAL(WR(has_job_cond)); - - US_THREAD_JOIN(WR(tid)); - US_MUTEX_DESTROY(WR(has_job_mutex)); - US_COND_DESTROY(WR(has_job_cond)); + US_MUTEX_LOCK(wr->has_job_mutex); + atomic_store(&wr->has_job, true); // Final job: die + US_MUTEX_UNLOCK(wr->has_job_mutex); + US_COND_SIGNAL(wr->has_job_cond); - free(WR(name)); + US_THREAD_JOIN(wr->tid); + US_MUTEX_DESTROY(wr->has_job_mutex); + US_COND_DESTROY(wr->has_job_cond); - pool->job_destroy(WR(job)); + free(wr->name); -# undef WR + pool->job_destroy(wr->job); } US_MUTEX_DESTROY(pool->free_workers_mutex); @@ -112,14 +117,15 @@ us_worker_s *us_workers_pool_wait(us_workers_pool_s *pool) { ready_wr->job_timely = true; pool->oldest_wr = pool->oldest_wr->next_wr; } else { - for (unsigned number = 0; number < pool->n_workers; ++number) { + for (uint index = 0; index < pool->n_workers; ++index) { + us_worker_s *const wr = &pool->workers[index]; if ( - !atomic_load(&pool->workers[number].has_job) && ( + !atomic_load(&wr->has_job) && ( ready_wr == NULL - || ready_wr->job_start_ts < pool->workers[number].job_start_ts + || ready_wr->job_start_ts < wr->job_start_ts ) ) { - ready_wr = &pool->workers[number]; + ready_wr = wr; break; } } @@ -157,15 +163,15 @@ void us_workers_pool_assign(us_workers_pool_s *pool, us_worker_s *ready_wr/*, vo US_MUTEX_UNLOCK(pool->free_workers_mutex); } -long double us_workers_pool_get_fluency_delay(us_workers_pool_s *pool, const us_worker_s *ready_wr) { - const long double approx_job_time = pool->approx_job_time * 0.9 + ready_wr->last_job_time * 0.1; +ldf us_workers_pool_get_fluency_delay(us_workers_pool_s *pool, const us_worker_s *ready_wr) { + const ldf approx_job_time = pool->approx_job_time * 0.9 + ready_wr->last_job_time * 0.1; US_LOG_VERBOSE("Correcting pool's %s approx_job_time: %.3Lf -> %.3Lf (last_job_time=%.3Lf)", pool->name, pool->approx_job_time, approx_job_time, ready_wr->last_job_time); pool->approx_job_time = approx_job_time; - const long double min_delay = pool->approx_job_time / pool->n_workers; // Среднее время работы размазывается на N воркеров + const ldf min_delay = pool->approx_job_time / pool->n_workers; // Среднее время работы размазывается на N воркеров if (pool->desired_interval > 0 && min_delay > 0 && pool->desired_interval > min_delay) { // Искусственное время задержки на основе желаемого FPS, если включен --desired-fps @@ -176,7 +182,7 @@ long double us_workers_pool_get_fluency_delay(us_workers_pool_s *pool, const us_ } static void *_worker_thread(void *v_worker) { - us_worker_s *wr = v_worker; + us_worker_s *const wr = v_worker; US_THREAD_SETTLE("%s", wr->name); US_LOG_DEBUG("Hello! I am a worker %s ^_^", wr->name); @@ -189,7 +195,7 @@ static void *_worker_thread(void *v_worker) { US_MUTEX_UNLOCK(wr->has_job_mutex); if (!atomic_load(&wr->pool->stop)) { - const long double job_start_ts = us_get_now_monotonic(); + const ldf job_start_ts = us_get_now_monotonic(); wr->job_failed = !wr->pool->run_job(wr); if (!wr->job_failed) { wr->job_start_ts = job_start_ts; diff --git a/src/ustreamer/workers.h b/src/ustreamer/workers.h index 4cbcbbb2..fe03cca5 100644 --- a/src/ustreamer/workers.h +++ b/src/ustreamer/workers.h @@ -22,31 +22,26 @@ #pragma once -#include #include -#include - #include -#include "../libs/tools.h" -#include "../libs/threading.h" -#include "../libs/logging.h" +#include "../libs/types.h" typedef struct us_worker_sx { - pthread_t tid; - unsigned number; - char *name; + pthread_t tid; + uint number; + char *name; - long double last_job_time; + ldf last_job_time; pthread_mutex_t has_job_mutex; void *job; atomic_bool has_job; bool job_timely; bool job_failed; - long double job_start_ts; + ldf job_start_ts; pthread_cond_t has_job_cond; struct us_worker_sx *prev_wr; @@ -61,20 +56,20 @@ typedef bool (*us_workers_pool_run_job_f)(us_worker_s *wr); typedef struct us_workers_pool_sx { const char *name; - long double desired_interval; + ldf desired_interval; us_workers_pool_job_destroy_f job_destroy; us_workers_pool_run_job_f run_job; - unsigned n_workers; + uint n_workers; us_worker_s *workers; us_worker_s *oldest_wr; us_worker_s *latest_wr; - long double approx_job_time; + ldf approx_job_time; pthread_mutex_t free_workers_mutex; - unsigned free_workers; + uint free_workers; pthread_cond_t free_workers_cond; atomic_bool stop; @@ -82,7 +77,7 @@ typedef struct us_workers_pool_sx { us_workers_pool_s *us_workers_pool_init( - const char *name, const char *wr_prefix, unsigned n_workers, long double desired_interval, + const char *name, const char *wr_prefix, uint n_workers, ldf desired_interval, us_workers_pool_job_init_f job_init, void *job_init_arg, us_workers_pool_job_destroy_f job_destroy, us_workers_pool_run_job_f run_job); @@ -92,4 +87,4 @@ void us_workers_pool_destroy(us_workers_pool_s *pool); us_worker_s *us_workers_pool_wait(us_workers_pool_s *pool); void us_workers_pool_assign(us_workers_pool_s *pool, us_worker_s *ready_wr/*, void *job*/); -long double us_workers_pool_get_fluency_delay(us_workers_pool_s *pool, const us_worker_s *ready_wr); +ldf us_workers_pool_get_fluency_delay(us_workers_pool_s *pool, const us_worker_s *ready_wr);