Skip to content

Commit

Permalink
We should update from origin as old tests are way too inconsistent. F…
Browse files Browse the repository at this point in the history
…ixed another one by hand before merging
  • Loading branch information
csinig committed Jul 10, 2023
1 parent e485741 commit 0b07763
Showing 1 changed file with 25 additions and 11 deletions.
36 changes: 25 additions & 11 deletions src/KafkaFlow.UnitTests/PartitionOffsetsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ namespace KafkaFlow.UnitTests
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using KafkaFlow.Consumers;
Expand Down Expand Up @@ -150,29 +151,42 @@ public void ShouldUpdateOffset_WithManyGaps_ShouldUpdate()
public void ShouldUpdateOffset_WithManyConcurrentOffsets_ShouldUpdate()
{
// Arrange
const int count = 1000000;
const int count = 1_000;

var target = new PartitionOffsets();
var offsets = new List<int>(count);
var offsetsCommitted = new ConcurrentQueue<long>();
var offsetsCommitted = new ConcurrentBag<long>();

var waitHandle = new ManualResetEvent(false);

for (var i = 0; i < count; i += 2)
{
target.Enqueue(i);
offsets.Add(i);
}

// Act
Parallel.ForEach(offsets, offset =>
var tasks = new List<Task>();
for (var i = 0; i < count; i += 2)
{
if (target.ShouldCommit(offset, out var lastProcessedOffset))
{
offsetsCommitted.Enqueue(lastProcessedOffset);
}
});
var offset = i;
tasks.Add(
Task.Run(
() =>
{
waitHandle.WaitOne();
if (target.ShouldCommit(offset, out var lastProcessedOffset))
{
offsetsCommitted.Add(lastProcessedOffset);
}
}));
}

waitHandle.Set();

Task.WaitAll(tasks.ToArray());

// Assert
Assert.AreEqual(999998, offsetsCommitted.Last());
Assert.AreEqual(count - 2, offsetsCommitted.Max());
}
}
}

0 comments on commit 0b07763

Please sign in to comment.