From 5d371e33780ceda8b597a6b912a49929de8a1f04 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20J=2E=20Est=C3=A9banez?= Date: Mon, 16 Sep 2024 11:51:57 +0200 Subject: [PATCH] WorkerThreadPool: Add safety point between languages finished and pool termination --- core/object/worker_thread_pool.cpp | 94 ++++++++++++++++++++++++------ core/object/worker_thread_pool.h | 22 ++++++- main/main.cpp | 2 + 3 files changed, 98 insertions(+), 20 deletions(-) diff --git a/core/object/worker_thread_pool.cpp b/core/object/worker_thread_pool.cpp index 0c19fe06a433..08903d61964d 100644 --- a/core/object/worker_thread_pool.cpp +++ b/core/object/worker_thread_pool.cpp @@ -186,7 +186,7 @@ void WorkerThreadPool::_thread_function(void *p_user) { { MutexLock lock(singleton->task_mutex); - bool exit = singleton->_handle_runlevel(); + bool exit = singleton->_handle_runlevel(thread_data, lock); if (unlikely(exit)) { break; } @@ -207,19 +207,24 @@ void WorkerThreadPool::_thread_function(void *p_user) { } } -void WorkerThreadPool::_post_tasks_and_unlock(Task **p_tasks, uint32_t p_count, bool p_high_priority) { +void WorkerThreadPool::_post_tasks(Task **p_tasks, uint32_t p_count, bool p_high_priority, MutexLock &p_lock) { // Fall back to processing on the calling thread if there are no worker threads. // Separated into its own variable to make it easier to extend this logic // in custom builds. bool process_on_calling_thread = threads.size() == 0; if (process_on_calling_thread) { - task_mutex.unlock(); + p_lock.temp_unlock(); for (uint32_t i = 0; i < p_count; i++) { _process_task(p_tasks[i]); } + p_lock.temp_relock(); return; } + while (runlevel == RUNLEVEL_EXIT_LANGUAGES) { + control_cond_var.wait(p_lock); + } + uint32_t to_process = 0; uint32_t to_promote = 0; @@ -241,8 +246,6 @@ void WorkerThreadPool::_post_tasks_and_unlock(Task **p_tasks, uint32_t p_count, } _notify_threads(caller_pool_thread, to_process, to_promote); - - task_mutex.unlock(); } void WorkerThreadPool::_notify_threads(const ThreadData *p_current_thread_data, uint32_t p_process_count, uint32_t p_promote_count) { @@ -326,7 +329,8 @@ WorkerThreadPool::TaskID WorkerThreadPool::add_native_task(void (*p_func)(void * } WorkerThreadPool::TaskID WorkerThreadPool::_add_task(const Callable &p_callable, void (*p_func)(void *), void *p_userdata, BaseTemplateUserdata *p_template_userdata, bool p_high_priority, const String &p_description) { - task_mutex.lock(); + MutexLock lock(task_mutex); + // Get a free task Task *task = task_allocator.alloc(); TaskID id = last_task++; @@ -338,7 +342,7 @@ WorkerThreadPool::TaskID WorkerThreadPool::_add_task(const Callable &p_callable, task->template_userdata = p_template_userdata; tasks.insert(id, task); - _post_tasks_and_unlock(&task, 1, p_high_priority); + _post_tasks(&task, 1, p_high_priority, lock); return id; } @@ -454,7 +458,7 @@ void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, T bool was_signaled = p_caller_pool_thread->signaled; p_caller_pool_thread->signaled = false; - bool exit = _handle_runlevel(); + bool exit = _handle_runlevel(p_caller_pool_thread, lock); if (unlikely(exit)) { break; } @@ -523,15 +527,44 @@ void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, T void WorkerThreadPool::_switch_runlevel(Runlevel p_runlevel) { DEV_ASSERT(p_runlevel > runlevel); runlevel = p_runlevel; + memset(&runlevel_data, 0, sizeof(runlevel_data)); for (uint32_t i = 0; i < threads.size(); i++) { threads[i].cond_var.notify_one(); threads[i].signaled = true; } + control_cond_var.notify_all(); } // Returns whether threads have to exit. This may perform the check about handling needed. -bool WorkerThreadPool::_handle_runlevel() { - return runlevel == RUNLEVEL_EXIT; +bool WorkerThreadPool::_handle_runlevel(ThreadData *p_thread_data, MutexLock &p_lock) { + bool exit = false; + switch (runlevel) { + case RUNLEVEL_NORMAL: { + } break; + case RUNLEVEL_PRE_EXIT_LANGUAGES: { + if (!p_thread_data->pre_exited_languages) { + if (!task_queue.first() && !low_priority_task_queue.first()) { + p_thread_data->pre_exited_languages = true; + runlevel_data.pre_exit_languages.num_idle_threads++; + control_cond_var.notify_all(); + } + } + } break; + case RUNLEVEL_EXIT_LANGUAGES: { + if (!p_thread_data->exited_languages) { + p_lock.temp_unlock(); + ScriptServer::thread_exit(); + p_lock.temp_relock(); + p_thread_data->exited_languages = true; + runlevel_data.exit_languages.num_exited_threads++; + control_cond_var.notify_all(); + } + } break; + case RUNLEVEL_EXIT: { + exit = true; + } break; + } + return exit; } void WorkerThreadPool::yield() { @@ -539,11 +572,17 @@ void WorkerThreadPool::yield() { ERR_FAIL_COND_MSG(th_index == -1, "This function can only be called from a worker thread."); _wait_collaboratively(&threads[th_index], ThreadData::YIELDING); - // If this long-lived task started before the scripting server was initialized, - // now is a good time to have scripting languages ready for the current thread. - // Otherwise, such a piece of setup won't happen unless another task has been - // run during the collaborative wait. - ScriptServer::thread_enter(); + task_mutex.lock(); + if (runlevel < RUNLEVEL_EXIT_LANGUAGES) { + // If this long-lived task started before the scripting server was initialized, + // now is a good time to have scripting languages ready for the current thread. + // Otherwise, such a piece of setup won't happen unless another task has been + // run during the collaborative wait. + task_mutex.unlock(); + ScriptServer::thread_enter(); + } else { + task_mutex.unlock(); + } } void WorkerThreadPool::notify_yield_over(TaskID p_task_id) { @@ -573,7 +612,8 @@ WorkerThreadPool::GroupID WorkerThreadPool::_add_group_task(const Callable &p_ca p_tasks = MAX(1u, threads.size()); } - task_mutex.lock(); + MutexLock lock(task_mutex); + Group *group = group_allocator.alloc(); GroupID id = last_task++; group->max = p_elements; @@ -608,7 +648,7 @@ WorkerThreadPool::GroupID WorkerThreadPool::_add_group_task(const Callable &p_ca groups[id] = group; - _post_tasks_and_unlock(tasks_posted, p_tasks, p_high_priority); + _post_tasks(tasks_posted, p_tasks, p_high_priority, lock); return id; } @@ -731,6 +771,26 @@ void WorkerThreadPool::init(int p_thread_count, float p_low_priority_task_ratio) } } +void WorkerThreadPool::exit_languages_threads() { + if (threads.size() == 0) { + return; + } + + MutexLock lock(task_mutex); + + // Wait until all threads are idle. + _switch_runlevel(RUNLEVEL_PRE_EXIT_LANGUAGES); + while (runlevel_data.pre_exit_languages.num_idle_threads != threads.size()) { + control_cond_var.wait(lock); + } + + // Wait until all threads have detached from scripting languages. + _switch_runlevel(RUNLEVEL_EXIT_LANGUAGES); + while (runlevel_data.exit_languages.num_exited_threads != threads.size()) { + control_cond_var.wait(lock); + } +} + void WorkerThreadPool::finish() { if (threads.size() == 0) { return; diff --git a/core/object/worker_thread_pool.h b/core/object/worker_thread_pool.h index ba6efbb06508..62296ac04052 100644 --- a/core/object/worker_thread_pool.h +++ b/core/object/worker_thread_pool.h @@ -114,20 +114,35 @@ class WorkerThreadPool : public Object { Thread thread; bool signaled : 1; bool yield_is_over : 1; + bool pre_exited_languages : 1; + bool exited_languages : 1; Task *current_task = nullptr; Task *awaited_task = nullptr; // Null if not awaiting the condition variable, or special value (YIELDING). ConditionVariable cond_var; ThreadData() : signaled(false), - yield_is_over(false) {} + yield_is_over(false), + pre_exited_languages(false), + exited_languages(false) {} }; TightLocalVector threads; enum Runlevel { RUNLEVEL_NORMAL, + RUNLEVEL_PRE_EXIT_LANGUAGES, // Block adding new tasks + RUNLEVEL_EXIT_LANGUAGES, // All threads detach from scripting threads. RUNLEVEL_EXIT, } runlevel = RUNLEVEL_NORMAL; + union { // Cleared on every runlevel change. + struct { + uint32_t num_idle_threads; + } pre_exit_languages; + struct { + uint32_t num_exited_threads; + } exit_languages; + } runlevel_data; + ConditionVariable control_cond_var; HashMap thread_ids; HashMap< @@ -155,7 +170,7 @@ class WorkerThreadPool : public Object { void _process_task(Task *task); - void _post_tasks_and_unlock(Task **p_tasks, uint32_t p_count, bool p_high_priority); + void _post_tasks(Task **p_tasks, uint32_t p_count, bool p_high_priority, MutexLock &p_lock); void _notify_threads(const ThreadData *p_current_thread_data, uint32_t p_process_count, uint32_t p_promote_count); bool _try_promote_low_priority_task(); @@ -197,7 +212,7 @@ class WorkerThreadPool : public Object { void _wait_collaboratively(ThreadData *p_caller_pool_thread, Task *p_task); void _switch_runlevel(Runlevel p_runlevel); - bool _handle_runlevel(); + bool _handle_runlevel(ThreadData *p_thread_data, MutexLock &p_lock); #ifdef THREADS_ENABLED static uint32_t _thread_enter_unlock_allowance_zone(THREADING_NAMESPACE::unique_lock &p_ulock); @@ -262,6 +277,7 @@ class WorkerThreadPool : public Object { #endif void init(int p_thread_count = -1, float p_low_priority_task_ratio = 0.3); + void exit_languages_threads(); void finish(); WorkerThreadPool(); ~WorkerThreadPool(); diff --git a/main/main.cpp b/main/main.cpp index 18ffedef1877..f1ee4bf2a6bb 100644 --- a/main/main.cpp +++ b/main/main.cpp @@ -4501,6 +4501,8 @@ void Main::cleanup(bool p_force) { ResourceLoader::clear_translation_remaps(); ResourceLoader::clear_path_remaps(); + WorkerThreadPool::get_singleton()->exit_languages_threads(); + ScriptServer::finish_languages(); // Sync pending commands that may have been queued from a different thread during ScriptServer finalization