Skip to content

Commit

Permalink
fix Pool bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
chaz6chez committed Oct 11, 2024
1 parent 414ea3e commit e1e84fb
Showing 1 changed file with 48 additions and 15 deletions.
63 changes: 48 additions & 15 deletions src/Utils/Pool/Pool.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@
namespace Workbunny\WebmanCoroutine\Utils\Pool;

use Workbunny\WebmanCoroutine\Exceptions\PoolException;
use Workerman\Worker;
use function Workbunny\WebmanCoroutine\wait_for;

class Pool
{

/**
* @var Pool[][]
*/
Expand Down Expand Up @@ -46,6 +46,13 @@ class Pool
*/
protected bool $_idle;

/**
* 是否是深拷贝
*
* @var bool
*/
protected bool $_clone;

/**
* 是否强制回收
*
Expand All @@ -56,18 +63,19 @@ class Pool
/**
* 创建
*
* @param string $name
* @param int $count
* @param mixed $element
* @param string $name 区域
* @param int $count 区域索引
* @param mixed $element 元素
* @param bool $clone 是否开启深拷贝
* @return Pool[]
*/
public static function create(string $name, int $count, mixed $element): array
public static function create(string $name, int $count, mixed $element, bool $clone = true): array
{
if (static::get($name, null)) {
throw new PoolException("Pools $name already exists. ", -1);
}
foreach (range(1, $count) as $i) {
self::$pools[$name][$i] = new Pool($name, $i, $element);
self::$pools[$name][$i] = new Pool($name, $i, $element, $clone);
}
return self::$pools[$name];
}
Expand Down Expand Up @@ -156,6 +164,9 @@ protected static function _deepCopyArray(array $array): array
{
$copy = [];
foreach ($array as $key => $value) {
if (is_callable($value)) {
Worker::log("Pool::deepCopyArray $key value is callable. ");
}
if (is_array($value)) {
$copy[$key] = self::_deepCopyArray($value);
} elseif (is_object($value)) {
Expand All @@ -170,25 +181,32 @@ protected static function _deepCopyArray(array $array): array
/**
* 构造
*
* @param string $name
* @param int $index 索引
* @param object|array|resource|mixed $element
* @param string $name 区域名称
* @param int $index 区域索引
* @param object|array|resource|mixed $element 元素
* @param bool $clone 是否执行深拷贝
*/
public function __construct(string $name, int $index, mixed $element)
public function __construct(string $name, int $index, mixed $element, bool $clone = true)
{
if (static::get($name, $index)) {
throw new PoolException("Pool $name#$index already exists. ", -2);
}
$this->_name = $name;
$this->_index = $index;
$this->_clone = $clone;
$this->setForce(false);
$this->setIdle(true);
$this->_element = match (true) {
/*
* 由于callable类型数据无法做到完美深拷贝,涉及到参数引用上下文问题,谨慎使用
*/
if (is_callable($element)) {
Worker::log("Pool $name#$index element is callable. ");
}
$this->_element = $clone ? match (true) {
is_object($element) => clone $element,
is_array($element) => self::_deepCopyArray($element),
is_callable($element) => call_user_func($element),
default => $element,
};
} : $element;
}

/**
Expand Down Expand Up @@ -231,6 +249,16 @@ public function getElement(): mixed
return $this->_element;
}

/**
* 是否是深拷贝
*
* @return bool
*/
public function isClone(): bool
{
return $this->_clone;
}

/**
* 是否闲置
*
Expand Down Expand Up @@ -276,7 +304,7 @@ public function setForce(bool $force): void
/**
* 等待至闲置
*
* @param \Closure|null $closure
* @param \Closure|null $closure 需要执行的逻辑 = function ($this) {}
* @return void
*/
public function wait(?\Closure $closure = null): void
Expand All @@ -285,7 +313,12 @@ public function wait(?\Closure $closure = null): void
return $this->isIdle();
});
if ($closure) {
call_user_func($closure, $this);
$this->setIdle(false);
try {
call_user_func($closure, $this);
} finally {
$this->setIdle(true);
}
}
}

Expand Down

0 comments on commit e1e84fb

Please sign in to comment.