diff --git a/pkg/sourcetransformer/examples/event_time_filter/impl/filter.go b/pkg/sourcetransformer/examples/event_time_filter/impl/filter.go index 5e93c28b..3583c9ee 100644 --- a/pkg/sourcetransformer/examples/event_time_filter/impl/filter.go +++ b/pkg/sourcetransformer/examples/event_time_filter/impl/filter.go @@ -6,7 +6,7 @@ import ( "github.com/numaproj/numaflow-go/pkg/sourcetransformer" ) -func FilterEventTime(keys []string, d sourcetransformer.Datum) sourcetransformer.Messages { +func FilterEventTime(_ []string, d sourcetransformer.Datum) sourcetransformer.Messages { janFirst2022 := time.Date(2022, 1, 1, 0, 0, 0, 0, time.UTC) janFirst2023 := time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC) if d.EventTime().Before(janFirst2022) { diff --git a/pkg/sourcetransformer/messaget.go b/pkg/sourcetransformer/messaget.go index e7199a83..282b89ae 100644 --- a/pkg/sourcetransformer/messaget.go +++ b/pkg/sourcetransformer/messaget.go @@ -5,12 +5,7 @@ import ( "time" ) -var ( - DROP = fmt.Sprintf("%U__DROP__", '\\') // U+005C__DROP__ - // Watermark are at millisecond granularity, hence we use epoch(0) - 1 to indicate watermark is not available. - // eventTimeForDrop is used to indicate that the message is dropped hence, excluded from watermark calculation - eventTimeForDrop = time.Unix(0, -int64(time.Millisecond)) -) +var DROP = fmt.Sprintf("%U__DROP__", '\\') // U+005C__DROP__ // Message is used to wrap the data return by SourceTransformer functions. // Compared with Message of other UDFs, source transformer Message contains one more field, @@ -60,9 +55,11 @@ func (m Message) Tags() []string { return m.tags } -// MessageToDrop creates a Message to be dropped -func MessageToDrop() Message { - return Message{eventTime: eventTimeForDrop, value: []byte{}, tags: []string{DROP}} +// MessageToDrop creates a Message to be dropped with eventTime. +// eventTime is required because, even though a message is dropped, it is still considered as being processed, +// hence the watermark should be updated accordingly using the provided event time. +func MessageToDrop(eventTime time.Time) Message { + return Message{eventTime: eventTime, value: []byte{}, tags: []string{DROP}} } type Messages []Message diff --git a/pkg/sourcetransformer/service_test.go b/pkg/sourcetransformer/service_test.go index 5269deb0..d57d937e 100644 --- a/pkg/sourcetransformer/service_test.go +++ b/pkg/sourcetransformer/service_test.go @@ -79,7 +79,7 @@ func TestService_sourceTransformFn(t *testing.T) { { name: "sourceTransform_fn_forward_msg_drop_msg", handler: SourceTransformFunc(func(ctx context.Context, keys []string, datum Datum) Messages { - return MessagesBuilder().Append(MessageToDrop()) + return MessagesBuilder().Append(MessageToDrop(testTime)) }), args: args{ ctx: context.Background(), @@ -93,7 +93,7 @@ func TestService_sourceTransformFn(t *testing.T) { want: &stpb.SourceTransformResponse{ Results: []*stpb.SourceTransformResponse_Result{ { - EventTime: timestamppb.New(eventTimeForDrop), + EventTime: timestamppb.New(testTime), Tags: []string{DROP}, Value: []byte{}, },