From 55843102a4bb47103df9fe363119fc84a3c6ff48 Mon Sep 17 00:00:00 2001
From: reugn <reugpro@gmail.com>
Date: Mon, 29 Jul 2024 20:25:29 +0300
Subject: [PATCH] fix(flow): remove priority queue from sliding window

---
 flow/queue.go               | 69 -----------------------------
 flow/queue_test.go          | 56 ------------------------
 flow/sliding_window.go      | 87 +++++++++++++++++++++----------------
 flow/sliding_window_test.go | 21 +++++----
 4 files changed, 63 insertions(+), 170 deletions(-)
 delete mode 100644 flow/queue.go
 delete mode 100644 flow/queue_test.go

diff --git a/flow/queue.go b/flow/queue.go
deleted file mode 100644
index 9953d49..0000000
--- a/flow/queue.go
+++ /dev/null
@@ -1,69 +0,0 @@
-package flow
-
-import "container/heap"
-
-// Item represents a PriorityQueue item.
-type Item struct {
-	Msg   any
-	epoch int64 // item priority, backed by the epoch time.
-	index int   // maintained by the heap.Interface methods.
-}
-
-// NewItem returns a new Item.
-func NewItem(msg any, epoch int64, index int) *Item {
-	return &Item{msg, epoch, index}
-}
-
-// PriorityQueue implements heap.Interface.
-type PriorityQueue []*Item
-
-// Len returns the PriorityQueue length.
-func (pq PriorityQueue) Len() int { return len(pq) }
-
-// Less is the items less comparator.
-func (pq PriorityQueue) Less(i, j int) bool {
-	return pq[i].epoch < pq[j].epoch
-}
-
-// Swap exchanges indexes of the items.
-func (pq PriorityQueue) Swap(i, j int) {
-	pq[i], pq[j] = pq[j], pq[i]
-	pq[i].index = i
-	pq[j].index = j
-}
-
-// Push implements heap.Interface.Push.
-// Appends an item to the PriorityQueue.
-func (pq *PriorityQueue) Push(x any) {
-	n := len(*pq)
-	item := x.(*Item)
-	item.index = n
-	*pq = append(*pq, item)
-}
-
-// Pop implements heap.Interface.Pop.
-// Removes and returns the Len() - 1 element.
-func (pq *PriorityQueue) Pop() any {
-	old := *pq
-	n := len(old)
-	item := old[n-1]
-	item.index = -1 // for safety
-	*pq = old[0 : n-1]
-	return item
-}
-
-// Head returns the first item of the PriorityQueue without removing it.
-func (pq *PriorityQueue) Head() *Item {
-	return (*pq)[0]
-}
-
-// Update sets item priority and calls heap.Fix to re-establish the heap ordering.
-func (pq *PriorityQueue) Update(item *Item, newEpoch int64) {
-	item.epoch = newEpoch
-	heap.Fix(pq, item.index)
-}
-
-// Slice returns a sliced PriorityQueue using the given bounds.
-func (pq PriorityQueue) Slice(start, end int) PriorityQueue {
-	return pq[start:end]
-}
diff --git a/flow/queue_test.go b/flow/queue_test.go
deleted file mode 100644
index a7806cf..0000000
--- a/flow/queue_test.go
+++ /dev/null
@@ -1,56 +0,0 @@
-package flow_test
-
-import (
-	"container/heap"
-	"testing"
-	"time"
-
-	"github.com/reugn/go-streams/flow"
-	"github.com/reugn/go-streams/internal/assert"
-)
-
-func TestQueueOps(t *testing.T) {
-	queue := &flow.PriorityQueue{}
-	heap.Push(queue, flow.NewItem(1, time.Now().UnixNano(), 0))
-	heap.Push(queue, flow.NewItem(2, 1234, 0))
-	heap.Push(queue, flow.NewItem(3, time.Now().UnixNano(), 0))
-	queue.Swap(0, 1)
-	head := queue.Head()
-	queue.Update(head, time.Now().UnixNano())
-	first := heap.Pop(queue).(*flow.Item)
-
-	assert.Equal(t, 2, first.Msg.(int))
-}
-
-func TestQueueOrder(t *testing.T) {
-	queue := &flow.PriorityQueue{}
-
-	pushItem(queue, 5)
-	pushItem(queue, 4)
-	pushItem(queue, 6)
-	pushItem(queue, 3)
-	pushItem(queue, 7)
-	pushItem(queue, 2)
-	pushItem(queue, 8)
-	pushItem(queue, 1)
-	pushItem(queue, 9)
-
-	assert.Equal(t, 1, popMsg(queue))
-	assert.Equal(t, 2, popMsg(queue))
-	assert.Equal(t, 3, popMsg(queue))
-	assert.Equal(t, 4, popMsg(queue))
-	assert.Equal(t, 5, popMsg(queue))
-	assert.Equal(t, 6, popMsg(queue))
-	assert.Equal(t, 7, popMsg(queue))
-	assert.Equal(t, 8, popMsg(queue))
-	assert.Equal(t, 9, popMsg(queue))
-}
-
-func pushItem(queue *flow.PriorityQueue, timestamp int64) {
-	item := flow.NewItem(timestamp, timestamp, 0)
-	heap.Push(queue, item)
-}
-
-func popMsg(queue *flow.PriorityQueue) int64 {
-	return (heap.Pop(queue).(*flow.Item)).Msg.(int64)
-}
diff --git a/flow/sliding_window.go b/flow/sliding_window.go
index 81653bc..54db10c 100644
--- a/flow/sliding_window.go
+++ b/flow/sliding_window.go
@@ -1,13 +1,19 @@
 package flow
 
 import (
-	"container/heap"
+	"sort"
 	"sync"
 	"time"
 
 	"github.com/reugn/go-streams"
 )
 
+// timedElement stores an incoming element along with its timestamp.
+type timedElement[T any] struct {
+	element   T
+	timestamp int64
+}
+
 // SlidingWindow assigns elements to windows of fixed length configured by the window
 // size parameter.
 // An additional window slide parameter controls how frequently a sliding window is started.
@@ -18,7 +24,7 @@ type SlidingWindow[T any] struct {
 	sync.Mutex
 	windowSize         time.Duration
 	slidingInterval    time.Duration
-	queue              *PriorityQueue
+	queue              []timedElement[T]
 	in                 chan any
 	out                chan any
 	done               chan struct{}
@@ -65,7 +71,7 @@ func NewSlidingWindowWithExtractor[T any](
 	slidingWindow := &SlidingWindow[T]{
 		windowSize:         windowSize,
 		slidingInterval:    slidingInterval,
-		queue:              &PriorityQueue{},
+		queue:              make([]timedElement[T], 0),
 		in:                 make(chan any),
 		out:                make(chan any),
 		done:               make(chan struct{}),
@@ -108,7 +114,7 @@ func (sw *SlidingWindow[T]) transmit(inlet streams.Inlet) {
 }
 
 // timestamp extracts the timestamp from an element if the timestampExtractor is set.
-// It returns system clock time otherwise.
+// Otherwise, the system time is returned.
 func (sw *SlidingWindow[T]) timestamp(element T) int64 {
 	if sw.timestampExtractor == nil {
 		return time.Now().UnixNano()
@@ -116,13 +122,16 @@ func (sw *SlidingWindow[T]) timestamp(element T) int64 {
 	return sw.timestampExtractor(element)
 }
 
-// receive buffers the incoming elements by pushing them into a priority queue,
-// ordered by their creation time.
+// receive buffers the incoming elements by pushing them into the queue,
+// wrapping the original item into a timedElement along with its timestamp.
 func (sw *SlidingWindow[T]) receive() {
 	for element := range sw.in {
-		item := &Item{element, sw.timestamp(element.(T)), 0}
 		sw.Lock()
-		heap.Push(sw.queue, item)
+		timed := timedElement[T]{
+			element:   element.(T),
+			timestamp: sw.timestamp(element.(T)),
+		}
+		sw.queue = append(sw.queue, timed)
 		sw.Unlock()
 	}
 	close(sw.done)
@@ -150,47 +159,51 @@ func (sw *SlidingWindow[T]) emit() {
 	}
 }
 
-// dispatchWindow creates a new window and slides the elements queue.
-// It sends the slice of elements to the output channel if the window is not empty.
+// dispatchWindow is responsible for sending the elements in the current
+// window to the output channel and moving the window to the next position.
 func (sw *SlidingWindow[T]) dispatchWindow(tick time.Time) {
 	sw.Lock()
-	// build a window of elements
-	var windowBottomIndex int
-	now := tick.UnixNano()
-	windowUpperIndex := sw.queue.Len()
-	slideUpperIndex := windowUpperIndex
-	slideUpperTime := now - sw.windowSize.Nanoseconds() + sw.slidingInterval.Nanoseconds()
-	windowBottomTime := now - sw.windowSize.Nanoseconds()
-	for i, item := range *sw.queue {
-		if item.epoch < windowBottomTime {
-			windowBottomIndex = i
-		}
-		if item.epoch > slideUpperTime {
-			slideUpperIndex = i
+
+	// sort elements in the queue by their timestamp
+	sort.Slice(sw.queue, func(i, j int) bool {
+		return sw.queue[i].timestamp < sw.queue[j].timestamp
+	})
+
+	// calculate the next window start time
+	nextWindowStartTime := tick.Add(-sw.windowSize).Add(sw.slidingInterval).UnixNano()
+	// initialize the next window queue
+	var nextWindowQueue []timedElement[T]
+	for i, element := range sw.queue {
+		if element.timestamp > nextWindowStartTime {
+			nextWindowQueue = make([]timedElement[T], len(sw.queue)-i)
+			_ = copy(nextWindowQueue, sw.queue[i:])
 			break
 		}
 	}
-	windowElements := extractWindowElements[T](sw.queue.Slice(windowBottomIndex, windowUpperIndex))
-	if windowUpperIndex > 0 { // the queue is not empty
-		// slice the queue using the lower and upper bounds
-		sliced := sw.queue.Slice(slideUpperIndex, windowUpperIndex)
-		// reset the queue
-		sw.queue = &sliced
-		heap.Init(sw.queue)
-	}
+
+	// extract current window elements
+	windowElements := extractWindowElements(sw.queue, tick.UnixNano())
+	// move the window
+	sw.queue = nextWindowQueue
+
 	sw.Unlock()
 
-	// send elements if the window is not empty
+	// send elements downstream if the current window is not empty
 	if len(windowElements) > 0 {
 		sw.out <- windowElements
 	}
 }
 
-// extractWindowElements generates a window of elements from a given slice of queue items.
-func extractWindowElements[T any](items []*Item) []T {
-	elements := make([]T, len(items))
-	for i, item := range items {
-		elements[i] = item.Msg.(T)
+// extractWindowElements extracts current window elements from the given slice
+// of timedElement. Elements newer than now will not be included.
+func extractWindowElements[T any](timed []timedElement[T], now int64) []T {
+	elements := make([]T, 0, len(timed))
+	for _, timedElement := range timed {
+		if timedElement.timestamp < now {
+			elements = append(elements, timedElement.element)
+		} else {
+			break // we can break since the input is an ordered slice
+		}
 	}
 	return elements
 }
diff --git a/flow/sliding_window_test.go b/flow/sliding_window_test.go
index ba7f276..35c5a64 100644
--- a/flow/sliding_window_test.go
+++ b/flow/sliding_window_test.go
@@ -70,16 +70,21 @@ func TestSlidingWindowWithExtractor(t *testing.T) {
 
 	now := time.Now()
 	inputValues := []element{
+		{"c", now.Add(29 * time.Millisecond).UnixNano()},
 		{"a", now.Add(2 * time.Millisecond).UnixNano()},
 		{"b", now.Add(17 * time.Millisecond).UnixNano()},
-		{"c", now.Add(29 * time.Millisecond).UnixNano()},
 		{"d", now.Add(35 * time.Millisecond).UnixNano()},
-		{"e", now.Add(77 * time.Millisecond).UnixNano()},
 		{"f", now.Add(93 * time.Millisecond).UnixNano()},
+		{"e", now.Add(77 * time.Millisecond).UnixNano()},
 		{"g", now.Add(120 * time.Millisecond).UnixNano()},
 	}
 	go ingestSlice(inputValues, in)
 	go closeDeferred(in, 250*time.Millisecond)
+	// send some out-of-order events
+	go ingestDeferred(element{"h", now.Add(5 * time.Millisecond).UnixNano()},
+		in, 145*time.Millisecond)
+	go ingestDeferred(element{"i", now.Add(3 * time.Millisecond).UnixNano()},
+		in, 145*time.Millisecond)
 
 	go func() {
 		source.
@@ -93,14 +98,14 @@ func TestSlidingWindowWithExtractor(t *testing.T) {
 	}
 	fmt.Println(outputValues)
 
-	assert.Equal(t, 6, len(outputValues)) // [[a b c d e f g] [c d e f g] [e f g] [e f g] [f g] [g]]
+	assert.Equal(t, 6, len(outputValues)) // [[a b c d] [c d] [e] [e f] [f g] [i h g]]
 
-	assert.Equal(t, []string{"a", "b", "c", "d", "e", "f", "g"}, outputValues[0])
-	assert.Equal(t, []string{"c", "d", "e", "f", "g"}, outputValues[1])
-	assert.Equal(t, []string{"e", "f", "g"}, outputValues[2])
-	assert.Equal(t, []string{"e", "f", "g"}, outputValues[3])
+	assert.Equal(t, []string{"a", "b", "c", "d"}, outputValues[0])
+	assert.Equal(t, []string{"c", "d"}, outputValues[1])
+	assert.Equal(t, []string{"e"}, outputValues[2])
+	assert.Equal(t, []string{"e", "f"}, outputValues[3])
 	assert.Equal(t, []string{"f", "g"}, outputValues[4])
-	assert.Equal(t, []string{"g"}, outputValues[5])
+	assert.Equal(t, []string{"i", "h", "g"}, outputValues[5])
 }
 
 func stringValues(elements []element) []string {