Skip to content

Commit

Permalink
Metadata (#10)
Browse files Browse the repository at this point in the history
Fixes #5
  • Loading branch information
nrwiersma authored Jun 27, 2018
1 parent 44baf82 commit 7b0fbea
Show file tree
Hide file tree
Showing 24 changed files with 274 additions and 231 deletions.
10 changes: 6 additions & 4 deletions cache/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ type Sink struct {
cache cache.Cache
expire time.Duration

batch int
count int
batch int
count int
lastMsg *streams.Message
}

// NewSink creates a new cache insert sink.
Expand All @@ -39,16 +40,17 @@ func (p *Sink) Process(msg *streams.Message) error {
return err
}

p.lastMsg = msg
p.count++
if p.count >= p.batch {
p.count = 0
return p.pipe.Commit()
return p.pipe.Commit(msg)
}

return nil
}

// Close closes the processor.
func (p *Sink) Close() error {
return p.pipe.Commit()
return p.pipe.Commit(p.lastMsg)
}
2 changes: 1 addition & 1 deletion example/branch/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (s *RandIntSource) Consume() (*streams.Message, error) {
return streams.NewMessageWithContext(s.ctx, nil, s.rand.Intn(100)), nil
}

func (s *RandIntSource) Commit() error {
func (s *RandIntSource) Commit(v interface{}) error {
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion example/kafka/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (s *RandIntSource) Consume() (*streams.Message, error) {
return streams.NewMessageWithContext(s.ctx, nil, s.rand.Intn(100)), nil
}

func (s *RandIntSource) Commit() error {
func (s *RandIntSource) Commit(v interface{}) error {
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion example/merge/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (s *RandIntSource) Consume() (*streams.Message, error) {
return streams.NewMessage(nil, s.rand.Intn(100)), nil
}

func (s *RandIntSource) Commit() error {
func (s *RandIntSource) Commit(v interface{}) error {
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion example/simple/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (s *RandomIntSource) Consume() (*streams.Message, error) {
return streams.NewMessage(nil, s.rand.Intn(100)), nil
}

func (s *RandomIntSource) Commit() error {
func (s *RandomIntSource) Commit(v interface{}) error {
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion kafka/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (p *Sink) Process(msg *streams.Message) error {
p.count = 0
p.buf = make([]*sarama.ProducerMessage, 0, p.batch)

return p.pipe.Commit()
return p.pipe.Commit(msg)
}

return nil
Expand Down
24 changes: 8 additions & 16 deletions kafka/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package kafka

import (
"context"
"sync"
"time"

"github.com/Shopify/sarama"
Expand Down Expand Up @@ -70,8 +69,6 @@ type Source struct {
valueDecoder Decoder

state map[string]map[int32]int64
stateLock sync.Mutex

buf chan *sarama.ConsumerMessage
lastErr error
}
Expand Down Expand Up @@ -124,27 +121,24 @@ func (s *Source) Consume() (*streams.Message, error) {
return nil, err
}

s.markState(msg)

return streams.NewMessageWithContext(s.ctx, k, v), nil
m := streams.NewMessageWithContext(s.ctx, k, v).
WithMetadata(s, s.markState(msg))
return m, nil

case <-time.After(100 * time.Millisecond):
return streams.NewMessage(nil, nil), nil
}
}

// Commit marks the consumed records as processed.
func (s *Source) Commit() error {
s.stateLock.Lock()

for topic, partitions := range s.state {
func (s *Source) Commit(v interface{}) error {
state := v.(map[string]map[int32]int64)
for topic, partitions := range state {
for partition, offset := range partitions {
s.consumer.MarkPartitionOffset(topic, partition, offset, "")
}
}

s.stateLock.Unlock()

if err := s.consumer.CommitOffsets(); err != nil {
return errors.Wrap(err, "streams: could not commit kafka offset")
}
Expand All @@ -157,9 +151,7 @@ func (s *Source) Close() error {
return s.consumer.Close()
}

func (s *Source) markState(msg *sarama.ConsumerMessage) {
s.stateLock.Lock()

func (s *Source) markState(msg *sarama.ConsumerMessage) map[string]map[int32]int64 {
partitions, ok := s.state[msg.Topic]
if !ok {
partitions = make(map[int32]int64)
Expand All @@ -168,7 +160,7 @@ func (s *Source) markState(msg *sarama.ConsumerMessage) {

partitions[msg.Partition] = msg.Offset

s.stateLock.Unlock()
return s.state
}

func (s *Source) readErrors() {
Expand Down
26 changes: 20 additions & 6 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,41 @@ import (
)

type Message struct {
metadata map[Source]interface{}

Ctx context.Context
Key interface{}
Value interface{}
}

func (m *Message) Metadata() map[Source]interface{} {
return m.metadata
}

func (m *Message) WithMetadata(s Source, v interface{}) *Message {
m.metadata[s] = v

return m
}

func (m Message) Empty() bool {
return m.Key == nil && m.Value == nil
}

func NewMessage(k, v interface{}) *Message {
return &Message{
Ctx: context.Background(),
Key: k,
Value: v,
metadata: map[Source]interface{}{},
Ctx: context.Background(),
Key: k,
Value: v,
}
}

func NewMessageWithContext(ctx context.Context, k, v interface{}) *Message {
return &Message{
Ctx: ctx,
Key: k,
Value: v,
metadata: map[Source]interface{}{},
Ctx: ctx,
Key: k,
Value: v,
}
}
9 changes: 9 additions & 0 deletions message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,12 @@ func TestNewMessageWithContext(t *testing.T) {
assert.Equal(t, "test", msg.Key)
assert.Equal(t, "test", msg.Value)
}

func TestMessage_Metadata(t *testing.T) {
msg := NewMessage("test", "test")

msg.WithMetadata(nil, "test")

assert.Len(t, msg.Metadata(), 1)
assert.Equal(t, "test", msg.Metadata()[nil])
}
22 changes: 21 additions & 1 deletion mocks/pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,13 @@ type record struct {
index int
}

type ForwardFunc func(message *streams.Message)

type Pipe struct {
t *testing.T

fn ForwardFunc

shouldError bool

expectForward []record
Expand All @@ -30,6 +34,12 @@ func NewPipe(t *testing.T) *Pipe {
}

func (p *Pipe) Forward(msg *streams.Message) error {
if p.fn != nil {
p.fn(msg)

return nil
}

if len(p.expectForward) == 0 {
p.t.Error("streams: mock: Unexpected call to Forward")
return nil
Expand All @@ -50,6 +60,12 @@ func (p *Pipe) Forward(msg *streams.Message) error {
}

func (p *Pipe) ForwardToChild(msg *streams.Message, index int) error {
if p.fn != nil {
p.fn(msg)

return nil
}

if len(p.expectForward) == 0 {
p.t.Error("streams: mock: Unexpected call to ForwardToChild")
return nil
Expand All @@ -69,7 +85,7 @@ func (p *Pipe) ForwardToChild(msg *streams.Message, index int) error {
return nil
}

func (p *Pipe) Commit() error {
func (p *Pipe) Commit(msg *streams.Message) error {
if !p.expectCommit {
p.t.Error("streams: mock: Unexpected call to Commit")
}
Expand All @@ -87,6 +103,10 @@ func (p *Pipe) ShouldError() {
p.shouldError = true
}

func (p *Pipe) OnForward(fn ForwardFunc) {
p.fn = fn
}

func (p *Pipe) ExpectForward(k, v interface{}) {
p.expectForward = append(p.expectForward, record{k, v, -1})
}
Expand Down
Loading

0 comments on commit 7b0fbea

Please sign in to comment.