Skip to content

Commit

Permalink
refactor(lock): improve CoroutineLock implementation and manage chann…
Browse files Browse the repository at this point in the history
…els with WeakMap (#814)

* refactor(lock): improve CoroutineLock implementation and manage channels with WeakMap

* refactor(lock): streamline CoroutineLock channel management and improve owner tracking

* fix(lock): handle timeout and closing conditions in CoroutineLock

* fix(lock): correct push value in CoroutineLock to ensure proper lock acquisition

* fix(lock): improve lock acquisition logic in CoroutineLock to handle push failures and channel closing

* fix(lock): ensure proper channel assignment in CoroutineLock during closing conditions

* fix(lock): correct lock acquisition logic in CoroutineLock to handle push failures

* fix(lock): initialize owners map in CoroutineLock constructor to prevent null reference

* fix(lock): remove redundant channel closing check in CoroutineLock to streamline logic

* fix(lock): add timer functionality in CoroutineLock to manage lock expiration

* fix(lock): update release method to properly pop from channels in CoroutineLock

* fix(lock): update release method to include timeout for channel pop in CoroutineLock

* fix(lock): simplify constructor initialization in CoroutineLock

* fix(lock): refactor timer management in CoroutineLock to use WeakMap for better memory efficiency

* fix(lock): improve timer assignment and channel pop handling in CoroutineLock

* fix(lock): streamline timer initialization in CoroutineLock constructor

---------

Co-authored-by: Deeka Wong <[email protected]>
  • Loading branch information
huangdijia and huangdijia authored Dec 27, 2024
1 parent 14be4cc commit fdb550b
Showing 1 changed file with 62 additions and 15 deletions.
77 changes: 62 additions & 15 deletions src/lock/src/Driver/CoroutineLock.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,48 @@

namespace FriendsOfHyperf\Lock\Driver;

use Hyperf\Cache\Driver\CoroutineMemoryDriver;
use Hyperf\Coordinator\Timer;
use Hyperf\Engine\Channel;
use Override;
use Psr\SimpleCache\CacheInterface;

use function Hyperf\Support\make;
use Throwable;
use WeakMap;

class CoroutineLock extends AbstractLock
{
/**
* The cache store implementation.
* @var array<string, Channel>
*/
protected static array $channels = [];

/**
* @var WeakMap<Channel, string>|null
*/
protected CacheInterface $store;
protected static ?WeakMap $owners = null;

protected static ?Timer $timer = null;

/**
* @var WeakMap<Channel, int>|null
*/
protected static ?WeakMap $timers = null;

/**
* Create a new lock instance.
*/
public function __construct(string $name, int $seconds, ?string $owner = null, array $constructor = [])
{
public function __construct(
string $name,
int $seconds,
?string $owner = null,
array $constructor = []
) {
$constructor = array_merge(['prefix' => ''], $constructor);
$name = $constructor['prefix'] . $name;

parent::__construct($name, $seconds, $owner);

$constructor = array_merge(['prefix' => ''], $constructor);
$this->store = make(CoroutineMemoryDriver::class, ['config' => $constructor]);
self::$owners ??= new WeakMap();
self::$timers ??= new WeakMap();
self::$timer ??= new Timer();
}

/**
Expand All @@ -41,11 +61,28 @@ public function __construct(string $name, int $seconds, ?string $owner = null, a
#[Override]
public function acquire(): bool
{
if ($this->store->has($this->name)) {
try {
$chan = self::$channels[$this->name] ??= new Channel(1);

if (! $chan->push(1, 0.01)) {
return false;
}

self::$owners[$chan] = $this->owner;

if ($timeId = self::$timers[$chan] ?? null) {
self::$timer?->clear((int) $timeId);
}

if ($this->seconds > 0) {
$timeId = self::$timer?->after($this->seconds * 1000, fn () => $this->forceRelease());
$timeId && self::$timers[$chan] = $timeId;
}
} catch (Throwable) {
return false;
}

return $this->store->set($this->name, $this->owner, $this->seconds);
return true;
}

/**
Expand All @@ -55,7 +92,7 @@ public function acquire(): bool
public function release(): bool
{
if ($this->isOwnedByCurrentProcess()) {
return $this->store->delete($this->name);
return (self::$channels[$this->name] ?? null)?->pop(0.01) ? true : false;
}

return false;
Expand All @@ -67,7 +104,13 @@ public function release(): bool
#[Override]
public function forceRelease(): void
{
$this->store->delete($this->name);
if (! $chan = self::$channels[$this->name] ?? null) {
return;
}

self::$channels[$this->name] = null;

$chan->close();
}

/**
Expand All @@ -76,6 +119,10 @@ public function forceRelease(): void
*/
protected function getCurrentOwner()
{
return $this->store->get($this->name);
if (! $chan = self::$channels[$this->name] ?? null) {
return '';
}

return self::$owners[$chan] ?? '';
}
}

0 comments on commit fdb550b

Please sign in to comment.