From cc440e4b6d48e7e97a504ebcd763f69493b66085 Mon Sep 17 00:00:00 2001 From: Niklas Keller Date: Sat, 19 Aug 2023 15:39:44 +0200 Subject: [PATCH] Add re-entry support to synchronized (#26) --- src/functions.php | 12 +++++++ test/SynchronizedTest.php | 73 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 85 insertions(+) diff --git a/src/functions.php b/src/functions.php index 4513854..3090be9 100644 --- a/src/functions.php +++ b/src/functions.php @@ -4,6 +4,7 @@ use Amp\Pipeline\Queue; use Amp\Sync\Internal\ConcurrentIteratorChannel; +use Revolt\EventLoop\FiberLocal; /** * Invokes the given Closure while maintaining a lock from the provided mutex. @@ -18,11 +19,22 @@ */ function synchronized(Semaphore $semaphore, \Closure $synchronized, mixed ...$args): mixed { + static $reentry; + $reentry ??= new FiberLocal(fn () => new \WeakMap()); + + /** @var \WeakMap $existingLocks */ + $existingLocks = $reentry->get(); + if ($existingLocks[$semaphore] ?? false) { + return $synchronized(...$args); + } + $lock = $semaphore->acquire(); + $existingLocks[$semaphore] = true; try { return $synchronized(...$args); } finally { + unset($existingLocks[$semaphore]); $lock->release(); } } diff --git a/test/SynchronizedTest.php b/test/SynchronizedTest.php index 5b52d81..9f9db41 100644 --- a/test/SynchronizedTest.php +++ b/test/SynchronizedTest.php @@ -27,4 +27,77 @@ public function testSynchronized(): void self::assertEquals([0, 1, 2], await($futures)); } + + public function testSynchronizedReentry(): void + { + $mutex = new LocalMutex; + $count = 0; + + synchronized($mutex, function () use ($mutex, &$count) { + $count++; + + synchronized($mutex, function () use (&$count) { + $count++; + }); + }); + + self::assertSame(2, $count); + } + + public function testSynchronizedReentryAsync(): void + { + $mutex = new LocalMutex; + $count = 0; + + synchronized($mutex, function () use ($mutex, &$count) { + async(function () use ($mutex, &$count) { + synchronized($mutex, function () use (&$count) { + $count++; + }); + }); + + delay(1); + + $count = 10; + }); + + delay(2); + + // The async synchronized block must be executed after $count = 10 is executed + self::assertSame(11, $count); + } + + public function testSynchronizedReentryDifferentLocks(): void + { + $mutexA = new LocalMutex; + $mutexB = new LocalMutex; + + $lock = $mutexB->acquire(); + + $op = async(function () use ($mutexA, $mutexB) { + print 'Before '; + + synchronized($mutexA, function () use ($mutexB) { + print 'before '; + + synchronized($mutexB, function () { + print 'X '; + }); + + print 'after '; + }); + + print 'After '; + }); + + delay(1); + + print 'Unlock '; + + $lock->release(); + + $op->await(); + + self::expectOutputString('Before before Unlock X after After '); + } }