From 14a2a1d502ff94612efc9bb091c2808f3afe193f Mon Sep 17 00:00:00 2001 From: John Haddon Date: Mon, 7 Aug 2023 15:45:21 +0100 Subject: [PATCH 01/11] ComputeNodeTest : Test exception handling for all cache policies --- python/GafferTest/ComputeNodeTest.py | 28 ++++++++++++++++++---------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/python/GafferTest/ComputeNodeTest.py b/python/GafferTest/ComputeNodeTest.py index 943ebd24f91..7a3792f3082 100644 --- a/python/GafferTest/ComputeNodeTest.py +++ b/python/GafferTest/ComputeNodeTest.py @@ -608,6 +608,7 @@ class ThrowingNode( Gaffer.ComputeNode ) : def __init__( self, name="ThrowingNode" ) : self.hashFail = False + self.cachePolicy = Gaffer.ValuePlug.CachePolicy.Standard Gaffer.ComputeNode.__init__( self, name ) @@ -639,11 +640,11 @@ def compute( self, plug, context ) : def hashCachePolicy( self, plug ) : - return Gaffer.ValuePlug.CachePolicy.Standard + return self.cachePolicy def computeCachePolicy( self, plug ) : - return Gaffer.ValuePlug.CachePolicy.Standard + return self.cachePolicy IECore.registerRunTimeTyped( ThrowingNode ) @@ -684,18 +685,25 @@ def testProcessException( self ) : def testProcessExceptionNotShared( self ) : - thrower1 = self.ThrowingNode( "thrower1" ) - thrower2 = self.ThrowingNode( "thrower2" ) + for policy in Gaffer.ValuePlug.CachePolicy.names.values() : + with self.subTest( policy = policy ) : - with self.assertRaisesRegex( Gaffer.ProcessException, r'thrower1.out : [\s\S]*Eeek!' ) as raised : - thrower1["out"].getValue() + Gaffer.ValuePlug.clearCache() - self.assertEqual( raised.exception.plug(), thrower1["out"] ) + thrower1 = self.ThrowingNode( "thrower1" ) + thrower1.cachePolicy = policy + thrower2 = self.ThrowingNode( "thrower2" ) + thrower2.cachePolicy = policy - with self.assertRaisesRegex( Gaffer.ProcessException, r'thrower2.out : [\s\S]*Eeek!' ) as raised : - thrower2["out"].getValue() + with self.assertRaisesRegex( Gaffer.ProcessException, r'thrower1.out : [\s\S]*Eeek!' ) as raised : + thrower1["out"].getValue() + + self.assertEqual( raised.exception.plug(), thrower1["out"] ) + + with self.assertRaisesRegex( Gaffer.ProcessException, r'thrower2.out : [\s\S]*Eeek!' ) as raised : + thrower2["out"].getValue() - self.assertEqual( raised.exception.plug(), thrower2["out"] ) + self.assertEqual( raised.exception.plug(), thrower2["out"] ) def testProcessExceptionRespectsNameChanges( self ) : From be386f02c7ee2b8627add91e89c7a1dfe78057bd Mon Sep 17 00:00:00 2001 From: John Haddon Date: Wed, 23 Aug 2023 17:19:55 +0100 Subject: [PATCH 02/11] Process : Add `acquireCollaborativeResult()` method This will be used to replace the TaskMutex collaboration mechanism currently used via the LRUCaches in ValuePlug. The underlying collaboration mechanism is still a `task_arena/task_group` combo, but this arrangement has a few key benefits : - Moving the collaboration out from the guts of the LRUCache gives us much more control over how it is performed, particularly for collaborating threads. - We are now tracking interprocess dependencies completely ourselves, so we are no longer vulnerable to the deadlocks that TaskMutex couldn't prevent. - We are no longer using `task_arena_observer`, which gives us much greater freedom to change implementation in future. --- Changes.md | 5 + include/Gaffer/Process.h | 27 +++ include/Gaffer/Process.inl | 399 +++++++++++++++++++++++++++++++++++++ src/Gaffer/Process.cpp | 42 +++- 4 files changed, 471 insertions(+), 2 deletions(-) diff --git a/Changes.md b/Changes.md index 55e383e8d38..f3fe86254bd 100644 --- a/Changes.md +++ b/Changes.md @@ -22,6 +22,11 @@ Fixes - Fixed likely cause of crash when resizing Spreadsheet column width (#5296). - Reference : Fixed rare reloading error. +API +--- + +- Process : Added `acquireCollaborativeResult()` method, providing an improved mechanism for multiple threads to collaborate on TBB tasks spawned by a single process they all depend on. + 1.3.5.0 (relative to 1.3.4.0) ======= diff --git a/include/Gaffer/Process.h b/include/Gaffer/Process.h index e69eb11f8b1..629a36b73b9 100644 --- a/include/Gaffer/Process.h +++ b/include/Gaffer/Process.h @@ -101,8 +101,34 @@ class GAFFER_API Process : private ThreadState::Scope /// \todo This just exists for ABI compatibility. Remove it. void handleException(); + /// Searches for an in-flight process and waits for its result, collaborating + /// on any TBB tasks it spawns. If no such process exists, constructs one + /// using `args` and makes it available for collaboration by other threads, + /// publishing the result to a cache when the process completes. + /// + /// Requirements : + /// + /// - `ProcessType( args... )` constructs a process suitable for computing + /// the result for `cacheKey`. + /// - `ProcessType::ResultType` defines the result type for the process. + /// - `ProcessType::run()` does the work for the process and returns the + /// result. + /// - `ProcessType::g_cache` is a static LRUCache of type `ProcessType::CacheType` + /// to be used for the caching of the result. + /// - `ProcessType::cacheCostFunction()` is a static function suitable + /// for use with `CacheType::setIfUncached()`. + /// + template + static typename ProcessType::ResultType acquireCollaborativeResult( + const typename ProcessType::CacheType::KeyType &cacheKey, ProcessArguments&&... args + ); + private : + class Collaboration; + template + class TypedCollaboration; + static bool forceMonitoringInternal( const ThreadState &s, const Plug *plug, const IECore::InternedString &processType ); void emitError( const std::string &error, const Plug *source = nullptr ) const; @@ -111,6 +137,7 @@ class GAFFER_API Process : private ThreadState::Scope const Plug *m_plug; const Plug *m_destinationPlug; const Process *m_parent; + const Collaboration *m_collaboration; }; diff --git a/include/Gaffer/Process.inl b/include/Gaffer/Process.inl index c8d8a71861b..19d67cccbbc 100644 --- a/include/Gaffer/Process.inl +++ b/include/Gaffer/Process.inl @@ -34,9 +34,408 @@ // ////////////////////////////////////////////////////////////////////////// +#include "tbb/concurrent_hash_map.h" +#include "tbb/spin_mutex.h" +#include "tbb/task_arena.h" +#include "tbb/task_group.h" + +#include + namespace Gaffer { +/// Process Graph Overview +/// ====================== +/// +/// > Note : These notes (and the Process design itself) are heavily biased +/// towards ValuePlug and ComputeNode, and their associated ComputeProcess and +/// HashProcess. +/// +/// It's tempting to think that because processes are stack-allocated, they each +/// have a single parent process waiting for them to complete, and that each +/// process is only waiting on a single child. It's also tempting to think that +/// there is a one-to-one correspondence between nodes and processes. +/// +/// Node graph Process graph +/// ---------- ------------- +/// +/// AddNode1 o current process +/// | | +/// AddNode2 o waiting process (lower in stack) +/// | | +/// AddNode3 o waiting process (even lower in stack) +/// +/// While that is true for the simple case shown above, the reality is far more +/// complicated due to contexts, multithreading, task collaboration and hash +/// aliasing. +/// +/// Contexts +/// -------- +/// +/// Processes are operations being performed by a node for a particular plug, in +/// a _particular context_. The topology of the process graph does not +/// correspond directly to the topology of the node graph itself. Rather, the +/// process graph is generated dynamically in response to each process launching +/// upstream processes it depends on. +/// +/// Loop <--- o Loop, loop:index=0 +/// | | | +/// v | o AddNode, loop:index=0 +/// AddNode-- | +/// o Loop, loop:index=1 +/// | +/// o AddNode, loop:index=1 +/// | +/// o ... +/// +/// As this example shows, cyclic _connections_ between plugs are even OK +/// provided that each process launches child _processes_ in a different context, +/// meaning that there are no cyclic dependencies between _processes_. +/// Even in this case, every process has only a single child and a single +/// parent, all living on the stack of a single thread, so the topology of +/// our process graph remains completely linear. But that ends as soon as +/// we consider multithreading. +/// +/// Multithreading +/// -------------- +/// +/// A single process can use TBB tasks to launch many child processes that may +/// each be run on a different thread : +/// +/// Random o o o current processes, one per thread +/// | \ | / +/// Collect o waiting process +/// +/// In this case, a single parent process may be waiting for multiple children +/// to complete. Our simple linear "graph" is now a directed tree (I'm using +/// terminology loosely here, I think the official term would be an +/// "arborescence"). +/// +/// This doesn't present any great obstacle in itself - the only new requirement +/// is that each TBB task scopes the ThreadState from the parent process, so +/// that we can associate the tasks's processes with the correct parent and run them in +/// the correct context. But it does highlight that a parent process may have +/// many children, and that processes may perform arbitrarily expensive amounts +/// of work. +/// +/// Task collaboration +/// ------------------ +/// +/// Now that we know there can be processes in-flight on each thread, we need to +/// consider what happens if two or more threads simultaneously want a result +/// from the same not-yet-run upstream process. Gaffer cannot query the upstream +/// dependencies for a process before launching it, and therefore cannot perform +/// any up-front task scheduling. So in the example below, when two threads are +/// each running their own process and they dynamically turn out to require the +/// same upstream dependency, we need to deal with it dynamically. +/// +/// AddNode1 ? ? +/// / \ | | +/// AddNode2 AddNode3 o o +/// +/// One approach is to simply allow each thread to run their own copy of the +/// process redundantly, and in fact this is a reasonable strategy that we do use +/// for lightweight processes. +/// +/// AddNode1 o o +/// / \ | | +/// AddNode2 AddNode3 o o +/// +/// But where a process is expensive, duplication is not +/// an option. We need to arrange things such that we launch the upstream +/// compute on one thread, and have the other wait for its completion. +/// +/// Collect o +/// / \ / \ < second thread waiting for process +/// AddNode2 AddNode3 o o launched by first thread +/// +/// Ideally we don't want the waiting thread to simply block or spin though, as +/// that quickly reduces to only a single thread doing useful work. Instead we +/// want to provide the facility for waiting threads to _collaborate_, by +/// working on any TBB tasks spawned by the upstream process. We now have a new +/// requirement : we need to track the in-flight processes that are available +/// for collaboration, which we do in `Process::acquireCollaborativeResult()`. +/// And our process graphs can now contain diamond connections at collaboration +/// points, making them general directed acyclic graphs rather than simple +/// trees. +/// +/// Hash aliasing +/// ------------- +/// +/// To track in-flight processes we need a way of identifying them, and we do +/// this using the same key that is used to cache their results. In the case of +/// ComputeProcess, the key is a hash generated by `ComputeNode::hash()`, which +/// must uniquely identify the result of the process. +/// +/// But we have a problem : this hash can _alias_, and indeed it is encouraged +/// to. By aliasing, we mean that two processes can have the same hash provided +/// that they will generate the same result. For example, two different +/// SceneReader nodes will share hashes if they are each reading from the same +/// file. Or two locations within a scene will share hashes if they are known to +/// generate identical objects. In both cases, aliasing the hashes allows us to +/// avoid redundant computes and the creation of redundant cache entries. But this +/// adds complexity to the process graph - through hash aliasing, processes can +/// end up collaborating on nodes they have no actual connection to. +/// +/// Collect1 Collect2 o < Collect1 and Collect2 have the same +/// | | / \ < hash, so Expression2 is now +/// Expression1 Expression2 o o < collaborating on Collect1! +/// +/// Again, this is desirable as it reduces redundant work. But hashes can also +/// alias in less predictable ways. As `ExpressionTest.testHashAliasing` +/// shows, it's possible to create a node network such that a downstream node +/// depends on an upstream node with an _identical hash_. If we attempt process +/// collaboration in this case, we create a cyclic dependency that results in +/// a form of deadlock. +/// +/// Expression1 +/// | +/// Expression2 o----- +/// | | | +/// Expression3 o<---- +/// +/// This is _the_ key problem in our management of threaded collaborative +/// processes. We want node authors to be free to alias hashes without +/// constraint, to reduce redundant computes and cache pressure to the maximum +/// extent possible. But with the right node graph, _any_ aliasing may +/// lead to a cyclic dependency evolving dynamically in the corresponding +/// process graph. +/// +/// In practice, such cyclic dependencies are rare, but not rare enough +/// that we can neglect them completely. Our stragegy is therefore to +/// perform collaboration wherever we can, but to replace it with one +/// additional "redundant" process where collaboration would cause a +/// cycle. +/// +/// Expression1 o < this process has the same hash... +/// | | +/// Expression2 o +/// | | +/// Expression3 o < ...as this one +/// +/// Conceptually this is relatively simple, but it is made trickier by the +/// constantly mutating nature of the process graph. Although all new processes +/// are always added at the leafs of the process "tree", collaboration can insert +/// arbitrary diamond dependencies between existing processes anywhere in the +/// graph, at any time, and from any thread, and our cycle checking must account +/// for this without introducing excessive overhead. +/// +/// > Tip : At this point it is useful to forget about nodes and plugs and +/// connections and to instead consider the process graph largely in the +/// abstract. Processes are vertices in the graph. Dependencies are directed +/// edges between processes. Edge insertion may be attempted anywhere by +/// collaboration at any time, and cycles must be avoided. + +/// A "vertex" in the process graph where collaboration may be performed. We +/// only track collaborative processes because non-collaborative processes can't +/// introduce edges that could lead to cycles. +class GAFFER_API Process::Collaboration : public IECore::RefCounted +{ + + public : + + // Work around https://bugs.llvm.org/show_bug.cgi?id=32978 + ~Collaboration() noexcept( true ) override; + + IE_CORE_DECLAREMEMBERPTR( Collaboration ); + + // Arena and task group used to allow waiting threads to participate + // in collaborative work. + tbb::task_arena arena; + tbb::task_group taskGroup; + + using Set = std::unordered_set; + // Collaborations depending directly on this one. + Set dependents; + + // Returns true if this collaboration depends on `collaboration`, either + // directly or indirectly via other collaborations it depends on. + // The caller of this function must hold `g_dependentsMutex`. + bool dependsOn( const Collaboration *collaboration ) const; + + // Protects access to `dependents` on _all_ Collaborations. + static tbb::spin_mutex g_dependentsMutex; + +}; + +/// Collaboration subclass specific to a single type of process, providing storage for the result +/// and tracking of the currently in-flight collaborations by cache key. +/// +/// > Note : We track dependencies between all types of collaboration, not just between like types. +template +class Process::TypedCollaboration : public Process::Collaboration +{ + public : + + std::optional result; + + IE_CORE_DECLAREMEMBERPTR( TypedCollaboration ); + + using PendingCollaborations = tbb::concurrent_hash_map>; + static PendingCollaborations g_pendingCollaborations; + +}; + +template +typename Process::TypedCollaboration::PendingCollaborations Process::TypedCollaboration::g_pendingCollaborations; + +template +typename ProcessType::ResultType Process::acquireCollaborativeResult( + const typename ProcessType::CacheType::KeyType &cacheKey, ProcessArguments&&... args +) +{ + const ThreadState &threadState = ThreadState::current(); + const Collaboration *currentCollaboration = threadState.process() ? threadState.process()->m_collaboration : nullptr; + + // Check for any in-flight computes for the same cache key. If we find a + // suitable one, we'll wait for it and use its result. + + using CollaborationType = TypedCollaboration; + using CollaborationTypePtr = typename CollaborationType::Ptr; + + typename CollaborationType::PendingCollaborations::accessor accessor; + CollaborationType::g_pendingCollaborations.insert( accessor, cacheKey ); + + for( const auto &candidate : accessor->second ) + { + // Check to see if we can safely collaborate on `candidate` without + // risking deadlock. We optimistically perform the cheapest checks + // first; if we're not already in a collaboration, or if the + // collaboration we're in already depends on the candidate (via another + // thread of execution) then we're good to go. + // + // The call to `candidate->dependents.find()` is safe even though we + // don't hold `g_dependentsMutex`, because we hold the accessor for + // `candidate`, and that is always held by any writer of + // `candidate->dependents`. + if( currentCollaboration && candidate->dependents.find( currentCollaboration ) == candidate->dependents.end() ) + { + // Perform much more expensive check for potential deadlock - we + // mustn't become a dependent of `candidate` if it already depends + // on us. This requires traversing all dependents of + // `currentCollaboration` while holding `g_dependentsMutex` (so they + // can't be modified while we read). + tbb::spin_mutex::scoped_lock dependentsLock( Collaboration::g_dependentsMutex ); + if( !candidate->dependsOn( currentCollaboration ) ) + { + // We're safe to collaborate. Add ourself as a dependent before + // releasing `g_dependentsMutex`. + candidate->dependents.insert( currentCollaboration ); + } + else + { + continue; + } + } + + // We've found an in-flight process we can wait on without causing + // deadlock. Join its `task_arena` and wait on the result, so we get to + // work on any TBB tasks it has created. + // + // > Note : We need to own a reference to `collaboration` because the + // thread that created it may drop its own reference as soon as we call + // `release()`, because that allows the original `run_and_wait()` to + // complete. + // + // > Caution : Now the primary `run_and_wait()` can return, any other + // waiting threads can also move on. That means that + // `collaboration->dependents` may now contain dangling pointers. Do + // not access them! + + CollaborationTypePtr collaboration = candidate; + accessor.release(); + + collaboration->arena.execute( + [&]{ return collaboration->taskGroup.wait(); } + ); + + if( collaboration->result ) + { + return *collaboration->result; + } + else + { + throw IECore::Exception( "Process::acquireCollaborativeResult : No result found" ); + } + } + + // No suitable in-flight collaborations, so we'll create one of our own. + // First though, check the cache one more time, in case another thread has + // started and finished an equivalent collaboration since we first checked. + + if( auto result = ProcessType::g_cache.getIfCached( cacheKey ) ) + { + return *result; + } + + CollaborationTypePtr collaboration = new CollaborationType; + if( currentCollaboration ) + { + // No need to hold `m_dependentsMutex` here because other threads can't + // access `collaboration->dependents` until we publish it. + collaboration->dependents.insert( currentCollaboration ); + } + + std::exception_ptr exception; + + auto status = collaboration->arena.execute( + [&] { + return collaboration->taskGroup.run_and_wait( + [&] { + // Publish ourselves so that other threads can collaborate + // by calling `collaboration->taskGroup.wait()`. + accessor->second.push_back( collaboration ); + accessor.release(); + + try + { + ProcessType process( std::forward( args )... ); + process.m_collaboration = collaboration.get(); + collaboration->result = process.run(); + // Publish result to cache before we remove ourself from + // `g_pendingCollaborations`, so that other threads will + // be able to get the result one way or the other. + ProcessType::g_cache.setIfUncached( + cacheKey, *collaboration->result, + ProcessType::cacheCostFunction + ); + } + catch( ... ) + { + // Don't allow `task_group::wait()` to see exceptions, + // because then we'd hit a thread-safety bug in + // `tbb::task_group_context::reset()`. + exception = std::current_exception(); + } + + // Now we're done, remove `collaboration` from the pending collaborations. + [[maybe_unused]] const bool found = CollaborationType::g_pendingCollaborations.find( accessor, cacheKey ); + assert( found ); + auto toErase = std::find( accessor->second.begin(), accessor->second.end(), collaboration ); + assert( toErase != accessor->second.end() ); + accessor->second.erase( toErase ); + if( accessor->second.empty() ) + { + CollaborationType::g_pendingCollaborations.erase( accessor ); + } + accessor.release(); + } + ); + } + ); + + if( exception ) + { + std::rethrow_exception( exception ); + } + else if( status == tbb::task_group_status::canceled ) + { + throw IECore::Cancelled(); + } + + return *collaboration->result; +} + inline bool Process::forceMonitoring( const ThreadState &s, const Plug *plug, const IECore::InternedString &processType ) { if( s.m_mightForceMonitoring ) diff --git a/src/Gaffer/Process.cpp b/src/Gaffer/Process.cpp index 51a98d249d9..5f5fb3a85c5 100644 --- a/src/Gaffer/Process.cpp +++ b/src/Gaffer/Process.cpp @@ -71,15 +71,53 @@ std::string prefixedWhat( const IECore::Exception &e ) } // namespace +////////////////////////////////////////////////////////////////////////// +// Collaboration +////////////////////////////////////////////////////////////////////////// + +Process::Collaboration::~Collaboration() noexcept( true ) +{ +} + +bool Process::Collaboration::dependsOn( const Collaboration *collaboration ) const +{ + if( collaboration == this ) + { + return true; + } + + std::unordered_set visited; + std::deque toVisit( { collaboration } ); + + while( !toVisit.empty() ) + { + const Collaboration *c = toVisit.front(); + toVisit.pop_front(); + if( !visited.insert( c ).second ) + { + continue; + } + if( c->dependents.count( this ) ) + { + return true; + } + toVisit.insert( toVisit.end(), c->dependents.begin(), c->dependents.end() ); + } + + return false; +} + +tbb::spin_mutex Process::Collaboration::g_dependentsMutex; + ////////////////////////////////////////////////////////////////////////// // Process ////////////////////////////////////////////////////////////////////////// Process::Process( const IECore::InternedString &type, const Plug *plug, const Plug *destinationPlug ) - : m_type( type ), m_plug( plug ), m_destinationPlug( destinationPlug ? destinationPlug : plug ) + : m_type( type ), m_plug( plug ), m_destinationPlug( destinationPlug ? destinationPlug : plug ), + m_parent( m_threadState->m_process ), m_collaboration( m_parent ? m_parent->m_collaboration : nullptr ) { IECore::Canceller::check( context()->canceller() ); - m_parent = m_threadState->m_process; m_threadState->m_process = this; for( const auto &m : *m_threadState->m_monitors ) From 9399899fdcae50a11c2bd458c5c7f5f8a9b15b0e Mon Sep 17 00:00:00 2001 From: John Haddon Date: Tue, 3 Oct 2023 10:57:08 +0100 Subject: [PATCH 03/11] ProcessTest : Add tests for `Process::acquireCollaborativeResult()` --- python/GafferTest/ProcessTest.py | 433 ++++++++++++++++++++++ python/GafferTest/__init__.py | 1 + src/GafferTestModule/GafferTestModule.cpp | 2 + src/GafferTestModule/ProcessTest.cpp | 199 ++++++++++ src/GafferTestModule/ProcessTest.h | 44 +++ 5 files changed, 679 insertions(+) create mode 100644 python/GafferTest/ProcessTest.py create mode 100644 src/GafferTestModule/ProcessTest.cpp create mode 100644 src/GafferTestModule/ProcessTest.h diff --git a/python/GafferTest/ProcessTest.py b/python/GafferTest/ProcessTest.py new file mode 100644 index 00000000000..466924e8e31 --- /dev/null +++ b/python/GafferTest/ProcessTest.py @@ -0,0 +1,433 @@ +########################################################################## +# +# Copyright (c) 2023, Cinesite VFX Ltd. All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above +# copyright notice, this list of conditions and the following +# disclaimer. +# +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following +# disclaimer in the documentation and/or other materials provided with +# the distribution. +# +# * Neither the name of John Haddon nor the names of +# any other contributors to this software may be used to endorse or +# promote products derived from this software without specific prior +# written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS +# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, +# THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +# +########################################################################## + +import unittest + +import IECore + +import Gaffer +import GafferTest + +class ProcessTest( GafferTest.TestCase ) : + + def setUp( self ) : + + GafferTest.TestCase.setUp( self ) + GafferTest.clearTestProcessCache() + + @GafferTest.TestRunner.CategorisedTestMethod( { "taskCollaboration" } ) + def testCollaboration( self ) : + + # We expect processes `1...n` to collaborate on process `n+1`. + # + # n+1 + # / | \ + # 1 ... n + # \ | / + # 0 + # + # Note on conventions used throughout this file : + # + # - Processes are labelled with the value of their result. + # - Lines connecting processes denote dependencies between them. + # - Dependent processes appear below the processes they depend on, + # matching the typical top-to-bottom flow of a Gaffer graph. + # - The root process is therefore always the bottom-most one. + + n = 10000 + + plug = Gaffer.Plug() + with Gaffer.PerformanceMonitor() as monitor : + GafferTest.runTestProcess( + plug, 0, + { x : { n + 1 : {} } for x in range( 1, n + 1 ) } + ) + + self.assertEqual( monitor.plugStatistics( plug ).computeCount, 1 + n + 1 ) + + @GafferTest.TestRunner.CategorisedTestMethod( { "taskCollaboration" } ) + def testCollaborationFromNonCollaborativeProcesses( self ) : + + # As above, but the waiting processes are not themselves collaborative. + # + # 1 + # / | \ + # -1 ... -n + # \ | / + # 0 + + n = 100000 + + plug = Gaffer.Plug() + with Gaffer.PerformanceMonitor() as monitor : + GafferTest.runTestProcess( + plug, 0, + { -x : { 1 : {} } for x in range( 1, n + 1 ) } + ) + + self.assertEqual( monitor.plugStatistics( plug ).computeCount, 1 + n + 1 ) + + @GafferTest.TestRunner.CategorisedTestMethod( { "taskCollaboration" } ) + def testNoCollaborationOnRecursion( self ) : + + # We don't expect any collaboration, because it would lead to + # deadlock. + # + # 10 + # | + # 10 + # | + # 10 + # | + # 10 + + plug = Gaffer.Plug() + with Gaffer.PerformanceMonitor() as monitor : + GafferTest.runTestProcess( plug, 1, { 10 : { 10 : { 10 : {} } } } ) + + self.assertEqual( monitor.plugStatistics( plug ).computeCount, 4 ) + + @GafferTest.TestRunner.CategorisedTestMethod( { "taskCollaboration" } ) + def testNoCollaborationOnIndirectRecursion( self ) : + + # We don't expect any collaboration, because it would lead to + # deadlock. + # + # 1 + # | + # 2 + # | + # 1 + # | + # 0 + + plug = Gaffer.Plug() + with Gaffer.PerformanceMonitor() as monitor : + GafferTest.runTestProcess( plug, 0, { 1 : { 2 : { 1 : {} } } } ) + + self.assertEqual( monitor.plugStatistics( plug ).computeCount, 4 ) + + @GafferTest.TestRunner.CategorisedTestMethod( { "taskCollaboration" } ) + def testNonCollaborativeProcessWithinRecursion( self ) : + + # As above, but using a non-collaborative task in the middle of the recursion. + # + # 1 + # | + # -2 + # | + # 1 + # | + # 0 + + plug = Gaffer.Plug() + with Gaffer.PerformanceMonitor() as monitor : + GafferTest.runTestProcess( plug, 0, { 1 : { -2 : { 1 : {} } } } ) + + self.assertEqual( monitor.plugStatistics( plug ).computeCount, 4 ) + + @GafferTest.TestRunner.CategorisedTestMethod( { "taskCollaboration" } ) + def testNoCollaborationOnDiamondRecursion( self ) : + + # We don't expect any collaboration, because it would lead to + # deadlock. + # + # 1 + # | + # 3 + # / \ + # 1 2 + # \ / + # 0 + + plug = Gaffer.Plug() + + for i in range( 0, 100 ) : + GafferTest.clearTestProcessCache() + with Gaffer.PerformanceMonitor() as monitor : + GafferTest.runTestProcess( + plug, 0, + { + 1 : { 3 : { 1 : {} } }, + 2 : { 3 : { 1 : {} } } + } + ) + + # There are various possibilities for execution, based on different + # thread timings. + # + # - The `0-2-3-1` branch completes first, so `1` is already cached by + # the time the `0-1` branch wants it. 4 computes total. + # - The `0-1-3-1` branch completes first, with a duplicate compute for + # `1` to avoid deadlock. 5 computes total. + # - The `0-2-3` branch waits on `1` from the `0-1` branch. The `0-1` + # branch performs duplicate computes for `3` and `1` to avoid deadlock. + # 6 computes total. + self.assertIn( monitor.plugStatistics( plug ).computeCount, { 4, 5, 6 } ) + + @GafferTest.TestRunner.CategorisedTestMethod( { "taskCollaboration" } ) + def testNoCollaborationOnIndirectDiamondRecursion( self ) : + + # As above, but with an additional process (4), meaning we have + # to track non-immediate dependencies between processes. + # + # 1 + # | + # 4 + # | + # 3 + # / \ + # 1 2 + # \ / + # 0 + + plug = Gaffer.Plug() + + for i in range( 0, 100 ) : + GafferTest.clearTestProcessCache() + with Gaffer.PerformanceMonitor() as monitor : + GafferTest.runTestProcess( + plug, 0, + { + 1 : { 3 : { 4 : { 1 : {} } } }, + 2 : { 3 : { 4 : { 1 : {} } } }, + } + ) + + self.assertIn( monitor.plugStatistics( plug ).computeCount, { 5, 6, 8 } ) + + @GafferTest.TestRunner.CategorisedTestMethod( { "taskCollaboration" } ) + def testCollaborationTaskDistribution( self ) : + + # -1 ... -n plug4 + # \ | / + # n+1 plug3 + # / | \ + # 1 ... n plug2 + # \ | / + # 0 plug1 + + numWorkers = IECore.tbb_global_control.active_value( IECore.tbb_global_control.parameter.max_allowed_parallelism ) + n = 10000 * numWorkers + + plug1 = Gaffer.Plug() + plug2 = Gaffer.Plug() + plug3 = Gaffer.Plug() + plug4 = Gaffer.Plug() + plug1.setInput( plug2 ) + plug2.setInput( plug3 ) + plug3.setInput( plug4 ) + + dependencies = { -x : {} for x in range( 1, n + 1 ) } + dependencies = { x : { n + 1 : dependencies } for x in range( 1, n + 1 ) } + + intPlug = Gaffer.IntPlug() + GafferTest.parallelGetValue( intPlug, 100000 ) # Get worker threads running in advance + + with Gaffer.PerformanceMonitor() as monitor, Gaffer.ThreadMonitor() as threadMonitor : + GafferTest.runTestProcess( plug1, 0, dependencies ) + + self.assertEqual( monitor.plugStatistics( plug1 ).computeCount, 1 ) + self.assertEqual( monitor.plugStatistics( plug2 ).computeCount, n ) + self.assertEqual( monitor.plugStatistics( plug3 ).computeCount, 1 ) + self.assertEqual( monitor.plugStatistics( plug4 ).computeCount, n ) + + def assertExpectedThreading( plug, numTasks ) : + + s = threadMonitor.plugStatistics( plug ) # Dict mapping thread ID to number of computes + self.assertEqual( sum( s.values() ), numTasks ) + + if numTasks == 1 : + self.assertEqual( len( s ), 1 ) + else : + # Check that every worker thread contributed some work. + self.assertEqual( len( s ), numWorkers ) + # Check that each worker thread did at least half of its fair + # share of work. This assertion is too sensitive in CI so it is + # disabled by default. On my local test machine (Dual 16 core + # Xeon with hyperthreading) I see pretty reliable success up to + # `-threads 32`, and regular failure at `-threads 64` (the + # default). + if False : + for t in s.values() : + self.assertGreaterEqual( t, 0.5 * numTasks / numWorkers ) + + assertExpectedThreading( plug1, 1 ) + assertExpectedThreading( plug2, n ) + assertExpectedThreading( plug3, 1 ) + assertExpectedThreading( plug4, n ) + + @GafferTest.TestRunner.CategorisedTestMethod( { "taskCollaboration:performance" } ) + @GafferTest.TestRunner.PerformanceTestMethod() + def testFanOutGatherPerformance( self ) : + + # Pathological case for cycle checking - huge permutations + # of paths through the downstream graph. + # + # 0 + # / | \ + # 1 2 3 + # \ | / + # 4 + # / | \ + # 5 6 7 + # \ | / + # 8 + # / | \ + # 9 10 11 + # \ | / + # 12 (for width 3 and depth 3) + + width = 64 + depth = 10 + + dependencies = {} + i = 0 + for d in range( 0, depth ) : + dependencies = { i : dependencies } + i += 1 + dependencies = { w : dependencies for w in range( i, i + width ) } + i += width + + plug = Gaffer.Plug() + with Gaffer.PerformanceMonitor() as monitor : + GafferTest.clearTestProcessCache() + with GafferTest.TestRunner.PerformanceScope() : + GafferTest.runTestProcess( plug, i, dependencies ) + + self.assertEqual( monitor.plugStatistics( plug ).computeCount, i + 1 ) + + @GafferTest.TestRunner.CategorisedTestMethod( { "taskCollaboration:performance" } ) + @GafferTest.TestRunner.PerformanceTestMethod() + def testCollaborationPerformance( self ) : + + # -(n+1)...-2n + # \ | / + # 1 + # / | \ + # -1 ... -n + # \ | / + # 0 + + n = 100000 + + upstreamDependencies = { -x : {} for x in range( n + 1, 2 * n + 1 ) } + + GafferTest.clearTestProcessCache() + + plug = Gaffer.Plug() + with Gaffer.PerformanceMonitor() as monitor : + with GafferTest.TestRunner.PerformanceScope() : + GafferTest.runTestProcess( + plug, 0, + { -x : { 1 : upstreamDependencies } for x in range( 1, n + 1 ) } + ) + + self.assertEqual( monitor.plugStatistics( plug ).computeCount, 1 + n + 1 + n ) + + @GafferTest.TestRunner.CategorisedTestMethod( { "taskCollaboration:performance" } ) + @GafferTest.TestRunner.PerformanceTestMethod() + def testCollaborationTransitionPerformance( self ) : + + # Case we think is probably pretty common - all threads need to migrate + # through a series of collaborations. + # + # 3 + # / | \ + # -(2n+1)...-3n + # \ | / + # 2 + # / | \ + # -(n+1)...-2n + # \ | / + # 1 + # / | \ + # -1 ... -n + # \ | / + # 0 (for depth 3) + + n = IECore.hardwareConcurrency() + depth = 1000 + + dependencies = {} + for d in range( depth, 0, -1 ) : + dependencies = { -x : { d : dependencies } for x in range( (d-1) * n + 1, d * n + 1 ) } + + GafferTest.clearTestProcessCache() + + plug = Gaffer.Plug() + with Gaffer.PerformanceMonitor() as monitor : + with GafferTest.TestRunner.PerformanceScope() : + GafferTest.runTestProcess( plug, 0, dependencies ) + + self.assertEqual( monitor.plugStatistics( plug ).computeCount, depth * ( n + 1 ) + 1 ) + + @GafferTest.TestRunner.CategorisedTestMethod( { "taskCollaboration:performance" } ) + @GafferTest.TestRunner.PerformanceTestMethod() + def testDeepTreePerformance( self ) : + + # Models things like the recursive computation of bounding boxes in GafferScene + # + # 3 4 5 6 + # \ / \ / + # 1 2 + # \ / + # \ / + # 0 (for maxDepth 2 and branchFactor 2) + + maxDepth = 14 + branchFactor = 2 + + def makeDependencies( n, depth = 0 ) : + + if depth == maxDepth : + return {} + + return { i : makeDependencies( i, depth + 1 ) for i in range( n * branchFactor + 1, (n + 1) * branchFactor + 1 ) } + + dependencies = makeDependencies( 0 ) + + GafferTest.clearTestProcessCache() + + plug = Gaffer.Plug() + with Gaffer.PerformanceMonitor() as monitor : + with GafferTest.TestRunner.PerformanceScope() : + GafferTest.runTestProcess( plug, 0, dependencies ) + + self.assertEqual( monitor.plugStatistics( plug ).computeCount, sum( branchFactor ** d for d in range( 0, maxDepth + 1 ) ) ) + +if __name__ == "__main__": + unittest.main() diff --git a/python/GafferTest/__init__.py b/python/GafferTest/__init__.py index 12f7263f04b..24d0b0dd4ac 100644 --- a/python/GafferTest/__init__.py +++ b/python/GafferTest/__init__.py @@ -159,6 +159,7 @@ def inCI( platforms = set() ) : from .OptionalValuePlugTest import OptionalValuePlugTest from .ThreadMonitorTest import ThreadMonitorTest from .CollectTest import CollectTest +from .ProcessTest import ProcessTest from .IECorePreviewTest import * diff --git a/src/GafferTestModule/GafferTestModule.cpp b/src/GafferTestModule/GafferTestModule.cpp index 24c54280e4c..46f1d1d04b3 100644 --- a/src/GafferTestModule/GafferTestModule.cpp +++ b/src/GafferTestModule/GafferTestModule.cpp @@ -50,6 +50,7 @@ #include "ValuePlugTest.h" #include "MessagesTest.h" #include "MetadataTest.h" +#include "ProcessTest.h" #include "SignalsTest.h" #include "IECorePython/ScopedGILRelease.h" @@ -116,6 +117,7 @@ BOOST_PYTHON_MODULE( _GafferTest ) bindValuePlugTest(); bindMessagesTest(); bindSignalsTest(); + bindProcessTest(); object module( borrowed( PyImport_AddModule( "GafferTest._MetadataTest" ) ) ); scope().attr( "_MetadataTest" ) = module; diff --git a/src/GafferTestModule/ProcessTest.cpp b/src/GafferTestModule/ProcessTest.cpp new file mode 100644 index 00000000000..0af1176aa02 --- /dev/null +++ b/src/GafferTestModule/ProcessTest.cpp @@ -0,0 +1,199 @@ +////////////////////////////////////////////////////////////////////////// +// +// Copyright (c) 2023, Cinesite VFX Ltd. All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above +// copyright notice, this list of conditions and the following +// disclaimer. +// +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following +// disclaimer in the documentation and/or other materials provided with +// the distribution. +// +// * Neither the name of John Haddon nor the names of +// any other contributors to this software may be used to endorse or +// promote products derived from this software without specific prior +// written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS +// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, +// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR +// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// +////////////////////////////////////////////////////////////////////////// + +#include "boost/python.hpp" + +#include "ProcessTest.h" + +#include "GafferTest/Assert.h" + +#include "Gaffer/Context.h" +#include "Gaffer/NumericPlug.h" +#include "Gaffer/TypedObjectPlug.h" +#include "Gaffer/Private/IECorePreview/LRUCache.h" +#include "Gaffer/Process.h" + +#include "IECorePython/ScopedGILRelease.h" + +#include "tbb/parallel_for_each.h" + +using namespace boost::python; +using namespace IECore; +using namespace Gaffer; + +namespace +{ + +struct Dependencies : public IECore::RefCounted +{ + IE_CORE_DECLAREMEMBERPTR( Dependencies ); + using Map = std::map; + Map map; +}; + +// Test Process +// ============ +// +// This Process subclass is used primarily to test the collaboration mechanism +// provided by `Process::acquireCollaborativeResult()`. The result is an integer +// which is given to the TestProcess directly, and which also provides the cache +// key. The upstream dependencies are also given verbatim to TestProcess as a +// nested dictionary of integers mapping from the result for each dependency to +// the dictionary for _its_ upstream dependencies. Non-negative results are +// computed using `acquireCollaborativeResult()` and negative results are +// computed by constructing a TestProcess directly. This mechanism lets us +// create a variety of process graphs very explicitly from ProcessTestCase. +class TestProcess : public Process +{ + + public : + + TestProcess( const Plug *plug, int result, const Dependencies::ConstPtr &dependencies ) + : Process( g_staticType, plug, plug ), m_result( result ), m_dependencies( dependencies ) + { + } + + using ResultType = int; + + ResultType run() const + { + const ThreadState &threadState = ThreadState::current(); + tbb::task_group_context taskGroupContext( tbb::task_group_context::isolated ); + + tbb::parallel_for_each( + + m_dependencies->map.begin(), m_dependencies->map.end(), + + [&] ( const Dependencies::Map::value_type &dependency ) { + + ThreadState::Scope scope( threadState ); + const int expectedResult = dependency.first; + + const Plug *p = plug(); + if( const Plug *input = p->getInput() ) + { + // Compute the dependencies using the plug's input if it + // has one, otherwise using this plug. The only reason + // for using an input is get more fine-grained information + // from the Monitors used in the unit tests (because they + // capture statistics per plug). + p = input; + } + + int actualResult; + if( expectedResult >= 0 ) + { + actualResult = Process::acquireCollaborativeResult( expectedResult, p, expectedResult, dependency.second ); + } + else + { + actualResult = TestProcess( p, expectedResult, dependency.second ).run(); + } + GAFFERTEST_ASSERTEQUAL( actualResult, expectedResult ); + + }, + + taskGroupContext + + ); + + return m_result; + } + + using CacheType = IECorePreview::LRUCache; + static CacheType g_cache; + + static size_t cacheCostFunction( int value ) + { + return 1; + } + + private : + + const int m_result; + const Dependencies::ConstPtr m_dependencies; + + static const IECore::InternedString g_staticType; + +}; + +TestProcess::CacheType TestProcess::g_cache( TestProcess::CacheType::GetterFunction(), 100000 ); +// Spoof type so that we can use PerformanceMonitor to check we get the processes we expect in ProcessTest.py. +const IECore::InternedString TestProcess::g_staticType( "computeNode:compute" ); + +Dependencies::ConstPtr dependenciesFromDict( dict dependenciesDict, std::unordered_map &converted ) +{ + auto it = converted.find( dependenciesDict.ptr() ); + if( it != converted.end() ) + { + return it->second; + } + + Dependencies::Ptr result = new Dependencies; + auto items = dependenciesDict.items(); + for( size_t i = 0, l = len( items ); i < l; ++i ) + { + int v = extract( items[i][0] ); + dict d = extract( items[i][1] ); + result->map[v] = dependenciesFromDict( d, converted ); + } + + converted[dependenciesDict.ptr()] = result; + return result; +} + +void runTestProcess( const Plug *plug, int expectedResult, dict dependenciesDict ) +{ + std::unordered_map convertedDependencies; + Dependencies::ConstPtr dependencies = dependenciesFromDict( dependenciesDict, convertedDependencies ); + + Context::EditableScope context( Context::current() ); + int result = TestProcess( plug, expectedResult, dependencies ).run(); + GAFFERTEST_ASSERTEQUAL( result, expectedResult ); +} + +void clearTestProcessCache() +{ + TestProcess::g_cache.clear(); +} + +} // namespace + +void GafferTestModule::bindProcessTest() +{ + def( "runTestProcess", &runTestProcess ); + def( "clearTestProcessCache", &clearTestProcessCache ); +} diff --git a/src/GafferTestModule/ProcessTest.h b/src/GafferTestModule/ProcessTest.h new file mode 100644 index 00000000000..1975e4a1cfc --- /dev/null +++ b/src/GafferTestModule/ProcessTest.h @@ -0,0 +1,44 @@ +////////////////////////////////////////////////////////////////////////// +// +// Copyright (c) 2023, Cinesite VFX Ltd. All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above +// copyright notice, this list of conditions and the following +// disclaimer. +// +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following +// disclaimer in the documentation and/or other materials provided with +// the distribution. +// +// * Neither the name of John Haddon nor the names of +// any other contributors to this software may be used to endorse or +// promote products derived from this software without specific prior +// written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS +// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, +// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR +// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// +////////////////////////////////////////////////////////////////////////// + +#pragma once + +namespace GafferTestModule +{ + +void bindProcessTest(); + +} // namespace GafferTestModule From 62ccbff6a53cc5c5302659dfa49b546aa49350b1 Mon Sep 17 00:00:00 2001 From: John Haddon Date: Wed, 23 Aug 2023 17:20:13 +0100 Subject: [PATCH 04/11] ValuePlug : Use `Process::acquireCollaborativeResult()` Fixes #4978 --- Changes.md | 1 + src/Gaffer/ValuePlug.cpp | 517 +++++++++++++++------------------------ 2 files changed, 204 insertions(+), 314 deletions(-) diff --git a/Changes.md b/Changes.md index f3fe86254bd..df6c120bc6c 100644 --- a/Changes.md +++ b/Changes.md @@ -16,6 +16,7 @@ Improvements Fixes ----- +- ValuePlug : Fixed hangs and poor performance caused by plugs depending on upstream plugs with an identical hash (#4978). - Filter : Fixed bug which allowed the `scene:path` context variable to "leak" upstream via the `Filter.enabled` plug. This caused unnecessary evaluations of the input, and also provided a loophole via which the filter result could be made inconsistent with respect to descendant and ancestor matches. - Windows : - Fixed a bug preventing anything except strings from being copied and pasted. diff --git a/src/Gaffer/ValuePlug.cpp b/src/Gaffer/ValuePlug.cpp index 3272c17c587..5d32a087781 100644 --- a/src/Gaffer/ValuePlug.cpp +++ b/src/Gaffer/ValuePlug.cpp @@ -52,6 +52,7 @@ #include "fmt/format.h" #include +#include using namespace Gaffer; @@ -139,41 +140,11 @@ size_t hash_value( const HashCacheKey &key ) return result; } -// Derives from HashCacheKey, adding on everything we need to -// construct a HashProcess. We access our caches via this augmented -// key so that we have all the information we need in our getter -// functions. -// -// Note the requirements the LRUCache places on a GetterKey like this : -// "it must be implicitly castable to Key, and all GetterKeys -// which yield the same Key must also yield the same results -// from the GetterFunction". We meet these requirements as follows : -// -// - `computeNode` and `cachePolicy` are properties of the plug which -// is included in the HashCacheKey. We store them explicitly only -// for convenience and performance. -// - `context` is represented in HashCacheKey via `contextHash`. -// - `destinationPlug` does not influence the results of the computation -// in any way. It is merely used for error reporting. -struct HashProcessKey : public HashCacheKey +// `tbb_hasher` is a requirement of `Process::acquireCollaborativeResult()`, +// which uses `tbb::concurrent_hash_map` internally to manage collaborations. +size_t tbb_hasher( const HashCacheKey &key ) { - HashProcessKey( const ValuePlug *plug, const ValuePlug *destinationPlug, const Context *context, uint64_t dirtyCount, const ComputeNode *computeNode, ValuePlug::CachePolicy cachePolicy ) - : HashCacheKey( plug, context, dirtyCount ), - destinationPlug( destinationPlug ), - computeNode( computeNode ), - cachePolicy( cachePolicy ) - { - } - - const ValuePlug *destinationPlug; - const ComputeNode *computeNode; - const ValuePlug::CachePolicy cachePolicy; -}; - -// Avoids LRUCache overhead for non-collaborative policies. -bool spawnsTasks( const HashProcessKey &key ) -{ - return key.cachePolicy == ValuePlug::CachePolicy::TaskCollaboration; + return hash_value( key ); } ValuePlug::HashCacheMode defaultHashCacheMode() @@ -210,6 +181,8 @@ class ValuePlug::HashProcess : public Process public : + // Interface used by ValuePlug. + static IECore::MurmurHash hash( const ValuePlug *plug ) { const ValuePlug *p = sourcePlug( plug ); @@ -244,83 +217,114 @@ class ValuePlug::HashProcess : public Process const ComputeNode *computeNode = IECore::runTimeCast( p->node() ); const ThreadState &threadState = ThreadState::current(); const Context *currentContext = threadState.context(); - const HashProcessKey processKey( p, plug, currentContext, p->m_dirtyCount, computeNode, computeNode ? computeNode->hashCachePolicy( p ) : CachePolicy::Uncached ); + const CachePolicy cachePolicy = computeNode ? computeNode->hashCachePolicy( p ) : CachePolicy::Uncached; - if( processKey.cachePolicy == CachePolicy::Uncached ) + if( cachePolicy == CachePolicy::Uncached ) { - HashProcess process( processKey ); - return process.m_result; + return HashProcess( p, plug, computeNode ).run(); } - else if( Process::forceMonitoring( threadState, plug, ValuePlug::HashProcess::staticType ) ) + + // Perform any pending adjustments to our thread-local cache. + + ThreadData &threadData = g_threadData.local(); + if( threadData.clearCache.load( std::memory_order_acquire ) ) { - HashProcess process( processKey ); - auto costFunction = [] ( const IECore::MurmurHash &key ) { return 1; }; - if( - processKey.cachePolicy == CachePolicy::TaskCollaboration || - processKey.cachePolicy == CachePolicy::TaskIsolation - ) - { - g_globalCache.setIfUncached( processKey, process.m_result, costFunction ); - } - else - { - ThreadData &threadData = g_threadData.local(); - threadData.cache.setIfUncached( processKey, process.m_result, costFunction ); - } - return process.m_result; + threadData.cache.clear(); + threadData.clearCache.store( 0, std::memory_order_release ); } - else + + if( threadData.cache.getMaxCost() != g_cacheSizeLimit ) { - // Perform any pending adjustments to our thread-local cache. + threadData.cache.setMaxCost( g_cacheSizeLimit ); + } + + // Then get our hash. We do this using this `acquireHash()` functor so that + // we can repeat the process for `Checked` mode. - ThreadData &threadData = g_threadData.local(); - if( threadData.clearCache.load( std::memory_order_acquire ) ) + const bool forceMonitoring = Process::forceMonitoring( threadState, plug, staticType ); + + auto acquireHash = [&]( const HashCacheKey &cacheKey ) { + + // Before we use this key, check that the dirty count hasn't maxed out + if( + cacheKey.dirtyCount == DIRTY_COUNT_RANGE_MAX || + cacheKey.dirtyCount == DIRTY_COUNT_RANGE_MAX + 1 + DIRTY_COUNT_RANGE_MAX + ) { - threadData.cache.clear(); - threadData.clearCache.store( 0, std::memory_order_release ); + throw IECore::Exception( "Dirty count exceeded max. Either you've left Gaffer running for 100 million years, or a strange bug is incrementing dirty counts way too fast." ); } - if( threadData.cache.getMaxCost() != g_cacheSizeLimit ) + // Check for an already-cached value in our thread-local cache, and return it if we have one. + if( !forceMonitoring ) { - threadData.cache.setMaxCost( g_cacheSizeLimit ); + if( auto result = threadData.cache.getIfCached( cacheKey ) ) + { + return *result; + } } - // And then look up the result in our cache. - if( g_hashCacheMode == HashCacheMode::Standard ) + // No value in local cache, so either compute it directly or get it via + // the global cache if it's expensive enough to warrant collaboration. + IECore::MurmurHash result; + if( cachePolicy == CachePolicy::Legacy || cachePolicy == CachePolicy::Standard ) { - return threadData.cache.get( processKey, currentContext->canceller() ); + result = HashProcess( p, plug, computeNode ).run(); } - else if( g_hashCacheMode == HashCacheMode::Checked ) + else { - HashProcessKey legacyProcessKey( processKey ); - legacyProcessKey.dirtyCount = g_legacyGlobalDirtyCount + DIRTY_COUNT_RANGE_MAX + 1; - - const IECore::MurmurHash check = threadData.cache.get( legacyProcessKey, currentContext->canceller() ); - const IECore::MurmurHash result = threadData.cache.get( processKey, currentContext->canceller() ); - - if( result != check ) + std::optional cachedValue; + if( !forceMonitoring ) + { + cachedValue = g_cache.getIfCached( cacheKey ); + } + if( cachedValue ) { - // This isn't exactly a process exception, but we want to treat it the same, in - // terms of associating it with a plug. Creating a ProcessException is the simplest - // approach, which can be done by throwing and then immediately wrapping - try - { - throw IECore::Exception( "Detected undeclared dependency. Fix DependencyNode::affects() implementation." ); - } - catch( ... ) - { - ProcessException::wrapCurrentException( processKey.plug, currentContext, staticType ); - } + result = *cachedValue; + } + else + { + result = Process::acquireCollaborativeResult( cacheKey, p, plug, computeNode ); } - return result; } - else + // Update local cache and return result + threadData.cache.setIfUncached( cacheKey, result, cacheCostFunction ); + return result; + }; + + const HashCacheKey cacheKey( p, currentContext, p->m_dirtyCount ); + if( g_hashCacheMode == HashCacheMode::Standard ) + { + return acquireHash( cacheKey ); + } + else if( g_hashCacheMode == HashCacheMode::Checked ) + { + HashCacheKey legacyCacheKey( cacheKey ); + legacyCacheKey.dirtyCount = g_legacyGlobalDirtyCount + DIRTY_COUNT_RANGE_MAX + 1; + const IECore::MurmurHash check = acquireHash( legacyCacheKey ); + const IECore::MurmurHash result = acquireHash( cacheKey ); + + if( result != check ) { - // HashCacheMode::Legacy - HashProcessKey legacyProcessKey( processKey ); - legacyProcessKey.dirtyCount = g_legacyGlobalDirtyCount + DIRTY_COUNT_RANGE_MAX + 1; - return threadData.cache.get( legacyProcessKey, currentContext->canceller() ); + // This isn't exactly a process exception, but we want to treat it the same, in + // terms of associating it with a plug. Creating a ProcessException is the simplest + // approach, which can be done by throwing and then immediately wrapping. + try + { + throw IECore::Exception( "Detected undeclared dependency. Fix DependencyNode::affects() implementation." ); + } + catch( ... ) + { + ProcessException::wrapCurrentException( p, currentContext, staticType ); + } } + return result; + } + else + { + // HashCacheMode::Legacy + HashCacheKey legacyCacheKey( cacheKey ); + legacyCacheKey.dirtyCount = g_legacyGlobalDirtyCount + DIRTY_COUNT_RANGE_MAX + 1; + return acquireHash( legacyCacheKey ); } } @@ -332,12 +336,12 @@ class ValuePlug::HashProcess : public Process static void setCacheSizeLimit( size_t maxEntriesPerThread ) { g_cacheSizeLimit = maxEntriesPerThread; - g_globalCache.setMaxCost( g_cacheSizeLimit ); + g_cache.setMaxCost( g_cacheSizeLimit ); } static void clearCache( bool now = false ) { - g_globalCache.clear(); + g_cache.clear(); // It's not documented explicitly, but it is safe to iterate over an // `enumerable_thread_specific` while `local()` is being called on // other threads, because the underlying container is a @@ -364,7 +368,7 @@ class ValuePlug::HashProcess : public Process static size_t totalCacheUsage() { - size_t usage = g_globalCache.currentCost(); + size_t usage = g_cache.currentCost(); tbb::enumerable_thread_specific::iterator it, eIt; for( it = g_threadData.begin(), eIt = g_threadData.end(); it != eIt; ++it ) { @@ -399,32 +403,33 @@ class ValuePlug::HashProcess : public Process static const IECore::InternedString staticType; - private : + // Interface required by `Process::acquireCollaborativeResult()`. - HashProcess( const HashProcessKey &key ) - : Process( staticType, key.plug, key.destinationPlug ) + HashProcess( const ValuePlug *plug, const ValuePlug *destinationPlug, const ComputeNode *computeNode ) + : Process( staticType, plug, destinationPlug ), m_computeNode( computeNode ) + { + } + + using ResultType = IECore::MurmurHash; + + ResultType run() const { try { - if( !key.computeNode ) + if( !m_computeNode ) { throw IECore::Exception( "Plug has no ComputeNode." ); } - // Before we use this key, check that the dirty count hasn't maxed out - if( - key.dirtyCount == DIRTY_COUNT_RANGE_MAX || - key.dirtyCount == DIRTY_COUNT_RANGE_MAX + 1 + DIRTY_COUNT_RANGE_MAX ) - { - throw IECore::Exception( "Dirty count exceeded max. Either you've left Gaffer running for 100 million years, or a strange bug is incrementing dirty counts way too fast." ); - } - - key.computeNode->hash( key.plug, context(), m_result ); + IECore::MurmurHash result; + m_computeNode->hash( static_cast( plug() ), context(), result ); - if( m_result == g_nullHash ) + if( result == g_nullHash ) { throw IECore::Exception( "ComputeNode::hash() not implemented." ); } + + return result; } catch( ... ) { @@ -432,76 +437,27 @@ class ValuePlug::HashProcess : public Process } } - static IECore::MurmurHash globalCacheGetter( const HashProcessKey &key, size_t &cost, const IECore::Canceller *canceller ) - { - // Canceller will be passed to `ComputeNode::hash()` implicitly - // via the context. - assert( canceller == Context::current()->canceller() ); - cost = 1; - IECore::MurmurHash result; - switch( key.cachePolicy ) - { - case CachePolicy::TaskCollaboration : - { - HashProcess process( key ); - result = process.m_result; - break; - } - case CachePolicy::TaskIsolation : - { - tbb::this_task_arena::isolate( - [&result, &key] { - HashProcess process( key ); - result = process.m_result; - } - ); - break; - } - default : - // Cache policy not valid for global cache. - assert( false ); - break; - } - - return result; - } + using CacheType = IECorePreview::LRUCache; + static CacheType g_cache; - static IECore::MurmurHash localCacheGetter( const HashProcessKey &key, size_t &cost, const IECore::Canceller *canceller ) + static size_t cacheCostFunction( const IECore::MurmurHash &value ) { - assert( canceller == Context::current()->canceller() ); - cost = 1; - switch( key.cachePolicy ) - { - case CachePolicy::TaskCollaboration : - case CachePolicy::TaskIsolation : - return g_globalCache.get( key, canceller ); - default : - { - assert( key.cachePolicy != CachePolicy::Uncached ); - HashProcess process( key ); - return process.m_result; - } - } + return 1; } - // Global cache. We use this for heavy hash computations that will spawn subtasks, - // so that the work and the result is shared among all threads. - using GlobalCache = IECorePreview::LRUCache; - static GlobalCache g_globalCache; - static std::atomic g_legacyGlobalDirtyCount; + private : + const ComputeNode *m_computeNode; + static std::atomic g_legacyGlobalDirtyCount; static HashCacheMode g_hashCacheMode; - // Per-thread cache. This is our default cache, used for hash computations that are - // presumed to be lightweight. Using a per-thread cache limits the contention among - // threads. - using Cache = IECorePreview::LRUCache; - struct ThreadData { - ThreadData() : cache( localCacheGetter, g_cacheSizeLimit, Cache::RemovalCallback(), /* cacheErrors = */ false ), clearCache( 0 ) {} - Cache cache; + // Using a null `GetterFunction` because it will never get called, because we only ever call `getIfCached()`. + ThreadData() : cache( CacheType::GetterFunction(), g_cacheSizeLimit, CacheType::RemovalCallback(), /* cacheErrors = */ false ), clearCache( 0 ) {} + using CacheType = IECorePreview::LRUCache; + CacheType cache; // Flag to request that hashCache be cleared. std::atomic_int clearCache; }; @@ -509,15 +465,14 @@ class ValuePlug::HashProcess : public Process static tbb::enumerable_thread_specific, tbb::ets_key_per_instance > g_threadData; static std::atomic_size_t g_cacheSizeLimit; - IECore::MurmurHash m_result; - }; const IECore::InternedString ValuePlug::HashProcess::staticType( ValuePlug::hashProcessType() ); tbb::enumerable_thread_specific, tbb::ets_key_per_instance > ValuePlug::HashProcess::g_threadData; // Default limit corresponds to a cost of roughly 25Mb per thread. std::atomic_size_t ValuePlug::HashProcess::g_cacheSizeLimit( 128000 ); -ValuePlug::HashProcess::GlobalCache ValuePlug::HashProcess::g_globalCache( globalCacheGetter, g_cacheSizeLimit, Cache::RemovalCallback(), /* cacheErrors = */ false ); +// Using a null `GetterFunction` because it will never get called, because we only ever call `getIfCached()`. +ValuePlug::HashProcess::CacheType ValuePlug::HashProcess::g_cache( CacheType::GetterFunction(), g_cacheSizeLimit, CacheType::RemovalCallback(), /* cacheErrors = */ false ); std::atomic ValuePlug::HashProcess::g_legacyGlobalDirtyCount( 0 ); ValuePlug::HashCacheMode ValuePlug::HashProcess::g_hashCacheMode( defaultHashCacheMode() ); @@ -526,62 +481,13 @@ ValuePlug::HashCacheMode ValuePlug::HashProcess::g_hashCacheMode( defaultHashCac // and storing a cache of recently computed results. ////////////////////////////////////////////////////////////////////////// -namespace -{ - -// Contains everything needed to create a ComputeProcess. We access our -// cache via this key so that we have all the information we need in our getter -// function. -struct ComputeProcessKey -{ - ComputeProcessKey( const ValuePlug *plug, const ValuePlug *destinationPlug, const ComputeNode *computeNode, ValuePlug::CachePolicy cachePolicy, const IECore::MurmurHash *precomputedHash ) - : plug( plug ), - destinationPlug( destinationPlug ), - computeNode( computeNode ), - cachePolicy( cachePolicy ), - m_hash( precomputedHash ? *precomputedHash : IECore::MurmurHash() ) - { - } - - const ValuePlug *plug; - const ValuePlug *destinationPlug; - const ComputeNode *computeNode; - const ValuePlug::CachePolicy cachePolicy; - - operator const IECore::MurmurHash &() const - { - if( m_hash == g_nullHash ) - { - // Note : We call `plug->ValuePlug::hash()` rather than - // `plug->hash()` because we only want to represent the result of - // the private `getValueInternal()` method. Overrides such as - // `StringPlug::hash()` account for additional processing (such as - // substitutions) performed in public `getValue()` methods _after_ - // calling `getValueInternal()`. - m_hash = plug->ValuePlug::hash(); - } - return m_hash; - } - - private : - - mutable IECore::MurmurHash m_hash; - -}; - -// Avoids LRUCache overhead for non-collaborative policies. -bool spawnsTasks( const ComputeProcessKey &key ) -{ - return key.cachePolicy == ValuePlug::CachePolicy::TaskCollaboration; -} - -} // namespace - class ValuePlug::ComputeProcess : public Process { public : + // Interface used by ValuePlug. + static size_t getCacheMemoryLimit() { return g_cache.getMaxCost(); @@ -622,14 +528,11 @@ class ValuePlug::ComputeProcess : public Process } // A plug with an input connection or an output plug on a ComputeNode. There can be many values - - // one per context, computed via ComputeNode::compute(). Pull the value out of our cache, or compute - // it with a ComputeProcess. + // one per context, computed via `ComputeNode::compute()` or `Plug::setFrom()`. Determine our + // cache policy for the result. assert( (plug->getInput() && !computeNode) || computeNode ); - const ThreadState &threadState = ThreadState::current(); - const Context *currentContext = threadState.context(); - CachePolicy cachePolicy = CachePolicy::Uncached; if( p->getInput() ) { @@ -639,58 +542,70 @@ class ValuePlug::ComputeProcess : public Process } else if( computeNode ) { + // We'll be calling `compute()`. cachePolicy = computeNode->computeCachePolicy( p ); } - const ComputeProcessKey processKey( p, plug, computeNode, cachePolicy, precomputedHash ); + // If caching is off then its just a case of using a ComputeProcess + // to do the work. - if( processKey.cachePolicy == CachePolicy::Uncached ) - { - return ComputeProcess( processKey ).m_result; - } - else if( Process::forceMonitoring( threadState, plug, ValuePlug::ComputeProcess::staticType ) ) + if( cachePolicy == CachePolicy::Uncached ) { - ComputeProcess process( processKey ); - g_cache.setIfUncached( - processKey, process.m_result, - []( const IECore::ConstObjectPtr &v ) { return v->memoryUsage(); } - ); - return process.m_result; + return ComputeProcess( p, plug, computeNode ).run(); } - else if( processKey.cachePolicy == CachePolicy::Legacy ) + + const ThreadState &threadState = ThreadState::current(); + + // If caching is on, then we first check for an already-cached value + // and return it if we have one. The cache is accessed via the + // plug's hash. + // + // > Note : We call `plug->ValuePlug::hash()` rather than + // > `plug->hash()` because we only want to represent the result of + // > the private `getValueInternal()` method. Overrides such as + // > `StringPlug::hash()` account for additional processing (such as + // > substitutions) performed in public `getValue()` methods _after_ + // > calling `getValueInternal()`. + const IECore::MurmurHash hash = p->ValuePlug::hash(); + + if( !Process::forceMonitoring( threadState, plug, staticType ) ) { - // Legacy code path, necessary until all task-spawning computes - // have declared an appropriate cache policy. We can't perform - // the compute inside `cacheGetter()` because that is called - // from inside a lock. If tasks were spawned without being - // isolated, TBB could steal an outer task which tries to get - // the same item from the cache, leading to deadlock. - if( auto result = g_cache.getIfCached( processKey ) ) + if( auto result = g_cache.getIfCached( hash ) ) { // Move avoids unnecessary additional addRef/removeRef. return std::move( *result ); } - ComputeProcess process( processKey ); - // Store the value in the cache, but only if it isn't there - // already. The check is useful because it's common for an - // upstream compute triggered by us to have already done the - // work, and calling `memoryUsage()` can be very expensive for - // some datatypes. A prime example of this is the attribute - // state passed around in GafferScene - it's common for a - // selective filter to mean that the attribute compute is - // implemented as a pass-through (thus an upstream node will - // already have computed the same result) and the attribute data - // itself consists of many small objects for which computing - // memory usage is slow. - g_cache.setIfUncached( - processKey, process.m_result, - []( const IECore::ConstObjectPtr &v ) { return v->memoryUsage(); } - ); - return process.m_result; + } + + // The value isn't in the cache, so we'll need to compute it, + // taking account of the cache policy. + + if( cachePolicy == CachePolicy::Legacy ) + { + // Do the compute ourselves, without worrying if the same + // compute is in flight elsewhere. We assume the compute is + // lightweight enough and unlikely enough to be shared that in + // the worst case it's OK to do it redundantly on a few threads + // before it gets cached. + IECore::ConstObjectPtr result = ComputeProcess( p, plug, computeNode ).run(); + // Store the value in the cache, but only if it isn't there already. + // The check is useful because it's common for an upstream compute + // triggered by us to have already done the work, and calling + // `memoryUsage()` can be very expensive for some datatypes. A prime + // example of this is the attribute state passed around in + // GafferScene - it's common for a selective filter to mean that the + // attribute compute is implemented as a pass-through (thus an + // upstream node will already have computed the same result) and the + // attribute data itself consists of many small objects for which + // computing memory usage is slow. + g_cache.setIfUncached( hash, result, cacheCostFunction ); + return result; } else { - return g_cache.get( processKey, currentContext->canceller() ); + return acquireCollaborativeResult( + hash, p, plug, computeNode + ); } } @@ -713,27 +628,33 @@ class ValuePlug::ComputeProcess : public Process static const IECore::InternedString staticType; - private : + // Interface required by `Process::acquireCollaborativeResult()`. + + ComputeProcess( const ValuePlug *plug, const ValuePlug *destinationPlug, const ComputeNode *computeNode ) + : Process( staticType, plug, destinationPlug ), m_computeNode( computeNode ) + { + } - ComputeProcess( const ComputeProcessKey &key ) - : Process( staticType, key.plug, key.destinationPlug ) + IECore::ConstObjectPtr run() const { try { - if( const ValuePlug *input = key.plug->getInput() ) + // Cast is safe because our constructor takes ValuePlugs. + const ValuePlug *valuePlug = static_cast( plug() ); + if( const ValuePlug *input = valuePlug->getInput() ) { // Cast is ok, because we know that the resulting setValue() call won't // actually modify the plug, but will just place the value in our m_result. - const_cast( key.plug )->setFrom( input ); + const_cast( valuePlug )->setFrom( input ); } else { - if( !key.computeNode ) + if( !m_computeNode ) { throw IECore::Exception( "Plug has no ComputeNode." ); } // Cast is ok - see comment above. - key.computeNode->compute( const_cast( key.plug ), context() ); + m_computeNode->compute( const_cast( valuePlug ), context() ); } // The calls above should cause setValue() to be called on the result plug, which in // turn will call ValuePlug::setObjectValue(), which will then store the result in @@ -743,6 +664,7 @@ class ValuePlug::ComputeProcess : public Process { throw IECore::Exception( "Compute did not set plug value." ); } + return m_result; } catch( ... ) { @@ -750,58 +672,25 @@ class ValuePlug::ComputeProcess : public Process } } - static IECore::ConstObjectPtr cacheGetter( const ComputeProcessKey &key, size_t &cost, const IECore::Canceller *canceller ) - { - // Canceller will be passed to `ComputeNode::hash()` implicitly - // via the context. - assert( canceller == Context::current()->canceller() ); - IECore::ConstObjectPtr result; - switch( key.cachePolicy ) - { - case CachePolicy::Standard : - { - ComputeProcess process( key ); - result = process.m_result; - break; - } - case CachePolicy::TaskCollaboration : - { - ComputeProcess process( key ); - result = process.m_result; - break; - } - case CachePolicy::TaskIsolation : - { - tbb::this_task_arena::isolate( - [&result, &key] { - ComputeProcess process( key ); - result = process.m_result; - } - ); - break; - } - default : - // Should not get here, because these cases are - // dealt with directly in `ComputeProcess::value()`. - assert( false ); - break; - } + using ResultType = IECore::ConstObjectPtr; + using CacheType = IECorePreview::LRUCache; + static CacheType g_cache; - cost = result->memoryUsage(); - return result; + static size_t cacheCostFunction( const IECore::ConstObjectPtr &v ) + { + return v->memoryUsage(); } - // A cache mapping from ValuePlug::hash() to the result of the previous computation - // for that hash. This allows us to cache results for faster repeat evaluation - using Cache = IECorePreview::LRUCache; - static Cache g_cache; + private : + const ComputeNode *m_computeNode; IECore::ConstObjectPtr m_result; }; const IECore::InternedString ValuePlug::ComputeProcess::staticType( ValuePlug::computeProcessType() ); -ValuePlug::ComputeProcess::Cache ValuePlug::ComputeProcess::g_cache( cacheGetter, 1024 * 1024 * 1024 * 1, ValuePlug::ComputeProcess::Cache::RemovalCallback(), /* cacheErrors = */ false ); // 1 gig +// Using a null `GetterFunction` because it will never get called, because we only ever call `getIfCached()`. +ValuePlug::ComputeProcess::CacheType ValuePlug::ComputeProcess::g_cache( CacheType::GetterFunction(), 1024 * 1024 * 1024 * 1, CacheType::RemovalCallback(), /* cacheErrors = */ false ); // 1 gig ////////////////////////////////////////////////////////////////////////// // SetValueAction implementation From 070085e15385b8d6065874e8c6eb0bf0af178fc1 Mon Sep 17 00:00:00 2001 From: John Haddon Date: Mon, 28 Nov 2022 14:32:38 +0000 Subject: [PATCH 05/11] GafferTest, GafferSceneTest : Exercise hash aliasing deadlock avoidance All these tests would hang if ValuePlug wasn't now using `Process::acquireCollaborativeResult()`. --- python/GafferSceneTest/ParentTest.py | 308 +++++++++++++++++++++++++++ python/GafferTest/ExpressionTest.py | 55 +++++ python/GafferTest/LoopTest.py | 25 +++ 3 files changed, 388 insertions(+) diff --git a/python/GafferSceneTest/ParentTest.py b/python/GafferSceneTest/ParentTest.py index be8031ee210..eb46adb1b64 100644 --- a/python/GafferSceneTest/ParentTest.py +++ b/python/GafferSceneTest/ParentTest.py @@ -36,6 +36,7 @@ import pathlib import inspect +import functools import imath @@ -937,5 +938,312 @@ def testInvalidDestinationNames( self ) : with self.assertRaisesRegex( Gaffer.ProcessException, r".*Invalid destination `/woops/a\*`. Name `a\*` is invalid \(because it contains filter wildcards\)" ) : parent["out"].childNames( "/" ) + @GafferTest.TestRunner.CategorisedTestMethod( { "taskCollaboration:hashAliasing" } ) + def testChainedNodesWithIdenticalBranches( self ) : + + # infiniteScene + # | + # parent2 + # | + # parent1 + + # Trick to make a large scene without needing a cache file + # and without using a BranchCreator. This scene is infinite + # but very cheap to compute. + + infiniteScene = GafferScene.ScenePlug() + infiniteScene["childNames"].setValue( IECore.InternedStringVectorData( [ "one", "two" ] ) ) + + # Filter that will search the scene to a fixed depth, but + # never find anything. + + nothingFilter = GafferScene.PathFilter() + nothingFilter["paths"].setValue( IECore.StringVectorData( [ "/*/*/*/*/*/*/*/*/*/*/thisDoesntExist" ] ) ) + + # Two Parent nodes one after another, using the same filter. + # These will generate the same (empty) set of branches, and + # therefore have the same hash. + + parent2 = GafferScene.Parent() + parent2["in"].setInput( infiniteScene ) + parent2["filter"].setInput( nothingFilter["out"] ) + + parent1 = GafferScene.Parent() + parent1["in"].setInput( parent2["out"] ) + parent1["filter"].setInput( nothingFilter["out"] ) + + self.assertEqual( parent1["__branches"].hash(), parent2["__branches"].hash() ) + + # Simulate the effects of a previous computation being evicted + # from the cache. + + parent1["__branches"].getValue() + Gaffer.ValuePlug.clearCache() + + # We are now a situation where the hash for `parent1.__branches` is + # cached, but the value isn't. This is significant, because it means + # that in the next step, the downstream compute for `parent1.__branches` + # starts _before_ the upstream one for `parent2.__branches`. If the hash + # wasn't cached, then the hash for `parent1` would trigger + # an upstream compute for `parent2.__branches` first. + + # Trigger scene generation. We can't use `traverseScene()` because the + # scene is infinite, so we use `matchingPaths()` to generate up to a fixed + # depth. The key effect here is that lots of threads are indirectly pulling on + # `parent1.__branches`, triggering task collaboration. + + with Gaffer.PerformanceMonitor() as monitor : + paths = IECore.PathMatcher() + GafferScene.SceneAlgo.matchingPaths( IECore.PathMatcher( [ "/*/*/*/*/*/*/*/*/*" ] ), parent1["out"], paths ) + + # We only expect to see a single hash/compute for `__branches` on each + # node. A previous bug meant that this was not the case, and thousands + # of unnecessary evaluations of `parent2.__branches` could occur. + + self.assertEqual( monitor.plugStatistics( parent1["__branches"] ).computeCount, 1 ) + self.assertEqual( monitor.plugStatistics( parent2["__branches"] ).computeCount, 1 ) + + @GafferTest.TestRunner.CategorisedTestMethod( { "taskCollaboration:hashAliasing" } ) + def testChainedNodesWithIdenticalBranchesAndIntermediateCompute( self ) : + + # As for `testChainedNodesWithIdenticalBranches` above, but with a + # _different_ Parent node inserted between the identical two. + # + # infiniteScene + # | + # parentA2 + # | + # parentB + # | + # parentA1 + # + + infiniteScene = GafferScene.ScenePlug() + infiniteScene["childNames"].setValue( IECore.InternedStringVectorData( [ "one", "two" ] ) ) + + filterA = GafferScene.PathFilter() + filterA["paths"].setValue( IECore.StringVectorData( [ "/*/*/*/*/*/thisDoesntExist" ] ) ) + + somethingFilter = GafferScene.PathFilter() + somethingFilter["paths"].setValue( IECore.StringVectorData( [ "/*/*/*/*/*" ] ) ) + + parentA2 = GafferScene.Parent() + parentA2["in"].setInput( infiniteScene ) + parentA2["filter"].setInput( filterA["out"] ) + + parentB = GafferScene.Parent() + parentB["in"].setInput( parentA2["out"] ) + parentB["filter"].setInput( somethingFilter["out"] ) + + parentA1 = GafferScene.Parent() + parentA1["in"].setInput( parentB["out"] ) + parentA1["filter"].setInput( filterA["out"] ) + + self.assertEqual( parentA1["__branches"].hash(), parentA2["__branches"].hash() ) + + # Simulate the effects of a previous computation being evicted + # from the cache. + + parentA1["__branches"].getValue() + Gaffer.ValuePlug.clearCache() + + # We are now a situation where the hash for `parentA1.__branches` is + # cached, but the value isn't. This means that the next call will + # trigger the compute for `parentA1.__branches` _first_. In turn, that + # will trigger a compute on `parentB.__branches`, and _that_ will + # recurse to `parentA2.__branches`. There are now two paths for worker + # threads to need the result from `parentA2` : + # + # 1. The worker enters the task arena for `parentA1`, then enters the + # nested task arena for `parentB`, and finally tries to get the result + # from `parentA2`. + # 2. The worker enters the task arena for `parentB` directly, takes a task, then + # tries to get the result from `parentA1`. + # + # In the first case we were able to detect the recursion from `parentA1` to + # `parentA2` using a `task_arena_observer`. But in the second case, the worker + # enters the `parentB` arena directly, and isn't considered to be in the `parentA1` + # arena by `task_arena_observer`. So this worker tries to do task collaboration + # on A, and is therefore waiting on A to finish. But A can't finish until B finishes + # on the first worker thread. And B can't finish on the first worker thread because + # it is waiting on a task that is trapped on the second worker thread. Deadlock! + # + # This deficiency in using `task_arena_observer` to detect recursion lead + # us to develop a different method, one that avoids the deadlock + # that was being triggered by the call below. + + parentA1["__branches"].getValue() + + @GafferTest.TestRunner.CategorisedTestMethod( { "taskCollaboration:hashAliasing" } ) + def testSplitNodesWithIdenticalBranchesAndIntermediateCompute( self ) : + + # As for `testChainedNodesWithIdenticalBranches` above, but with node + # `parentC` branching off in parallel with `parentA1`. This branch + # arrives at `parentA2` without having `parentA1` in the history. This + # is similar to the case above - it is another mechanism whereby B is + # needed before A on one thread in particular. + # + # infiniteScene + # | + # parentA2 + # | + # parentB + # / \ + # parentA1 parentC + + script = Gaffer.ScriptNode() + + script["infiniteScene"] = GafferScene.ScenePlug() + script["infiniteScene"]["childNames"].setValue( IECore.InternedStringVectorData( [ "one", "two" ] ) ) + + script["filterA"] = GafferScene.PathFilter() + script["filterA"]["paths"].setValue( IECore.StringVectorData( [ "/*/*/*/*/*/*/*thisDoesntExist" ] ) ) + + script["filterB"] = GafferScene.PathFilter() + script["filterB"]["paths"].setValue( IECore.StringVectorData( [ "/*/*/*/*/*/*/one" ] ) ) + + script["filterC"] = GafferScene.PathFilter() + script["filterC"]["paths"].setValue( IECore.StringVectorData( [ "/*/*/*/*/*/*/two" ] ) ) + + script["parentA2"] = GafferScene.Parent() + script["parentA2"]["in"].setInput( script["infiniteScene"] ) + script["parentA2"]["filter"].setInput( script["filterA"]["out"] ) + + script["parentB"] = GafferScene.Parent() + script["parentB"]["in"].setInput( script["parentA2"]["out"] ) + script["parentB"]["filter"].setInput( script["filterB"]["out"] ) + + script["parentA1"] = GafferScene.Parent() + script["parentA1"]["in"].setInput( script["parentB"]["out"] ) + script["parentA1"]["filter"].setInput( script["filterA"]["out"] ) + + script["parentC"] = GafferScene.Parent() + script["parentC"]["in"].setInput( script["parentB"]["out"] ) + script["parentC"]["filter"].setInput( script["filterC"]["out"] ) + + self.assertEqual( script["parentA1"]["__branches"].hash(), script["parentA2"]["__branches"].hash() ) + self.assertNotEqual( script["parentA1"]["__branches"].hash(), script["parentB"]["__branches"].hash() ) + self.assertNotEqual( script["parentA1"]["__branches"].hash(), script["parentC"]["__branches"].hash() ) + self.assertNotEqual( script["parentB"]["__branches"].hash(), script["parentC"]["__branches"].hash() ) + + # Repeat this one, because it is more sensitive to specific task timings. + for i in range( 0, 100 ) : + + # Simulate the effects of a previous computation being evicted + # from the cache. + + script["parentA1"]["__branches"].getValue() + script["parentC"]["__branches"].getValue() + Gaffer.ValuePlug.clearCache() + + # Trigger a bunch of parallel computes on both `parentA1` and `parentC`. + + def evaluateScene( scene ) : + + paths = IECore.PathMatcher() + GafferScene.SceneAlgo.matchingPaths( IECore.PathMatcher( [ "/*/*/*/*/*/*/*/*/*" ] ), scene, paths ) + + parentA1Task = Gaffer.ParallelAlgo.callOnBackgroundThread( script["parentA1"]["out"], functools.partial( evaluateScene, script["parentA1"]["out"] ) ) + parentCTask = Gaffer.ParallelAlgo.callOnBackgroundThread( script["parentC"]["out"], functools.partial( evaluateScene, script["parentC"]["out"] ) ) + + parentA1Task.wait() + parentCTask.wait() + + @GafferTest.TestRunner.CategorisedTestMethod( { "taskCollaboration:hashAliasing" } ) + def testDependencyPropagationThroughIntermediateCompute( self ) : + + # As for `testSplitNodesWithIdenticalBranchesAndIntermediateCompute` above, but + # with an additional task collaboration between `parentB` and `parentA2`. This + # means that we need to track the dependency from `parentB` to `parentA2` through + # an intermediate node. + # + # infiniteScene + # | + # parentA2 + # | + # parentI + # | + # parentB + # / \ + # parentA1 parentC + + script = Gaffer.ScriptNode() + + script["infiniteScene"] = GafferScene.ScenePlug() + script["infiniteScene"]["childNames"].setValue( IECore.InternedStringVectorData( [ "one", "two" ] ) ) + + script["filterA"] = GafferScene.PathFilter() + script["filterA"]["paths"].setValue( IECore.StringVectorData( [ "/*/*/*/*/*/*/*thisDoesntExist" ] ) ) + + script["filterB"] = GafferScene.PathFilter() + script["filterB"]["paths"].setValue( IECore.StringVectorData( [ "/*/*/*/*/*/*/one" ] ) ) + + script["filterC"] = GafferScene.PathFilter() + script["filterC"]["paths"].setValue( IECore.StringVectorData( [ "/*/*/*/*/*/*/two" ] ) ) + + script["filterI"] = GafferScene.PathFilter() + script["filterI"]["paths"].setValue( IECore.StringVectorData( [ "/*/*/*/*/*/*/*/*two" ] ) ) + + script["parentA2"] = GafferScene.Parent() + script["parentA2"]["in"].setInput( script["infiniteScene"] ) + script["parentA2"]["filter"].setInput( script["filterA"]["out"] ) + + script["parentI"] = GafferScene.Parent() + script["parentI"]["in"].setInput( script["parentA2"]["out"] ) + script["parentI"]["filter"].setInput( script["filterI"]["out"] ) + + script["parentB"] = GafferScene.Parent() + script["parentB"]["in"].setInput( script["parentI"]["out"] ) + script["parentB"]["filter"].setInput( script["filterB"]["out"] ) + + script["parentA1"] = GafferScene.Parent() + script["parentA1"]["in"].setInput( script["parentB"]["out"] ) + script["parentA1"]["filter"].setInput( script["filterA"]["out"] ) + + script["parentC"] = GafferScene.Parent() + script["parentC"]["in"].setInput( script["parentB"]["out"] ) + script["parentC"]["filter"].setInput( script["filterC"]["out"] ) + + self.assertEqual( script["parentA1"]["__branches"].hash(), script["parentA2"]["__branches"].hash() ) + self.assertNotEqual( script["parentA1"]["__branches"].hash(), script["parentB"]["__branches"].hash() ) + self.assertNotEqual( script["parentA1"]["__branches"].hash(), script["parentC"]["__branches"].hash() ) + self.assertNotEqual( script["parentB"]["__branches"].hash(), script["parentC"]["__branches"].hash() ) + + # Repeat this one, because it is more sensitive to specific task timings. + for i in range( 0, 100 ) : + + # Simulate the effects of a previous computation being evicted + # from the cache. + + script["parentA1"]["__branches"].getValue() + script["parentC"]["__branches"].getValue() + Gaffer.ValuePlug.clearCache() + + # Trigger a bunch of parallel computes on both `parentA1` and + # `parentC`. The sequence of computes we're concerned about is this : + # + # 1. `parentC` + # 2. `parentB` + # 3. `parentI` + # 4. `parentA1` + # 5. `parentB` (collaboration from `parentA1`) + # 6. `parentA2` (mustn't collaborate with `parentA1`) + # + # This means realising that `parentA2` is being waited on by + # `parentA1`, even though `parentI` started computing _before_ + # collaboration on `parentB` started. So we can't simply track + # dependents up the chain at the point each process starts. + + def evaluateScene( scene ) : + + paths = IECore.PathMatcher() + GafferScene.SceneAlgo.matchingPaths( IECore.PathMatcher( [ "/*/*/*/*/*/*/*/*/*" ] ), scene, paths ) + + parentCTask = Gaffer.ParallelAlgo.callOnBackgroundThread( script["parentC"]["out"], functools.partial( evaluateScene, script["parentC"]["out"] ) ) + parentA1Task = Gaffer.ParallelAlgo.callOnBackgroundThread( script["parentA1"]["out"], functools.partial( evaluateScene, script["parentA1"]["out"] ) ) + + parentCTask.wait() + parentA1Task.wait() + if __name__ == "__main__": unittest.main() diff --git a/python/GafferTest/ExpressionTest.py b/python/GafferTest/ExpressionTest.py index 76dc9ebd2b0..3e08fa1aaa3 100644 --- a/python/GafferTest/ExpressionTest.py +++ b/python/GafferTest/ExpressionTest.py @@ -1650,5 +1650,60 @@ def testPathForStringPlug( self ) : self.assertEqual( script["node"]["in"].getValue(), pathlib.Path.cwd().as_posix() ) + @GafferTest.TestRunner.CategorisedTestMethod( { "taskCollaboration:hashAliasing" } ) + def testHashAliasing( self ) : + + # p1 + # | + # e1 + # | + # p2 + # | + # e2 + # | + # p3 + # | + # e3 (aliases the hash from e1) + # | + # p4 + + script = Gaffer.ScriptNode() + + script["n"] = Gaffer.Node() + script["n"]["user"]["p1"] = Gaffer.StringPlug( flags = Gaffer.Plug.Flags.Default | Gaffer.Plug.Flags.Dynamic ) + script["n"]["user"]["p2"] = Gaffer.StringPlug( flags = Gaffer.Plug.Flags.Default | Gaffer.Plug.Flags.Dynamic ) + script["n"]["user"]["p3"] = Gaffer.StringPlug( flags = Gaffer.Plug.Flags.Default | Gaffer.Plug.Flags.Dynamic ) + script["n"]["user"]["p4"] = Gaffer.StringPlug( flags = Gaffer.Plug.Flags.Default | Gaffer.Plug.Flags.Dynamic ) + + script["e1"] = Gaffer.Expression() + script["e1"].setExpression( """import time; time.sleep( 0.01 ); parent["n"]["user"]["p2"] = parent["n"]["user"]["p1"]""" ) + + script["e2"] = Gaffer.Expression() + script["e2"].setExpression( """import time; time.sleep( 0.01 ); parent["n"]["user"]["p3"] = parent["n"]["user"]["p2"] and ''""" ) + + script["e3"] = Gaffer.Expression() + script["e3"].setExpression( """import time; time.sleep( 0.01 ); parent["n"]["user"]["p4"] = parent["n"]["user"]["p3"]""" ) + + # Because the input StringPlugs hash by value (see de8ab79d6f958cef3b80954798f8083a346945a7), + # and the expression is stored in an internalised form that omits the names of the source + # and destination plugs, the hashes for `e3` and `e1`` are identical, even though one depends + # on the other. + self.assertEqual( script["e1"]["__execute"].hash(), script["e3"]["__execute"].hash() ) + # But the hash for `e2` is different, because it has different expression source code + # (even though it will generate the same value). + self.assertNotEqual( script["e2"]["__execute"].hash(), script["e1"]["__execute"].hash() ) + + # Get the value for `p4`, which will actually compute and cache `p2` first due to the hash + # computation done before the compute. So we never actually do a compute on `p4`, as the value + # for that comes from the cache. + script["n"]["user"]["p4"].getValue() + # Simulate a cache eviction, so we have the hash cached, but not the value. + Gaffer.ValuePlug.clearCache() + + # Now, getting the value of `p4` will trigger an immediate compute for `e3`, which will + # recurse to an upstream compute for `e1`, which has the same hash. If we don't have a + # mechanism for handling it, this will deadlock. + script["n"]["user"]["p4"].getValue() + if __name__ == "__main__": unittest.main() diff --git a/python/GafferTest/LoopTest.py b/python/GafferTest/LoopTest.py index 33835f0c6a0..0cf4d975f19 100644 --- a/python/GafferTest/LoopTest.py +++ b/python/GafferTest/LoopTest.py @@ -282,5 +282,30 @@ def plugDirtied( plug ) : for plug, value in valuesWhenDirtied.items() : self.assertEqual( plugValue( plug ), value ) + @GafferTest.TestRunner.CategorisedTestMethod( { "taskCollaboration:hashAliasing" } ) + def testHashAliasingDeadlock( self ) : + + script = Gaffer.ScriptNode() + + script["loop"] = Gaffer.Loop() + script["loop"].setup( Gaffer.StringPlug() ) + + # Dumb expression that just sets the value for the next iteration to + # the value from the previous iteration. Because of de8ab79d6f958cef3b80954798f8083a346945a7, + # the hash for the expression output is identical for every iteration of + # the loop, even though the context differs. + script["expression"] = Gaffer.Expression() + script["expression"].setExpression( """parent["loop"]["next"] = parent["loop"]["previous"]""" ) + + # Get the result of the loop. This actually computes the _first_ iteration of the loop first, + # while computing the hash of the result, and reuses the result for every other loop iteration. + script["loop"]["out"].getValue() + # Simulate cache eviction by clearing the compute cache. + Gaffer.ValuePlug.clearCache() + # Get the value again. Now, because the hash is still cached, this will first start the + # compute for the _last_ iteration. This leads to a recursive compute, which can cause deadlock + # if not handled appropriately. + script["loop"]["out"].getValue() + if __name__ == "__main__": unittest.main() From 8ad8716c57bf6871ed7ab52db807553d026c6d29 Mon Sep 17 00:00:00 2001 From: John Haddon Date: Mon, 9 Oct 2023 12:38:38 +0100 Subject: [PATCH 06/11] ValuePlug : Add `Default` CachePolicy and deprecate others Our rationale is as follows : - Standard has no right to exist - if a compute is slow then it should be multithreaded and TaskCollaboration should be used. - TaskIsolation has no right to exist either. If a compute is heavy enough to use TBB then it's essential that we can collaborate on it. And since we're no longer using a `task_arena` to perform collaboration, we have less concern about potential overhead. - Legacy is actually a good policy for lightweight computes. True, it can mean redundant identical computes being performed concurrently. But this is better than making one thread wait for the other because it allows both threads to collaborate on an upstream TaskCollaboration. - Changing the behaviour of the Standard policy would be a breaking change, so Default is introduced as the new "blessed" name for Legacy. --- Changes.md | 1 + include/Gaffer/ValuePlug.h | 50 ++++++++++---------------- include/GafferScene/ObjectProcessor.h | 4 +-- python/GafferTest/ValuePlugTest.py | 2 +- src/Gaffer/ComputeNode.cpp | 6 ++-- src/Gaffer/Expression.cpp | 4 --- src/Gaffer/ValuePlug.cpp | 6 ++-- src/GafferModule/ExpressionBinding.cpp | 4 +-- src/GafferModule/ValuePlugBinding.cpp | 1 + src/GafferOSL/OSLExpressionEngine.cpp | 2 +- src/GafferScene/ObjectProcessor.cpp | 2 +- src/GafferScene/SceneReader.cpp | 6 ++-- 12 files changed, 36 insertions(+), 52 deletions(-) diff --git a/Changes.md b/Changes.md index df6c120bc6c..a0f9179b5e8 100644 --- a/Changes.md +++ b/Changes.md @@ -27,6 +27,7 @@ API --- - Process : Added `acquireCollaborativeResult()` method, providing an improved mechanism for multiple threads to collaborate on TBB tasks spawned by a single process they all depend on. +- ValuePlug : Added `Default` CachePolicy and deprecated `Standard`, `TaskIsolation` and `Legacy` policies. 1.3.5.0 (relative to 1.3.4.0) ======= diff --git a/include/Gaffer/ValuePlug.h b/include/Gaffer/ValuePlug.h index 4913e88e953..706ea41f349 100644 --- a/include/Gaffer/ValuePlug.h +++ b/include/Gaffer/ValuePlug.h @@ -119,41 +119,29 @@ class GAFFER_API ValuePlug : public Plug /// and hash for output plugs. enum class CachePolicy { - /// No caching is performed. Suitable for - /// extremely quick processes. Also useful - /// to avoid double-counting of cache memory when - /// a compute always returns a sub-object of another - /// cache entry. + /// No caching is performed. Suitable for extremely quick processes. + /// Also useful to avoid double-counting of cache memory when a + /// compute always returns a sub-object of another cache entry. Uncached, - /// Suitable for regular processes that don't spawn - /// TBB tasks. It is essential that any task-spawning - /// processes use one of the dedicated policies below. - /// \todo It isn't actually clear that the locking of the - /// Standard policy is an improvement over the non-locked - /// Legacy policy. Locking on a downstream Standard - /// compute might prevent multiple threads from participating - /// in an upstream TaskCollaboration. And for small computes - /// that are unlikely to be needed by multiple threads, - /// we may well prefer to avoid the contention. Note that - /// many scene computes may fit this category, as every - /// non-filtered location is implemented as a very cheap - /// pass-through compute. There's also a decent argument - /// that any non-trivial amount of work should be using TBB, - /// so it would be a mistake to do anything expensive with - /// a Standard policy anyway. + /// Deprecated synonym for TaskCollaboration (for + /// `computeCachePolicy()`) and Default (for `hashCachePolicy()`). + /// Will be removed in a future release. Standard, - /// Suitable for processes that spawn TBB tasks. - /// Threads waiting for the same result will collaborate - /// to perform tasks together until the work is complete. + /// Must be used for processes that spawn TBB tasks. Results are + /// stored in a global cache, and threads waiting for the same + /// result will collaborate to perform tasks together until the work + /// is complete. TaskCollaboration, - /// Suitable for processes that spawn TBB tasks. Threads - /// waiting for an in-progress compute will block until - /// it is complete. In theory this is inferior to TaskCollaboration, - /// but due to TBB overhead it may be preferable for small - /// but frequent computes. + /// Deprecated synonym for TaskCollaboration. Will be removed in a + /// future release. TaskIsolation, - /// Legacy policy, to be removed. - Legacy + /// Suitable for relatively lightweight processes that could benefit + /// from caching, but do not spawn TBB tasks, and are unlikely to be + /// required from multiple threads concurrently. + Default, + /// Deprecated synonym for Default. Will be removed in a future + /// release. + Legacy = Default }; /// @name Cache management diff --git a/include/GafferScene/ObjectProcessor.h b/include/GafferScene/ObjectProcessor.h index 170859e993b..99b0b6e1e65 100644 --- a/include/GafferScene/ObjectProcessor.h +++ b/include/GafferScene/ObjectProcessor.h @@ -79,8 +79,8 @@ class GAFFERSCENE_API ObjectProcessor : public FilteredSceneProcessor virtual void hashProcessedObject( const ScenePath &path, const Gaffer::Context *context, IECore::MurmurHash &h ) const = 0; /// Must be implemented by derived classes to return the processed object. virtual IECore::ConstObjectPtr computeProcessedObject( const ScenePath &path, const Gaffer::Context *context, const IECore::Object *inputObject ) const = 0; - /// Must be implemented to return an appropriate policy if `computeProcessedObject()` spawns - /// TBB tasks. The default implementation returns `ValuePlug::CachePolicy::Legacy`. + /// Must be implemented to return `ValuePlug::CachePolicy::TaskCollaboration` if `computeProcessedObject()` spawns + /// TBB tasks. The default implementation returns `ValuePlug::CachePolicy::Default`. virtual Gaffer::ValuePlug::CachePolicy processedObjectComputeCachePolicy() const; void hash( const Gaffer::ValuePlug *output, const Gaffer::Context *context, IECore::MurmurHash &h ) const override; diff --git a/python/GafferTest/ValuePlugTest.py b/python/GafferTest/ValuePlugTest.py index cd913ee67bc..561dcb019fc 100644 --- a/python/GafferTest/ValuePlugTest.py +++ b/python/GafferTest/ValuePlugTest.py @@ -951,7 +951,7 @@ def computeCachePolicy( self, output ) : IECore.registerRunTimeTyped( InfiniteLoop ) for cachePolicy in ( - Gaffer.ValuePlug.CachePolicy.Legacy, + Gaffer.ValuePlug.CachePolicy.Default, Gaffer.ValuePlug.CachePolicy.Standard, Gaffer.ValuePlug.CachePolicy.TaskIsolation, # Omitting TaskCollaboration, because if our second compute joins as diff --git a/src/Gaffer/ComputeNode.cpp b/src/Gaffer/ComputeNode.cpp index 1816a0d013c..92152840a79 100644 --- a/src/Gaffer/ComputeNode.cpp +++ b/src/Gaffer/ComputeNode.cpp @@ -80,7 +80,7 @@ void ComputeNode::compute( ValuePlug *output, const Context *context ) const ValuePlug::CachePolicy ComputeNode::hashCachePolicy( const ValuePlug *output ) const { - return ValuePlug::CachePolicy::Standard; + return ValuePlug::CachePolicy::Default; } ValuePlug::CachePolicy ComputeNode::computeCachePolicy( const ValuePlug *output ) const @@ -89,7 +89,5 @@ ValuePlug::CachePolicy ComputeNode::computeCachePolicy( const ValuePlug *output { return ValuePlug::CachePolicy::Uncached; } - /// \todo Return `Standard` once all task-spawning computes are - /// known to be declaring an appropriate policy. - return ValuePlug::CachePolicy::Legacy; + return ValuePlug::CachePolicy::Default; } diff --git a/src/Gaffer/Expression.cpp b/src/Gaffer/Expression.cpp index d2580f5f1e7..b605cb3330f 100644 --- a/src/Gaffer/Expression.cpp +++ b/src/Gaffer/Expression.cpp @@ -336,10 +336,6 @@ Gaffer::ValuePlug::CachePolicy Expression::computeCachePolicy( const Gaffer::Val { return m_engine->executeCachePolicy(); } - else - { - return ValuePlug::CachePolicy::Legacy; - } } return ComputeNode::computeCachePolicy( output ); } diff --git a/src/Gaffer/ValuePlug.cpp b/src/Gaffer/ValuePlug.cpp index 5d32a087781..ad2087c0693 100644 --- a/src/Gaffer/ValuePlug.cpp +++ b/src/Gaffer/ValuePlug.cpp @@ -266,7 +266,7 @@ class ValuePlug::HashProcess : public Process // No value in local cache, so either compute it directly or get it via // the global cache if it's expensive enough to warrant collaboration. IECore::MurmurHash result; - if( cachePolicy == CachePolicy::Legacy || cachePolicy == CachePolicy::Standard ) + if( cachePolicy == CachePolicy::Default || cachePolicy == CachePolicy::Standard ) { result = HashProcess( p, plug, computeNode ).run(); } @@ -538,7 +538,7 @@ class ValuePlug::ComputeProcess : public Process { // Type conversion will be implemented by `setFrom()`. // \todo Determine if caching is actually worthwhile for this. - cachePolicy = CachePolicy::Legacy; + cachePolicy = CachePolicy::Default; } else if( computeNode ) { @@ -580,7 +580,7 @@ class ValuePlug::ComputeProcess : public Process // The value isn't in the cache, so we'll need to compute it, // taking account of the cache policy. - if( cachePolicy == CachePolicy::Legacy ) + if( cachePolicy == CachePolicy::Default ) { // Do the compute ourselves, without worrying if the same // compute is in flight elsewhere. We assume the compute is diff --git a/src/GafferModule/ExpressionBinding.cpp b/src/GafferModule/ExpressionBinding.cpp index 6ae88c7457f..4da38fa1173 100644 --- a/src/GafferModule/ExpressionBinding.cpp +++ b/src/GafferModule/ExpressionBinding.cpp @@ -129,9 +129,9 @@ ValuePlug::CachePolicy defaultExecuteCachePolicy() { return ValuePlug::CachePolicy::TaskIsolation; } - else if( !strcmp( cp, "Legacy" ) ) + else if( !strcmp( cp, "Legacy" ) || !strcmp( cp, "Default" ) ) { - return ValuePlug::CachePolicy::Legacy; + return ValuePlug::CachePolicy::Default; } else { diff --git a/src/GafferModule/ValuePlugBinding.cpp b/src/GafferModule/ValuePlugBinding.cpp index f3fdef36bea..ef8d4af0847 100644 --- a/src/GafferModule/ValuePlugBinding.cpp +++ b/src/GafferModule/ValuePlugBinding.cpp @@ -160,6 +160,7 @@ void GafferModule::bindValuePlug() .value( "Standard", ValuePlug::CachePolicy::Standard ) .value( "TaskCollaboration", ValuePlug::CachePolicy::TaskCollaboration ) .value( "TaskIsolation", ValuePlug::CachePolicy::TaskIsolation ) + .value( "Default", ValuePlug::CachePolicy::Default ) .value( "Legacy", ValuePlug::CachePolicy::Legacy ) ; diff --git a/src/GafferOSL/OSLExpressionEngine.cpp b/src/GafferOSL/OSLExpressionEngine.cpp index 597b7ad87c6..5d2fc17ee94 100644 --- a/src/GafferOSL/OSLExpressionEngine.cpp +++ b/src/GafferOSL/OSLExpressionEngine.cpp @@ -475,7 +475,7 @@ class OSLExpressionEngine : public Gaffer::Expression::Engine ValuePlug::CachePolicy executeCachePolicy() const override { - return ValuePlug::CachePolicy::Legacy; + return ValuePlug::CachePolicy::Default; } void apply( Gaffer::ValuePlug *proxyOutput, const Gaffer::ValuePlug *topLevelProxyOutput, const IECore::Object *value ) const override diff --git a/src/GafferScene/ObjectProcessor.cpp b/src/GafferScene/ObjectProcessor.cpp index 120cabd2aee..568a55ca020 100644 --- a/src/GafferScene/ObjectProcessor.cpp +++ b/src/GafferScene/ObjectProcessor.cpp @@ -158,7 +158,7 @@ void ObjectProcessor::hashProcessedObject( const ScenePath &path, const Gaffer:: Gaffer::ValuePlug::CachePolicy ObjectProcessor::processedObjectComputeCachePolicy() const { - return ValuePlug::CachePolicy::Legacy; + return ValuePlug::CachePolicy::Default; } void ObjectProcessor::hashObject( const ScenePath &path, const Gaffer::Context *context, const ScenePlug *parent, IECore::MurmurHash &h ) const diff --git a/src/GafferScene/SceneReader.cpp b/src/GafferScene/SceneReader.cpp index 1c2110a3a02..a52fe482134 100644 --- a/src/GafferScene/SceneReader.cpp +++ b/src/GafferScene/SceneReader.cpp @@ -135,9 +135,9 @@ ValuePlug::CachePolicy cachePolicyFromEnv( const char *name ) { return ValuePlug::CachePolicy::TaskIsolation; } - else if( !strcmp( cp, "Legacy" ) ) + else if( !strcmp( cp, "Legacy" ) || !strcmp( cp, "Default" ) ) { - return ValuePlug::CachePolicy::Legacy; + return ValuePlug::CachePolicy::Default; } else { @@ -148,7 +148,7 @@ ValuePlug::CachePolicy cachePolicyFromEnv( const char *name ) } } - return ValuePlug::CachePolicy::Legacy; + return ValuePlug::CachePolicy::Default; } const ValuePlug::CachePolicy g_objectCachePolicy = cachePolicyFromEnv( "GAFFERSCENE_SCENEREADER_OBJECT_CACHEPOLICY" ); From 7add87faaf46366c1d4f830f70269c822e08118c Mon Sep 17 00:00:00 2001 From: John Haddon Date: Mon, 9 Oct 2023 17:08:58 +0100 Subject: [PATCH 07/11] ValuePlugTest/LRUCache : Move comment to more relevant place This also disables the test for `Standard` and `TaskIsolation` policies because they are now the same as `TaskCollaboration`. This does highlight the fact that we now have the possibility of new stalls in the UI, but only for cache policies that aren't ideal anyway. We need to focus our energies in to fixing the shared cancellation for TaskCollaboration, and making as many processes as possible us TaskCollaboration. --- .../Gaffer/Private/IECorePreview/LRUCache.inl | 13 +--------- python/GafferTest/ValuePlugTest.py | 25 ++++++++++++++++--- 2 files changed, 22 insertions(+), 16 deletions(-) diff --git a/include/Gaffer/Private/IECorePreview/LRUCache.inl b/include/Gaffer/Private/IECorePreview/LRUCache.inl index 86895fb2061..67286c27258 100644 --- a/include/Gaffer/Private/IECorePreview/LRUCache.inl +++ b/include/Gaffer/Private/IECorePreview/LRUCache.inl @@ -836,18 +836,7 @@ class TaskParallel // The only canceller being checked at that // point will be the one passed to the // `LRUCache::get()` call that we work in - // service of. This isn't ideal, as it can cause - // UI stalls if one UI element is waiting to - // cancel an operation, but it's tasks have been - // "captured" by collaboration on a compute - // started by another UI element (which hasn't - // requested cancellation). One alternative is - // that we would only accept work if our - // canceller matches the one in use by the - // original caller. This would rule out - // collaboration between UI elements, but would - // still allow diamond dependencies in graph - // evaluation to use collaboration. + // service of. return (!canceller || !canceller->cancelled()); } ); diff --git a/python/GafferTest/ValuePlugTest.py b/python/GafferTest/ValuePlugTest.py index 561dcb019fc..95b245c9900 100644 --- a/python/GafferTest/ValuePlugTest.py +++ b/python/GafferTest/ValuePlugTest.py @@ -952,11 +952,28 @@ def computeCachePolicy( self, output ) : for cachePolicy in ( Gaffer.ValuePlug.CachePolicy.Default, - Gaffer.ValuePlug.CachePolicy.Standard, - Gaffer.ValuePlug.CachePolicy.TaskIsolation, # Omitting TaskCollaboration, because if our second compute joins as - # a worker, there is currently no way we can recall it. See comments - # in `LRUCachePolicy.TaskParallel.Handle.acquire`. + # a worker, there is currently no way we can recall it. This is not + # ideal as it means the UI stalls if one UI element is waiting to + # cancel an operation, but its tasks have been "captured" by + # collaboration on a compute started by another UI element (which + # hasn't requested cancellation). + # + ## \todo Improve this situation. Possibilities include : + # + # 1. Only collaborating on work if our canceller matches the one used + # by the original caller. This would cause redundant computes in + # the UI though. + # 2. Finding a way to join the cancellers so that cancellation on the + # second one triggers cancellation on the first. + # 3. Finding a way for the second canceller to trigger a call to + # `task_group::cancel()`. + # 4. Getting the two UI elements to use the same canceller in the first + # place. Perhaps this is the most promising avenue? Making their + # combined fates explicit in the API might not be a bad thing, and if + # we combined this with #1, a third UI element would have the option + # of doing its own compute with its own canceller, making it safe + # from combined cancellation. ) : script = Gaffer.ScriptNode() From fcdce4c45d35a979a84403b545aad1cfe7d07800 Mon Sep 17 00:00:00 2001 From: John Haddon Date: Fri, 6 Oct 2023 12:32:26 +0100 Subject: [PATCH 08/11] Process : Replace `task_arena` with `isolated_task_group` This provides some substantial performance improvements : ``` - testFanOutPerformance (GafferTest.CollectTest.CollectTest) : was 0.83s now 0.44s (47% reduction) - testLoop (GafferTest.CollectTest.CollectTest) : was 1.64s now 0.09s (95% reduction) - testDeepTreePerformance (GafferTest.ProcessTest.ProcessTest) : was 15.86s now 0.04s (100% reduction) ``` Full disclosure : it also shows a roughly 30% increase in runtime for `ProcessTest.testCollaborationTransitionPerformance`, and I'm not sure why. But that's small beans compared to the orders of magnitude in `testLoop` and `testDeepTreePerformance`. Note that `isolated_task_group` is actually a TBB preview feature, so technically could be taken away from us in the future. But it is still present in the latest TBB release (2021.10.0) and is still in the latest source code at https://github.com/oneapi-src/oneTBB/blob/master/include/oneapi/tbb/task_group.h. It is also [used in Embree](https://github.com/embree/embree/blob/master/man/man3/rtcCommitScene.4embree4#L34-L38) where the documentation cautions against the alternative of using a `task_arena` "due to its high runtime overhead". So I think we should be OK. If it ever disappears, alternatives include : - Rolling our own, which should be doable provided that TBB continues to expose `isolate_within_arena()`, or might also be doable using public API by constructing a regular `task_group` from inside `this_task_arena::isolate()`. - Using `tbb::collaborative_call_once()`, which is not available in our current TBB version but is available in future versions. --- .../Gaffer/Private/IECorePreview/TaskMutex.h | 1 + include/Gaffer/Process.inl | 97 +++++++++---------- 2 files changed, 46 insertions(+), 52 deletions(-) diff --git a/include/Gaffer/Private/IECorePreview/TaskMutex.h b/include/Gaffer/Private/IECorePreview/TaskMutex.h index da9474fa73d..db91edf2e3a 100644 --- a/include/Gaffer/Private/IECorePreview/TaskMutex.h +++ b/include/Gaffer/Private/IECorePreview/TaskMutex.h @@ -44,6 +44,7 @@ #include "tbb/spin_rw_mutex.h" #include "tbb/task_arena.h" +#define TBB_PREVIEW_ISOLATED_TASK_GROUP 1 #include "tbb/task_group.h" // Enable preview feature that allows us to construct a `task_scheduler_observer` // for a specific `task_arena`. This feature becomes officially supported in diff --git a/include/Gaffer/Process.inl b/include/Gaffer/Process.inl index 19d67cccbbc..25f81a60767 100644 --- a/include/Gaffer/Process.inl +++ b/include/Gaffer/Process.inl @@ -36,7 +36,7 @@ #include "tbb/concurrent_hash_map.h" #include "tbb/spin_mutex.h" -#include "tbb/task_arena.h" +#define TBB_PREVIEW_ISOLATED_TASK_GROUP 1 #include "tbb/task_group.h" #include @@ -239,10 +239,9 @@ class GAFFER_API Process::Collaboration : public IECore::RefCounted IE_CORE_DECLAREMEMBERPTR( Collaboration ); - // Arena and task group used to allow waiting threads to participate - // in collaborative work. - tbb::task_arena arena; - tbb::task_group taskGroup; + // Task group used to allow waiting threads to participate in + // collaborative work. + tbb::isolated_task_group taskGroup; using Set = std::unordered_set; // Collaborations depending directly on this one. @@ -329,8 +328,8 @@ typename ProcessType::ResultType Process::acquireCollaborativeResult( } // We've found an in-flight process we can wait on without causing - // deadlock. Join its `task_arena` and wait on the result, so we get to - // work on any TBB tasks it has created. + // deadlock. Wait on the result, so we get to work on any TBB tasks it + // has created. // // > Note : We need to own a reference to `collaboration` because the // thread that created it may drop its own reference as soon as we call @@ -345,9 +344,7 @@ typename ProcessType::ResultType Process::acquireCollaborativeResult( CollaborationTypePtr collaboration = candidate; accessor.release(); - collaboration->arena.execute( - [&]{ return collaboration->taskGroup.wait(); } - ); + collaboration->taskGroup.wait(); if( collaboration->result ) { @@ -378,49 +375,45 @@ typename ProcessType::ResultType Process::acquireCollaborativeResult( std::exception_ptr exception; - auto status = collaboration->arena.execute( + auto status = collaboration->taskGroup.run_and_wait( [&] { - return collaboration->taskGroup.run_and_wait( - [&] { - // Publish ourselves so that other threads can collaborate - // by calling `collaboration->taskGroup.wait()`. - accessor->second.push_back( collaboration ); - accessor.release(); - - try - { - ProcessType process( std::forward( args )... ); - process.m_collaboration = collaboration.get(); - collaboration->result = process.run(); - // Publish result to cache before we remove ourself from - // `g_pendingCollaborations`, so that other threads will - // be able to get the result one way or the other. - ProcessType::g_cache.setIfUncached( - cacheKey, *collaboration->result, - ProcessType::cacheCostFunction - ); - } - catch( ... ) - { - // Don't allow `task_group::wait()` to see exceptions, - // because then we'd hit a thread-safety bug in - // `tbb::task_group_context::reset()`. - exception = std::current_exception(); - } - - // Now we're done, remove `collaboration` from the pending collaborations. - [[maybe_unused]] const bool found = CollaborationType::g_pendingCollaborations.find( accessor, cacheKey ); - assert( found ); - auto toErase = std::find( accessor->second.begin(), accessor->second.end(), collaboration ); - assert( toErase != accessor->second.end() ); - accessor->second.erase( toErase ); - if( accessor->second.empty() ) - { - CollaborationType::g_pendingCollaborations.erase( accessor ); - } - accessor.release(); - } - ); + // Publish ourselves so that other threads can collaborate + // by calling `collaboration->taskGroup.wait()`. + accessor->second.push_back( collaboration ); + accessor.release(); + + try + { + ProcessType process( std::forward( args )... ); + process.m_collaboration = collaboration.get(); + collaboration->result = process.run(); + // Publish result to cache before we remove ourself from + // `g_pendingCollaborations`, so that other threads will + // be able to get the result one way or the other. + ProcessType::g_cache.setIfUncached( + cacheKey, *collaboration->result, + ProcessType::cacheCostFunction + ); + } + catch( ... ) + { + // Don't allow `task_group::wait()` to see exceptions, + // because then we'd hit a thread-safety bug in + // `tbb::task_group_context::reset()`. + exception = std::current_exception(); + } + + // Now we're done, remove `collaboration` from the pending collaborations. + [[maybe_unused]] const bool found = CollaborationType::g_pendingCollaborations.find( accessor, cacheKey ); + assert( found ); + auto toErase = std::find( accessor->second.begin(), accessor->second.end(), collaboration ); + assert( toErase != accessor->second.end() ); + accessor->second.erase( toErase ); + if( accessor->second.empty() ) + { + CollaborationType::g_pendingCollaborations.erase( accessor ); + } + accessor.release(); } ); From 34251d0b45d93829aa5a89d3197c817bfffa648a Mon Sep 17 00:00:00 2001 From: John Haddon Date: Wed, 23 Nov 2022 14:48:09 +0000 Subject: [PATCH 09/11] TaskMutex : Remove `WorkerRead` lock type This was a poor man's deadlock-avoidance scheme which is no longer needed now that ValuePlug is using `Process:acquireCollaborativeResult()`. --- .../Gaffer/Private/IECorePreview/LRUCache.inl | 24 ++-- .../Gaffer/Private/IECorePreview/TaskMutex.h | 130 +++--------------- .../IECorePreviewTest/LRUCacheTest.py | 9 +- .../IECorePreviewTest/TaskMutexTest.py | 4 - src/GafferTestModule/TaskMutexTest.cpp | 58 +------- 5 files changed, 35 insertions(+), 190 deletions(-) diff --git a/include/Gaffer/Private/IECorePreview/LRUCache.inl b/include/Gaffer/Private/IECorePreview/LRUCache.inl index 67286c27258..13ca667b77d 100644 --- a/include/Gaffer/Private/IECorePreview/LRUCache.inl +++ b/include/Gaffer/Private/IECorePreview/LRUCache.inl @@ -177,6 +177,10 @@ class Serial // always be OK to write. But we return false for recursive // calls to avoid unnecessary overhead updating the LRU list // for inner calls. + /// \todo Is this distinction worth it, and do we really need + /// to support recursion on a single item in the Serial policy? + /// This is a remnant of a more complex system that allowed recursion + /// in the TaskParallel policy, but that has since been removed. return m_it->handleCount == 1; } @@ -733,20 +737,19 @@ class TaskParallel CacheEntry &writable() { - assert( m_itemLock.lockType() == TaskMutex::ScopedLock::LockType::Write ); + assert( m_itemLock.isWriter() ); return m_item->cacheEntry; } - // May return false for AcquireMode::Insert if a GetterFunction recurses. bool isWritable() const { - return m_itemLock.lockType() == Item::Mutex::ScopedLock::LockType::Write; + return m_itemLock.isWriter(); } template void execute( F &&f ) { - if( m_spawnsTasks && m_itemLock.lockType() == TaskMutex::ScopedLock::LockType::Write ) + if( m_spawnsTasks ) { // The getter function will spawn tasks. Execute // it via the TaskMutex, so that other threads trying @@ -814,16 +817,9 @@ class TaskParallel // found a pre-existing item we optimistically take // just a read lock, because it is faster when // many threads just need to read from the same - // cached item. We accept WorkerRead locks when necessary, - // to support Getter recursion. - TaskMutex::ScopedLock::LockType lockType = TaskMutex::ScopedLock::LockType::WorkerRead; - if( inserted || mode == FindWritable || mode == InsertWritable ) - { - lockType = TaskMutex::ScopedLock::LockType::Write; - } - + // cached item. const bool acquired = m_itemLock.acquireOr( - it->mutex, lockType, + it->mutex, /* write = */ inserted || mode == FindWritable || mode == InsertWritable, // Work accepter [&binLock, canceller] ( bool workAvailable ) { // Release the bin lock prior to accepting work, because @@ -844,7 +840,7 @@ class TaskParallel if( acquired ) { if( - m_itemLock.lockType() == TaskMutex::ScopedLock::LockType::Read && + !m_itemLock.isWriter() && mode == Insert && it->cacheEntry.status() == LRUCache::Uncached ) { diff --git a/include/Gaffer/Private/IECorePreview/TaskMutex.h b/include/Gaffer/Private/IECorePreview/TaskMutex.h index db91edf2e3a..1ff73c75a79 100644 --- a/include/Gaffer/Private/IECorePreview/TaskMutex.h +++ b/include/Gaffer/Private/IECorePreview/TaskMutex.h @@ -46,11 +46,6 @@ #include "tbb/task_arena.h" #define TBB_PREVIEW_ISOLATED_TASK_GROUP 1 #include "tbb/task_group.h" -// Enable preview feature that allows us to construct a `task_scheduler_observer` -// for a specific `task_arena`. This feature becomes officially supported in -// Intel TBB 2019 Update 5, so it is not going to be removed. -#define TBB_PREVIEW_LOCAL_OBSERVER 1 -#include "tbb/task_scheduler_observer.h" #include #include @@ -102,6 +97,9 @@ namespace IECorePreview /// } /// // Use resource here, while lock is still held. /// ``` +/// +/// \todo Investigate `tbb::collaborative_call_once`, once VFXPlatform has moved to OneTBB. +/// It appears to provide very similar functionality. class TaskMutex : boost::noncopyable { @@ -122,7 +120,7 @@ class TaskMutex : boost::noncopyable public : ScopedLock() - : m_mutex( nullptr ), m_lockType( LockType::None ) + : m_mutex( nullptr ), m_writer( false ) { } @@ -144,9 +142,8 @@ class TaskMutex : boost::noncopyable /// work on behalf of `execute()` while waiting. void acquire( TaskMutex &mutex, bool write = true, bool acceptWork = true ) { - const LockType l = write ? LockType::Write : LockType::Read; tbb::internal::atomic_backoff backoff; - while( !acquireOr( mutex, l, [acceptWork]( bool workAvailable ){ return acceptWork; } ) ) + while( !acquireOr( mutex, write, [acceptWork]( bool workAvailable ){ return acceptWork; } ) ) { backoff.pause(); } @@ -157,8 +154,8 @@ class TaskMutex : boost::noncopyable /// temporarily releasing the lock, and false otherwise. bool upgradeToWriter() { - assert( m_mutex && (m_lockType == LockType::Read) ); - m_lockType = LockType::Write; + assert( m_mutex && !m_writer ); + m_writer = true; return m_lock.upgrade_to_writer(); } @@ -167,7 +164,7 @@ class TaskMutex : boost::noncopyable template void execute( F &&f ) { - assert( m_mutex && m_lockType == LockType::Write ); + assert( m_mutex && m_writer ); ExecutionStateMutex::scoped_lock executionStateLock( m_mutex->m_executionStateMutex ); assert( !m_mutex->m_executionState ); @@ -220,8 +217,7 @@ class TaskMutex : boost::noncopyable /// Acquires mutex or returns false. Never does TBB tasks. bool tryAcquire( TaskMutex &mutex, bool write = true ) { - const LockType l = write ? LockType::Write : LockType::Read; - return acquireOr( mutex, l, []( bool workAvailable ){ return false; } ); + return acquireOr( mutex, write, []( bool workAvailable ){ return false; } ); } /// Releases the lock. This will be done automatically @@ -230,12 +226,9 @@ class TaskMutex : boost::noncopyable void release() { assert( m_mutex ); - if( m_lockType != LockType::WorkerRead ) - { - m_lock.release(); - } + m_lock.release(); m_mutex = nullptr; - m_lockType = LockType::None; + m_writer = false; } /// Advanced API @@ -245,40 +238,20 @@ class TaskMutex : boost::noncopyable /// in Gaffer's LRUCache. They should not be considered part of the canonical /// API. - enum class LockType - { - None, - // Multiple readers may coexist. - Read, - // Only a single writer can exist at a time, and the presence - // of a writer prevents read locks being acquired. - Write, - // Artificial read lock, available only to threads performing - // TBB tasks on behalf of `execute()`. These readers are - // protected only by the original write lock held by the caller - // of `execute()`. This means the caller of `execute()` _must_ - // delay any writes until _after_ `execute()` has returned. - // A WorkerRead lock can not be upgraded via `upgradeToWriter()`. - WorkerRead, - }; - /// Tries to acquire the mutex, returning true on success. On failure, /// calls `workNotifier( bool workAvailable )`. If work is available and /// `workNotifier` returns true, then this thread will perform TBB tasks /// spawned by `execute()` until the work is complete. Returns false on /// failure regardless of whether or not work is done. template - bool acquireOr( TaskMutex &mutex, LockType lockType, WorkNotifier &&workNotifier ) + bool acquireOr( TaskMutex &mutex, bool write, WorkNotifier &&workNotifier ) { assert( !m_mutex ); - assert( m_lockType == LockType::None ); - assert( lockType != LockType::None ); - - if( m_lock.try_acquire( mutex.m_mutex, /* write = */ lockType == LockType::Write ) ) + if( m_lock.try_acquire( mutex.m_mutex, write ) ) { // Success! m_mutex = &mutex; - m_lockType = lockType == LockType::WorkerRead ? LockType::Read : lockType; + m_writer = write; return true; } @@ -287,14 +260,6 @@ class TaskMutex : boost::noncopyable // current call to `execute()`. ExecutionStateMutex::scoped_lock executionStateLock( mutex.m_executionStateMutex ); - if( lockType == LockType::WorkerRead && mutex.m_executionState && mutex.m_executionState->arenaObserver.containsThisThread() ) - { - // We're already doing work on behalf of `execute()`, so we can - // take a WorkerRead lock. - m_mutex = &mutex; - m_lockType = lockType; - return true; - } const bool workAvailable = mutex.m_executionState.get(); if( !workNotifier( workAvailable ) || !workAvailable ) @@ -314,20 +279,16 @@ class TaskMutex : boost::noncopyable return false; } - /// Returns the type of the lock currently held. If `acquireOr( WorkerRead )` - /// is called successfully, this will return `Read` for an external lock and - /// `WorkerRead` for an internal lock acquired by virtue of performing tasks - /// for `execute()`. - LockType lockType() const + bool isWriter() const { - return m_lockType; + return m_writer; } private : InternalMutex::scoped_lock m_lock; TaskMutex *m_mutex; - LockType m_lockType; + bool m_writer; }; @@ -336,64 +297,10 @@ class TaskMutex : boost::noncopyable // The actual mutex that is held by the ScopedLock. InternalMutex m_mutex; - // Tracks worker threads as they enter and exit an arena, so we can determine - // whether or not the current thread is inside the arena. We use this to detect - // recursion and allow any worker thread to obtain a recursive lock provided - // they are currently performing work in service of `ScopedLock::execute()`. - class ArenaObserver : public tbb::task_scheduler_observer - { - - public : - - ArenaObserver( tbb::task_arena &arena ) - : tbb::task_scheduler_observer( arena ) - { - observe( true ); - } - - ~ArenaObserver() override - { - observe( false ); - } - - bool containsThisThread() - { - Mutex::scoped_lock lock( m_mutex ); - return m_threadIdSet.find( std::this_thread::get_id() ) != m_threadIdSet.end(); - } - - private : - - void on_scheduler_entry( bool isWorker ) override - { - assert( !containsThisThread() ); - Mutex::scoped_lock lock( m_mutex ); - m_threadIdSet.insert( std::this_thread::get_id() ); - } - - void on_scheduler_exit( bool isWorker ) override - { - assert( containsThisThread() ); - Mutex::scoped_lock lock( m_mutex ); - m_threadIdSet.erase( std::this_thread::get_id() ); - } - - using Mutex = tbb::spin_mutex; - using ThreadIdSet = boost::container::flat_set; - Mutex m_mutex; - ThreadIdSet m_threadIdSet; - - }; - // The mechanism we use to allow waiting threads // to participate in the work done by `execute()`. struct ExecutionState : public IECore::RefCounted { - ExecutionState() - : arenaObserver( arena ) - { - } - // Work around https://bugs.llvm.org/show_bug.cgi?id=32978 ~ExecutionState() noexcept( true ) override { @@ -403,9 +310,6 @@ class TaskMutex : boost::noncopyable // waiting threads to participate in work. tbb::task_arena arena; tbb::task_group taskGroup; - // Observer used to track which threads are - // currently inside the arena. - ArenaObserver arenaObserver; }; IE_CORE_DECLAREPTR( ExecutionState ); diff --git a/python/GafferTest/IECorePreviewTest/LRUCacheTest.py b/python/GafferTest/IECorePreviewTest/LRUCacheTest.py index 2a49068aa4e..b647b020d19 100644 --- a/python/GafferTest/IECorePreviewTest/LRUCacheTest.py +++ b/python/GafferTest/IECorePreviewTest/LRUCacheTest.py @@ -158,10 +158,6 @@ def testRecursionOnOneItemSerial( self ) : GafferTest.testLRUCacheRecursionOnOneItem( "serial" ) - def testRecursionOnOneItemTaskParallel( self ) : - - GafferTest.testLRUCacheRecursionOnOneItem( "taskParallel" ) - def testClearFromGetSerial( self ) : GafferTest.testLRUCacheClearFromGet( "serial" ) @@ -238,10 +234,7 @@ def testSetIfUncached( self ) : def testSetIfUncachedRecursion( self ) : - # `parallel` policy omitted because it doesn't support recursion. - for policy in [ "serial", "taskParallel" ] : - with self.subTest( policy = policy ) : - GafferTest.testLRUCacheSetIfUncachedRecursion( policy ) + GafferTest.testLRUCacheSetIfUncachedRecursion( "serial" ) if __name__ == "__main__": unittest.main() diff --git a/python/GafferTest/IECorePreviewTest/TaskMutexTest.py b/python/GafferTest/IECorePreviewTest/TaskMutexTest.py index bf20cb53c82..885319f7900 100644 --- a/python/GafferTest/IECorePreviewTest/TaskMutexTest.py +++ b/python/GafferTest/IECorePreviewTest/TaskMutexTest.py @@ -64,10 +64,6 @@ def testHeavyContentionWithoutWorkAcceptance( self ) : GafferTest.testTaskMutexHeavyContention( False ) - def testWorkerRecursion( self ) : - - GafferTest.testTaskMutexWorkerRecursion() - def testAcquireOr( self ) : GafferTest.testTaskMutexAcquireOr() diff --git a/src/GafferTestModule/TaskMutexTest.cpp b/src/GafferTestModule/TaskMutexTest.cpp index e394835f001..756b18f2f3a 100644 --- a/src/GafferTestModule/TaskMutexTest.cpp +++ b/src/GafferTestModule/TaskMutexTest.cpp @@ -74,12 +74,12 @@ void testTaskMutex() TaskMutex::ScopedLock lock( mutex, /* write = */ false ); gotLock.local() = true; - GAFFERTEST_ASSERT( lock.lockType() == TaskMutex::ScopedLock::LockType::Read ) + GAFFERTEST_ASSERT( !lock.isWriter() ) if( !initialised ) { lock.upgradeToWriter(); - GAFFERTEST_ASSERT( lock.lockType() == TaskMutex::ScopedLock::LockType::Write ); + GAFFERTEST_ASSERT( lock.isWriter() ); if( !initialised ) // Check again, because upgrading to writer may lose the lock temporarily. { @@ -134,7 +134,7 @@ void testTaskMutexWithinIsolate() tbb::this_task_arena::isolate( [&mutex]() { TaskMutex::ScopedLock lock( mutex ); - GAFFERTEST_ASSERT( lock.lockType() == TaskMutex::ScopedLock::LockType::Write ) + GAFFERTEST_ASSERT( lock.isWriter() ); std::this_thread::sleep_for( std::chrono::milliseconds( 1 ) ); } ); @@ -175,7 +175,7 @@ void testTaskMutexJoiningOuterTasks() TaskMutex::ScopedLock lock( mutex ); gotLock.local() = true; - GAFFERTEST_ASSERT( lock.lockType() == TaskMutex::ScopedLock::LockType::Write ) + GAFFERTEST_ASSERT( lock.isWriter() ) if( !initialised ) { @@ -213,7 +213,7 @@ void testTaskMutexJoiningOuterTasks() for( size_t i = r.begin(); i < r.end(); ++i ) { TaskMutex::ScopedLock lock( *independentTasks[i] ); - GAFFERTEST_ASSERT( lock.lockType() == TaskMutex::ScopedLock::LockType::Write ) + GAFFERTEST_ASSERT( lock.isWriter() ) lock.execute( [&]() { initialise(); @@ -247,56 +247,13 @@ void testTaskMutexHeavyContention( bool acceptWork ) for( size_t i = r.begin(); i < r.end(); ++i ) { TaskMutex::ScopedLock lock( mutex, /* write = */ false, acceptWork ); - GAFFERTEST_ASSERT( lock.lockType() == TaskMutex::ScopedLock::LockType::Read ); + GAFFERTEST_ASSERT( !lock.isWriter() ); GAFFERTEST_ASSERTEQUAL( initialised, true ); } } ); } -void testTaskMutexWorkerRecursion() -{ - TaskMutex mutex; - tbb::enumerable_thread_specific gotLock; - - std::function recurse; - recurse = [&mutex, &gotLock, &recurse] ( int depth ) { - - TaskMutex::ScopedLock lock; - const bool acquired = lock.acquireOr( - mutex, TaskMutex::ScopedLock::LockType::WorkerRead, - []( bool workAvailable ) { return true; } - ); - - GAFFERTEST_ASSERT( acquired ); - GAFFERTEST_ASSERT( lock.lockType() == TaskMutex::ScopedLock::LockType::WorkerRead ); - - gotLock.local() = true; - - if( depth > 4 ) - { - std::this_thread::sleep_for( std::chrono::milliseconds( 10 ) ); - } - else - { - tbb::parallel_for( - 0, 4, - [&recurse, depth] ( int i ) { - recurse( depth + 1 ); - } - ); - } - - }; - - TaskMutex::ScopedLock lock( mutex ); - lock.execute( - [&recurse] { recurse( 0 ); } - ); - - GAFFERTEST_ASSERTEQUAL( gotLock.size(), tbb::tbb_thread::hardware_concurrency() ); -} - void testTaskMutexAcquireOr() { TaskMutex mutex; @@ -305,7 +262,7 @@ void testTaskMutexAcquireOr() TaskMutex::ScopedLock lock2; bool workAvailable = true; const bool acquired = lock2.acquireOr( - mutex, TaskMutex::ScopedLock::LockType::Write, + mutex, /* write = */ true, [&workAvailable] ( bool wa ) { workAvailable = wa; return true; } ); @@ -522,7 +479,6 @@ void GafferTestModule::bindTaskMutexTest() def( "testTaskMutexWithinIsolate", &testTaskMutexWithinIsolate ); def( "testTaskMutexJoiningOuterTasks", &testTaskMutexJoiningOuterTasks ); def( "testTaskMutexHeavyContention", &testTaskMutexHeavyContention ); - def( "testTaskMutexWorkerRecursion", &testTaskMutexWorkerRecursion ); def( "testTaskMutexAcquireOr", &testTaskMutexAcquireOr ); def( "testTaskMutexExceptions", &testTaskMutexExceptions ); def( "testTaskMutexWorkerExceptions", &testTaskMutexWorkerExceptions ); From 06dd2204ad2f41826f9da3105d748d044a2b69f2 Mon Sep 17 00:00:00 2001 From: John Haddon Date: Mon, 9 Oct 2023 16:35:25 +0100 Subject: [PATCH 10/11] LRUCache : Simplify handling of `Handle::isWritable()` Now that WorkerRead recursion has been removed, writability is determined entirely by AquireMode, so we can replace conditionals with asserts. --- .../Gaffer/Private/IECorePreview/LRUCache.inl | 38 +++----- .../IECorePreviewTest/LRUCacheTest.py | 8 -- src/GafferTestModule/LRUCacheTest.cpp | 87 ------------------- 3 files changed, 14 insertions(+), 119 deletions(-) diff --git a/include/Gaffer/Private/IECorePreview/LRUCache.inl b/include/Gaffer/Private/IECorePreview/LRUCache.inl index 13ca667b77d..49a12bbf197 100644 --- a/include/Gaffer/Private/IECorePreview/LRUCache.inl +++ b/include/Gaffer/Private/IECorePreview/LRUCache.inl @@ -167,21 +167,12 @@ class Serial return m_it->cacheEntry; } - // Returns true if it is OK to call `writable()`. - // This is typically determined by the AcquireMode - // passed to `acquire()`, with special cases for - // recursion. + // Returns true if it is OK to call `writable()`. This is determined + // by the AcquireMode passed to `acquire()`. bool isWritable() const { - // Because this policy is serial, it would technically - // always be OK to write. But we return false for recursive - // calls to avoid unnecessary overhead updating the LRU list - // for inner calls. - /// \todo Is this distinction worth it, and do we really need - /// to support recursion on a single item in the Serial policy? - /// This is a remnant of a more complex system that allowed recursion - /// in the TaskParallel policy, but that has since been removed. - return m_it->handleCount == 1; + // Because this policy is serial, it is always OK to write + return true; } // Executes the functor F. This is used to @@ -1093,6 +1084,7 @@ Value LRUCache::get( const GetterKey &key, const if( status==Uncached ) { + assert( handle.isWritable() ); Value value = Value(); Cost cost = 0; try @@ -1105,24 +1097,21 @@ Value LRUCache::get( const GetterKey &key, const } catch( ... ) { - if( handle.isWritable() && m_cacheErrors ) + if( m_cacheErrors ) { handle.writable().state = std::current_exception(); } throw; } - if( handle.isWritable() ) - { - assert( cacheEntry.status() != Cached ); // this would indicate that another thread somehow - assert( cacheEntry.status() != Failed ); // loaded the same thing as us, which is not the intention. + assert( cacheEntry.status() != Cached ); // this would indicate that another thread somehow + assert( cacheEntry.status() != Failed ); // loaded the same thing as us, which is not the intention. - setInternal( key, handle.writable(), value, cost ); - m_policy.push( handle ); + setInternal( key, handle.writable(), value, cost ); + m_policy.push( handle ); - handle.release(); - limitCost( m_maxCost ); - } + handle.release(); + limitCost( m_maxCost ); return value; } @@ -1187,8 +1176,9 @@ bool LRUCache::setIfUncached( const Key &key, con const Status status = cacheEntry.status(); bool result = false; - if( status == Uncached && handle.isWritable() ) + if( status == Uncached ) { + assert( handle.isWritable() ); result = setInternal( key, handle.writable(), value, costFunction( value ) ); m_policy.push( handle ); diff --git a/python/GafferTest/IECorePreviewTest/LRUCacheTest.py b/python/GafferTest/IECorePreviewTest/LRUCacheTest.py index b647b020d19..f38ad0d3f4a 100644 --- a/python/GafferTest/IECorePreviewTest/LRUCacheTest.py +++ b/python/GafferTest/IECorePreviewTest/LRUCacheTest.py @@ -154,10 +154,6 @@ def testRecursionWithEvictionsTaskParallel( self ) : GafferTest.testLRUCacheRecursion( "taskParallel", numIterations = 100000, numValues = 1000, maxCost = 100 ) - def testRecursionOnOneItemSerial( self ) : - - GafferTest.testLRUCacheRecursionOnOneItem( "serial" ) - def testClearFromGetSerial( self ) : GafferTest.testLRUCacheClearFromGet( "serial" ) @@ -232,9 +228,5 @@ def testSetIfUncached( self ) : with self.subTest( policy = policy ) : GafferTest.testLRUCacheSetIfUncached( policy ) - def testSetIfUncachedRecursion( self ) : - - GafferTest.testLRUCacheSetIfUncachedRecursion( "serial" ) - if __name__ == "__main__": unittest.main() diff --git a/src/GafferTestModule/LRUCacheTest.cpp b/src/GafferTestModule/LRUCacheTest.cpp index 85713d9571e..46f6a52268a 100644 --- a/src/GafferTestModule/LRUCacheTest.cpp +++ b/src/GafferTestModule/LRUCacheTest.cpp @@ -308,53 +308,6 @@ void testLRUCacheRecursion( const std::string &policy, int numIterations, size_t DispatchTest()( policy, numIterations, numValues, maxCost ); } -template class Policy> -struct TestLRUCacheRecursionOnOneItem -{ - - void operator()() - { - using Cache = LRUCache; - using CachePtr = std::unique_ptr; - int recursionDepth = 0; - - CachePtr cache; - cache.reset( - new Cache( - // Getter that calls back into the cache with the _same_ - // key, up to a certain limit, and then actually returns - // a value. This is basically insane, but it models - // situations that can occur in Gaffer. - [&cache, &recursionDepth]( int key, size_t &cost, const IECore::Canceller *canceller ) { - cost = 1; - if( ++recursionDepth == 100 ) - { - return key; - } - else - { - return cache->get( key ); - } - }, - // Max cost is small enough that we'll be trying to evict - // keys while unwinding the recursion. - 20 - ) - ); - - GAFFERTEST_ASSERTEQUAL( cache->currentCost(), 0 ); - GAFFERTEST_ASSERTEQUAL( cache->get( 1 ), 1 ); - GAFFERTEST_ASSERTEQUAL( recursionDepth, 100 ); - GAFFERTEST_ASSERTEQUAL( cache->currentCost(), 1 ); - } - -}; - -void testLRUCacheRecursionOnOneItem( const std::string &policy ) -{ - DispatchTest()( policy ); -} - template class Policy> struct TestLRUCacheClearFromGet { @@ -847,44 +800,6 @@ void testLRUCacheSetIfUncached( const std::string &policy ) DispatchTest()( policy ); } -template class Policy> -struct TestLRUCacheSetIfUncachedRecursion -{ - - void operator()() - { - using Cache = LRUCache; - using CachePtr = std::unique_ptr; - - CachePtr cache; - cache.reset( - new Cache( - // Getter that calls `setIfUncached()` with the _same_ key. This - // is basically insane, but it models situations that can occur - // in Gaffer. - [&cache]( int key, size_t &cost, const IECore::Canceller *canceller ) { - cost = 1; - // We expect the call to fail, because the lock is held by the - // outer call to `get()`. - GAFFERTEST_ASSERT( !cache->setIfUncached( key, key, []( int ) { return 1; } ) ); - return key; - }, - 1000 - ) - ); - - GAFFERTEST_ASSERTEQUAL( cache->currentCost(), 0 ); - GAFFERTEST_ASSERTEQUAL( cache->get( 1 ), 1 ); - GAFFERTEST_ASSERTEQUAL( cache->currentCost(), 1 ); - } - -}; - -void testLRUCacheSetIfUncachedRecursion( const std::string &policy ) -{ - DispatchTest()( policy ); -} - } // namespace void GafferTestModule::bindLRUCacheTest() @@ -893,7 +808,6 @@ void GafferTestModule::bindLRUCacheTest() def( "testLRUCacheRemovalCallback", &testLRUCacheRemovalCallback ); def( "testLRUCacheContentionForOneItem", &testLRUCacheContentionForOneItem, arg( "withCanceller" ) = false ); def( "testLRUCacheRecursion", &testLRUCacheRecursion, ( arg( "numIterations" ), arg( "numValues" ), arg( "maxCost" ) ) ); - def( "testLRUCacheRecursionOnOneItem", &testLRUCacheRecursionOnOneItem ); def( "testLRUCacheClearFromGet", &testLRUCacheClearFromGet ); def( "testLRUCacheExceptions", &testLRUCacheExceptions ); def( "testLRUCacheCancellation", &testLRUCacheCancellation ); @@ -901,5 +815,4 @@ void GafferTestModule::bindLRUCacheTest() def( "testLRUCacheUncacheableItem", &testLRUCacheUncacheableItem ); def( "testLRUCacheGetIfCached", &testLRUCacheGetIfCached ); def( "testLRUCacheSetIfUncached", &testLRUCacheSetIfUncached ); - def( "testLRUCacheSetIfUncachedRecursion", &testLRUCacheSetIfUncachedRecursion ); } From a62f89236a11079b38f70341ae1a0ce58d7c5589 Mon Sep 17 00:00:00 2001 From: John Haddon Date: Tue, 17 Oct 2023 13:06:03 +0100 Subject: [PATCH 11/11] Revert "Process : Replace `task_arena` with `isolated_task_group`" This reverts commit 148291d26b2fd8df07282ea7c75feba1d70ac7bb. --- .../Gaffer/Private/IECorePreview/TaskMutex.h | 1 - include/Gaffer/Process.inl | 97 ++++++++++--------- 2 files changed, 52 insertions(+), 46 deletions(-) diff --git a/include/Gaffer/Private/IECorePreview/TaskMutex.h b/include/Gaffer/Private/IECorePreview/TaskMutex.h index 1ff73c75a79..8e8a96252c8 100644 --- a/include/Gaffer/Private/IECorePreview/TaskMutex.h +++ b/include/Gaffer/Private/IECorePreview/TaskMutex.h @@ -44,7 +44,6 @@ #include "tbb/spin_rw_mutex.h" #include "tbb/task_arena.h" -#define TBB_PREVIEW_ISOLATED_TASK_GROUP 1 #include "tbb/task_group.h" #include diff --git a/include/Gaffer/Process.inl b/include/Gaffer/Process.inl index 25f81a60767..19d67cccbbc 100644 --- a/include/Gaffer/Process.inl +++ b/include/Gaffer/Process.inl @@ -36,7 +36,7 @@ #include "tbb/concurrent_hash_map.h" #include "tbb/spin_mutex.h" -#define TBB_PREVIEW_ISOLATED_TASK_GROUP 1 +#include "tbb/task_arena.h" #include "tbb/task_group.h" #include @@ -239,9 +239,10 @@ class GAFFER_API Process::Collaboration : public IECore::RefCounted IE_CORE_DECLAREMEMBERPTR( Collaboration ); - // Task group used to allow waiting threads to participate in - // collaborative work. - tbb::isolated_task_group taskGroup; + // Arena and task group used to allow waiting threads to participate + // in collaborative work. + tbb::task_arena arena; + tbb::task_group taskGroup; using Set = std::unordered_set; // Collaborations depending directly on this one. @@ -328,8 +329,8 @@ typename ProcessType::ResultType Process::acquireCollaborativeResult( } // We've found an in-flight process we can wait on without causing - // deadlock. Wait on the result, so we get to work on any TBB tasks it - // has created. + // deadlock. Join its `task_arena` and wait on the result, so we get to + // work on any TBB tasks it has created. // // > Note : We need to own a reference to `collaboration` because the // thread that created it may drop its own reference as soon as we call @@ -344,7 +345,9 @@ typename ProcessType::ResultType Process::acquireCollaborativeResult( CollaborationTypePtr collaboration = candidate; accessor.release(); - collaboration->taskGroup.wait(); + collaboration->arena.execute( + [&]{ return collaboration->taskGroup.wait(); } + ); if( collaboration->result ) { @@ -375,45 +378,49 @@ typename ProcessType::ResultType Process::acquireCollaborativeResult( std::exception_ptr exception; - auto status = collaboration->taskGroup.run_and_wait( + auto status = collaboration->arena.execute( [&] { - // Publish ourselves so that other threads can collaborate - // by calling `collaboration->taskGroup.wait()`. - accessor->second.push_back( collaboration ); - accessor.release(); - - try - { - ProcessType process( std::forward( args )... ); - process.m_collaboration = collaboration.get(); - collaboration->result = process.run(); - // Publish result to cache before we remove ourself from - // `g_pendingCollaborations`, so that other threads will - // be able to get the result one way or the other. - ProcessType::g_cache.setIfUncached( - cacheKey, *collaboration->result, - ProcessType::cacheCostFunction - ); - } - catch( ... ) - { - // Don't allow `task_group::wait()` to see exceptions, - // because then we'd hit a thread-safety bug in - // `tbb::task_group_context::reset()`. - exception = std::current_exception(); - } - - // Now we're done, remove `collaboration` from the pending collaborations. - [[maybe_unused]] const bool found = CollaborationType::g_pendingCollaborations.find( accessor, cacheKey ); - assert( found ); - auto toErase = std::find( accessor->second.begin(), accessor->second.end(), collaboration ); - assert( toErase != accessor->second.end() ); - accessor->second.erase( toErase ); - if( accessor->second.empty() ) - { - CollaborationType::g_pendingCollaborations.erase( accessor ); - } - accessor.release(); + return collaboration->taskGroup.run_and_wait( + [&] { + // Publish ourselves so that other threads can collaborate + // by calling `collaboration->taskGroup.wait()`. + accessor->second.push_back( collaboration ); + accessor.release(); + + try + { + ProcessType process( std::forward( args )... ); + process.m_collaboration = collaboration.get(); + collaboration->result = process.run(); + // Publish result to cache before we remove ourself from + // `g_pendingCollaborations`, so that other threads will + // be able to get the result one way or the other. + ProcessType::g_cache.setIfUncached( + cacheKey, *collaboration->result, + ProcessType::cacheCostFunction + ); + } + catch( ... ) + { + // Don't allow `task_group::wait()` to see exceptions, + // because then we'd hit a thread-safety bug in + // `tbb::task_group_context::reset()`. + exception = std::current_exception(); + } + + // Now we're done, remove `collaboration` from the pending collaborations. + [[maybe_unused]] const bool found = CollaborationType::g_pendingCollaborations.find( accessor, cacheKey ); + assert( found ); + auto toErase = std::find( accessor->second.begin(), accessor->second.end(), collaboration ); + assert( toErase != accessor->second.end() ); + accessor->second.erase( toErase ); + if( accessor->second.empty() ) + { + CollaborationType::g_pendingCollaborations.erase( accessor ); + } + accessor.release(); + } + ); } );