Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: require specifying event time when dropping a message using source transformer #90

Merged
merged 5 commits into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"github.com/numaproj/numaflow-go/pkg/sourcetransformer"
)

func FilterEventTime(keys []string, d sourcetransformer.Datum) sourcetransformer.Messages {
func FilterEventTime(_ []string, d sourcetransformer.Datum) sourcetransformer.Messages {
janFirst2022 := time.Date(2022, 1, 1, 0, 0, 0, 0, time.UTC)
janFirst2023 := time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC)
if d.EventTime().Before(janFirst2022) {
Expand Down
15 changes: 6 additions & 9 deletions pkg/sourcetransformer/messaget.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,7 @@ import (
"time"
)

var (
DROP = fmt.Sprintf("%U__DROP__", '\\') // U+005C__DROP__
// Watermark are at millisecond granularity, hence we use epoch(0) - 1 to indicate watermark is not available.
// eventTimeForDrop is used to indicate that the message is dropped hence, excluded from watermark calculation
eventTimeForDrop = time.Unix(0, -int64(time.Millisecond))
)
var DROP = fmt.Sprintf("%U__DROP__", '\\') // U+005C__DROP__

// Message is used to wrap the data return by SourceTransformer functions.
// Compared with Message of other UDFs, source transformer Message contains one more field,
Expand Down Expand Up @@ -60,9 +55,11 @@ func (m Message) Tags() []string {
return m.tags
}

// MessageToDrop creates a Message to be dropped
func MessageToDrop() Message {
return Message{eventTime: eventTimeForDrop, value: []byte{}, tags: []string{DROP}}
// MessageToDrop creates a Message to be dropped with eventTime.
// eventTime is required because, even though a message is dropped, it is still considered as being processed,
// hence the watermark should be updated accordingly using the provided event time.
func MessageToDrop(eventTime time.Time) Message {
return Message{eventTime: eventTime, value: []byte{}, tags: []string{DROP}}
}

type Messages []Message
Expand Down
4 changes: 2 additions & 2 deletions pkg/sourcetransformer/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestService_sourceTransformFn(t *testing.T) {
{
name: "sourceTransform_fn_forward_msg_drop_msg",
handler: SourceTransformFunc(func(ctx context.Context, keys []string, datum Datum) Messages {
return MessagesBuilder().Append(MessageToDrop())
return MessagesBuilder().Append(MessageToDrop(testTime))
}),
args: args{
ctx: context.Background(),
Expand All @@ -93,7 +93,7 @@ func TestService_sourceTransformFn(t *testing.T) {
want: &stpb.SourceTransformResponse{
Results: []*stpb.SourceTransformResponse_Result{
{
EventTime: timestamppb.New(eventTimeForDrop),
EventTime: timestamppb.New(testTime),
Tags: []string{DROP},
Value: []byte{},
},
Expand Down
Loading