Skip to content

Commit

Permalink
Merge pull request #5493 from johnhaddon/improvedCollaborationPR
Browse files Browse the repository at this point in the history
TaskCollaboration improvements for review
  • Loading branch information
johnhaddon authored Oct 27, 2023
2 parents cabe4ab + a62f892 commit 1b156f8
Show file tree
Hide file tree
Showing 30 changed files with 1,860 additions and 696 deletions.
7 changes: 7 additions & 0 deletions Changes.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,19 @@ Improvements
Fixes
-----

- ValuePlug : Fixed hangs and poor performance caused by plugs depending on upstream plugs with an identical hash (#4978).
- Filter : Fixed bug which allowed the `scene:path` context variable to "leak" upstream via the `Filter.enabled` plug. This caused unnecessary evaluations of the input, and also provided a loophole via which the filter result could be made inconsistent with respect to descendant and ancestor matches.
- Windows :
- Fixed a bug preventing anything except strings from being copied and pasted.
- Fixed likely cause of crash when resizing Spreadsheet column width (#5296).
- Reference : Fixed rare reloading error.

API
---

- Process : Added `acquireCollaborativeResult()` method, providing an improved mechanism for multiple threads to collaborate on TBB tasks spawned by a single process they all depend on.
- ValuePlug : Added `Default` CachePolicy and deprecated `Standard`, `TaskIsolation` and `Legacy` policies.

1.3.5.0 (relative to 1.3.4.0)
=======

Expand Down
67 changes: 21 additions & 46 deletions include/Gaffer/Private/IECorePreview/LRUCache.inl
Original file line number Diff line number Diff line change
Expand Up @@ -167,17 +167,12 @@ class Serial
return m_it->cacheEntry;
}

// Returns true if it is OK to call `writable()`.
// This is typically determined by the AcquireMode
// passed to `acquire()`, with special cases for
// recursion.
// Returns true if it is OK to call `writable()`. This is determined
// by the AcquireMode passed to `acquire()`.
bool isWritable() const
{
// Because this policy is serial, it would technically
// always be OK to write. But we return false for recursive
// calls to avoid unnecessary overhead updating the LRU list
// for inner calls.
return m_it->handleCount == 1;
// Because this policy is serial, it is always OK to write
return true;
}

// Executes the functor F. This is used to
Expand Down Expand Up @@ -733,20 +728,19 @@ class TaskParallel

CacheEntry &writable()
{
assert( m_itemLock.lockType() == TaskMutex::ScopedLock::LockType::Write );
assert( m_itemLock.isWriter() );
return m_item->cacheEntry;
}

// May return false for AcquireMode::Insert if a GetterFunction recurses.
bool isWritable() const
{
return m_itemLock.lockType() == Item::Mutex::ScopedLock::LockType::Write;
return m_itemLock.isWriter();
}

template<typename F>
void execute( F &&f )
{
if( m_spawnsTasks && m_itemLock.lockType() == TaskMutex::ScopedLock::LockType::Write )
if( m_spawnsTasks )
{
// The getter function will spawn tasks. Execute
// it via the TaskMutex, so that other threads trying
Expand Down Expand Up @@ -814,16 +808,9 @@ class TaskParallel
// found a pre-existing item we optimistically take
// just a read lock, because it is faster when
// many threads just need to read from the same
// cached item. We accept WorkerRead locks when necessary,
// to support Getter recursion.
TaskMutex::ScopedLock::LockType lockType = TaskMutex::ScopedLock::LockType::WorkerRead;
if( inserted || mode == FindWritable || mode == InsertWritable )
{
lockType = TaskMutex::ScopedLock::LockType::Write;
}

// cached item.
const bool acquired = m_itemLock.acquireOr(
it->mutex, lockType,
it->mutex, /* write = */ inserted || mode == FindWritable || mode == InsertWritable,
// Work accepter
[&binLock, canceller] ( bool workAvailable ) {
// Release the bin lock prior to accepting work, because
Expand All @@ -836,26 +823,15 @@ class TaskParallel
// The only canceller being checked at that
// point will be the one passed to the
// `LRUCache::get()` call that we work in
// service of. This isn't ideal, as it can cause
// UI stalls if one UI element is waiting to
// cancel an operation, but it's tasks have been
// "captured" by collaboration on a compute
// started by another UI element (which hasn't
// requested cancellation). One alternative is
// that we would only accept work if our
// canceller matches the one in use by the
// original caller. This would rule out
// collaboration between UI elements, but would
// still allow diamond dependencies in graph
// evaluation to use collaboration.
// service of.
return (!canceller || !canceller->cancelled());
}
);

if( acquired )
{
if(
m_itemLock.lockType() == TaskMutex::ScopedLock::LockType::Read &&
!m_itemLock.isWriter() &&
mode == Insert && it->cacheEntry.status() == LRUCache::Uncached
)
{
Expand Down Expand Up @@ -1108,6 +1084,7 @@ Value LRUCache<Key, Value, Policy, GetterKey>::get( const GetterKey &key, const

if( status==Uncached )
{
assert( handle.isWritable() );
Value value = Value();
Cost cost = 0;
try
Expand All @@ -1120,24 +1097,21 @@ Value LRUCache<Key, Value, Policy, GetterKey>::get( const GetterKey &key, const
}
catch( ... )
{
if( handle.isWritable() && m_cacheErrors )
if( m_cacheErrors )
{
handle.writable().state = std::current_exception();
}
throw;
}

if( handle.isWritable() )
{
assert( cacheEntry.status() != Cached ); // this would indicate that another thread somehow
assert( cacheEntry.status() != Failed ); // loaded the same thing as us, which is not the intention.
assert( cacheEntry.status() != Cached ); // this would indicate that another thread somehow
assert( cacheEntry.status() != Failed ); // loaded the same thing as us, which is not the intention.

setInternal( key, handle.writable(), value, cost );
m_policy.push( handle );
setInternal( key, handle.writable(), value, cost );
m_policy.push( handle );

handle.release();
limitCost( m_maxCost );
}
handle.release();
limitCost( m_maxCost );

return value;
}
Expand Down Expand Up @@ -1202,8 +1176,9 @@ bool LRUCache<Key, Value, Policy, GetterKey>::setIfUncached( const Key &key, con
const Status status = cacheEntry.status();

bool result = false;
if( status == Uncached && handle.isWritable() )
if( status == Uncached )
{
assert( handle.isWritable() );
result = setInternal( key, handle.writable(), value, costFunction( value ) );
m_policy.push( handle );

Expand Down
Loading

0 comments on commit 1b156f8

Please sign in to comment.