Skip to content

Commit

Permalink
Revert "Process : Replace task_arena with isolated_task_group"
Browse files Browse the repository at this point in the history
This reverts commit 148291d.
  • Loading branch information
johnhaddon committed Oct 25, 2023
1 parent 06dd220 commit a62f892
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 46 deletions.
1 change: 0 additions & 1 deletion include/Gaffer/Private/IECorePreview/TaskMutex.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
#include "tbb/spin_rw_mutex.h"

#include "tbb/task_arena.h"
#define TBB_PREVIEW_ISOLATED_TASK_GROUP 1
#include "tbb/task_group.h"

#include <iostream>
Expand Down
97 changes: 52 additions & 45 deletions include/Gaffer/Process.inl
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@

#include "tbb/concurrent_hash_map.h"
#include "tbb/spin_mutex.h"
#define TBB_PREVIEW_ISOLATED_TASK_GROUP 1
#include "tbb/task_arena.h"
#include "tbb/task_group.h"

#include <unordered_set>
Expand Down Expand Up @@ -239,9 +239,10 @@ class GAFFER_API Process::Collaboration : public IECore::RefCounted

IE_CORE_DECLAREMEMBERPTR( Collaboration );

// Task group used to allow waiting threads to participate in
// collaborative work.
tbb::isolated_task_group taskGroup;
// Arena and task group used to allow waiting threads to participate
// in collaborative work.
tbb::task_arena arena;
tbb::task_group taskGroup;

using Set = std::unordered_set<const Collaboration *>;
// Collaborations depending directly on this one.
Expand Down Expand Up @@ -328,8 +329,8 @@ typename ProcessType::ResultType Process::acquireCollaborativeResult(
}

// We've found an in-flight process we can wait on without causing
// deadlock. Wait on the result, so we get to work on any TBB tasks it
// has created.
// deadlock. Join its `task_arena` and wait on the result, so we get to
// work on any TBB tasks it has created.
//
// > Note : We need to own a reference to `collaboration` because the
// thread that created it may drop its own reference as soon as we call
Expand All @@ -344,7 +345,9 @@ typename ProcessType::ResultType Process::acquireCollaborativeResult(
CollaborationTypePtr collaboration = candidate;
accessor.release();

collaboration->taskGroup.wait();
collaboration->arena.execute(
[&]{ return collaboration->taskGroup.wait(); }
);

if( collaboration->result )
{
Expand Down Expand Up @@ -375,45 +378,49 @@ typename ProcessType::ResultType Process::acquireCollaborativeResult(

std::exception_ptr exception;

auto status = collaboration->taskGroup.run_and_wait(
auto status = collaboration->arena.execute(
[&] {
// Publish ourselves so that other threads can collaborate
// by calling `collaboration->taskGroup.wait()`.
accessor->second.push_back( collaboration );
accessor.release();

try
{
ProcessType process( std::forward<ProcessArguments>( args )... );
process.m_collaboration = collaboration.get();
collaboration->result = process.run();
// Publish result to cache before we remove ourself from
// `g_pendingCollaborations`, so that other threads will
// be able to get the result one way or the other.
ProcessType::g_cache.setIfUncached(
cacheKey, *collaboration->result,
ProcessType::cacheCostFunction
);
}
catch( ... )
{
// Don't allow `task_group::wait()` to see exceptions,
// because then we'd hit a thread-safety bug in
// `tbb::task_group_context::reset()`.
exception = std::current_exception();
}

// Now we're done, remove `collaboration` from the pending collaborations.
[[maybe_unused]] const bool found = CollaborationType::g_pendingCollaborations.find( accessor, cacheKey );
assert( found );
auto toErase = std::find( accessor->second.begin(), accessor->second.end(), collaboration );
assert( toErase != accessor->second.end() );
accessor->second.erase( toErase );
if( accessor->second.empty() )
{
CollaborationType::g_pendingCollaborations.erase( accessor );
}
accessor.release();
return collaboration->taskGroup.run_and_wait(
[&] {
// Publish ourselves so that other threads can collaborate
// by calling `collaboration->taskGroup.wait()`.
accessor->second.push_back( collaboration );
accessor.release();

try
{
ProcessType process( std::forward<ProcessArguments>( args )... );
process.m_collaboration = collaboration.get();
collaboration->result = process.run();
// Publish result to cache before we remove ourself from
// `g_pendingCollaborations`, so that other threads will
// be able to get the result one way or the other.
ProcessType::g_cache.setIfUncached(
cacheKey, *collaboration->result,
ProcessType::cacheCostFunction
);
}
catch( ... )
{
// Don't allow `task_group::wait()` to see exceptions,
// because then we'd hit a thread-safety bug in
// `tbb::task_group_context::reset()`.
exception = std::current_exception();
}

// Now we're done, remove `collaboration` from the pending collaborations.
[[maybe_unused]] const bool found = CollaborationType::g_pendingCollaborations.find( accessor, cacheKey );
assert( found );
auto toErase = std::find( accessor->second.begin(), accessor->second.end(), collaboration );
assert( toErase != accessor->second.end() );
accessor->second.erase( toErase );
if( accessor->second.empty() )
{
CollaborationType::g_pendingCollaborations.erase( accessor );
}
accessor.release();
}
);
}
);

Expand Down

0 comments on commit a62f892

Please sign in to comment.