Skip to content

Commit

Permalink
[Job](Fix)Improve Event Publishing with Timeout
Browse files Browse the repository at this point in the history
### Summary:
This PR refines the publishEvent method to improve event publishing reliability by introducing a timeout mechanism and enhanced logging. The changes allow for a more responsive system when attempting to publish events to the disruptor, especially in cases where the ring buffer may not have sufficient capacity at the time.

#### Timeout Implementation:

A 1-second timeout (in nanoseconds) is set, after which the event publishing attempt will stop if the required capacity is not available.
The timeout is tracked using System.nanoTime() for precise elapsed time measurement.
#### Remaining Capacity Check:

The method checks if the remainingCapacity() of the ring buffer is greater than 1 (this can be adjusted based on your capacity requirements). If enough capacity is available, the event is published; otherwise, it waits and retries.
  • Loading branch information
CalvinKirs committed Dec 6, 2024
1 parent 6b4b3cb commit cffeac8
Showing 1 changed file with 30 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventTranslatorVararg;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.dsl.Disruptor;
Expand All @@ -28,6 +27,7 @@
import org.apache.logging.log4j.Logger;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

/**
* Utility class for creating and managing a Disruptor instance.
Expand Down Expand Up @@ -73,20 +73,37 @@ public void start() {
*/
public boolean publishEvent(Object... args) {
try {
RingBuffer<T> ringBuffer = disruptor.getRingBuffer();
// Check if the RingBuffer has enough capacity to reserve 10 slots for tasks
// If there is insufficient capacity (less than 10 slots available)
// log a warning and drop the current task
if (!ringBuffer.hasAvailableCapacity(10)) {
LOG.warn("ring buffer has no available capacity,task will be dropped,"
+ "please check the task queue size.");
return false;
// Set the timeout to 1 second, converted to nanoseconds for precision
long timeoutInNanos = TimeUnit.SECONDS.toNanos(1); // Timeout set to 1 second
long startTime = System.nanoTime(); // Record the start time

// Loop until the timeout is reached
while (System.nanoTime() - startTime < timeoutInNanos) {
// Check if there is enough remaining capacity in the ring buffer
// Adjusting to check if the required capacity is available (instead of hardcoding 1)
if (disruptor.getRingBuffer().remainingCapacity() > 1) {
// Publish the event if there is enough capacity
disruptor.getRingBuffer().publishEvent(eventTranslator, args);
return true;
}

// Wait for a short period before retrying
try {
Thread.sleep(10); // Adjust the wait time as needed (maybe increase if not high-frequency)
} catch (InterruptedException e) {
// Log the exception and return false if interrupted
Thread.currentThread().interrupt(); // Restore interrupt status
LOG.warn("Thread interrupted while waiting to publish event", e);
return false;
}
}
ringBuffer.publishEvent(eventTranslator, args);
return true;

// Timeout reached without publishing the event
LOG.warn("Failed to publish event within the specified timeout (1 second)."
+ "Queue may be full. size is {}", disruptor.getRingBuffer().remainingCapacity());
} catch (Exception e) {
LOG.warn("Failed to publish event", e);
// Handle the exception, e.g., retry or alert
// Catching general exceptions to handle unexpected errors
LOG.warn("Failed to publish event due to an unexpected error", e);
}
return false;
}
Expand Down

0 comments on commit cffeac8

Please sign in to comment.