diff --git a/src/Handlers/DefaultHandler.php b/src/Handlers/DefaultHandler.php index ebe0755..28b6104 100644 --- a/src/Handlers/DefaultHandler.php +++ b/src/Handlers/DefaultHandler.php @@ -58,7 +58,7 @@ public static function waitFor(?\Closure $closure = null, float|int $timeout = - return; } // @codeCoverageIgnoreEnd - sleep(max(intval($timeout), 0)); + sleep(0); } } } diff --git a/src/Handlers/RevoltHandler.php b/src/Handlers/RevoltHandler.php index 349109b..413d705 100644 --- a/src/Handlers/RevoltHandler.php +++ b/src/Handlers/RevoltHandler.php @@ -48,7 +48,7 @@ public static function waitFor(?\Closure $closure = null, float|int $timeout = - if ($timeout > 0 && microtime(true) - $time >= $timeout) { throw new TimeoutException("Timeout after $timeout seconds."); } - RevoltHandler::sleep($timeout); + RevoltHandler::sleep(0); } } diff --git a/src/Handlers/RippleHandler.php b/src/Handlers/RippleHandler.php index dc34fda..c060dd5 100644 --- a/src/Handlers/RippleHandler.php +++ b/src/Handlers/RippleHandler.php @@ -47,7 +47,7 @@ public static function waitFor(?\Closure $closure = null, float|int $timeout = - if ($timeout > 0 && microtime(true) - $time >= $timeout) { throw new TimeoutException("Timeout after $timeout seconds."); } - static::_sleep($timeout); + static::_sleep(0); } } diff --git a/src/Handlers/SwooleHandler.php b/src/Handlers/SwooleHandler.php index f302747..5094080 100644 --- a/src/Handlers/SwooleHandler.php +++ b/src/Handlers/SwooleHandler.php @@ -40,7 +40,7 @@ public static function waitFor(?\Closure $closure = null, float|int $timeout = - if ($timeout > 0 && microtime(true) - $time >= $timeout) { throw new TimeoutException("Timeout after $timeout seconds."); } - usleep(max((int) ($timeout * 1000 * 1000), 0)); + usleep(0); } } } diff --git a/src/Handlers/SwowHandler.php b/src/Handlers/SwowHandler.php index 8851bf5..b176c65 100644 --- a/src/Handlers/SwowHandler.php +++ b/src/Handlers/SwowHandler.php @@ -42,7 +42,7 @@ public static function waitFor(?\Closure $closure = null, float|int $timeout = - if ($timeout > 0 && microtime(true) - $time >= $timeout) { throw new TimeoutException("Timeout after $timeout seconds."); } - usleep(max((int) ($timeout * 1000 * 1000), 0)); + usleep(0); } } } diff --git a/src/Utils/WaitGroup/Handlers/RevoltWaitGroup.php b/src/Utils/WaitGroup/Handlers/RevoltWaitGroup.php index 9a43d57..0a49400 100644 --- a/src/Utils/WaitGroup/Handlers/RevoltWaitGroup.php +++ b/src/Utils/WaitGroup/Handlers/RevoltWaitGroup.php @@ -7,13 +7,16 @@ namespace Workbunny\WebmanCoroutine\Utils\WaitGroup\Handlers; -use Workbunny\WebmanCoroutine\Handlers\RevoltHandler; +use Revolt\EventLoop\Suspension; +use Revolt\EventLoop; class RevoltWaitGroup implements WaitGroupInterface { /** @var int */ protected int $_count; + protected ?Suspension $_suspension = null; + /** @inheritdoc */ public function __construct() { @@ -32,6 +35,7 @@ public function __destruct() } } finally { $this->_count = 0; + $this->_suspension = null; } } @@ -47,6 +51,9 @@ public function add(int $delta = 1): bool public function done(): bool { $this->_count--; + if ($this->_count <= 0) { + $this->_suspension?->resume(); + } return true; } @@ -60,15 +67,12 @@ public function count(): int /** @inheritdoc */ public function wait(int|float $timeout = -1): void { - $time = microtime(true); - while (1) { - if ($timeout > 0 and microtime(true) - $time >= $timeout) { - return; - } - if ($this->_count <= 0) { - return; - } - RevoltHandler::sleep(max($timeout, 0)); + $this->_suspension = EventLoop::getSuspension(); + if ($timeout > 0) { + EventLoop::delay($timeout, function () { + $this->_suspension?->resume(); + }); } + $this->_suspension->suspend(); } } diff --git a/src/Utils/WaitGroup/Handlers/RippleWaitGroup.php b/src/Utils/WaitGroup/Handlers/RippleWaitGroup.php index 187ed74..260d707 100644 --- a/src/Utils/WaitGroup/Handlers/RippleWaitGroup.php +++ b/src/Utils/WaitGroup/Handlers/RippleWaitGroup.php @@ -7,11 +7,19 @@ namespace Workbunny\WebmanCoroutine\Utils\WaitGroup\Handlers; +use Closure; +use Revolt\EventLoop\Suspension; + class RippleWaitGroup implements WaitGroupInterface { /** @var int */ protected int $_count; + /** + * @var Suspension|null + */ + protected ?Suspension $_suspension = null; + /** @inheritdoc */ public function __construct() { @@ -30,6 +38,7 @@ public function __destruct() } } finally { $this->_count = 0; + $this->_suspension = null; } } @@ -45,6 +54,9 @@ public function add(int $delta = 1): bool public function done(): bool { $this->_count--; + if ($this->_count <= 0) { + $this->_suspension?->resume(); + } return true; } @@ -58,25 +70,32 @@ public function count(): int /** @inheritdoc */ public function wait(int|float $timeout = -1): void { - $time = microtime(true); - while (1) { - if ($timeout > 0 and microtime(true) - $time >= $timeout) { - return; - } - if ($this->_count <= 0) { - return; - } - $this->_sleep(0); + $this->_suspension = $this->_getSuspension(); + if ($timeout > 0) { + $this->_delay(function () { + $this->_suspension?->resume(); + }, $timeout); } + $this->_suspension->suspend(); + } + + /** + * @codeCoverageIgnore 用于测试mock,忽略覆盖 + * @param Closure $closure + * @param int|float $timeout + * @return string + */ + protected function _delay(Closure $closure, int|float $timeout): string + { + return \Co\delay($closure, max($timeout, 0.1)); } /** - * @codeCoverageIgnore 测试mock,忽略覆盖 - * @param int|float $second - * @return void + * @codeCoverageIgnore 用于测试mock,忽略覆盖 + * @return Suspension */ - protected function _sleep(int|float $second): void + protected function _getSuspension(): Suspension { - \Co\sleep(max($second, 0)); + return \Co\getSuspension(); } } diff --git a/tests/UtilsCase/WaitGroup/RippleWaitGroupTest.php b/tests/UtilsCase/WaitGroup/RippleWaitGroupTest.php index ff0626e..c75aed3 100644 --- a/tests/UtilsCase/WaitGroup/RippleWaitGroupTest.php +++ b/tests/UtilsCase/WaitGroup/RippleWaitGroupTest.php @@ -41,14 +41,24 @@ public function testCount() public function testWait() { + $suspensionMock = Mockery::mock('alias:\Revolt\EventLoop\Suspension'); + $suspensionMock->shouldReceive('resume')->andReturnNull(); + $suspensionMock->shouldReceive('suspend')->andReturnNull(); $partialMock = Mockery::mock(RippleWaitGroup::class, [1])->makePartial(); $partialMock->add(); $partialMock->shouldAllowMockingProtectedMethods()->shouldReceive('_sleep')->andReturnNull(); + $partialMock->shouldAllowMockingProtectedMethods()->shouldReceive('_getSuspension')->andReturn($suspensionMock); + $partialMock->done(); $partialMock->wait(); $this->assertEquals(0, $partialMock->count()); + $partialMock->shouldAllowMockingProtectedMethods()->shouldReceive('_delay')->andReturnUsing(function ($callback, $timeout) { + $this->assertEquals(1, $timeout); + $callback(); + return 'delayEventId'; + }); $partialMock->add(); $partialMock->wait(1); $this->assertEquals(1, $partialMock->count()); diff --git a/tests/UtilsCase/WaitGroup/SwowWaitGroupTest.php b/tests/UtilsCase/WaitGroup/SwowWaitGroupTest.php index f6cd416..b5608ad 100644 --- a/tests/UtilsCase/WaitGroup/SwowWaitGroupTest.php +++ b/tests/UtilsCase/WaitGroup/SwowWaitGroupTest.php @@ -8,9 +8,6 @@ use Workbunny\Tests\TestCase; use Workbunny\WebmanCoroutine\Utils\WaitGroup\Handlers\SwowWaitGroup; -/** - * @runTestsInSeparateProcesses - */ class SwowWaitGroupTest extends TestCase { protected int $_count = 0; @@ -25,13 +22,9 @@ protected function tearDown(): void public function testAdd() { $swowMock = Mockery::mock('alias:Swow\Sync\WaitGroup'); - $swowMock->shouldReceive('add')->with(1)->andReturnUsing(function () { + $swowMock->shouldReceive('add')->with(1)->andReturnUsing(function ($delta) { // 模拟增加计数 - $this->_count++; - }); - $swowMock->shouldReceive('count')->with(1)->andReturnUsing(function () { - // 模拟增加计数 - return $this->_count; + $this->_count += $delta; }); $wg = new SwowWaitGroup(); @@ -47,18 +40,14 @@ public function testAdd() public function testDone() { $swowMock = Mockery::mock('alias:Swow\Sync\WaitGroup'); - $swowMock->shouldReceive('add')->with(1)->andReturnUsing(function () { + $swowMock->shouldReceive('add')->with(1)->andReturnUsing(function ($delta) { // 模拟增加计数 - $this->_count++; + $this->_count += $delta; }); $swowMock->shouldReceive('done')->andReturnUsing(function () { // 模拟减少计数 $this->_count--; }); - $swowMock->shouldReceive('count')->with(1)->andReturnUsing(function () { - // 模拟增加计数 - return $this->_count; - }); $wg = new SwowWaitGroup(); $reflection = new \ReflectionClass($wg); @@ -74,13 +63,9 @@ public function testDone() public function testCount() { $swowMock = Mockery::mock('alias:Swow\Sync\WaitGroup'); - $swowMock->shouldReceive('add')->with(1)->andReturnUsing(function () { + $swowMock->shouldReceive('add')->with(1)->andReturnUsing(function ($delta) { // 模拟增加计数 - $this->_count++; - }); - $swowMock->shouldReceive('count')->with(1)->andReturnUsing(function () { - // 模拟增加计数 - return $this->_count; + $this->_count += $delta; }); $wg = new SwowWaitGroup(); $reflection = new \ReflectionClass($wg); @@ -96,18 +81,14 @@ public function testCount() public function testWait() { $swowMock = Mockery::mock('alias:Swow\Sync\WaitGroup'); - $swowMock->shouldReceive('add')->with(1)->andReturnUsing(function () { + $swowMock->shouldReceive('add')->with(1)->andReturnUsing(function ($delta) { // 模拟增加计数 - $this->_count++; + $this->_count += $delta; }); $swowMock->shouldReceive('done')->andReturnUsing(function () { // 模拟减少计数 $this->_count--; }); - $swowMock->shouldReceive('count')->with(1)->andReturnUsing(function () { - // 模拟增加计数 - return $this->_count; - }); $swowMock->shouldReceive('wait')->with(-1)->andReturnNull(); $wg = new SwowWaitGroup();