From 684a5b937b2e2d9a933c6106ceb9f87727a4fe4e Mon Sep 17 00:00:00 2001 From: John Haddon Date: Tue, 17 Oct 2023 13:06:03 +0100 Subject: [PATCH] Revert "Process : Replace `task_arena` with `isolated_task_group`" This reverts commit 148291d26b2fd8df07282ea7c75feba1d70ac7bb. --- .../Gaffer/Private/IECorePreview/TaskMutex.h | 1 - include/Gaffer/Process.inl | 97 ++++++++++--------- 2 files changed, 52 insertions(+), 46 deletions(-) diff --git a/include/Gaffer/Private/IECorePreview/TaskMutex.h b/include/Gaffer/Private/IECorePreview/TaskMutex.h index 1ff73c75a79..8e8a96252c8 100644 --- a/include/Gaffer/Private/IECorePreview/TaskMutex.h +++ b/include/Gaffer/Private/IECorePreview/TaskMutex.h @@ -44,7 +44,6 @@ #include "tbb/spin_rw_mutex.h" #include "tbb/task_arena.h" -#define TBB_PREVIEW_ISOLATED_TASK_GROUP 1 #include "tbb/task_group.h" #include diff --git a/include/Gaffer/Process.inl b/include/Gaffer/Process.inl index 25f81a60767..19d67cccbbc 100644 --- a/include/Gaffer/Process.inl +++ b/include/Gaffer/Process.inl @@ -36,7 +36,7 @@ #include "tbb/concurrent_hash_map.h" #include "tbb/spin_mutex.h" -#define TBB_PREVIEW_ISOLATED_TASK_GROUP 1 +#include "tbb/task_arena.h" #include "tbb/task_group.h" #include @@ -239,9 +239,10 @@ class GAFFER_API Process::Collaboration : public IECore::RefCounted IE_CORE_DECLAREMEMBERPTR( Collaboration ); - // Task group used to allow waiting threads to participate in - // collaborative work. - tbb::isolated_task_group taskGroup; + // Arena and task group used to allow waiting threads to participate + // in collaborative work. + tbb::task_arena arena; + tbb::task_group taskGroup; using Set = std::unordered_set; // Collaborations depending directly on this one. @@ -328,8 +329,8 @@ typename ProcessType::ResultType Process::acquireCollaborativeResult( } // We've found an in-flight process we can wait on without causing - // deadlock. Wait on the result, so we get to work on any TBB tasks it - // has created. + // deadlock. Join its `task_arena` and wait on the result, so we get to + // work on any TBB tasks it has created. // // > Note : We need to own a reference to `collaboration` because the // thread that created it may drop its own reference as soon as we call @@ -344,7 +345,9 @@ typename ProcessType::ResultType Process::acquireCollaborativeResult( CollaborationTypePtr collaboration = candidate; accessor.release(); - collaboration->taskGroup.wait(); + collaboration->arena.execute( + [&]{ return collaboration->taskGroup.wait(); } + ); if( collaboration->result ) { @@ -375,45 +378,49 @@ typename ProcessType::ResultType Process::acquireCollaborativeResult( std::exception_ptr exception; - auto status = collaboration->taskGroup.run_and_wait( + auto status = collaboration->arena.execute( [&] { - // Publish ourselves so that other threads can collaborate - // by calling `collaboration->taskGroup.wait()`. - accessor->second.push_back( collaboration ); - accessor.release(); - - try - { - ProcessType process( std::forward( args )... ); - process.m_collaboration = collaboration.get(); - collaboration->result = process.run(); - // Publish result to cache before we remove ourself from - // `g_pendingCollaborations`, so that other threads will - // be able to get the result one way or the other. - ProcessType::g_cache.setIfUncached( - cacheKey, *collaboration->result, - ProcessType::cacheCostFunction - ); - } - catch( ... ) - { - // Don't allow `task_group::wait()` to see exceptions, - // because then we'd hit a thread-safety bug in - // `tbb::task_group_context::reset()`. - exception = std::current_exception(); - } - - // Now we're done, remove `collaboration` from the pending collaborations. - [[maybe_unused]] const bool found = CollaborationType::g_pendingCollaborations.find( accessor, cacheKey ); - assert( found ); - auto toErase = std::find( accessor->second.begin(), accessor->second.end(), collaboration ); - assert( toErase != accessor->second.end() ); - accessor->second.erase( toErase ); - if( accessor->second.empty() ) - { - CollaborationType::g_pendingCollaborations.erase( accessor ); - } - accessor.release(); + return collaboration->taskGroup.run_and_wait( + [&] { + // Publish ourselves so that other threads can collaborate + // by calling `collaboration->taskGroup.wait()`. + accessor->second.push_back( collaboration ); + accessor.release(); + + try + { + ProcessType process( std::forward( args )... ); + process.m_collaboration = collaboration.get(); + collaboration->result = process.run(); + // Publish result to cache before we remove ourself from + // `g_pendingCollaborations`, so that other threads will + // be able to get the result one way or the other. + ProcessType::g_cache.setIfUncached( + cacheKey, *collaboration->result, + ProcessType::cacheCostFunction + ); + } + catch( ... ) + { + // Don't allow `task_group::wait()` to see exceptions, + // because then we'd hit a thread-safety bug in + // `tbb::task_group_context::reset()`. + exception = std::current_exception(); + } + + // Now we're done, remove `collaboration` from the pending collaborations. + [[maybe_unused]] const bool found = CollaborationType::g_pendingCollaborations.find( accessor, cacheKey ); + assert( found ); + auto toErase = std::find( accessor->second.begin(), accessor->second.end(), collaboration ); + assert( toErase != accessor->second.end() ); + accessor->second.erase( toErase ); + if( accessor->second.empty() ) + { + CollaborationType::g_pendingCollaborations.erase( accessor ); + } + accessor.release(); + } + ); } );