Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TaskCollaboration improvements #5489

24 changes: 10 additions & 14 deletions include/Gaffer/Private/IECorePreview/LRUCache.inl
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@ class Serial
// always be OK to write. But we return false for recursive
// calls to avoid unnecessary overhead updating the LRU list
// for inner calls.
/// \todo Is this distinction worth it, and do we really need
/// to support recursion on a single item in the Serial policy?
/// This is a remnant of a more complex system that allowed recursion
/// in the TaskParallel policy, but that has since been removed.
return m_it->handleCount == 1;
}

Expand Down Expand Up @@ -733,20 +737,19 @@ class TaskParallel

CacheEntry &writable()
{
assert( m_itemLock.lockType() == TaskMutex::ScopedLock::LockType::Write );
assert( m_itemLock.isWriter() );
return m_item->cacheEntry;
}

// May return false for AcquireMode::Insert if a GetterFunction recurses.
bool isWritable() const
{
return m_itemLock.lockType() == Item::Mutex::ScopedLock::LockType::Write;
return m_itemLock.isWriter();
}

template<typename F>
void execute( F &&f )
{
if( m_spawnsTasks && m_itemLock.lockType() == TaskMutex::ScopedLock::LockType::Write )
if( m_spawnsTasks )
{
// The getter function will spawn tasks. Execute
// it via the TaskMutex, so that other threads trying
Expand Down Expand Up @@ -814,16 +817,9 @@ class TaskParallel
// found a pre-existing item we optimistically take
// just a read lock, because it is faster when
// many threads just need to read from the same
// cached item. We accept WorkerRead locks when necessary,
// to support Getter recursion.
TaskMutex::ScopedLock::LockType lockType = TaskMutex::ScopedLock::LockType::WorkerRead;
if( inserted || mode == FindWritable || mode == InsertWritable )
{
lockType = TaskMutex::ScopedLock::LockType::Write;
}

// cached item.
const bool acquired = m_itemLock.acquireOr(
it->mutex, lockType,
it->mutex, /* write = */ inserted || mode == FindWritable || mode == InsertWritable,
// Work accepter
[&binLock, canceller] ( bool workAvailable ) {
// Release the bin lock prior to accepting work, because
Expand Down Expand Up @@ -855,7 +851,7 @@ class TaskParallel
if( acquired )
{
if(
m_itemLock.lockType() == TaskMutex::ScopedLock::LockType::Read &&
!m_itemLock.isWriter() &&
mode == Insert && it->cacheEntry.status() == LRUCache::Uncached
)
{
Expand Down
130 changes: 17 additions & 113 deletions include/Gaffer/Private/IECorePreview/TaskMutex.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,6 @@

#include "tbb/task_arena.h"
#include "tbb/task_group.h"
// Enable preview feature that allows us to construct a `task_scheduler_observer`
// for a specific `task_arena`. This feature becomes officially supported in
// Intel TBB 2019 Update 5, so it is not going to be removed.
#define TBB_PREVIEW_LOCAL_OBSERVER 1
#include "tbb/task_scheduler_observer.h"

#include <iostream>
#include <optional>
Expand Down Expand Up @@ -101,6 +96,9 @@ namespace IECorePreview
/// }
/// // Use resource here, while lock is still held.
/// ```
///
/// \todo Investigate `tbb::collaborative_call_once`, once VFXPlatform has moved to OneTBB.
/// It appears to provide very similar functionality.
class TaskMutex : boost::noncopyable
{

Expand All @@ -121,7 +119,7 @@ class TaskMutex : boost::noncopyable
public :

ScopedLock()
: m_mutex( nullptr ), m_lockType( LockType::None )
: m_mutex( nullptr ), m_writer( false )
{
}

Expand All @@ -143,9 +141,8 @@ class TaskMutex : boost::noncopyable
/// work on behalf of `execute()` while waiting.
void acquire( TaskMutex &mutex, bool write = true, bool acceptWork = true )
{
const LockType l = write ? LockType::Write : LockType::Read;
tbb::internal::atomic_backoff backoff;
while( !acquireOr( mutex, l, [acceptWork]( bool workAvailable ){ return acceptWork; } ) )
while( !acquireOr( mutex, write, [acceptWork]( bool workAvailable ){ return acceptWork; } ) )
{
backoff.pause();
}
Expand All @@ -156,8 +153,8 @@ class TaskMutex : boost::noncopyable
/// temporarily releasing the lock, and false otherwise.
bool upgradeToWriter()
{
assert( m_mutex && (m_lockType == LockType::Read) );
m_lockType = LockType::Write;
assert( m_mutex && !m_writer );
m_writer = true;
return m_lock.upgrade_to_writer();
}

Expand All @@ -166,7 +163,7 @@ class TaskMutex : boost::noncopyable
template<typename F>
void execute( F &&f )
{
assert( m_mutex && m_lockType == LockType::Write );
assert( m_mutex && m_writer );

ExecutionStateMutex::scoped_lock executionStateLock( m_mutex->m_executionStateMutex );
assert( !m_mutex->m_executionState );
Expand Down Expand Up @@ -219,8 +216,7 @@ class TaskMutex : boost::noncopyable
/// Acquires the mutex or returns false. Never performs TBB tasks.
bool tryAcquire( TaskMutex &mutex, bool write = true )
{
const LockType l = write ? LockType::Write : LockType::Read;
return acquireOr( mutex, l, []( bool workAvailable ){ return false; } );
return acquireOr( mutex, write, []( bool workAvailable ){ return false; } );
}

/// Releases the lock. This will be done automatically
Expand All @@ -229,12 +225,9 @@ class TaskMutex : boost::noncopyable
void release()
{
assert( m_mutex );
if( m_lockType != LockType::WorkerRead )
{
m_lock.release();
}
m_lock.release();
m_mutex = nullptr;
m_lockType = LockType::None;
m_writer = false;
}

/// Advanced API
Expand All @@ -244,40 +237,20 @@ class TaskMutex : boost::noncopyable
/// in Gaffer's LRUCache. They should not be considered part of the canonical
/// API.

enum class LockType
{
None,
// Multiple readers may coexist.
Read,
// Only a single writer can exist at a time, and the presence
// of a writer prevents read locks being acquired.
Write,
// Artificial read lock, available only to threads performing
// TBB tasks on behalf of `execute()`. These readers are
// protected only by the original write lock held by the caller
// of `execute()`. This means the caller of `execute()` _must_
// delay any writes until _after_ `execute()` has returned.
// A WorkerRead lock can not be upgraded via `upgradeToWriter()`.
WorkerRead,
};

/// Tries to acquire the mutex, returning true on success. On failure,
/// calls `workNotifier( bool workAvailable )`. If work is available and
/// `workNotifier` returns true, then this thread will perform TBB tasks
/// spawned by `execute()` until the work is complete. Returns false on
/// failure regardless of whether or not work is done.
template<typename WorkNotifier>
bool acquireOr( TaskMutex &mutex, LockType lockType, WorkNotifier &&workNotifier )
bool acquireOr( TaskMutex &mutex, bool write, WorkNotifier &&workNotifier )
{
assert( !m_mutex );
assert( m_lockType == LockType::None );
assert( lockType != LockType::None );

if( m_lock.try_acquire( mutex.m_mutex, /* write = */ lockType == LockType::Write ) )
if( m_lock.try_acquire( mutex.m_mutex, write ) )
{
// Success!
m_mutex = &mutex;
m_lockType = lockType == LockType::WorkerRead ? LockType::Read : lockType;
m_writer = write;
return true;
}

Expand All @@ -286,14 +259,6 @@ class TaskMutex : boost::noncopyable
// current call to `execute()`.

ExecutionStateMutex::scoped_lock executionStateLock( mutex.m_executionStateMutex );
if( lockType == LockType::WorkerRead && mutex.m_executionState && mutex.m_executionState->arenaObserver.containsThisThread() )
{
// We're already doing work on behalf of `execute()`, so we can
// take a WorkerRead lock.
m_mutex = &mutex;
m_lockType = lockType;
return true;
}

const bool workAvailable = mutex.m_executionState.get();
if( !workNotifier( workAvailable ) || !workAvailable )
Expand All @@ -313,20 +278,16 @@ class TaskMutex : boost::noncopyable
return false;
}

/// Returns the type of the lock currently held. If `acquireOr( WorkerRead )`
/// is called successfully, this will return `Read` for an external lock and
/// `WorkerRead` for an internal lock acquired by virtue of performing tasks
/// for `execute()`.
LockType lockType() const
bool isWriter() const
{
return m_lockType;
return m_writer;
}

private :

InternalMutex::scoped_lock m_lock;
TaskMutex *m_mutex;
LockType m_lockType;
bool m_writer;

};

Expand All @@ -335,64 +296,10 @@ class TaskMutex : boost::noncopyable
// The actual mutex that is held by the ScopedLock.
InternalMutex m_mutex;

// Tracks worker threads as they enter and exit an arena, so we can determine
// whether or not the current thread is inside the arena. We use this to detect
// recursion and allow any worker thread to obtain a recursive lock provided
// they are currently performing work in service of `ScopedLock::execute()`.
class ArenaObserver : public tbb::task_scheduler_observer
{

public :

ArenaObserver( tbb::task_arena &arena )
: tbb::task_scheduler_observer( arena )
{
observe( true );
}

~ArenaObserver() override
{
observe( false );
}

bool containsThisThread()
{
Mutex::scoped_lock lock( m_mutex );
return m_threadIdSet.find( std::this_thread::get_id() ) != m_threadIdSet.end();
}

private :

void on_scheduler_entry( bool isWorker ) override
{
assert( !containsThisThread() );
Mutex::scoped_lock lock( m_mutex );
m_threadIdSet.insert( std::this_thread::get_id() );
}

void on_scheduler_exit( bool isWorker ) override
{
assert( containsThisThread() );
Mutex::scoped_lock lock( m_mutex );
m_threadIdSet.erase( std::this_thread::get_id() );
}

using Mutex = tbb::spin_mutex;
using ThreadIdSet = boost::container::flat_set<std::thread::id>;
Mutex m_mutex;
ThreadIdSet m_threadIdSet;

};

// The mechanism we use to allow waiting threads
// to participate in the work done by `execute()`.
struct ExecutionState : public IECore::RefCounted
{
ExecutionState()
: arenaObserver( arena )
{
}

// Work around https://bugs.llvm.org/show_bug.cgi?id=32978
~ExecutionState() noexcept( true ) override
{
Expand All @@ -402,9 +309,6 @@ class TaskMutex : boost::noncopyable
// waiting threads to participate in work.
tbb::task_arena arena;
tbb::task_group taskGroup;
// Observer used to track which threads are
// currently inside the arena.
ArenaObserver arenaObserver;
};
IE_CORE_DECLAREPTR( ExecutionState );

Expand Down
27 changes: 27 additions & 0 deletions include/Gaffer/Process.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,34 @@ class GAFFER_API Process : private ThreadState::Scope
/// \todo This just exists for ABI compatibility. Remove it.
void handleException();

/// Searches for an in-flight process and waits for its result, collaborating
/// on any TBB tasks it spawns. If no such process exists, constructs one
/// using `args` and makes it available for collaboration by other threads,
/// publishing the result to a cache when the process completes.
///
/// Requirements :
///
/// - `ProcessType( args... )` constructs a process suitable for computing
/// the result for `cacheKey`.
/// - `ProcessType::ResultType` defines the result type for the process.
/// - `ProcessType::run()` does the work for the process and returns the
/// result.
/// - `ProcessType::g_cache` is a static LRUCache of type `ProcessType::CacheType`
/// to be used for the caching of the result.
/// - `ProcessType::cacheCostFunction()` is a static function suitable
/// for use with `CacheType::setIfUncached()`.
///
template<typename ProcessType, typename... ProcessArguments>
static typename ProcessType::ResultType acquireCollaborativeResult(
const typename ProcessType::CacheType::KeyType &cacheKey, ProcessArguments... args
);

private :

class Collaboration;
template<typename T>
class TypedCollaboration;

static bool forceMonitoringInternal( const ThreadState &s, const Plug *plug, const IECore::InternedString &processType );

void emitError( const std::string &error, const Plug *source = nullptr ) const;
Expand All @@ -111,6 +137,7 @@ class GAFFER_API Process : private ThreadState::Scope
const Plug *m_plug;
const Plug *m_destinationPlug;
const Process *m_parent;
const Collaboration *m_collaboration;

};

Expand Down
Loading