Skip to content

Commit

Permalink
Strategy (#24)
Browse files Browse the repository at this point in the history
**BREAKING**:
* Add Metadata Strategy to deal with duplication in some cases
  • Loading branch information
nrwiersma authored Dec 13, 2018
1 parent f6f79e6 commit 1386053
Show file tree
Hide file tree
Showing 13 changed files with 124 additions and 170 deletions.
6 changes: 1 addition & 5 deletions fakes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,7 @@ type fakeMetadata struct{}
func (m *fakeMetadata) WithOrigin(streams.MetadataOrigin) {
}

func (m *fakeMetadata) Update(streams.Metadata) streams.Metadata {
return m
}

func (m *fakeMetadata) Merge(v streams.Metadata) streams.Metadata {
func (m *fakeMetadata) Merge(v streams.Metadata, s streams.MetadataStrategy) streams.Metadata {
return m
}

Expand Down
30 changes: 9 additions & 21 deletions kafka/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ func (m Metadata) WithOrigin(o streams.MetadataOrigin) {
}
}

// Update updates the given metadata with the contained metadata.
func (m Metadata) Update(v streams.Metadata) streams.Metadata {
// Merge merges the contained metadata into the given the metadata.
func (m Metadata) Merge(v streams.Metadata, s streams.MetadataStrategy) streams.Metadata {
if v == nil {
return m
}
Expand All @@ -94,29 +94,17 @@ func (m Metadata) Update(v streams.Metadata) streams.Metadata {
continue
}

if newPos.Offset > oldPos.Offset {
metadata[i] = newPos
if newPos.Origin > oldPos.Origin {
continue
}
}

return metadata
}

// Merge merges the contained metadata into the given the metadata.
func (m Metadata) Merge(v streams.Metadata) streams.Metadata {
if v == nil {
return m
}

metadata := v.(Metadata)
for _, newPos := range m {
i, oldPos := metadata.find(newPos.Topic, newPos.Partition)
if oldPos == nil {
metadata = append(metadata, newPos)
continue
if newPos.Origin < oldPos.Origin {
metadata[i] = newPos
}

if (newPos.Origin == oldPos.Origin && newPos.Offset < oldPos.Offset) || (newPos.Origin < oldPos.Origin) {
// At this point origins are equal
if (s == streams.Lossless && newPos.Offset < oldPos.Offset) ||
(s == streams.Dupless && newPos.Offset > oldPos.Offset) {
metadata[i] = newPos
}
}
Expand Down
71 changes: 25 additions & 46 deletions kafka/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,76 +92,55 @@ func TestMetadata_WithOrigin(t *testing.T) {
assert.Equal(t, kafka.Metadata{{Topic: "foo", Partition: 0, Offset: 3, Origin: streams.CommitterOrigin}}, meta)
}

func TestMetadata_Update(t *testing.T) {
func TestMetadata_Merge(t *testing.T) {
meta1 := kafka.Metadata{{Topic: "foo", Partition: 0, Offset: 3}}
meta2 := kafka.Metadata{{Topic: "foo", Partition: 0, Offset: 2}}
meta2 := kafka.Metadata{{Topic: "foo", Partition: 1, Offset: 2}}

res := meta1.Update(meta2)
res := meta2.Merge(meta1, streams.Lossless)

assert.IsType(t, kafka.Metadata{}, res)
meta1 = res.(kafka.Metadata)
assert.Equal(t, kafka.Metadata{{Topic: "foo", Partition: 0, Offset: 3}}, meta1)
}

func TestMetadata_UpdatePicksHighest(t *testing.T) {
meta1 := kafka.Metadata{{Topic: "foo", Partition: 0, Offset: 10}}
meta2 := kafka.Metadata{{Topic: "foo", Partition: 0, Offset: 3}}

res := meta1.Update(meta2)

assert.IsType(t, kafka.Metadata{}, res)
merged := res.(kafka.Metadata)
assert.Equal(t, kafka.Metadata{{Topic: "foo", Partition: 0, Offset: 10}}, merged)
}

func TestMetadata_UpdateNilMerged(t *testing.T) {
meta := kafka.Metadata{{Topic: "foo", Partition: 0, Offset: 3}}

res := meta.Update(nil)

assert.IsType(t, kafka.Metadata{}, res)
merged := res.(kafka.Metadata)
assert.Equal(t, kafka.Metadata{{Topic: "foo", Partition: 0, Offset: 3}}, merged)
assert.Equal(t, kafka.Metadata{{Topic: "foo", Partition: 0, Offset: 3}, {Topic: "foo", Partition: 1, Offset: 2}}, meta1)
}

func TestMetadata_UpdateNewPartition(t *testing.T) {
meta1 := kafka.Metadata{{Topic: "foo", Partition: 0, Offset: 10}}
meta2 := kafka.Metadata{{Topic: "foo", Partition: 1, Offset: 3}}
func TestMetadata_MergeTakesCommitterOverProcessorWhenCommitter(t *testing.T) {
meta1 := kafka.Metadata{{Topic: "foo", Partition: 0, Offset: 2, Origin: streams.ProcessorOrigin}}
meta2 := kafka.Metadata{{Topic: "foo", Partition: 0, Offset: 3, Origin: streams.CommitterOrigin}}

res := meta2.Update(meta1)
res := meta2.Merge(meta1, streams.Lossless)

assert.IsType(t, kafka.Metadata{}, res)
merged := res.(kafka.Metadata)
assert.Equal(t, kafka.Metadata{{Topic: "foo", Partition: 0, Offset: 10}, {Topic: "foo", Partition: 1, Offset: 3}}, merged)
resMeta := res.(kafka.Metadata)
assert.Equal(t, kafka.Metadata{{Topic: "foo", Partition: 0, Offset: 3, Origin: streams.CommitterOrigin}}, resMeta)
}

func TestMetadata_Merge(t *testing.T) {
meta1 := kafka.Metadata{{Topic: "foo", Partition: 0, Offset: 3}}
meta2 := kafka.Metadata{{Topic: "foo", Partition: 1, Offset: 2}}
func TestMetadata_MergeTakesCommitterOverProcessorWhenProcessor(t *testing.T) {
meta1 := kafka.Metadata{{Topic: "foo", Partition: 0, Offset: 2, Origin: streams.ProcessorOrigin}}
meta2 := kafka.Metadata{{Topic: "foo", Partition: 0, Offset: 3, Origin: streams.CommitterOrigin}}

res := meta2.Merge(meta1)
res := meta1.Merge(meta2, streams.Lossless)

assert.IsType(t, kafka.Metadata{}, res)
meta1 = res.(kafka.Metadata)
assert.Equal(t, kafka.Metadata{{Topic: "foo", Partition: 0, Offset: 3}, {Topic: "foo", Partition: 1, Offset: 2}}, meta1)
resMeta := res.(kafka.Metadata)
assert.Equal(t, kafka.Metadata{{Topic: "foo", Partition: 0, Offset: 3, Origin: streams.CommitterOrigin}}, resMeta)
}

func TestMetadata_MergeTakesCommitterOverProcessor(t *testing.T) {
meta1 := kafka.Metadata{{Topic: "foo", Partition: 0, Offset: 2, Origin: streams.ProcessorOrigin}}
meta2 := kafka.Metadata{{Topic: "foo", Partition: 0, Offset: 3, Origin: streams.CommitterOrigin}}
func TestMetadata_MergeTakesHighestWhenTheSameOriginAndDupless(t *testing.T) {
meta1 := kafka.Metadata{{Topic: "foo", Partition: 0, Offset: 3, Origin: streams.ProcessorOrigin}}
meta2 := kafka.Metadata{{Topic: "foo", Partition: 0, Offset: 2, Origin: streams.ProcessorOrigin}}

res := meta2.Merge(meta1)
res := meta2.Merge(meta1, streams.Dupless)

assert.IsType(t, kafka.Metadata{}, res)
meta1 = res.(kafka.Metadata)
assert.Equal(t, kafka.Metadata{{Topic: "foo", Partition: 0, Offset: 3, Origin: streams.CommitterOrigin}}, meta1)
assert.Equal(t, kafka.Metadata{{Topic: "foo", Partition: 0, Offset: 3, Origin: streams.ProcessorOrigin}}, meta1)
}

func TestMetadata_MergeTakesLowestWhenTheSameOrigin(t *testing.T) {
func TestMetadata_MergeTakesLowestWhenTheSameOriginAndLossLess(t *testing.T) {
meta1 := kafka.Metadata{{Topic: "foo", Partition: 0, Offset: 3, Origin: streams.ProcessorOrigin}}
meta2 := kafka.Metadata{{Topic: "foo", Partition: 0, Offset: 2, Origin: streams.ProcessorOrigin}}

res := meta2.Merge(meta1)
res := meta2.Merge(meta1, streams.Lossless)

assert.IsType(t, kafka.Metadata{}, res)
meta1 = res.(kafka.Metadata)
Expand All @@ -171,7 +150,7 @@ func TestMetadata_MergeTakesLowestWhenTheSameOrigin(t *testing.T) {
func TestMetadata_MergeNilMerged(t *testing.T) {
b := kafka.Metadata{{Topic: "foo", Partition: 0, Offset: 3}}

res := b.Merge(nil)
res := b.Merge(nil, streams.Lossless)

assert.IsType(t, kafka.Metadata{}, res)
a := res.(kafka.Metadata)
Expand All @@ -186,7 +165,7 @@ func BenchmarkMetadata_Merge(b *testing.B) {
b.ResetTimer()

for i := 0; i < b.N; i++ {
meta = other.Update(meta)
meta = other.Merge(meta, streams.Lossless)
}
}

Expand Down
17 changes: 12 additions & 5 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,27 @@ import (
// MetadataOrigin represents the metadata origin type.
type MetadataOrigin uint8

// MetadataOrigin types
// MetadataOrigin types.
const (
CommitterOrigin MetadataOrigin = iota
ProcessorOrigin
)

// MetadataStrategy represents the metadata merge strategy.
type MetadataStrategy uint8

// MetadataStrategy types.
const (
Lossless MetadataStrategy = iota
Dupless
)

// Metadata represents metadata that can be merged.
type Metadata interface {
// WithOrigin sets the MetadataOrigin on the metadata.
WithOrigin(MetadataOrigin)
// Update updates the given metadata with the contained metadata.
Update(Metadata) Metadata
// Merge merges the contained metadata into the given the metadata.
Merge(Metadata) Metadata
// Merge merges the contained metadata into the given the metadata with the given strategy.
Merge(Metadata, MetadataStrategy) Metadata
}

// Message represents data the flows through the stream.
Expand Down
19 changes: 3 additions & 16 deletions metastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,27 +28,14 @@ type Metaitem struct {
// Metaitems represents a slice of Metaitem pointers.
type Metaitems []*Metaitem

// Update combines contents of two Metaitems objects, updating the Metadata where necessary.
func (m Metaitems) Update(items Metaitems) Metaitems {
return m.join(items, func(old, new Metadata) Metadata {
return old.Update(new)
})
}

// Merge combines contents of two Metaitems objects, merging the Metadata where necessary.
func (m Metaitems) Merge(items Metaitems) Metaitems {
return m.join(items, func(old, new Metadata) Metadata {
return old.Merge(new)
})
}

func (m Metaitems) join(items Metaitems, fn func(old, new Metadata) Metadata) Metaitems {
func (m Metaitems) Merge(items Metaitems, strategy MetadataStrategy) Metaitems {
OUTER:
for _, newItem := range items {
for _, oldItem := range m {
if oldItem.Source == newItem.Source {
if oldItem.Metadata != nil {
oldItem.Metadata = fn(oldItem.Metadata, newItem.Metadata)
oldItem.Metadata = oldItem.Metadata.Merge(newItem.Metadata, strategy)
}
continue OUTER
}
Expand Down Expand Up @@ -133,7 +120,7 @@ func (s *metastore) Mark(p Processor, src Source, meta Metadata) error {

for _, item := range items {
if item.Source == src {
item.Metadata = meta.Update(item.Metadata)
item.Metadata = meta.Merge(item.Metadata, Dupless)
return nil
}
}
Expand Down
53 changes: 12 additions & 41 deletions metastore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/stretchr/testify/mock"
)

func TestMetaitems_Update(t *testing.T) {
func TestMetaitems_MergeDupless(t *testing.T) {
src1 := new(MockSource)
src2 := new(MockSource)
src3 := new(MockSource)
Expand All @@ -27,28 +27,19 @@ func TestMetaitems_Update(t *testing.T) {
items := streams.Metaitems{item1, item2}
other := streams.Metaitems{item3, item4}

meta2.On("Update", mock.Anything).Return(meta5)
meta2.On("Merge", mock.Anything, mock.Anything).Return(meta5)

joined := items.Update(other)
joined := items.Merge(other, streams.Dupless)

assert.Len(t, joined, 3)
assert.True(t, joined[0] == item1)
assert.True(t, joined[1] == item2)
assert.True(t, joined[2] == item3)
assert.True(t, meta5 == item2.Metadata)
meta2.AssertCalled(t, "Update", meta4)
meta2.AssertCalled(t, "Merge", meta4, streams.Dupless)
}

func TestMetaitems_UpdateHandlesNilSourceAndMetadata(t *testing.T) {
items := streams.Metaitems{{Source: nil, Metadata: nil}}
other := streams.Metaitems{{Source: nil, Metadata: nil}}

joined := items.Update(other)

assert.Len(t, joined, 1)
}

func TestMetaitems_Merge(t *testing.T) {
func TestMetaitems_MergeLossless(t *testing.T) {
src1 := new(MockSource)
src2 := new(MockSource)
src3 := new(MockSource)
Expand All @@ -67,47 +58,27 @@ func TestMetaitems_Merge(t *testing.T) {
items := streams.Metaitems{item1, item2}
other := streams.Metaitems{item3, item4}

meta2.On("Merge", mock.Anything).Return(meta5)
meta2.On("Merge", mock.Anything, mock.Anything).Return(meta5)

merged := items.Merge(other)
merged := items.Merge(other, streams.Lossless)

assert.Len(t, merged, 3)
assert.True(t, merged[0] == item1)
assert.True(t, merged[1] == item2)
assert.True(t, merged[2] == item3)
assert.True(t, meta5 == item2.Metadata)
meta2.AssertCalled(t, "Merge", meta4)
meta2.AssertCalled(t, "Merge", meta4, streams.Lossless)
}

func TestMetaitems_MergeHandlesNilSourceAndMetadata(t *testing.T) {
items := streams.Metaitems{{Source: nil, Metadata: nil}}
other := streams.Metaitems{{Source: nil, Metadata: nil}}

joined := items.Merge(other)
joined := items.Merge(other, streams.Lossless)

assert.Len(t, joined, 1)
}

func BenchmarkMetaitems_Update(b *testing.B) {
src1 := &fakeSource{}
src2 := &fakeSource{}
src3 := &fakeSource{}

meta1 := &fakeMetadata{}
meta2 := &fakeMetadata{}
meta3 := &fakeMetadata{}
meta4 := &fakeMetadata{}

items1 := streams.Metaitems{{Source: src1, Metadata: meta1}, {Source: src2, Metadata: meta2}}
items2 := streams.Metaitems{{Source: src3, Metadata: meta3}, {Source: src2, Metadata: meta4}}

b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
items1.Update(items2)
}
}

func BenchmarkMetaitems_Merge(b *testing.B) {
src1 := &fakeSource{}
src2 := &fakeSource{}
Expand All @@ -124,7 +95,7 @@ func BenchmarkMetaitems_Merge(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
items1.Merge(items2)
items1.Merge(items2, streams.Lossless)
}
}

Expand Down Expand Up @@ -165,7 +136,7 @@ func TestMetastore_MarkProcessor(t *testing.T) {
newMeta := new(MockMetadata)
meta2 := new(MockMetadata)
meta2.On("WithOrigin", streams.ProcessorOrigin)
meta2.On("Update", meta1).Return(newMeta)
meta2.On("Merge", meta1, streams.Dupless).Return(newMeta)
s := streams.NewMetastore()

err := s.Mark(p, src, meta1)
Expand All @@ -186,7 +157,7 @@ func TestMetastore_MarkCommitter(t *testing.T) {
newMeta := new(MockMetadata)
meta2 := new(MockMetadata)
meta2.On("WithOrigin", streams.CommitterOrigin)
meta2.On("Update", meta1).Return(newMeta)
meta2.On("Merge", meta1, streams.Dupless).Return(newMeta)
s := streams.NewMetastore()

err := s.Mark(p, src, meta1)
Expand Down
9 changes: 2 additions & 7 deletions mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,8 @@ func (m *MockMetadata) WithOrigin(o streams.MetadataOrigin) {
m.Called(o)
}

func (m *MockMetadata) Update(v streams.Metadata) streams.Metadata {
args := m.Called(v)
return args.Get(0).(streams.Metadata)
}

func (m *MockMetadata) Merge(v streams.Metadata) streams.Metadata {
args := m.Called(v)
func (m *MockMetadata) Merge(v streams.Metadata, s streams.MetadataStrategy) streams.Metadata {
args := m.Called(v, s)
return args.Get(0).(streams.Metadata)
}

Expand Down
2 changes: 1 addition & 1 deletion nanotime.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package streams

import (
_ "unsafe"
_ "unsafe" // Required in order to import nanotime
)

//go:linkname nanotime runtime.nanotime
Expand Down
Loading

0 comments on commit 1386053

Please sign in to comment.