Skip to content

Commit

Permalink
chore: introducing partitions for udsource (#1410)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
Signed-off-by: Vigith Maurice <[email protected]>
Signed-off-by: Keran Yang <[email protected]>
Co-authored-by: Vigith Maurice <[email protected]>
Co-authored-by: Keran Yang <[email protected]>
  • Loading branch information
3 people authored Dec 8, 2023
1 parent 263263b commit 08d2689
Show file tree
Hide file tree
Showing 19 changed files with 46 additions and 53 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ require (
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe
github.com/nats-io/nats-server/v2 v2.10.4
github.com/nats-io/nats.go v1.31.0
github.com/numaproj/numaflow-go v0.5.3-0.20231204234402-c6d81fd39932
github.com/numaproj/numaflow-go v0.5.3-0.20231208052731-3d4d17004cc9
github.com/prometheus/client_golang v1.14.0
github.com/prometheus/common v0.37.0
github.com/redis/go-redis/v9 v9.0.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -679,8 +679,8 @@ github.com/nats-io/nkeys v0.4.6/go.mod h1:4DxZNzenSVd1cYQoAa8948QY3QDjrHfcfVADym
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/numaproj/numaflow-go v0.5.3-0.20231204234402-c6d81fd39932 h1:gAURJvmJv7nP8+Y7X+GGHGZ5sg7KatM4dhkWpFCsk+I=
github.com/numaproj/numaflow-go v0.5.3-0.20231204234402-c6d81fd39932/go.mod h1:5zwvvREIbqaCPCKsNE1MVjVToD0kvkCh2Z90Izlhw5U=
github.com/numaproj/numaflow-go v0.5.3-0.20231208052731-3d4d17004cc9 h1:U/9e+ZENDVmWOURe7iXaK2RFJIANYg/HJZGeahErJQI=
github.com/numaproj/numaflow-go v0.5.3-0.20231208052731-3d4d17004cc9/go.mod h1:5zwvvREIbqaCPCKsNE1MVjVToD0kvkCh2Z90Izlhw5U=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
Expand Down
5 changes: 5 additions & 0 deletions pkg/sdkclient/source/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,8 @@ func (c *client) AckFn(ctx context.Context, req *sourcepb.AckRequest) (*sourcepb
func (c *client) PendingFn(ctx context.Context, req *emptypb.Empty) (*sourcepb.PendingResponse, error) {
return c.grpcClt.PendingFn(ctx, req)
}

// PartitionsFn returns the list of partitions from the source.
// It passes ctx and req straight through to c.grpcClt.PartitionsFn and returns
// that call's response and error unchanged.
func (c *client) PartitionsFn(ctx context.Context, req *emptypb.Empty) (*sourcepb.PartitionsResponse, error) {
return c.grpcClt.PartitionsFn(ctx, req)
}
2 changes: 1 addition & 1 deletion pkg/sdkclient/source/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func TestReadFn(t *testing.T) {
expectedResp := &sourcepb.ReadResponse{
Result: &sourcepb.ReadResponse_Result{
Payload: []byte(`test_payload`),
Offset: &sourcepb.Offset{Offset: []byte(`test_offset`), PartitionId: "0"},
Offset: &sourcepb.Offset{Offset: []byte(`test_offset`), PartitionId: 0},
EventTime: timestamppb.New(TestEventTime),
Keys: []string{"test_key"},
},
Expand Down
2 changes: 2 additions & 0 deletions pkg/sdkclient/source/client/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,6 @@ type Client interface {
AckFn(ctx context.Context, req *sourcepb.AckRequest) (*sourcepb.AckResponse, error)
// PendingFn returns the number of pending messages from the udsource.
PendingFn(ctx context.Context, req *emptypb.Empty) (*sourcepb.PendingResponse, error)
// PartitionsFn returns the list of partitions from the udsource.
PartitionsFn(ctx context.Context, req *emptypb.Empty) (*sourcepb.PartitionsResponse, error)
}
4 changes: 2 additions & 2 deletions pkg/sources/forward/data_forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func (df *DataForward) forwardAChunk(ctx context.Context) {
// if the source is idling, we will publish idle watermark to the source and all the toBuffers
// we will not publish idle watermark if the source is not idling.
// publish idle watermark for the source
df.srcIdleHandler.PublishSourceIdleWatermark(df.reader.Partitions())
df.srcIdleHandler.PublishSourceIdleWatermark(df.reader.Partitions(df.ctx))

// if we have published idle watermark to source, we need to publish idle watermark to all the toBuffers
// it might not get the latest watermark because of publishing delay, but we will get in the subsequent
Expand All @@ -230,7 +230,7 @@ func (df *DataForward) forwardAChunk(ctx context.Context) {
for index := range toVertexBuffers {
// publish idle watermark to all the source partitions owned by this reader.
// it is 1:1 for many sources (HTTP, tickgen, etc.), but for others, e.g., Kafka, it is 1:N and the list of partitions in the N could keep changing.
for _, sp := range df.reader.Partitions() {
for _, sp := range df.reader.Partitions(df.ctx) {
if vertexPublishers, ok := df.toVertexWMPublishers[toVertexName]; ok {
var publisher, ok = vertexPublishers[sp]
if !ok {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sources/forward/data_forward_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (s *SimpleSource) Ack(ctx context.Context, offsets []isb.Offset) []error {
return s.buffer.Ack(ctx, offsets)
}

func (s *SimpleSource) Partitions() []int32 {
func (s *SimpleSource) Partitions(context.Context) []int32 {
return []int32{0}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/sources/generator/tickgen.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func (mg *memGen) GetName() string {
}

// Partitions returns the partitions for the source.
func (mg *memGen) Partitions() []int32 {
func (mg *memGen) Partitions(context.Context) []int32 {
return []int32{mg.vertexInstance.Replica}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/sources/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func (h *httpSource) GetName() string {
}

// Partitions returns the partitions for the source.
func (h *httpSource) Partitions() []int32 {
func (h *httpSource) Partitions(context.Context) []int32 {
return []int32{h.vertexReplica}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/sources/kafka/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (r *kafkaSource) GetName() string {
}

// Partitions returns the partitions from which the source is reading.
func (r *kafkaSource) Partitions() []int32 {
func (r *kafkaSource) Partitions(context.Context) []int32 {
for topic, partitions := range r.handler.sess.Claims() {
if topic == r.topic {
return partitions
Expand Down
2 changes: 1 addition & 1 deletion pkg/sources/nats/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func (ns *natsSource) GetName() string {
}

// Partitions returns the partitions associated with this source.
func (ns *natsSource) Partitions() []int32 {
func (ns *natsSource) Partitions(context.Context) []int32 {
return []int32{ns.vertexReplica}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/sources/sourcer/sourcer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type SourceReader interface {
// Partitions returns the partitions of the source. This is used by the forwarder to determine to which partition
// idle watermarks should be published. Partition assignment to a pod is dynamic, so this method may return different
// partitions at different times. (Example - Kafka, every time topic rebalancing happens, the partitions get updated)
Partitions() []int32
Partitions(ctx context.Context) []int32
}

// Sourcer interface provides an isb.BufferReader abstraction over the underlying data source.
Expand Down
12 changes: 11 additions & 1 deletion pkg/sources/udsource/grpc_udsource.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,17 @@ func (u *GRPCBasedUDSource) ApplyAckFn(ctx context.Context, offsets []isb.Offset
return err
}

// ApplyPartitionFn returns the partitions associated with the source.
// It calls u.client.PartitionsFn with an empty request. If that call fails, the
// error is returned as-is together with a nil slice; otherwise the partitions
// carried in the response result are returned with a nil error.
func (u *GRPCBasedUDSource) ApplyPartitionFn(ctx context.Context) ([]int32, error) {
resp, err := u.client.PartitionsFn(ctx, &emptypb.Empty{})
if err != nil {
return nil, err
}

// The generated Get* accessors are used instead of direct field access.
return resp.GetResult().GetPartitions(), nil
}

func constructMessageID(r *sourcepb.ReadResponse_Result) string {
// For a user-defined source, the partition ID plus the offset should be able to uniquely identify a message
return r.Offset.GetPartitionId() + "-" + string(r.Offset.GetOffset())
return fmt.Sprintf("%d-%s", r.GetOffset().GetPartitionId(), string(r.GetOffset().GetOffset()))
}
8 changes: 4 additions & 4 deletions pkg/sources/udsource/grpc_udsource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func Test_gRPCBasedUDSource_ApplyReadWithMockClient(t *testing.T) {
expectedResponse := &sourcepb.ReadResponse{
Result: &sourcepb.ReadResponse_Result{
Payload: []byte(`test_payload`),
Offset: &sourcepb.Offset{Offset: []byte(`test_offset`), PartitionId: "0"},
Offset: &sourcepb.Offset{Offset: []byte(`test_offset`), PartitionId: 0},
EventTime: timestamppb.New(TestEventTime),
Keys: []string{"test_key"},
},
Expand Down Expand Up @@ -238,7 +238,7 @@ func Test_gRPCBasedUDSource_ApplyReadWithMockClient(t *testing.T) {
expectedResponse := &sourcepb.ReadResponse{
Result: &sourcepb.ReadResponse_Result{
Payload: []byte(`test_payload`),
Offset: &sourcepb.Offset{Offset: []byte(`test_offset`), PartitionId: "0"},
Offset: &sourcepb.Offset{Offset: []byte(`test_offset`), PartitionId: 0},
EventTime: timestamppb.New(TestEventTime),
Keys: []string{"test_key"},
},
Expand Down Expand Up @@ -272,7 +272,7 @@ func Test_gRPCBasedUDSource_ApplyAckWithMockClient(t *testing.T) {
req := &sourcepb.AckRequest{
Request: &sourcepb.AckRequest_Request{
Offsets: []*sourcepb.Offset{
{Offset: []byte("test-offset-1"), PartitionId: "0"}, {Offset: []byte("test-offset-2"), PartitionId: "0"},
{Offset: []byte("test-offset-1"), PartitionId: 0}, {Offset: []byte("test-offset-2"), PartitionId: 0},
},
},
}
Expand Down Expand Up @@ -304,7 +304,7 @@ func Test_gRPCBasedUDSource_ApplyAckWithMockClient(t *testing.T) {
req := &sourcepb.AckRequest{
Request: &sourcepb.AckRequest_Request{
Offsets: []*sourcepb.Offset{
{Offset: []byte("test-offset-1"), PartitionId: "0"}, {Offset: []byte("test-offset-2"), PartitionId: "0"},
{Offset: []byte("test-offset-1"), PartitionId: 0}, {Offset: []byte("test-offset-2"), PartitionId: 0},
},
},
}
Expand Down
25 changes: 7 additions & 18 deletions pkg/sources/udsource/user_defined_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ type userDefinedSource struct {
srcPublishWMStores store.WatermarkStore // source watermark publisher stores
lifecycleCtx context.Context // lifecycleCtx context is used to control the lifecycle of this source.
readTimeout time.Duration // read timeout for the source
partitions map[int32]struct{} // partitions of the source
logger *zap.SugaredLogger
}

Expand All @@ -85,7 +84,6 @@ func New(
pipelineName: vertexInstance.Vertex.Spec.PipelineName,
sourceApplier: sourceApplier,
srcPublishWMStores: publishWMStores,
partitions: make(map[int32]struct{}),
logger: logging.NewLogger(), // default logger
}
for _, opt := range opts {
Expand Down Expand Up @@ -122,27 +120,18 @@ func (u *userDefinedSource) GetName() string {
}

// Partitions returns the partitions of the user-defined source
func (u *userDefinedSource) Partitions() []int32 {
partitions := make([]int32, 0, len(u.partitions))
for partition := range u.partitions {
partitions = append(partitions, partition)
func (u *userDefinedSource) Partitions(ctx context.Context) []int32 {
partitions, err := u.sourceApplier.ApplyPartitionFn(ctx)
if err != nil {
u.logger.Errorw("Error getting partitions", zap.Error(err))
return nil
}
return partitions
}

// Read reads the messages from the user-defined source, tracks the partitions from which the messages are read
// tracked partitions are used to determine the partitions to which the watermarks should be published
// Read reads the messages from the user-defined source.
func (u *userDefinedSource) Read(ctx context.Context, count int64) ([]*isb.ReadMessage, error) {
readMessages, err := u.sourceApplier.ApplyReadFn(ctx, count, u.readTimeout)
if err != nil {
return nil, err
}
for _, msg := range readMessages {
if _, ok := u.partitions[msg.ReadOffset.PartitionIdx()]; !ok {
u.partitions[msg.ReadOffset.PartitionIdx()] = struct{}{}
}
}
return readMessages, nil
return u.sourceApplier.ApplyReadFn(ctx, count, u.readTimeout)
}

// Ack acknowledges the messages from the user-defined source
Expand Down
15 changes: 2 additions & 13 deletions pkg/sources/udsource/utils/offset.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,11 @@ limitations under the License.
package utils

import (
"strconv"

sourcepb "github.com/numaproj/numaflow-go/pkg/apis/proto/source/v1"

"github.com/numaproj/numaflow/pkg/isb"
)

// DefaultPartitionIdx Default partition index
var DefaultPartitionIdx = int32(0)

// simpleSourceOffset is a simple implementation of isb.Offset from the source side.
type simpleSourceOffset struct {
offset string
Expand Down Expand Up @@ -62,17 +57,11 @@ func (s *simpleSourceOffset) NoAck() error {

func ConvertToSourceOffset(offset isb.Offset) *sourcepb.Offset {
return &sourcepb.Offset{
PartitionId: strconv.Itoa(int(offset.PartitionIdx())),
PartitionId: offset.PartitionIdx(),
Offset: []byte(offset.String()),
}
}

func ConvertToIsbOffset(offset *sourcepb.Offset) isb.Offset {
if partitionIdx, err := strconv.Atoi(offset.GetPartitionId()); err != nil {
// If the partition ID is not a number, use the default partition index
// TODO - should we require UDSource users to return us a number instead of string as partition ID?
return NewSimpleSourceOffset(string(offset.Offset), DefaultPartitionIdx)
} else {
return NewSimpleSourceOffset(string(offset.Offset), int32(partitionIdx))
}
return NewSimpleSourceOffset(string(offset.Offset), offset.GetPartitionId())
}
2 changes: 1 addition & 1 deletion pkg/sources/udsource/utils/offset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestOffsetConversion(t *testing.T) {
assert.Equal(t, testIsbOffset.PartitionIdx(), convertedBackIsbOffset.PartitionIdx())
assert.Equal(t, testIsbOffset.String(), convertedBackIsbOffset.String())
testSrcOffset := &sourcepb.Offset{
PartitionId: "0",
PartitionId: 0,
Offset: []byte("test"),
}
convertedIsbOffset := ConvertToIsbOffset(testSrcOffset)
Expand Down
4 changes: 2 additions & 2 deletions test/idle-source-e2e/idle_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ func (is *IdleSourceSuite) TestIdleKeyedReducePipeline() {

done := make(chan struct{})
go func() {
// publish messages to source vertex, with event time starting from 60000
startTime := 100
// publish messages to source vertex, with event time starting from 1000
startTime := 1000
for i := 0; true; i++ {
select {
case <-ctx.Done():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ spec:
threshold: 5s # The pipeline will be considered idle if the source has not emitted any data for given threshold value.
incrementBy: 3s # If source is found to be idle then increment the watermark by given incrementBy value.
stepInterval: 2s # If source is idling then publish the watermark only when step interval has passed.
limits:
readBatchSize: 50
vertices:
- name: in
scale:
Expand Down

0 comments on commit 08d2689

Please sign in to comment.