diff --git a/flow/session_window.go b/flow/session_window.go index 4889f5a..a6994c3 100644 --- a/flow/session_window.go +++ b/flow/session_window.go @@ -11,7 +11,7 @@ import ( // Session windows do not overlap and do not have a fixed start and end time. // T indicates the incoming element type, and the outgoing element type is []T. type SessionWindow[T any] struct { - sync.Mutex + mu sync.Mutex inactivityGap time.Duration in chan any out chan any @@ -74,9 +74,9 @@ func (sw *SessionWindow[T]) transmit(inlet streams.Inlet) { // It resets the inactivity timer on each new element. func (sw *SessionWindow[T]) receive() { for element := range sw.in { - sw.Lock() + sw.mu.Lock() sw.buffer = append(sw.buffer, element.(T)) - sw.Unlock() + sw.mu.Unlock() sw.notifyTimerReset() // signal to reset the inactivity timer } close(sw.done) @@ -121,10 +121,10 @@ func (sw *SessionWindow[T]) emit() { // dispatchWindow creates a window from buffered elements and resets the buffer. // It sends the slice of elements to the output channel if the window is not empty. func (sw *SessionWindow[T]) dispatchWindow() { - sw.Lock() + sw.mu.Lock() windowElements := sw.buffer sw.buffer = nil - sw.Unlock() + sw.mu.Unlock() // send elements if the window is not empty if len(windowElements) > 0 { diff --git a/flow/sliding_window.go b/flow/sliding_window.go index 54db10c..88f10c2 100644 --- a/flow/sliding_window.go +++ b/flow/sliding_window.go @@ -21,7 +21,7 @@ type timedElement[T any] struct { // In this case elements are assigned to multiple windows. // T indicates the incoming element type, and the outgoing element type is []T. type SlidingWindow[T any] struct { - sync.Mutex + mu sync.Mutex windowSize time.Duration slidingInterval time.Duration queue []timedElement[T] @@ -126,13 +126,13 @@ func (sw *SlidingWindow[T]) timestamp(element T) int64 { // wrapping the original item into a timedElement along with its timestamp. func (sw *SlidingWindow[T]) receive() { for element := range sw.in { - sw.Lock() + sw.mu.Lock() timed := timedElement[T]{ element: element.(T), timestamp: sw.timestamp(element.(T)), } sw.queue = append(sw.queue, timed) - sw.Unlock() + sw.mu.Unlock() } close(sw.done) } @@ -162,7 +162,7 @@ func (sw *SlidingWindow[T]) emit() { // dispatchWindow is responsible for sending the elements in the current // window to the output channel and moving the window to the next position. func (sw *SlidingWindow[T]) dispatchWindow(tick time.Time) { - sw.Lock() + sw.mu.Lock() // sort elements in the queue by their timestamp sort.Slice(sw.queue, func(i, j int) bool { @@ -186,7 +186,7 @@ func (sw *SlidingWindow[T]) dispatchWindow(tick time.Time) { // move the window sw.queue = nextWindowQueue - sw.Unlock() + sw.mu.Unlock() // send elements downstream if the current window is not empty if len(windowElements) > 0 { diff --git a/flow/tumbling_window.go b/flow/tumbling_window.go index d36352c..fc828e6 100644 --- a/flow/tumbling_window.go +++ b/flow/tumbling_window.go @@ -11,7 +11,7 @@ import ( // Tumbling windows have a fixed size and do not overlap. // T indicates the incoming element type, and the outgoing element type is []T. type TumblingWindow[T any] struct { - sync.Mutex + mu sync.Mutex windowSize time.Duration in chan any out chan any @@ -71,9 +71,9 @@ func (tw *TumblingWindow[T]) transmit(inlet streams.Inlet) { // receive buffers the incoming elements. func (tw *TumblingWindow[T]) receive() { for element := range tw.in { - tw.Lock() + tw.mu.Lock() tw.buffer = append(tw.buffer, element.(T)) - tw.Unlock() + tw.mu.Unlock() } close(tw.done) } @@ -99,10 +99,10 @@ func (tw *TumblingWindow[T]) emit() { // dispatchWindow creates a window from buffered elements and resets the buffer. // It sends the slice of elements to the output channel if the window is not empty. func (tw *TumblingWindow[T]) dispatchWindow() { - tw.Lock() + tw.mu.Lock() windowElements := tw.buffer tw.buffer = nil - tw.Unlock() + tw.mu.Unlock() // send elements if the window is not empty if len(windowElements) > 0 {