diff --git a/Changes.md b/Changes.md index 55e383e8d38..a0f9179b5e8 100644 --- a/Changes.md +++ b/Changes.md @@ -16,12 +16,19 @@ Improvements Fixes ----- +- ValuePlug : Fixed hangs and poor performance caused by plugs depending on upstream plugs with an identical hash (#4978). - Filter : Fixed bug which allowed the `scene:path` context variable to "leak" upstream via the `Filter.enabled` plug. This caused unnecessary evaluations of the input, and also provided a loophole via which the filter result could be made inconsistent with respect to descendant and ancestor matches. - Windows : - Fixed a bug preventing anything except strings from being copied and pasted. - Fixed likely cause of crash when resizing Spreadsheet column width (#5296). - Reference : Fixed rare reloading error. +API +--- + +- Process : Added `acquireCollaborativeResult()` method, providing an improved mechanism for multiple threads to collaborate on TBB tasks spawned by a single process they all depend on. +- ValuePlug : Added `Default` CachePolicy and deprecated `Standard`, `TaskIsolation` and `Legacy` policies. + 1.3.5.0 (relative to 1.3.4.0) ======= diff --git a/include/Gaffer/Private/IECorePreview/LRUCache.inl b/include/Gaffer/Private/IECorePreview/LRUCache.inl index 86895fb2061..49a12bbf197 100644 --- a/include/Gaffer/Private/IECorePreview/LRUCache.inl +++ b/include/Gaffer/Private/IECorePreview/LRUCache.inl @@ -167,17 +167,12 @@ class Serial return m_it->cacheEntry; } - // Returns true if it is OK to call `writable()`. - // This is typically determined by the AcquireMode - // passed to `acquire()`, with special cases for - // recursion. + // Returns true if it is OK to call `writable()`. This is determined + // by the AcquireMode passed to `acquire()`. bool isWritable() const { - // Because this policy is serial, it would technically - // always be OK to write. But we return false for recursive - // calls to avoid unnecessary overhead updating the LRU list - // for inner calls. - return m_it->handleCount == 1; + // Because this policy is serial, it is always OK to write + return true; } // Executes the functor F. This is used to @@ -733,20 +728,19 @@ class TaskParallel CacheEntry &writable() { - assert( m_itemLock.lockType() == TaskMutex::ScopedLock::LockType::Write ); + assert( m_itemLock.isWriter() ); return m_item->cacheEntry; } - // May return false for AcquireMode::Insert if a GetterFunction recurses. bool isWritable() const { - return m_itemLock.lockType() == Item::Mutex::ScopedLock::LockType::Write; + return m_itemLock.isWriter(); } template void execute( F &&f ) { - if( m_spawnsTasks && m_itemLock.lockType() == TaskMutex::ScopedLock::LockType::Write ) + if( m_spawnsTasks ) { // The getter function will spawn tasks. Execute // it via the TaskMutex, so that other threads trying @@ -814,16 +808,9 @@ class TaskParallel // found a pre-existing item we optimistically take // just a read lock, because it is faster when // many threads just need to read from the same - // cached item. We accept WorkerRead locks when necessary, - // to support Getter recursion. - TaskMutex::ScopedLock::LockType lockType = TaskMutex::ScopedLock::LockType::WorkerRead; - if( inserted || mode == FindWritable || mode == InsertWritable ) - { - lockType = TaskMutex::ScopedLock::LockType::Write; - } - + // cached item. 
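+			// We only take a write lock if we inserted the item
+			// ourselves, or if the caller explicitly requested
+			// writability via FindWritable/InsertWritable.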
const bool acquired = m_itemLock.acquireOr( - it->mutex, lockType, + it->mutex, /* write = */ inserted || mode == FindWritable || mode == InsertWritable, // Work accepter [&binLock, canceller] ( bool workAvailable ) { // Release the bin lock prior to accepting work, because @@ -836,18 +823,7 @@ class TaskParallel // The only canceller being checked at that // point will be the one passed to the // `LRUCache::get()` call that we work in - // service of. This isn't ideal, as it can cause - // UI stalls if one UI element is waiting to - // cancel an operation, but it's tasks have been - // "captured" by collaboration on a compute - // started by another UI element (which hasn't - // requested cancellation). One alternative is - // that we would only accept work if our - // canceller matches the one in use by the - // original caller. This would rule out - // collaboration between UI elements, but would - // still allow diamond dependencies in graph - // evaluation to use collaboration. + // service of. return (!canceller || !canceller->cancelled()); } ); @@ -855,7 +831,7 @@ class TaskParallel if( acquired ) { if( - m_itemLock.lockType() == TaskMutex::ScopedLock::LockType::Read && + !m_itemLock.isWriter() && mode == Insert && it->cacheEntry.status() == LRUCache::Uncached ) { @@ -1108,6 +1084,7 @@ Value LRUCache::get( const GetterKey &key, const if( status==Uncached ) { + assert( handle.isWritable() ); Value value = Value(); Cost cost = 0; try @@ -1120,24 +1097,21 @@ Value LRUCache::get( const GetterKey &key, const } catch( ... ) { - if( handle.isWritable() && m_cacheErrors ) + if( m_cacheErrors ) { handle.writable().state = std::current_exception(); } throw; } - if( handle.isWritable() ) - { - assert( cacheEntry.status() != Cached ); // this would indicate that another thread somehow - assert( cacheEntry.status() != Failed ); // loaded the same thing as us, which is not the intention. + assert( cacheEntry.status() != Cached ); // this would indicate that another thread somehow + assert( cacheEntry.status() != Failed ); // loaded the same thing as us, which is not the intention. - setInternal( key, handle.writable(), value, cost ); - m_policy.push( handle ); + setInternal( key, handle.writable(), value, cost ); + m_policy.push( handle ); - handle.release(); - limitCost( m_maxCost ); - } + handle.release(); + limitCost( m_maxCost ); return value; } @@ -1202,8 +1176,9 @@ bool LRUCache::setIfUncached( const Key &key, con const Status status = cacheEntry.status(); bool result = false; - if( status == Uncached && handle.isWritable() ) + if( status == Uncached ) { + assert( handle.isWritable() ); result = setInternal( key, handle.writable(), value, costFunction( value ) ); m_policy.push( handle ); diff --git a/include/Gaffer/Private/IECorePreview/TaskMutex.h b/include/Gaffer/Private/IECorePreview/TaskMutex.h index da9474fa73d..8e8a96252c8 100644 --- a/include/Gaffer/Private/IECorePreview/TaskMutex.h +++ b/include/Gaffer/Private/IECorePreview/TaskMutex.h @@ -45,11 +45,6 @@ #include "tbb/task_arena.h" #include "tbb/task_group.h" -// Enable preview feature that allows us to construct a `task_scheduler_observer` -// for a specific `task_arena`. This feature becomes officially supported in -// Intel TBB 2019 Update 5, so it is not going to be removed. -#define TBB_PREVIEW_LOCAL_OBSERVER 1 -#include "tbb/task_scheduler_observer.h" #include #include @@ -101,6 +96,9 @@ namespace IECorePreview /// } /// // Use resource here, while lock is still held. 
/// ``` +/// +/// \todo Investigate `tbb::collaborative_call_once`, once VFXPlatform has moved to OneTBB. +/// It appears to provide very similar functionality. class TaskMutex : boost::noncopyable { @@ -121,7 +119,7 @@ class TaskMutex : boost::noncopyable public : ScopedLock() - : m_mutex( nullptr ), m_lockType( LockType::None ) + : m_mutex( nullptr ), m_writer( false ) { } @@ -143,9 +141,8 @@ class TaskMutex : boost::noncopyable /// work on behalf of `execute()` while waiting. void acquire( TaskMutex &mutex, bool write = true, bool acceptWork = true ) { - const LockType l = write ? LockType::Write : LockType::Read; tbb::internal::atomic_backoff backoff; - while( !acquireOr( mutex, l, [acceptWork]( bool workAvailable ){ return acceptWork; } ) ) + while( !acquireOr( mutex, write, [acceptWork]( bool workAvailable ){ return acceptWork; } ) ) { backoff.pause(); } @@ -156,8 +153,8 @@ class TaskMutex : boost::noncopyable /// temporarily releasing the lock, and false otherwise. bool upgradeToWriter() { - assert( m_mutex && (m_lockType == LockType::Read) ); - m_lockType = LockType::Write; + assert( m_mutex && !m_writer ); + m_writer = true; return m_lock.upgrade_to_writer(); } @@ -166,7 +163,7 @@ class TaskMutex : boost::noncopyable template void execute( F &&f ) { - assert( m_mutex && m_lockType == LockType::Write ); + assert( m_mutex && m_writer ); ExecutionStateMutex::scoped_lock executionStateLock( m_mutex->m_executionStateMutex ); assert( !m_mutex->m_executionState ); @@ -219,8 +216,7 @@ class TaskMutex : boost::noncopyable /// Acquires mutex or returns false. Never does TBB tasks. bool tryAcquire( TaskMutex &mutex, bool write = true ) { - const LockType l = write ? LockType::Write : LockType::Read; - return acquireOr( mutex, l, []( bool workAvailable ){ return false; } ); + return acquireOr( mutex, write, []( bool workAvailable ){ return false; } ); } /// Releases the lock. This will be done automatically @@ -229,12 +225,9 @@ class TaskMutex : boost::noncopyable void release() { assert( m_mutex ); - if( m_lockType != LockType::WorkerRead ) - { - m_lock.release(); - } + m_lock.release(); m_mutex = nullptr; - m_lockType = LockType::None; + m_writer = false; } /// Advanced API @@ -244,40 +237,20 @@ class TaskMutex : boost::noncopyable /// in Gaffer's LRUCache. They should not be considered part of the canonical /// API. - enum class LockType - { - None, - // Multiple readers may coexist. - Read, - // Only a single writer can exist at a time, and the presence - // of a writer prevents read locks being acquired. - Write, - // Artificial read lock, available only to threads performing - // TBB tasks on behalf of `execute()`. These readers are - // protected only by the original write lock held by the caller - // of `execute()`. This means the caller of `execute()` _must_ - // delay any writes until _after_ `execute()` has returned. - // A WorkerRead lock can not be upgraded via `upgradeToWriter()`. - WorkerRead, - }; - /// Tries to acquire the mutex, returning true on success. On failure, /// calls `workNotifier( bool workAvailable )`. If work is available and /// `workNotifier` returns true, then this thread will perform TBB tasks /// spawned by `execute()` until the work is complete. Returns false on /// failure regardless of whether or not work is done. 
template - bool acquireOr( TaskMutex &mutex, LockType lockType, WorkNotifier &&workNotifier ) + bool acquireOr( TaskMutex &mutex, bool write, WorkNotifier &&workNotifier ) { assert( !m_mutex ); - assert( m_lockType == LockType::None ); - assert( lockType != LockType::None ); - - if( m_lock.try_acquire( mutex.m_mutex, /* write = */ lockType == LockType::Write ) ) + if( m_lock.try_acquire( mutex.m_mutex, write ) ) { // Success! m_mutex = &mutex; - m_lockType = lockType == LockType::WorkerRead ? LockType::Read : lockType; + m_writer = write; return true; } @@ -286,14 +259,6 @@ class TaskMutex : boost::noncopyable // current call to `execute()`. ExecutionStateMutex::scoped_lock executionStateLock( mutex.m_executionStateMutex ); - if( lockType == LockType::WorkerRead && mutex.m_executionState && mutex.m_executionState->arenaObserver.containsThisThread() ) - { - // We're already doing work on behalf of `execute()`, so we can - // take a WorkerRead lock. - m_mutex = &mutex; - m_lockType = lockType; - return true; - } const bool workAvailable = mutex.m_executionState.get(); if( !workNotifier( workAvailable ) || !workAvailable ) @@ -313,20 +278,16 @@ class TaskMutex : boost::noncopyable return false; } - /// Returns the type of the lock currently held. If `acquireOr( WorkerRead )` - /// is called successfully, this will return `Read` for an external lock and - /// `WorkerRead` for an internal lock acquired by virtue of performing tasks - /// for `execute()`. - LockType lockType() const + bool isWriter() const { - return m_lockType; + return m_writer; } private : InternalMutex::scoped_lock m_lock; TaskMutex *m_mutex; - LockType m_lockType; + bool m_writer; }; @@ -335,64 +296,10 @@ class TaskMutex : boost::noncopyable // The actual mutex that is held by the ScopedLock. InternalMutex m_mutex; - // Tracks worker threads as they enter and exit an arena, so we can determine - // whether or not the current thread is inside the arena. We use this to detect - // recursion and allow any worker thread to obtain a recursive lock provided - // they are currently performing work in service of `ScopedLock::execute()`. - class ArenaObserver : public tbb::task_scheduler_observer - { - - public : - - ArenaObserver( tbb::task_arena &arena ) - : tbb::task_scheduler_observer( arena ) - { - observe( true ); - } - - ~ArenaObserver() override - { - observe( false ); - } - - bool containsThisThread() - { - Mutex::scoped_lock lock( m_mutex ); - return m_threadIdSet.find( std::this_thread::get_id() ) != m_threadIdSet.end(); - } - - private : - - void on_scheduler_entry( bool isWorker ) override - { - assert( !containsThisThread() ); - Mutex::scoped_lock lock( m_mutex ); - m_threadIdSet.insert( std::this_thread::get_id() ); - } - - void on_scheduler_exit( bool isWorker ) override - { - assert( containsThisThread() ); - Mutex::scoped_lock lock( m_mutex ); - m_threadIdSet.erase( std::this_thread::get_id() ); - } - - using Mutex = tbb::spin_mutex; - using ThreadIdSet = boost::container::flat_set; - Mutex m_mutex; - ThreadIdSet m_threadIdSet; - - }; - // The mechanism we use to allow waiting threads // to participate in the work done by `execute()`. struct ExecutionState : public IECore::RefCounted { - ExecutionState() - : arenaObserver( arena ) - { - } - // Work around https://bugs.llvm.org/show_bug.cgi?id=32978 ~ExecutionState() noexcept( true ) override { @@ -402,9 +309,6 @@ class TaskMutex : boost::noncopyable // waiting threads to participate in work. 
 		tbb::task_arena arena;
 		tbb::task_group taskGroup;
-		// Observer used to track which threads are
-		// currently inside the arena.
-		ArenaObserver arenaObserver;
 	};
 	IE_CORE_DECLAREPTR( ExecutionState );
diff --git a/include/Gaffer/Process.h b/include/Gaffer/Process.h
index e69eb11f8b1..629a36b73b9 100644
--- a/include/Gaffer/Process.h
+++ b/include/Gaffer/Process.h
@@ -101,8 +101,34 @@ class GAFFER_API Process : private ThreadState::Scope
 		/// \todo This just exists for ABI compatibility. Remove it.
 		void handleException();
 
+		/// Searches for an in-flight process and waits for its result, collaborating
+		/// on any TBB tasks it spawns. If no such process exists, constructs one
+		/// using `args` and makes it available for collaboration by other threads,
+		/// publishing the result to a cache when the process completes.
+		///
+		/// Requirements :
+		///
+		/// - `ProcessType( args... )` constructs a process suitable for computing
+		///   the result for `cacheKey`.
+		/// - `ProcessType::ResultType` defines the result type for the process.
+		/// - `ProcessType::run()` does the work for the process and returns the
+		///   result.
+		/// - `ProcessType::g_cache` is a static LRUCache of type `ProcessType::CacheType`
+		///   to be used for the caching of the result.
+		/// - `ProcessType::cacheCostFunction()` is a static function suitable
+		///   for use with `CacheType::setIfUncached()`.
+		///
+		template<typename ProcessType, typename... ProcessArguments>
+		static typename ProcessType::ResultType acquireCollaborativeResult(
+			const typename ProcessType::CacheType::KeyType &cacheKey, ProcessArguments&&... args
+		);
+
 	private :
 
+		class Collaboration;
+		template<typename ProcessType>
+		class TypedCollaboration;
+
 		static bool forceMonitoringInternal( const ThreadState &s, const Plug *plug, const IECore::InternedString &processType );
 
 		void emitError( const std::string &error, const Plug *source = nullptr ) const;
@@ -111,6 +137,7 @@
 		const Plug *m_plug;
 		const Plug *m_destinationPlug;
 		const Process *m_parent;
+		const Collaboration *m_collaboration;
 
 };
diff --git a/include/Gaffer/Process.inl b/include/Gaffer/Process.inl
index c8d8a71861b..19d67cccbbc 100644
--- a/include/Gaffer/Process.inl
+++ b/include/Gaffer/Process.inl
@@ -34,9 +34,408 @@
 //
 //////////////////////////////////////////////////////////////////////////
 
+#include "tbb/concurrent_hash_map.h"
+#include "tbb/spin_mutex.h"
+#include "tbb/task_arena.h"
+#include "tbb/task_group.h"
+
+#include <optional>
+
 namespace Gaffer
 {
 
+/// Process Graph Overview
+/// ======================
+///
+/// > Note : These notes (and the Process design itself) are heavily biased
+/// towards ValuePlug and ComputeNode, and their associated ComputeProcess and
+/// HashProcess.
+///
+/// It's tempting to think that because processes are stack-allocated, they each
+/// have a single parent process waiting for them to complete, and that each
+/// process is only waiting on a single child. It's also tempting to think that
+/// there is a one-to-one correspondence between nodes and processes.
+///
+///    Node graph      Process graph
+///    ----------      -------------
+///
+///     AddNode1       o  current process
+///        |           |
+///     AddNode2       o  waiting process (lower in stack)
+///        |           |
+///     AddNode3       o  waiting process (even lower in stack)
+///
+/// While that is true for the simple case shown above, the reality is far more
+/// complicated due to contexts, multithreading, task collaboration and hash
+/// aliasing.
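+///
+/// Before diving into those complications, here is a sketch of the shape of
+/// class that `acquireCollaborativeResult()` expects, based on the
+/// requirements documented in `Process.h`. It is illustrative only - the
+/// names and type choices are hypothetical, not part of the API :
+///
+/// ```
+/// class MyProcess : public Process
+/// {
+///     public :
+///
+///         using ResultType = IECore::ConstObjectPtr;
+///         using CacheType = IECorePreview::LRUCache<
+///             IECore::MurmurHash, ResultType,
+///             IECorePreview::LRUCachePolicy::TaskParallel
+///         >;
+///
+///         MyProcess( const Plug *plug );  // `ProcessType( args... )`
+///         ResultType run();               // does the actual work
+///
+///         static CacheType g_cache;
+///         static size_t cacheCostFunction( const ResultType &result );
+/// };
+///
+/// // All threads wanting the result for `cacheKey` share one evaluation :
+/// auto result = Process::acquireCollaborativeResult<MyProcess>( cacheKey, plug );
+/// ```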
+///
+/// Contexts
+/// --------
+///
+/// Processes are operations being performed by a node for a particular plug, in
+/// a _particular context_. The topology of the process graph does not
+/// correspond directly to the topology of the node graph itself. Rather, the
+/// process graph is generated dynamically in response to each process launching
+/// upstream processes it depends on.
+///
+///    Loop <---         o Loop, loop:index=0
+///     |      |         |
+///     v      |         o AddNode, loop:index=0
+///    AddNode--         |
+///                      o Loop, loop:index=1
+///                      |
+///                      o AddNode, loop:index=1
+///                      |
+///                      o ...
+///
+/// As this example shows, cyclic _connections_ between plugs are even OK
+/// provided that each process launches child _processes_ in a different context,
+/// meaning that there are no cyclic dependencies between _processes_.
+/// Even in this case, every process has only a single child and a single
+/// parent, all living on the stack of a single thread, so the topology of
+/// our process graph remains completely linear. But that ends as soon as
+/// we consider multithreading.
+///
+/// Multithreading
+/// --------------
+///
+/// A single process can use TBB tasks to launch many child processes that may
+/// each be run on a different thread :
+///
+///    Random      o o o   current processes, one per thread
+///      |         \ | /
+///    Collect       o     waiting process
+///
+/// In this case, a single parent process may be waiting for multiple children
+/// to complete. Our simple linear "graph" is now a directed tree (I'm using
+/// terminology loosely here, I think the official term would be an
+/// "arborescence").
+///
+/// This doesn't present any great obstacle in itself - the only new requirement
+/// is that each TBB task scopes the ThreadState from the parent process, so
+/// that we can associate the task's processes with the correct parent and run
+/// them in the correct context. But it does highlight that a parent process may
+/// have many children, and that processes may perform arbitrarily expensive
+/// amounts of work.
+///
+/// Task collaboration
+/// ------------------
+///
+/// Now that we know there can be processes in-flight on each thread, we need to
+/// consider what happens if two or more threads simultaneously want a result
+/// from the same not-yet-run upstream process. Gaffer cannot query the upstream
+/// dependencies for a process before launching it, and therefore cannot perform
+/// any up-front task scheduling. So in the example below, when two threads are
+/// each running their own process and turn out to require the same upstream
+/// dependency, we need to deal with it dynamically.
+///
+///       AddNode1        ?   ?
+///       /      \        |   |
+///   AddNode2  AddNode3  o   o
+///
+/// One approach is to simply allow each thread to run its own copy of the
+/// process redundantly, and in fact this is a reasonable strategy that we do use
+/// for lightweight processes.
+///
+///       AddNode1        o   o
+///       /      \        |   |
+///   AddNode2  AddNode3  o   o
+///
+/// But where a process is expensive, duplication is not an option. We need to
+/// arrange things such that we launch the upstream compute on one thread, and
+/// have the other wait for its completion.
+///
+///       Collect          o
+///       /     \         / \    < second thread waiting for process
+///   AddNode2  AddNode3 o   o     launched by first thread
+///
+/// Ideally we don't want the waiting thread to simply block or spin though, as
+/// that quickly reduces to only a single thread doing useful work. Instead we
+/// want to provide the facility for waiting threads to _collaborate_, by
+/// working on any TBB tasks spawned by the upstream process. We now have a new
+/// requirement : we need to track the in-flight processes that are available
+/// for collaboration, which we do in `Process::acquireCollaborativeResult()`.
+/// And our process graphs can now contain diamond connections at collaboration
+/// points, making them general directed acyclic graphs rather than simple
+/// trees.
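+///
+/// The waiting mechanism itself is the `task_arena`/`task_group` pairing used
+/// by `acquireCollaborativeResult()` later in this file. A minimal sketch of
+/// the pattern (assuming an `arena` and `taskGroup` shared between the
+/// threads, as in `Collaboration` below) :
+///
+/// ```
+/// // First thread : launches the work inside the arena.
+/// arena.execute(
+/// 	[&]{ taskGroup.run_and_wait( doTheWork ); }
+/// );
+///
+/// // Later threads : joining the arena and waiting on the task group
+/// // allows TBB to hand them tasks spawned by `doTheWork`, rather than
+/// // simply blocking them.
+/// arena.execute(
+/// 	[&]{ taskGroup.wait(); }
+/// );
+/// ```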
+///
+/// Hash aliasing
+/// -------------
+///
+/// To track in-flight processes we need a way of identifying them, and we do
+/// this using the same key that is used to cache their results. In the case of
+/// ComputeProcess, the key is a hash generated by `ComputeNode::hash()`, which
+/// must uniquely identify the result of the process.
+///
+/// But we have a problem : this hash can _alias_, and indeed it is encouraged
+/// to. By aliasing, we mean that two processes can have the same hash provided
+/// that they will generate the same result. For example, two different
+/// SceneReader nodes will share hashes if they are each reading from the same
+/// file. Or two locations within a scene will share hashes if they are known to
+/// generate identical objects. In both cases, aliasing the hashes allows us to
+/// avoid redundant computes and the creation of redundant cache entries. But this
+/// adds complexity to the process graph - through hash aliasing, processes can
+/// end up collaborating on nodes they have no actual connection to.
+///
+///   Collect1     Collect2        o     < Collect1 and Collect2 have the same
+///      |            |           / \    < hash, so Expression2 is now
+///  Expression1  Expression2    o   o   < collaborating on Collect1!
+///
+/// Again, this is desirable as it reduces redundant work. But hashes can also
+/// alias in less predictable ways. As `ExpressionTest.testHashAliasing`
+/// shows, it's possible to create a node network such that a downstream node
+/// depends on an upstream node with an _identical hash_. If we attempt process
+/// collaboration in this case, we create a cyclic dependency that results in
+/// a form of deadlock.
+///
+///   Expression1
+///       |
+///   Expression2      o-----
+///       |            |    |
+///   Expression3      o<----
+///
+/// This is _the_ key problem in our management of threaded collaborative
+/// processes. We want node authors to be free to alias hashes without
+/// constraint, to reduce redundant computes and cache pressure to the maximum
+/// extent possible. But with the right node graph, _any_ aliasing may
+/// lead to a cyclic dependency evolving dynamically in the corresponding
+/// process graph.
+///
+/// In practice, such cyclic dependencies are rare, but not rare enough
+/// that we can neglect them completely. Our strategy is therefore to
+/// perform collaboration wherever we can, but to replace it with one
+/// additional "redundant" process where collaboration would cause a
+/// cycle.
+///
+///   Expression1      o    < this process has the same hash...
+///       |            |
+///   Expression2      o
+///       |            |
+///   Expression3      o    < ...as this one
+///
+/// Conceptually this is relatively simple, but it is made trickier by the
+/// constantly mutating nature of the process graph. Although all new processes
+/// are always added at the leaves of the process "tree", collaboration can insert
+/// arbitrary diamond dependencies between existing processes anywhere in the
+/// graph, at any time, and from any thread, and our cycle checking must account
+/// for this without introducing excessive overhead.
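+///
+/// The cycle check itself amounts to a depth-first search over the `dependents`
+/// sets, performed while holding `g_dependentsMutex` - see
+/// `Collaboration::dependsOn()` below. One possible implementation, given here
+/// as a sketch rather than as the definitive one :
+///
+/// ```
+/// bool Collaboration::dependsOn( const Collaboration *collaboration ) const
+/// {
+/// 	// Caller must hold `g_dependentsMutex`, so `dependents` can't mutate.
+/// 	for( const auto *dependent : collaboration->dependents )
+/// 	{
+/// 		// We depend on `collaboration` if we're a direct dependent of it,
+/// 		// or if we depend on any of its other dependents.
+/// 		if( dependent == this || dependsOn( dependent ) )
+/// 		{
+/// 			return true;
+/// 		}
+/// 	}
+/// 	return false;
+/// }
+/// ```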
+///
+/// > Tip : At this point it is useful to forget about nodes and plugs and
+/// connections and to instead consider the process graph largely in the
+/// abstract. Processes are vertices in the graph. Dependencies are directed
+/// edges between processes. Edge insertion may be attempted anywhere by
+/// collaboration at any time, and cycles must be avoided.
+
+/// A "vertex" in the process graph where collaboration may be performed. We
+/// only track collaborative processes because non-collaborative processes can't
+/// introduce edges that could lead to cycles.
+class GAFFER_API Process::Collaboration : public IECore::RefCounted
+{
+
+	public :
+
+		// Work around https://bugs.llvm.org/show_bug.cgi?id=32978
+		~Collaboration() noexcept( true ) override;
+
+		IE_CORE_DECLAREMEMBERPTR( Collaboration );
+
+		// Arena and task group used to allow waiting threads to participate
+		// in collaborative work.
+		tbb::task_arena arena;
+		tbb::task_group taskGroup;
+
+		using Set = std::unordered_set<const Collaboration *>;
+		// Collaborations depending directly on this one.
+		Set dependents;
+
+		// Returns true if this collaboration depends on `collaboration`, either
+		// directly or indirectly via other collaborations it depends on.
+		// The caller of this function must hold `g_dependentsMutex`.
+		bool dependsOn( const Collaboration *collaboration ) const;
+
+		// Protects access to `dependents` on _all_ Collaborations.
+		static tbb::spin_mutex g_dependentsMutex;
+
+};
+
+/// Collaboration subclass specific to a single type of process, providing storage for the result
+/// and tracking of the currently in-flight collaborations by cache key.
+///
+/// > Note : We track dependencies between all types of collaboration, not just between like types.
+template<typename ProcessType>
+class Process::TypedCollaboration : public Process::Collaboration
+{
+	public :
+
+		std::optional<typename ProcessType::ResultType> result;
+
+		IE_CORE_DECLAREMEMBERPTR( TypedCollaboration );
+
+		using PendingCollaborations = tbb::concurrent_hash_map<typename ProcessType::CacheType::KeyType, std::vector<Ptr>>;
+		static PendingCollaborations g_pendingCollaborations;
+
+};
+
+template<typename ProcessType>
+typename Process::TypedCollaboration<ProcessType>::PendingCollaborations Process::TypedCollaboration<ProcessType>::g_pendingCollaborations;
+
+template<typename ProcessType, typename... ProcessArguments>
+typename ProcessType::ResultType Process::acquireCollaborativeResult(
+	const typename ProcessType::CacheType::KeyType &cacheKey, ProcessArguments&&... args
+)
+{
+	const ThreadState &threadState = ThreadState::current();
+	const Collaboration *currentCollaboration = threadState.process() ? threadState.process()->m_collaboration : nullptr;
+
+	// Check for any in-flight computes for the same cache key. If we find a
+	// suitable one, we'll wait for it and use its result.
+
+	using CollaborationType = TypedCollaboration<ProcessType>;
+	using CollaborationTypePtr = typename CollaborationType::Ptr;
+
+	typename CollaborationType::PendingCollaborations::accessor accessor;
+	CollaborationType::g_pendingCollaborations.insert( accessor, cacheKey );
+
+	for( const auto &candidate : accessor->second )
+	{
+		// Check to see if we can safely collaborate on `candidate` without
+		// risking deadlock. We optimistically perform the cheapest checks
+		// first; if we're not already in a collaboration, or if the
+		// collaboration we're in already depends on the candidate (via another
+		// thread of execution) then we're good to go.
+		//
+		// The call to `candidate->dependents.find()` is safe even though we
+		// don't hold `g_dependentsMutex`, because we hold the accessor for
+		// `candidate`, and that is always held by any writer of
+		// `candidate->dependents`.
+		if( currentCollaboration && candidate->dependents.find( currentCollaboration ) == candidate->dependents.end() )
+		{
+			// Perform much more expensive check for potential deadlock - we
+			// mustn't become a dependent of `candidate` if it already depends
+			// on us. This requires traversing all dependents of
+			// `currentCollaboration` while holding `g_dependentsMutex` (so they
+			// can't be modified while we read).
+			tbb::spin_mutex::scoped_lock dependentsLock( Collaboration::g_dependentsMutex );
+			if( !candidate->dependsOn( currentCollaboration ) )
+			{
+				// We're safe to collaborate. Add ourself as a dependent before
+				// releasing `g_dependentsMutex`.
+				candidate->dependents.insert( currentCollaboration );
+			}
+			else
+			{
+				continue;
+			}
+		}
+
+		// We've found an in-flight process we can wait on without causing
+		// deadlock. Join its `task_arena` and wait on the result, so we get to
+		// work on any TBB tasks it has created.
+		//
+		// > Note : We need to own a reference to `collaboration` because the
+		// thread that created it may drop its own reference as soon as we call
+		// `release()`, because that allows the original `run_and_wait()` to
+		// complete.
+		//
+		// > Caution : Now that the primary `run_and_wait()` can return, any other
+		// waiting threads can also move on. That means that
+		// `collaboration->dependents` may now contain dangling pointers. Do
+		// not access them!
+
+		CollaborationTypePtr collaboration = candidate;
+		accessor.release();
+
+		collaboration->arena.execute(
+			[&]{ return collaboration->taskGroup.wait(); }
+		);
+
+		if( collaboration->result )
+		{
+			return *collaboration->result;
+		}
+		else
+		{
+			throw IECore::Exception( "Process::acquireCollaborativeResult : No result found" );
+		}
+	}
+
+	// No suitable in-flight collaborations, so we'll create one of our own.
+	// First though, check the cache one more time, in case another thread has
+	// started and finished an equivalent collaboration since we first checked.
+
+	if( auto result = ProcessType::g_cache.getIfCached( cacheKey ) )
+	{
+		return *result;
+	}
+
+	CollaborationTypePtr collaboration = new CollaborationType;
+	if( currentCollaboration )
+	{
+		// No need to hold `g_dependentsMutex` here because other threads can't
+		// access `collaboration->dependents` until we publish it.
+		collaboration->dependents.insert( currentCollaboration );
+	}
+
+	std::exception_ptr exception;
+
+	auto status = collaboration->arena.execute(
+		[&] {
+			return collaboration->taskGroup.run_and_wait(
+				[&] {
+					// Publish ourselves so that other threads can collaborate
+					// by calling `collaboration->taskGroup.wait()`.
+					accessor->second.push_back( collaboration );
+					accessor.release();
+
+					try
+					{
+						ProcessType process( std::forward<ProcessArguments>( args )... );
+						process.m_collaboration = collaboration.get();
+						collaboration->result = process.run();
+						// Publish result to cache before we remove ourself from
+						// `g_pendingCollaborations`, so that other threads will
+						// be able to get the result one way or the other.
+						ProcessType::g_cache.setIfUncached(
+							cacheKey, *collaboration->result,
+							ProcessType::cacheCostFunction
+						);
+					}
+					catch( ... )
+					{
+						// Don't allow `task_group::wait()` to see exceptions,
+						// because then we'd hit a thread-safety bug in
+						// `tbb::task_group_context::reset()`.
+						exception = std::current_exception();
+					}
+
+					// Now we're done, remove `collaboration` from the pending collaborations.
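+					// We released the accessor before running the process, so
+					// we must re-acquire it here. The entry is guaranteed to
+					// still exist, because only this thread removes it.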
+ [[maybe_unused]] const bool found = CollaborationType::g_pendingCollaborations.find( accessor, cacheKey ); + assert( found ); + auto toErase = std::find( accessor->second.begin(), accessor->second.end(), collaboration ); + assert( toErase != accessor->second.end() ); + accessor->second.erase( toErase ); + if( accessor->second.empty() ) + { + CollaborationType::g_pendingCollaborations.erase( accessor ); + } + accessor.release(); + } + ); + } + ); + + if( exception ) + { + std::rethrow_exception( exception ); + } + else if( status == tbb::task_group_status::canceled ) + { + throw IECore::Cancelled(); + } + + return *collaboration->result; +} + inline bool Process::forceMonitoring( const ThreadState &s, const Plug *plug, const IECore::InternedString &processType ) { if( s.m_mightForceMonitoring ) diff --git a/include/Gaffer/ValuePlug.h b/include/Gaffer/ValuePlug.h index 4913e88e953..706ea41f349 100644 --- a/include/Gaffer/ValuePlug.h +++ b/include/Gaffer/ValuePlug.h @@ -119,41 +119,29 @@ class GAFFER_API ValuePlug : public Plug /// and hash for output plugs. enum class CachePolicy { - /// No caching is performed. Suitable for - /// extremely quick processes. Also useful - /// to avoid double-counting of cache memory when - /// a compute always returns a sub-object of another - /// cache entry. + /// No caching is performed. Suitable for extremely quick processes. + /// Also useful to avoid double-counting of cache memory when a + /// compute always returns a sub-object of another cache entry. Uncached, - /// Suitable for regular processes that don't spawn - /// TBB tasks. It is essential that any task-spawning - /// processes use one of the dedicated policies below. - /// \todo It isn't actually clear that the locking of the - /// Standard policy is an improvement over the non-locked - /// Legacy policy. Locking on a downstream Standard - /// compute might prevent multiple threads from participating - /// in an upstream TaskCollaboration. And for small computes - /// that are unlikely to be needed by multiple threads, - /// we may well prefer to avoid the contention. Note that - /// many scene computes may fit this category, as every - /// non-filtered location is implemented as a very cheap - /// pass-through compute. There's also a decent argument - /// that any non-trivial amount of work should be using TBB, - /// so it would be a mistake to do anything expensive with - /// a Standard policy anyway. + /// Deprecated synonym for TaskCollaboration (for + /// `computeCachePolicy()`) and Default (for `hashCachePolicy()`). + /// Will be removed in a future release. Standard, - /// Suitable for processes that spawn TBB tasks. - /// Threads waiting for the same result will collaborate - /// to perform tasks together until the work is complete. + /// Must be used for processes that spawn TBB tasks. Results are + /// stored in a global cache, and threads waiting for the same + /// result will collaborate to perform tasks together until the work + /// is complete. TaskCollaboration, - /// Suitable for processes that spawn TBB tasks. Threads - /// waiting for an in-progress compute will block until - /// it is complete. In theory this is inferior to TaskCollaboration, - /// but due to TBB overhead it may be preferable for small - /// but frequent computes. + /// Deprecated synonym for TaskCollaboration. Will be removed in a + /// future release. TaskIsolation, - /// Legacy policy, to be removed. 
- Legacy + /// Suitable for relatively lightweight processes that could benefit + /// from caching, but do not spawn TBB tasks, and are unlikely to be + /// required from multiple threads concurrently. + Default, + /// Deprecated synonym for Default. Will be removed in a future + /// release. + Legacy = Default }; /// @name Cache management diff --git a/include/GafferScene/ObjectProcessor.h b/include/GafferScene/ObjectProcessor.h index 170859e993b..99b0b6e1e65 100644 --- a/include/GafferScene/ObjectProcessor.h +++ b/include/GafferScene/ObjectProcessor.h @@ -79,8 +79,8 @@ class GAFFERSCENE_API ObjectProcessor : public FilteredSceneProcessor virtual void hashProcessedObject( const ScenePath &path, const Gaffer::Context *context, IECore::MurmurHash &h ) const = 0; /// Must be implemented by derived classes to return the processed object. virtual IECore::ConstObjectPtr computeProcessedObject( const ScenePath &path, const Gaffer::Context *context, const IECore::Object *inputObject ) const = 0; - /// Must be implemented to return an appropriate policy if `computeProcessedObject()` spawns - /// TBB tasks. The default implementation returns `ValuePlug::CachePolicy::Legacy`. + /// Must be implemented to return `ValuePlug::CachePolicy::TaskCollaboration` if `computeProcessedObject()` spawns + /// TBB tasks. The default implementation returns `ValuePlug::CachePolicy::Default`. virtual Gaffer::ValuePlug::CachePolicy processedObjectComputeCachePolicy() const; void hash( const Gaffer::ValuePlug *output, const Gaffer::Context *context, IECore::MurmurHash &h ) const override; diff --git a/python/GafferSceneTest/ParentTest.py b/python/GafferSceneTest/ParentTest.py index be8031ee210..eb46adb1b64 100644 --- a/python/GafferSceneTest/ParentTest.py +++ b/python/GafferSceneTest/ParentTest.py @@ -36,6 +36,7 @@ import pathlib import inspect +import functools import imath @@ -937,5 +938,312 @@ def testInvalidDestinationNames( self ) : with self.assertRaisesRegex( Gaffer.ProcessException, r".*Invalid destination `/woops/a\*`. Name `a\*` is invalid \(because it contains filter wildcards\)" ) : parent["out"].childNames( "/" ) + @GafferTest.TestRunner.CategorisedTestMethod( { "taskCollaboration:hashAliasing" } ) + def testChainedNodesWithIdenticalBranches( self ) : + + # infiniteScene + # | + # parent2 + # | + # parent1 + + # Trick to make a large scene without needing a cache file + # and without using a BranchCreator. This scene is infinite + # but very cheap to compute. + + infiniteScene = GafferScene.ScenePlug() + infiniteScene["childNames"].setValue( IECore.InternedStringVectorData( [ "one", "two" ] ) ) + + # Filter that will search the scene to a fixed depth, but + # never find anything. + + nothingFilter = GafferScene.PathFilter() + nothingFilter["paths"].setValue( IECore.StringVectorData( [ "/*/*/*/*/*/*/*/*/*/*/thisDoesntExist" ] ) ) + + # Two Parent nodes one after another, using the same filter. + # These will generate the same (empty) set of branches, and + # therefore have the same hash. + + parent2 = GafferScene.Parent() + parent2["in"].setInput( infiniteScene ) + parent2["filter"].setInput( nothingFilter["out"] ) + + parent1 = GafferScene.Parent() + parent1["in"].setInput( parent2["out"] ) + parent1["filter"].setInput( nothingFilter["out"] ) + + self.assertEqual( parent1["__branches"].hash(), parent2["__branches"].hash() ) + + # Simulate the effects of a previous computation being evicted + # from the cache. 
+
+		parent1["__branches"].getValue()
+		Gaffer.ValuePlug.clearCache()
+
+		# We are now in a situation where the hash for `parent1.__branches` is
+		# cached, but the value isn't. This is significant, because it means
+		# that in the next step, the downstream compute for `parent1.__branches`
+		# starts _before_ the upstream one for `parent2.__branches`. If the hash
+		# wasn't cached, then the hash for `parent1` would trigger
+		# an upstream compute for `parent2.__branches` first.
+
+		# Trigger scene generation. We can't use `traverseScene()` because the
+		# scene is infinite, so we use `matchingPaths()` to generate up to a fixed
+		# depth. The key effect here is that lots of threads are indirectly pulling on
+		# `parent1.__branches`, triggering task collaboration.
+
+		with Gaffer.PerformanceMonitor() as monitor :
+			paths = IECore.PathMatcher()
+			GafferScene.SceneAlgo.matchingPaths( IECore.PathMatcher( [ "/*/*/*/*/*/*/*/*/*" ] ), parent1["out"], paths )
+
+		# We only expect to see a single hash/compute for `__branches` on each
+		# node. A previous bug meant that this was not the case, and thousands
+		# of unnecessary evaluations of `parent2.__branches` could occur.
+
+		self.assertEqual( monitor.plugStatistics( parent1["__branches"] ).computeCount, 1 )
+		self.assertEqual( monitor.plugStatistics( parent2["__branches"] ).computeCount, 1 )
+
+	@GafferTest.TestRunner.CategorisedTestMethod( { "taskCollaboration:hashAliasing" } )
+	def testChainedNodesWithIdenticalBranchesAndIntermediateCompute( self ) :
+
+		# As for `testChainedNodesWithIdenticalBranches` above, but with a
+		# _different_ Parent node inserted between the identical two.
+		#
+		# infiniteScene
+		#      |
+		#   parentA2
+		#      |
+		#   parentB
+		#      |
+		#   parentA1
+		#
+
+		infiniteScene = GafferScene.ScenePlug()
+		infiniteScene["childNames"].setValue( IECore.InternedStringVectorData( [ "one", "two" ] ) )
+
+		filterA = GafferScene.PathFilter()
+		filterA["paths"].setValue( IECore.StringVectorData( [ "/*/*/*/*/*/thisDoesntExist" ] ) )
+
+		somethingFilter = GafferScene.PathFilter()
+		somethingFilter["paths"].setValue( IECore.StringVectorData( [ "/*/*/*/*/*" ] ) )
+
+		parentA2 = GafferScene.Parent()
+		parentA2["in"].setInput( infiniteScene )
+		parentA2["filter"].setInput( filterA["out"] )
+
+		parentB = GafferScene.Parent()
+		parentB["in"].setInput( parentA2["out"] )
+		parentB["filter"].setInput( somethingFilter["out"] )
+
+		parentA1 = GafferScene.Parent()
+		parentA1["in"].setInput( parentB["out"] )
+		parentA1["filter"].setInput( filterA["out"] )
+
+		self.assertEqual( parentA1["__branches"].hash(), parentA2["__branches"].hash() )
+
+		# Simulate the effects of a previous computation being evicted
+		# from the cache.
+
+		parentA1["__branches"].getValue()
+		Gaffer.ValuePlug.clearCache()
+
+		# We are now in a situation where the hash for `parentA1.__branches` is
+		# cached, but the value isn't. This means that the next call will
+		# trigger the compute for `parentA1.__branches` _first_. In turn, that
+		# will trigger a compute on `parentB.__branches`, and _that_ will
+		# recurse to `parentA2.__branches`. There are now two paths for worker
+		# threads to need the result from `parentA2` :
+		#
+		# 1. The worker enters the task arena for `parentA1`, then enters the
+		#    nested task arena for `parentB`, and finally tries to get the result
+		#    from `parentA2`.
+		# 2. The worker enters the task arena for `parentB` directly, takes a task, then
+		#    tries to get the result from `parentA1`.
+		#
+		# In the first case we were able to detect the recursion from `parentA1` to
+		# `parentA2` using a `task_arena_observer`. But in the second case, the worker
+		# enters the `parentB` arena directly, and isn't considered to be in the `parentA1`
+		# arena by `task_arena_observer`. So this worker tries to do task collaboration
+		# on A, and is therefore waiting on A to finish. But A can't finish until B finishes
+		# on the first worker thread. And B can't finish on the first worker thread because
+		# it is waiting on a task that is trapped on the second worker thread. Deadlock!
+		#
+		# This deficiency in using `task_arena_observer` to detect recursion led
+		# us to develop a different method, one that avoids the deadlock
+		# that was being triggered by the call below.
+
+		parentA1["__branches"].getValue()
+
+	@GafferTest.TestRunner.CategorisedTestMethod( { "taskCollaboration:hashAliasing" } )
+	def testSplitNodesWithIdenticalBranchesAndIntermediateCompute( self ) :
+
+		# As for `testChainedNodesWithIdenticalBranches` above, but with node
+		# `parentC` branching off in parallel with `parentA1`. This branch
+		# arrives at `parentA2` without having `parentA1` in the history. This
+		# is similar to the case above - it is another mechanism whereby B is
+		# needed before A on one thread in particular.
+		#
+		# infiniteScene
+		#      |
+		#   parentA2
+		#      |
+		#   parentB
+		#    /    \
+		# parentA1  parentC
+
+		script = Gaffer.ScriptNode()
+
+		script["infiniteScene"] = GafferScene.ScenePlug()
+		script["infiniteScene"]["childNames"].setValue( IECore.InternedStringVectorData( [ "one", "two" ] ) )
+
+		script["filterA"] = GafferScene.PathFilter()
+		script["filterA"]["paths"].setValue( IECore.StringVectorData( [ "/*/*/*/*/*/*/*thisDoesntExist" ] ) )
+
+		script["filterB"] = GafferScene.PathFilter()
+		script["filterB"]["paths"].setValue( IECore.StringVectorData( [ "/*/*/*/*/*/*/one" ] ) )
+
+		script["filterC"] = GafferScene.PathFilter()
+		script["filterC"]["paths"].setValue( IECore.StringVectorData( [ "/*/*/*/*/*/*/two" ] ) )
+
+		script["parentA2"] = GafferScene.Parent()
+		script["parentA2"]["in"].setInput( script["infiniteScene"] )
+		script["parentA2"]["filter"].setInput( script["filterA"]["out"] )
+
+		script["parentB"] = GafferScene.Parent()
+		script["parentB"]["in"].setInput( script["parentA2"]["out"] )
+		script["parentB"]["filter"].setInput( script["filterB"]["out"] )
+
+		script["parentA1"] = GafferScene.Parent()
+		script["parentA1"]["in"].setInput( script["parentB"]["out"] )
+		script["parentA1"]["filter"].setInput( script["filterA"]["out"] )
+
+		script["parentC"] = GafferScene.Parent()
+		script["parentC"]["in"].setInput( script["parentB"]["out"] )
+		script["parentC"]["filter"].setInput( script["filterC"]["out"] )
+
+		self.assertEqual( script["parentA1"]["__branches"].hash(), script["parentA2"]["__branches"].hash() )
+		self.assertNotEqual( script["parentA1"]["__branches"].hash(), script["parentB"]["__branches"].hash() )
+		self.assertNotEqual( script["parentA1"]["__branches"].hash(), script["parentC"]["__branches"].hash() )
+		self.assertNotEqual( script["parentB"]["__branches"].hash(), script["parentC"]["__branches"].hash() )
+
+		# Repeat this one, because it is more sensitive to specific task timings.
+		for i in range( 0, 100 ) :
+
+			# Simulate the effects of a previous computation being evicted
+			# from the cache.
+
+			script["parentA1"]["__branches"].getValue()
+			script["parentC"]["__branches"].getValue()
+			Gaffer.ValuePlug.clearCache()
+
+			# Trigger a bunch of parallel computes on both `parentA1` and `parentC`.
+ + def evaluateScene( scene ) : + + paths = IECore.PathMatcher() + GafferScene.SceneAlgo.matchingPaths( IECore.PathMatcher( [ "/*/*/*/*/*/*/*/*/*" ] ), scene, paths ) + + parentA1Task = Gaffer.ParallelAlgo.callOnBackgroundThread( script["parentA1"]["out"], functools.partial( evaluateScene, script["parentA1"]["out"] ) ) + parentCTask = Gaffer.ParallelAlgo.callOnBackgroundThread( script["parentC"]["out"], functools.partial( evaluateScene, script["parentC"]["out"] ) ) + + parentA1Task.wait() + parentCTask.wait() + + @GafferTest.TestRunner.CategorisedTestMethod( { "taskCollaboration:hashAliasing" } ) + def testDependencyPropagationThroughIntermediateCompute( self ) : + + # As for `testSplitNodesWithIdenticalBranchesAndIntermediateCompute` above, but + # with an additional task collaboration between `parentB` and `parentA2`. This + # means that we need to track the dependency from `parentB` to `parentA2` through + # an intermediate node. + # + # infiniteScene + # | + # parentA2 + # | + # parentI + # | + # parentB + # / \ + # parentA1 parentC + + script = Gaffer.ScriptNode() + + script["infiniteScene"] = GafferScene.ScenePlug() + script["infiniteScene"]["childNames"].setValue( IECore.InternedStringVectorData( [ "one", "two" ] ) ) + + script["filterA"] = GafferScene.PathFilter() + script["filterA"]["paths"].setValue( IECore.StringVectorData( [ "/*/*/*/*/*/*/*thisDoesntExist" ] ) ) + + script["filterB"] = GafferScene.PathFilter() + script["filterB"]["paths"].setValue( IECore.StringVectorData( [ "/*/*/*/*/*/*/one" ] ) ) + + script["filterC"] = GafferScene.PathFilter() + script["filterC"]["paths"].setValue( IECore.StringVectorData( [ "/*/*/*/*/*/*/two" ] ) ) + + script["filterI"] = GafferScene.PathFilter() + script["filterI"]["paths"].setValue( IECore.StringVectorData( [ "/*/*/*/*/*/*/*/*two" ] ) ) + + script["parentA2"] = GafferScene.Parent() + script["parentA2"]["in"].setInput( script["infiniteScene"] ) + script["parentA2"]["filter"].setInput( script["filterA"]["out"] ) + + script["parentI"] = GafferScene.Parent() + script["parentI"]["in"].setInput( script["parentA2"]["out"] ) + script["parentI"]["filter"].setInput( script["filterI"]["out"] ) + + script["parentB"] = GafferScene.Parent() + script["parentB"]["in"].setInput( script["parentI"]["out"] ) + script["parentB"]["filter"].setInput( script["filterB"]["out"] ) + + script["parentA1"] = GafferScene.Parent() + script["parentA1"]["in"].setInput( script["parentB"]["out"] ) + script["parentA1"]["filter"].setInput( script["filterA"]["out"] ) + + script["parentC"] = GafferScene.Parent() + script["parentC"]["in"].setInput( script["parentB"]["out"] ) + script["parentC"]["filter"].setInput( script["filterC"]["out"] ) + + self.assertEqual( script["parentA1"]["__branches"].hash(), script["parentA2"]["__branches"].hash() ) + self.assertNotEqual( script["parentA1"]["__branches"].hash(), script["parentB"]["__branches"].hash() ) + self.assertNotEqual( script["parentA1"]["__branches"].hash(), script["parentC"]["__branches"].hash() ) + self.assertNotEqual( script["parentB"]["__branches"].hash(), script["parentC"]["__branches"].hash() ) + + # Repeat this one, because it is more sensitive to specific task timings. + for i in range( 0, 100 ) : + + # Simulate the effects of a previous computation being evicted + # from the cache. + + script["parentA1"]["__branches"].getValue() + script["parentC"]["__branches"].getValue() + Gaffer.ValuePlug.clearCache() + + # Trigger a bunch of parallel computes on both `parentA1` and + # `parentC`. 
The sequence of computes we're concerned about is this : + # + # 1. `parentC` + # 2. `parentB` + # 3. `parentI` + # 4. `parentA1` + # 5. `parentB` (collaboration from `parentA1`) + # 6. `parentA2` (mustn't collaborate with `parentA1`) + # + # This means realising that `parentA2` is being waited on by + # `parentA1`, even though `parentI` started computing _before_ + # collaboration on `parentB` started. So we can't simply track + # dependents up the chain at the point each process starts. + + def evaluateScene( scene ) : + + paths = IECore.PathMatcher() + GafferScene.SceneAlgo.matchingPaths( IECore.PathMatcher( [ "/*/*/*/*/*/*/*/*/*" ] ), scene, paths ) + + parentCTask = Gaffer.ParallelAlgo.callOnBackgroundThread( script["parentC"]["out"], functools.partial( evaluateScene, script["parentC"]["out"] ) ) + parentA1Task = Gaffer.ParallelAlgo.callOnBackgroundThread( script["parentA1"]["out"], functools.partial( evaluateScene, script["parentA1"]["out"] ) ) + + parentCTask.wait() + parentA1Task.wait() + if __name__ == "__main__": unittest.main() diff --git a/python/GafferTest/ComputeNodeTest.py b/python/GafferTest/ComputeNodeTest.py index 943ebd24f91..7a3792f3082 100644 --- a/python/GafferTest/ComputeNodeTest.py +++ b/python/GafferTest/ComputeNodeTest.py @@ -608,6 +608,7 @@ class ThrowingNode( Gaffer.ComputeNode ) : def __init__( self, name="ThrowingNode" ) : self.hashFail = False + self.cachePolicy = Gaffer.ValuePlug.CachePolicy.Standard Gaffer.ComputeNode.__init__( self, name ) @@ -639,11 +640,11 @@ def compute( self, plug, context ) : def hashCachePolicy( self, plug ) : - return Gaffer.ValuePlug.CachePolicy.Standard + return self.cachePolicy def computeCachePolicy( self, plug ) : - return Gaffer.ValuePlug.CachePolicy.Standard + return self.cachePolicy IECore.registerRunTimeTyped( ThrowingNode ) @@ -684,18 +685,25 @@ def testProcessException( self ) : def testProcessExceptionNotShared( self ) : - thrower1 = self.ThrowingNode( "thrower1" ) - thrower2 = self.ThrowingNode( "thrower2" ) + for policy in Gaffer.ValuePlug.CachePolicy.names.values() : + with self.subTest( policy = policy ) : - with self.assertRaisesRegex( Gaffer.ProcessException, r'thrower1.out : [\s\S]*Eeek!' ) as raised : - thrower1["out"].getValue() + Gaffer.ValuePlug.clearCache() - self.assertEqual( raised.exception.plug(), thrower1["out"] ) + thrower1 = self.ThrowingNode( "thrower1" ) + thrower1.cachePolicy = policy + thrower2 = self.ThrowingNode( "thrower2" ) + thrower2.cachePolicy = policy - with self.assertRaisesRegex( Gaffer.ProcessException, r'thrower2.out : [\s\S]*Eeek!' ) as raised : - thrower2["out"].getValue() + with self.assertRaisesRegex( Gaffer.ProcessException, r'thrower1.out : [\s\S]*Eeek!' ) as raised : + thrower1["out"].getValue() + + self.assertEqual( raised.exception.plug(), thrower1["out"] ) + + with self.assertRaisesRegex( Gaffer.ProcessException, r'thrower2.out : [\s\S]*Eeek!' 
) as raised : + thrower2["out"].getValue() - self.assertEqual( raised.exception.plug(), thrower2["out"] ) + self.assertEqual( raised.exception.plug(), thrower2["out"] ) def testProcessExceptionRespectsNameChanges( self ) : diff --git a/python/GafferTest/ExpressionTest.py b/python/GafferTest/ExpressionTest.py index 76dc9ebd2b0..3e08fa1aaa3 100644 --- a/python/GafferTest/ExpressionTest.py +++ b/python/GafferTest/ExpressionTest.py @@ -1650,5 +1650,60 @@ def testPathForStringPlug( self ) : self.assertEqual( script["node"]["in"].getValue(), pathlib.Path.cwd().as_posix() ) + @GafferTest.TestRunner.CategorisedTestMethod( { "taskCollaboration:hashAliasing" } ) + def testHashAliasing( self ) : + + # p1 + # | + # e1 + # | + # p2 + # | + # e2 + # | + # p3 + # | + # e3 (aliases the hash from e1) + # | + # p4 + + script = Gaffer.ScriptNode() + + script["n"] = Gaffer.Node() + script["n"]["user"]["p1"] = Gaffer.StringPlug( flags = Gaffer.Plug.Flags.Default | Gaffer.Plug.Flags.Dynamic ) + script["n"]["user"]["p2"] = Gaffer.StringPlug( flags = Gaffer.Plug.Flags.Default | Gaffer.Plug.Flags.Dynamic ) + script["n"]["user"]["p3"] = Gaffer.StringPlug( flags = Gaffer.Plug.Flags.Default | Gaffer.Plug.Flags.Dynamic ) + script["n"]["user"]["p4"] = Gaffer.StringPlug( flags = Gaffer.Plug.Flags.Default | Gaffer.Plug.Flags.Dynamic ) + + script["e1"] = Gaffer.Expression() + script["e1"].setExpression( """import time; time.sleep( 0.01 ); parent["n"]["user"]["p2"] = parent["n"]["user"]["p1"]""" ) + + script["e2"] = Gaffer.Expression() + script["e2"].setExpression( """import time; time.sleep( 0.01 ); parent["n"]["user"]["p3"] = parent["n"]["user"]["p2"] and ''""" ) + + script["e3"] = Gaffer.Expression() + script["e3"].setExpression( """import time; time.sleep( 0.01 ); parent["n"]["user"]["p4"] = parent["n"]["user"]["p3"]""" ) + + # Because the input StringPlugs hash by value (see de8ab79d6f958cef3b80954798f8083a346945a7), + # and the expression is stored in an internalised form that omits the names of the source + # and destination plugs, the hashes for `e3` and `e1`` are identical, even though one depends + # on the other. + self.assertEqual( script["e1"]["__execute"].hash(), script["e3"]["__execute"].hash() ) + # But the hash for `e2` is different, because it has different expression source code + # (even though it will generate the same value). + self.assertNotEqual( script["e2"]["__execute"].hash(), script["e1"]["__execute"].hash() ) + + # Get the value for `p4`, which will actually compute and cache `p2` first due to the hash + # computation done before the compute. So we never actually do a compute on `p4`, as the value + # for that comes from the cache. + script["n"]["user"]["p4"].getValue() + # Simulate a cache eviction, so we have the hash cached, but not the value. + Gaffer.ValuePlug.clearCache() + + # Now, getting the value of `p4` will trigger an immediate compute for `e3`, which will + # recurse to an upstream compute for `e1`, which has the same hash. If we don't have a + # mechanism for handling it, this will deadlock. 
+ script["n"]["user"]["p4"].getValue() + if __name__ == "__main__": unittest.main() diff --git a/python/GafferTest/IECorePreviewTest/LRUCacheTest.py b/python/GafferTest/IECorePreviewTest/LRUCacheTest.py index 2a49068aa4e..f38ad0d3f4a 100644 --- a/python/GafferTest/IECorePreviewTest/LRUCacheTest.py +++ b/python/GafferTest/IECorePreviewTest/LRUCacheTest.py @@ -154,14 +154,6 @@ def testRecursionWithEvictionsTaskParallel( self ) : GafferTest.testLRUCacheRecursion( "taskParallel", numIterations = 100000, numValues = 1000, maxCost = 100 ) - def testRecursionOnOneItemSerial( self ) : - - GafferTest.testLRUCacheRecursionOnOneItem( "serial" ) - - def testRecursionOnOneItemTaskParallel( self ) : - - GafferTest.testLRUCacheRecursionOnOneItem( "taskParallel" ) - def testClearFromGetSerial( self ) : GafferTest.testLRUCacheClearFromGet( "serial" ) @@ -236,12 +228,5 @@ def testSetIfUncached( self ) : with self.subTest( policy = policy ) : GafferTest.testLRUCacheSetIfUncached( policy ) - def testSetIfUncachedRecursion( self ) : - - # `parallel` policy omitted because it doesn't support recursion. - for policy in [ "serial", "taskParallel" ] : - with self.subTest( policy = policy ) : - GafferTest.testLRUCacheSetIfUncachedRecursion( policy ) - if __name__ == "__main__": unittest.main() diff --git a/python/GafferTest/IECorePreviewTest/TaskMutexTest.py b/python/GafferTest/IECorePreviewTest/TaskMutexTest.py index bf20cb53c82..885319f7900 100644 --- a/python/GafferTest/IECorePreviewTest/TaskMutexTest.py +++ b/python/GafferTest/IECorePreviewTest/TaskMutexTest.py @@ -64,10 +64,6 @@ def testHeavyContentionWithoutWorkAcceptance( self ) : GafferTest.testTaskMutexHeavyContention( False ) - def testWorkerRecursion( self ) : - - GafferTest.testTaskMutexWorkerRecursion() - def testAcquireOr( self ) : GafferTest.testTaskMutexAcquireOr() diff --git a/python/GafferTest/LoopTest.py b/python/GafferTest/LoopTest.py index 33835f0c6a0..0cf4d975f19 100644 --- a/python/GafferTest/LoopTest.py +++ b/python/GafferTest/LoopTest.py @@ -282,5 +282,30 @@ def plugDirtied( plug ) : for plug, value in valuesWhenDirtied.items() : self.assertEqual( plugValue( plug ), value ) + @GafferTest.TestRunner.CategorisedTestMethod( { "taskCollaboration:hashAliasing" } ) + def testHashAliasingDeadlock( self ) : + + script = Gaffer.ScriptNode() + + script["loop"] = Gaffer.Loop() + script["loop"].setup( Gaffer.StringPlug() ) + + # Dumb expression that just sets the value for the next iteration to + # the value from the previous iteration. Because of de8ab79d6f958cef3b80954798f8083a346945a7, + # the hash for the expression output is identical for every iteration of + # the loop, even though the context differs. + script["expression"] = Gaffer.Expression() + script["expression"].setExpression( """parent["loop"]["next"] = parent["loop"]["previous"]""" ) + + # Get the result of the loop. This actually computes the _first_ iteration of the loop first, + # while computing the hash of the result, and reuses the result for every other loop iteration. + script["loop"]["out"].getValue() + # Simulate cache eviction by clearing the compute cache. + Gaffer.ValuePlug.clearCache() + # Get the value again. Now, because the hash is still cached, this will first start the + # compute for the _last_ iteration. This leads to a recursive compute, which can cause deadlock + # if not handled appropriately. 
+ script["loop"]["out"].getValue() + if __name__ == "__main__": unittest.main() diff --git a/python/GafferTest/ProcessTest.py b/python/GafferTest/ProcessTest.py new file mode 100644 index 00000000000..466924e8e31 --- /dev/null +++ b/python/GafferTest/ProcessTest.py @@ -0,0 +1,433 @@ +########################################################################## +# +# Copyright (c) 2023, Cinesite VFX Ltd. All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above +# copyright notice, this list of conditions and the following +# disclaimer. +# +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following +# disclaimer in the documentation and/or other materials provided with +# the distribution. +# +# * Neither the name of John Haddon nor the names of +# any other contributors to this software may be used to endorse or +# promote products derived from this software without specific prior +# written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS +# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, +# THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +# +########################################################################## + +import unittest + +import IECore + +import Gaffer +import GafferTest + +class ProcessTest( GafferTest.TestCase ) : + + def setUp( self ) : + + GafferTest.TestCase.setUp( self ) + GafferTest.clearTestProcessCache() + + @GafferTest.TestRunner.CategorisedTestMethod( { "taskCollaboration" } ) + def testCollaboration( self ) : + + # We expect processes `1...n` to collaborate on process `n+1`. + # + # n+1 + # / | \ + # 1 ... n + # \ | / + # 0 + # + # Note on conventions used throughout this file : + # + # - Processes are labelled with the value of their result. + # - Lines connecting processes denote dependencies between them. + # - Dependent processes appear below the processes they depend on, + # matching the typical top-to-bottom flow of a Gaffer graph. + # - The root process is therefore always the bottom-most one. + + n = 10000 + + plug = Gaffer.Plug() + with Gaffer.PerformanceMonitor() as monitor : + GafferTest.runTestProcess( + plug, 0, + { x : { n + 1 : {} } for x in range( 1, n + 1 ) } + ) + + self.assertEqual( monitor.plugStatistics( plug ).computeCount, 1 + n + 1 ) + + @GafferTest.TestRunner.CategorisedTestMethod( { "taskCollaboration" } ) + def testCollaborationFromNonCollaborativeProcesses( self ) : + + # As above, but the waiting processes are not themselves collaborative. + # + # 1 + # / | \ + # -1 ... 
-n + # \ | / + # 0 + + n = 100000 + + plug = Gaffer.Plug() + with Gaffer.PerformanceMonitor() as monitor : + GafferTest.runTestProcess( + plug, 0, + { -x : { 1 : {} } for x in range( 1, n + 1 ) } + ) + + self.assertEqual( monitor.plugStatistics( plug ).computeCount, 1 + n + 1 ) + + @GafferTest.TestRunner.CategorisedTestMethod( { "taskCollaboration" } ) + def testNoCollaborationOnRecursion( self ) : + + # We don't expect any collaboration, because it would lead to + # deadlock. + # + # 10 + # | + # 10 + # | + # 10 + # | + # 10 + + plug = Gaffer.Plug() + with Gaffer.PerformanceMonitor() as monitor : + GafferTest.runTestProcess( plug, 1, { 10 : { 10 : { 10 : {} } } } ) + + self.assertEqual( monitor.plugStatistics( plug ).computeCount, 4 ) + + @GafferTest.TestRunner.CategorisedTestMethod( { "taskCollaboration" } ) + def testNoCollaborationOnIndirectRecursion( self ) : + + # We don't expect any collaboration, because it would lead to + # deadlock. + # + # 1 + # | + # 2 + # | + # 1 + # | + # 0 + + plug = Gaffer.Plug() + with Gaffer.PerformanceMonitor() as monitor : + GafferTest.runTestProcess( plug, 0, { 1 : { 2 : { 1 : {} } } } ) + + self.assertEqual( monitor.plugStatistics( plug ).computeCount, 4 ) + + @GafferTest.TestRunner.CategorisedTestMethod( { "taskCollaboration" } ) + def testNonCollaborativeProcessWithinRecursion( self ) : + + # As above, but using a non-collaborative task in the middle of the recursion. + # + # 1 + # | + # -2 + # | + # 1 + # | + # 0 + + plug = Gaffer.Plug() + with Gaffer.PerformanceMonitor() as monitor : + GafferTest.runTestProcess( plug, 0, { 1 : { -2 : { 1 : {} } } } ) + + self.assertEqual( monitor.plugStatistics( plug ).computeCount, 4 ) + + @GafferTest.TestRunner.CategorisedTestMethod( { "taskCollaboration" } ) + def testNoCollaborationOnDiamondRecursion( self ) : + + # We don't expect any collaboration, because it would lead to + # deadlock. + # + # 1 + # | + # 3 + # / \ + # 1 2 + # \ / + # 0 + + plug = Gaffer.Plug() + + for i in range( 0, 100 ) : + GafferTest.clearTestProcessCache() + with Gaffer.PerformanceMonitor() as monitor : + GafferTest.runTestProcess( + plug, 0, + { + 1 : { 3 : { 1 : {} } }, + 2 : { 3 : { 1 : {} } } + } + ) + + # There are various possibilities for execution, based on different + # thread timings. + # + # - The `0-2-3-1` branch completes first, so `1` is already cached by + # the time the `0-1` branch wants it. 4 computes total. + # - The `0-1-3-1` branch completes first, with a duplicate compute for + # `1` to avoid deadlock. 5 computes total. + # - The `0-2-3` branch waits on `1` from the `0-1` branch. The `0-1` + # branch performs duplicate computes for `3` and `1` to avoid deadlock. + # 6 computes total. + self.assertIn( monitor.plugStatistics( plug ).computeCount, { 4, 5, 6 } ) + + @GafferTest.TestRunner.CategorisedTestMethod( { "taskCollaboration" } ) + def testNoCollaborationOnIndirectDiamondRecursion( self ) : + + # As above, but with an additional process (4), meaning we have + # to track non-immediate dependencies between processes. 
+ # + # 1 + # | + # 4 + # | + # 3 + # / \ + # 1 2 + # \ / + # 0 + + plug = Gaffer.Plug() + + for i in range( 0, 100 ) : + GafferTest.clearTestProcessCache() + with Gaffer.PerformanceMonitor() as monitor : + GafferTest.runTestProcess( + plug, 0, + { + 1 : { 3 : { 4 : { 1 : {} } } }, + 2 : { 3 : { 4 : { 1 : {} } } }, + } + ) + + self.assertIn( monitor.plugStatistics( plug ).computeCount, { 5, 6, 8 } ) + + @GafferTest.TestRunner.CategorisedTestMethod( { "taskCollaboration" } ) + def testCollaborationTaskDistribution( self ) : + + # -1 ... -n plug4 + # \ | / + # n+1 plug3 + # / | \ + # 1 ... n plug2 + # \ | / + # 0 plug1 + + numWorkers = IECore.tbb_global_control.active_value( IECore.tbb_global_control.parameter.max_allowed_parallelism ) + n = 10000 * numWorkers + + plug1 = Gaffer.Plug() + plug2 = Gaffer.Plug() + plug3 = Gaffer.Plug() + plug4 = Gaffer.Plug() + plug1.setInput( plug2 ) + plug2.setInput( plug3 ) + plug3.setInput( plug4 ) + + dependencies = { -x : {} for x in range( 1, n + 1 ) } + dependencies = { x : { n + 1 : dependencies } for x in range( 1, n + 1 ) } + + intPlug = Gaffer.IntPlug() + GafferTest.parallelGetValue( intPlug, 100000 ) # Get worker threads running in advance + + with Gaffer.PerformanceMonitor() as monitor, Gaffer.ThreadMonitor() as threadMonitor : + GafferTest.runTestProcess( plug1, 0, dependencies ) + + self.assertEqual( monitor.plugStatistics( plug1 ).computeCount, 1 ) + self.assertEqual( monitor.plugStatistics( plug2 ).computeCount, n ) + self.assertEqual( monitor.plugStatistics( plug3 ).computeCount, 1 ) + self.assertEqual( monitor.plugStatistics( plug4 ).computeCount, n ) + + def assertExpectedThreading( plug, numTasks ) : + + s = threadMonitor.plugStatistics( plug ) # Dict mapping thread ID to number of computes + self.assertEqual( sum( s.values() ), numTasks ) + + if numTasks == 1 : + self.assertEqual( len( s ), 1 ) + else : + # Check that every worker thread contributed some work. + self.assertEqual( len( s ), numWorkers ) + # Check that each worker thread did at least half of its fair + # share of work. This assertion is too sensitive in CI so it is + # disabled by default. On my local test machine (Dual 16 core + # Xeon with hyperthreading) I see pretty reliable success up to + # `-threads 32`, and regular failure at `-threads 64` (the + # default). + if False : + for t in s.values() : + self.assertGreaterEqual( t, 0.5 * numTasks / numWorkers ) + + assertExpectedThreading( plug1, 1 ) + assertExpectedThreading( plug2, n ) + assertExpectedThreading( plug3, 1 ) + assertExpectedThreading( plug4, n ) + + @GafferTest.TestRunner.CategorisedTestMethod( { "taskCollaboration:performance" } ) + @GafferTest.TestRunner.PerformanceTestMethod() + def testFanOutGatherPerformance( self ) : + + # Pathological case for cycle checking - huge permutations + # of paths through the downstream graph. 
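+ # (With width `w` and depth `d` there are `w ** d` distinct paths between the
+ # top process and the root - 3 ** 3 == 27 in the diagram below, and 64 ** 10
+ # at the sizes used here - so a cycle check that enumerated paths, rather than
+ # tracking visited processes in a set, would explode.)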
+ # + # 0 + # / | \ + # 1 2 3 + # \ | / + # 4 + # / | \ + # 5 6 7 + # \ | / + # 8 + # / | \ + # 9 10 11 + # \ | / + # 12 (for width 3 and depth 3) + + width = 64 + depth = 10 + + dependencies = {} + i = 0 + for d in range( 0, depth ) : + dependencies = { i : dependencies } + i += 1 + dependencies = { w : dependencies for w in range( i, i + width ) } + i += width + + plug = Gaffer.Plug() + with Gaffer.PerformanceMonitor() as monitor : + GafferTest.clearTestProcessCache() + with GafferTest.TestRunner.PerformanceScope() : + GafferTest.runTestProcess( plug, i, dependencies ) + + self.assertEqual( monitor.plugStatistics( plug ).computeCount, i + 1 ) + + @GafferTest.TestRunner.CategorisedTestMethod( { "taskCollaboration:performance" } ) + @GafferTest.TestRunner.PerformanceTestMethod() + def testCollaborationPerformance( self ) : + + # -(n+1)...-2n + # \ | / + # 1 + # / | \ + # -1 ... -n + # \ | / + # 0 + + n = 100000 + + upstreamDependencies = { -x : {} for x in range( n + 1, 2 * n + 1 ) } + + GafferTest.clearTestProcessCache() + + plug = Gaffer.Plug() + with Gaffer.PerformanceMonitor() as monitor : + with GafferTest.TestRunner.PerformanceScope() : + GafferTest.runTestProcess( + plug, 0, + { -x : { 1 : upstreamDependencies } for x in range( 1, n + 1 ) } + ) + + self.assertEqual( monitor.plugStatistics( plug ).computeCount, 1 + n + 1 + n ) + + @GafferTest.TestRunner.CategorisedTestMethod( { "taskCollaboration:performance" } ) + @GafferTest.TestRunner.PerformanceTestMethod() + def testCollaborationTransitionPerformance( self ) : + + # Case we think is probably pretty common - all threads need to migrate + # through a series of collaborations. + # + # 3 + # / | \ + # -(2n+1)...-3n + # \ | / + # 2 + # / | \ + # -(n+1)...-2n + # \ | / + # 1 + # / | \ + # -1 ... 
-n + # \ | / + # 0 (for depth 3) + + n = IECore.hardwareConcurrency() + depth = 1000 + + dependencies = {} + for d in range( depth, 0, -1 ) : + dependencies = { -x : { d : dependencies } for x in range( (d-1) * n + 1, d * n + 1 ) } + + GafferTest.clearTestProcessCache() + + plug = Gaffer.Plug() + with Gaffer.PerformanceMonitor() as monitor : + with GafferTest.TestRunner.PerformanceScope() : + GafferTest.runTestProcess( plug, 0, dependencies ) + + self.assertEqual( monitor.plugStatistics( plug ).computeCount, depth * ( n + 1 ) + 1 ) + + @GafferTest.TestRunner.CategorisedTestMethod( { "taskCollaboration:performance" } ) + @GafferTest.TestRunner.PerformanceTestMethod() + def testDeepTreePerformance( self ) : + + # Models things like the recursive computation of bounding boxes in GafferScene + # + # 3 4 5 6 + # \ / \ / + # 1 2 + # \ / + # \ / + # 0 (for maxDepth 2 and branchFactor 2) + + maxDepth = 14 + branchFactor = 2 + + def makeDependencies( n, depth = 0 ) : + + if depth == maxDepth : + return {} + + return { i : makeDependencies( i, depth + 1 ) for i in range( n * branchFactor + 1, (n + 1) * branchFactor + 1 ) } + + dependencies = makeDependencies( 0 ) + + GafferTest.clearTestProcessCache() + + plug = Gaffer.Plug() + with Gaffer.PerformanceMonitor() as monitor : + with GafferTest.TestRunner.PerformanceScope() : + GafferTest.runTestProcess( plug, 0, dependencies ) + + self.assertEqual( monitor.plugStatistics( plug ).computeCount, sum( branchFactor ** d for d in range( 0, maxDepth + 1 ) ) ) + +if __name__ == "__main__": + unittest.main() diff --git a/python/GafferTest/ValuePlugTest.py b/python/GafferTest/ValuePlugTest.py index cd913ee67bc..95b245c9900 100644 --- a/python/GafferTest/ValuePlugTest.py +++ b/python/GafferTest/ValuePlugTest.py @@ -951,12 +951,29 @@ def computeCachePolicy( self, output ) : IECore.registerRunTimeTyped( InfiniteLoop ) for cachePolicy in ( - Gaffer.ValuePlug.CachePolicy.Legacy, - Gaffer.ValuePlug.CachePolicy.Standard, - Gaffer.ValuePlug.CachePolicy.TaskIsolation, + Gaffer.ValuePlug.CachePolicy.Default, # Omitting TaskCollaboration, because if our second compute joins as - # a worker, there is currently no way we can recall it. See comments - # in `LRUCachePolicy.TaskParallel.Handle.acquire`. + # a worker, there is currently no way we can recall it. This is not + # ideal as it means the UI stalls if one UI element is waiting to + # cancel an operation, but its tasks have been "captured" by + # collaboration on a compute started by another UI element (which + # hasn't requested cancellation). + # + ## \todo Improve this situation. Possibilities include : + # + # 1. Only collaborating on work if our canceller matches the one used + # by the original caller. This would cause redundant computes in + # the UI though. + # 2. Finding a way to join the cancellers so that cancellation on the + # second one triggers cancellation on the first. + # 3. Finding a way for the second canceller to trigger a call to + # `task_group::cancel()`. + # 4. Getting the two UI elements to use the same canceller in the first + # place. Perhaps this is the most promising avenue? Making their + # combined fates explicit in the API might not be a bad thing, and if + # we combined this with #1, a third UI element would have the option + # of doing its own compute with its own canceller, making it safe + # from combined cancellation. 
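+ #
+ # A rough sketch of option 4 - hypothetical UI code, with names invented
+ # for illustration - in which two elements explicitly share one canceller,
+ # so that cancellation from either affects both :
+ #
+ # canceller = IECore.Canceller()
+ # sharedContext = Gaffer.Context( script.context(), canceller )
+ # viewer.setContext( sharedContext ) # hypothetical
+ # editor.setContext( sharedContext ) # hypothetical
+ # ...
+ # canceller.cancel() # cancels computes for both elements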
) :

script = Gaffer.ScriptNode()

diff --git a/python/GafferTest/__init__.py b/python/GafferTest/__init__.py
index 12f7263f04b..24d0b0dd4ac 100644
--- a/python/GafferTest/__init__.py
+++ b/python/GafferTest/__init__.py
@@ -159,6 +159,7 @@ def inCI( platforms = set() ) :
from .OptionalValuePlugTest import OptionalValuePlugTest
from .ThreadMonitorTest import ThreadMonitorTest
from .CollectTest import CollectTest
+from .ProcessTest import ProcessTest

from .IECorePreviewTest import *

diff --git a/src/Gaffer/ComputeNode.cpp b/src/Gaffer/ComputeNode.cpp
index 1816a0d013c..92152840a79 100644
--- a/src/Gaffer/ComputeNode.cpp
+++ b/src/Gaffer/ComputeNode.cpp
@@ -80,7 +80,7 @@ void ComputeNode::compute( ValuePlug *output, const Context *context ) const

ValuePlug::CachePolicy ComputeNode::hashCachePolicy( const ValuePlug *output ) const
{
- return ValuePlug::CachePolicy::Standard;
+ return ValuePlug::CachePolicy::Default;
}

ValuePlug::CachePolicy ComputeNode::computeCachePolicy( const ValuePlug *output ) const
@@ -89,7 +89,5 @@ ValuePlug::CachePolicy ComputeNode::computeCachePolicy( const ValuePlug *output
{
return ValuePlug::CachePolicy::Uncached;
}
- /// \todo Return `Standard` once all task-spawning computes are
- /// known to be declaring an appropriate policy.
- return ValuePlug::CachePolicy::Legacy;
+ return ValuePlug::CachePolicy::Default;
}

diff --git a/src/Gaffer/Expression.cpp b/src/Gaffer/Expression.cpp
index d2580f5f1e7..b605cb3330f 100644
--- a/src/Gaffer/Expression.cpp
+++ b/src/Gaffer/Expression.cpp
@@ -336,10 +336,6 @@ Gaffer::ValuePlug::CachePolicy Expression::computeCachePolicy( const Gaffer::Val
{
return m_engine->executeCachePolicy();
}
- else
- {
- return ValuePlug::CachePolicy::Legacy;
- }
}
return ComputeNode::computeCachePolicy( output );
}

diff --git a/src/Gaffer/Process.cpp b/src/Gaffer/Process.cpp
index 51a98d249d9..5f5fb3a85c5 100644
--- a/src/Gaffer/Process.cpp
+++ b/src/Gaffer/Process.cpp
@@ -71,15 +71,53 @@ std::string prefixedWhat( const IECore::Exception &e )

} // namespace

+//////////////////////////////////////////////////////////////////////////
+// Collaboration
+//////////////////////////////////////////////////////////////////////////
+
+Process::Collaboration::~Collaboration() noexcept( true )
+{
+}
+
+bool Process::Collaboration::dependsOn( const Collaboration *collaboration ) const
+{
+ if( collaboration == this )
+ {
+ return true;
+ }
+
+ std::unordered_set<const Collaboration *> visited;
+ std::deque<const Collaboration *> toVisit( { collaboration } );
+
+ while( !toVisit.empty() )
+ {
+ const Collaboration *c = toVisit.front();
+ toVisit.pop_front();
+ if( !visited.insert( c ).second )
+ {
+ continue;
+ }
+ if( c->dependents.count( this ) )
+ {
+ return true;
+ }
+ toVisit.insert( toVisit.end(), c->dependents.begin(), c->dependents.end() );
+ }
+
+ return false;
+}
+
+tbb::spin_mutex Process::Collaboration::g_dependentsMutex;
+
//////////////////////////////////////////////////////////////////////////
// Process
//////////////////////////////////////////////////////////////////////////

Process::Process( const IECore::InternedString &type, const Plug *plug, const Plug *destinationPlug )
- : m_type( type ), m_plug( plug ), m_destinationPlug( destinationPlug ? destinationPlug : plug )
+ : m_type( type ), m_plug( plug ), m_destinationPlug( destinationPlug ? destinationPlug : plug ),
+ m_parent( m_threadState->m_process ), m_collaboration( m_parent ?
m_parent->m_collaboration : nullptr )
{
IECore::Canceller::check( context()->canceller() );
- m_parent = m_threadState->m_process;
m_threadState->m_process = this;

for( const auto &m : *m_threadState->m_monitors )

diff --git a/src/Gaffer/ValuePlug.cpp b/src/Gaffer/ValuePlug.cpp
index 3272c17c587..ad2087c0693 100644
--- a/src/Gaffer/ValuePlug.cpp
+++ b/src/Gaffer/ValuePlug.cpp
@@ -52,6 +52,7 @@
#include "fmt/format.h"

#include
+#include

using namespace Gaffer;

@@ -139,41 +140,11 @@ size_t hash_value( const HashCacheKey &key )
return result;
}

-// Derives from HashCacheKey, adding on everything we need to
-// construct a HashProcess. We access our caches via this augmented
-// key so that we have all the information we need in our getter
-// functions.
-//
-// Note the requirements the LRUCache places on a GetterKey like this :
-// "it must be implicitly castable to Key, and all GetterKeys
-// which yield the same Key must also yield the same results
-// from the GetterFunction". We meet these requirements as follows :
-//
-// - `computeNode` and `cachePolicy` are properties of the plug which
-// is included in the HashCacheKey. We store them explicitly only
-// for convenience and performance.
-// - `context` is represented in HashCacheKey via `contextHash`.
-// - `destinationPlug` does not influence the results of the computation
-// in any way. It is merely used for error reporting.
-struct HashProcessKey : public HashCacheKey
+// `tbb_hasher` is a requirement of `Process::acquireCollaborativeResult()`,
+// which uses `tbb::concurrent_hash_map` internally to manage collaborations.
+size_t tbb_hasher( const HashCacheKey &key )
{
- HashProcessKey( const ValuePlug *plug, const ValuePlug *destinationPlug, const Context *context, uint64_t dirtyCount, const ComputeNode *computeNode, ValuePlug::CachePolicy cachePolicy )
- : HashCacheKey( plug, context, dirtyCount ),
- destinationPlug( destinationPlug ),
- computeNode( computeNode ),
- cachePolicy( cachePolicy )
- {
- }
-
- const ValuePlug *destinationPlug;
- const ComputeNode *computeNode;
- const ValuePlug::CachePolicy cachePolicy;
-};
-
-// Avoids LRUCache overhead for non-collaborative policies.
-bool spawnsTasks( const HashProcessKey &key )
-{
- return key.cachePolicy == ValuePlug::CachePolicy::TaskCollaboration;
+ return hash_value( key );
}

ValuePlug::HashCacheMode defaultHashCacheMode()
@@ -210,6 +181,8 @@ class ValuePlug::HashProcess : public Process

public :

+ // Interface used by ValuePlug.
+
static IECore::MurmurHash hash( const ValuePlug *plug )
{
const ValuePlug *p = sourcePlug( plug );
@@ -244,83 +217,114 @@ class ValuePlug::HashProcess : public Process
const ComputeNode *computeNode = IECore::runTimeCast<const ComputeNode>( p->node() );
const ThreadState &threadState = ThreadState::current();
const Context *currentContext = threadState.context();
- const HashProcessKey processKey( p, plug, currentContext, p->m_dirtyCount, computeNode, computeNode ? computeNode->hashCachePolicy( p ) : CachePolicy::Uncached );
+ const CachePolicy cachePolicy = computeNode ? computeNode->hashCachePolicy( p ) : CachePolicy::Uncached;

- if( processKey.cachePolicy == CachePolicy::Uncached )
+ if( cachePolicy == CachePolicy::Uncached )
{
- HashProcess process( processKey );
- return process.m_result;
+ return HashProcess( p, plug, computeNode ).run();
}
- else if( Process::forceMonitoring( threadState, plug, ValuePlug::HashProcess::staticType ) )
+
+ // Perform any pending adjustments to our thread-local cache.
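+ // (The clear is deferred to this point because `clearCache()` may be
+ // called from any thread, and it isn't safe for one thread to clear
+ // another thread's member of `g_threadData` while it may be in use.
+ // So a flag is set instead, and each thread clears its own cache the
+ // next time it arrives here.)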
+
+ ThreadData &threadData = g_threadData.local();
+ if( threadData.clearCache.load( std::memory_order_acquire ) )
{
- HashProcess process( processKey );
- auto costFunction = [] ( const IECore::MurmurHash &key ) { return 1; };
- if(
- processKey.cachePolicy == CachePolicy::TaskCollaboration ||
- processKey.cachePolicy == CachePolicy::TaskIsolation
- )
- {
- g_globalCache.setIfUncached( processKey, process.m_result, costFunction );
- }
- else
- {
- ThreadData &threadData = g_threadData.local();
- threadData.cache.setIfUncached( processKey, process.m_result, costFunction );
- }
- return process.m_result;
+ threadData.cache.clear();
+ threadData.clearCache.store( 0, std::memory_order_release );
}
- else
+
+ if( threadData.cache.getMaxCost() != g_cacheSizeLimit )
{
- // Perform any pending adjustments to our thread-local cache.
+ threadData.cache.setMaxCost( g_cacheSizeLimit );
+ }
+
+ // Then get our hash. We do this using this `acquireHash()` functor so that
+ // we can repeat the process for `Checked` mode.

- ThreadData &threadData = g_threadData.local();
- if( threadData.clearCache.load( std::memory_order_acquire ) )
+ const bool forceMonitoring = Process::forceMonitoring( threadState, plug, staticType );
+
+ auto acquireHash = [&]( const HashCacheKey &cacheKey ) {
+
+ // Before we use this key, check that the dirty count hasn't maxed out
+ if(
+ cacheKey.dirtyCount == DIRTY_COUNT_RANGE_MAX ||
+ cacheKey.dirtyCount == DIRTY_COUNT_RANGE_MAX + 1 + DIRTY_COUNT_RANGE_MAX
+ )
{
- threadData.cache.clear();
- threadData.clearCache.store( 0, std::memory_order_release );
+ throw IECore::Exception( "Dirty count exceeded max. Either you've left Gaffer running for 100 million years, or a strange bug is incrementing dirty counts way too fast." );
}

- if( threadData.cache.getMaxCost() != g_cacheSizeLimit )
+ // Check for an already-cached value in our thread-local cache, and return it if we have one.
+ if( !forceMonitoring )
{
- threadData.cache.setMaxCost( g_cacheSizeLimit );
+ if( auto result = threadData.cache.getIfCached( cacheKey ) )
+ {
+ return *result;
+ }
}

- // And then look up the result in our cache.
- if( g_hashCacheMode == HashCacheMode::Standard )
+ // No value in local cache, so either compute it directly or get it via
+ // the global cache if it's expensive enough to warrant collaboration.
+ IECore::MurmurHash result;
+ if( cachePolicy == CachePolicy::Default || cachePolicy == CachePolicy::Standard )
{
- return threadData.cache.get( processKey, currentContext->canceller() );
+ result = HashProcess( p, plug, computeNode ).run();
}
- else if( g_hashCacheMode == HashCacheMode::Checked )
+ else
{
- HashProcessKey legacyProcessKey( processKey );
- legacyProcessKey.dirtyCount = g_legacyGlobalDirtyCount + DIRTY_COUNT_RANGE_MAX + 1;
-
- const IECore::MurmurHash check = threadData.cache.get( legacyProcessKey, currentContext->canceller() );
- const IECore::MurmurHash result = threadData.cache.get( processKey, currentContext->canceller() );
-
- if( result != check )
+ std::optional<IECore::MurmurHash> cachedValue;
+ if( !forceMonitoring )
+ {
+ cachedValue = g_cache.getIfCached( cacheKey );
+ }
+ if( cachedValue )
{
- // This isn't exactly a process exception, but we want to treat it the same, in
- // terms of associating it with a plug. Creating a ProcessException is the simplest
- // approach, which can be done by throwing and then immediately wrapping
- try
- {
- throw IECore::Exception( "Detected undeclared dependency. Fix DependencyNode::affects() implementation." );
- }
- catch( ...
)
- {
- ProcessException::wrapCurrentException( processKey.plug, currentContext, staticType );
- }
+ result = *cachedValue;
+ }
+ else
+ {
+ result = Process::acquireCollaborativeResult<HashProcess>( cacheKey, p, plug, computeNode );
+ }
- return result;
}
- else
+ // Update local cache and return result
+ threadData.cache.setIfUncached( cacheKey, result, cacheCostFunction );
+ return result;
+ };
+
+ const HashCacheKey cacheKey( p, currentContext, p->m_dirtyCount );
+ if( g_hashCacheMode == HashCacheMode::Standard )
+ {
+ return acquireHash( cacheKey );
+ }
+ else if( g_hashCacheMode == HashCacheMode::Checked )
+ {
+ HashCacheKey legacyCacheKey( cacheKey );
+ legacyCacheKey.dirtyCount = g_legacyGlobalDirtyCount + DIRTY_COUNT_RANGE_MAX + 1;
+ const IECore::MurmurHash check = acquireHash( legacyCacheKey );
+ const IECore::MurmurHash result = acquireHash( cacheKey );
+
+ if( result != check )
{
- // HashCacheMode::Legacy
- HashProcessKey legacyProcessKey( processKey );
- legacyProcessKey.dirtyCount = g_legacyGlobalDirtyCount + DIRTY_COUNT_RANGE_MAX + 1;
- return threadData.cache.get( legacyProcessKey, currentContext->canceller() );
+ // This isn't exactly a process exception, but we want to treat it the same, in
+ // terms of associating it with a plug. Creating a ProcessException is the simplest
+ // approach, which can be done by throwing and then immediately wrapping.
+ try
+ {
+ throw IECore::Exception( "Detected undeclared dependency. Fix DependencyNode::affects() implementation." );
+ }
+ catch( ... )
+ {
+ ProcessException::wrapCurrentException( p, currentContext, staticType );
+ }
}
+ return result;
+ }
+ else
+ {
+ // HashCacheMode::Legacy
+ HashCacheKey legacyCacheKey( cacheKey );
+ legacyCacheKey.dirtyCount = g_legacyGlobalDirtyCount + DIRTY_COUNT_RANGE_MAX + 1;
+ return acquireHash( legacyCacheKey );
}
}

@@ -332,12 +336,12 @@ class ValuePlug::HashProcess : public Process
static void setCacheSizeLimit( size_t maxEntriesPerThread )
{
g_cacheSizeLimit = maxEntriesPerThread;
- g_globalCache.setMaxCost( g_cacheSizeLimit );
+ g_cache.setMaxCost( g_cacheSizeLimit );
}

static void clearCache( bool now = false )
{
- g_globalCache.clear();
+ g_cache.clear();
// It's not documented explicitly, but it is safe to iterate over an
// `enumerable_thread_specific` while `local()` is being called on
// other threads, because the underlying container is a
@@ -364,7 +368,7 @@ class ValuePlug::HashProcess : public Process

static size_t totalCacheUsage()
{
- size_t usage = g_globalCache.currentCost();
+ size_t usage = g_cache.currentCost();
tbb::enumerable_thread_specific<ThreadData, tbb::cache_aligned_allocator<ThreadData>, tbb::ets_key_per_instance>::iterator it, eIt;
for( it = g_threadData.begin(), eIt = g_threadData.end(); it != eIt; ++it )
{
@@ -399,32 +403,33 @@ class ValuePlug::HashProcess : public Process

static const IECore::InternedString staticType;

- private :
+ // Interface required by `Process::acquireCollaborativeResult()`.

- HashProcess( const HashProcessKey &key )
- : Process( staticType, key.plug, key.destinationPlug )
+ HashProcess( const ValuePlug *plug, const ValuePlug *destinationPlug, const ComputeNode *computeNode )
+ : Process( staticType, plug, destinationPlug ), m_computeNode( computeNode )
+ {
+ }
+
+ using ResultType = IECore::MurmurHash;
+
+ ResultType run() const
{
try
{
- if( !key.computeNode )
+ if( !m_computeNode )
{
throw IECore::Exception( "Plug has no ComputeNode."
);
}

- // Before we use this key, check that the dirty count hasn't maxed out
- if(
- key.dirtyCount == DIRTY_COUNT_RANGE_MAX ||
- key.dirtyCount == DIRTY_COUNT_RANGE_MAX + 1 + DIRTY_COUNT_RANGE_MAX )
- {
- throw IECore::Exception( "Dirty count exceeded max. Either you've left Gaffer running for 100 million years, or a strange bug is incrementing dirty counts way too fast." );
- }
-
- key.computeNode->hash( key.plug, context(), m_result );
+ IECore::MurmurHash result;
+ m_computeNode->hash( static_cast<const ValuePlug *>( plug() ), context(), result );

- if( m_result == g_nullHash )
+ if( result == g_nullHash )
{
throw IECore::Exception( "ComputeNode::hash() not implemented." );
}
+
+ return result;
}
catch( ... )
{
@@ -432,76 +437,27 @@ class ValuePlug::HashProcess : public Process
}
}

- static IECore::MurmurHash globalCacheGetter( const HashProcessKey &key, size_t &cost, const IECore::Canceller *canceller )
- {
- // Canceller will be passed to `ComputeNode::hash()` implicitly
- // via the context.
- assert( canceller == Context::current()->canceller() );
- cost = 1;
- IECore::MurmurHash result;
- switch( key.cachePolicy )
- {
- case CachePolicy::TaskCollaboration :
- {
- HashProcess process( key );
- result = process.m_result;
- break;
- }
- case CachePolicy::TaskIsolation :
- {
- tbb::this_task_arena::isolate(
- [&result, &key] {
- HashProcess process( key );
- result = process.m_result;
- }
- );
- break;
- }
- default :
- // Cache policy not valid for global cache.
- assert( false );
- break;
- }
-
- return result;
- }
+ using CacheType = IECorePreview::LRUCache<HashCacheKey, IECore::MurmurHash, IECorePreview::LRUCachePolicy::Parallel>;
+ static CacheType g_cache;

- static IECore::MurmurHash localCacheGetter( const HashProcessKey &key, size_t &cost, const IECore::Canceller *canceller )
+ static size_t cacheCostFunction( const IECore::MurmurHash &value )
{
- assert( canceller == Context::current()->canceller() );
- cost = 1;
- switch( key.cachePolicy )
- {
- case CachePolicy::TaskCollaboration :
- case CachePolicy::TaskIsolation :
- return g_globalCache.get( key, canceller );
- default :
- {
- assert( key.cachePolicy != CachePolicy::Uncached );
- HashProcess process( key );
- return process.m_result;
- }
- }
+ return 1;
}

- // Global cache. We use this for heavy hash computations that will spawn subtasks,
- // so that the work and the result is shared among all threads.
- using GlobalCache = IECorePreview::LRUCache<HashCacheKey, IECore::MurmurHash, IECorePreview::LRUCachePolicy::TaskParallel, HashProcessKey>;
- static GlobalCache g_globalCache;
- static std::atomic<uint64_t> g_legacyGlobalDirtyCount;
+ private :

+ const ComputeNode *m_computeNode;
+ static std::atomic<uint64_t> g_legacyGlobalDirtyCount;
static HashCacheMode g_hashCacheMode;

- // Per-thread cache. This is our default cache, used for hash computations that are
- // presumed to be lightweight. Using a per-thread cache limits the contention among
- // threads.
- using Cache = IECorePreview::LRUCache<HashCacheKey, IECore::MurmurHash, IECorePreview::LRUCachePolicy::Serial, HashProcessKey>;
-
struct ThreadData
{
- ThreadData() : cache( localCacheGetter, g_cacheSizeLimit, Cache::RemovalCallback(), /* cacheErrors = */ false ), clearCache( 0 ) {}
- Cache cache;
+ // Using a null `GetterFunction` because it will never get called, because we only ever call `getIfCached()`.
+ ThreadData() : cache( CacheType::GetterFunction(), g_cacheSizeLimit, CacheType::RemovalCallback(), /* cacheErrors = */ false ), clearCache( 0 ) {}
+ using CacheType = IECorePreview::LRUCache<HashCacheKey, IECore::MurmurHash, IECorePreview::LRUCachePolicy::Serial>;
+ CacheType cache;
// Flag to request that hashCache be cleared.
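// Set by `clearCache()` from any thread, and acted on by the owning
// thread at the start of its next `hash()` call.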
std::atomic_int clearCache;
};

@@ -509,15 +465,14 @@ class ValuePlug::HashProcess : public Process
static tbb::enumerable_thread_specific<ThreadData, tbb::cache_aligned_allocator<ThreadData>, tbb::ets_key_per_instance> g_threadData;
static std::atomic_size_t g_cacheSizeLimit;

- IECore::MurmurHash m_result;
-
};

const IECore::InternedString ValuePlug::HashProcess::staticType( ValuePlug::hashProcessType() );
tbb::enumerable_thread_specific<ThreadData, tbb::cache_aligned_allocator<ThreadData>, tbb::ets_key_per_instance> ValuePlug::HashProcess::g_threadData;
// Default limit corresponds to a cost of roughly 25Mb per thread.
std::atomic_size_t ValuePlug::HashProcess::g_cacheSizeLimit( 128000 );
-ValuePlug::HashProcess::GlobalCache ValuePlug::HashProcess::g_globalCache( globalCacheGetter, g_cacheSizeLimit, Cache::RemovalCallback(), /* cacheErrors = */ false );
+// Using a null `GetterFunction` because it will never get called, because we only ever call `getIfCached()`.
+ValuePlug::HashProcess::CacheType ValuePlug::HashProcess::g_cache( CacheType::GetterFunction(), g_cacheSizeLimit, CacheType::RemovalCallback(), /* cacheErrors = */ false );
std::atomic<uint64_t> ValuePlug::HashProcess::g_legacyGlobalDirtyCount( 0 );
ValuePlug::HashCacheMode ValuePlug::HashProcess::g_hashCacheMode( defaultHashCacheMode() );

@@ -526,62 +481,13 @@ ValuePlug::HashCacheMode ValuePlug::HashProcess::g_hashCacheMode( defaultHashCac
// and storing a cache of recently computed results.
//////////////////////////////////////////////////////////////////////////

-namespace
-{
-
-// Contains everything needed to create a ComputeProcess. We access our
-// cache via this key so that we have all the information we need in our getter
-// function.
-struct ComputeProcessKey
-{
- ComputeProcessKey( const ValuePlug *plug, const ValuePlug *destinationPlug, const ComputeNode *computeNode, ValuePlug::CachePolicy cachePolicy, const IECore::MurmurHash *precomputedHash )
- : plug( plug ),
- destinationPlug( destinationPlug ),
- computeNode( computeNode ),
- cachePolicy( cachePolicy ),
- m_hash( precomputedHash ? *precomputedHash : IECore::MurmurHash() )
- {
- }
-
- const ValuePlug *plug;
- const ValuePlug *destinationPlug;
- const ComputeNode *computeNode;
- const ValuePlug::CachePolicy cachePolicy;
-
- operator const IECore::MurmurHash &() const
- {
- if( m_hash == g_nullHash )
- {
- // Note : We call `plug->ValuePlug::hash()` rather than
- // `plug->hash()` because we only want to represent the result of
- // the private `getValueInternal()` method. Overrides such as
- // `StringPlug::hash()` account for additional processing (such as
- // substitutions) performed in public `getValue()` methods _after_
- // calling `getValueInternal()`.
- m_hash = plug->ValuePlug::hash();
- }
- return m_hash;
- }
-
- private :
-
- mutable IECore::MurmurHash m_hash;
-
-};
-
-// Avoids LRUCache overhead for non-collaborative policies.
-bool spawnsTasks( const ComputeProcessKey &key )
-{
- return key.cachePolicy == ValuePlug::CachePolicy::TaskCollaboration;
-}
-
-} // namespace
-
class ValuePlug::ComputeProcess : public Process
{

public :

+ // Interface used by ValuePlug.
+
static size_t getCacheMemoryLimit()
{
return g_cache.getMaxCost();
@@ -622,75 +528,84 @@ class ValuePlug::ComputeProcess : public Process
}

// A plug with an input connection or an output plug on a ComputeNode. There can be many values -
- // one per context, computed via ComputeNode::compute(). Pull the value out of our cache, or compute
- // it with a ComputeProcess.
+ // one per context, computed via `ComputeNode::compute()` or `Plug::setFrom()`. Determine our
+ // cache policy for the result.
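+ //
+ // The policy then selects the strategy below : `Uncached` just runs the
+ // ComputeProcess directly; `Default` computes locally on a cache miss,
+ // accepting the chance of a redundant compute on another thread; and the
+ // task-spawning policies hand off to `acquireCollaborativeResult()` so that
+ // other threads needing the same value can join in rather than wait.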
assert( (plug->getInput() && !computeNode) || computeNode );

- const ThreadState &threadState = ThreadState::current();
- const Context *currentContext = threadState.context();
-
CachePolicy cachePolicy = CachePolicy::Uncached;
if( p->getInput() )
{
// Type conversion will be implemented by `setFrom()`.
// \todo Determine if caching is actually worthwhile for this.
- cachePolicy = CachePolicy::Legacy;
+ cachePolicy = CachePolicy::Default;
}
else if( computeNode )
{
+ // We'll be calling `compute()`.
cachePolicy = computeNode->computeCachePolicy( p );
}
- const ComputeProcessKey processKey( p, plug, computeNode, cachePolicy, precomputedHash );

+ // If caching is off then it's just a case of using a ComputeProcess
+ // to do the work.

- if( processKey.cachePolicy == CachePolicy::Uncached )
- {
- return ComputeProcess( processKey ).m_result;
- }
- else if( Process::forceMonitoring( threadState, plug, ValuePlug::ComputeProcess::staticType ) )
+ if( cachePolicy == CachePolicy::Uncached )
{
- ComputeProcess process( processKey );
- g_cache.setIfUncached(
- processKey, process.m_result,
- []( const IECore::ConstObjectPtr &v ) { return v->memoryUsage(); }
- );
- return process.m_result;
+ return ComputeProcess( p, plug, computeNode ).run();
}
- else if( processKey.cachePolicy == CachePolicy::Legacy )
+
+ const ThreadState &threadState = ThreadState::current();
+
+ // If caching is on, then we first check for an already-cached value
+ // and return it if we have one. The cache is accessed via the
+ // plug's hash.
+ //
+ // > Note : We call `plug->ValuePlug::hash()` rather than
+ // > `plug->hash()` because we only want to represent the result of
+ // > the private `getValueInternal()` method. Overrides such as
+ // > `StringPlug::hash()` account for additional processing (such as
+ // > substitutions) performed in public `getValue()` methods _after_
+ // > calling `getValueInternal()`.
+ const IECore::MurmurHash hash = p->ValuePlug::hash();
+
+ if( !Process::forceMonitoring( threadState, plug, staticType ) )
{
- // Legacy code path, necessary until all task-spawning computes
- // have declared an appropriate cache policy. We can't perform
- // the compute inside `cacheGetter()` because that is called
- // from inside a lock. If tasks were spawned without being
- // isolated, TBB could steal an outer task which tries to get
- // the same item from the cache, leading to deadlock.
- if( auto result = g_cache.getIfCached( processKey ) )
+ if( auto result = g_cache.getIfCached( hash ) )
{
// Move avoids unnecessary additional addRef/removeRef.
return std::move( *result );
}
- ComputeProcess process( processKey );
- // Store the value in the cache, but only if it isn't there
- // already. The check is useful because it's common for an
- // upstream compute triggered by us to have already done the
- // work, and calling `memoryUsage()` can be very expensive for
- // some datatypes. A prime example of this is the attribute
- // state passed around in GafferScene - it's common for a
- // selective filter to mean that the attribute compute is
- // implemented as a pass-through (thus an upstream node will
- // already have computed the same result) and the attribute data
- // itself consists of many small objects for which computing
- // memory usage is slow.
- g_cache.setIfUncached(
- processKey, process.m_result,
- []( const IECore::ConstObjectPtr &v ) { return v->memoryUsage(); }
- );
- return process.m_result;
+ }
+
+ // The value isn't in the cache, so we'll need to compute it,
+ // taking account of the cache policy.
+
+ if( cachePolicy == CachePolicy::Default )
+ {
+ // Do the compute ourselves, without worrying if the same
+ // compute is in flight elsewhere. We assume the compute is
+ // lightweight enough and unlikely enough to be shared that in
+ // the worst case it's OK to do it redundantly on a few threads
+ // before it gets cached.
+ IECore::ConstObjectPtr result = ComputeProcess( p, plug, computeNode ).run();
+ // Store the value in the cache, but only if it isn't there already.
+ // The check is useful because it's common for an upstream compute
+ // triggered by us to have already done the work, and calling
+ // `memoryUsage()` can be very expensive for some datatypes. A prime
+ // example of this is the attribute state passed around in
+ // GafferScene - it's common for a selective filter to mean that the
+ // attribute compute is implemented as a pass-through (thus an
+ // upstream node will already have computed the same result) and the
+ // attribute data itself consists of many small objects for which
+ // computing memory usage is slow.
+ g_cache.setIfUncached( hash, result, cacheCostFunction );
+ return result;
}
else
{
- return g_cache.get( processKey, currentContext->canceller() );
+ return acquireCollaborativeResult<ComputeProcess>(
+ hash, p, plug, computeNode
+ );
}
}

@@ -713,27 +628,33 @@ class ValuePlug::ComputeProcess : public Process

static const IECore::InternedString staticType;

- private :
+ // Interface required by `Process::acquireCollaborativeResult()`.
+
+ ComputeProcess( const ValuePlug *plug, const ValuePlug *destinationPlug, const ComputeNode *computeNode )
+ : Process( staticType, plug, destinationPlug ), m_computeNode( computeNode )
+ {
+ }

- ComputeProcess( const ComputeProcessKey &key )
- : Process( staticType, key.plug, key.destinationPlug )
+ IECore::ConstObjectPtr run() const
{
try
{
- if( const ValuePlug *input = key.plug->getInput() )
+ // Cast is safe because our constructor takes ValuePlugs.
+ const ValuePlug *valuePlug = static_cast<const ValuePlug *>( plug() );
+ if( const ValuePlug *input = valuePlug->getInput() )
{
// Cast is ok, because we know that the resulting setValue() call won't
// actually modify the plug, but will just place the value in our m_result.
- const_cast<ValuePlug *>( key.plug )->setFrom( input );
+ const_cast<ValuePlug *>( valuePlug )->setFrom( input );
}
else
{
- if( !key.computeNode )
+ if( !m_computeNode )
{
throw IECore::Exception( "Plug has no ComputeNode." );
}
// Cast is ok - see comment above.
- key.computeNode->compute( const_cast<ValuePlug *>( key.plug ), context() );
+ m_computeNode->compute( const_cast<ValuePlug *>( valuePlug ), context() );
}
// The calls above should cause setValue() to be called on the result plug, which in
// turn will call ValuePlug::setObjectValue(), which will then store the result in
@@ -743,6 +664,7 @@ class ValuePlug::ComputeProcess : public Process
{
throw IECore::Exception( "Compute did not set plug value." );
}
+ return m_result;
}
catch( ... )
{
@@ -750,58 +672,25 @@ class ValuePlug::ComputeProcess : public Process
}
}

- static IECore::ConstObjectPtr cacheGetter( const ComputeProcessKey &key, size_t &cost, const IECore::Canceller *canceller )
- {
- // Canceller will be passed to `ComputeNode::hash()` implicitly
- // via the context.
- assert( canceller == Context::current()->canceller() );
- IECore::ConstObjectPtr result;
- switch( key.cachePolicy )
- {
- case CachePolicy::Standard :
- {
- ComputeProcess process( key );
- result = process.m_result;
- break;
- }
- case CachePolicy::TaskCollaboration :
- {
- ComputeProcess process( key );
- result = process.m_result;
- break;
- }
- case CachePolicy::TaskIsolation :
- {
- tbb::this_task_arena::isolate(
- [&result, &key] {
- ComputeProcess process( key );
- result = process.m_result;
- }
- );
- break;
- }
- default :
- // Should not get here, because these cases are
- // dealt with directly in `ComputeProcess::value()`.
- assert( false );
- break;
- }
+ using ResultType = IECore::ConstObjectPtr;
+ using CacheType = IECorePreview::LRUCache<IECore::MurmurHash, IECore::ConstObjectPtr, IECorePreview::LRUCachePolicy::Parallel>;
+ static CacheType g_cache;

- cost = result->memoryUsage();
- return result;
+ static size_t cacheCostFunction( const IECore::ConstObjectPtr &v )
+ {
+ return v->memoryUsage();
}

- // A cache mapping from ValuePlug::hash() to the result of the previous computation
- // for that hash. This allows us to cache results for faster repeat evaluation
- using Cache = IECorePreview::LRUCache<IECore::MurmurHash, IECore::ConstObjectPtr, IECorePreview::LRUCachePolicy::TaskParallel, ComputeProcessKey>;
- static Cache g_cache;
+ private :

+ const ComputeNode *m_computeNode;
IECore::ConstObjectPtr m_result;
};

const IECore::InternedString ValuePlug::ComputeProcess::staticType( ValuePlug::computeProcessType() );
-ValuePlug::ComputeProcess::Cache ValuePlug::ComputeProcess::g_cache( cacheGetter, 1024 * 1024 * 1024 * 1, ValuePlug::ComputeProcess::Cache::RemovalCallback(), /* cacheErrors = */ false ); // 1 gig
+// Using a null `GetterFunction` because it will never get called, because we only ever call `getIfCached()`.
+ValuePlug::ComputeProcess::CacheType ValuePlug::ComputeProcess::g_cache( CacheType::GetterFunction(), 1024 * 1024 * 1024 * 1, CacheType::RemovalCallback(), /* cacheErrors = */ false ); // 1 gig

//////////////////////////////////////////////////////////////////////////
// SetValueAction implementation

diff --git a/src/GafferModule/ExpressionBinding.cpp b/src/GafferModule/ExpressionBinding.cpp
index 6ae88c7457f..4da38fa1173 100644
--- a/src/GafferModule/ExpressionBinding.cpp
+++ b/src/GafferModule/ExpressionBinding.cpp
@@ -129,9 +129,9 @@ ValuePlug::CachePolicy defaultExecuteCachePolicy()
{
return ValuePlug::CachePolicy::TaskIsolation;
}
- else if( !strcmp( cp, "Legacy" ) )
+ else if( !strcmp( cp, "Legacy" ) || !strcmp( cp, "Default" ) )
{
- return ValuePlug::CachePolicy::Legacy;
+ return ValuePlug::CachePolicy::Default;
}
else
{

diff --git a/src/GafferModule/ValuePlugBinding.cpp b/src/GafferModule/ValuePlugBinding.cpp
index f3fdef36bea..ef8d4af0847 100644
--- a/src/GafferModule/ValuePlugBinding.cpp
+++ b/src/GafferModule/ValuePlugBinding.cpp
@@ -160,6 +160,7 @@ void GafferModule::bindValuePlug()
.value( "Standard", ValuePlug::CachePolicy::Standard )
.value( "TaskCollaboration", ValuePlug::CachePolicy::TaskCollaboration )
.value( "TaskIsolation", ValuePlug::CachePolicy::TaskIsolation )
+ .value( "Default", ValuePlug::CachePolicy::Default )
.value( "Legacy", ValuePlug::CachePolicy::Legacy )
;

diff --git a/src/GafferOSL/OSLExpressionEngine.cpp b/src/GafferOSL/OSLExpressionEngine.cpp
index 597b7ad87c6..5d2fc17ee94 100644
--- a/src/GafferOSL/OSLExpressionEngine.cpp
+++ b/src/GafferOSL/OSLExpressionEngine.cpp
@@ -475,7 +475,7 @@ class OSLExpressionEngine : public Gaffer::Expression::Engine

ValuePlug::CachePolicy executeCachePolicy() const override
{
- return ValuePlug::CachePolicy::Legacy;
+ return ValuePlug::CachePolicy::Default;
}

void apply(
Gaffer::ValuePlug *proxyOutput, const Gaffer::ValuePlug *topLevelProxyOutput, const IECore::Object *value ) const override

diff --git a/src/GafferScene/ObjectProcessor.cpp b/src/GafferScene/ObjectProcessor.cpp
index 120cabd2aee..568a55ca020 100644
--- a/src/GafferScene/ObjectProcessor.cpp
+++ b/src/GafferScene/ObjectProcessor.cpp
@@ -158,7 +158,7 @@ void ObjectProcessor::hashProcessedObject( const ScenePath &path, const Gaffer::

Gaffer::ValuePlug::CachePolicy ObjectProcessor::processedObjectComputeCachePolicy() const
{
- return ValuePlug::CachePolicy::Legacy;
+ return ValuePlug::CachePolicy::Default;
}

void ObjectProcessor::hashObject( const ScenePath &path, const Gaffer::Context *context, const ScenePlug *parent, IECore::MurmurHash &h ) const

diff --git a/src/GafferScene/SceneReader.cpp b/src/GafferScene/SceneReader.cpp
index 1c2110a3a02..a52fe482134 100644
--- a/src/GafferScene/SceneReader.cpp
+++ b/src/GafferScene/SceneReader.cpp
@@ -135,9 +135,9 @@ ValuePlug::CachePolicy cachePolicyFromEnv( const char *name )
{
return ValuePlug::CachePolicy::TaskIsolation;
}
- else if( !strcmp( cp, "Legacy" ) )
+ else if( !strcmp( cp, "Legacy" ) || !strcmp( cp, "Default" ) )
{
- return ValuePlug::CachePolicy::Legacy;
+ return ValuePlug::CachePolicy::Default;
}
else
{
@@ -148,7 +148,7 @@ ValuePlug::CachePolicy cachePolicyFromEnv( const char *name )
}
}

- return ValuePlug::CachePolicy::Legacy;
+ return ValuePlug::CachePolicy::Default;
}

const ValuePlug::CachePolicy g_objectCachePolicy = cachePolicyFromEnv( "GAFFERSCENE_SCENEREADER_OBJECT_CACHEPOLICY" );

diff --git a/src/GafferTestModule/GafferTestModule.cpp b/src/GafferTestModule/GafferTestModule.cpp
index 24c54280e4c..46f1d1d04b3 100644
--- a/src/GafferTestModule/GafferTestModule.cpp
+++ b/src/GafferTestModule/GafferTestModule.cpp
@@ -50,6 +50,7 @@
#include "ValuePlugTest.h"
#include "MessagesTest.h"
#include "MetadataTest.h"
+#include "ProcessTest.h"
#include "SignalsTest.h"

#include "IECorePython/ScopedGILRelease.h"
@@ -116,6 +117,7 @@ BOOST_PYTHON_MODULE( _GafferTest )
bindValuePlugTest();
bindMessagesTest();
bindSignalsTest();
+ bindProcessTest();

object module( borrowed( PyImport_AddModule( "GafferTest._MetadataTest" ) ) );
scope().attr( "_MetadataTest" ) = module;

diff --git a/src/GafferTestModule/LRUCacheTest.cpp b/src/GafferTestModule/LRUCacheTest.cpp
index 85713d9571e..46f6a52268a 100644
--- a/src/GafferTestModule/LRUCacheTest.cpp
+++ b/src/GafferTestModule/LRUCacheTest.cpp
@@ -308,53 +308,6 @@ void testLRUCacheRecursion( const std::string &policy, int numIterations, size_t
DispatchTest<TestLRUCacheRecursion>()( policy, numIterations, numValues, maxCost );
}

-template<template<typename> class Policy>
-struct TestLRUCacheRecursionOnOneItem
-{
-
- void operator()()
- {
- using Cache = LRUCache<int, int, Policy>;
- using CachePtr = std::unique_ptr<Cache>;
- int recursionDepth = 0;
-
- CachePtr cache;
- cache.reset(
- new Cache(
- // Getter that calls back into the cache with the _same_
- // key, up to a certain limit, and then actually returns
- // a value. This is basically insane, but it models
- // situations that can occur in Gaffer.
- [&cache, &recursionDepth]( int key, size_t &cost, const IECore::Canceller *canceller ) {
- cost = 1;
- if( ++recursionDepth == 100 )
- {
- return key;
- }
- else
- {
- return cache->get( key );
- }
- },
- // Max cost is small enough that we'll be trying to evict
- // keys while unwinding the recursion.
- 20
- )
- );
-
- GAFFERTEST_ASSERTEQUAL( cache->currentCost(), 0 );
- GAFFERTEST_ASSERTEQUAL( cache->get( 1 ), 1 );
- GAFFERTEST_ASSERTEQUAL( recursionDepth, 100 );
- GAFFERTEST_ASSERTEQUAL( cache->currentCost(), 1 );
- }
-
-};
-
-void testLRUCacheRecursionOnOneItem( const std::string &policy )
-{
- DispatchTest<TestLRUCacheRecursionOnOneItem>()( policy );
-}
-
template<template<typename> class Policy>
struct TestLRUCacheClearFromGet
{
@@ -847,44 +800,6 @@ void testLRUCacheSetIfUncached( const std::string &policy )
DispatchTest<TestLRUCacheSetIfUncached>()( policy );
}

-template<template<typename> class Policy>
-struct TestLRUCacheSetIfUncachedRecursion
-{
-
- void operator()()
- {
- using Cache = LRUCache<int, int, Policy>;
- using CachePtr = std::unique_ptr<Cache>;
-
- CachePtr cache;
- cache.reset(
- new Cache(
- // Getter that calls `setIfUncached()` with the _same_ key. This
- // is basically insane, but it models situations that can occur
- // in Gaffer.
- [&cache]( int key, size_t &cost, const IECore::Canceller *canceller ) {
- cost = 1;
- // We expect the call to fail, because the lock is held by the
- // outer call to `get()`.
- GAFFERTEST_ASSERT( !cache->setIfUncached( key, key, []( int ) { return 1; } ) );
- return key;
- },
- 1000
- )
- );
-
- GAFFERTEST_ASSERTEQUAL( cache->currentCost(), 0 );
- GAFFERTEST_ASSERTEQUAL( cache->get( 1 ), 1 );
- GAFFERTEST_ASSERTEQUAL( cache->currentCost(), 1 );
- }
-
-};
-
-void testLRUCacheSetIfUncachedRecursion( const std::string &policy )
-{
- DispatchTest<TestLRUCacheSetIfUncachedRecursion>()( policy );
-}
-
} // namespace

void GafferTestModule::bindLRUCacheTest()
@@ -893,7 +808,6 @@ void GafferTestModule::bindLRUCacheTest()
def( "testLRUCacheRemovalCallback", &testLRUCacheRemovalCallback );
def( "testLRUCacheContentionForOneItem", &testLRUCacheContentionForOneItem, arg( "withCanceller" ) = false );
def( "testLRUCacheRecursion", &testLRUCacheRecursion, ( arg( "numIterations" ), arg( "numValues" ), arg( "maxCost" ) ) );
- def( "testLRUCacheRecursionOnOneItem", &testLRUCacheRecursionOnOneItem );
def( "testLRUCacheClearFromGet", &testLRUCacheClearFromGet );
def( "testLRUCacheExceptions", &testLRUCacheExceptions );
def( "testLRUCacheCancellation", &testLRUCacheCancellation );
@@ -901,5 +815,4 @@ void GafferTestModule::bindLRUCacheTest()
def( "testLRUCacheUncacheableItem", &testLRUCacheUncacheableItem );
def( "testLRUCacheGetIfCached", &testLRUCacheGetIfCached );
def( "testLRUCacheSetIfUncached", &testLRUCacheSetIfUncached );
- def( "testLRUCacheSetIfUncachedRecursion", &testLRUCacheSetIfUncachedRecursion );
}

diff --git a/src/GafferTestModule/ProcessTest.cpp b/src/GafferTestModule/ProcessTest.cpp
new file mode 100644
index 00000000000..0af1176aa02
--- /dev/null
+++ b/src/GafferTestModule/ProcessTest.cpp
@@ -0,0 +1,199 @@
+//////////////////////////////////////////////////////////////////////////
+//
+// Copyright (c) 2023, Cinesite VFX Ltd. All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above
+// copyright notice, this list of conditions and the following
+// disclaimer.
+//
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following
+// disclaimer in the documentation and/or other materials provided with
+// the distribution.
+//
+// * Neither the name of John Haddon nor the names of
+// any other contributors to this software may be used to endorse or
+// promote products derived from this software without specific prior
+// written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
+// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+//////////////////////////////////////////////////////////////////////////
+
+#include "boost/python.hpp"
+
+#include "ProcessTest.h"
+
+#include "GafferTest/Assert.h"
+
+#include "Gaffer/Context.h"
+#include "Gaffer/NumericPlug.h"
+#include "Gaffer/TypedObjectPlug.h"
+#include "Gaffer/Private/IECorePreview/LRUCache.h"
+#include "Gaffer/Process.h"
+
+#include "IECorePython/ScopedGILRelease.h"
+
+#include "tbb/parallel_for_each.h"
+
+using namespace boost::python;
+using namespace IECore;
+using namespace Gaffer;
+
+namespace
+{
+
+struct Dependencies : public IECore::RefCounted
+{
+ IE_CORE_DECLAREMEMBERPTR( Dependencies );
+ using Map = std::map<int, ConstPtr>;
+ Map map;
+};
+
+// Test Process
+// ============
+//
+// This Process subclass is used primarily to test the collaboration mechanism
+// provided by `Process::acquireCollaborativeResult()`. The result is an integer
+// which is given to the TestProcess directly, and which also provides the cache
+// key. The upstream dependencies are also given verbatim to TestProcess as a
+// nested dictionary of integers mapping from the result for each dependency to
+// the dictionary for _its_ upstream dependencies. Non-negative results are
+// computed using `acquireCollaborativeResult()` and negative results are
+// computed by constructing a TestProcess directly. This mechanism lets us
+// create a variety of process graphs very explicitly from ProcessTestCase.
+class TestProcess : public Process
+{
+
+ public :
+
+ TestProcess( const Plug *plug, int result, const Dependencies::ConstPtr &dependencies )
+ : Process( g_staticType, plug, plug ), m_result( result ), m_dependencies( dependencies )
+ {
+ }
+
+ using ResultType = int;
+
+ ResultType run() const
+ {
+ const ThreadState &threadState = ThreadState::current();
+ tbb::task_group_context taskGroupContext( tbb::task_group_context::isolated );
+
+ tbb::parallel_for_each(
+
+ m_dependencies->map.begin(), m_dependencies->map.end(),
+
+ [&] ( const Dependencies::Map::value_type &dependency ) {
+
+ ThreadState::Scope scope( threadState );
+ const int expectedResult = dependency.first;
+
+ const Plug *p = plug();
+ if( const Plug *input = p->getInput() )
+ {
+ // Compute the dependencies using the plug's input if it
+ // has one, otherwise using this plug. The only reason
+ // for using an input is to get more fine-grained information
+ // from the Monitors used in the unit tests (because they
+ // capture statistics per plug).
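+ // `testCollaborationTaskDistribution` in ProcessTest.py sets up
+ // exactly such a chain of plug inputs.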
+ p = input;
+ }
+
+ int actualResult;
+ if( expectedResult >= 0 )
+ {
+ actualResult = Process::acquireCollaborativeResult<TestProcess>( expectedResult, p, expectedResult, dependency.second );
+ }
+ else
+ {
+ actualResult = TestProcess( p, expectedResult, dependency.second ).run();
+ }
+ GAFFERTEST_ASSERTEQUAL( actualResult, expectedResult );
+
+ },
+
+ taskGroupContext
+
+ );
+
+ return m_result;
+ }
+
+ using CacheType = IECorePreview::LRUCache<int, int, IECorePreview::LRUCachePolicy::Parallel>;
+ static CacheType g_cache;
+
+ static size_t cacheCostFunction( int value )
+ {
+ return 1;
+ }
+
+ private :
+
+ const int m_result;
+ const Dependencies::ConstPtr m_dependencies;
+
+ static const IECore::InternedString g_staticType;
+
+};
+
+TestProcess::CacheType TestProcess::g_cache( TestProcess::CacheType::GetterFunction(), 100000 );
+// Spoof type so that we can use PerformanceMonitor to check we get the processes we expect in ProcessTest.py.
+const IECore::InternedString TestProcess::g_staticType( "computeNode:compute" );
+
+Dependencies::ConstPtr dependenciesFromDict( dict dependenciesDict, std::unordered_map<PyObject *, Dependencies::ConstPtr> &converted )
+{
+ auto it = converted.find( dependenciesDict.ptr() );
+ if( it != converted.end() )
+ {
+ return it->second;
+ }
+
+ Dependencies::Ptr result = new Dependencies;
+ auto items = dependenciesDict.items();
+ for( size_t i = 0, l = len( items ); i < l; ++i )
+ {
+ int v = extract<int>( items[i][0] );
+ dict d = extract<dict>( items[i][1] );
+ result->map[v] = dependenciesFromDict( d, converted );
+ }
+
+ converted[dependenciesDict.ptr()] = result;
+ return result;
+}
+
+void runTestProcess( const Plug *plug, int expectedResult, dict dependenciesDict )
+{
+ std::unordered_map<PyObject *, Dependencies::ConstPtr> convertedDependencies;
+ Dependencies::ConstPtr dependencies = dependenciesFromDict( dependenciesDict, convertedDependencies );
+
+ Context::EditableScope context( Context::current() );
+ int result = TestProcess( plug, expectedResult, dependencies ).run();
+ GAFFERTEST_ASSERTEQUAL( result, expectedResult );
+}
+
+void clearTestProcessCache()
+{
+ TestProcess::g_cache.clear();
+}
+
+} // namespace
+
+void GafferTestModule::bindProcessTest()
+{
+ def( "runTestProcess", &runTestProcess );
+ def( "clearTestProcessCache", &clearTestProcessCache );
+}

diff --git a/src/GafferTestModule/ProcessTest.h b/src/GafferTestModule/ProcessTest.h
new file mode 100644
index 00000000000..1975e4a1cfc
--- /dev/null
+++ b/src/GafferTestModule/ProcessTest.h
@@ -0,0 +1,44 @@
+//////////////////////////////////////////////////////////////////////////
+//
+// Copyright (c) 2023, Cinesite VFX Ltd. All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above
+// copyright notice, this list of conditions and the following
+// disclaimer.
+//
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following
+// disclaimer in the documentation and/or other materials provided with
+// the distribution.
+//
+// * Neither the name of John Haddon nor the names of
+// any other contributors to this software may be used to endorse or
+// promote products derived from this software without specific prior
+// written permission.
diff --git a/src/GafferTestModule/ProcessTest.h b/src/GafferTestModule/ProcessTest.h
new file mode 100644
index 00000000000..1975e4a1cfc
--- /dev/null
+++ b/src/GafferTestModule/ProcessTest.h
@@ -0,0 +1,44 @@
+//////////////////////////////////////////////////////////////////////////
+//
+// Copyright (c) 2023, Cinesite VFX Ltd. All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+//     * Redistributions of source code must retain the above
+//       copyright notice, this list of conditions and the following
+//       disclaimer.
+//
+//     * Redistributions in binary form must reproduce the above
+//       copyright notice, this list of conditions and the following
+//       disclaimer in the documentation and/or other materials provided with
+//       the distribution.
+//
+//     * Neither the name of John Haddon nor the names of
+//       any other contributors to this software may be used to endorse or
+//       promote products derived from this software without specific prior
+//       written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
+// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+//////////////////////////////////////////////////////////////////////////
+
+#pragma once
+
+namespace GafferTestModule
+{
+
+void bindProcessTest();
+
+} // namespace GafferTestModule
diff --git a/src/GafferTestModule/TaskMutexTest.cpp b/src/GafferTestModule/TaskMutexTest.cpp
index e394835f001..756b18f2f3a 100644
--- a/src/GafferTestModule/TaskMutexTest.cpp
+++ b/src/GafferTestModule/TaskMutexTest.cpp
@@ -74,12 +74,12 @@ void testTaskMutex()
 		TaskMutex::ScopedLock lock( mutex, /* write = */ false );
 		gotLock.local() = true;
-		GAFFERTEST_ASSERT( lock.lockType() == TaskMutex::ScopedLock::LockType::Read )
+		GAFFERTEST_ASSERT( !lock.isWriter() )
 
 		if( !initialised )
 		{
 			lock.upgradeToWriter();
-			GAFFERTEST_ASSERT( lock.lockType() == TaskMutex::ScopedLock::LockType::Write );
+			GAFFERTEST_ASSERT( lock.isWriter() );
 
 			if( !initialised ) // Check again, because upgrading to writer may lose the lock temporarily.
 			{
@@ -134,7 +134,7 @@ void testTaskMutexWithinIsolate()
 	tbb::this_task_arena::isolate( [&mutex]() {
 
 			TaskMutex::ScopedLock lock( mutex );
-			GAFFERTEST_ASSERT( lock.lockType() == TaskMutex::ScopedLock::LockType::Write )
+			GAFFERTEST_ASSERT( lock.isWriter() );
 			std::this_thread::sleep_for( std::chrono::milliseconds( 1 ) );
 
 		} );
@@ -175,7 +175,7 @@ void testTaskMutexJoiningOuterTasks()
 			TaskMutex::ScopedLock lock( mutex );
 			gotLock.local() = true;
-			GAFFERTEST_ASSERT( lock.lockType() == TaskMutex::ScopedLock::LockType::Write )
+			GAFFERTEST_ASSERT( lock.isWriter() )
 
 			if( !initialised )
 			{
@@ -213,7 +213,7 @@ void testTaskMutexJoiningOuterTasks()
 			for( size_t i = r.begin(); i < r.end(); ++i )
 			{
 				TaskMutex::ScopedLock lock( *independentTasks[i] );
-				GAFFERTEST_ASSERT( lock.lockType() == TaskMutex::ScopedLock::LockType::Write )
+				GAFFERTEST_ASSERT( lock.isWriter() )
 				lock.execute(
 					[&]() {
 						initialise();
@@ -247,56 +247,13 @@ void testTaskMutexHeavyContention( bool acceptWork )
 			for( size_t i = r.begin(); i < r.end(); ++i )
 			{
 				TaskMutex::ScopedLock lock( mutex, /* write = */ false, acceptWork );
-				GAFFERTEST_ASSERT( lock.lockType() == TaskMutex::ScopedLock::LockType::Read );
+				GAFFERTEST_ASSERT( !lock.isWriter() );
 				GAFFERTEST_ASSERTEQUAL( initialised, true );
 			}
 		}
 	);
 }
 
-void testTaskMutexWorkerRecursion()
-{
-	TaskMutex mutex;
-	tbb::enumerable_thread_specific<bool> gotLock;
-
-	std::function<void ( int )> recurse;
-	recurse = [&mutex, &gotLock, &recurse] ( int depth ) {
-
-		TaskMutex::ScopedLock lock;
-		const bool acquired = lock.acquireOr(
-			mutex, TaskMutex::ScopedLock::LockType::WorkerRead,
-			[]( bool workAvailable ) { return true; }
-		);
-
-		GAFFERTEST_ASSERT( acquired );
-		GAFFERTEST_ASSERT( lock.lockType() == TaskMutex::ScopedLock::LockType::WorkerRead );
-
-		gotLock.local() = true;
-
-		if( depth > 4 )
-		{
-			std::this_thread::sleep_for( std::chrono::milliseconds( 10 ) );
-		}
-		else
-		{
-			tbb::parallel_for(
-				0, 4,
-				[&recurse, depth] ( int i ) {
-					recurse( depth + 1 );
-				}
-			);
-		}
-
-	};
-
-	TaskMutex::ScopedLock lock( mutex );
-	lock.execute(
-		[&recurse] { recurse( 0 ); }
-	);
-
-	GAFFERTEST_ASSERTEQUAL( gotLock.size(), tbb::tbb_thread::hardware_concurrency() );
-}
-
 void testTaskMutexAcquireOr()
 {
 	TaskMutex mutex;
@@ -305,7 +262,7 @@ void testTaskMutexAcquireOr()
 	TaskMutex::ScopedLock lock2;
 	bool workAvailable = true;
 	const bool acquired = lock2.acquireOr(
-		mutex, TaskMutex::ScopedLock::LockType::Write,
+		mutex, /* write = */ true,
 		[&workAvailable] ( bool wa ) { workAvailable = wa; return true; }
 	);
@@ -522,7 +479,6 @@ void GafferTestModule::bindTaskMutexTest()
 	def( "testTaskMutexWithinIsolate", &testTaskMutexWithinIsolate );
 	def( "testTaskMutexJoiningOuterTasks", &testTaskMutexJoiningOuterTasks );
 	def( "testTaskMutexHeavyContention", &testTaskMutexHeavyContention );
-	def( "testTaskMutexWorkerRecursion", &testTaskMutexWorkerRecursion );
 	def( "testTaskMutexAcquireOr", &testTaskMutexAcquireOr );
 	def( "testTaskMutexExceptions", &testTaskMutexExceptions );
 	def( "testTaskMutexWorkerExceptions", &testTaskMutexWorkerExceptions );
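These tests now exercise the simplified `TaskMutex::ScopedLock` interface: the three-state `LockType` enum is gone (along with the `WorkerRead` recursion support covered by the removed `testTaskMutexWorkerRecursion`), replaced by a plain boolean `write` flag and an `isWriter()` query. A minimal sketch of the resulting usage pattern, mirroring the body of `testTaskMutex` above (`example()` and `initialised` are illustrative stand-ins, not part of this change):

	#include "Gaffer/Private/IECorePreview/TaskMutex.h"

	#include <cassert>

	using IECorePreview::TaskMutex;

	void example( TaskMutex &mutex, bool &initialised )
	{
		// Optimistically take a read lock, because most callers
		// only need to read.
		TaskMutex::ScopedLock lock( mutex, /* write = */ false );
		assert( !lock.isWriter() );

		if( !initialised )
		{
			// Upgrading may release the lock temporarily, so the
			// predicate must be checked again once we are the writer.
			lock.upgradeToWriter();
			assert( lock.isWriter() );
			if( !initialised )
			{
				initialised = true;
			}
		}
	}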