Move blocking send operation to executor thread #251

Open
wants to merge 1 commit into
base: master

Arooba-git

Hi! :)

Thank you for this project.

This PR fixes a blocking call in the send method of the KafkaWriteStreamImpl class, which was detected with the help of BlockHound:
[Image: vkc1-blocking]

We re-ran the test cases and also evaluated performance (in terms of sleep time latency of the SendThread) before and after the fix:

Before:
[Image: vks1-latency-after]

After:
[Image: vks1-latency-before]

@Ladicek

Ladicek commented Jul 26, 2023

The producer.send() operation is already called on a worker thread (ctx.executeBlocking()), so this helps nothing AFAICT.

@Arooba-git
Author

Arooba-git commented Jul 26, 2023

@Ladicek Right, I was wondering the same. :)
Then I came across a similar issue reported by another user (executeBlocking still throwing blocking errors): eclipse-vertx/vert.x#2798

The Vert.x developers then clarified its use in their documentation:

WARNING: Blocking code should block for a reasonable amount of time (i.e no more than a few seconds). Long blocking operations or polling operations (i.e a thread that spin in a loop polling events in a blocking fashion) are precluded. When the blocking operation lasts more than the 10 seconds, a message will be printed on the console by the blocked thread checker. Long blocking operations should use a dedicated thread managed by the application...

https://vertx.io/docs/apidocs/io/vertx/rxjava/core/Context.html
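
To make the quoted warning concrete, here is a minimal plain-Java sketch of how a blocked-task check could work. It is an illustration only, not Vert.x's actual blocked thread checker: the class `BlockedTaskChecker`, its methods, and the thread names are hypothetical, and time is passed in explicitly so the behaviour is deterministic. The 10 second limit is taken from the quote above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch; NOT Vert.x's real blocked thread checker.
class BlockedTaskChecker {
  private final long maxBlockedMillis;
  // Start time of the task each thread is currently running, keyed by thread name.
  private final Map<String, Long> startedAt = new ConcurrentHashMap<>();

  BlockedTaskChecker(long maxBlockedMillis) {
    this.maxBlockedMillis = maxBlockedMillis;
  }

  void taskStarted(String threadName, long nowMillis) {
    startedAt.put(threadName, nowMillis);
  }

  void taskFinished(String threadName) {
    startedAt.remove(threadName);
  }

  // Returns one warning per thread whose current task has run longer than the limit.
  List<String> check(long nowMillis) {
    List<String> warnings = new ArrayList<>();
    startedAt.forEach((thread, start) -> {
      long blocked = nowMillis - start;
      if (blocked > maxBlockedMillis) {
        warnings.add("Thread " + thread + " has been blocked for " + blocked + " ms");
      }
    });
    return warnings;
  }

  public static void main(String[] args) {
    BlockedTaskChecker checker = new BlockedTaskChecker(10_000); // limit from the quoted warning
    checker.taskStarted("worker-0", 0);      // running for 12 s at check time
    checker.taskStarted("worker-1", 9_000);  // running for 3 s at check time
    System.out.println(checker.check(12_000));
    // prints [Thread worker-0 has been blocked for 12000 ms]
  }
}
```

A checker of this kind only reports how long a task has occupied a thread; it does not distinguish an expected long send from an accidental one.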

@vietj
Contributor

vietj commented Jul 26, 2023

I believe what matters here is whether blocking for that long is expected or not.

@vietj
Contributor

vietj commented Jul 26, 2023

So I think we should use this specific executor; however, it should not be nested inside executeBlocking but used directly instead.
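
The difference between nesting the executor inside a worker task and using it directly can be sketched in plain Java. This is an illustration under assumptions, not code from the PR: `worker` is a hypothetical stand-in for the shared pool behind executeBlocking, `sender` for the dedicated executor, and the recorded list shows which threads each variant touches.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch; the executors only stand in for Vert.x's worker pool
// and for the dedicated send executor discussed in this thread.
class NestedVersusDirect {

  // Runs one simulated send and returns the names of the threads it passed through.
  static List<String> run(boolean nested) {
    ExecutorService worker = Executors.newSingleThreadExecutor(r -> new Thread(r, "worker"));
    ExecutorService sender = Executors.newSingleThreadExecutor(r -> new Thread(r, "sender"));
    List<String> hops = new CopyOnWriteArrayList<>();
    CountDownLatch done = new CountDownLatch(1);

    // Stand-in for the potentially blocking producer.send() call.
    Runnable blockingSend = () -> {
      hops.add(Thread.currentThread().getName());
      done.countDown();
    };

    if (nested) {
      // A worker thread is occupied only to hand the task over to the sender.
      worker.execute(() -> {
        hops.add(Thread.currentThread().getName());
        sender.execute(blockingSend);
      });
    } else {
      // The caller submits straight to the dedicated executor.
      sender.execute(blockingSend);
    }

    try {
      done.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    worker.shutdown();
    sender.shutdown();
    return new ArrayList<>(hops);
  }

  public static void main(String[] args) {
    System.out.println(run(true));  // prints [worker, sender]
    System.out.println(run(false)); // prints [sender]
  }
}
```

In both variants the blocking call ends up on the dedicated thread; the nested variant just adds an extra hop through the worker pool.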

@Arooba-git
Author

@vietj Should I try updating?

@vietj
Contributor

vietj commented Jul 27, 2023 via email

@Arooba-git
Author

Arooba-git commented Mar 17, 2024

Apologies for the delayed response.

If I replace executeBlocking() with executor.execute(..), the return type becomes void, right? So we would have to update every call site of the send method, no? 🤔

Or should we return a promise, like this:

  @Override
  public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
    ContextInternal ctx = vertx.getOrCreateContext();
    ProducerTracer.StartedSpan startedSpan = this.tracer == null ? null : this.tracer.prepareSendMessage(ctx, record);
    int len = this.len(record.value());
    this.pending += len;

    Promise<RecordMetadata> prom = ctx.promise();
    // executor.execute() returns void, so the result is reported through the promise
    executor.execute(() -> {
      // the try/catch sits inside the task so that an exception thrown by
      // producer.send() on the executor thread still fails the promise
      try {
        this.producer.send(record, (metadata, err) -> {
          // callback from Kafka IO thread
          ctx.runOnContext(v1 -> {
            synchronized (KafkaWriteStreamImpl.this) {

              // if exception happens, no record written
              if (err != null) {
                if (this.exceptionHandler != null) {
                  Handler<Throwable> exceptionHandler = this.exceptionHandler;
                  ctx.runOnContext(v2 -> exceptionHandler.handle(err));
                }
              }

              long lowWaterMark = this.maxSize / 2;
              this.pending -= len;
              if (this.pending < lowWaterMark && this.drainHandler != null) {
                Handler<Void> drainHandler = this.drainHandler;
                this.drainHandler = null;
                ctx.runOnContext(drainHandler);
              }
            }
          });

          if (err != null) {
            if (startedSpan != null) {
              startedSpan.fail(ctx, err);
            }
            prom.fail(err);
          } else {
            if (startedSpan != null) {
              startedSpan.finish(ctx);
            }
            prom.complete(metadata);
          }
        });
      } catch (Throwable e) {
        synchronized (KafkaWriteStreamImpl.this) {
          if (this.exceptionHandler != null) {
            Handler<Throwable> exceptionHandler = this.exceptionHandler;
            ctx.runOnContext(v3 -> exceptionHandler.handle(e));
          }
        }
        if (startedSpan != null) {
          startedSpan.fail(ctx, e);
        }
        prom.fail(e);
      }
    });

    return prom.future();
  }
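
The return-type question can also be looked at in isolation, without Vert.x or Kafka. The following is a minimal sketch under assumptions: `CompletableFuture` stands in for the Vert.x `Promise`/`Future` pair, the class `DedicatedExecutorSend` and its `blockingSend` function are hypothetical, and the function merely imitates a blocking `producer.send()`. It shows that `send` can still return a future even though `executor.execute()` returns void, as long as failures inside the task are routed into that future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Function;

// Hypothetical stand-in for the write stream; not the vertx-kafka-client class.
class DedicatedExecutorSend {

  // Dedicated, application-managed thread for potentially long blocking sends.
  private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
    Thread t = new Thread(r, "send-thread");
    t.setDaemon(true);
    return t;
  });

  // Hypothetical blocking operation standing in for producer.send().
  private final Function<String, String> blockingSend;

  DedicatedExecutorSend(Function<String, String> blockingSend) {
    this.blockingSend = blockingSend;
  }

  // Returns immediately; the future is completed later from the executor thread.
  CompletableFuture<String> send(String record) {
    CompletableFuture<String> result = new CompletableFuture<>();
    try {
      executor.execute(() -> {
        try {
          result.complete(blockingSend.apply(record));
        } catch (Throwable e) {
          // Failure on the executor thread: report it through the future.
          result.completeExceptionally(e);
        }
      });
    } catch (RejectedExecutionException e) {
      // The executor refused the task, for example after shutdown.
      result.completeExceptionally(e);
    }
    return result;
  }

  void close() {
    executor.shutdown();
  }

  public static void main(String[] args) {
    DedicatedExecutorSend ok = new DedicatedExecutorSend(r -> "ack:" + r);
    System.out.println(ok.send("hello").join()); // prints ack:hello
    ok.close();

    DedicatedExecutorSend failing = new DedicatedExecutorSend(r -> {
      throw new IllegalStateException("broker unavailable");
    });
    System.out.println(failing.send("hello").handle((v, e) -> e.getMessage()).join());
    failing.close();
  }
}
```

With this shape the callers of `send` keep receiving a future, so existing call sites would not need to change.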

@vietj
Contributor

vietj commented Mar 18, 2024

It is not clear what is happening, since we are already in an executeBlocking block. What is the actual issue we are trying to fix? Does it mean we should avoid executeBlocking with Kafka?
