Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
chaz6chez committed May 24, 2022
1 parent 0a64151 commit 7f22fd7
Show file tree
Hide file tree
Showing 7 changed files with 132 additions and 77 deletions.
18 changes: 9 additions & 9 deletions src/Drivers/EvLoop.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
use EventLoop\Exception\LoopException;
use EventLoop\Storage;
use EvLoop as BaseEvLoop;
use Closure;

class EvLoop implements LoopInterface
{
Expand Down Expand Up @@ -38,7 +39,7 @@ public function __construct()
}

/** @inheritDoc */
public function addReadStream($stream, callable $handler): void
public function addReadStream($stream, Closure $handler): void
{
if(is_resource($stream)){
$event = new \EvIo($stream,\Ev::READ, $handler);
Expand All @@ -58,7 +59,7 @@ public function delReadStream($stream): void
}

/** @inheritDoc */
public function addWriteStream($stream, callable $handler): void
public function addWriteStream($stream, Closure $handler): void
{
if(is_resource($stream)){
$event = new \EvIo($stream, \Ev::WRITE, $handler);
Expand All @@ -78,14 +79,14 @@ public function delWriteStream($stream): void
}

/** @inheritDoc */
public function addSignal(int $signal, callable $handler): void
public function addSignal(int $signal, Closure $handler): void
{
$event = new \EvSignal($signal, $handler);
$this->_signals[$signal] = $event;
}

/** @inheritDoc */
public function delSignal(int $signal, callable $handler): void
public function delSignal(int $signal, Closure $handler): void
{
if(isset($this->_signals[$signal])){
/** @var \EvSignal $event */
Expand All @@ -96,15 +97,14 @@ public function delSignal(int $signal, callable $handler): void
}

/** @inheritDoc */
public function addTimer(float $delay, float $repeat, callable $callback): int
public function addTimer(float $delay, float $repeat, Closure $callback): string
{
return $this->_storage->add(
$event = new \EvTimer($delay, $repeat, $callback)
);
$event = new \EvTimer($delay, $repeat, $callback);
return $this->_storage->add(spl_object_hash($event), $event);
}

/** @inheritDoc */
public function delTimer(int $timerId): void
public function delTimer(string $timerId): void
{
/** @var \EvTimer $event */
if($event = $this->_storage->get($timerId)){
Expand Down
20 changes: 10 additions & 10 deletions src/Drivers/EventLoop.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use EventBase;
use Event;
use EventLoop\Storage;
use Closure;

class EventLoop implements LoopInterface
{
Expand Down Expand Up @@ -36,7 +37,7 @@ public function __construct()
}

/** @inheritDoc */
public function addReadStream($stream, callable $handler): void
public function addReadStream($stream, Closure $handler): void
{
if(is_resource($stream)){
$event = new Event($this->_eventBase, $stream, \Event::READ | \Event::PERSIST, $handler);
Expand All @@ -58,7 +59,7 @@ public function delReadStream($stream): void
}

/** @inheritDoc */
public function addWriteStream($stream, callable $handler): void
public function addWriteStream($stream, Closure $handler): void
{
if(is_resource($stream)){
$event = new Event($this->_eventBase, $stream, Event::WRITE | Event::PERSIST, $handler);
Expand All @@ -80,7 +81,7 @@ public function delWriteStream($stream): void
}

/** @inheritDoc */
public function addSignal(int $signal, callable $handler): void
public function addSignal(int $signal, Closure $handler): void
{
$event = Event::signal($this->_eventBase, $signal, $handler);
if ($event or $event->add()) {
Expand All @@ -90,7 +91,7 @@ public function addSignal(int $signal, callable $handler): void
}

/** @inheritDoc */
public function delSignal(int $signal, callable $handler): void
public function delSignal(int $signal, Closure $handler): void
{
if(isset($this->_signals[$signal])){
/** @var Event $event */
Expand All @@ -101,11 +102,11 @@ public function delSignal(int $signal, callable $handler): void
}

/** @inheritDoc */
public function addTimer(float $delay, float $repeat, callable $callback): int
public function addTimer(float $delay, float $repeat, Closure $callback): string
{
$id = $this->_storage->id();
$event = new Event($this->_eventBase, -1, \Event::TIMEOUT, function () use(&$event, $repeat, $callback){
$id = spl_object_hash($event);

$event = new Event($this->_eventBase, -1, \Event::TIMEOUT, function () use($repeat, $id, $callback){
$callback();

if($repeat === 0.0){
Expand All @@ -115,15 +116,14 @@ public function addTimer(float $delay, float $repeat, callable $callback): int
$event->add($repeat);
$this->_storage->set($id, $event);
}

});
$event->add($delay);

return $this->_storage->add($event);
return $this->_storage->add(spl_object_hash($event), $event);
}

/** @inheritDoc */
public function delTimer(int $timerId): void
public function delTimer(string $timerId): void
{
/** @var Event $events */
if($event = $this->_storage->get($timerId)){
Expand Down
27 changes: 14 additions & 13 deletions src/Drivers/LoopInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,33 @@

namespace EventLoop\Drivers;

use Closure;
use EventLoop\Exception\LoopException;

interface LoopInterface
{
/**
* 创建信号处理
* @param int $signal
* @param callable $handler
* @param Closure $handler
* @throws LoopException
*/
public function addSignal(int $signal, callable $handler): void;
public function addSignal(int $signal, Closure $handler): void;

/**
* 移除信号处理
* @param int $signal
* @param callable $handler
* @param Closure $handler
*/
public function delSignal(int $signal, callable $handler): void;
public function delSignal(int $signal, Closure $handler): void;

/**
* 创建读流
* @param resource $stream
* @param callable $handler
* @param Closure $handler
* @throws LoopException
*/
public function addReadStream($stream, callable $handler): void;
public function addReadStream($stream, Closure $handler): void;

/**
* 移除读流
Expand All @@ -39,10 +40,10 @@ public function delReadStream($stream): void;
/**
* 创建写流
* @param resource $stream
* @param callable $handler
* @param Closure $handler
* @throws LoopException
*/
public function addWriteStream($stream, callable $handler): void;
public function addWriteStream($stream, Closure $handler): void;

/**
* 移除写流
Expand All @@ -54,16 +55,16 @@ public function delWriteStream($stream): void;
* 创建定时器
* @param float $delay
* @param float $repeat
* @param callable $callback
* @return int
* @param Closure $callback
* @return string
*/
public function addTimer(float $delay, float $repeat, callable $callback): int;
public function addTimer(float $delay, float $repeat, Closure $callback): string;

/**
* 移除定时触发器
* @param int $timerId
* @param string $timerId
*/
public function delTimer(int $timerId): void;
public function delTimer(string $timerId): void;

/**
* main loop.
Expand Down
37 changes: 19 additions & 18 deletions src/Drivers/NativeLoop.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

use EventLoop\Exception\LoopException;
use EventLoop\Storage;
use EventLoop\Timer;
use SplPriorityQueue;
use Closure;

class NativeLoop implements LoopInterface
{
Expand All @@ -26,8 +29,8 @@ class NativeLoop implements LoopInterface
/** @var Storage 定时器容器 */
protected Storage $_storage;

/** @var \SplPriorityQueue 优先队列 */
protected \SplPriorityQueue $_queue;
/** @var SplPriorityQueue 优先队列 */
protected SplPriorityQueue $_queue;

protected bool $_stopped = false;

Expand All @@ -41,14 +44,14 @@ public function __construct()
throw new LoopException('not support: ext-pcntl');
}
$this->_storage = new Storage();
$this->_queue = new \SplPriorityQueue();
$this->_queue->setExtractFlags(\SplPriorityQueue::EXTR_BOTH);
$this->_queue = new SplPriorityQueue();
$this->_queue->setExtractFlags(SplPriorityQueue::EXTR_BOTH);
$this->_readFds = [];
$this->_writeFds = [];
}

/** @inheritDoc */
public function addReadStream($stream, callable $handler): void
public function addReadStream($stream, Closure $handler): void
{
if(is_resource($stream)){
$key = (int) $stream;
Expand All @@ -72,7 +75,7 @@ public function delReadStream($stream): void
}

/** @inheritDoc */
public function addWriteStream($stream, callable $handler): void
public function addWriteStream($stream, Closure $handler): void
{
if(is_resource($stream)){
$key = (int) $stream;
Expand All @@ -96,7 +99,7 @@ public function delWriteStream($stream): void
}

/** @inheritDoc */
public function addSignal(int $signal, callable $handler): void
public function addSignal(int $signal, Closure $handler): void
{
$this->_signals[$signal] = $handler;
\pcntl_signal($signal, function($signal){
Expand All @@ -105,26 +108,23 @@ public function addSignal(int $signal, callable $handler): void
}

/** @inheritDoc */
public function delSignal(int $signal, callable $handler): void
public function delSignal(int $signal, Closure $handler): void
{
unset($this->_signals[$signal]);
\pcntl_signal($signal, \SIG_IGN);
}

/** @inheritDoc */
public function addTimer(float $delay, float $repeat, callable $callback): int
public function addTimer(float $delay, float $repeat, Closure $callback): string
{
$timer = new Timer($delay, $repeat, $callback);
$runTime = \hrtime(true) * 1e-9 + $delay;
$this->_queue->insert($this->_storage->id(), -$runTime);
return $this->_storage->add([
'delay' => $delay,
'repeat' => $repeat,
'callback' => $callback
]);
$this->_queue->insert($id = spl_object_hash($timer), -$runTime);
return $this->_storage->add($id, $timer);
}

/** @inheritDoc */
public function delTimer(int $timerId): void
public function delTimer(string $timerId): void
{
$this->_storage->del($timerId);
}
Expand Down Expand Up @@ -189,9 +189,10 @@ protected function _tick(): void
$data = $this->_queue->top();
$runTime = -$data['priority'];
$timerId = $data['data'];
/** @var Timer $data */
if($data = $this->_storage->get($timerId)){
$repeat = $data['repeat'];
$callback = $data['callback'];
$repeat = $data->getRepeat();
$callback = $data->getHandler();
$timeNow = \hrtime(true) * 1e-9;
if (($runTime - $timeNow) <= 0) {
$this->_queue->extract();
Expand Down
Loading

0 comments on commit 7f22fd7

Please sign in to comment.