Skip to content

Commit

Permalink
AMQP 长连接分布式解决方案 (#160)
Browse files Browse the repository at this point in the history
  • Loading branch information
web-flow authored Sep 30, 2021
1 parent a367939 commit d5b6454
Show file tree
Hide file tree
Showing 47 changed files with 2,995 additions and 61 deletions.
44 changes: 44 additions & 0 deletions src/Server/ConnectionContext/StoreHandler/Amqp.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
<?php

declare(strict_types=1);

namespace Imi\Swoole\Server\ConnectionContext\StoreHandler;

use Imi\Bean\Annotation\Bean;
use Imi\RequestContext;
use Imi\Server\ConnectionContext\StoreHandler\Local;

/**
* @Bean("ConnectionContextAmqp")
*/
class Amqp extends Local
{
/**
* {@inheritDoc}
*/
public function bind(string $flag, $clientId): void
{
$needBind = !$this->getClientIdByFlag($flag);
parent::bind($flag, $clientId);
if ($needBind)
{
/** @var \Imi\Swoole\Server\Util\Amqp\AmqpServerConsumer $amqpServerConsumer */
$amqpServerConsumer = RequestContext::getServerBean('AmqpServerConsumer');
$amqpServerConsumer->bindRoutingKey('flag.' . $flag);
}
}

/**
* {@inheritDoc}
*/
public function unbind(string $flag, $clientId, ?int $keepTime = null): void
{
parent::unbind($flag, $clientId, $keepTime);
if (!$this->getClientIdByFlag($flag))
{
/** @var \Imi\Swoole\Server\Util\Amqp\AmqpServerConsumer $amqpServerConsumer */
$amqpServerConsumer = RequestContext::getServerBean('AmqpServerConsumer');
$amqpServerConsumer->unbindRoutingKey('flag.' . $flag);
}
}
}
41 changes: 41 additions & 0 deletions src/Server/Group/Handler/Amqp.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
<?php

declare(strict_types=1);

namespace Imi\Swoole\Server\Group\Handler;

use Imi\Bean\Annotation\Bean;
use Imi\RequestContext;
use Imi\Server\Group\Handler\Local;

/**
* @Bean("GroupAmqp")
*/
class Amqp extends Local
{
/**
* {@inheritDoc}
*/
public function createGroup(string $groupName, int $maxClients = -1): void
{
if ($this->hasGroup($groupName))
{
return;
}
parent::createGroup($groupName, $maxClients);
/** @var \Imi\Swoole\Server\Util\Amqp\AmqpServerConsumer $amqpServerConsumer */
$amqpServerConsumer = RequestContext::getServerBean('AmqpServerConsumer');
$amqpServerConsumer->bindRoutingKey('group.' . $groupName);
}

/**
* {@inheritDoc}
*/
public function closeGroup(string $groupName): void
{
parent::closeGroup($groupName);
/** @var \Imi\Swoole\Server\Util\Amqp\AmqpServerConsumer $amqpServerConsumer */
$amqpServerConsumer = RequestContext::getServerBean('AmqpServerConsumer');
$amqpServerConsumer->unbindRoutingKey('group.' . $groupName);
}
}
108 changes: 108 additions & 0 deletions src/Server/Util/Amqp/AmqpServerConsumer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
<?php

declare(strict_types=1);

namespace Imi\Swoole\Server\Util\Amqp;

use Imi\AMQP\Annotation\Consumer;
use Imi\AMQP\Annotation\Exchange;
use Imi\AMQP\Annotation\Queue;
use Imi\AMQP\Base\BaseConsumer;
use Imi\AMQP\Contract\IMessage;
use Imi\AMQP\Enum\ConsumerResult;
use Imi\AMQP\Pool\AMQPPool;
use Imi\App;
use Imi\Bean\Annotation\Bean;
use Imi\RequestContext;
use Imi\Server\Server;
use Imi\Server\ServerManager;
use Imi\Swoole\Server\Util\AmqpServerUtil;
use Imi\Worker;

if (class_exists(\Imi\AMQP\Main::class))
{
/**
* @Bean("AmqpServerConsumer")
*/
class AmqpServerConsumer extends BaseConsumer
{
protected AmqpServerUtil $amqpServerUtil;

/**
* {@inheritDoc}
*/
public function initConfig(): void
{
/** @var AmqpServerUtil $amqpServerUtil */
$amqpServerUtil = $this->amqpServerUtil = RequestContext::getServerBean('AmqpServerUtil');
$this->exchanges = [$exchangeAnnotation = new Exchange($amqpServerUtil->getExchangeConfig())];
$queueConfig = $amqpServerUtil->getQueueConfig();
$queueName = ($queueConfig['name'] .= Worker::getWorkerId());
$this->queues = [new Queue($queueConfig)];
$consumerAnnotation = new Consumer();
$consumerAnnotation->queue = $queueName;
$consumerAnnotation->exchange = $exchangeAnnotation->name;
$consumerAnnotation->routingKey = 'all';
$this->consumers = [$consumerAnnotation];
$this->poolName = $amqpServerUtil->getAmqpName() ?? AMQPPool::getDefaultPoolName();
}

/**
* 绑定路由键.
*/
public function bindRoutingKey(string $routingKey): void
{
$channel = $this->getConnection()->channel();
$channel->queue_bind($this->consumers[0]->queue, $this->exchanges[0]->name, $routingKey);
}

/**
* 解绑路由键.
*/
public function unbindRoutingKey(string $routingKey): void
{
$channel = $this->getConnection()->channel();
$channel->queue_unbind($this->consumers[0]->queue, $this->exchanges[0]->name, $routingKey);
}

/**
* {@inheritDoc}
*
* @return mixed
*/
protected function consume(IMessage $message)
{
try
{
$data = json_decode($message->getBody(), true);
$serverName = $data['serverName'];
RequestContext::set('server', $server = ServerManager::getServer($serverName));
switch ($data['action'] ?? null)
{
case 'sendRawByFlag':
Server::sendRawByFlag($data['data'], $data['flag'], $serverName, false);
break;
case 'closeByFlag':
Server::closeByFlag($data['flag'], $serverName, false);
break;
case 'sendRawToGroup':
Server::sendRawToGroup($data['group'], $data['data'], $serverName, false);
break;
case 'sendRawToAll':
Server::sendRawToAll($data['data'], $serverName, false);
break;
}

return ConsumerResult::ACK;
}
catch (\Throwable $th)
{
/** @var \Imi\Log\ErrorLog $errorLog */
$errorLog = App::getBean('ErrorLog');
$errorLog->onException($th);

return ConsumerResult::NACK;
}
}
}
}
34 changes: 34 additions & 0 deletions src/Server/Util/Amqp/AmqpServerPublisher.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
<?php

declare(strict_types=1);

namespace Imi\Swoole\Server\Util\Amqp;

use Imi\AMQP\Annotation\Exchange;
use Imi\AMQP\Base\BasePublisher;
use Imi\AMQP\Pool\AMQPPool;
use Imi\Bean\Annotation\Bean;
use Imi\RequestContext;
use Imi\Swoole\Server\Util\AmqpServerUtil;

if (class_exists(\Imi\AMQP\Main::class))
{
/**
* @Bean("AmqpServerPublisher")
*/
class AmqpServerPublisher extends BasePublisher
{
protected AmqpServerUtil $amqpServerUtil;

/**
* {@inheritDoc}
*/
public function initConfig(): void
{
/** @var AmqpServerUtil $amqpServerUtil */
$amqpServerUtil = $this->amqpServerUtil = RequestContext::getServerBean('AmqpServerUtil');
$this->exchanges = [new Exchange($amqpServerUtil->getExchangeConfig())];
$this->poolName = $amqpServerUtil->getAmqpName() ?? AMQPPool::getDefaultPoolName();
}
}
}
Loading

0 comments on commit d5b6454

Please sign in to comment.