-
Notifications
You must be signed in to change notification settings - Fork 227
Publication Options
The core pattern to publish an event to the ring buffer is:
- Claim a sequence number.
- Configure the event associated with the sequence number.
- Publish the sequence number.
This pattern can be simply applied using the Next
/ Publish
methods:
// (1) Claim the next sequence
var sequence = _ringBuffer.Next();
try
{
// (2) Get and configure the event for the sequence
var data = _ringBuffer[sequence];
data.Id = id;
data.Value = value;
}
finally
{
// (3) Publish the event
_ringBuffer.Publish(sequence);
}
Alternatively, you can use the PublishEvent
method that returns a struct that automatically invokes Publish
on disposal:
using (var scope = _ringBuffer.PublishEvent())
{
var data = scope.Event();
data.Id = id;
data.Value = value;
// The event is published at the end of the scope
}
The ring buffer is a bounded queue. When you publish an event to the Disruptor, you need to decide how your program should behave when the ring buffer is full. The default behavior is to block the producer. The call to Next
is blocking and thus applies backpressure, i.e.: the producer(s) are slowed down whilst the consumer(s) continue to process messages, and no events are lost.
When the ring buffer is full, the call to
Next
will block the producer by spin-waiting using an AggressiveSpinWait. This publication strategy is quite suitable for latency sensitive applications, but it can generate high CPU usage. Consider using your own publication strategy if this CPU usage is an issue in your application.
If backpressure is not the right option for your use case, or if the default spin-waiting is not appropriate for your application, you can implement your own flow control strategy using TryNext
. TryNext
does not block and returns false when the ring buffer is full.
There are multiple valid flow control strategies that can be implemented using TryNext
:
- Dropping data.
- Saving data in the producer state and retrying later.
- Waiting with a timeout.
- Waiting using your own spin-waiter.
- Dumping the data, for example in a log file.
- Throwing exceptions.
- etc.
Here is an example usage of TryNext
:
/// <summary>
/// Claim an available sequence in the ring buffer.
/// When the ring buffer is full, wait for the specified duration and throw on timeout.
/// Use a SpinWait to reduce CPU usage when waiting.
/// </summary>
public static long Next(this RingBuffer ringBuffer, TimeSpan timeout)
{
var stopwatch = Stopwatch.StartNew();
var spinWait = new SpinWait();
do
{
if (ringBuffer.TryNext(out var sequence))
return sequence;
spinWait.SpinOnce();
}
while (stopwatch.Elapsed < timeout);
throw new TimeoutException();
}
You can also use the equivalent TryPublishEvent
method instead of TryNext
.
Claiming and publishing sequence numbers requires coordination between producers and consumers. If you need to publish multiple events, consider using batches. Batches allow you to publish a block of contiguous events and improve performance by reducing method calls and coordination.
Batch publication is available using Next(int)
, PublishEvents(int)
or TryNext(int, out long)
:
public void PublishEventBatchUsingScope(ReadOnlySpan<int> ids, double value)
{
using (var scope = _ringBuffer.PublishEvents(ids.Length))
{
for (var index = 0; index < ids.Length; index++)
{
var sampleEvent = scope.Event(index);
sampleEvent.Id = ids[index];
sampleEvent.Value = value;
}
}
}
public void PublishEventBatchUsingRawApi(ReadOnlySpan<int> ids, double value)
{
var hi = _ringBuffer.Next(ids.Length);
var lo = hi - (ids.Length - 1);
try
{
for (var index = 0; index < ids.Length; index++)
{
var sampleEvent = _ringBuffer[lo + index];
sampleEvent.Id = ids[index];
sampleEvent.Value = value;
}
}
finally
{
_ringBuffer.Publish(lo, hi);
}
}