diff --git a/flow/queue.go b/flow/queue.go deleted file mode 100644 index 9953d49..0000000 --- a/flow/queue.go +++ /dev/null @@ -1,69 +0,0 @@ -package flow - -import "container/heap" - -// Item represents a PriorityQueue item. -type Item struct { - Msg any - epoch int64 // item priority, backed by the epoch time. - index int // maintained by the heap.Interface methods. -} - -// NewItem returns a new Item. -func NewItem(msg any, epoch int64, index int) *Item { - return &Item{msg, epoch, index} -} - -// PriorityQueue implements heap.Interface. -type PriorityQueue []*Item - -// Len returns the PriorityQueue length. -func (pq PriorityQueue) Len() int { return len(pq) } - -// Less is the items less comparator. -func (pq PriorityQueue) Less(i, j int) bool { - return pq[i].epoch < pq[j].epoch -} - -// Swap exchanges indexes of the items. -func (pq PriorityQueue) Swap(i, j int) { - pq[i], pq[j] = pq[j], pq[i] - pq[i].index = i - pq[j].index = j -} - -// Push implements heap.Interface.Push. -// Appends an item to the PriorityQueue. -func (pq *PriorityQueue) Push(x any) { - n := len(*pq) - item := x.(*Item) - item.index = n - *pq = append(*pq, item) -} - -// Pop implements heap.Interface.Pop. -// Removes and returns the Len() - 1 element. -func (pq *PriorityQueue) Pop() any { - old := *pq - n := len(old) - item := old[n-1] - item.index = -1 // for safety - *pq = old[0 : n-1] - return item -} - -// Head returns the first item of the PriorityQueue without removing it. -func (pq *PriorityQueue) Head() *Item { - return (*pq)[0] -} - -// Update sets item priority and calls heap.Fix to re-establish the heap ordering. -func (pq *PriorityQueue) Update(item *Item, newEpoch int64) { - item.epoch = newEpoch - heap.Fix(pq, item.index) -} - -// Slice returns a sliced PriorityQueue using the given bounds. -func (pq PriorityQueue) Slice(start, end int) PriorityQueue { - return pq[start:end] -} diff --git a/flow/queue_test.go b/flow/queue_test.go deleted file mode 100644 index a7806cf..0000000 --- a/flow/queue_test.go +++ /dev/null @@ -1,56 +0,0 @@ -package flow_test - -import ( - "container/heap" - "testing" - "time" - - "github.com/reugn/go-streams/flow" - "github.com/reugn/go-streams/internal/assert" -) - -func TestQueueOps(t *testing.T) { - queue := &flow.PriorityQueue{} - heap.Push(queue, flow.NewItem(1, time.Now().UnixNano(), 0)) - heap.Push(queue, flow.NewItem(2, 1234, 0)) - heap.Push(queue, flow.NewItem(3, time.Now().UnixNano(), 0)) - queue.Swap(0, 1) - head := queue.Head() - queue.Update(head, time.Now().UnixNano()) - first := heap.Pop(queue).(*flow.Item) - - assert.Equal(t, 2, first.Msg.(int)) -} - -func TestQueueOrder(t *testing.T) { - queue := &flow.PriorityQueue{} - - pushItem(queue, 5) - pushItem(queue, 4) - pushItem(queue, 6) - pushItem(queue, 3) - pushItem(queue, 7) - pushItem(queue, 2) - pushItem(queue, 8) - pushItem(queue, 1) - pushItem(queue, 9) - - assert.Equal(t, 1, popMsg(queue)) - assert.Equal(t, 2, popMsg(queue)) - assert.Equal(t, 3, popMsg(queue)) - assert.Equal(t, 4, popMsg(queue)) - assert.Equal(t, 5, popMsg(queue)) - assert.Equal(t, 6, popMsg(queue)) - assert.Equal(t, 7, popMsg(queue)) - assert.Equal(t, 8, popMsg(queue)) - assert.Equal(t, 9, popMsg(queue)) -} - -func pushItem(queue *flow.PriorityQueue, timestamp int64) { - item := flow.NewItem(timestamp, timestamp, 0) - heap.Push(queue, item) -} - -func popMsg(queue *flow.PriorityQueue) int64 { - return (heap.Pop(queue).(*flow.Item)).Msg.(int64) -} diff --git a/flow/sliding_window.go b/flow/sliding_window.go index 81653bc..54db10c 100644 --- a/flow/sliding_window.go +++ b/flow/sliding_window.go @@ -1,13 +1,19 @@ package flow import ( - "container/heap" + "sort" "sync" "time" "github.com/reugn/go-streams" ) +// timedElement stores an incoming element along with its timestamp. +type timedElement[T any] struct { + element T + timestamp int64 +} + // SlidingWindow assigns elements to windows of fixed length configured by the window // size parameter. // An additional window slide parameter controls how frequently a sliding window is started. @@ -18,7 +24,7 @@ type SlidingWindow[T any] struct { sync.Mutex windowSize time.Duration slidingInterval time.Duration - queue *PriorityQueue + queue []timedElement[T] in chan any out chan any done chan struct{} @@ -65,7 +71,7 @@ func NewSlidingWindowWithExtractor[T any]( slidingWindow := &SlidingWindow[T]{ windowSize: windowSize, slidingInterval: slidingInterval, - queue: &PriorityQueue{}, + queue: make([]timedElement[T], 0), in: make(chan any), out: make(chan any), done: make(chan struct{}), @@ -108,7 +114,7 @@ func (sw *SlidingWindow[T]) transmit(inlet streams.Inlet) { } // timestamp extracts the timestamp from an element if the timestampExtractor is set. -// It returns system clock time otherwise. +// Otherwise, the system time is returned. func (sw *SlidingWindow[T]) timestamp(element T) int64 { if sw.timestampExtractor == nil { return time.Now().UnixNano() @@ -116,13 +122,16 @@ func (sw *SlidingWindow[T]) timestamp(element T) int64 { return sw.timestampExtractor(element) } -// receive buffers the incoming elements by pushing them into a priority queue, -// ordered by their creation time. +// receive buffers the incoming elements by pushing them into the queue, +// wrapping the original item into a timedElement along with its timestamp. func (sw *SlidingWindow[T]) receive() { for element := range sw.in { - item := &Item{element, sw.timestamp(element.(T)), 0} sw.Lock() - heap.Push(sw.queue, item) + timed := timedElement[T]{ + element: element.(T), + timestamp: sw.timestamp(element.(T)), + } + sw.queue = append(sw.queue, timed) sw.Unlock() } close(sw.done) @@ -150,47 +159,51 @@ func (sw *SlidingWindow[T]) emit() { } } -// dispatchWindow creates a new window and slides the elements queue. -// It sends the slice of elements to the output channel if the window is not empty. +// dispatchWindow is responsible for sending the elements in the current +// window to the output channel and moving the window to the next position. func (sw *SlidingWindow[T]) dispatchWindow(tick time.Time) { sw.Lock() - // build a window of elements - var windowBottomIndex int - now := tick.UnixNano() - windowUpperIndex := sw.queue.Len() - slideUpperIndex := windowUpperIndex - slideUpperTime := now - sw.windowSize.Nanoseconds() + sw.slidingInterval.Nanoseconds() - windowBottomTime := now - sw.windowSize.Nanoseconds() - for i, item := range *sw.queue { - if item.epoch < windowBottomTime { - windowBottomIndex = i - } - if item.epoch > slideUpperTime { - slideUpperIndex = i + + // sort elements in the queue by their timestamp + sort.Slice(sw.queue, func(i, j int) bool { + return sw.queue[i].timestamp < sw.queue[j].timestamp + }) + + // calculate the next window start time + nextWindowStartTime := tick.Add(-sw.windowSize).Add(sw.slidingInterval).UnixNano() + // initialize the next window queue + var nextWindowQueue []timedElement[T] + for i, element := range sw.queue { + if element.timestamp > nextWindowStartTime { + nextWindowQueue = make([]timedElement[T], len(sw.queue)-i) + _ = copy(nextWindowQueue, sw.queue[i:]) break } } - windowElements := extractWindowElements[T](sw.queue.Slice(windowBottomIndex, windowUpperIndex)) - if windowUpperIndex > 0 { // the queue is not empty - // slice the queue using the lower and upper bounds - sliced := sw.queue.Slice(slideUpperIndex, windowUpperIndex) - // reset the queue - sw.queue = &sliced - heap.Init(sw.queue) - } + + // extract current window elements + windowElements := extractWindowElements(sw.queue, tick.UnixNano()) + // move the window + sw.queue = nextWindowQueue + sw.Unlock() - // send elements if the window is not empty + // send elements downstream if the current window is not empty if len(windowElements) > 0 { sw.out <- windowElements } } -// extractWindowElements generates a window of elements from a given slice of queue items. -func extractWindowElements[T any](items []*Item) []T { - elements := make([]T, len(items)) - for i, item := range items { - elements[i] = item.Msg.(T) +// extractWindowElements extracts current window elements from the given slice +// of timedElement. Elements newer than now will not be included. +func extractWindowElements[T any](timed []timedElement[T], now int64) []T { + elements := make([]T, 0, len(timed)) + for _, timedElement := range timed { + if timedElement.timestamp < now { + elements = append(elements, timedElement.element) + } else { + break // we can break since the input is an ordered slice + } } return elements } diff --git a/flow/sliding_window_test.go b/flow/sliding_window_test.go index ba7f276..35c5a64 100644 --- a/flow/sliding_window_test.go +++ b/flow/sliding_window_test.go @@ -70,16 +70,21 @@ func TestSlidingWindowWithExtractor(t *testing.T) { now := time.Now() inputValues := []element{ + {"c", now.Add(29 * time.Millisecond).UnixNano()}, {"a", now.Add(2 * time.Millisecond).UnixNano()}, {"b", now.Add(17 * time.Millisecond).UnixNano()}, - {"c", now.Add(29 * time.Millisecond).UnixNano()}, {"d", now.Add(35 * time.Millisecond).UnixNano()}, - {"e", now.Add(77 * time.Millisecond).UnixNano()}, {"f", now.Add(93 * time.Millisecond).UnixNano()}, + {"e", now.Add(77 * time.Millisecond).UnixNano()}, {"g", now.Add(120 * time.Millisecond).UnixNano()}, } go ingestSlice(inputValues, in) go closeDeferred(in, 250*time.Millisecond) + // send some out-of-order events + go ingestDeferred(element{"h", now.Add(5 * time.Millisecond).UnixNano()}, + in, 145*time.Millisecond) + go ingestDeferred(element{"i", now.Add(3 * time.Millisecond).UnixNano()}, + in, 145*time.Millisecond) go func() { source. @@ -93,14 +98,14 @@ func TestSlidingWindowWithExtractor(t *testing.T) { } fmt.Println(outputValues) - assert.Equal(t, 6, len(outputValues)) // [[a b c d e f g] [c d e f g] [e f g] [e f g] [f g] [g]] + assert.Equal(t, 6, len(outputValues)) // [[a b c d] [c d] [e] [e f] [f g] [i h g]] - assert.Equal(t, []string{"a", "b", "c", "d", "e", "f", "g"}, outputValues[0]) - assert.Equal(t, []string{"c", "d", "e", "f", "g"}, outputValues[1]) - assert.Equal(t, []string{"e", "f", "g"}, outputValues[2]) - assert.Equal(t, []string{"e", "f", "g"}, outputValues[3]) + assert.Equal(t, []string{"a", "b", "c", "d"}, outputValues[0]) + assert.Equal(t, []string{"c", "d"}, outputValues[1]) + assert.Equal(t, []string{"e"}, outputValues[2]) + assert.Equal(t, []string{"e", "f"}, outputValues[3]) assert.Equal(t, []string{"f", "g"}, outputValues[4]) - assert.Equal(t, []string{"g"}, outputValues[5]) + assert.Equal(t, []string{"i", "h", "g"}, outputValues[5]) } func stringValues(elements []element) []string {