diff --git a/src/KafkaFlow.UnitTests/PartitionOffsetsTests.cs b/src/KafkaFlow.UnitTests/PartitionOffsetsTests.cs index 339d24c1b..fdd46c766 100644 --- a/src/KafkaFlow.UnitTests/PartitionOffsetsTests.cs +++ b/src/KafkaFlow.UnitTests/PartitionOffsetsTests.cs @@ -4,6 +4,7 @@ namespace KafkaFlow.UnitTests using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; + using System.Threading; using System.Threading.Tasks; using FluentAssertions; using KafkaFlow.Consumers; @@ -150,29 +151,42 @@ public void ShouldUpdateOffset_WithManyGaps_ShouldUpdate() public void ShouldUpdateOffset_WithManyConcurrentOffsets_ShouldUpdate() { // Arrange - const int count = 1000000; + const int count = 1_000; var target = new PartitionOffsets(); - var offsets = new List(count); - var offsetsCommitted = new ConcurrentQueue(); + var offsetsCommitted = new ConcurrentBag(); + + var waitHandle = new ManualResetEvent(false); for (var i = 0; i < count; i += 2) { target.Enqueue(i); - offsets.Add(i); } // Act - Parallel.ForEach(offsets, offset => + var tasks = new List(); + for (var i = 0; i < count; i += 2) { - if (target.ShouldCommit(offset, out var lastProcessedOffset)) - { - offsetsCommitted.Enqueue(lastProcessedOffset); - } - }); + var offset = i; + tasks.Add( + Task.Run( + () => + { + waitHandle.WaitOne(); + + if (target.ShouldCommit(offset, out var lastProcessedOffset)) + { + offsetsCommitted.Add(lastProcessedOffset); + } + })); + } + + waitHandle.Set(); + + Task.WaitAll(tasks.ToArray()); // Assert - Assert.AreEqual(999998, offsetsCommitted.Last()); + Assert.AreEqual(count - 2, offsetsCommitted.Max()); } } }