Merge pull request #1203 from johnhaddon/numThreads
Control over thread count
andrewkaufman committed Feb 24, 2015
2 parents 817d366 + b74412f commit 5d45d2d
Showing 7 changed files with 229 additions and 4 deletions.
8 changes: 6 additions & 2 deletions apps/execute/execute-1.py
@@ -146,8 +146,12 @@ def _run( self, args ) :

with context :
for node in nodes :
node.executeSequence( frames )

try :
node.executeSequence( frames )
except Exception as exception :
IECore.msg( IECore.Msg.Level.Error, "gaffer execute : executing %s" % node.relativeName( scriptNode ), str( exception ) )
return 1

return 0

IECore.registerRunTimeTyped( execute )
21 changes: 19 additions & 2 deletions python/Gaffer/Application.py
@@ -51,6 +51,16 @@ def __init__( self, description="" ) :
self.parameters().addParameters(

[

IECore.IntParameter(
name = "threads",
description = "The maximum number of threads used for computation. "
"The default value of zero causes the number of threads to "
" be chosen automatically based on the available hardware.",
defaultValue = 0,
minValue = 0,
),

IECore.FileNameParameter(
name = "profileFileName",
description = "If this is specified, then the application "
@@ -59,6 +69,7 @@ def __init__( self, description="" ) :
defaultValue = "",
allowEmptyString = True
),

]

)
@@ -108,7 +119,13 @@ def _executeStartupFiles( self, applicationName ) :

def __run( self, args ) :

self._executeStartupFiles( self.root().getName() )
return self._run( args )
import _Gaffer

with _Gaffer._tbb_task_scheduler_init(
_Gaffer._tbb_task_scheduler_init.automatic if args["threads"].value == 0 else args["threads"].value
) :

self._executeStartupFiles( self.root().getName() )
return self._run( args )

IECore.registerRunTimeTyped( Application, typeName = "Gaffer::Application" )
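With the parameter registered above, any Gaffer application can be started with an explicit thread cap. Below is a minimal sketch of doing so from Python, assuming the usual mapping of application parameters to command-line flags and a hypothetical script path; nothing here beyond the "threads" parameter itself comes from the commit.

import subprocess

# Cap TBB at 4 worker threads for this invocation; the default of 0
# (i.e. omitting the flag) lets TBB choose based on the available hardware.
p = subprocess.Popen(
	"gaffer execute -threads 4 -script /tmp/example.gfr", # hypothetical script path
	shell = True,
	stderr = subprocess.PIPE,
)
p.wait()

# Following the execute-1.py change above, a failure during execution
# is reported on stderr and reflected in a non-zero return code.
print( p.returncode )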
54 changes: 54 additions & 0 deletions python/GafferTest/ApplicationTest.py
@@ -0,0 +1,54 @@
##########################################################################
#
# Copyright (c) 2015, Image Engine Design Inc. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above
# copyright notice, this list of conditions and the following
# disclaimer.
#
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided with
# the distribution.
#
# * Neither the name of John Haddon nor the names of
# any other contributors to this software may be used to endorse or
# promote products derived from this software without specific prior
# written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
# THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
##########################################################################

import unittest

import Gaffer
import GafferTest

class ApplicationTest( GafferTest.TestCase ) :

def testTaskSchedulerInitDoesntSuppressExceptions( self ) :

def f() :

import Gaffer._Gaffer as _Gaffer
with _Gaffer._tbb_task_scheduler_init( _Gaffer._tbb_task_scheduler_init.automatic ) :
raise Exception( "Woops!")

self.assertRaises( Exception, f )

if __name__ == "__main__":
unittest.main()

20 changes: 20 additions & 0 deletions python/GafferTest/ExecuteApplicationTest.py
@@ -207,6 +207,26 @@ def testIgnoreScriptLoadErrors( self ) :
self.assertFalse( "Traceback" in error )
self.assertEqual( p.returncode, 0 )

def testErrorReturnStatusForExceptionDuringExecution( self ) :

s = Gaffer.ScriptNode()
s["fileName"].setValue( self.__scriptFileName )
s["t"] = GafferTest.TextWriter()
s["t"]["fileName"].setValue( "" ) # will cause an error
s.save()

p = subprocess.Popen(
"gaffer execute -script " + self.__scriptFileName,
shell=True,
stderr = subprocess.PIPE,
)
p.wait()

error = "".join( p.stderr.readlines() )
self.failUnless( "ERROR" in error )
self.failUnless( "executing t" in error )
self.failUnless( p.returncode )

def tearDown( self ) :

files = [ self.__scriptFileName ]
1 change: 1 addition & 0 deletions python/GafferTest/__init__.py
@@ -128,6 +128,7 @@ def wrapper( self ) :
from TaskListTest import TaskListTest
from NodeAlgoTest import NodeAlgoTest
from DotTest import DotTest
from ApplicationTest import ApplicationTest

if __name__ == "__main__":
import unittest
48 changes: 48 additions & 0 deletions src/GafferModule/GafferModule.cpp
@@ -35,6 +35,8 @@
//
//////////////////////////////////////////////////////////////////////////

#include "tbb/tbb.h"

#include "Gaffer/TimeWarp.h"
#include "Gaffer/ContextVariables.h"
#include "Gaffer/Backdrop.h"
@@ -85,6 +87,45 @@ using namespace boost::python;
using namespace Gaffer;
using namespace GafferBindings;

namespace
{

// Wraps task_scheduler_init so it can be used as a python
// context manager.
class TaskSchedulerInitWrapper : public tbb::task_scheduler_init
{

public :

TaskSchedulerInitWrapper( int max_threads )
: tbb::task_scheduler_init( deferred ), m_maxThreads( max_threads )
{
if( max_threads != automatic && max_threads <= 0 )
{
PyErr_SetString( PyExc_ValueError, "max_threads must be either automatic or a positive integer" );
throw_error_already_set();
}
}

void enter()
{
initialize( m_maxThreads );
}

bool exit( boost::python::object excType, boost::python::object excValue, boost::python::object excTraceBack )
{
terminate();
return false; // don't suppress exceptions
}

private :

int m_maxThreads;

};

} // namespace

BOOST_PYTHON_MODULE( _Gaffer )
{

@@ -136,6 +177,13 @@ BOOST_PYTHON_MODULE( _Gaffer )
DependencyNodeClass<SwitchDependencyNode>();
DependencyNodeClass<SwitchComputeNode>();

object tsi = class_<TaskSchedulerInitWrapper, boost::noncopyable>( "_tbb_task_scheduler_init", no_init )
.def( init<int>( arg( "max_threads" ) = int( tbb::task_scheduler_init::automatic ) ) )
.def( "__enter__", &TaskSchedulerInitWrapper::enter, return_self<>() )
.def( "__exit__", &TaskSchedulerInitWrapper::exit )
;
tsi.attr( "automatic" ) = int( tbb::task_scheduler_init::automatic );

object behavioursModule( borrowed( PyImport_AddModule( "Gaffer.Behaviours" ) ) );
scope().attr( "Behaviours" ) = behavioursModule;

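The binding above turns tbb::task_scheduler_init into an ordinary Python context manager. A minimal usage sketch follows, separate from Application.__run; the thread counts are illustrative only.

import Gaffer._Gaffer as _Gaffer

# Limit TBB to two worker threads while the block runs.
with _Gaffer._tbb_task_scheduler_init( max_threads = 2 ) :
	pass # parallel computation goes here

# Passing "automatic" defers the decision to TBB, as Application does
# when the "threads" parameter is left at its default of 0. Any other
# non-positive value raises a ValueError from the wrapper's constructor.
with _Gaffer._tbb_task_scheduler_init( _Gaffer._tbb_task_scheduler_init.automatic ) :
	pass # parallel computation goes here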
81 changes: 81 additions & 0 deletions src/GafferScene/SceneProcedural.cpp
@@ -35,6 +35,9 @@
//////////////////////////////////////////////////////////////////////////

#include "tbb/parallel_for.h"
#include "tbb/task_scheduler_init.h"

#include "boost/lexical_cast.hpp"

#include "OpenEXR/ImathBoxAlgo.h"
#include "OpenEXR/ImathFun.h"
@@ -57,6 +60,75 @@ using namespace IECore;
using namespace Gaffer;
using namespace GafferScene;

// TBB recommends that you defer decisions about how many threads to create
// to it, so you can write nice high level code and it can decide how best
// to schedule the work. Generally if left to do this, it schedules it by
// making as many threads as there are cores, to make best use of the hardware.
// This is all well and good, until you're running multiple renders side-by-side,
// telling the renderer to use a limited number of threads so they all play nicely
// together. Let's use the example of a 32 core machine with 4 8-thread 3delight
// renders running side by side.
//
// - 3delight will make 8 threads. TBB didn't make them itself, so it considers
// them to be "master" threads.
// - 3delight will then call our procedurals on some subset of those 8 threads.
// We'll execute graphs, which may or may not use TBB internally, but even if they
// don't, we're using parallel_for for child procedural construction.
// - TBB will be invoked from these master threads, see that it hasn't been
// initialised yet, and merrily initialise itself to use 32 threads.
// - We now have 4 side by side renders each trying to take over the machine,
// and a not-so-happy IT department.
//
// The "solution" to this is to explicitly initialise TBB every time a procedural
// is invoked, limiting it to a certain number of threads. Problem solved? Maybe.
// There's another wrinkle, in that TBB is initialised separately for each master
// thread, and if each master asks for a maximum of N threads, and there are M masters,
// TBB might actually make up to `M * N` threads, clamped at the number of cores.
// So with N set to 8, you could still get a single process trying to use the
// whole machine. In practice, it appears that 3delight perhaps doesn't make great
// use of procedural concurrency, so the worst case of M procedurals in flight,
// each trying to use N threads may not occur. What other renderers do in this
// situation is unknown.
//
// I strongly suspect that the long term solution to this is to abandon using
// a procedural hierarchy matching the scene hierarchy, and to do our own
// threaded traversal of the scene, outputting the results to the renderer via
// a single master thread. We could then be sure of our resource usage, and
// also get better performance with renderers unable to make best use of
// procedural concurrency.
//
// In the meantime, we introduce a hack. The GAFFERSCENE_SCENEPROCEDURAL_THREADS
// environment variable may be used to clamp the number of threads used by any
// given master thread. We sincerely hope to have a better solution before too
// long.
//
// Worthwhile reading :
//
// https://software.intel.com/en-us/blogs/2011/04/09/tbb-initialization-termination-and-resource-management-details-juicy-and-gory/
//
void initializeTaskScheduler( tbb::task_scheduler_init &tsi )
{
assert( !tsi.is_active() );

static int g_maxThreads = -1;
if( g_maxThreads == -1 )
{
if( const char *c = getenv( "GAFFERSCENE_SCENEPROCEDURAL_THREADS" ) )
{
g_maxThreads = boost::lexical_cast<int>( c );
}
else
{
g_maxThreads = 0;
}
}

if( g_maxThreads > 0 )
{
tsi.initialize( g_maxThreads );
}
}

tbb::atomic<int> SceneProcedural::g_pendingSceneProcedurals;
tbb::mutex SceneProcedural::g_allRenderedMutex;

Expand All @@ -65,6 +137,9 @@ SceneProcedural::AllRenderedSignal SceneProcedural::g_allRenderedSignal;
SceneProcedural::SceneProcedural( ConstScenePlugPtr scenePlug, const Gaffer::Context *context, const ScenePlug::ScenePath &scenePath )
: m_scenePlug( scenePlug ), m_context( new Context( *context ) ), m_scenePath( scenePath ), m_rendered( false )
{
tbb::task_scheduler_init tsi( tbb::task_scheduler_init::deferred );
initializeTaskScheduler( tsi );

// get a reference to the script node to prevent it being destroyed while we're doing a render:
m_scriptNode = m_scenePlug->ancestor<ScriptNode>();

@@ -109,6 +184,9 @@ SceneProcedural::SceneProcedural( const SceneProcedural &other, const ScenePlug:
: m_scenePlug( other.m_scenePlug ), m_context( new Context( *(other.m_context), Context::Shared ) ), m_scenePath( scenePath ),
m_options( other.m_options ), m_attributes( other.m_attributes ), m_rendered( false )
{
tbb::task_scheduler_init tsi( tbb::task_scheduler_init::deferred );
initializeTaskScheduler( tsi );

// get a reference to the script node to prevent it being destroyed while we're doing a render:
m_scriptNode = m_scenePlug->ancestor<ScriptNode>();

@@ -239,6 +317,9 @@ class SceneProcedural::SceneProceduralCreate

void SceneProcedural::render( Renderer *renderer ) const
{
tbb::task_scheduler_init tsi( tbb::task_scheduler_init::deferred );
initializeTaskScheduler( tsi );

Context::Scope scopedContext( m_context.get() );

/// \todo See above.
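As the comment block above explains, the GAFFERSCENE_SCENEPROCEDURAL_THREADS environment variable clamps the TBB thread count used by each master thread that constructs or renders a SceneProcedural. A minimal sketch of setting it before launching a render from Python; the render command is hypothetical, and only the variable name itself comes from the commit.

import os
import subprocess

# The variable is read once per process, the first time
# initializeTaskScheduler() runs; a positive value clamps TBB's
# worker count, otherwise the scheduler is left to initialise itself.
env = dict( os.environ )
env["GAFFERSCENE_SCENEPROCEDURAL_THREADS"] = "8"

subprocess.check_call(
	"renderdl /tmp/scene.rib", # hypothetical 3delight render invocation
	shell = True,
	env = env,
)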
