Skip to content

Commit

Permalink
Future/zmq/publish subscribe (#17)
Browse files Browse the repository at this point in the history
* added implementation of publish/subscribe pattern in ZMQ
  • Loading branch information
Mararok authored Oct 12, 2016
1 parent 78e29e7 commit 0e7ace5
Show file tree
Hide file tree
Showing 6 changed files with 562 additions and 0 deletions.
34 changes: 34 additions & 0 deletions examples/Zmq/PublishSubscribe/example.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
<?php

use SAREhub\Commons\Misc\Dsn;
use SAREhub\Commons\Zmq\PublishSubscribe\Publisher;
use SAREhub\Commons\Zmq\RequestReply\RequestSender;

echo "Zmq.PublishSubscribe example\n";

function logMessage($message) {
echo date('[H:i:s] ').$message."\n";
}

$publisher = Publisher::inContext(new ZMQContext())->bind(Dsn::tcp()->endpoint('127.0.0.1:10000'));
logMessage("Connected");
logMessage("start subscriber");

$p = proc_open('php '.__DIR__.'/subscriber.php', [
1 => array("pipe", "w")
], $pipes);

logMessage("started subscriber");
sleep(2);

logMessage("Publishing message ...");
$publisher->publish("topic", "message", true);
sleep(5);
$publisher->unbind();
logMessage("unbinded");

echo "\nOutput from subscriber: \n";
echo "\n--------------------------------\n";
echo stream_get_contents($pipes[1]);
echo "\n--------------------------------\n";

31 changes: 31 additions & 0 deletions examples/Zmq/PublishSubscribe/subscriber.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
<?php

use SAREhub\Commons\Misc\Dsn;
use SAREhub\Commons\Zmq\PublishSubscribe\Subscriber;

require dirname(dirname(dirname(__DIR__))).'/vendor/autoload.php';

function logMessage($message) {
echo date('[H:i:s] ').$message."\n";
}

try {
$subscriber = Subscriber::inContext(new ZMQContext())->connect(Dsn::tcp()->endpoint("127.0.0.1:10000"));
$subscriber->subscribe("topic");
logMessage("Connected");
logMessage("Waiting for message");

for ($i = 1; $i < 20; ++$i) {
logMessage("try receive $i");
if ($message = $subscriber->receive()) {
logMessage("Message: ".print_r($message, true));
break;
}
sleep(1);
}

$subscriber->disconnect();
logMessage("Disconnected");
} catch (Exception $e) {
echo $e;
}
93 changes: 93 additions & 0 deletions src/SAREhub/Commons/Zmq/PublishSubscribe/Publisher.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
<?php

namespace SAREhub\Commons\Zmq\PublishSubscribe;

use SAREhub\Commons\Misc\Dsn;

/**
* Represents publisher ZMQ socket
*/
class Publisher {

/**
* @var Dsn
*/
private $dsn = null;

/**
* @var \ZMQSocket
*/
private $socket;

public function __construct(\ZMQContext $context) {
$this->socket = $context->getSocket(\ZMQ::SOCKET_PUB, null, null);
}

/**
* @param \ZMQContext $context
* @return Publisher
*/
public static function inContext(\ZMQContext $context) {
return new self($context);
}

/**
* @param string $topic
* @param string $message
* @param bool $wait
*/
public function publish($topic, $message, $wait = false) {
if (!$this->isBinded()) {
throw new \LogicException("Can't publish message on unbined socket");
}
$mode = ($wait) ? 0 : \ZMQ::MODE_DONTWAIT;
$this->getSocket()->sendmulti([$topic, $message], $mode);
}

/**
* @param Dsn $dsn
* @return $this
* @throws \LogicException, \ZMQException
*/
public function bind(Dsn $dsn) {
if ($this->isBinded()) {
throw new \LogicException("Can't bind binded socket");
}

$this->socket->bind((string)$dsn);
$this->dsn = $dsn;

return $this;
}

/**
* @throws \ZMQException
*/
public function unbind() {
if ($this->isBinded()) {
$this->socket->unbind((string)$this->getDsn());
$this->dsn = null;
}
}

/**
* @return bool
*/
public function isBinded() {
return $this->dsn !== null;
}

/**
* @return \ZMQSocket
*/
public function getSocket() {
return $this->socket;
}

/**
* @return Dsn
*/
public function getDsn() {
return $this->dsn;
}
}
143 changes: 143 additions & 0 deletions src/SAREhub/Commons/Zmq/PublishSubscribe/Subscriber.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
<?php

namespace SAREhub\Commons\Zmq\PublishSubscribe;

use SAREhub\Commons\Misc\Dsn;

/**
* Represents subscriber ZMQ socket
*/
class Subscriber {

/**
* @var Dsn
*/
private $dsn = null;

/**
* @var array
*/
private $topics = [];

/**
* @var \ZMQSocket
*/
private $socket;

public function __construct(\ZMQContext $context) {
$this->socket = $context->getSocket(\ZMQ::SOCKET_SUB, null, null);
}


/**
* @param \ZMQContext $context
* @return Subscriber
*/
public static function inContext(\ZMQContext $context) {
return new self($context);
}

/**
* @param bool $wait
* @return null|array
*/
public function receive($wait = false) {
if ($this->isConnected()) {
$mode = ($wait) ? 0 : \ZMQ::MODE_DONTWAIT;
$parts = $this->getSocket()->recvMulti($mode);

if ($parts) {
return ['topic' => $parts[0], 'body' => $parts[1]];
}

return null;
}

throw new \LogicException("Can't receive message when socket isn't connect");
}

/**
* @param Dsn $dsn
* @return $this
*/
public function connect(Dsn $dsn) {
if ($this->isConnected()) {
throw new \LogicException("Can't connect on connected socket");
}

$this->getSocket()->connect((string)$dsn);

$this->dsn = $dsn;

return $this;
}

/**
* @param $topic
* @return $this
*/
public function subscribe($topic) {
if (!$this->isSubscribed($topic)) {
$this->getSocket()->setSockOpt(\ZMQ::SOCKOPT_SUBSCRIBE, $topic);
$this->topics[$topic] = true;
}

return $this;
}

public function unsubscribe($topic) {
if ($this->isSubscribed($topic)) {
$this->getSocket()->setSockOpt(\ZMQ::SOCKOPT_UNSUBSCRIBE, $topic);
unset($this->topics[$topic]);
}

return $this;
}

/**
* @param string $topic
* @return bool
*/
public function isSubscribed($topic) {
return isset($this->topics[$topic]);
}

/**
* @return array
*/
public function getTopics() {
return array_keys($this->topics);
}


/**
*
*/
public function disconnect() {
if ($this->isConnected()) {
$this->getSocket()->disconnect((string)$this->dsn);
$this->dsn = null;
}
}

/**
* @return bool
*/
public function isConnected() {
return $this->getDsn() !== null;
}

/**
* @return \ZMQSocket
*/
public function getSocket() {
return $this->socket;
}

/**
* @return Dsn
*/
public function getDsn() {
return $this->dsn;
}
}
Loading

0 comments on commit 0e7ace5

Please sign in to comment.