From 0e7ace5edeff97870de8dd06d06ea158d5f26644 Mon Sep 17 00:00:00 2001 From: Andrzej Wasiak Date: Wed, 12 Oct 2016 15:38:36 +0200 Subject: [PATCH] Future/zmq/publish subscribe (#17) * added implementation of publish/subscribe pattern in ZMQ --- examples/Zmq/PublishSubscribe/example.php | 34 ++++ examples/Zmq/PublishSubscribe/subscriber.php | 31 ++++ .../Zmq/PublishSubscribe/Publisher.php | 93 ++++++++++ .../Zmq/PublishSubscribe/Subscriber.php | 143 ++++++++++++++++ .../Zmq/PublishSubscribe/PublisherTest.php | 99 +++++++++++ .../Zmq/PublishSubscribe/SubscriberTest.php | 162 ++++++++++++++++++ 6 files changed, 562 insertions(+) create mode 100644 examples/Zmq/PublishSubscribe/example.php create mode 100644 examples/Zmq/PublishSubscribe/subscriber.php create mode 100644 src/SAREhub/Commons/Zmq/PublishSubscribe/Publisher.php create mode 100644 src/SAREhub/Commons/Zmq/PublishSubscribe/Subscriber.php create mode 100644 tests/SAREhub/Commons/Zmq/PublishSubscribe/PublisherTest.php create mode 100644 tests/SAREhub/Commons/Zmq/PublishSubscribe/SubscriberTest.php diff --git a/examples/Zmq/PublishSubscribe/example.php b/examples/Zmq/PublishSubscribe/example.php new file mode 100644 index 0000000..62344e7 --- /dev/null +++ b/examples/Zmq/PublishSubscribe/example.php @@ -0,0 +1,34 @@ +bind(Dsn::tcp()->endpoint('127.0.0.1:10000')); +logMessage("Connected"); +logMessage("start subscriber"); + +$p = proc_open('php '.__DIR__.'/subscriber.php', [ + 1 => array("pipe", "w") +], $pipes); + +logMessage("started subscriber"); +sleep(2); + +logMessage("Publishing message ..."); +$publisher->publish("topic", "message", true); +sleep(5); +$publisher->unbind(); +logMessage("unbinded"); + +echo "\nOutput from subscriber: \n"; +echo "\n--------------------------------\n"; +echo stream_get_contents($pipes[1]); +echo "\n--------------------------------\n"; + diff --git a/examples/Zmq/PublishSubscribe/subscriber.php b/examples/Zmq/PublishSubscribe/subscriber.php new file mode 100644 index 0000000..64fd236 --- /dev/null +++ b/examples/Zmq/PublishSubscribe/subscriber.php @@ -0,0 +1,31 @@ +connect(Dsn::tcp()->endpoint("127.0.0.1:10000")); + $subscriber->subscribe("topic"); + logMessage("Connected"); + logMessage("Waiting for message"); + + for ($i = 1; $i < 20; ++$i) { + logMessage("try receive $i"); + if ($message = $subscriber->receive()) { + logMessage("Message: ".print_r($message, true)); + break; + } + sleep(1); + } + + $subscriber->disconnect(); + logMessage("Disconnected"); +} catch (Exception $e) { + echo $e; +} \ No newline at end of file diff --git a/src/SAREhub/Commons/Zmq/PublishSubscribe/Publisher.php b/src/SAREhub/Commons/Zmq/PublishSubscribe/Publisher.php new file mode 100644 index 0000000..97c896a --- /dev/null +++ b/src/SAREhub/Commons/Zmq/PublishSubscribe/Publisher.php @@ -0,0 +1,93 @@ +socket = $context->getSocket(\ZMQ::SOCKET_PUB, null, null); + } + + /** + * @param \ZMQContext $context + * @return Publisher + */ + public static function inContext(\ZMQContext $context) { + return new self($context); + } + + /** + * @param string $topic + * @param string $message + * @param bool $wait + */ + public function publish($topic, $message, $wait = false) { + if (!$this->isBinded()) { + throw new \LogicException("Can't publish message on unbined socket"); + } + $mode = ($wait) ? 0 : \ZMQ::MODE_DONTWAIT; + $this->getSocket()->sendmulti([$topic, $message], $mode); + } + + /** + * @param Dsn $dsn + * @return $this + * @throws \LogicException, \ZMQException + */ + public function bind(Dsn $dsn) { + if ($this->isBinded()) { + throw new \LogicException("Can't bind binded socket"); + } + + $this->socket->bind((string)$dsn); + $this->dsn = $dsn; + + return $this; + } + + /** + * @throws \ZMQException + */ + public function unbind() { + if ($this->isBinded()) { + $this->socket->unbind((string)$this->getDsn()); + $this->dsn = null; + } + } + + /** + * @return bool + */ + public function isBinded() { + return $this->dsn !== null; + } + + /** + * @return \ZMQSocket + */ + public function getSocket() { + return $this->socket; + } + + /** + * @return Dsn + */ + public function getDsn() { + return $this->dsn; + } +} \ No newline at end of file diff --git a/src/SAREhub/Commons/Zmq/PublishSubscribe/Subscriber.php b/src/SAREhub/Commons/Zmq/PublishSubscribe/Subscriber.php new file mode 100644 index 0000000..1470595 --- /dev/null +++ b/src/SAREhub/Commons/Zmq/PublishSubscribe/Subscriber.php @@ -0,0 +1,143 @@ +socket = $context->getSocket(\ZMQ::SOCKET_SUB, null, null); + } + + + /** + * @param \ZMQContext $context + * @return Subscriber + */ + public static function inContext(\ZMQContext $context) { + return new self($context); + } + + /** + * @param bool $wait + * @return null|array + */ + public function receive($wait = false) { + if ($this->isConnected()) { + $mode = ($wait) ? 0 : \ZMQ::MODE_DONTWAIT; + $parts = $this->getSocket()->recvMulti($mode); + + if ($parts) { + return ['topic' => $parts[0], 'body' => $parts[1]]; + } + + return null; + } + + throw new \LogicException("Can't receive message when socket isn't connect"); + } + + /** + * @param Dsn $dsn + * @return $this + */ + public function connect(Dsn $dsn) { + if ($this->isConnected()) { + throw new \LogicException("Can't connect on connected socket"); + } + + $this->getSocket()->connect((string)$dsn); + + $this->dsn = $dsn; + + return $this; + } + + /** + * @param $topic + * @return $this + */ + public function subscribe($topic) { + if (!$this->isSubscribed($topic)) { + $this->getSocket()->setSockOpt(\ZMQ::SOCKOPT_SUBSCRIBE, $topic); + $this->topics[$topic] = true; + } + + return $this; + } + + public function unsubscribe($topic) { + if ($this->isSubscribed($topic)) { + $this->getSocket()->setSockOpt(\ZMQ::SOCKOPT_UNSUBSCRIBE, $topic); + unset($this->topics[$topic]); + } + + return $this; + } + + /** + * @param string $topic + * @return bool + */ + public function isSubscribed($topic) { + return isset($this->topics[$topic]); + } + + /** + * @return array + */ + public function getTopics() { + return array_keys($this->topics); + } + + + /** + * + */ + public function disconnect() { + if ($this->isConnected()) { + $this->getSocket()->disconnect((string)$this->dsn); + $this->dsn = null; + } + } + + /** + * @return bool + */ + public function isConnected() { + return $this->getDsn() !== null; + } + + /** + * @return \ZMQSocket + */ + public function getSocket() { + return $this->socket; + } + + /** + * @return Dsn + */ + public function getDsn() { + return $this->dsn; + } +} \ No newline at end of file diff --git a/tests/SAREhub/Commons/Zmq/PublishSubscribe/PublisherTest.php b/tests/SAREhub/Commons/Zmq/PublishSubscribe/PublisherTest.php new file mode 100644 index 0000000..15a9e29 --- /dev/null +++ b/tests/SAREhub/Commons/Zmq/PublishSubscribe/PublisherTest.php @@ -0,0 +1,99 @@ +createMock(ZMQContext::class); + $contextMock->expects($this->once())->method('getSocket') + ->with(ZMQ::SOCKET_PUB)->willReturn($this->socketMock); + + $publisher = Publisher::inContext($contextMock); + $this->assertSame($this->socketMock, $publisher->getSocket()); + } + + public function testIsBindedWhenCreateThenReturnFalse() { + $this->assertFalse($this->publisher->isBinded()); + } + + public function testBindWhenNotBindedThenSocketCallBind() { + $this->socketMock->expects($this->once())->method('bind')->with((string)$this->dsn); + $this->publisher->bind($this->dsn); + } + + public function testBindWhenBindedThenThrowException() { + $this->publisher->bind($this->dsn); + $this->expectException(LogicException::class); + $this->publisher->bind($this->dsn); + } + + public function testBindThenReturnThis() { + $this->assertSame($this->publisher, $this->publisher->bind($this->dsn)); + } + + public function testIsBindWhenBindedThenReturnTrue() { + $this->publisher->bind($this->dsn); + $this->assertTrue($this->publisher->isBinded()); + } + + public function testUnbindWhenNotBindedThenNoop() { + $this->socketMock->expects($this->never())->method('unbind'); + $this->publisher->unbind(); + } + + public function testUnbindWhenBindedThenSocketCallUnbind() { + $this->publisher->bind($this->dsn); + $this->socketMock->expects($this->once())->method('unbind')->with((string)$this->dsn); + $this->publisher->unbind(); + } + + public function testUnbindWhenUnbindedThenIsBindedReturnFalse() { + $this->publisher->bind($this->dsn); + $this->publisher->unbind(); + $this->assertFalse($this->publisher->isBinded()); + } + + public function testPublishWhenNotBindedThenThrowException() { + $this->expectException(LogicException::class); + $this->publisher->publish("topic", "message"); + } + + public function testPublishWhenBindedThenSocketSend() { + $this->publisher->bind($this->dsn); + $this->socketMock->expects($this->once())->method('sendmulti') + ->with(['topic', 'message'], ZMQ::MODE_DONTWAIT); + $this->publisher->publish("topic", "message"); + } + + public function testPublishWhenWaitModeThenSocketSendWait() { + $this->publisher->bind($this->dsn); + $this->socketMock->expects($this->once())->method('sendmulti') + ->with(['topic', 'message'], 0); + $this->publisher->publish("topic", "message", true); + } + + protected function setUp() { + parent::setUp(); + $contextMock = $this->createMock(ZMQContext::class); + $this->socketMock = $this->createMock(ZMQSocket::class); + $contextMock->method('getSocket')->willReturn($this->socketMock); + $this->publisher = Publisher::inContext($contextMock); + + $this->dsn = Dsn::tcp()->endpoint('127.1.0.1'); + } +} diff --git a/tests/SAREhub/Commons/Zmq/PublishSubscribe/SubscriberTest.php b/tests/SAREhub/Commons/Zmq/PublishSubscribe/SubscriberTest.php new file mode 100644 index 0000000..57c39b6 --- /dev/null +++ b/tests/SAREhub/Commons/Zmq/PublishSubscribe/SubscriberTest.php @@ -0,0 +1,162 @@ +createMock(ZMQContext::class); + $this->socketMock = $this->createMock(ZMQSocket::class); + $context->method('getSocket')->willReturn($this->socketMock); + $this->subscriber = Subscriber::inContext($context); + $this->dsn = Dsn::tcp()->endpoint('127.0.0.1:1000'); + } + + public function testCreate() { + $context = $this->createMock(ZMQContext::class); + $context->expects($this->once())->method('getSocket') + ->with(ZMQ::SOCKET_SUB)->willReturn($this->socketMock); + $this->subscriber = Subscriber::inContext($context); + } + + public function testIsConnectedWhenCreatedThenReturnFalse() { + $this->assertFalse($this->subscriber->isConnected()); + } + + public function testConnectWhenNotConnectedThenSocketConnect() { + $this->socketMock->expects($this->once())->method('connect')->with((string)$this->dsn); + $this->subscriber->connect($this->dsn); + } + + public function testIsConnectedWhenConnectedThenReturnTrue() { + $this->subscriber->connect($this->dsn); + $this->assertTrue($this->subscriber->isConnected()); + } + + public function testConnectWhenConnectedThenThrowException() { + $this->subscriber->connect($this->dsn); + $this->expectException(LogicException::class); + $this->subscriber->connect($this->dsn); + } + + public function testConnectThenReturnThis() { + $this->assertSame($this->subscriber, $this->subscriber->connect($this->dsn)); + } + + public function testDisconnectWhenConnectedThenSocketCallDisconnect() { + $this->subscriber->connect($this->dsn); + $this->socketMock->expects($this->once())->method('disconnect')->with((string)$this->dsn); + $this->subscriber->disconnect(); + } + + public function testDisconnectWhenNotConnectedThenNoop() { + $this->socketMock->expects($this->never())->method('disconnect'); + $this->subscriber->disconnect(); + } + + public function testDisconnectWhenConnectedThenIsConnectedReturnFalse() { + $this->subscriber->connect($this->dsn); + $this->subscriber->disconnect(); + $this->assertFalse($this->subscriber->isConnected()); + } + + + public function testReceiveWhenNotConnectedThenThrowException() { + $this->expectException(LogicException::class); + $this->subscriber->receive(); + } + + public function testReceiveWhenConnectedThenSocketCallReceiveInNonBlocking() { + $this->subscriber->connect($this->dsn); + $this->socketMock->expects($this->once())->method('recvmulti') + ->with(ZMQ::MODE_DONTWAIT)->willReturn("topic"); + $this->subscriber->receive(); + } + + public function testReceiveWhenConnectedAndMessageThenReturnMessage() { + $this->subscriber->connect($this->dsn); + $this->socketMock->method('recvmulti')->willReturn(["topic1", "message"]); + $this->assertEquals(['topic' => "topic1", 'body' => "message"], $this->subscriber->receive()); + } + + public function testReceiveWhenConnectedAndNoMessageThenReturnNull() { + $this->subscriber->connect($this->dsn); + $this->socketMock->expects($this->once())->method('recvmulti')->willReturn(false); + $this->assertNull($this->subscriber->receive()); + } + + public function testReceiveWhenConnectedAndWaitModeThenSocketCallReceiveBlocking() { + $this->subscriber->connect($this->dsn); + $this->socketMock->expects($this->once())->method('recvmulti') + ->with(0)->willReturn(false); + $this->subscriber->receive(true); + } + + public function testSubscribeThenSocketSubscribeOptionSet() { + $topic = "topic"; + $this->socketMock->expects($this->once())->method('setSockOpt') + ->with(\ZMQ::SOCKOPT_SUBSCRIBE, $topic); + + $this->subscriber->subscribe($topic); + } + + public function testSubscribeWhenSubscribedThenNoop() { + $topic = "topic"; + $this->subscriber->subscribe($topic); + $this->socketMock->expects($this->never())->method('setSockOpt'); + $this->subscriber->subscribe($topic); + } + + public function testSubscribeThenReturnThis() { + $this->assertSame($this->subscriber, $this->subscriber->subscribe("topic")); + } + + public function testUnsubscribeWhenSubscribeThenSocketUnsubscribeOptionSet() { + $topic = "topic"; + $this->subscriber->subscribe($topic); + $this->socketMock->expects($this->once())->method('setSockOpt') + ->with(\ZMQ::SOCKOPT_UNSUBSCRIBE, $topic); + $this->subscriber->unsubscribe($topic); + } + + public function testUnsubscribeWhenNotSubscribedThenNoop() { + $this->socketMock->expects($this->never())->method('setSockOpt'); + $this->subscriber->unsubscribe("topic"); + } + + public function testUnsubscribeThenReturnThis() { + $this->assertSame($this->subscriber, $this->subscriber->unsubscribe("topic")); + } + + public function testGetTopicsWhenSubscribeThenReturnTopics() { + $this->subscriber->subscribe("topic"); + $this->assertEquals(['topic'], $this->subscriber->getTopics()); + } + + public function testIsSubscribedWhenNotSubscribeThenReturnFalse() { + $this->assertFalse($this->subscriber->isSubscribed("topic")); + } + + public function testIsSubscribedWhenSubscribeAndUnsubscribeThenReturnFalse() { + $topic = "topic"; + $this->subscriber->subscribe($topic); + $this->subscriber->unsubscribe($topic); + $this->assertFalse($this->subscriber->isSubscribed($topic)); + } + +}