From 467368569eaa1775b34ef2c2e3ddd70d9b1fbf8b Mon Sep 17 00:00:00 2001
From: John Haddon
Date: Mon, 28 Nov 2022 14:32:38 +0000
Subject: [PATCH 01/10] ParentTest : Expose problems with WorkerRead recursion

---
 python/GafferSceneTest/ParentTest.py | 308 +++++++++++++++++++++++++++
 1 file changed, 308 insertions(+)

diff --git a/python/GafferSceneTest/ParentTest.py b/python/GafferSceneTest/ParentTest.py
index be8031ee210..c66bfac1b3c 100644
--- a/python/GafferSceneTest/ParentTest.py
+++ b/python/GafferSceneTest/ParentTest.py
@@ -36,6 +36,7 @@
 import pathlib
 import inspect
+import functools

 import imath

@@ -937,5 +938,312 @@ def testInvalidDestinationNames( self ) :
 		with self.assertRaisesRegex( Gaffer.ProcessException, r".*Invalid destination `/woops/a\*`. Name `a\*` is invalid \(because it contains filter wildcards\)" ) :
 			parent["out"].childNames( "/" )

+	@GafferTest.TestRunner.CategorisedTestMethod( { "taskScheduling:hashAliasing" } )
+	def testChainedNodesWithIdenticalBranches( self ) :
+
+		# infiniteScene
+		#      |
+		#   parent2
+		#      |
+		#   parent1
+
+		# Trick to make a large scene without needing a cache file
+		# and without using a BranchCreator. This scene is infinite
+		# but very cheap to compute.
+
+		infiniteScene = GafferScene.ScenePlug()
+		infiniteScene["childNames"].setValue( IECore.InternedStringVectorData( [ "one", "two" ] ) )
+
+		# Filter that will search the scene to a fixed depth, but
+		# never find anything.
+
+		nothingFilter = GafferScene.PathFilter()
+		nothingFilter["paths"].setValue( IECore.StringVectorData( [ "/*/*/*/*/*/*/*/*/*/*/thisDoesntExist" ] ) )
+
+		# Two Parent nodes one after another, using the same filter.
+		# These will generate the same (empty) set of branches, and
+		# therefore have the same hash.
+
+		parent2 = GafferScene.Parent()
+		parent2["in"].setInput( infiniteScene )
+		parent2["filter"].setInput( nothingFilter["out"] )
+
+		parent1 = GafferScene.Parent()
+		parent1["in"].setInput( parent2["out"] )
+		parent1["filter"].setInput( nothingFilter["out"] )
+
+		self.assertEqual( parent1["__branches"].hash(), parent2["__branches"].hash() )
+
+		# Simulate the effects of a previous computation being evicted
+		# from the cache.
+
+		parent1["__branches"].getValue()
+		Gaffer.ValuePlug.clearCache()
+
+		# We are now in a situation where the hash for `parent1.__branches` is
+		# cached, but the value isn't. This is significant, because it means
+		# that in the next step, the downstream compute for `parent1.__branches`
+		# starts _before_ the upstream one for `parent2.__branches`. If the hash
+		# wasn't cached, then the hash for `parent1` would trigger
+		# an upstream compute for `parent2.__branches` first.
+
+		# Trigger scene generation. We can't use `traverseScene()` because the
+		# scene is infinite, so we use `matchingPaths()` to generate up to a fixed
+		# depth. The key effect here is that lots of threads are indirectly pulling on
+		# `parent1.__branches`, triggering task collaboration.
+
+		with Gaffer.PerformanceMonitor() as monitor :
+			paths = IECore.PathMatcher()
+			GafferScene.SceneAlgo.matchingPaths( IECore.PathMatcher( [ "/*/*/*/*/*/*/*/*/*" ] ), parent1["out"], paths )
+
+		# We only expect to see a single hash/compute for `__branches` on each
+		# node. A previous bug meant that this was not the case, and thousands
+		# of unnecessary evaluations of `parent2.__branches` could occur.
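+		# (`PerformanceMonitor.plugStatistics()` reports the hash and compute
+		# counts recorded per plug while the monitor was active, which is what
+		# the assertions below rely on.)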
+
+		self.assertEqual( monitor.plugStatistics( parent1["__branches"] ).computeCount, 1 )
+		self.assertEqual( monitor.plugStatistics( parent2["__branches"] ).computeCount, 1 )
+
+	@GafferTest.TestRunner.CategorisedTestMethod( { "taskScheduling:hashAliasing" } )
+	def testChainedNodesWithIdenticalBranchesAndIntermediateCompute( self ) :
+
+		# As for `testChainedNodesWithIdenticalBranches` above, but with a
+		# _different_ Parent node inserted between the identical two.
+		#
+		# infiniteScene
+		#      |
+		#   parentA2
+		#      |
+		#   parentB
+		#      |
+		#   parentA1
+		#
+
+		infiniteScene = GafferScene.ScenePlug()
+		infiniteScene["childNames"].setValue( IECore.InternedStringVectorData( [ "one", "two" ] ) )
+
+		filterA = GafferScene.PathFilter()
+		filterA["paths"].setValue( IECore.StringVectorData( [ "/*/*/*/*/*/thisDoesntExist" ] ) )
+
+		somethingFilter = GafferScene.PathFilter()
+		somethingFilter["paths"].setValue( IECore.StringVectorData( [ "/*/*/*/*/*" ] ) )
+
+		parentA2 = GafferScene.Parent()
+		parentA2["in"].setInput( infiniteScene )
+		parentA2["filter"].setInput( filterA["out"] )
+
+		parentB = GafferScene.Parent()
+		parentB["in"].setInput( parentA2["out"] )
+		parentB["filter"].setInput( somethingFilter["out"] )
+
+		parentA1 = GafferScene.Parent()
+		parentA1["in"].setInput( parentB["out"] )
+		parentA1["filter"].setInput( filterA["out"] )
+
+		self.assertEqual( parentA1["__branches"].hash(), parentA2["__branches"].hash() )
+
+		# Simulate the effects of a previous computation being evicted
+		# from the cache.
+
+		parentA1["__branches"].getValue()
+		Gaffer.ValuePlug.clearCache()
+
+		# We are now in a situation where the hash for `parentA1.__branches` is
+		# cached, but the value isn't. This means that the next call will
+		# trigger the compute for `parentA1.__branches` _first_. In turn, that
+		# will trigger a compute on `parentB.__branches`, and _that_ will
+		# recurse to `parentA2.__branches`. There are now two paths for worker
+		# threads to need the result from `parentA2` :
+		#
+		# 1. The worker enters the task arena for `parentA1`, then enters the
+		#    nested task arena for `parentB`, and finally tries to get the result
+		#    from `parentA2`.
+		# 2. The worker enters the task arena for `parentB` directly, takes a task, then
+		#    tries to get the result from `parentA1`.
+		#
+		# In the first case we were able to detect the recursion from `parentA1` to
+		# `parentA2` using a `task_arena_observer`. But in the second case, the worker
+		# enters the `parentB` arena directly, and isn't considered to be in the `parentA1`
+		# arena by `task_arena_observer`. So this worker tries to do task collaboration
+		# on A, and is therefore waiting on A to finish. But A can't finish until B finishes
+		# on the first worker thread. And B can't finish on the first worker thread because
+		# it is waiting on a task that is trapped on the second worker thread. Deadlock!
+		#
+		# This deficiency in using `task_arena_observer` to detect recursion led
+		# us to develop a different method, one that avoids the deadlock
+		# that was being triggered by the call below.
+
+		parentA1["__branches"].getValue()
+
+	@GafferTest.TestRunner.CategorisedTestMethod( { "taskScheduling:hashAliasing" } )
+	def testSplitNodesWithIdenticalBranchesAndIntermediateCompute( self ) :
+
+		# As for `testChainedNodesWithIdenticalBranches` above, but with node
+		# `parentC` branching off in parallel with `parentA1`. This branch
+		# arrives at `parentA2` without having `parentA1` in the history.
This + # is similar to the case above - it is another mechanism whereby B is + # needed before A on one thread in particular. + # + # infiniteScene + # | + # parentA2 + # | + # parentB + # / \ + # parentA1 parentC + + script = Gaffer.ScriptNode() + + script["infiniteScene"] = GafferScene.ScenePlug() + script["infiniteScene"]["childNames"].setValue( IECore.InternedStringVectorData( [ "one", "two" ] ) ) + + script["filterA"] = GafferScene.PathFilter() + script["filterA"]["paths"].setValue( IECore.StringVectorData( [ "/*/*/*/*/*/*/*thisDoesntExist" ] ) ) + + script["filterB"] = GafferScene.PathFilter() + script["filterB"]["paths"].setValue( IECore.StringVectorData( [ "/*/*/*/*/*/*/one" ] ) ) + + script["filterC"] = GafferScene.PathFilter() + script["filterC"]["paths"].setValue( IECore.StringVectorData( [ "/*/*/*/*/*/*/two" ] ) ) + + script["parentA2"] = GafferScene.Parent() + script["parentA2"]["in"].setInput( script["infiniteScene"] ) + script["parentA2"]["filter"].setInput( script["filterA"]["out"] ) + + script["parentB"] = GafferScene.Parent() + script["parentB"]["in"].setInput( script["parentA2"]["out"] ) + script["parentB"]["filter"].setInput( script["filterB"]["out"] ) + + script["parentA1"] = GafferScene.Parent() + script["parentA1"]["in"].setInput( script["parentB"]["out"] ) + script["parentA1"]["filter"].setInput( script["filterA"]["out"] ) + + script["parentC"] = GafferScene.Parent() + script["parentC"]["in"].setInput( script["parentB"]["out"] ) + script["parentC"]["filter"].setInput( script["filterC"]["out"] ) + + self.assertEqual( script["parentA1"]["__branches"].hash(), script["parentA2"]["__branches"].hash() ) + self.assertNotEqual( script["parentA1"]["__branches"].hash(), script["parentB"]["__branches"].hash() ) + self.assertNotEqual( script["parentA1"]["__branches"].hash(), script["parentC"]["__branches"].hash() ) + self.assertNotEqual( script["parentB"]["__branches"].hash(), script["parentC"]["__branches"].hash() ) + + # Repeat this one, because it is more sensitive to specific task timings. + for i in range( 0, 100 ) : + + # Simulate the effects of a previous computation being evicted + # from the cache. + + script["parentA1"]["__branches"].getValue() + script["parentC"]["__branches"].getValue() + Gaffer.ValuePlug.clearCache() + + # Trigger a bunch of parallel computes on both `parentA1` and `parentC`. + + def evaluateScene( scene ) : + + paths = IECore.PathMatcher() + GafferScene.SceneAlgo.matchingPaths( IECore.PathMatcher( [ "/*/*/*/*/*/*/*/*/*" ] ), scene, paths ) + + parentA1Task = Gaffer.ParallelAlgo.callOnBackgroundThread( script["parentA1"]["out"], functools.partial( evaluateScene, script["parentA1"]["out"] ) ) + parentCTask = Gaffer.ParallelAlgo.callOnBackgroundThread( script["parentC"]["out"], functools.partial( evaluateScene, script["parentC"]["out"] ) ) + + parentA1Task.wait() + parentCTask.wait() + + @GafferTest.TestRunner.CategorisedTestMethod( { "taskScheduling:hashAliasing" } ) + def testDependencyPropagationThroughIntermediateCompute( self ) : + + # As for `testSplitNodesWithIdenticalBranchesAndIntermediateCompute` above, but + # with an additional task collaboration between `parentB` and `parentA2`. This + # means that we need to track the dependency from `parentB` to `parentA2` through + # an intermediate node. 
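+	# (`parentI` below is that intermediate node : its filter matches one
+	# level deeper than `parentB`'s, so its compute sits between `parentB`
+	# and `parentA2` in the pull sequence.)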
+ # + # infiniteScene + # | + # parentA2 + # | + # parentI + # | + # parentB + # / \ + # parentA1 parentC + + script = Gaffer.ScriptNode() + + script["infiniteScene"] = GafferScene.ScenePlug() + script["infiniteScene"]["childNames"].setValue( IECore.InternedStringVectorData( [ "one", "two" ] ) ) + + script["filterA"] = GafferScene.PathFilter() + script["filterA"]["paths"].setValue( IECore.StringVectorData( [ "/*/*/*/*/*/*/*thisDoesntExist" ] ) ) + + script["filterB"] = GafferScene.PathFilter() + script["filterB"]["paths"].setValue( IECore.StringVectorData( [ "/*/*/*/*/*/*/one" ] ) ) + + script["filterC"] = GafferScene.PathFilter() + script["filterC"]["paths"].setValue( IECore.StringVectorData( [ "/*/*/*/*/*/*/two" ] ) ) + + script["filterI"] = GafferScene.PathFilter() + script["filterI"]["paths"].setValue( IECore.StringVectorData( [ "/*/*/*/*/*/*/*/*two" ] ) ) + + script["parentA2"] = GafferScene.Parent() + script["parentA2"]["in"].setInput( script["infiniteScene"] ) + script["parentA2"]["filter"].setInput( script["filterA"]["out"] ) + + script["parentI"] = GafferScene.Parent() + script["parentI"]["in"].setInput( script["parentA2"]["out"] ) + script["parentI"]["filter"].setInput( script["filterI"]["out"] ) + + script["parentB"] = GafferScene.Parent() + script["parentB"]["in"].setInput( script["parentI"]["out"] ) + script["parentB"]["filter"].setInput( script["filterB"]["out"] ) + + script["parentA1"] = GafferScene.Parent() + script["parentA1"]["in"].setInput( script["parentB"]["out"] ) + script["parentA1"]["filter"].setInput( script["filterA"]["out"] ) + + script["parentC"] = GafferScene.Parent() + script["parentC"]["in"].setInput( script["parentB"]["out"] ) + script["parentC"]["filter"].setInput( script["filterC"]["out"] ) + + self.assertEqual( script["parentA1"]["__branches"].hash(), script["parentA2"]["__branches"].hash() ) + self.assertNotEqual( script["parentA1"]["__branches"].hash(), script["parentB"]["__branches"].hash() ) + self.assertNotEqual( script["parentA1"]["__branches"].hash(), script["parentC"]["__branches"].hash() ) + self.assertNotEqual( script["parentB"]["__branches"].hash(), script["parentC"]["__branches"].hash() ) + + # Repeat this one, because it is more sensitive to specific task timings. + for i in range( 0, 100 ) : + + # Simulate the effects of a previous computation being evicted + # from the cache. + + script["parentA1"]["__branches"].getValue() + script["parentC"]["__branches"].getValue() + Gaffer.ValuePlug.clearCache() + + # Trigger a bunch of parallel computes on both `parentA1` and + # `parentC`. The sequence of computes we're concerned about is this : + # + # 1. `parentC` + # 2. `parentB` + # 3. `parentI` + # 4. `parentA1` + # 5. `parentB` (collaboration from `parentA1`) + # 6. `parentA2` (mustn't collaborate with `parentA1`) + # + # This means realising that `parentA2` is being waited on by + # `parentA1`, even though `parentI` started computing _before_ + # collaboration on `parentB` started. So we can't simply track + # dependents up the chain at the point each process starts. 
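+			# The helper below evaluates the scene to a fixed depth, and is run
+			# from background threads to generate the parallel computes
+			# described above.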
+
+			def evaluateScene( scene ) :
+
+				paths = IECore.PathMatcher()
+				GafferScene.SceneAlgo.matchingPaths( IECore.PathMatcher( [ "/*/*/*/*/*/*/*/*/*" ] ), scene, paths )
+
+			parentCTask = Gaffer.ParallelAlgo.callOnBackgroundThread( script["parentC"]["out"], functools.partial( evaluateScene, script["parentC"]["out"] ) )
+			parentA1Task = Gaffer.ParallelAlgo.callOnBackgroundThread( script["parentA1"]["out"], functools.partial( evaluateScene, script["parentA1"]["out"] ) )
+
+			parentCTask.wait()
+			parentA1Task.wait()
+
 if __name__ == "__main__":
 	unittest.main()

From 30c104a16cc3b40bd7a30b2430421b270ad1b98b Mon Sep 17 00:00:00 2001
From: John Haddon
Date: Wed, 7 Dec 2022 14:52:47 +0000
Subject: [PATCH 02/10] ExpressionTest : Demonstrate recursion due to hash
 aliasing

---
 python/GafferTest/ExpressionTest.py | 55 +++++++++++++++++++++++++++++
 1 file changed, 55 insertions(+)

diff --git a/python/GafferTest/ExpressionTest.py b/python/GafferTest/ExpressionTest.py
index 76dc9ebd2b0..fd4ead3a017 100644
--- a/python/GafferTest/ExpressionTest.py
+++ b/python/GafferTest/ExpressionTest.py
@@ -1650,5 +1650,60 @@ def testPathForStringPlug( self ) :

 		self.assertEqual( script["node"]["in"].getValue(), pathlib.Path.cwd().as_posix() )

+	@GafferTest.TestRunner.CategorisedTestMethod( { "taskScheduling:hashAliasing" } )
+	def testHashAliasing( self ) :
+
+		# p1
+		# |
+		# e1
+		# |
+		# p2
+		# |
+		# e2
+		# |
+		# p3
+		# |
+		# e3 (aliases the hash from e1)
+		# |
+		# p4
+
+		script = Gaffer.ScriptNode()
+
+		script["n"] = Gaffer.Node()
+		script["n"]["user"]["p1"] = Gaffer.StringPlug( flags = Gaffer.Plug.Flags.Default | Gaffer.Plug.Flags.Dynamic )
+		script["n"]["user"]["p2"] = Gaffer.StringPlug( flags = Gaffer.Plug.Flags.Default | Gaffer.Plug.Flags.Dynamic )
+		script["n"]["user"]["p3"] = Gaffer.StringPlug( flags = Gaffer.Plug.Flags.Default | Gaffer.Plug.Flags.Dynamic )
+		script["n"]["user"]["p4"] = Gaffer.StringPlug( flags = Gaffer.Plug.Flags.Default | Gaffer.Plug.Flags.Dynamic )
+
+		script["e1"] = Gaffer.Expression()
+		script["e1"].setExpression( """import time; time.sleep( 0.01 ); parent["n"]["user"]["p2"] = parent["n"]["user"]["p1"]""" )
+
+		script["e2"] = Gaffer.Expression()
+		script["e2"].setExpression( """import time; time.sleep( 0.01 ); parent["n"]["user"]["p3"] = parent["n"]["user"]["p2"] and ''""" )
+
+		script["e3"] = Gaffer.Expression()
+		script["e3"].setExpression( """import time; time.sleep( 0.01 ); parent["n"]["user"]["p4"] = parent["n"]["user"]["p3"]""" )
+
+		# Because the input StringPlugs hash by value (see de8ab79d6f958cef3b80954798f8083a346945a7),
+		# and the expression is stored in an internalised form that omits the names of the source
+		# and destination plugs, the hashes for `e3` and `e1` are identical, even though one depends
+		# on the other.
+		self.assertEqual( script["e1"]["__execute"].hash(), script["e3"]["__execute"].hash() )
+		# But the hash for `e2` is different, because it has different expression source code
+		# (even though it will generate the same value).
+		self.assertNotEqual( script["e2"]["__execute"].hash(), script["e1"]["__execute"].hash() )
+
+		# Get the value for `p4`, which will actually compute and cache `p2` first due to the hash
+		# computation done before the compute. So we never actually do a compute on `p4`, as the value
+		# for that comes from the cache.
+		script["n"]["user"]["p4"].getValue()
+		# Simulate a cache eviction, so we have the hash cached, but not the value.
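+		# (`ValuePlug.clearCache()` empties only the compute cache; cached
+		# hashes are retained, which is what makes this eviction scenario
+		# possible.)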
+ Gaffer.ValuePlug.clearCache() + + # Now, getting the value of `p4` will trigger an immediate compute for `e3`, which will + # recurse to an upstream compute for `e1`, which has the same hash. If we don't have a + # mechanism for handling it, this will deadlock. + script["n"]["user"]["p4"].getValue() + if __name__ == "__main__": unittest.main() From 8c7711cbd65531d16e4c27a3cf4ed7ee748600e5 Mon Sep 17 00:00:00 2001 From: John Haddon Date: Wed, 7 Dec 2022 15:18:02 +0000 Subject: [PATCH 03/10] LoopTest : Demonstrate deadlock due to hash aliasing Although this is contrived, it demonstrates an important point : hashes can alias on the _same_ plug, just in different contexts. --- python/GafferTest/LoopTest.py | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/python/GafferTest/LoopTest.py b/python/GafferTest/LoopTest.py index 33835f0c6a0..38303387648 100644 --- a/python/GafferTest/LoopTest.py +++ b/python/GafferTest/LoopTest.py @@ -282,5 +282,30 @@ def plugDirtied( plug ) : for plug, value in valuesWhenDirtied.items() : self.assertEqual( plugValue( plug ), value ) + @GafferTest.TestRunner.CategorisedTestMethod( { "taskScheduling:hashAliasing" } ) + def testHashAliasingDeadlock( self ) : + + script = Gaffer.ScriptNode() + + script["loop"] = Gaffer.Loop() + script["loop"].setup( Gaffer.StringPlug() ) + + # Dumb expression that just sets the value for the next iteration to + # the value from the previous iteration. Because of de8ab79d6f958cef3b80954798f8083a346945a7, + # the hash for the expression output is identical for every iteration of + # the loop, even though the context differs. + script["expression"] = Gaffer.Expression() + script["expression"].setExpression( """parent["loop"]["next"] = parent["loop"]["previous"]""" ) + + # Get the result of the loop. This actually computes the _first_ iteration of the loop first, + # while computing the hash of the result, and reuses the result for every other loop iteration. + script["loop"]["out"].getValue() + # Simulate cache eviction by clearing the compute cache. + Gaffer.ValuePlug.clearCache() + # Get the value again. Now, because the hash is still cached, this will first start the + # compute for the _last_ iteration. This leads to a recursive compute, which can cause deadlock + # if not handled appropriately. + script["loop"]["out"].getValue() + if __name__ == "__main__": unittest.main() From 14a24b4fcc36ffc6defaaaca11febd03e1a0a58a Mon Sep 17 00:00:00 2001 From: John Haddon Date: Wed, 23 Nov 2022 14:48:09 +0000 Subject: [PATCH 04/10] TaskMutex : Remove `WorkerRead` lock type --- .../Gaffer/Private/IECorePreview/LRUCache.inl | 24 ++-- .../Gaffer/Private/IECorePreview/TaskMutex.h | 130 +++--------------- .../IECorePreviewTest/LRUCacheTest.py | 9 +- .../IECorePreviewTest/TaskMutexTest.py | 4 - src/GafferTestModule/TaskMutexTest.cpp | 58 +------- 5 files changed, 35 insertions(+), 190 deletions(-) diff --git a/include/Gaffer/Private/IECorePreview/LRUCache.inl b/include/Gaffer/Private/IECorePreview/LRUCache.inl index 86895fb2061..233f561d561 100644 --- a/include/Gaffer/Private/IECorePreview/LRUCache.inl +++ b/include/Gaffer/Private/IECorePreview/LRUCache.inl @@ -177,6 +177,10 @@ class Serial // always be OK to write. But we return false for recursive // calls to avoid unnecessary overhead updating the LRU list // for inner calls. + /// \todo Is this distinction worth it, and do we really need + /// to support recursion on a single item in the Serial policy? 
/// This is a remnant of a more complex system that allowed recursion
+				/// in the TaskParallel policy, but that has since been removed.
 				return m_it->handleCount == 1;
 			}

@@ -733,20 +737,19 @@ class TaskParallel

 				CacheEntry &writable()
 				{
-					assert( m_itemLock.lockType() == TaskMutex::ScopedLock::LockType::Write );
+					assert( m_itemLock.isWriter() );
 					return m_item->cacheEntry;
 				}

-				// May return false for AcquireMode::Insert if a GetterFunction recurses.
 				bool isWritable() const
 				{
-					return m_itemLock.lockType() == Item::Mutex::ScopedLock::LockType::Write;
+					return m_itemLock.isWriter();
 				}

 				template<typename F>
 				void execute( F &&f )
 				{
-					if( m_spawnsTasks && m_itemLock.lockType() == TaskMutex::ScopedLock::LockType::Write )
+					if( m_spawnsTasks )
 					{
 						// The getter function will spawn tasks. Execute
 						// it via the TaskMutex, so that other threads trying
@@ -814,16 +817,9 @@ class TaskParallel
 					// found a pre-existing item we optimistically take
 					// just a read lock, because it is faster when
 					// many threads just need to read from the same
-					// cached item. We accept WorkerRead locks when necessary,
-					// to support Getter recursion.
-					TaskMutex::ScopedLock::LockType lockType = TaskMutex::ScopedLock::LockType::WorkerRead;
-					if( inserted || mode == FindWritable || mode == InsertWritable )
-					{
-						lockType = TaskMutex::ScopedLock::LockType::Write;
-					}
-
+					// cached item.
 					const bool acquired = m_itemLock.acquireOr(
-						it->mutex, lockType,
+						it->mutex, /* write = */ inserted || mode == FindWritable || mode == InsertWritable,
 						// Work accepter
 						[&binLock, canceller] ( bool workAvailable ) {
 							// Release the bin lock prior to accepting work, because
@@ -855,7 +851,7 @@ class TaskParallel
 					if( acquired )
 					{
 						if(
-							m_itemLock.lockType() == TaskMutex::ScopedLock::LockType::Read &&
+							!m_itemLock.isWriter() &&
 							mode == Insert &&
 							it->cacheEntry.status() == LRUCache::Uncached
 						)
 						{
diff --git a/include/Gaffer/Private/IECorePreview/TaskMutex.h b/include/Gaffer/Private/IECorePreview/TaskMutex.h
index da9474fa73d..8e8a96252c8 100644
--- a/include/Gaffer/Private/IECorePreview/TaskMutex.h
+++ b/include/Gaffer/Private/IECorePreview/TaskMutex.h
@@ -45,11 +45,6 @@
 #include "tbb/task_arena.h"
 #include "tbb/task_group.h"

-// Enable preview feature that allows us to construct a `task_scheduler_observer`
-// for a specific `task_arena`. This feature becomes officially supported in
-// Intel TBB 2019 Update 5, so it is not going to be removed.
-#define TBB_PREVIEW_LOCAL_OBSERVER 1
-#include "tbb/task_scheduler_observer.h"
 #include <iostream>
 #include <thread>

@@ -101,6 +96,9 @@ namespace IECorePreview
/// }
/// // Use resource here, while lock is still held.
/// ```
+///
+/// \todo Investigate `tbb::collaborative_call_once`, once VFXPlatform has moved to OneTBB.
+/// It appears to provide very similar functionality.
class TaskMutex : boost::noncopyable
{

	public :

@@ -121,7 +119,7 @@ class TaskMutex : boost::noncopyable
			public :

				ScopedLock()
-					:	m_mutex( nullptr ), m_lockType( LockType::None )
+					:	m_mutex( nullptr ), m_writer( false )
				{
				}

@@ -143,9 +141,8 @@ class TaskMutex : boost::noncopyable
				/// work on behalf of `execute()` while waiting.
				void acquire( TaskMutex &mutex, bool write = true, bool acceptWork = true )
				{
-					const LockType l = write ?
LockType::Write : LockType::Read;
					tbb::internal::atomic_backoff backoff;
					while( !acquireOr( mutex, write, [acceptWork]( bool workAvailable ){ return acceptWork; } ) )
					{
						backoff.pause();
					}
				}

				/// Upgrades a previously acquired read lock to a write lock. Returns true if the
				/// upgrade was achieved without
				/// temporarily releasing the lock, and false otherwise.
				bool upgradeToWriter()
				{
-					assert( m_mutex && (m_lockType == LockType::Read) );
-					m_lockType = LockType::Write;
+					assert( m_mutex && !m_writer );
+					m_writer = true;
					return m_lock.upgrade_to_writer();
				}

				template<typename F>
				void execute( F &&f )
				{
-					assert( m_mutex && m_lockType == LockType::Write );
+					assert( m_mutex && m_writer );
					ExecutionStateMutex::scoped_lock executionStateLock( m_mutex->m_executionStateMutex );
					assert( !m_mutex->m_executionState );

@@ -219,8 +216,7 @@ class TaskMutex : boost::noncopyable
				/// Acquires mutex or returns false. Never does TBB tasks.
				bool tryAcquire( TaskMutex &mutex, bool write = true )
				{
-					const LockType l = write ? LockType::Write : LockType::Read;
-					return acquireOr( mutex, l, []( bool workAvailable ){ return false; } );
+					return acquireOr( mutex, write, []( bool workAvailable ){ return false; } );
				}

				/// Releases the lock. This will be done automatically
@@ -229,12 +225,9 @@ class TaskMutex : boost::noncopyable
				void release()
				{
					assert( m_mutex );
-					if( m_lockType != LockType::WorkerRead )
-					{
-						m_lock.release();
-					}
+					m_lock.release();
					m_mutex = nullptr;
-					m_lockType = LockType::None;
+					m_writer = false;
				}

				/// Advanced API
				/// ============
				///
				/// These methods provide advanced usage required by performance-critical code
				/// in Gaffer's LRUCache. They should not be considered part of the canonical
				/// API.

-				enum class LockType
-				{
-					None,
-					// Multiple readers may coexist.
-					Read,
-					// Only a single writer can exist at a time, and the presence
-					// of a writer prevents read locks being acquired.
-					Write,
-					// Artificial read lock, available only to threads performing
-					// TBB tasks on behalf of `execute()`. These readers are
-					// protected only by the original write lock held by the caller
-					// of `execute()`. This means the caller of `execute()` _must_
-					// delay any writes until _after_ `execute()` has returned.
-					// A WorkerRead lock can not be upgraded via `upgradeToWriter()`.
-					WorkerRead,
-				};
-
				/// Tries to acquire the mutex, returning true on success. On failure,
				/// calls `workNotifier( bool workAvailable )`. If work is available and
				/// `workNotifier` returns true, then this thread will perform TBB tasks
				/// spawned by `execute()` until the work is complete. Returns false on
				/// failure regardless of whether or not work is done.
				template<typename WorkNotifier>
-				bool acquireOr( TaskMutex &mutex, LockType lockType, WorkNotifier &&workNotifier )
+				bool acquireOr( TaskMutex &mutex, bool write, WorkNotifier &&workNotifier )
				{
					assert( !m_mutex );
-					assert( m_lockType == LockType::None );
-					assert( lockType != LockType::None );
-
-					if( m_lock.try_acquire( mutex.m_mutex, /* write = */ lockType == LockType::Write ) )
+					if( m_lock.try_acquire( mutex.m_mutex, write ) )
					{
						// Success!
						m_mutex = &mutex;
-						m_lockType = lockType == LockType::WorkerRead ? LockType::Read : lockType;
+						m_writer = write;
						return true;
					}

					// Lock failed. Is there work in flight that we could do while waiting, on behalf of the
					// current call to `execute()`.
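					// (Taking `m_executionStateMutex` below synchronises us with
					// `execute()`, which publishes and clears `m_executionState`
					// while holding the same mutex.)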
ExecutionStateMutex::scoped_lock executionStateLock( mutex.m_executionStateMutex );
-					if( lockType == LockType::WorkerRead && mutex.m_executionState && mutex.m_executionState->arenaObserver.containsThisThread() )
-					{
-						// We're already doing work on behalf of `execute()`, so we can
-						// take a WorkerRead lock.
-						m_mutex = &mutex;
-						m_lockType = lockType;
-						return true;
-					}

					const bool workAvailable = mutex.m_executionState.get();
					if( !workNotifier( workAvailable ) || !workAvailable )
					{
						return false;
					}
@@ -313,20 +278,16 @@ class TaskMutex : boost::noncopyable
					return false;
				}

-				/// Returns the type of the lock currently held. If `acquireOr( WorkerRead )`
-				/// is called successfully, this will return `Read` for an external lock and
-				/// `WorkerRead` for an internal lock acquired by virtue of performing tasks
-				/// for `execute()`.
-				LockType lockType() const
+				bool isWriter() const
				{
-					return m_lockType;
+					return m_writer;
				}

			private :

				InternalMutex::scoped_lock m_lock;
				TaskMutex *m_mutex;
-				LockType m_lockType;
+				bool m_writer;

		};

	private :

		// The actual mutex that is held by the ScopedLock.
		InternalMutex m_mutex;

-		// Tracks worker threads as they enter and exit an arena, so we can determine
-		// whether or not the current thread is inside the arena. We use this to detect
-		// recursion and allow any worker thread to obtain a recursive lock provided
-		// they are currently performing work in service of `ScopedLock::execute()`.
-		class ArenaObserver : public tbb::task_scheduler_observer
-		{
-
-			public :
-
-				ArenaObserver( tbb::task_arena &arena )
-					:	tbb::task_scheduler_observer( arena )
-				{
-					observe( true );
-				}
-
-				~ArenaObserver() override
-				{
-					observe( false );
-				}
-
-				bool containsThisThread()
-				{
-					Mutex::scoped_lock lock( m_mutex );
-					return m_threadIdSet.find( std::this_thread::get_id() ) != m_threadIdSet.end();
-				}
-
-			private :
-
-				void on_scheduler_entry( bool isWorker ) override
-				{
-					assert( !containsThisThread() );
-					Mutex::scoped_lock lock( m_mutex );
-					m_threadIdSet.insert( std::this_thread::get_id() );
-				}
-
-				void on_scheduler_exit( bool isWorker ) override
-				{
-					assert( containsThisThread() );
-					Mutex::scoped_lock lock( m_mutex );
-					m_threadIdSet.erase( std::this_thread::get_id() );
-				}
-
-				using Mutex = tbb::spin_mutex;
-				using ThreadIdSet = boost::container::flat_set<std::thread::id>;
-				Mutex m_mutex;
-				ThreadIdSet m_threadIdSet;
-
-		};
-
		// The mechanism we use to allow waiting threads
		// to participate in the work done by `execute()`.
		struct ExecutionState : public IECore::RefCounted
		{
-			ExecutionState()
-				:	arenaObserver( arena )
-			{
-			}
-
			// Work around https://bugs.llvm.org/show_bug.cgi?id=32978
			~ExecutionState() noexcept( true ) override
			{
			}

			// Arena and task group used to allow
			// waiting threads to participate in work.
			tbb::task_arena arena;
			tbb::task_group taskGroup;
-			// Observer used to track which threads are
-			// currently inside the arena.
- ArenaObserver arenaObserver; }; IE_CORE_DECLAREPTR( ExecutionState ); diff --git a/python/GafferTest/IECorePreviewTest/LRUCacheTest.py b/python/GafferTest/IECorePreviewTest/LRUCacheTest.py index 2a49068aa4e..b647b020d19 100644 --- a/python/GafferTest/IECorePreviewTest/LRUCacheTest.py +++ b/python/GafferTest/IECorePreviewTest/LRUCacheTest.py @@ -158,10 +158,6 @@ def testRecursionOnOneItemSerial( self ) : GafferTest.testLRUCacheRecursionOnOneItem( "serial" ) - def testRecursionOnOneItemTaskParallel( self ) : - - GafferTest.testLRUCacheRecursionOnOneItem( "taskParallel" ) - def testClearFromGetSerial( self ) : GafferTest.testLRUCacheClearFromGet( "serial" ) @@ -238,10 +234,7 @@ def testSetIfUncached( self ) : def testSetIfUncachedRecursion( self ) : - # `parallel` policy omitted because it doesn't support recursion. - for policy in [ "serial", "taskParallel" ] : - with self.subTest( policy = policy ) : - GafferTest.testLRUCacheSetIfUncachedRecursion( policy ) + GafferTest.testLRUCacheSetIfUncachedRecursion( "serial" ) if __name__ == "__main__": unittest.main() diff --git a/python/GafferTest/IECorePreviewTest/TaskMutexTest.py b/python/GafferTest/IECorePreviewTest/TaskMutexTest.py index bf20cb53c82..885319f7900 100644 --- a/python/GafferTest/IECorePreviewTest/TaskMutexTest.py +++ b/python/GafferTest/IECorePreviewTest/TaskMutexTest.py @@ -64,10 +64,6 @@ def testHeavyContentionWithoutWorkAcceptance( self ) : GafferTest.testTaskMutexHeavyContention( False ) - def testWorkerRecursion( self ) : - - GafferTest.testTaskMutexWorkerRecursion() - def testAcquireOr( self ) : GafferTest.testTaskMutexAcquireOr() diff --git a/src/GafferTestModule/TaskMutexTest.cpp b/src/GafferTestModule/TaskMutexTest.cpp index e394835f001..756b18f2f3a 100644 --- a/src/GafferTestModule/TaskMutexTest.cpp +++ b/src/GafferTestModule/TaskMutexTest.cpp @@ -74,12 +74,12 @@ void testTaskMutex() TaskMutex::ScopedLock lock( mutex, /* write = */ false ); gotLock.local() = true; - GAFFERTEST_ASSERT( lock.lockType() == TaskMutex::ScopedLock::LockType::Read ) + GAFFERTEST_ASSERT( !lock.isWriter() ) if( !initialised ) { lock.upgradeToWriter(); - GAFFERTEST_ASSERT( lock.lockType() == TaskMutex::ScopedLock::LockType::Write ); + GAFFERTEST_ASSERT( lock.isWriter() ); if( !initialised ) // Check again, because upgrading to writer may lose the lock temporarily. 
{
			initialised = true;
		}
	}
@@ -134,7 +134,7 @@ void testTaskMutexWithinIsolate()
		tbb::this_task_arena::isolate(
			[&mutex]() {
				TaskMutex::ScopedLock lock( mutex );
-				GAFFERTEST_ASSERT( lock.lockType() == TaskMutex::ScopedLock::LockType::Write )
+				GAFFERTEST_ASSERT( lock.isWriter() );
				std::this_thread::sleep_for( std::chrono::milliseconds( 1 ) );
			}
		);
@@ -175,7 +175,7 @@ void testTaskMutexJoiningOuterTasks()
			TaskMutex::ScopedLock lock( mutex );
			gotLock.local() = true;
-			GAFFERTEST_ASSERT( lock.lockType() == TaskMutex::ScopedLock::LockType::Write )
+			GAFFERTEST_ASSERT( lock.isWriter() )

			if( !initialised )
			{
@@ -213,7 +213,7 @@ void testTaskMutexJoiningOuterTasks()
			for( size_t i = r.begin(); i < r.end(); ++i )
			{
				TaskMutex::ScopedLock lock( *independentTasks[i] );
-				GAFFERTEST_ASSERT( lock.lockType() == TaskMutex::ScopedLock::LockType::Write )
+				GAFFERTEST_ASSERT( lock.isWriter() )
				lock.execute(
					[&]() {
						initialise();
@@ -247,56 +247,13 @@ void testTaskMutexHeavyContention( bool acceptWork )
			for( size_t i = r.begin(); i < r.end(); ++i )
			{
				TaskMutex::ScopedLock lock( mutex, /* write = */ false, acceptWork );
-				GAFFERTEST_ASSERT( lock.lockType() == TaskMutex::ScopedLock::LockType::Read );
+				GAFFERTEST_ASSERT( !lock.isWriter() );
				GAFFERTEST_ASSERTEQUAL( initialised, true );
			}
		}
	);
}

-void testTaskMutexWorkerRecursion()
-{
-	TaskMutex mutex;
-	tbb::enumerable_thread_specific<bool> gotLock;
-
-	std::function<void ( int )> recurse;
-	recurse = [&mutex, &gotLock, &recurse] ( int depth ) {
-
-		TaskMutex::ScopedLock lock;
-		const bool acquired = lock.acquireOr(
-			mutex, TaskMutex::ScopedLock::LockType::WorkerRead,
-			[]( bool workAvailable ) { return true; }
-		);
-
-		GAFFERTEST_ASSERT( acquired );
-		GAFFERTEST_ASSERT( lock.lockType() == TaskMutex::ScopedLock::LockType::WorkerRead );
-
-		gotLock.local() = true;
-
-		if( depth > 4 )
-		{
-			std::this_thread::sleep_for( std::chrono::milliseconds( 10 ) );
-		}
-		else
-		{
-			tbb::parallel_for(
-				0, 4,
-				[&recurse, depth] ( int i ) {
-					recurse( depth + 1 );
-				}
-			);
-		}
-
-	};
-
-	TaskMutex::ScopedLock lock( mutex );
-	lock.execute(
-		[&recurse] { recurse( 0 ); }
-	);
-
-	GAFFERTEST_ASSERTEQUAL( gotLock.size(), tbb::tbb_thread::hardware_concurrency() );
-}
-
void testTaskMutexAcquireOr()
{
	TaskMutex mutex;
@@ -305,7 +262,7 @@ void testTaskMutexAcquireOr()
	TaskMutex::ScopedLock lock2;
	bool workAvailable = true;
	const bool acquired = lock2.acquireOr(
-		mutex, TaskMutex::ScopedLock::LockType::Write,
+		mutex, /* write = */ true,
		[&workAvailable] ( bool wa ) { workAvailable = wa; return true; }
	);

@@ -522,7 +479,6 @@ void GafferTestModule::bindTaskMutexTest()
	def( "testTaskMutexWithinIsolate", &testTaskMutexWithinIsolate );
	def( "testTaskMutexJoiningOuterTasks", &testTaskMutexJoiningOuterTasks );
	def( "testTaskMutexHeavyContention", &testTaskMutexHeavyContention );
-	def( "testTaskMutexWorkerRecursion", &testTaskMutexWorkerRecursion );
	def( "testTaskMutexAcquireOr", &testTaskMutexAcquireOr );
	def( "testTaskMutexExceptions", &testTaskMutexExceptions );
	def( "testTaskMutexWorkerExceptions", &testTaskMutexWorkerExceptions );

From ccd059eece3b6610b7c69959351ba3aaa9c7e4b1 Mon Sep 17 00:00:00 2001
From: John Haddon
Date: Mon, 7 Aug 2023 15:45:21 +0100
Subject: [PATCH 05/10] ComputeNodeTest : Test exception handling for all
 cache policies

---
 python/GafferTest/ComputeNodeTest.py | 28 ++++++++++++++++++----------
 1 file changed, 18 insertions(+), 10 deletions(-)

diff --git a/python/GafferTest/ComputeNodeTest.py b/python/GafferTest/ComputeNodeTest.py
index 943ebd24f91..7a3792f3082 100644
---
a/python/GafferTest/ComputeNodeTest.py +++ b/python/GafferTest/ComputeNodeTest.py @@ -608,6 +608,7 @@ class ThrowingNode( Gaffer.ComputeNode ) : def __init__( self, name="ThrowingNode" ) : self.hashFail = False + self.cachePolicy = Gaffer.ValuePlug.CachePolicy.Standard Gaffer.ComputeNode.__init__( self, name ) @@ -639,11 +640,11 @@ def compute( self, plug, context ) : def hashCachePolicy( self, plug ) : - return Gaffer.ValuePlug.CachePolicy.Standard + return self.cachePolicy def computeCachePolicy( self, plug ) : - return Gaffer.ValuePlug.CachePolicy.Standard + return self.cachePolicy IECore.registerRunTimeTyped( ThrowingNode ) @@ -684,18 +685,25 @@ def testProcessException( self ) : def testProcessExceptionNotShared( self ) : - thrower1 = self.ThrowingNode( "thrower1" ) - thrower2 = self.ThrowingNode( "thrower2" ) + for policy in Gaffer.ValuePlug.CachePolicy.names.values() : + with self.subTest( policy = policy ) : - with self.assertRaisesRegex( Gaffer.ProcessException, r'thrower1.out : [\s\S]*Eeek!' ) as raised : - thrower1["out"].getValue() + Gaffer.ValuePlug.clearCache() - self.assertEqual( raised.exception.plug(), thrower1["out"] ) + thrower1 = self.ThrowingNode( "thrower1" ) + thrower1.cachePolicy = policy + thrower2 = self.ThrowingNode( "thrower2" ) + thrower2.cachePolicy = policy - with self.assertRaisesRegex( Gaffer.ProcessException, r'thrower2.out : [\s\S]*Eeek!' ) as raised : - thrower2["out"].getValue() + with self.assertRaisesRegex( Gaffer.ProcessException, r'thrower1.out : [\s\S]*Eeek!' ) as raised : + thrower1["out"].getValue() + + self.assertEqual( raised.exception.plug(), thrower1["out"] ) + + with self.assertRaisesRegex( Gaffer.ProcessException, r'thrower2.out : [\s\S]*Eeek!' ) as raised : + thrower2["out"].getValue() - self.assertEqual( raised.exception.plug(), thrower2["out"] ) + self.assertEqual( raised.exception.plug(), thrower2["out"] ) def testProcessExceptionRespectsNameChanges( self ) : From c64698706626adaf6e6c57c256418c99d8660238 Mon Sep 17 00:00:00 2001 From: John Haddon Date: Wed, 23 Aug 2023 17:19:55 +0100 Subject: [PATCH 06/10] Process : Add `acquireCollaborativeResult()` method --- include/Gaffer/Process.h | 27 +++ include/Gaffer/Process.inl | 401 +++++++++++++++++++++++++++++++++++++ src/Gaffer/Process.cpp | 42 +++- 3 files changed, 468 insertions(+), 2 deletions(-) diff --git a/include/Gaffer/Process.h b/include/Gaffer/Process.h index e69eb11f8b1..769e482e540 100644 --- a/include/Gaffer/Process.h +++ b/include/Gaffer/Process.h @@ -101,8 +101,34 @@ class GAFFER_API Process : private ThreadState::Scope /// \todo This just exists for ABI compatibility. Remove it. void handleException(); + /// Searches for an in-flight process and waits for its result, collaborating + /// on any TBB tasks it spawns. If no such process exists, constructs one + /// using `args` and makes it available for collaboration by other threads, + /// publishing the result to a cache when the process completes. + /// + /// Requirements : + /// + /// - `ProcessType( args... )` constructs a process suitable for computing + /// the result for `cacheKey`. + /// - `ProcessType::ResultType` defines the result type for the process. + /// - `ProcessType::run()` does the work for the process and returns the + /// result. + /// - `ProcessType::g_cache` is a static LRUCache of type `ProcessType::CacheType` + /// to be used for the caching of the result. + /// - `ProcessType::cacheCostFunction()` is a static function suitable + /// for use with `CacheType::setIfUncached()`. 
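+	/// A minimal usage sketch (illustrative only : `MyProcess` is a
+	/// hypothetical type satisfying the requirements above, and is not
+	/// part of this API) :
+	///
+	/// ```
+	/// MyProcess::ResultType r = Process::acquireCollaborativeResult<MyProcess>(
+	///     cacheKey, plug, context
+	/// );
+	/// ```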
+ /// + template + static typename ProcessType::ResultType acquireCollaborativeResult( + const typename ProcessType::CacheType::KeyType &cacheKey, ProcessArguments... args + ); + private : + class Collaboration; + template + class TypedCollaboration; + static bool forceMonitoringInternal( const ThreadState &s, const Plug *plug, const IECore::InternedString &processType ); void emitError( const std::string &error, const Plug *source = nullptr ) const; @@ -111,6 +137,7 @@ class GAFFER_API Process : private ThreadState::Scope const Plug *m_plug; const Plug *m_destinationPlug; const Process *m_parent; + const Collaboration *m_collaboration; }; diff --git a/include/Gaffer/Process.inl b/include/Gaffer/Process.inl index c8d8a71861b..3e5f40f9c12 100644 --- a/include/Gaffer/Process.inl +++ b/include/Gaffer/Process.inl @@ -34,9 +34,410 @@ // ////////////////////////////////////////////////////////////////////////// +#include "tbb/concurrent_hash_map.h" +#include "tbb/spin_mutex.h" +#include "tbb/task_arena.h" +#include "tbb/task_group.h" + +#include + namespace Gaffer { +/// Process Graph Overview +/// ====================== +/// +/// > Note : These notes (and the Process design itself) are heavily biased +/// towards ValuePlug and ComputeNode, and their associated ComputeProcess and +/// HashProcess. +/// +/// It's tempting to think that because processes are stack-allocated, they each +/// have a single parent process waiting for them to complete, and that each +/// process is only waiting on a single child. It's also tempting to think that +/// there is a one-to-one correspondence between nodes and processes. +/// +/// Node graph Process graph +/// ---------- ------------- +/// +/// AddNode1 o current process +/// | | +/// AddNode2 o waiting process (lower in stack) +/// | | +/// AddNode3 o waiting process (even lower in stack) +/// +/// While that is true for the simple case shown above, the reality is far more +/// complicated due to contexts, multithreading, task collaboration and hash +/// aliasing. +/// +/// Contexts +/// -------- +/// +/// Processes are operations being performed by a node for a particular plug, in +/// a _particular context_. The topology of the process graph does not +/// correspond directly to the topology of the node graph itself. Rather, the +/// process graph is generated dynamically in response to each process launching +/// upstream processes it depends on. +/// +/// Loop <--- o Loop, loop:index=0 +/// | | | +/// v | o AddNode, loop:index=0 +/// AddNode-- | +/// o Loop, loop:index=1 +/// | +/// o AddNode, loop:index=1 +/// | +/// o ... +/// +/// As this example shows, cyclic _connections_ between plugs are even OK +/// provided that each process launches child _processes_ in a different context, +/// meaning that there are no cyclic dependencies between _processes_. +/// Even in this case, every process has only a single child and a single +/// parent, all living on the stack of a single thread, so the topology of +/// our process graph remains completely linear. But that ends as soon as +/// we consider multithreading. +/// +/// Multithreading +/// -------------- +/// +/// A single process can use TBB tasks to launch many child processes that may +/// each be run on a different thread : +/// +/// Random o o o current processes, one per thread +/// | \ | / +/// Collect o waiting process +/// +/// In this case, a single parent process may be waiting for multiple children +/// to complete. 
Our simple linear "graph" is now a directed tree (I'm using
/// terminology loosely here, I think the official term would be an
/// "arborescence").
///
/// This doesn't present any great obstacle in itself - the only new requirement
/// is that each TBB task scopes the ThreadState from the parent process, so
/// that we can associate the task's processes with the correct parent and run them in
/// the correct context. But it does highlight that a parent process may have
/// many children, and that processes may perform arbitrarily expensive amounts
/// of work.
///
/// Task collaboration
/// ------------------
///
/// Now that we know there can be processes in-flight on each thread, we need to
/// consider what happens if two or more threads simultaneously want a result
/// from the same not-yet-run upstream process. Gaffer cannot query the upstream
/// dependencies for a process before launching it, and therefore cannot perform
/// any up-front task scheduling. So in the example below, when two threads are
/// each running their own process and they dynamically turn out to require the
/// same upstream dependency, we need to deal with it dynamically.
///
///      AddNode1         ?   ?
///       /    \          |   |
///   AddNode2 AddNode3   o   o
///
/// One approach is to simply allow each thread to run its own copy of the
/// process redundantly, and in fact this is a reasonable strategy that we do use
/// for lightweight processes.
///
///      AddNode1         o   o
///       /    \          |   |
///   AddNode2 AddNode3   o   o
///
/// But where a process is expensive, duplication is not
/// an option. We need to arrange things such that we launch the upstream
/// compute on one thread, and have the other wait for its completion.
///
///      Collect            o
///       /    \           / \   < second thread waiting for process
///   AddNode2 AddNode3   o   o    launched by first thread
///
/// Ideally we don't want the waiting thread to simply block or spin though, as
/// that quickly reduces to only a single thread doing useful work. Instead we
/// want to provide the facility for waiting threads to _collaborate_, by
/// working on any TBB tasks spawned by the upstream process. We now have a new
/// requirement : we need to track the in-flight processes that are available
/// for collaboration, which we do in `Process::acquireCollaborativeResult()`.
/// And our process graphs can now contain diamond connections at collaboration
/// points, making them general directed acyclic graphs rather than simple
/// trees.
///
/// Hash aliasing
/// -------------
///
/// To track in-flight processes we need a way of identifying them, and we do
/// this using the same key that is used to cache their results. In the case of
/// ComputeProcess, the key is a hash generated by `ComputeNode::hash()`, which
/// must uniquely identify the result of the process.
///
/// But we have a problem : this hash can _alias_, and indeed it is encouraged
/// to. By aliasing, we mean that two processes can have the same hash provided
/// that they will generate the same result. For example, two different
/// SceneReader nodes will share hashes if they are each reading from the same
/// file. Or two locations within a scene will share hashes if they are known to
/// generate identical objects. In both cases, aliasing the hashes allows us to
/// avoid redundant computes and the creation of redundant cache entries.
But this
/// adds complexity to the process graph - through hash aliasing, processes can
/// end up collaborating on nodes they have no actual connection to.
///
///   Collect1     Collect2          o      < Collect1 and Collect2 have the same
///      |            |             / \     < hash, so Expression2 is now
///   Expression1  Expression2     o   o    < collaborating on Collect1!
///
/// Again, this is desirable as it reduces redundant work. But hashes can also
/// alias in less predictable ways. As `ExpressionTest.testHashAliasing`
/// shows, it's possible to create a node network such that a downstream node
/// depends on an upstream node with an _identical hash_. If we attempt process
/// collaboration in this case, we create a cyclic dependency that results in
/// a form of deadlock.
///
///   Expression1
///       |
///   Expression2        o-----
///       |              |    |
///   Expression3        o<----
///
/// This is _the_ key problem in our management of threaded collaborative
/// processes. We want node authors to be free to alias hashes without
/// constraint, to reduce redundant computes and cache pressure to the maximum
/// extent possible. But with the right node graph, _any_ aliasing may
/// lead to a cyclic dependency evolving dynamically in the corresponding
/// process graph.
///
/// In practice, such cyclic dependencies are rare, but not rare enough
/// that we can neglect them completely. Our strategy is therefore to
/// perform collaboration wherever we can, but to replace it with one
/// additional "redundant" process where collaboration would cause a
/// cycle.
///
///   Expression1        o   < this process has the same hash...
///       |              |
///   Expression2        o
///       |              |
///   Expression3        o   < ...as this one
///
/// Conceptually this is relatively simple, but it is made trickier by the
/// constantly mutating nature of the process graph. Although all new processes
/// are always added at the leaves of the process "tree", collaboration can insert
/// arbitrary diamond dependencies between existing processes anywhere in the
/// graph, at any time, and from any thread, and our cycle checking must account
/// for this without introducing excessive overhead.
///
/// > Tip : At this point it is useful to forget about nodes and plugs and
/// connections and to instead consider the process graph largely in the
/// abstract. Processes are vertices in the graph. Dependencies are directed
/// edges between processes. Edge insertion may be attempted anywhere by
/// collaboration at any time, and cycles must be avoided.

/// A "vertex" in the process graph where collaboration may be performed. We
/// only track collaborative processes because non-collaborative processes can't
/// introduce edges that could lead to cycles.
class Process::Collaboration : public IECore::RefCounted
{

	public :

		// Work around https://bugs.llvm.org/show_bug.cgi?id=32978
		~Collaboration() noexcept( true ) override;

		IE_CORE_DECLAREMEMBERPTR( Collaboration );

		// Arena and task group used to allow waiting threads to participate
		// in collaborative work.
		tbb::task_arena arena;
		tbb::task_group taskGroup;

		using Set = std::unordered_set<const Collaboration *>;
		static const Collaboration::Set emptySet; // TODO : MAKE PRIVATE? OR REMOVE?

		// Collaborations depending directly on this one.
		Set dependents;

		// Returns true if this collaboration depends on `collaboration`, either
		// directly or indirectly via other collaborations it depends on.
		// The caller of this function must hold `g_dependentsMutex`.
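		// (For example, if A is registered as a dependent of B, and B as a
		// dependent of C, then `A->dependsOn( C )` returns true, because the
		// search follows the `dependents` sets transitively.)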
+		bool dependsOn( const Collaboration *collaboration ) const;
+
+		// Protects access to `dependents` on _all_ Collaborations.
+		static tbb::spin_mutex g_dependentsMutex;
+
+};
+
+/// Collaboration subclass specific to a single type of process, providing storage for the result
+/// and tracking of the currently in-flight collaborations by cache key.
+///
+/// > Note : We track dependencies between all types of collaboration, not just between like types.
+template<typename ProcessType>
+class Process::TypedCollaboration : public Process::Collaboration
+{
+	public :
+
+		std::optional<typename ProcessType::ResultType> result;
+
+		IE_CORE_DECLAREMEMBERPTR( TypedCollaboration );
+
+		using PendingCollaborations = tbb::concurrent_hash_map<typename ProcessType::CacheType::KeyType, boost::container::flat_set<Ptr>>; // TODO : WHY FLAT_SET?
+		static PendingCollaborations g_pendingCollaborations;
+
+};
+
+template<typename ProcessType>
+typename Process::TypedCollaboration<ProcessType>::PendingCollaborations Process::TypedCollaboration<ProcessType>::g_pendingCollaborations;
+
+template<typename ProcessType, typename... ProcessArguments>
+typename ProcessType::ResultType Process::acquireCollaborativeResult(
+	const typename ProcessType::CacheType::KeyType &cacheKey, ProcessArguments... args
+)
+{
+	const ThreadState &threadState = ThreadState::current(); // TODO : PASS IN?
+	const Collaboration *currentCollaboration = threadState.process() ? threadState.process()->m_collaboration : nullptr;
+
+	// Check for any in-flight computes for the same cache key. If we find a
+	// suitable one, we'll wait for it and use its result.
+
+	using CollaborationType = TypedCollaboration<ProcessType>;
+	using CollaborationTypePtr = typename CollaborationType::Ptr;
+
+	typename CollaborationType::PendingCollaborations::accessor accessor;
+	CollaborationType::g_pendingCollaborations.insert( accessor, cacheKey );
+
+	for( const auto &candidate : accessor->second )
+	{
+		// Check to see if we can safely collaborate on `candidate` without
+		// risking deadlock. We optimistically perform the cheapest checks
+		// first; if we're not already in a collaboration, or if the
+		// collaboration we're in already depends on the candidate (via another
+		// thread of execution) then we're good to go.
+		//
+		// The call to `candidate->dependents.find()` is safe even though we
+		// don't hold `g_dependentsMutex`, because we hold the accessor for
+		// `candidate`, and that is always held by any writer of
+		// `candidate->dependents`.
+
+		// TODO : NOT SURE IT'S SAFE TO CALL `dependsOn()` IF THE COLLABORATION HAS ALREADY
+		// COMPLETED. ONCE COMPLETE, DEPENDENTS MAY NO LONGER BE ALIVE, SO WE CAN'T TRAVERSE
+		// THEM. EITHER NEED TO CHECK FOR COMPLETION, OR REMOVE FROM DEPENDENTS AFTER WE'RE
+		// DONE WAITING. THE LATTER SEEMS POINTLESS? COULD WE JUST CLEAR DEPENDENTS AT SOME POINT?
+		if( currentCollaboration && candidate->dependents.find( currentCollaboration ) == candidate->dependents.end() )
+		{
+			// Perform much more expensive check for potential deadlock - we
+			// mustn't become a dependent of `candidate` if it already depends
+			// on us. This requires traversing all dependents of
+			// `currentCollaboration` while holding `g_dependentsMutex` (so they
+			// can't be modified while we read).
+			tbb::spin_mutex::scoped_lock dependentsLock( Collaboration::g_dependentsMutex );
+			if( !candidate->dependsOn( currentCollaboration ) )
+			{
+				// We're safe to collaborate. Add ourself as a dependent before
+				// releasing `g_dependentsMutex`.
+				candidate->dependents.insert( currentCollaboration );
+			}
+			else
+			{
+				continue;
+			}
+		}
+
+		// We've found an in-flight process we can wait on without causing
+		// deadlock.
Join its `task_arena` and wait on the result, so we get
+		// to work on any TBB tasks it has created. We need to own a reference
+		// to this because the thread that created it will drop its own
+		// reference before we are done using it.
+
+		CollaborationTypePtr collaboration = candidate;
+		accessor.release();
+
+		collaboration->arena.execute(
+			[&]{ return collaboration->taskGroup.wait(); }
+		);
+
+		if( collaboration->result )
+		{
+			// NOTE : CURRENTCOLLABORATION CAN DIE IMMEDIATELY NOW, BUT IS STILL IN DEPENDENTS.
+			// CAN `WAIT()` RETURN BEFORE `RUN_AND_WAIT()`? YEEESS. BUT IT CAN'T RETURN BEFORE
+			// THE FUNCTOR GIVEN TO `RUN_AND_WAIT()` COMPLETES.
+			return *collaboration->result;
+		}
+		else
+		{
+			throw IECore::Exception( "Collaboration ended with null value" ); // TODO : BETTER WORDING. WHEN DO WE GET HERE ANYWAY? FROM CANCELLATION? COMPUTES THAT DON'T SET?
+		}
+	}
+
+	// No suitable in-flight collaborations, so we'll create one of our own.
+	// First though, check the cache one more time, in case another thread has
+	// started and finished an equivalent collaboration since we first checked.
+
+	if( auto result = ProcessType::g_cache.getIfCached( cacheKey ) )
+	{
+		return *result;
+	}
+
+	CollaborationTypePtr collaboration = new CollaborationType;
+	if( currentCollaboration )
+	{
+		// No need to hold `g_dependentsMutex` here because other threads can't
+		// access `collaboration->dependents` until we publish it and release
+		// the accessor.
+		collaboration->dependents.insert( currentCollaboration );
+	}
+
+	std::exception_ptr exception;
+
+	auto status = collaboration->arena.execute(
+		[&] {
+			return collaboration->taskGroup.run_and_wait(
+				[&] {
+					// Publish ourselves so that other threads can collaborate
+					// by waiting on `collaboration->taskGroup`.
+					accessor->second.insert( collaboration );
+					accessor.release();
+
+					try
+					{
+						// TODO : FORWARD ARGUMENTS?
+						ProcessType process( args... );
+						process.m_collaboration = collaboration.get();
+						collaboration->result = process.run();
+						// Publish result to cache before we remove ourself from
+						// `g_pendingCollaborations`, so that other threads will
+						// be able to get the result one way or the other.
+						ProcessType::g_cache.setIfUncached(
+							cacheKey, *collaboration->result,
+							ProcessType::cacheCostFunction
+						);
+					}
+					catch( ... )
+					{
+						// Don't allow `task_group::wait()` to see exceptions,
+						// because then we'd hit a thread-safety bug in
+						// `tbb::task_group_context::reset()`.
+						exception = std::current_exception();
+					}
+
+					// Now we're done, remove `collaboration` from the pending collaborations.
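+					// (Late joiners will now either find the published result
+					// in the cache or start a fresh collaboration of their own;
+					// existing waiters still hold their own references to
+					// `collaboration`, so it stays alive until they are done.)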
+					[[maybe_unused]] const bool found = CollaborationType::g_pendingCollaborations.find( accessor, cacheKey );
+					assert( found );
+					accessor->second.erase( collaboration );
+					if( accessor->second.empty() )
+					{
+						CollaborationType::g_pendingCollaborations.erase( accessor );
+					}
+					accessor.release();
+				}
+			);
+		}
+	);
+
+	if( exception )
+	{
+		std::rethrow_exception( exception );
+	}
+	else if( status == tbb::task_group_status::canceled )
+	{
+		throw IECore::Cancelled();
+	}
+
+	return *collaboration->result;
+}
+
inline bool Process::forceMonitoring( const ThreadState &s, const Plug *plug, const IECore::InternedString &processType )
{
	if( s.m_mightForceMonitoring )
diff --git a/src/Gaffer/Process.cpp b/src/Gaffer/Process.cpp
index 51a98d249d9..456e7ddb32b 100644
--- a/src/Gaffer/Process.cpp
+++ b/src/Gaffer/Process.cpp
@@ -71,15 +71,53 @@ std::string prefixedWhat( const IECore::Exception &e )

} // namespace

+//////////////////////////////////////////////////////////////////////////
+// Collaboration
+//////////////////////////////////////////////////////////////////////////
+
+Process::Collaboration::~Collaboration() noexcept( true )
+{
+}
+
+bool Process::Collaboration::dependsOn( const Collaboration *collaboration ) const
+{
+	if( collaboration == this ) // TODO : WHY DO WE HAVE THIS????
+	{
+		return true;
+	}
+
+	std::unordered_set<const Collaboration *> visited;
+	std::deque<const Collaboration *> toVisit( { collaboration } );
+
+	while( !toVisit.empty() )
+	{
+		const Collaboration *c = toVisit.front();
+		toVisit.pop_front();
+		if( !visited.insert( c ).second )
+		{
+			continue;
+		}
+		if( c->dependents.count( this ) )
+		{
+			return true;
+		}
+		toVisit.insert( toVisit.end(), c->dependents.begin(), c->dependents.end() );
+	}
+
+	return false;
+}
+
+tbb::spin_mutex Process::Collaboration::g_dependentsMutex;
+
//////////////////////////////////////////////////////////////////////////
// Process
//////////////////////////////////////////////////////////////////////////

Process::Process( const IECore::InternedString &type, const Plug *plug, const Plug *destinationPlug )
-	:	m_type( type ), m_plug( plug ), m_destinationPlug( destinationPlug ? destinationPlug : plug )
+	:	m_type( type ), m_plug( plug ), m_destinationPlug( destinationPlug ? destinationPlug : plug ),
+		m_parent( m_threadState->m_process ), m_collaboration( m_parent ? m_parent->m_collaboration : nullptr )
{
	IECore::Canceller::check( context()->canceller() );
-	m_parent = m_threadState->m_process;
	m_threadState->m_process = this;

	for( const auto &m : *m_threadState->m_monitors )

From 580eeac21bcf2839f524d6b2d429eb29592c37a9 Mon Sep 17 00:00:00 2001
From: John Haddon
Date: Tue, 3 Oct 2023 10:57:08 +0100
Subject: [PATCH 07/10] ProcessTest : Add testing for
 `acquireCollaborativeResult()`

---
 python/GafferTest/ProcessTest.py          | 321 ++++++++++++++++++++++
 python/GafferTest/__init__.py             |   1 +
 src/GafferTestModule/GafferTestModule.cpp |   2 +
 src/GafferTestModule/ProcessTest.cpp      | 198 +++++++++++++
 src/GafferTestModule/ProcessTest.h        |  44 +++
 5 files changed, 566 insertions(+)
 create mode 100644 python/GafferTest/ProcessTest.py
 create mode 100644 src/GafferTestModule/ProcessTest.cpp
 create mode 100644 src/GafferTestModule/ProcessTest.h

diff --git a/python/GafferTest/ProcessTest.py b/python/GafferTest/ProcessTest.py
new file mode 100644
index 00000000000..82ea0f17ec4
--- /dev/null
+++ b/python/GafferTest/ProcessTest.py
@@ -0,0 +1,321 @@
+##########################################################################
+#
+# Copyright (c) 2023, Cinesite VFX Ltd. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above
+#       copyright notice, this list of conditions and the following
+#       disclaimer.
+#
+#     * Redistributions in binary form must reproduce the above
+#       copyright notice, this list of conditions and the following
+#       disclaimer in the documentation and/or other materials provided with
+#       the distribution.
+#
+#     * Neither the name of John Haddon nor the names of
+#       any other contributors to this software may be used to endorse or
+#       promote products derived from this software without specific prior
+#       written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
+# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+# THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+#
+##########################################################################
+
+import unittest
+
+import IECore
+
+import Gaffer
+import GafferTest
+
+class ProcessTest( GafferTest.TestCase ) :
+
+	def setUp( self ) :
+
+		GafferTest.TestCase.setUp( self )
+		GafferTest.clearTestProcessCache()
+
+	def testCollaboration( self ) :
+
+		# We expect processes `1...n` to collaborate on
+		# process `n+1`.
+		#
+		#    n+1
+		#   / | \
+		#  1 ... n
+		#   \ | /
+		#     0
+
+		n = 10000
+
+		plug = Gaffer.Plug()
+		with Gaffer.PerformanceMonitor() as monitor :
+			GafferTest.runTestProcess(
+				plug, 0,
+				{ x : { n + 1 : {} } for x in range( 1, n + 1 ) }
+			)
+
+		self.assertEqual( monitor.plugStatistics( plug ).computeCount, 1 + n + 1 )
+
+	def testCollaborationFromNonCollaborativeProcesses( self ) :
+
+		# As above, but the waiting processes are not themselves collaborative.
+		#
+		#      1
+		#    / | \
+		#  -1 ... -n
+		#    \ | /
+		#      0
+
+		n = 100000
+
+		plug = Gaffer.Plug()
+		with Gaffer.PerformanceMonitor() as monitor :
+			GafferTest.runTestProcess(
+				plug, 0,
+				{ -x : { 1 : {} } for x in range( 1, n + 1 ) }
+			)
+
+		self.assertEqual( monitor.plugStatistics( plug ).computeCount, 1 + n + 1 )
+
+	def testNoCollaborationOnRecursion( self ) :
+
+		# We don't expect any collaboration, because it would lead to
+		# deadlock.
+		#
+		# 10
+		# |
+		# 10
+		# |
+		# 10
+		# |
+		# 10
+
+		plug = Gaffer.Plug()
+		with Gaffer.PerformanceMonitor() as monitor :
+			GafferTest.runTestProcess( plug, 1, { 10 : { 10 : { 10 : {} } } } )
+
+		self.assertEqual( monitor.plugStatistics( plug ).computeCount, 4 )
+
+	def testNoCollaborationOnIndirectRecursion( self ) :
+
+		# We don't expect any collaboration, because it would lead to
+		# deadlock.
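+		# (a collaborating worker would be waiting on a process that is
+		# already running further down its own stack)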
+ # + # 1 + # | + # 2 + # | + # 1 + # | + # 0 + + plug = Gaffer.Plug() + with Gaffer.PerformanceMonitor() as monitor : + GafferTest.runTestProcess( plug, 0, { 1 : { 2 : { 1 : {} } } } ) + + self.assertEqual( monitor.plugStatistics( plug ).computeCount, 4 ) + + def testNonCollaborativeProcessWithinRecursion( self ) : + + # As above, but using a non-collaborative task in the middle of the recursion. + # + # 1 + # | + # -2 + # | + # 1 + # | + # 0 + + plug = Gaffer.Plug() + with Gaffer.PerformanceMonitor() as monitor : + GafferTest.runTestProcess( plug, 0, { 1 : { -2 : { 1 : {} } } } ) + + self.assertEqual( monitor.plugStatistics( plug ).computeCount, 4 ) + + def testNoCollaborationOnDiamondRecursion( self ) : + + # We don't expect any collaboration, because it would lead to + # deadlock. + # + # 1 + # | + # 3 + # / \ + # 1 2 + # \ / + # 0 + + plug = Gaffer.Plug() + + for i in range( 0, 100 ) : + GafferTest.clearTestProcessCache() + with Gaffer.PerformanceMonitor() as monitor : + GafferTest.runTestProcess( + plug, 0, + { + 1 : { 3 : { 1 : {} } }, + 2 : { 3 : { 1 : {} } } + } + ) + + # There are various possibilities for execution, based on different + # thread timings. + # + # - The `0-2-3-1` branch completes first, so `1` is already cached by + # the time the `0-1` branch wants it. 4 computes total. + # - The `0-1-3-1` branch completes first, with a duplicate compute for + # `1` to avoid deadlock. 5 computes total. + # - The `0-2-3` branch waits on `1` from the `0-1` branch. The `0-1` + # branch performs duplicate computes for `3` and `1` to avoid deadlock. + # 6 computes total. + self.assertIn( monitor.plugStatistics( plug ).computeCount, { 4, 5, 6 } ) + + def testNoCollaborationOnIndirectDiamondRecursion( self ) : + + # As above, but with an additional process (4), meaning we have + # to track non-immediate dependencies between processes. + # + # 1 + # | + # 4 + # | + # 3 + # / \ + # 1 2 + # \ / + # 0 + + plug = Gaffer.Plug() + + for i in range( 0, 100 ) : + GafferTest.clearTestProcessCache() + with Gaffer.PerformanceMonitor() as monitor : + GafferTest.runTestProcess( + plug, 0, + { + 1 : { 3 : { 4 : { 1 : {} } } }, + 2 : { 3 : { 4 : { 1 : {} } } }, + } + ) + + self.assertIn( monitor.plugStatistics( plug ).computeCount, { 5, 6, 8 } ) + + @GafferTest.TestRunner.PerformanceTestMethod() + def testFanOutGatherPerformance( self ) : + + # Pathological case for cycle checking - huge permutations + # of paths through the downstream graph. + # + # 0 + # / | \ + # 1 2 3 + # \ | / + # 4 + # / | \ + # 5 6 7 + # \ | / + # 8 + # / | \ + # 9 10 11 + # \ | / + # 12 (for width 3 and depth 3) + + width = 64 + depth = 10 + + dependencies = {} + i = 0 + for d in range( 0, depth ) : + dependencies = { i : dependencies } + i += 1 + dependencies = { w : dependencies for w in range( i, i + width ) } + i += width + + plug = Gaffer.Plug() + with Gaffer.PerformanceMonitor() as monitor : + GafferTest.clearTestProcessCache() + with GafferTest.TestRunner.PerformanceScope() : + GafferTest.runTestProcess( plug, i, dependencies ) + + self.assertEqual( monitor.plugStatistics( plug ).computeCount, i + 1 ) + + @GafferTest.TestRunner.PerformanceTestMethod() + def testCollaborationPerformance( self ) : + + # -(n+1)...-2n + # \ | / + # 1 + # / | \ + # -1 ... 
-n + # \ | / + # 0 + + n = 100000 + + upstreamDependencies = { -x : {} for x in range( n + 1, 2 * n + 1 ) } + + GafferTest.clearTestProcessCache() + + plug = Gaffer.Plug() + with Gaffer.PerformanceMonitor() as monitor : + with GafferTest.TestRunner.PerformanceScope() : + GafferTest.runTestProcess( + plug, 0, + { -x : { 1 : upstreamDependencies } for x in range( 1, n + 1 ) } + ) + + self.assertEqual( monitor.plugStatistics( plug ).computeCount, 1 + n + 1 + n ) + + @GafferTest.TestRunner.PerformanceTestMethod() + def testCollaborationTransitionPerformance( self ) : + + # Case we think is probably pretty common - all threads need to migrate + # through a series of collaborations. + # + # 3 + # / | \ + # -(2n+1)...-3n + # \ | / + # 2 + # / | \ + # -(n+1)...-2n + # \ | / + # 1 + # / | \ + # -1 ... -n + # \ | / + # 0 (for depth 3) + + n = IECore.hardwareConcurrency() + depth = 1000 + + dependencies = {} + for d in range( depth, 0, -1 ) : + dependencies = { -x : { d : dependencies } for x in range( (d-1) * n + 1, d * n + 1 ) } + + GafferTest.clearTestProcessCache() + + plug = Gaffer.Plug() + with Gaffer.PerformanceMonitor() as monitor : + with GafferTest.TestRunner.PerformanceScope() : + GafferTest.runTestProcess( plug, 0, dependencies ) + + self.assertEqual( monitor.plugStatistics( plug ).computeCount, depth * ( n + 1 ) + 1 ) + +if __name__ == "__main__": + unittest.main() diff --git a/python/GafferTest/__init__.py b/python/GafferTest/__init__.py index 12f7263f04b..24d0b0dd4ac 100644 --- a/python/GafferTest/__init__.py +++ b/python/GafferTest/__init__.py @@ -159,6 +159,7 @@ def inCI( platforms = set() ) : from .OptionalValuePlugTest import OptionalValuePlugTest from .ThreadMonitorTest import ThreadMonitorTest from .CollectTest import CollectTest +from .ProcessTest import ProcessTest from .IECorePreviewTest import * diff --git a/src/GafferTestModule/GafferTestModule.cpp b/src/GafferTestModule/GafferTestModule.cpp index 24c54280e4c..46f1d1d04b3 100644 --- a/src/GafferTestModule/GafferTestModule.cpp +++ b/src/GafferTestModule/GafferTestModule.cpp @@ -50,6 +50,7 @@ #include "ValuePlugTest.h" #include "MessagesTest.h" #include "MetadataTest.h" +#include "ProcessTest.h" #include "SignalsTest.h" #include "IECorePython/ScopedGILRelease.h" @@ -116,6 +117,7 @@ BOOST_PYTHON_MODULE( _GafferTest ) bindValuePlugTest(); bindMessagesTest(); bindSignalsTest(); + bindProcessTest(); object module( borrowed( PyImport_AddModule( "GafferTest._MetadataTest" ) ) ); scope().attr( "_MetadataTest" ) = module; diff --git a/src/GafferTestModule/ProcessTest.cpp b/src/GafferTestModule/ProcessTest.cpp new file mode 100644 index 00000000000..e327deeb3a5 --- /dev/null +++ b/src/GafferTestModule/ProcessTest.cpp @@ -0,0 +1,198 @@ +////////////////////////////////////////////////////////////////////////// +// +// Copyright (c) 2023, Cinesite VFX Ltd. All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above +// copyright notice, this list of conditions and the following +// disclaimer. +// +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following +// disclaimer in the documentation and/or other materials provided with +// the distribution. 
+//
+//     * Neither the name of John Haddon nor the names of
+//       any other contributors to this software may be used to endorse or
+//       promote products derived from this software without specific prior
+//       written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
+// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+//////////////////////////////////////////////////////////////////////////
+
+#include "boost/python.hpp"
+
+#include "ProcessTest.h"
+
+#include "GafferTest/Assert.h"
+
+#include "Gaffer/Context.h"
+#include "Gaffer/NumericPlug.h"
+#include "Gaffer/TypedObjectPlug.h"
+#include "Gaffer/Private/IECorePreview/LRUCache.h"
+#include "Gaffer/Process.h"
+
+#include "IECorePython/ScopedGILRelease.h"
+
+#include "tbb/parallel_for_each.h"
+
+using namespace boost::python;
+using namespace IECore;
+using namespace Gaffer;
+
+namespace
+{
+
+struct Dependencies : public IECore::RefCounted
+{
+	IE_CORE_DECLAREMEMBERPTR( Dependencies );
+	using Map = std::map<int, ConstPtr>;
+	Map map;
+};
+
+// Test Process
+// ============
+//
+// This Process subclass is used primarily to test the collaboration mechanism
+// provided by `Process::acquireCollaborativeResult()`. The result is an integer
+// which is given to the TestProcess directly, and which also provides the cache
+// key. The upstream dependencies are also given verbatim to TestProcess as a
+// nested dictionary of integers, mapping from the result for each dependency to
+// the dictionary for _its_ upstream dependencies. Non-negative results are
+// computed using `acquireCollaborativeResult()` and negative results are
+// computed by constructing a TestProcess directly. This mechanism lets us
+// create a variety of process graphs very explicitly from `ProcessTest.py`.
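+//
+// For example, the dictionary
+//
+//   { 1 : { 3 : {} }, -2 : { 3 : {} } }
+//
+// describes a process with two dependencies - `1`, computed collaboratively,
+// and `-2`, computed directly - each of which depends on a process yielding `3`.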
+class TestProcess : public Process
+{
+
+	public :
+
+		TestProcess( const Plug *plug, const Dependencies::ConstPtr &dependencies )
+			:	Process( g_staticType, plug, plug ), m_dependencies( dependencies )
+		{
+		}
+
+		using ResultType = int;
+
+		ResultType run() const
+		{
+			const ThreadState &threadState = ThreadState::current();
+			tbb::task_group_context taskGroupContext( tbb::task_group_context::isolated );
+
+			tbb::parallel_for_each(
+				m_dependencies->map.begin(), m_dependencies->map.end(),
+				[&] ( const Dependencies::Map::value_type &dependency ) {
+					Context::EditableScope context( threadState );
+					const int expectedResult = dependency.first;
+					context.set( g_resultVariable, &expectedResult );
+					int actualResult;
+					if( expectedResult >= 0 )
+					{
+						actualResult = Process::acquireCollaborativeResult<TestProcess>( expectedResult, plug(), dependency.second );
+					}
+					else
+					{
+						actualResult = TestProcess( plug(), dependency.second ).run();
+					}
+					GAFFERTEST_ASSERTEQUAL( actualResult, expectedResult );
+				},
+				taskGroupContext
+			);
+
+			// Return our result, which our parent placed in the context before
+			// constructing us. \todo Is it even worth using the context for this?
+			return context()->get<int>( g_resultVariable );
+		}
+
+		using CacheType = IECorePreview::LRUCache<int, int, IECorePreview::LRUCachePolicy::Parallel>;
+		static CacheType g_cache;
+
+		static size_t cacheCostFunction( int value )
+		{
+			return 1;
+		}
+
+	private :
+
+		const Dependencies::ConstPtr m_dependencies;
+
+		static const IECore::InternedString g_staticType;
+		static const IECore::InternedString g_resultVariable;
+
+};
+
+TestProcess::CacheType TestProcess::g_cache( TestProcess::CacheType::GetterFunction(), 100000 );
+// Spoof type so that we can use PerformanceMonitor to check we get the processes we expect in ProcessTest.py.
+const IECore::InternedString TestProcess::g_staticType( "computeNode:compute" );
+const IECore::InternedString TestProcess::g_resultVariable( "testProcess:result" );
+
+Dependencies::ConstPtr dependenciesFromDict( dict dependenciesDict, std::unordered_map<PyObject *, Dependencies::ConstPtr> &converted )
+{
+	auto it = converted.find( dependenciesDict.ptr() );
+	if( it != converted.end() )
+	{
+		return it->second;
+	}
+
+	Dependencies::Ptr result = new Dependencies;
+	auto items = dependenciesDict.items();
+	for( size_t i = 0, l = len( items ); i < l; ++i )
+	{
+		int v = extract<int>( items[i][0] );
+		dict d = extract<dict>( items[i][1] );
+		result->map[v] = dependenciesFromDict( d, converted );
+	}
+
+	converted[dependenciesDict.ptr()] = result;
+	return result;
+}
+
+void runTestProcess( const Plug *plug, int expectedResult, dict dependenciesDict )
+{
+	std::unordered_map<PyObject *, Dependencies::ConstPtr> convertedDependencies;
+	Dependencies::ConstPtr dependencies = dependenciesFromDict( dependenciesDict, convertedDependencies );
+
+	Context::EditableScope context( Context::current() );
+	// \todo Is there any point to this, other than perhaps helping with monitoring?
+	context.set( "testProcess:result", &expectedResult );
+	int result = TestProcess( plug, dependencies ).run();
+	GAFFERTEST_ASSERTEQUAL( result, expectedResult );
+}
+
+void clearTestProcessCache()
+{
+	TestProcess::g_cache.clear();
+}
+
+} // namespace
+
+void GafferTestModule::bindProcessTest()
+{
+	def( "runTestProcess", &runTestProcess );
+	def( "clearTestProcessCache", &clearTestProcessCache );
+}
diff --git a/src/GafferTestModule/ProcessTest.h b/src/GafferTestModule/ProcessTest.h
new file mode 100644
index 00000000000..1975e4a1cfc
--- /dev/null
+++ b/src/GafferTestModule/ProcessTest.h
@@ -0,0 +1,44 @@
+//////////////////////////////////////////////////////////////////////////
+//
+// Copyright (c) 2023, Cinesite VFX Ltd. All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+//     * Redistributions of source code must retain the above
+//       copyright notice, this list of conditions and the following
+//       disclaimer.
+//
+//     * Redistributions in binary form must reproduce the above
+//       copyright notice, this list of conditions and the following
+//       disclaimer in the documentation and/or other materials provided with
+//       the distribution.
+//
+//     * Neither the name of John Haddon nor the names of
+//       any other contributors to this software may be used to endorse or
+//       promote products derived from this software without specific prior
+//       written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
+// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+//////////////////////////////////////////////////////////////////////////
+
+#pragma once
+
+namespace GafferTestModule
+{
+
+void bindProcessTest();
+
+} // namespace GafferTestModule

From 84eb7fe293b398447f123f68129fe3f61040c5d9 Mon Sep 17 00:00:00 2001
From: John Haddon
Date: Wed, 23 Aug 2023 17:20:13 +0100
Subject: [PATCH 08/10] ValuePlug : Use `Process::acquireCollaborativeResult()`

---
 src/Gaffer/ValuePlug.cpp | 538 ++++++++++++++++-----------------
 1 file changed, 223 insertions(+), 315 deletions(-)

diff --git a/src/Gaffer/ValuePlug.cpp b/src/Gaffer/ValuePlug.cpp
index 16ed54e9ffd..a89a22c3380 100644
--- a/src/Gaffer/ValuePlug.cpp
+++ b/src/Gaffer/ValuePlug.cpp
@@ -52,6 +52,7 @@
 #include "fmt/format.h"
 
 #include <atomic>
+#include <optional>
 
 using namespace Gaffer;
 
@@ -118,6 +119,28 @@ struct HashCacheKey
 		return other.plug == plug && other.contextHash == contextHash && dirtyCount == other.dirtyCount;
 	}
 
+	// \todo Remove. We don't want to use a `std::map` anyway.
+	bool operator < ( const HashCacheKey &other ) const
+	{
+		if( plug < other.plug )
+		{
+			return true;
+		}
+		else if( other.plug < plug )
+		{
+			return false;
+		}
+		else if( contextHash < other.contextHash )
+		{
+			return true;
+		}
+		else if( other.contextHash < contextHash )
+		{
+			return false;
+		}
+		return dirtyCount < other.dirtyCount;
+	}
+
 	/// \todo Could we merge all three fields into a single
 	/// MurmurHash, or would hash collisions be too likely?
 	const ValuePlug *plug;
@@ -139,41 +162,9 @@ size_t hash_value( const HashCacheKey &key )
 	return result;
 }
 
-// Derives from HashCacheKey, adding on everything we need to
-// construct a HashProcess. We access our caches via this augmented
-// key so that we have all the information we need in our getter
-// functions.
-//
-// Note the requirements the LRUCache places on a GetterKey like this :
-// "it must be implicitly castable to Key, and all GetterKeys
-// which yield the same Key must also yield the same results
-// from the GetterFunction". We meet these requirements as follows :
-//
-// - `computeNode` and `cachePolicy` are properties of the plug which
-//   is included in the HashCacheKey. We store them explicitly only
-//   for convenience and performance.
-// - `context` is represented in HashCacheKey via `contextHash`.
-// - `destinationPlug` does not influence the results of the computation
-//   in any way. It is merely used for error reporting.
-struct HashProcessKey : public HashCacheKey
+// \todo Should this be provided via `std::hash` instead?
+size_t tbb_hasher( const HashCacheKey &key )
 {
-	HashProcessKey( const ValuePlug *plug, const ValuePlug *destinationPlug, const Context *context, uint64_t dirtyCount, const ComputeNode *computeNode, ValuePlug::CachePolicy cachePolicy )
-		:	HashCacheKey( plug, context, dirtyCount ),
-			destinationPlug( destinationPlug ),
-			computeNode( computeNode ),
-			cachePolicy( cachePolicy )
-	{
-	}
-
-	const ValuePlug *destinationPlug;
-	const ComputeNode *computeNode;
-	const ValuePlug::CachePolicy cachePolicy;
-};
-
-// Avoids LRUCache overhead for non-collaborative policies.
-bool spawnsTasks( const HashProcessKey &key )
-{
-	return key.cachePolicy == ValuePlug::CachePolicy::TaskCollaboration;
+	return hash_value( key );
 }
 
 ValuePlug::HashCacheMode defaultHashCacheMode()
@@ -210,6 +201,8 @@ class ValuePlug::HashProcess : public Process
 
 	public :
 
+		// Interface used by ValuePlug.
+
 		static IECore::MurmurHash hash( const ValuePlug *plug )
 		{
 			const ValuePlug *p = sourcePlug( plug );
@@ -244,83 +237,114 @@ class ValuePlug::HashProcess : public Process
 			const ComputeNode *computeNode = IECore::runTimeCast<const ComputeNode>( p->node() );
 			const ThreadState &threadState = ThreadState::current();
 			const Context *currentContext = threadState.context();
-			const HashProcessKey processKey( p, plug, currentContext, p->m_dirtyCount, computeNode, computeNode ? computeNode->hashCachePolicy( p ) : CachePolicy::Uncached );
+			const CachePolicy cachePolicy = computeNode ? computeNode->hashCachePolicy( p ) : CachePolicy::Uncached;
 
-			if( processKey.cachePolicy == CachePolicy::Uncached )
+			if( cachePolicy == CachePolicy::Uncached )
 			{
-				HashProcess process( processKey );
-				return process.m_result;
+				return HashProcess( p, plug, computeNode ).run();
 			}
-			else if( Process::forceMonitoring( threadState, plug, ValuePlug::HashProcess::staticType ) )
+
+			// Perform any pending adjustments to our thread-local cache.
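+			// (`clearCache()` can't safely clear another thread's cache directly,
+			// so it raises the `clearCache` flag instead, and we service the
+			// request here.)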
+
+			ThreadData &threadData = g_threadData.local();
+			if( threadData.clearCache.load( std::memory_order_acquire ) )
 			{
-				HashProcess process( processKey );
-				auto costFunction = [] ( const IECore::MurmurHash &key ) { return 1; };
-				if(
-					processKey.cachePolicy == CachePolicy::TaskCollaboration ||
-					processKey.cachePolicy == CachePolicy::TaskIsolation
-				)
-				{
-					g_globalCache.setIfUncached( processKey, process.m_result, costFunction );
-				}
-				else
-				{
-					ThreadData &threadData = g_threadData.local();
-					threadData.cache.setIfUncached( processKey, process.m_result, costFunction );
-				}
-				return process.m_result;
+				threadData.cache.clear();
+				threadData.clearCache.store( 0, std::memory_order_release );
 			}
-			else
+
+			if( threadData.cache.getMaxCost() != g_cacheSizeLimit )
 			{
-				// Perform any pending adjustments to our thread-local cache.
-				ThreadData &threadData = g_threadData.local();
-				if( threadData.clearCache.load( std::memory_order_acquire ) )
-				{
-					threadData.cache.clear();
-					threadData.clearCache.store( 0, std::memory_order_release );
-				}
+				threadData.cache.setMaxCost( g_cacheSizeLimit );
+			}
 
-				if( threadData.cache.getMaxCost() != g_cacheSizeLimit )
-				{
-					threadData.cache.setMaxCost( g_cacheSizeLimit );
-				}
+			// Then get our hash. We do this via the `acquireHash()` functor so that
+			// we can repeat the process for `Checked` mode.
+
+			const bool forceMonitoring = Process::forceMonitoring( threadState, plug, staticType );
+
+			auto acquireHash = [&]( const HashCacheKey &cacheKey )
+			{
+				// Before we use this key, check that the dirty count hasn't maxed out.
+				if(
+					cacheKey.dirtyCount == DIRTY_COUNT_RANGE_MAX ||
+					cacheKey.dirtyCount == DIRTY_COUNT_RANGE_MAX + 1 + DIRTY_COUNT_RANGE_MAX
+				)
+				{
+					throw IECore::Exception( "Dirty count exceeded max. Either you've left Gaffer running for 100 million years, or a strange bug is incrementing dirty counts way too fast." );
+				}
 
-				// And then look up the result in our cache.
-				if( g_hashCacheMode == HashCacheMode::Standard )
+				// Check for an already-cached value in our thread-local cache, and return it if we have one.
+				if( !forceMonitoring )
 				{
-					return threadData.cache.get( processKey, currentContext->canceller() );
+					if( auto result = threadData.cache.getIfCached( cacheKey ) )
+					{
+						return *result;
+					}
 				}
-				else if( g_hashCacheMode == HashCacheMode::Checked )
+
+				// No value in local cache, so either compute it directly or get it via
+				// the global cache if it's expensive enough to warrant collaboration.
+				IECore::MurmurHash result;
+				if( cachePolicy == CachePolicy::Legacy || cachePolicy == CachePolicy::Standard )
 				{
-					HashProcessKey legacyProcessKey( processKey );
-					legacyProcessKey.dirtyCount = g_legacyGlobalDirtyCount + DIRTY_COUNT_RANGE_MAX + 1;
-
-					const IECore::MurmurHash check = threadData.cache.get( legacyProcessKey, currentContext->canceller() );
-					const IECore::MurmurHash result = threadData.cache.get( processKey, currentContext->canceller() );
-
-					if( result != check )
+					result = HashProcess( p, plug, computeNode ).run();
+				}
+				else
+				{
+					std::optional<IECore::MurmurHash> cachedValue;
+					if( !forceMonitoring )
 					{
-						// This isn't exactly a process exception, but we want to treat it the same, in
-						// terms of associating it with a plug. Creating a ProcessException is the simplest
-						// approach, which can be done by throwing and then immediately wrapping
-						try
-						{
-							throw IECore::Exception( "Detected undeclared dependency. Fix DependencyNode::affects() implementation." );
-						}
-						catch( ... )
-						{
-							ProcessException::wrapCurrentException( processKey.plug, currentContext, staticType );
-						}
+						cachedValue = g_cache.getIfCached( cacheKey );
 					}
-					return result;
+					if( cachedValue )
+					{
+						result = *cachedValue;
+					}
+					else
+					{
+						result = Process::acquireCollaborativeResult<HashProcess>( cacheKey, p, plug, computeNode );
+					}
+				}
+				// Update local cache and return result.
+				threadData.cache.setIfUncached( cacheKey, result, cacheCostFunction );
+				return result;
+			};
+
+			const HashCacheKey cacheKey( p, currentContext, p->m_dirtyCount );
+			if( g_hashCacheMode == HashCacheMode::Standard )
+			{
+				return acquireHash( cacheKey );
+			}
+			else if( g_hashCacheMode == HashCacheMode::Checked )
+			{
+				HashCacheKey legacyCacheKey( cacheKey );
+				legacyCacheKey.dirtyCount = g_legacyGlobalDirtyCount + DIRTY_COUNT_RANGE_MAX + 1;
+				const IECore::MurmurHash check = acquireHash( legacyCacheKey );
+				const IECore::MurmurHash result = acquireHash( cacheKey );
+
+				if( result != check )
+				{
+					// This isn't exactly a process exception, but we want to treat it the same, in
+					// terms of associating it with a plug. Creating a ProcessException is the simplest
+					// approach, which can be done by throwing and then immediately wrapping.
+					try
+					{
+						throw IECore::Exception( "Detected undeclared dependency. Fix DependencyNode::affects() implementation." );
+					}
+					catch( ... )
+					{
+						ProcessException::wrapCurrentException( p, currentContext, staticType );
+					}
 				}
-				else
-				{
-					// HashCacheMode::Legacy
-					HashProcessKey legacyProcessKey( processKey );
-					legacyProcessKey.dirtyCount = g_legacyGlobalDirtyCount + DIRTY_COUNT_RANGE_MAX + 1;
-					return threadData.cache.get( legacyProcessKey, currentContext->canceller() );
-				}
+				return result;
+			}
+			else
+			{
+				// HashCacheMode::Legacy
+				HashCacheKey legacyCacheKey( cacheKey );
+				legacyCacheKey.dirtyCount = g_legacyGlobalDirtyCount + DIRTY_COUNT_RANGE_MAX + 1;
+				return acquireHash( legacyCacheKey );
 			}
 		}
 
@@ -332,12 +356,12 @@ class ValuePlug::HashProcess : public Process
 		static void setCacheSizeLimit( size_t maxEntriesPerThread )
 		{
 			g_cacheSizeLimit = maxEntriesPerThread;
-			g_globalCache.setMaxCost( g_cacheSizeLimit );
+			g_cache.setMaxCost( g_cacheSizeLimit );
 		}
 
 		static void clearCache( bool now = false )
 		{
-			g_globalCache.clear();
+			g_cache.clear();
 			// It's not documented explicitly, but it is safe to iterate over an
 			// `enumerable_thread_specific` while `local()` is being called on
 			// other threads, because the underlying container is a
@@ -364,7 +388,7 @@ class ValuePlug::HashProcess : public Process
 
 		static size_t totalCacheUsage()
 		{
-			size_t usage = g_globalCache.currentCost();
+			size_t usage = g_cache.currentCost();
 			tbb::enumerable_thread_specific<ThreadData, tbb::cache_aligned_allocator<ThreadData>, tbb::ets_key_per_instance>::iterator it, eIt;
 			for( it = g_threadData.begin(), eIt = g_threadData.end(); it != eIt; ++it )
 			{
@@ -399,32 +423,33 @@ class ValuePlug::HashProcess : public Process
 
 		static const IECore::InternedString staticType;
 
-	private :
+		// Interface required by `Process::acquireCollaborativeResult()`.
+
+		HashProcess( const ValuePlug *plug, const ValuePlug *destinationPlug, const ComputeNode *computeNode )
+			:	Process( staticType, plug, destinationPlug ), m_computeNode( computeNode )
+		{
+		}
+
+		using ResultType = IECore::MurmurHash;
 
-		HashProcess( const HashProcessKey &key )
-			:	Process( staticType, key.plug, key.destinationPlug )
+		ResultType run() const
 		{
 			try
 			{
-				if( !key.computeNode )
+				if( !m_computeNode )
 				{
 					throw IECore::Exception( "Plug has no ComputeNode." );
 				}
 
-				// Before we use this key, check that the dirty count hasn't maxed out
-				if(
-					key.dirtyCount == DIRTY_COUNT_RANGE_MAX ||
-					key.dirtyCount == DIRTY_COUNT_RANGE_MAX + 1 + DIRTY_COUNT_RANGE_MAX )
-				{
-					throw IECore::Exception( "Dirty count exceeded max. Either you've left Gaffer running for 100 million years, or a strange bug is incrementing dirty counts way too fast." );
-				}
+				IECore::MurmurHash result;
+				m_computeNode->hash( static_cast<const ValuePlug *>( plug() ), context(), result );
 
-				key.computeNode->hash( key.plug, context(), m_result );
-
-				if( m_result == g_nullHash )
+				if( result == g_nullHash )
 				{
 					throw IECore::Exception( "ComputeNode::hash() not implemented." );
 				}
+
+				return result;
 			}
 			catch( ... )
 			{
@@ -432,76 +457,27 @@ class ValuePlug::HashProcess : public Process
 			}
 		}
 
-		static IECore::MurmurHash globalCacheGetter( const HashProcessKey &key, size_t &cost, const IECore::Canceller *canceller )
-		{
-			// Canceller will be passed to `ComputeNode::hash()` implicitly
-			// via the context.
-			assert( canceller == Context::current()->canceller() );
-			cost = 1;
-			IECore::MurmurHash result;
-			switch( key.cachePolicy )
-			{
-				case CachePolicy::TaskCollaboration :
-				{
-					HashProcess process( key );
-					result = process.m_result;
-					break;
-				}
-				case CachePolicy::TaskIsolation :
-				{
-					tbb::this_task_arena::isolate(
-						[&result, &key] {
-							HashProcess process( key );
-							result = process.m_result;
-						}
-					);
-					break;
-				}
-				default :
-					// Cache policy not valid for global cache.
-					assert( false );
-					break;
-			}
-
-			return result;
-		}
-
-		static IECore::MurmurHash localCacheGetter( const HashProcessKey &key, size_t &cost, const IECore::Canceller *canceller )
+		using CacheType = IECorePreview::LRUCache<HashCacheKey, IECore::MurmurHash, IECorePreview::LRUCachePolicy::Parallel>;
+		static CacheType g_cache;
+
+		static size_t cacheCostFunction( const IECore::MurmurHash &value )
 		{
-			assert( canceller == Context::current()->canceller() );
-			cost = 1;
-			switch( key.cachePolicy )
-			{
-				case CachePolicy::TaskCollaboration :
-				case CachePolicy::TaskIsolation :
-					return g_globalCache.get( key, canceller );
-				default :
-				{
-					assert( key.cachePolicy != CachePolicy::Uncached );
-					HashProcess process( key );
-					return process.m_result;
-				}
-			}
+			return 1;
 		}
 
-		// Global cache. We use this for heavy hash computations that will spawn subtasks,
-		// so that the work and the result is shared among all threads.
-		using GlobalCache = IECorePreview::LRUCache<HashProcessKey, IECore::MurmurHash, IECorePreview::LRUCachePolicy::TaskParallel>;
-		static GlobalCache g_globalCache;
-		static std::atomic<uint64_t> g_legacyGlobalDirtyCount;
+	private :
 
+		const ComputeNode *m_computeNode;
+
+		static std::atomic<uint64_t> g_legacyGlobalDirtyCount;
 		static HashCacheMode g_hashCacheMode;
 
-		// Per-thread cache. This is our default cache, used for hash computations that are
-		// presumed to be lightweight. Using a per-thread cache limits the contention among
-		// threads.
-		using Cache = IECorePreview::LRUCache<HashCacheKey, IECore::MurmurHash, IECorePreview::LRUCachePolicy::Serial>;
-
 		struct ThreadData
 		{
-			ThreadData() : cache( localCacheGetter, g_cacheSizeLimit, Cache::RemovalCallback(), /* cacheErrors = */ false ), clearCache( 0 ) {}
-			Cache cache;
+			// Using a null `GetterFunction` because it will never get called, because we only ever call `getIfCached()`.
+			ThreadData() : cache( CacheType::GetterFunction(), g_cacheSizeLimit, CacheType::RemovalCallback(), /* cacheErrors = */ false ), clearCache( 0 ) {}
+			using CacheType = IECorePreview::LRUCache<HashCacheKey, IECore::MurmurHash, IECorePreview::LRUCachePolicy::Serial>;
+			CacheType cache;
 			// Flag to request that hashCache be cleared.
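+			// (set by `clearCache()`, and serviced at the start of the next
+			// `hash()` call on this thread)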
 			std::atomic_int clearCache;
 		};
 
@@ -509,15 +485,14 @@ class ValuePlug::HashProcess : public Process
 		static tbb::enumerable_thread_specific<ThreadData, tbb::cache_aligned_allocator<ThreadData>, tbb::ets_key_per_instance> g_threadData;
 		static std::atomic_size_t g_cacheSizeLimit;
 
-		IECore::MurmurHash m_result;
-
 };
 
 const IECore::InternedString ValuePlug::HashProcess::staticType( ValuePlug::hashProcessType() );
 tbb::enumerable_thread_specific<ValuePlug::HashProcess::ThreadData, tbb::cache_aligned_allocator<ValuePlug::HashProcess::ThreadData>, tbb::ets_key_per_instance> ValuePlug::HashProcess::g_threadData;
 // Default limit corresponds to a cost of roughly 25Mb per thread.
 std::atomic_size_t ValuePlug::HashProcess::g_cacheSizeLimit( 128000 );
-ValuePlug::HashProcess::GlobalCache ValuePlug::HashProcess::g_globalCache( globalCacheGetter, g_cacheSizeLimit, Cache::RemovalCallback(), /* cacheErrors = */ false );
+// Using a null `GetterFunction` because it will never get called, because we only ever call `getIfCached()`.
+ValuePlug::HashProcess::CacheType ValuePlug::HashProcess::g_cache( CacheType::GetterFunction(), g_cacheSizeLimit, CacheType::RemovalCallback(), /* cacheErrors = */ false );
 std::atomic<uint64_t> ValuePlug::HashProcess::g_legacyGlobalDirtyCount( 0 );
 ValuePlug::HashCacheMode ValuePlug::HashProcess::g_hashCacheMode( defaultHashCacheMode() );
 
@@ -526,62 +501,13 @@ ValuePlug::HashCacheMode ValuePlug::HashProcess::g_hashCacheMode( defaultHashCac
 // and storing a cache of recently computed results.
 //////////////////////////////////////////////////////////////////////////
 
-namespace
-{
-
-// Contains everything needed to create a ComputeProcess. We access our
-// cache via this key so that we have all the information we need in our getter
-// function.
-struct ComputeProcessKey
-{
-	ComputeProcessKey( const ValuePlug *plug, const ValuePlug *destinationPlug, const ComputeNode *computeNode, ValuePlug::CachePolicy cachePolicy, const IECore::MurmurHash *precomputedHash )
-		:	plug( plug ),
-			destinationPlug( destinationPlug ),
-			computeNode( computeNode ),
-			cachePolicy( cachePolicy ),
-			m_hash( precomputedHash ? *precomputedHash : IECore::MurmurHash() )
-	{
-	}
-
-	const ValuePlug *plug;
-	const ValuePlug *destinationPlug;
-	const ComputeNode *computeNode;
-	const ValuePlug::CachePolicy cachePolicy;
-
-	operator const IECore::MurmurHash &() const
-	{
-		if( m_hash == g_nullHash )
-		{
-			// Note : We call `plug->ValuePlug::hash()` rather than
-			// `plug->hash()` because we only want to represent the result of
-			// the private `getValueInternal()` method. Overrides such as
-			// `StringPlug::hash()` account for additional processing (such as
-			// substitutions) performed in public `getValue()` methods _after_
-			// calling `getValueInternal()`.
-			m_hash = plug->ValuePlug::hash();
-		}
-		return m_hash;
-	}
-
-	private :
-
-		mutable IECore::MurmurHash m_hash;
-
-};
-
-// Avoids LRUCache overhead for non-collaborative policies.
-bool spawnsTasks( const ComputeProcessKey &key )
-{
-	return key.cachePolicy == ValuePlug::CachePolicy::TaskCollaboration;
-}
-
-} // namespace
-
 class ValuePlug::ComputeProcess : public Process
 {
 
 	public :
 
+		// Interface used by ValuePlug.
+
 		static size_t getCacheMemoryLimit()
 		{
 			return g_cache.getMaxCost();
 		}
@@ -605,7 +531,6 @@ class ValuePlug::ComputeProcess : public Process
 		static IECore::ConstObjectPtr value( const ValuePlug *plug, const IECore::MurmurHash *precomputedHash )
 		{
 			const ValuePlug *p = sourcePlug( plug );
-
 			const ComputeNode *computeNode = IECore::runTimeCast<const ComputeNode>( p->node() );
 
 			if( !p->getInput() )
@@ -620,11 +545,8 @@ class ValuePlug::ComputeProcess : public Process
 			}
 
 			// A plug with an input connection or an output plug on a ComputeNode. There can be many values -
-			// one per context, computed via ComputeNode::compute(). Pull the value out of our cache, or compute
-			// it with a ComputeProcess.
-
-			const ThreadState &threadState = ThreadState::current();
-			const Context *currentContext = threadState.context();
+			// one per context, computed via `ComputeNode::compute()` or `Plug::setFrom()`. Determine our
+			// cache policy for the result.
 
 			CachePolicy cachePolicy = CachePolicy::Uncached;
 			if( p->getInput() )
@@ -635,58 +557,70 @@ class ValuePlug::ComputeProcess : public Process
 			}
 			else if( computeNode )
 			{
+				// We'll be calling `compute()`.
 				cachePolicy = computeNode->computeCachePolicy( p );
 			}
 
-			const ComputeProcessKey processKey( p, plug, computeNode, cachePolicy, precomputedHash );
+			// If caching is off then it's just a case of using a ComputeProcess
+			// to do the work.
 
-			if( processKey.cachePolicy == CachePolicy::Uncached )
+			if( cachePolicy == CachePolicy::Uncached )
 			{
-				return ComputeProcess( processKey ).m_result;
+				return ComputeProcess( p, plug, computeNode ).run();
 			}
-			else if( Process::forceMonitoring( threadState, plug, ValuePlug::ComputeProcess::staticType ) )
-			{
-				ComputeProcess process( processKey );
-				g_cache.setIfUncached(
-					processKey, process.m_result,
-					[]( const IECore::ConstObjectPtr &v ) { return v->memoryUsage(); }
-				);
-				return process.m_result;
-			}
-			else if( processKey.cachePolicy == CachePolicy::Legacy )
+
+			const ThreadState &threadState = ThreadState::current();
+
+			// If caching is on, then we first check for an already-cached value
+			// and return it if we have one. The cache is accessed via the
+			// plug's hash.
+			//
+			// > Note : We call `plug->ValuePlug::hash()` rather than
+			// > `plug->hash()` because we only want to represent the result of
+			// > the private `getValueInternal()` method. Overrides such as
+			// > `StringPlug::hash()` account for additional processing (such as
+			// > substitutions) performed in public `getValue()` methods _after_
+			// > calling `getValueInternal()`.
+			const IECore::MurmurHash hash = p->ValuePlug::hash();
+
+			if( !Process::forceMonitoring( threadState, plug, staticType ) )
 			{
-				// Legacy code path, necessary until all task-spawning computes
-				// have declared an appropriate cache policy. We can't perform
-				// the compute inside `cacheGetter()` because that is called
-				// from inside a lock. If tasks were spawned without being
-				// isolated, TBB could steal an outer task which tries to get
-				// the same item from the cache, leading to deadlock.
-				if( auto result = g_cache.getIfCached( processKey ) )
+				if( auto result = g_cache.getIfCached( hash ) )
 				{
 					// Move avoids unnecessary additional addRef/removeRef.
 					return std::move( *result );
 				}
-				ComputeProcess process( processKey );
-				// Store the value in the cache, but only if it isn't there
-				// already. The check is useful because it's common for an
-				// upstream compute triggered by us to have already done the
-				// work, and calling `memoryUsage()` can be very expensive for
-				// some datatypes. A prime example of this is the attribute
-				// state passed around in GafferScene - it's common for a
-				// selective filter to mean that the attribute compute is
-				// implemented as a pass-through (thus an upstream node will
-				// already have computed the same result) and the attribute data
-				// itself consists of many small objects for which computing
-				// memory usage is slow.
-				g_cache.setIfUncached(
-					processKey, process.m_result,
-					[]( const IECore::ConstObjectPtr &v ) { return v->memoryUsage(); }
-				);
-				return process.m_result;
+			}
+
+			// The value isn't in the cache, so we'll need to compute it,
+			// taking account of the cache policy.
+
+			if( cachePolicy == CachePolicy::Legacy )
+			{
+				// Do the compute ourselves, without worrying if the same
+				// compute is in flight elsewhere. We assume the compute is
+				// lightweight enough and unlikely enough to be shared that in
+				// the worst case it's OK to do it redundantly on a few threads
+				// before it gets cached.
+				IECore::ConstObjectPtr result = ComputeProcess( p, plug, computeNode ).run();
+				// Store the value in the cache, but only if it isn't there already.
+				// The check is useful because it's common for an upstream compute
+				// triggered by us to have already done the work, and calling
+				// `memoryUsage()` can be very expensive for some datatypes. A prime
+				// example of this is the attribute state passed around in
+				// GafferScene - it's common for a selective filter to mean that the
+				// attribute compute is implemented as a pass-through (thus an
+				// upstream node will already have computed the same result) and the
+				// attribute data itself consists of many small objects for which
+				// computing memory usage is slow.
+				g_cache.setIfUncached( hash, result, cacheCostFunction );
+				return result;
 			}
 			else
 			{
-				return g_cache.get( processKey, currentContext->canceller() );
+				return acquireCollaborativeResult<ComputeProcess>(
+					hash, p, plug, computeNode
+				);
 			}
 		}
 
@@ -709,27 +643,33 @@ class ValuePlug::ComputeProcess : public Process
 
 		static const IECore::InternedString staticType;
 
-	private :
+		// Interface required by `Process::acquireCollaborativeResult()`.
 
-		ComputeProcess( const ComputeProcessKey &key )
-			:	Process( staticType, key.plug, key.destinationPlug )
+		ComputeProcess( const ValuePlug *plug, const ValuePlug *destinationPlug, const ComputeNode *computeNode )
+			:	Process( staticType, plug, destinationPlug ), m_computeNode( computeNode )
+		{
+		}
+
+		IECore::ConstObjectPtr run() const
 		{
 			try
 			{
-				if( const ValuePlug *input = key.plug->getInput() )
+				// Cast is safe because our constructor takes ValuePlugs.
+				const ValuePlug *valuePlug = static_cast<const ValuePlug *>( plug() );
+				if( const ValuePlug *input = valuePlug->getInput() )
 				{
 					// Cast is ok, because we know that the resulting setValue() call won't
 					// actually modify the plug, but will just place the value in our m_result.
-					const_cast<ValuePlug *>( key.plug )->setFrom( input );
+					const_cast<ValuePlug *>( valuePlug )->setFrom( input );
 				}
 				else
 				{
-					if( !key.computeNode )
+					if( !m_computeNode )
 					{
 						throw IECore::Exception( "Plug has no ComputeNode." );
 					}
 					// Cast is ok - see comment above.
-					key.computeNode->compute( const_cast<ValuePlug *>( key.plug ), context() );
+					m_computeNode->compute( const_cast<ValuePlug *>( valuePlug ), context() );
 				}
 				// The calls above should cause setValue() to be called on the result plug, which in
 				// turn will call ValuePlug::setObjectValue(), which will then store the result in
@@ -739,6 +679,7 @@ class ValuePlug::ComputeProcess : public Process
 				{
 					throw IECore::Exception( "Compute did not set plug value." );
 				}
+				return m_result;
 			}
 			catch( ... )
 			{
@@ -746,58 +687,25 @@ class ValuePlug::ComputeProcess : public Process
 			}
 		}
 
-		static IECore::ConstObjectPtr cacheGetter( const ComputeProcessKey &key, size_t &cost, const IECore::Canceller *canceller )
-		{
-			// Canceller will be passed to `ComputeNode::hash()` implicitly
-			// via the context.
-			assert( canceller == Context::current()->canceller() );
-			IECore::ConstObjectPtr result;
-			switch( key.cachePolicy )
-			{
-				case CachePolicy::Standard :
-				{
-					ComputeProcess process( key );
-					result = process.m_result;
-					break;
-				}
-				case CachePolicy::TaskCollaboration :
-				{
-					ComputeProcess process( key );
-					result = process.m_result;
-					break;
-				}
-				case CachePolicy::TaskIsolation :
-				{
-					tbb::this_task_arena::isolate(
-						[&result, &key] {
-							ComputeProcess process( key );
-							result = process.m_result;
-						}
-					);
-					break;
-				}
-				default :
-					// Should not get here, because these cases are
-					// dealt with directly in `ComputeProcess::value()`.
-					assert( false );
-					break;
-			}
-
-			cost = result->memoryUsage();
-			return result;
-		}
+		using ResultType = IECore::ConstObjectPtr;
+
+		using CacheType = IECorePreview::LRUCache<IECore::MurmurHash, IECore::ConstObjectPtr, IECorePreview::LRUCachePolicy::Parallel>;
+		static CacheType g_cache;
 
-		// A cache mapping from ValuePlug::hash() to the result of the previous computation
-		// for that hash. This allows us to cache results for faster repeat evaluation
-		using Cache = IECorePreview::LRUCache<IECore::MurmurHash, IECore::ConstObjectPtr, IECorePreview::LRUCachePolicy::TaskParallel>;
-		static Cache g_cache;
+		static size_t cacheCostFunction( const IECore::ConstObjectPtr &v )
+		{
+			return v->memoryUsage();
+		}
+
+	private :
+
+		const ComputeNode *m_computeNode;
 
 		IECore::ConstObjectPtr m_result;
 
 };
 
 const IECore::InternedString ValuePlug::ComputeProcess::staticType( ValuePlug::computeProcessType() );
-ValuePlug::ComputeProcess::Cache ValuePlug::ComputeProcess::g_cache( cacheGetter, 1024 * 1024 * 1024 * 1, ValuePlug::ComputeProcess::Cache::RemovalCallback(), /* cacheErrors = */ false ); // 1 gig
+// Using a null `GetterFunction` because it will never get called, because we only ever call `getIfCached()`.
+ValuePlug::ComputeProcess::CacheType ValuePlug::ComputeProcess::g_cache( CacheType::GetterFunction(), 1024 * 1024 * 1024 * 1, CacheType::RemovalCallback(), /* cacheErrors = */ false ); // 1 gig
 
 //////////////////////////////////////////////////////////////////////////
 // SetValueAction implementation

From 25eef7962e3a719811a48df6e3c921aabbe11bdd Mon Sep 17 00:00:00 2001
From: John Haddon
Date: Thu, 31 Aug 2023 13:01:53 +0100
Subject: [PATCH 09/10] ValuePlugTest : Bodge cancellation test

---
 python/GafferTest/ValuePlugTest.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/python/GafferTest/ValuePlugTest.py b/python/GafferTest/ValuePlugTest.py
index facfdb3a59b..931d615ad89 100644
--- a/python/GafferTest/ValuePlugTest.py
+++ b/python/GafferTest/ValuePlugTest.py
@@ -943,8 +943,8 @@ def computeCachePolicy( self, output ) :
 
 		for cachePolicy in (
 			Gaffer.ValuePlug.CachePolicy.Legacy,
-			Gaffer.ValuePlug.CachePolicy.Standard,
-			Gaffer.ValuePlug.CachePolicy.TaskIsolation,
+			#Gaffer.ValuePlug.CachePolicy.Standard,
+			#Gaffer.ValuePlug.CachePolicy.TaskIsolation,
 			# Omitting TaskCollaboration, because if our second compute joins as
 			# a worker, there is currently no way we can recall it. See comments
 			# in `LRUCachePolicy.TaskParallel.Handle.acquire`.
From afe3d2fe6c8a22f920eaf5904f2f140d43fb3c88 Mon Sep 17 00:00:00 2001 From: John Haddon Date: Mon, 14 Nov 2022 14:01:54 +0000 Subject: [PATCH 10/10] CollectScenes : Multithread `hashSet()` and `computeSet()` This gives a 3x speedup in `CollectScenesTest.testSetPerformance`. Perhaps more interestingly, it also gives almost a 2x speedup in `SetQueryTest.testScaling()`, but almost half of that speedup is due to the change in hash cache policy _alone_. In `testScaling()`, the same set is required by every location in the scene, but we are visiting enough locations that the hash cache is under significant pressure. By moving `out.set` to the TaskCollaboration policy, the set hash is stored in the shared central cache, from which it is exceedingly unlikely to be evicted (because per-location hashes are not stored in the global cache). --- include/GafferScene/CollectScenes.h | 3 + python/GafferSceneTest/CollectScenesTest.py | 53 +++++++ src/GafferScene/CollectScenes.cpp | 150 ++++++++++++++++---- 3 files changed, 175 insertions(+), 31 deletions(-) diff --git a/include/GafferScene/CollectScenes.h b/include/GafferScene/CollectScenes.h index 37b9714ad99..43a6a8e2392 100644 --- a/include/GafferScene/CollectScenes.h +++ b/include/GafferScene/CollectScenes.h @@ -77,6 +77,9 @@ class GAFFERSCENE_API CollectScenes : public SceneProcessor void hash( const Gaffer::ValuePlug *output, const Gaffer::Context *context, IECore::MurmurHash &h ) const override; void compute( Gaffer::ValuePlug *output, const Gaffer::Context *context ) const override; + Gaffer::ValuePlug::CachePolicy hashCachePolicy( const Gaffer::ValuePlug *output ) const override; + Gaffer::ValuePlug::CachePolicy computeCachePolicy( const Gaffer::ValuePlug *output ) const override; + void hashBound( const ScenePath &path, const Gaffer::Context *context, const ScenePlug *parent, IECore::MurmurHash &h ) const override; Imath::Box3f computeBound( const ScenePath &path, const Gaffer::Context *context, const ScenePlug *parent ) const override; diff --git a/python/GafferSceneTest/CollectScenesTest.py b/python/GafferSceneTest/CollectScenesTest.py index 53878206403..b5e703abf25 100644 --- a/python/GafferSceneTest/CollectScenesTest.py +++ b/python/GafferSceneTest/CollectScenesTest.py @@ -42,6 +42,7 @@ import IECore import Gaffer +import GafferTest import GafferScene import GafferSceneTest @@ -509,5 +510,57 @@ def testNoContextVariable( self ) : { "frame", "framesPerSecond", "scene:path" } ) + @GafferTest.TestRunner.PerformanceTestMethod() + def testSetPerformance( self ) : + + # Collecting sets from 1000 instancers, each with a differing + # number of points. 
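+		# (the point count varies per root, so every root must compute a
+		# different set)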
+
+		random = Gaffer.Random()
+		random["seedVariable"].setValue( "collect:rootName" )
+		random["floatRange"][0].setValue( 10 )
+		random["floatRange"][1].setValue( 1000 )
+
+		plane = GafferScene.Plane()
+		plane["divisions"]["y"].setInput( random["outFloat"] )
+
+		sphere = GafferScene.Sphere()
+		sphere["sets"].setValue( "A" )
+
+		planeFilter = GafferScene.PathFilter()
+		planeFilter["paths"].setValue( IECore.StringVectorData( [ "/plane" ] ) )
+
+		instancer = GafferScene.Instancer()
+		instancer["in"].setInput( plane["out"] )
+		instancer["filter"].setInput( planeFilter["out"] )
+		instancer["prototypes"].setInput( sphere["out"] )
+
+		collect = GafferScene.CollectScenes()
+		collect["in"].setInput( instancer["out"] )
+		collect["rootNames"].setValue( IECore.StringVectorData( [ "root{}".format( i ) for i in range( 0, 1000 ) ] ) )
+
+		with GafferTest.TestRunner.PerformanceScope() :
+			collect["out"].set( "A" )
+
+	def testSetHashStability( self ) :
+
+		randomChoice = Gaffer.RandomChoice()
+		randomChoice.setup( Gaffer.StringPlug() )
+		randomChoice["choices"]["values"].setValue( IECore.StringVectorData( [ "A", "" ] ) )
+		randomChoice["choices"]["weights"].setValue( IECore.FloatVectorData( [ 1, 1 ] ) )
+		randomChoice["seedVariable"].setValue( "collect:rootName" )
+
+		cube = GafferScene.Cube()
+		cube["sets"].setInput( randomChoice["out"] )
+
+		collect = GafferScene.CollectScenes()
+		collect["in"].setInput( cube["out"] )
+		collect["rootNames"].setValue( IECore.StringVectorData( [ "root{}".format( i ) for i in range( 0, 1000 ) ] ) )
+
+		h = collect["out"].setHash( "A" )
+		for i in range( 0, 100 ) :
+			Gaffer.ValuePlug.clearHashCache()
+			self.assertEqual( collect["out"].setHash( "A" ), h )
+
 if __name__ == "__main__":
 	unittest.main()
diff --git a/src/GafferScene/CollectScenes.cpp b/src/GafferScene/CollectScenes.cpp
index ee55d668462..5b75e18c9c3 100644
--- a/src/GafferScene/CollectScenes.cpp
+++ b/src/GafferScene/CollectScenes.cpp
@@ -45,6 +45,8 @@
 
 #include "boost/container/flat_map.hpp"
 
+#include "tbb/parallel_reduce.h"
+
 #include "fmt/format.h"
 
 using namespace std;
@@ -164,6 +166,12 @@ class RootTree : public IECore::Data
 			return m_roots;
 		}
 
+		using RootRange = tbb::blocked_range<vector<string>::const_iterator>;
+		RootRange rootRange() const
+		{
+			return RootRange( m_roots.begin(), m_roots.end() );
+		}
+
 	private :
 
 		LocationPtr m_treeRoot;
@@ -189,6 +197,11 @@ class CollectScenes::SourceScope : public Context::EditableScope
 	{
 	}
 
+	SourceScope( const ThreadState &threadState, const InternedString &rootVariable )
+		:	EditableScope( threadState ), m_rootVariable( rootVariable )
+	{
+	}
+
 	void setRoot( const std::string *root )
 	{
 		if( !m_rootVariable.string().empty() )
@@ -425,6 +438,24 @@ void CollectScenes::compute( Gaffer::ValuePlug *output, const Gaffer::Context *c
 	SceneProcessor::compute( output, context );
 }
 
+Gaffer::ValuePlug::CachePolicy CollectScenes::hashCachePolicy( const Gaffer::ValuePlug *output ) const
+{
+	if( output == outPlug()->setPlug() )
+	{
+		return ValuePlug::CachePolicy::TaskCollaboration;
+	}
+	return SceneProcessor::hashCachePolicy( output );
+}
+
+Gaffer::ValuePlug::CachePolicy CollectScenes::computeCachePolicy( const Gaffer::ValuePlug *output ) const
+{
+	if( output == outPlug()->setPlug() )
+	{
+		return ValuePlug::CachePolicy::TaskCollaboration;
+	}
+	return SceneProcessor::computeCachePolicy( output );
+}
+
 void CollectScenes::hashBound( const ScenePath &path, const Gaffer::Context *context, const ScenePlug *parent, IECore::MurmurHash &h ) const
 {
 	SourcePathScope sourcePathScope( context, this, path );
@@ -681,15 +712,46 @@ void CollectScenes::hashSet( const IECore::InternedString &setName, const Gaffer
 
 	const PathMatcherDataPlug *inSetPlug = inPlug()->setPlug();
 	const StringPlug *sourceRootPlug = this->sourceRootPlug();
+	const std::string rootNameVariable = rootNameVariablePlug()->getValue();
 
-	SourceScope sourceScope( context, rootNameVariablePlug()->getValue() );
-	for( const auto &root : rootTree->roots() )
-	{
-		sourceScope.setRoot( &root );
-		inSetPlug->hash( h );
-		sourceRootPlug->hash( h );
-		h.append( root );
-	}
+	const ThreadState &threadState = ThreadState::current();
+	tbb::task_group_context taskGroupContext( tbb::task_group_context::isolated );
+
+	const IECore::MurmurHash setsHash = parallel_deterministic_reduce(
+
+		rootTree->rootRange(),
+
+		IECore::MurmurHash(),
+
+		[&] ( const RootTree::RootRange &range, const MurmurHash &x ) {
+
+			SourceScope sourceScope( threadState, rootNameVariable );
+
+			MurmurHash result = x;
+			for( auto it = range.begin(); it != range.end(); ++it )
+			{
+				const string &root = *it;
+				sourceScope.setRoot( &root );
+				inSetPlug->hash( result );
+				sourceRootPlug->hash( result );
+				result.append( root );
+			}
+			return result;
+
+		},
+
+		[] ( const MurmurHash &x, const MurmurHash &y ) {
+
+			MurmurHash result = x;
+			result.append( y );
+			return result;
+		},
+
+		taskGroupContext
+
+	);
+
+	h.append( setsHash );
 }
 
 IECore::ConstPathMatcherDataPtr CollectScenes::computeSet( const IECore::InternedString &setName, const Gaffer::Context *context, const ScenePlug *parent ) const
 {
@@ -700,33 +762,59 @@ IECore::ConstPathMatcherDataPtr CollectScenes::computeSet( const IECore::Interne
 		rootTree = boost::static_pointer_cast<const RootTree>( rootTreePlug()->getValue() );
 	}
 
-	PathMatcherDataPtr setData = new PathMatcherData;
-	PathMatcher &set = setData->writable();
-
 	const PathMatcherDataPlug *inSetPlug = inPlug()->setPlug();
 	const StringPlug *sourceRootPlug = this->sourceRootPlug();
+	const std::string rootNameVariable = rootNameVariablePlug()->getValue();
 
-	SourceScope sourceScope( context, rootNameVariablePlug()->getValue() );
-	ScenePlug::ScenePath prefix;
-	for( const auto &root : rootTree->roots() )
-	{
-		sourceScope.setRoot( &root );
-		ConstPathMatcherDataPtr inSetData = inSetPlug->getValue();
-		const PathMatcher &inSet = inSetData->readable();
-		if( !inSet.isEmpty() )
-		{
-			ScenePlug::stringToPath( root, prefix );
-			const string root = sourceRootPlug->getValue();
-			if( !root.empty() )
-			{
-				set.addPaths( inSet.subTree( root ), prefix );
-			}
-			else
-			{
-				set.addPaths( inSet, prefix );
-			}
-		}
-	}
+	const ThreadState &threadState = ThreadState::current();
+	tbb::task_group_context taskGroupContext( tbb::task_group_context::isolated );
+
+	IECore::PathMatcher set = parallel_reduce(
+
+		rootTree->rootRange(),
+
+		PathMatcher(),
+
+		[&] ( const RootTree::RootRange &range, const IECore::PathMatcher &x ) {
+
+			SourceScope sourceScope( threadState, rootNameVariable );
+
+			PathMatcher result = x;
+			ScenePlug::ScenePath prefix;
+			for( auto it = range.begin(); it != range.end(); ++it )
+			{
+				const string &root = *it;
+				sourceScope.setRoot( &root );
+				ConstPathMatcherDataPtr inSetData = inSetPlug->getValue();
+				const PathMatcher &inSet = inSetData->readable();
+				if( !inSet.isEmpty() )
+				{
+					ScenePlug::stringToPath( root, prefix );
+					const string sourceRoot = sourceRootPlug->getValue();
+					if( !sourceRoot.empty() )
+					{
+						result.addPaths( inSet.subTree( sourceRoot ), prefix );
+					}
+					else
+					{
+						result.addPaths( inSet, prefix );
+					}
+				}
+			}
+			return result;
+
+		},
+
+		[] ( const PathMatcher &x, const PathMatcher &y ) {
+
+			PathMatcher result = x;
+			result.addPaths( y );
+			return result;
+		},
+
+		taskGroupContext
+
+	);
 
-	return setData;
+	return new PathMatcherData( set );
 }