Worker won't unregister on high workload. #7

Open
javadevmtl opened this issue May 7, 2013 · 5 comments

@javadevmtl

OK, I think I may have found an issue with the work queue under a highly concurrent workload, which is what has had me asking crazy questions and running in circles. It doesn't take much to trigger: one JMeter user sending 250 requests per second.

Very easy to reproduce.

Create a simple HTTP verticle and a work queue with one worker. For each request made to the HTTP verticle, add a job to the work queue.

Set up JMeter to send requests to the HTTP verticle. While JMeter is running, attempt to unregister the worker. Sometimes it is unregistered and sometimes it isn't, even though status ok is always returned.

It can be proven quite easily. If no work is being executed and you unregister the worker, then send another request to the work queue, the done handler never fires, since there is no worker left to reply. But if you keep adding jobs to the queue, sometimes the worker cannot be unregistered.

This is the HTTP handler used for the test. To complete the test, you must also register a work queue and a worker.

public void handle(final HttpServerRequest req) {
    req.bodyHandler(new Handler<Buffer>() {
        public void handle(Buffer body) {
            String bodyStr = body.toString();

            if (bodyStr.toLowerCase().contains("shutdown")) {
                final JsonObject message = new JsonObject(bodyStr);

                final String uid = message.getString("uid");
                System.out.println("Uid to remove: " + uid);

                eb.send("com.xxx.xxxxx.unregister", new JsonObject() {{ putString("processor", uid); }}, new Handler<Message<JsonObject>>() {
                    @Override
                    public void handle(Message<JsonObject> message) {
                        System.out.println("Received: " + message.body.toString()); // <--- Returns status ok all the time.
                    }
                });

                req.response.end("Shut down requested!");
            } else {
                JsonObject message = new JsonObject();
                message.putString("message", bodyStr);

                eb.send("com.xxx.xxxxx", message, new Handler<Message<JsonObject>>() {
                    @Override
                    public void handle(Message<JsonObject> message) {
                        req.response.end(message.body.getString("message")); // <-- If no worker this won't happen.
                    }
                });
            }
        }
    });
}

Thoughts?

@purplefox
Member

I'm not sure what you are trying to achieve here. Can you post a full example which can be run?

@javadevmtl
Author

There's a race condition when trying to unregister a worker from the queue.

  • checkWork(): Removes the worker from the queue.
  • messageProcessed(): Puts the worker back on the queue.

If an unregister message arrives between these two steps, the worker is not unregistered, i.e. in doUnregister(), processors.remove() returns false. It is possible to sneak in an unregister before or after the two steps, but the busier the workers get, the harder it becomes to unregister one.
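
The sequence described above can be reproduced without Vert.x. The following is only a minimal sketch: the class `UnregisterRaceDemo` and its method bodies are a simplified model written for illustration, not the module's actual code, and the worker name is made up.

```java
import java.util.LinkedList;
import java.util.Queue;

// Simplified, hypothetical model of the work queue's processor handling.
public class UnregisterRaceDemo {
    // Idle processors, standing in for the module's `processors` queue.
    private final Queue<String> processors = new LinkedList<>();

    public void register(String processor) {
        processors.add(processor);
    }

    // Models checkWork(): takes an idle processor off the queue to give it a job.
    public String checkWork() {
        return processors.poll();
    }

    // Models messageProcessed(): the job is done, the processor goes back on the queue.
    public void messageProcessed(String processor) {
        processors.add(processor);
    }

    // Models doUnregister(): succeeds only if the processor is idle right now.
    public boolean doUnregister(String processor) {
        return processors.remove(processor);
    }

    public boolean isQueued(String processor) {
        return processors.contains(processor);
    }

    public static void main(String[] args) {
        UnregisterRaceDemo queue = new UnregisterRaceDemo();
        queue.register("worker-1");

        String busy = queue.checkWork();              // worker-1 is now busy
        boolean removed = queue.doUnregister(busy);   // unregister arrives mid-job
        queue.messageProcessed(busy);                 // worker-1 goes back on the queue

        System.out.println("removed=" + removed
                + ", still queued=" + queue.isQueued("worker-1"));
    }
}
```

In this model the unregister call reports failure and the worker ends up back on the queue, which matches the behaviour seen under load.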

With very simple tweaks to how the work queue handles the processors queue, it is possible to unregister the worker safely without disrupting any work.

Option 1: Don't remove the worker in checkWork().
Change processors from a Queue to an ArrayList and use an AtomicInteger to track the position of the next worker in the ArrayList. Slower than the current queue.
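
A rough sketch of what option 1 might look like is shown here. The class `RoundRobinProcessors` and its methods are hypothetical names for illustration, and the sketch assumes all calls come from a single thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of option 1: processors stay in the list while they work.
public class RoundRobinProcessors {
    private final List<String> processors = new ArrayList<>();
    private final AtomicInteger position = new AtomicInteger();

    public void register(String processor) {
        processors.add(processor);
    }

    // The processor is always in the list, so removal works even while it is busy.
    public boolean unregister(String processor) {
        return processors.remove(processor);
    }

    // Picks the next processor in round-robin order without removing it.
    // Returns null when no processor is registered.
    public String next() {
        if (processors.isEmpty()) {
            return null;
        }
        // floorMod keeps the index valid even if the counter overflows.
        int index = Math.floorMod(position.getAndIncrement(), processors.size());
        return processors.get(index);
    }

    public static void main(String[] args) {
        RoundRobinProcessors rr = new RoundRobinProcessors();
        rr.register("worker-1");
        rr.register("worker-2");
        System.out.println(rr.next());
        System.out.println(rr.next());
        System.out.println("unregistered=" + rr.unregister("worker-1"));
        System.out.println(rr.next());
    }
}
```

One trade-off of this sketch is that it no longer tracks which processors are busy, so a worker can be handed a second job before it has replied to the first.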

Option 2: Use a queue to track received unregister messages. When messageProcessed() is about to put the processor back on the queue, have it check the unregister queue first: if the processor is on that list, don't put it back; otherwise put it back. Performance is on par with the current queue.
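
Stripped of the event bus, the idea behind option 2 can be sketched as follows. `DeferredUnregister` is a hypothetical stand-alone model, and it deviates from the description in one respect: it uses a Set instead of a queue for the pending requests, so a repeated unregister for the same worker is recorded only once. It also does not check whether the name was ever registered.

```java
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Set;

// Hypothetical model of option 2: unregister requests for busy workers are deferred.
public class DeferredUnregister {
    private final Queue<String> idle = new LinkedList<>();
    private final Set<String> pendingUnregister = new HashSet<>();

    public void register(String processor) {
        idle.add(processor);
    }

    // Models checkWork(): hands out an idle processor, or null if none is idle.
    public String checkWork() {
        return idle.poll();
    }

    // Removes an idle processor immediately; otherwise remembers the request.
    public String unregister(String processor) {
        if (idle.remove(processor)) {
            return "removed";
        }
        pendingUnregister.add(processor);
        return "queued";
    }

    // Models messageProcessed(): a processor with a pending request is dropped
    // instead of being put back on the idle queue.
    public void messageProcessed(String processor) {
        if (!pendingUnregister.remove(processor)) {
            idle.add(processor);
        }
    }

    public boolean isIdle(String processor) {
        return idle.contains(processor);
    }

    public static void main(String[] args) {
        DeferredUnregister queue = new DeferredUnregister();
        queue.register("worker-1");
        String busy = queue.checkWork();
        System.out.println(queue.unregister(busy));
        queue.messageProcessed(busy);
        System.out.println("idle again=" + queue.isIdle("worker-1"));
    }
}
```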

@javadevmtl
Author

Here is my fix using option 2.

I added two new queues called unQueue and registered, and modified doRegister(), doUnregister() and messageProcessed().

package org.vertx.mods;

import org.vertx.java.busmods.BusModBase;
import org.vertx.java.core.Handler;
import org.vertx.java.core.eventbus.Message;
import org.vertx.java.core.json.JsonArray;
import org.vertx.java.core.json.JsonObject;

import java.util.LinkedList;
import java.util.Queue;

/**
 * Work Queue Bus Module
 *
 * Please see the busmods manual for a full description
 *
 * @author Tim Fox
 */
public class WorkQueue extends BusModBase {

    // LHS is typed as ArrayList to ensure high perf offset based index operations
    private final Queue<String> processors = new LinkedList<>();
    private final Queue<String> registered = new LinkedList<>();
    private final Queue<String> unQueue = new LinkedList<>();
    private final Queue<MessageHolder> messages = new LinkedList<>();

    private long processTimeout;
    private String persistorAddress;
    private String collection;

      /**
       * Start the busmod
       */
      public void start() {
      super.start();

      System.out.println("My Version...");

      String address = getMandatoryStringConfig("address");
      processTimeout = super.getOptionalLongConfig("process_timeout", 5 * 60 * 1000);
      persistorAddress = super.getOptionalStringConfig("persistor_address", null);
      collection = super.getOptionalStringConfig("collection", null);

      if (persistorAddress != null) {
      loadMessages();
      }

      Handler<Message> registerHandler = new Handler<Message>() {
      public void handle(Message message) {
      doRegister(message);
      }
      };
      eb.registerHandler(address + ".register", registerHandler);
      Handler<Message> unregisterHandler = new Handler<Message>() {
      public void handle(Message message) {
      doUnregister(message);
      }
      };
      eb.registerHandler(address + ".unregister", unregisterHandler);
      Handler<Message> sendHandler = new Handler<Message>() {
      public void handle(Message message) {
      doSend(message);
      }
      };
      eb.registerHandler(address, sendHandler);
      }

    // Load all the message into memory
    // TODO - we could limit the amount we load at startup
    private void loadMessages() {
    JsonObject msg = new JsonObject().putString("action", "find").putString("collection", collection)
    .putObject("matcher", new JsonObject());
    eb.send(persistorAddress, msg, createLoadReplyHandler());
    }

    private void processLoadBatch(JsonArray toLoad) {
    for (Object obj: toLoad) {
    if (obj instanceof JsonObject) {
    messages.add(new LoadedHolder((JsonObject)obj));
    }
    }
    checkWork();
    }

    private interface MessageHolder {
    JsonObject getBody();
    void reply(JsonObject reply, Handler<Message> replyReplyHandler);
    }

    private Handler<Message> createLoadReplyHandler() {
    return new Handler<Message>() {
    public void handle(Message reply) {
    processLoadBatch(reply.body.getArray("results"));
    if (reply.body.getString("status").equals("more-exist")) {
    // Get next batch
    reply.reply(null, createLoadReplyHandler());
    }
    }
    };
    }

    private void checkWork() {
    if (!messages.isEmpty() && !processors.isEmpty()) {
    final MessageHolder message = messages.poll();
    final String address = processors.poll();
    final long timeoutID = vertx.setTimer(processTimeout, new Handler<Long>() {
    public void handle(Long id) {
    // Processor timed out - put message back on queue
    logger.warn("Processor timed out, message will be put back on queue");
    messages.add(message);
    }
    });
    eb.send(address, message.getBody(), new Handler<Message>() {
    public void handle(Message reply) {
    messageReplied(message, reply, address, timeoutID);
    }
    });
    }
    }

    // A reply has been received from the processor
    private void messageReplied(final MessageHolder message, final Message reply,
    final String processorAddress,
    final long timeoutID) {
    if (reply.replyAddress != null) {
    // The reply itself has a reply specified so we don't consider the message processed just yet
    message.reply(reply.body, new Handler<Message>() {
    public void handle(final Message replyReply) {
    reply.reply(replyReply.body, new Handler<Message>() {
    public void handle(Message replyReplyReply) {
    messageReplied(new NonLoadedHolder(replyReply), replyReplyReply, processorAddress, timeoutID);
    }
    });
    }
    });
    } else {
    if (persistorAddress != null) {
    JsonObject msg = new JsonObject().putString("action", "delete").putString("collection", collection)
    .putObject("matcher", message.getBody());
    eb.send(persistorAddress, msg, new Handler<Message>() {
    public void handle(Message replyReply) {
    if (!replyReply.body.getString("status").equals("ok")) {
    logger.error("Failed to delete document from queue: " + replyReply.body.getString("message"));
    }
    messageProcessed(timeoutID, processorAddress, message, reply);
    }
    });
    } else {
    messageProcessed(timeoutID, processorAddress, message, reply);
    }
    }
    }

    // The conversation between the sender and the processor has ended, so we can add the processor back on the queue
    private void messageProcessed(long timeoutID, String processorAddress, MessageHolder message,
                                  Message reply) {
        // The processor can go back on the queue
        vertx.cancelTimer(timeoutID);

        // If no request to unregister, put processor back.
        if (!unQueue.remove(processorAddress)) {
            processors.add(processorAddress);
        } else {
            registered.remove(processorAddress);
        }

        message.reply(reply.body, null);
        checkWork();
    }

    private void doRegister(Message message) {
    String processor = getMandatoryString("processor", message);
    if (processor == null) {
    return;
    }

    processors.add(processor);
    registered.add(processor);
    checkWork();
    sendOK(message);
    }

    private void doUnregister(Message message) {
        String processor = getMandatoryString("processor", message);
        if (processor == null) {
            return;
        }

        JsonObject reply = new JsonObject();

        // Don't process if the worker is not in the registered list.
        // Either because the processor name sent was wrong
        // or the processor was already removed.
        if (registered.contains(processor)) {
            // Sometimes we are lucky and can remove straight away.
            // Otherwise queue the unregister request.
            if (processors.remove(processor)) {
                registered.remove(processor);

                reply.putString("message", "removed");
                sendOK(message, reply);
            } else {
                unQueue.add(processor);

                reply.putString("message", "queued");
                sendOK(message, reply);
            }
        } else {
            reply.putString("message", "not_registered");
            sendOK(message, reply);
        }
    }

    private void doSend(final Message message) {
    if (persistorAddress != null) {
    JsonObject msg = new JsonObject().putString("action", "save").putString("collection", collection)
    .putObject("document", message.body);
    eb.send(persistorAddress, msg, new Handler<Message>() {
    public void handle(Message reply) {
    if (reply.body.getString("status").equals("ok")) {
    actualSend(message);
    } else {
    sendAcceptedReply(message.body, "error", reply.body.getString("message"));
    sendError(message, reply.body.getString("message"));
    }
    }
    });
    } else {
    actualSend(message);
    }
    }

    private void sendAcceptedReply(JsonObject body, String status, String message) {
    String acceptedReply = body.getString("accepted-reply");
    if (acceptedReply != null) {
    JsonObject repl = new JsonObject().putString("status", status);
    if (message != null) {
    repl.putString("message", message);
    }
    eb.send(acceptedReply, repl);
    }
    }

    private void actualSend(Message message) {
    messages.add(new NonLoadedHolder(message));
    //Been added to the queue so reply if appropriate
    sendAcceptedReply(message.body, "accepted", null);
    checkWork();
    }

    private static class LoadedHolder implements MessageHolder {

    private final JsonObject body;

    private LoadedHolder(JsonObject body) {
    this.body = body;
    }

    public JsonObject getBody() {
    return body;
    }

    public void reply(JsonObject reply, Handler<Message> replyReplyHandler) {
    //Do nothing - we are loaded from storage so the sender has long gone
    }
    }

    private static class NonLoadedHolder implements MessageHolder {

    private final Message message;

    private NonLoadedHolder(Message message) {
    this.message = message;
    }

    public JsonObject getBody() {
    return message.body;
    }

    public void reply(JsonObject reply, Handler<Message> replyReplyHandler) {
    message.reply(reply, replyReplyHandler);
    }
    }

}

@purplefox
Member

Can you submit a PR? It's very hard to see your changes when they are just pasted in a comment.

@javadevmtl
Author

Ok let me try :)
