Process : Fix exception handling in acquireCollaborativeResult()
When an exception was thrown from `ProcessType::run()`, we were catching it and rethrowing it from the code path that initiated the collaboration. But since we were rethrowing after the `run_and_wait()`, collaborating threads could leave their `wait()` before the exception was rethrown. In this case, they would throw their own vague exception about there being no result available, and this exception could be the first thrown, and therefore the first one to surface back to the caller. We now store the original exception in `TypedCollaboration::result` (now a variant) so that both the initiator and the collaborator throw the same exception.

_Technically_ this is an ABI break, because it changes a member in TypedCollaboration. But for it to cause a problem, the following conditions would need to be met :
- Someone would need to be using `acquireCollaborativeResult()` already. I'm not aware of anyone ever creating their own Process subclass, so the chances of someone doing that _and_ using a new feature that was only released yesterday are vanishingly small.
- They would also need to compile some code with the old definition, and some with the new, which again is vanishingly unlikely, as Process subclasses are usually completely hidden.

I think it's pretty clear that the lesser evil in this case is to fix the bug.
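
The idea of storing either a result or the original exception in one shared slot can be sketched in isolation. This is a minimal sketch, not the Gaffer implementation: `SharedOutcome`, `runAndStore`, `describe`, and the local `Cancelled` type are hypothetical names for illustration (`Cancelled` stands in for `IECore::Cancelled`), and no threading or TBB is involved.

```cpp
#include <exception>
#include <stdexcept>
#include <string>
#include <type_traits>
#include <utility>
#include <variant>

// Hypothetical stand-in for `IECore::Cancelled`.
struct Cancelled : std::exception
{
	const char *what() const noexcept override { return "Cancelled"; }
};

// Hypothetical holder : empty (monostate), a captured exception, or a result.
template<typename ResultType>
struct SharedOutcome
{
	std::variant<std::monostate, std::exception_ptr, ResultType> result;

	// Every reader goes through this, so every reader sees the same outcome.
	ResultType resultOrException() const
	{
		return std::visit(
			[] ( auto &&v ) -> ResultType
			{
				using T = std::decay_t<decltype( v )>;
				if constexpr( std::is_same_v<T, ResultType> )
				{
					return v;
				}
				else if constexpr( std::is_same_v<T, std::exception_ptr> )
				{
					std::rethrow_exception( v );
				}
				else
				{
					// Nothing was ever stored.
					throw Cancelled();
				}
			},
			result
		);
	}
};

// Runs `f`, storing either its return value or the exception it threw.
template<typename ResultType, typename F>
SharedOutcome<ResultType> runAndStore( F &&f )
{
	SharedOutcome<ResultType> outcome;
	try
	{
		outcome.result.template emplace<ResultType>( std::forward<F>( f )() );
	}
	catch( ... )
	{
		outcome.result = std::current_exception();
	}
	return outcome;
}

// Summarises what a reader of `outcome` would observe.
template<typename ResultType>
std::string describe( const SharedOutcome<ResultType> &outcome )
{
	try
	{
		outcome.resultOrException();
		return "ok";
	}
	catch( const Cancelled & )
	{
		return "cancelled";
	}
	catch( const std::exception &e )
	{
		return e.what();
	}
}
```

Because the exception lives in the shared object rather than in a local variable of the initiating thread, any number of readers can call `resultOrException()` and each one rethrows the same original exception, regardless of which of them gets there first.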
johnhaddon committed Nov 4, 2023
1 parent f7a4f97 commit 6a02316
Showing 3 changed files with 60 additions and 26 deletions.
3 changes: 3 additions & 0 deletions Changes.md
@@ -1,7 +1,10 @@
1.3.x.x (relative to 1.3.6.0)
=======

Fixes
-----

- Process : Fixed bug which caused a `No result found` exception to be thrown when a more descriptive exception should have been thrown instead.

1.3.6.0 (relative to 1.3.5.0)
=======
59 changes: 33 additions & 26 deletions include/Gaffer/Process.inl
@@ -40,6 +40,7 @@
#include "tbb/task_group.h"

#include <unordered_set>
#include <variant>

namespace Gaffer
{
@@ -267,7 +268,30 @@ class Process::TypedCollaboration : public Process::Collaboration
{
public :

std::optional<typename ProcessType::ResultType> result;
std::variant<std::monostate, std::exception_ptr, typename ProcessType::ResultType> result;

typename ProcessType::ResultType resultOrException() const
{
return std::visit(
[] ( auto &&v ) -> typename ProcessType::ResultType
{
using T = std::decay_t<decltype( v )>;
if constexpr( std::is_same_v<T, typename ProcessType::ResultType> )
{
return v;
}
else if constexpr( std::is_same_v<T, std::exception_ptr> )
{
std::rethrow_exception( v );
}
else
{
throw IECore::Cancelled();
}
},
result
);
}

IE_CORE_DECLAREMEMBERPTR( TypedCollaboration );

@@ -349,14 +373,7 @@ typename ProcessType::ResultType Process::acquireCollaborativeResult(
[&]{ return collaboration->taskGroup.wait(); }
);

if( collaboration->result )
{
return *collaboration->result;
}
else
{
throw IECore::Exception( "Process::acquireCollaborativeResult : No result found" );
}
return collaboration->resultOrException();
}

// No suitable in-flight collaborations, so we'll create one of our own.
@@ -376,9 +393,7 @@ typename ProcessType::ResultType Process::acquireCollaborativeResult(
collaboration->dependents.insert( currentCollaboration );
}

std::exception_ptr exception;

auto status = collaboration->arena.execute(
collaboration->arena.execute(
[&] {
return collaboration->taskGroup.run_and_wait(
[&] {
@@ -396,16 +411,17 @@ typename ProcessType::ResultType Process::acquireCollaborativeResult(
// `g_pendingCollaborations`, so that other threads will
// be able to get the result one way or the other.
ProcessType::g_cache.setIfUncached(
cacheKey, *collaboration->result,
cacheKey, std::get<typename ProcessType::ResultType>( collaboration->result ),
ProcessType::cacheCostFunction
);
}
catch( ... )
{
// Don't allow `task_group::wait()` to see exceptions,
// because then we'd hit a thread-safety bug in
// We want to manage the exception ourselves anyway,
// but its also imperative that we don't allow `task_group::wait()`
// to see it, because then we'd hit a thread-safety bug in
// `tbb::task_group_context::reset()`.
exception = std::current_exception();
collaboration->result = std::current_exception();
}

// Now we're done, remove `collaboration` from the pending collaborations.
@@ -424,16 +440,7 @@ typename ProcessType::ResultType Process::acquireCollaborativeResult(
}
);

if( exception )
{
std::rethrow_exception( exception );
}
else if( status == tbb::task_group_status::canceled )
{
throw IECore::Cancelled();
}

return *collaboration->result;
return collaboration->resultOrException();
}

inline bool Process::forceMonitoring( const ThreadState &s, const Plug *plug, const IECore::InternedString &processType )
24 changes: 24 additions & 0 deletions python/GafferSceneTest/DuplicateTest.py
@@ -314,5 +314,29 @@ def testExistingTransform( self ) :
imath.M44f().translate( imath.V3f( 5, 0, 0 ) )
)

def testUpstreamError( self ):

sphereFilter = GafferScene.PathFilter()
sphereFilter["paths"].setValue( IECore.StringVectorData( [ '/sphere' ] ) )

sphere = GafferScene.Sphere()

attributes = GafferScene.CustomAttributes()
attributes["in"].setInput( sphere["out"] )
attributes["filter"].setInput( sphereFilter["out"] )
attributes["attributes"]["attribute1"] = ( Gaffer.NameValuePlug( "testAttribute", 0 ) )
attributes["expression"] = Gaffer.Expression()
attributes["expression"].setExpression( 'parent["attributes"]["attribute1"]["value"] = 1 / 0', "python" )

duplicate = GafferScene.Duplicate()
duplicate["in"].setInput( attributes["out"] )
duplicate["filter"].setInput( sphereFilter["out"] )
duplicate["copies"].setValue( 100 )

for i in range( 20 ) :
with self.subTest( i = i ) :
with self.assertRaisesRegex( RuntimeError, "division by zero" ):
GafferSceneTest.traverseScene( duplicate["out"] )

if __name__ == "__main__":
unittest.main()
