From 3fa6be3bc86171bd703fe9b16510f8b3867466ca Mon Sep 17 00:00:00 2001 From: reugn Date: Sun, 18 Aug 2024 11:20:36 +0300 Subject: [PATCH] add missing coverage --- flow/batch_test.go | 1 + flow/reduce_test.go | 20 ++++++++++++-------- flow/session_window_test.go | 1 + flow/sliding_window_test.go | 1 + flow/throttler.go | 8 ++++---- flow/throttler_test.go | 21 +++++---------------- 6 files changed, 24 insertions(+), 28 deletions(-) diff --git a/flow/batch_test.go b/flow/batch_test.go index 29cebf9..627b592 100644 --- a/flow/batch_test.go +++ b/flow/batch_test.go @@ -65,6 +65,7 @@ func TestBatch_Ptr(t *testing.T) { go func() { source. Via(batch). + Via(flow.NewPassThrough()). // Via coverage To(sink) }() diff --git a/flow/reduce_test.go b/flow/reduce_test.go index 57b9e8a..2e7dab6 100644 --- a/flow/reduce_test.go +++ b/flow/reduce_test.go @@ -43,19 +43,23 @@ func TestReduce(t *testing.T) { if tt.ptr { ingestSlice(ptrSlice(input), in) - } else { - ingestSlice(input, in) - } - close(in) + close(in) - source. - Via(tt.reduceFlow). - To(sink) + source. + Via(tt.reduceFlow). + To(sink) - if tt.ptr { output := readSlicePtr[int](out) assert.Equal(t, ptrSlice(expected), output) } else { + ingestSlice(input, in) + close(in) + + source. + Via(tt.reduceFlow). + Via(flow.NewPassThrough()). // Via coverage + To(sink) + output := readSlice[int](out) assert.Equal(t, expected, output) } diff --git a/flow/session_window_test.go b/flow/session_window_test.go index 9783706..37dfb9f 100644 --- a/flow/session_window_test.go +++ b/flow/session_window_test.go @@ -91,6 +91,7 @@ func TestSessionWindow_Ptr(t *testing.T) { go func() { source. Via(sessionWindow). + Via(flow.NewPassThrough()). // Via coverage To(sink) }() diff --git a/flow/sliding_window_test.go b/flow/sliding_window_test.go index 9ea416d..ce34596 100644 --- a/flow/sliding_window_test.go +++ b/flow/sliding_window_test.go @@ -146,6 +146,7 @@ func TestSlidingWindow_WithExtractorPtr(t *testing.T) { go func() { source. Via(slidingWindow). + Via(flow.NewPassThrough()). // Via coverage To(sink) }() diff --git a/flow/throttler.go b/flow/throttler.go index 29770f1..44b63cf 100644 --- a/flow/throttler.go +++ b/flow/throttler.go @@ -59,7 +59,7 @@ func NewThrottler(elements int, period time.Duration, bufferSize int, mode Throt done: make(chan struct{}), } go throttler.resetQuotaCounterLoop() - go throttler.bufferize() + go throttler.buffer() return throttler } @@ -95,9 +95,9 @@ func (th *Throttler) notifyQuotaReset() { } } -// bufferize starts buffering incoming elements. -// If an unsupported ThrottleMode was specified, bufferize will panic. -func (th *Throttler) bufferize() { +// buffer starts buffering incoming elements. +// If an unsupported ThrottleMode was specified, buffer will panic. +func (th *Throttler) buffer() { switch th.mode { case Discard: for element := range th.in { diff --git a/flow/throttler_test.go b/flow/throttler_test.go index 9714a64..bb5d7f1 100644 --- a/flow/throttler_test.go +++ b/flow/throttler_test.go @@ -1,7 +1,6 @@ package flow_test import ( - "fmt" "testing" "time" @@ -10,7 +9,7 @@ import ( "github.com/reugn/go-streams/internal/assert" ) -func TestThrottlerWithBackpressure(t *testing.T) { +func TestThrottler_WithBackpressure(t *testing.T) { in := make(chan any) out := make(chan any) @@ -30,27 +29,22 @@ func TestThrottlerWithBackpressure(t *testing.T) { outputValues := readValues(interval/2, out) assert.Equal(t, []any{"a", "b"}, outputValues) - fmt.Println(outputValues) outputValues = readValues(interval, out) - fmt.Println(outputValues) assert.Equal(t, []any{"c", "d"}, outputValues) outputValues = readValues(interval, out) - fmt.Println(outputValues) assert.Equal(t, []any{"e", "f"}, outputValues) outputValues = readValues(interval, out) - fmt.Println(outputValues) assert.Equal(t, []any{"g"}, outputValues) outputValues = readValues(interval, out) - fmt.Println(outputValues) var empty []any assert.Equal(t, empty, outputValues) } -func TestThrottlerWithDiscard(t *testing.T) { +func TestThrottler_WithDiscard(t *testing.T) { in := make(chan any, 7) out := make(chan any, 7) @@ -69,18 +63,15 @@ func TestThrottlerWithDiscard(t *testing.T) { outputValues := readValues(interval/2, out) assert.Equal(t, []any{"a", "b"}, outputValues) - fmt.Println(outputValues) - outputValues = readValues(interval, out) - fmt.Println(outputValues) + _ = readValues(interval, out) outputValues = readValues(interval, out) - fmt.Println(outputValues) var empty []any assert.Equal(t, empty, outputValues) } -func TestThrottlerNonPositiveElements(t *testing.T) { +func TestThrottler_NonPositiveElements(t *testing.T) { assert.Panics(t, func() { flow.NewThrottler(0, time.Second, 1, flow.Discard) }) @@ -89,7 +80,7 @@ func TestThrottlerNonPositiveElements(t *testing.T) { }) } -func TestThrottlerNonPositiveBufferSize(t *testing.T) { +func TestThrottler_NonPositiveBufferSize(t *testing.T) { assert.Panics(t, func() { flow.NewThrottler(1, time.Second, 0, flow.Backpressure) }) @@ -102,7 +93,6 @@ func writeValues(in chan any) { inputValues := []string{"a", "b", "c", "d", "e", "f", "g"} ingestSlice(inputValues, in) close(in) - fmt.Println("Closed input channel") } func readValues(timeout time.Duration, out <-chan any) []any { @@ -114,7 +104,6 @@ func readValues(timeout time.Duration, out <-chan any) []any { if e != nil { outputValues = append(outputValues, e) } else { - fmt.Println("Got nil in output") timer.Stop() return outputValues }