Refactor sources and sinks (#1)
nrwiersma authored May 7, 2018
1 parent f319a17 commit 1061b11
Showing 5 changed files with 187 additions and 78 deletions.
14 changes: 7 additions & 7 deletions cache/sink.go
@@ -7,7 +7,7 @@ import (
"github.com/msales/streams"
)

type CacheSink struct {
type Sink struct {
ctx streams.Context

cache cache.Cache
@@ -17,22 +17,22 @@ type CacheSink struct {
count int
}

// NewCacheSink creates a new cache insert sink.
func NewCacheSink(cache cache.Cache, expire time.Duration) *CacheSink {
return &CacheSink{
// NewSink creates a new cache insert sink.
func NewSink(cache cache.Cache, expire time.Duration) *Sink {
return &Sink{
cache: cache,
expire: expire,
batch: 1000,
}
}

// WithContext sets the context on the Processor.
func (p *CacheSink) WithContext(ctx streams.Context) {
func (p *Sink) WithContext(ctx streams.Context) {
p.ctx = ctx
}

// Process processes the stream record.
func (p *CacheSink) Process(key, value interface{}) error {
func (p *Sink) Process(key, value interface{}) error {
k := key.(string)

p.cache.Set(k, value, p.expire)
@@ -47,6 +47,6 @@ func (p *CacheSink) Process(key, value interface{}) error {
}

// Close closes the processor.
func (p *CacheSink) Close() error {
func (p *Sink) Close() error {
return p.ctx.Commit()
}
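For reference, a minimal usage sketch of the renamed sink. Only NewSink's signature and the batch default of 1000 come from the hunks above; the import paths and the cache.Cache value are assumptions, since the cache import and the surrounding wiring sit outside the visible diff.

```go
// Package example shows the constructor rename introduced in this commit.
package example

import (
	"time"

	"github.com/msales/cache"               // assumed source of the cache.Cache interface
	csink "github.com/msales/streams/cache" // hypothetical import path for the sink package above
)

// NewExampleSink shows the rename: NewCacheSink(c, expire) is now NewSink(c, expire);
// the package qualifier carries the "cache" context, and the default batch of 1000
// records shown in the hunk above is unchanged.
func NewExampleSink(c cache.Cache) *csink.Sink {
	return csink.NewSink(c, 10*time.Minute)
}
```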
23 changes: 17 additions & 6 deletions example/kafka/main.go
@@ -45,11 +45,16 @@ func main() {
}

func producerTask(s stats.Stats, brokers []string, c *sarama.Config) (streams.Task, error) {
sink, err := kafka.NewKafkaSink("example1", brokers, *c)
config := kafka.NewSinkConfig()
config.Config = *c
config.Brokers = brokers
config.Topic = "example1"
config.ValueEncoder = kafka.StringEncoder{}

sink, err := kafka.NewSink(config)
if err != nil {
return nil, err
}
sink.WithValueEncoder(kafka.StringEncoder{})

builder := streams.NewStreamBuilder()
builder.Source("rand-source", NewRandIntSource()).
@@ -65,16 +65,22 @@ func producerTask(s stats.Stats, brokers []string, c *sarama.Config) (streams.Ta
}

func consumerTask(s stats.Stats, brokers []string, c *sarama.Config) (streams.Task, error) {
src, err := kafka.NewKafkaSource("example1", "example-consumer", brokers, *c)
config := kafka.NewSourceConfig()
config.Config = *c
config.Brokers = brokers
config.Topic = "example1"
config.GroupId = "example-consumer"
config.ValueDecoder = kafka.StringDecoder{}

src, err := kafka.NewSource(config)
if err != nil {
return nil, err
}
src.WithValueDecoder(kafka.StringDecoder{})

builder := streams.NewStreamBuilder()
builder.Source("kafka-source", src).
Map("to-int", IntMapper)
// Print("print")
Map("to-int", IntMapper).
Print("print")

task := streams.NewTask(builder.Build(), streams.WithStats(s))
task.OnError(func(err error) {
104 changes: 80 additions & 24 deletions kafka/sink.go
@@ -5,47 +5,94 @@ import (
"github.com/msales/streams"
)

type KafkaSink struct {
type SinkConfig struct {
sarama.Config

Brokers []string
Topic string

KeyEncoder Encoder
ValueEncoder Encoder

BatchSize int
}

func NewSinkConfig() *SinkConfig {
c := &SinkConfig{
Config: *sarama.NewConfig(),
}

c.KeyEncoder = ByteEncoder{}
c.ValueEncoder = ByteEncoder{}
c.BatchSize = 1000

return c
}

// Validate checks a Config instance. It will return a
// sarama.ConfigurationError if the specified values don't make sense.
func (c *SinkConfig) Validate() error {
if err := c.Config.Validate(); err != nil {
return err
}

switch {
case c.Brokers == nil || len(c.Brokers) == 0:
return sarama.ConfigurationError("Brokers must have at least one broker")
case c.KeyEncoder == nil:
return sarama.ConfigurationError("KeyEncoder must be an instance of Encoder")
case c.ValueEncoder == nil:
return sarama.ConfigurationError("ValueEncoder must be an instance of Encoder")
case c.BatchSize <= 0:
return sarama.ConfigurationError("BatchSize must be at least 1")
}

return nil
}

type Sink struct {
ctx streams.Context

keyEncoder Encoder
valueEncoder Encoder

topic string
producer sarama.SyncProducer

batch int
count int
buf []*sarama.ProducerMessage
}

func NewKafkaSink(topic string, brokers []string, c sarama.Config) (*KafkaSink, error) {
p, err := sarama.NewSyncProducer(brokers, &c)
func NewSink(c *SinkConfig) (*Sink, error) {
if err := c.Validate(); err != nil {
return nil, err
}

p, err := sarama.NewSyncProducer(c.Brokers, &c.Config)
if err != nil {
return nil, err
}

return &KafkaSink{
topic: topic,
keyEncoder: ByteEncoder{},
valueEncoder: ByteEncoder{},
s := &Sink{
topic: c.Topic,
keyEncoder: c.KeyEncoder,
valueEncoder: c.ValueEncoder,
producer: p,
}, nil
batch: c.BatchSize,
buf: make([]*sarama.ProducerMessage, 0, c.BatchSize),
}

return s, nil
}

// WithContext sets the context on the Processor.
func (p *KafkaSink) WithContext(ctx streams.Context) {
func (p *Sink) WithContext(ctx streams.Context) {
p.ctx = ctx
}

// WithKeyEncoder sets the Encoder to encode the key with.
func (p *KafkaSink) WithKeyEncoder(e Encoder) {
p.keyEncoder = e
}

// WithValueEncoder sets the Encoder to encode the value with.
func (p *KafkaSink) WithValueEncoder(e Encoder) {
p.valueEncoder = e
}

// Process processes the stream record.
func (p *KafkaSink) Process(key, value interface{}) error {
func (p *Sink) Process(key, value interface{}) error {
k, err := p.keyEncoder.Encode(key)
if err != nil {
return err
@@ -61,15 +108,24 @@ func (p *KafkaSink) Process(key, value interface{}) error {
Key: sarama.ByteEncoder(k),
Value: sarama.ByteEncoder(v),
}
p.buf = append(p.buf, msg)
p.count++

if _, _, err := p.producer.SendMessage(msg); err != nil {
return err
if p.count >= p.batch {
if err := p.producer.SendMessages(p.buf); err != nil {
return err
}

p.count = 0
p.buf = make([]*sarama.ProducerMessage, 0, p.batch)

return p.ctx.Commit()
}

return p.ctx.Commit()
return nil
}

// Close closes the processor.
func (p *KafkaSink) Close() error {
func (p *Sink) Close() error {
return p.producer.Close()
}
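For comparison with example/kafka/main.go above, a minimal standalone sketch of the new construction path. NewSinkConfig, NewSink, StringEncoder, and the BatchSize default come from this diff; the import path, the broker address, and the Producer.Return.Successes line are assumptions (sarama's SyncProducer rejects a config without that flag).

```go
package main

import (
	"log"

	"github.com/msales/streams/kafka" // hypothetical import path; substitute this repository's module path
)

func main() {
	config := kafka.NewSinkConfig() // defaults: ByteEncoder key/value, BatchSize 1000
	config.Brokers = []string{"localhost:9092"}
	config.Topic = "example1"
	config.ValueEncoder = kafka.StringEncoder{}
	config.BatchSize = 500 // records are buffered and flushed with SendMessages once the batch fills

	// Required by sarama's SyncProducer; sarama.NewConfig() leaves it false.
	config.Producer.Return.Successes = true

	// NewSink validates the config (brokers, encoders, batch size) before dialing the brokers.
	sink, err := kafka.NewSink(config)
	if err != nil {
		log.Fatal(err)
	}
	defer sink.Close()
}
```

Compared to the removed WithKeyEncoder/WithValueEncoder setters, a misconfigured sink now fails fast in Validate rather than surfacing later at send time.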
86 changes: 64 additions & 22 deletions kafka/source.go
@@ -6,7 +6,53 @@ import (
"github.com/pkg/errors"
)

type KafkaSource struct {
type SourceConfig struct {
sarama.Config

Brokers []string
Topic string
GroupId string

KeyDecoder Decoder
ValueDecoder Decoder

BufferSize int
}

func NewSourceConfig() *SourceConfig {
c := &SourceConfig{
Config: *sarama.NewConfig(),
}

c.KeyDecoder = ByteDecoder{}
c.ValueDecoder = ByteDecoder{}
c.BufferSize = 1000

return c
}

// Validate checks a Config instance. It will return a
// sarama.ConfigurationError if the specified values don't make sense.
func (c *SourceConfig) Validate() error {
if err := c.Config.Validate(); err != nil {
return err
}

switch {
case c.Brokers == nil || len(c.Brokers) == 0:
return sarama.ConfigurationError("Brokers must have at least one broker")
case c.KeyDecoder == nil:
return sarama.ConfigurationError("KeyDecoder must be an instance of Decoder")
case c.ValueDecoder == nil:
return sarama.ConfigurationError("ValueDecoder must be an instance of Decoder")
case c.BufferSize <= 0:
return sarama.ConfigurationError("BufferSize must be at least 1")
}

return nil
}

type Source struct {
consumer *cluster.Consumer

keyDecoder Decoder
@@ -17,21 +63,25 @@ type KafkaSource struct {
lastErr error
}

func NewKafkaSource(topic, group string, brokers []string, config sarama.Config) (*KafkaSource, error) {
func NewSource(c *SourceConfig) (*Source, error) {
if err := c.Validate(); err != nil {
return nil, err
}

cc := cluster.NewConfig()
cc.Config = config
cc.Config = c.Config
cc.Consumer.Return.Errors = true

consumer, err := cluster.NewConsumer(brokers, group, []string{topic}, cc)
consumer, err := cluster.NewConsumer(c.Brokers, c.GroupId, []string{c.Topic}, cc)
if err != nil {
return nil, err
}

s := &KafkaSource{
s := &Source{
consumer: consumer,
keyDecoder: ByteDecoder{},
valueDecoder: ByteDecoder{},
buf: make(chan *sarama.ConsumerMessage, 1000),
keyDecoder: c.KeyDecoder,
valueDecoder: c.ValueDecoder,
buf: make(chan *sarama.ConsumerMessage, c.BufferSize),
state: make(map[string]map[int32]int64),
}

@@ -41,15 +91,7 @@ func NewKafkaSource(topic, group string, brokers []string, config sarama.Config)
return s, nil
}

func (s *KafkaSource) WithKeyDecoder(d Decoder) {
s.keyDecoder = d
}

func (s *KafkaSource) WithValueDecoder(d Decoder) {
s.valueDecoder = d
}

func (s *KafkaSource) Consume() (key, value interface{}, err error) {
func (s *Source) Consume() (key, value interface{}, err error) {
if s.lastErr != nil {
return nil, nil, s.lastErr
}
@@ -75,7 +117,7 @@ func (s *KafkaSource) Consume() (key, value interface{}, err error) {
}
}

func (s *KafkaSource) Commit() error {
func (s *Source) Commit() error {
for topic, partitions := range s.state {
for partition, offset := range partitions {
s.consumer.MarkPartitionOffset(topic, partition, offset, "")
@@ -89,11 +131,11 @@ func (s *KafkaSource) Commit() error {
return nil
}

func (s *KafkaSource) Close() error {
func (s *Source) Close() error {
return s.consumer.Close()
}

func (s *KafkaSource) markState(msg *sarama.ConsumerMessage) {
func (s *Source) markState(msg *sarama.ConsumerMessage) {
partitions, ok := s.state[msg.Topic]
if !ok {
partitions = make(map[int32]int64)
@@ -103,13 +145,13 @@ func (s *KafkaSource) markState(msg *sarama.ConsumerMessage) {
partitions[msg.Partition] = msg.Offset
}

func (s *KafkaSource) readErrors() {
func (s *Source) readErrors() {
for err := range s.consumer.Errors() {
s.lastErr = err
}
}

func (s *KafkaSource) readMessages() {
func (s *Source) readMessages() {
for msg := range s.consumer.Messages() {
s.buf <- msg
}
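And the consumer side under the new SourceConfig, again as a hedged sketch: the config fields, StringDecoder, and the Consume/Commit/Close signatures appear in this diff, while the import path and broker address are assumptions.

```go
package main

import (
	"log"

	"github.com/msales/streams/kafka" // hypothetical import path; substitute this repository's module path
)

func main() {
	config := kafka.NewSourceConfig() // defaults: ByteDecoder key/value, BufferSize 1000
	config.Brokers = []string{"localhost:9092"}
	config.Topic = "example1"
	config.GroupId = "example-consumer"
	config.ValueDecoder = kafka.StringDecoder{}
	config.BufferSize = 500 // capacity of the internal channel that feeds Consume

	src, err := kafka.NewSource(config) // validates the config, then starts the cluster consumer
	if err != nil {
		log.Fatal(err)
	}
	defer src.Close()

	// Pull one record and mark its offset; in a real pipeline the streams.Task drives this loop.
	key, value, err := src.Consume()
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("key=%v value=%v", key, value)

	if err := src.Commit(); err != nil {
		log.Fatal(err)
	}
}
```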