Skip to content

Commit

Permalink
Merge pull request hove-io#109 from hove-io/zmq_more_routing_frames
Browse files Browse the repository at this point in the history
[ZMQ] allow more than one frame to identify the client
  • Loading branch information
pbench authored May 17, 2022
2 parents 21a142f + d65dec0 commit 260627d
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 48 deletions.
114 changes: 67 additions & 47 deletions zmq.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,86 +26,106 @@ void LoadBalancer::bind(const std::string& clients_socket_path, const std::strin
}

void LoadBalancer::run() {

std::vector<std::string> frames {};
while (true) {
zmq_pollitem_t items[] = {{static_cast<void*>(workers), 0, ZMQ_POLLIN, 0},
{static_cast<void*>(clients), 0, ZMQ_POLLIN, 0}};
if (avalailable_worker.empty()) {
if (available_workers.empty()) {
// we don't look for request from client if there is no worker for handling them
zmq::poll(items, 1, -1);
} else {
zmq::poll(items, 2, -1);
}
// handle worker
if (items[0].revents & ZMQ_POLLIN) {
// the first frame is the identifier of the worker: we add it to the available worker
avalailable_worker.push(z_recv(workers));
{
// Second frame is empty
std::string empty = z_recv(workers);
assert(empty.size() == 0);

// the first frame is the identifier of the worker: we add it to the available workers list
available_workers.push(z_recv(workers));

// the second frame should be empty
std::string empty = z_recv(workers);
if (empty != "") {
// just skip the rest of the message
continue;
}

// Third frame is READY or else a client reply address
std::string client_addr = z_recv(workers);

// If client reply, send resp back to the appropriate client
if (client_addr != "READY") {
{
// another empty frame
std::string empty = z_recv(workers);
assert(empty.size() == 0);
}
// the actual reply
zmq::message_t reply;
workers.recv(&reply);
z_send(clients, client_addr, ZMQ_SNDMORE);
z_send(clients, "", ZMQ_SNDMORE);
z_send(clients, reply);
frames.clear();
size_t more = 0;
size_t more_size = sizeof (more);
do {
std::string frame = z_recv(workers);
frames.push_back(frame);
// Are there more frames coming?
workers.getsockopt(ZMQ_RCVMORE, &more, &more_size);
} while (more);


if (frames.size() < 3) {
continue;
}
if (frames[0] == "READY") {
// the worker just signaled it is ready, nothing to do
continue;
}

//here we should get a response from the worker

// send every remaining frames to the client
for (size_t idx = 0; idx < frames.size() - 1; ++ idx) {
z_send(clients, frames[idx], ZMQ_SNDMORE);
}
z_send(clients, frames.back());

}
// handle clients request
if (items[1].revents & ZMQ_POLLIN){

// The client request is a multi-part ZMQ message, we have to check every frame and be sure the multi-part message frame
// is composed as we wish, otherwise the multi-part message may be shifted unexpectedly.
// is composed as we wish,

// The multi-part ZMQ message should have 3 parts
// The first one is the ID of message
// The second one is an empty frame
// The third one is the real request
// The multi-part ZMQ message should have :
// - one or more frames identifying the client
// - then an empty frame
// - and finally one frame with the actual request payload
size_t more = 0;
size_t more_size = sizeof (more);

size_t nb_frames = 0;
// there is no copy/move constructor in message_t in v2.2, which is the verison used by Jenkins...
std::array<zmq::message_t, 3> frames{};
frames.clear();

do {
zmq::message_t frame{};
clients.recv(&frame);
std::string frame = z_recv(clients);
frames.push_back(frame);

if (nb_frames < 3) {
frames[nb_frames].move(&frame);
}
// Are there more frames coming?
clients.getsockopt(ZMQ_RCVMORE, &more, &more_size);
nb_frames++;
} while (more);

if (nb_frames > 3 || frames.at(1).size() != 0 ) {
z_send(clients, "");
throw navitia::recoverable_exception{"bad ZMQ message has been received and ignored"};

// if we have less than 3 frames, the message is ill-formed, and we ignore it
if (frames.size() < 3) {
continue;
}
// if we the penultimate frame is not empty, the message is ill-formed, and we ignore it
if (frames[frames.size() - 2] != "") {
continue;
}

std::string worker_addr = avalailable_worker.top();
avalailable_worker.pop();
std::string worker_addr = available_workers.top();
available_workers.pop();

// let's forward the message to the workers

// a first frame identifying the worker to route the request to
z_send(workers, worker_addr, ZMQ_SNDMORE);
// then an empty frame
z_send(workers, "", ZMQ_SNDMORE);
// frames[0] is the id of message
z_send(workers, frames[0], ZMQ_SNDMORE);
z_send(workers, "", ZMQ_SNDMORE);
// frames[2] is the request
z_send(workers, frames[2]);

// and then the message from the client
for (size_t idx = 0; idx < frames.size() - 1; ++ idx) {
z_send(workers, frames[idx], ZMQ_SNDMORE);
}
z_send(workers, frames.back());
}
}
}
2 changes: 1 addition & 1 deletion zmq.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ void z_send(zmq::socket_t& socket, zmq::message_t& msg, int flags = 0);
std::string z_recv(zmq::socket_t& socket);

class LoadBalancer {
std::stack<std::string> avalailable_worker;
std::stack<std::string> available_workers;
zmq::socket_t clients;
zmq::socket_t workers;

Expand Down

0 comments on commit 260627d

Please sign in to comment.