diff --git a/cache/sink.go b/cache/sink.go
index c73b8cf..7090341 100644
--- a/cache/sink.go
+++ b/cache/sink.go
@@ -7,7 +7,7 @@ import (
 	"github.com/msales/streams"
 )
 
-type CacheSink struct {
+type Sink struct {
 	ctx streams.Context
 
 	cache  cache.Cache
@@ -17,9 +17,9 @@ type CacheSink struct {
 	count int
 }
 
-// NewCacheSink creates a new cache insert sink.
-func NewCacheSink(cache cache.Cache, expire time.Duration) *CacheSink {
-	return &CacheSink{
+// NewSink creates a new cache insert sink.
+func NewSink(cache cache.Cache, expire time.Duration) *Sink {
+	return &Sink{
 		cache:  cache,
 		expire: expire,
 		batch:  1000,
@@ -27,12 +27,12 @@ func NewCacheSink(cache cache.Cache, expire time.Duration) *CacheSink {
 }
 
 // WithContext sets the context on the Processor.
-func (p *CacheSink) WithContext(ctx streams.Context) {
+func (p *Sink) WithContext(ctx streams.Context) {
 	p.ctx = ctx
 }
 
 // Process processes the stream record.
-func (p *CacheSink) Process(key, value interface{}) error {
+func (p *Sink) Process(key, value interface{}) error {
 	k := key.(string)
 	p.cache.Set(k, value, p.expire)
 
@@ -47,6 +47,6 @@ func (p *CacheSink) Process(key, value interface{}) error {
 }
 
 // Close closes the processor.
-func (p *CacheSink) Close() error {
+func (p *Sink) Close() error {
 	return p.ctx.Commit()
 }
diff --git a/example/kafka/main.go b/example/kafka/main.go
index d561871..47dc81d 100644
--- a/example/kafka/main.go
+++ b/example/kafka/main.go
@@ -45,11 +45,16 @@ func main() {
 }
 
 func producerTask(s stats.Stats, brokers []string, c *sarama.Config) (streams.Task, error) {
-	sink, err := kafka.NewKafkaSink("example1", brokers, *c)
+	config := kafka.NewSinkConfig()
+	config.Config = *c
+	config.Brokers = brokers
+	config.Topic = "example1"
+	config.ValueEncoder = kafka.StringEncoder{}
+
+	sink, err := kafka.NewSink(config)
 	if err != nil {
 		return nil, err
 	}
-	sink.WithValueEncoder(kafka.StringEncoder{})
 
 	builder := streams.NewStreamBuilder()
 	builder.Source("rand-source", NewRandIntSource()).
@@ -65,16 +70,22 @@ func producerTask(s stats.Stats, brokers []string, c *sarama.Config) (streams.Ta
 }
 
 func consumerTask(s stats.Stats, brokers []string, c *sarama.Config) (streams.Task, error) {
-	src, err := kafka.NewKafkaSource("example1", "example-consumer", brokers, *c)
+	config := kafka.NewSourceConfig()
+	config.Config = *c
+	config.Brokers = brokers
+	config.Topic = "example1"
+	config.GroupId = "example-consumer"
+	config.ValueDecoder = kafka.StringDecoder{}
+
+	src, err := kafka.NewSource(config)
 	if err != nil {
 		return nil, err
 	}
-	src.WithValueDecoder(kafka.StringDecoder{})
 
 	builder := streams.NewStreamBuilder()
 	builder.Source("kafka-source", src).
-		Map("to-int", IntMapper)
-	// Print("print")
+		Map("to-int", IntMapper).
+		Print("print")
 
 	task := streams.NewTask(builder.Build(), streams.WithStats(s))
 	task.OnError(func(err error) {
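Note on the renames above: the sinks keep their behaviour and only lose the stuttering package prefix (cache.CacheSink becomes cache.Sink, kafka.NewKafkaSink becomes kafka.NewSink). A minimal sketch of wiring the renamed cache sink, assuming the repo's import layout (github.com/msales/streams/cache) and an msales cache package; both paths and the helper name are assumptions, not part of this diff:

package main

import (
	"time"

	"github.com/msales/cache" // assumed import path for the cache.Cache interface
	streamscache "github.com/msales/streams/cache"
)

// buildCacheSink is a hypothetical helper: it wires any cache.Cache into the
// renamed Sink. Records expire after an hour; the sink batches its commits
// internally (the batch size is hard-coded to 1000 in NewSink).
func buildCacheSink(c cache.Cache) *streamscache.Sink {
	return streamscache.NewSink(c, time.Hour)
}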
diff --git a/kafka/sink.go b/kafka/sink.go
index 3e76434..6ac4b1c 100644
--- a/kafka/sink.go
+++ b/kafka/sink.go
@@ -5,7 +5,52 @@ import (
 	"github.com/msales/streams"
 )
 
-type KafkaSink struct {
+type SinkConfig struct {
+	sarama.Config
+
+	Brokers []string
+	Topic   string
+
+	KeyEncoder   Encoder
+	ValueEncoder Encoder
+
+	BatchSize int
+}
+
+func NewSinkConfig() *SinkConfig {
+	c := &SinkConfig{
+		Config: *sarama.NewConfig(),
+	}
+
+	c.KeyEncoder = ByteEncoder{}
+	c.ValueEncoder = ByteEncoder{}
+	c.BatchSize = 1000
+
+	return c
+}
+
+// Validate checks a SinkConfig instance. It will return a
+// sarama.ConfigurationError if the specified values don't make sense.
+func (c *SinkConfig) Validate() error {
+	if err := c.Config.Validate(); err != nil {
+		return err
+	}
+
+	switch {
+	case c.Brokers == nil || len(c.Brokers) == 0:
+		return sarama.ConfigurationError("Brokers must have at least one broker")
+	case c.KeyEncoder == nil:
+		return sarama.ConfigurationError("KeyEncoder must be an instance of Encoder")
+	case c.ValueEncoder == nil:
+		return sarama.ConfigurationError("ValueEncoder must be an instance of Encoder")
+	case c.BatchSize <= 0:
+		return sarama.ConfigurationError("BatchSize must be at least 1")
+	}
+
+	return nil
+}
+
+type Sink struct {
 	ctx streams.Context
 
 	keyEncoder   Encoder
@@ -13,39 +58,41 @@ type KafkaSink struct {
 	topic string
 
 	producer sarama.SyncProducer
+
+	batch int
+	count int
+	buf   []*sarama.ProducerMessage
 }
 
-func NewKafkaSink(topic string, brokers []string, c sarama.Config) (*KafkaSink, error) {
-	p, err := sarama.NewSyncProducer(brokers, &c)
+func NewSink(c *SinkConfig) (*Sink, error) {
+	if err := c.Validate(); err != nil {
+		return nil, err
+	}
+
+	p, err := sarama.NewSyncProducer(c.Brokers, &c.Config)
 	if err != nil {
 		return nil, err
 	}
 
-	return &KafkaSink{
-		topic:        topic,
-		keyEncoder:   ByteEncoder{},
-		valueEncoder: ByteEncoder{},
+	s := &Sink{
+		topic:        c.Topic,
+		keyEncoder:   c.KeyEncoder,
+		valueEncoder: c.ValueEncoder,
 		producer:     p,
-	}, nil
+		batch:        c.BatchSize,
+		buf:          make([]*sarama.ProducerMessage, 0, c.BatchSize),
+	}
+
+	return s, nil
 }
 
 // WithContext sets the context on the Processor.
-func (p *KafkaSink) WithContext(ctx streams.Context) {
+func (p *Sink) WithContext(ctx streams.Context) {
 	p.ctx = ctx
 }
 
-// WithKeyEncoder sets the Encoder to encode the key with.
-func (p *KafkaSink) WithKeyEncoder(e Encoder) {
-	p.keyEncoder = e
-}
-
-// WithValueEncoder sets the Encoder to encode the value with.
-func (p *KafkaSink) WithValueEncoder(e Encoder) {
-	p.valueEncoder = e
-}
-
 // Process processes the stream record.
-func (p *KafkaSink) Process(key, value interface{}) error {
+func (p *Sink) Process(key, value interface{}) error {
 	k, err := p.keyEncoder.Encode(key)
 	if err != nil {
 		return err
@@ -61,15 +108,24 @@
 		Key:   sarama.ByteEncoder(k),
 		Value: sarama.ByteEncoder(v),
 	}
+	p.buf = append(p.buf, msg)
+	p.count++
 
-	if _, _, err := p.producer.SendMessage(msg); err != nil {
-		return err
+	if p.count >= p.batch {
+		if err := p.producer.SendMessages(p.buf); err != nil {
+			return err
+		}
+
+		p.count = 0
+		p.buf = make([]*sarama.ProducerMessage, 0, p.batch)
+
+		return p.ctx.Commit()
	}
 
-	return p.ctx.Commit()
+	return nil
 }
 
 // Close closes the processor.
-func (p *KafkaSink) Close() error {
+func (p *Sink) Close() error {
 	return p.producer.Close()
 }
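The sink now buffers records and flushes them with a single SendMessages call once BatchSize messages have accumulated, and it only commits upstream offsets on a successful flush. A sketch of the new construction path (the import path is assumed from the repo layout, and the broker address is a placeholder):

package main

import (
	"log"

	"github.com/msales/streams/kafka"
)

func main() {
	config := kafka.NewSinkConfig()
	config.Brokers = []string{"localhost:9092"} // placeholder broker
	config.Topic = "example1"
	config.BatchSize = 500 // flush (and commit) every 500 messages

	// NewSink validates the config up front, so an empty broker list or a
	// non-positive BatchSize fails fast with a sarama.ConfigurationError.
	sink, err := kafka.NewSink(config)
	if err != nil {
		log.Fatal(err)
	}
	defer sink.Close()
}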
diff --git a/kafka/source.go b/kafka/source.go
index a7caf21..1734814 100644
--- a/kafka/source.go
+++ b/kafka/source.go
@@ -6,7 +6,53 @@ import (
 	"github.com/pkg/errors"
 )
 
-type KafkaSource struct {
+type SourceConfig struct {
+	sarama.Config
+
+	Brokers []string
+	Topic   string
+	GroupId string
+
+	KeyDecoder   Decoder
+	ValueDecoder Decoder
+
+	BufferSize int
+}
+
+func NewSourceConfig() *SourceConfig {
+	c := &SourceConfig{
+		Config: *sarama.NewConfig(),
+	}
+
+	c.KeyDecoder = ByteDecoder{}
+	c.ValueDecoder = ByteDecoder{}
+	c.BufferSize = 1000
+
+	return c
+}
+
+// Validate checks a SourceConfig instance. It will return a
+// sarama.ConfigurationError if the specified values don't make sense.
+func (c *SourceConfig) Validate() error {
+	if err := c.Config.Validate(); err != nil {
+		return err
+	}
+
+	switch {
+	case c.Brokers == nil || len(c.Brokers) == 0:
+		return sarama.ConfigurationError("Brokers must have at least one broker")
+	case c.KeyDecoder == nil:
+		return sarama.ConfigurationError("KeyDecoder must be an instance of Decoder")
+	case c.ValueDecoder == nil:
+		return sarama.ConfigurationError("ValueDecoder must be an instance of Decoder")
+	case c.BufferSize <= 0:
+		return sarama.ConfigurationError("BufferSize must be at least 1")
+	}
+
+	return nil
+}
+
+type Source struct {
 	consumer *cluster.Consumer
 
 	keyDecoder   Decoder
@@ -17,21 +63,25 @@ type KafkaSource struct {
 	lastErr error
 }
 
-func NewKafkaSource(topic, group string, brokers []string, config sarama.Config) (*KafkaSource, error) {
+func NewSource(c *SourceConfig) (*Source, error) {
+	if err := c.Validate(); err != nil {
+		return nil, err
+	}
+
 	cc := cluster.NewConfig()
-	cc.Config = config
+	cc.Config = c.Config
 	cc.Consumer.Return.Errors = true
 
-	consumer, err := cluster.NewConsumer(brokers, group, []string{topic}, cc)
+	consumer, err := cluster.NewConsumer(c.Brokers, c.GroupId, []string{c.Topic}, cc)
 	if err != nil {
 		return nil, err
 	}
 
-	s := &KafkaSource{
+	s := &Source{
 		consumer:     consumer,
-		keyDecoder:   ByteDecoder{},
-		valueDecoder: ByteDecoder{},
-		buf:          make(chan *sarama.ConsumerMessage, 1000),
+		keyDecoder:   c.KeyDecoder,
+		valueDecoder: c.ValueDecoder,
+		buf:          make(chan *sarama.ConsumerMessage, c.BufferSize),
 		state:        make(map[string]map[int32]int64),
 	}
 
@@ -41,15 +91,7 @@ func NewKafkaSource(topic, group string, brokers []string, config sarama.Config)
 	return s, nil
 }
 
-func (s *KafkaSource) WithKeyDecoder(d Decoder) {
-	s.keyDecoder = d
-}
-
-func (s *KafkaSource) WithValueDecoder(d Decoder) {
-	s.valueDecoder = d
-}
-
-func (s *KafkaSource) Consume() (key, value interface{}, err error) {
+func (s *Source) Consume() (key, value interface{}, err error) {
 	if s.lastErr != nil {
 		return nil, nil, err
 	}
@@ -75,7 +117,7 @@ func (s *KafkaSource) Consume() (key, value interface{}, err error) {
 	}
 }
 
-func (s *KafkaSource) Commit() error {
+func (s *Source) Commit() error {
 	for topic, partitions := range s.state {
 		for partition, offset := range partitions {
 			s.consumer.MarkPartitionOffset(topic, partition, offset, "")
@@ -89,11 +131,11 @@ func (s *KafkaSource) Commit() error {
 	return nil
 }
 
-func (s *KafkaSource) Close() error {
+func (s *Source) Close() error {
 	return s.consumer.Close()
 }
 
-func (s *KafkaSource) markState(msg *sarama.ConsumerMessage) {
+func (s *Source) markState(msg *sarama.ConsumerMessage) {
 	partitions, ok := s.state[msg.Topic]
 	if !ok {
 		partitions = make(map[int32]int64)
@@ -103,13 +145,13 @@ func (s *KafkaSource) markState(msg *sarama.ConsumerMessage) {
 	partitions[msg.Partition] = msg.Offset
 }
 
-func (s *KafkaSource) readErrors() {
+func (s *Source) readErrors() {
 	for err := range s.consumer.Errors() {
 		s.lastErr = err
 	}
 }
 
-func (s *KafkaSource) readMessages() {
+func (s *Source) readMessages() {
 	for msg := range s.consumer.Messages() {
 		s.buf <- msg
 	}
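The consumer side follows the same pattern: the group id, decoders, and the internal channel size move into SourceConfig instead of constructor arguments and With* setters. A sketch, again with a placeholder broker and the import path assumed from the repo layout:

package main

import (
	"log"

	"github.com/msales/streams/kafka"
)

func main() {
	config := kafka.NewSourceConfig()
	config.Brokers = []string{"localhost:9092"} // placeholder broker
	config.Topic = "example1"
	config.GroupId = "example-consumer"
	config.BufferSize = 2000 // replaces the previously hard-coded 1000

	src, err := kafka.NewSource(config)
	if err != nil {
		log.Fatal(err)
	}
	defer src.Close()
}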
diff --git a/sql/sink.go b/sql/sink.go
index e4c37d2..85c68d0 100644
--- a/sql/sink.go
+++ b/sql/sink.go
@@ -6,41 +6,41 @@ import (
 	"github.com/msales/streams"
 )
 
-type SqlTxFunc func(*sql.Tx) error
+type TxFunc func(*sql.Tx) error
 
-type SqlInsertFunc func(*sql.Tx, interface{}, interface{}) error
+type InsertFunc func(*sql.Tx, interface{}, interface{}) error
 
-type SqlSinkFunc func(*SqlSink)
+type SinkFunc func(*Sink)
 
-func WithBeginFn(fn SqlTxFunc) SqlSinkFunc {
-	return func(s *SqlSink) {
+func WithBeginFn(fn TxFunc) SinkFunc {
+	return func(s *Sink) {
 		s.beginFn = fn
 	}
 }
 
-func WithCommitFn(fn SqlTxFunc) SqlSinkFunc {
-	return func(s *SqlSink) {
+func WithCommitFn(fn TxFunc) SinkFunc {
+	return func(s *Sink) {
 		s.commitFn = fn
 	}
 }
 
-type SqlSink struct {
+type Sink struct {
 	ctx streams.Context
 
 	db *sql.DB
 	tx *sql.Tx
 
-	beginFn  SqlTxFunc
-	insertFn SqlInsertFunc
-	commitFn SqlTxFunc
+	beginFn  TxFunc
+	insertFn InsertFunc
+	commitFn TxFunc
 
 	batch int
 	count int
 }
 
-// NewSqlSink creates a new batch sql insert sink.
-func NewSqlSink(db *sql.DB, fn SqlInsertFunc, batch int, opts ...SqlSinkFunc) *SqlSink {
-	s := &SqlSink{
+// NewSink creates a new batch sql insert sink.
+func NewSink(db *sql.DB, fn InsertFunc, batch int, opts ...SinkFunc) *Sink {
+	s := &Sink{
 		db:       db,
 		insertFn: fn,
 		batch:    batch,
@@ -55,12 +55,12 @@ func NewSqlSink(db *sql.DB, fn SqlInsertFunc, batch int, opts ...SqlSinkFunc) *S
 }
 
 // WithContext sets the context on the Processor.
-func (p *SqlSink) WithContext(ctx streams.Context) {
+func (p *Sink) WithContext(ctx streams.Context) {
 	p.ctx = ctx
 }
 
 // Process processes the stream record.
-func (p *SqlSink) Process(key, value interface{}) error {
+func (p *Sink) Process(key, value interface{}) error {
 	if err := p.ensureTransaction(); err != nil {
 		return err
 	}
@@ -78,7 +78,7 @@ func (p *SqlSink) Process(key, value interface{}) error {
 	return nil
 }
 
-func (p *SqlSink) ensureTransaction() error {
+func (p *Sink) ensureTransaction() error {
 	var err error
 
 	if p.tx != nil {
@@ -97,7 +97,7 @@ func (p *SqlSink) ensureTransaction() error {
 	return nil
 }
 
-func (p *SqlSink) commitTransaction() error {
+func (p *Sink) commitTransaction() error {
 	if p.tx == nil {
 		return nil
 	}
@@ -118,7 +118,7 @@ func (p *SqlSink) commitTransaction() error {
 }
 
 // Close closes the processor.
-func (p *SqlSink) Close() error {
+func (p *Sink) Close() error {
 	if err := p.commitTransaction(); err != nil {
 		return err
 	}
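The sql package keeps its behaviour; only the Sql prefixes go away. A sketch of the renamed API with a hypothetical events table and insert function (the streamsql alias is only there to avoid clashing with database/sql; the import path is assumed from the repo layout):

package main

import (
	"database/sql"

	streamsql "github.com/msales/streams/sql"
)

// insertEvent is a hypothetical InsertFunc: it writes one key/value pair
// inside the transaction the sink manages.
func insertEvent(tx *sql.Tx, key, value interface{}) error {
	_, err := tx.Exec("INSERT INTO events (k, v) VALUES (?, ?)", key, value)
	return err
}

func buildSink(db *sql.DB) *streamsql.Sink {
	// Commit the transaction every 100 inserts; WithCommitFn runs just
	// before each commit, e.g. to flush derived state.
	return streamsql.NewSink(db, insertEvent, 100,
		streamsql.WithCommitFn(func(tx *sql.Tx) error { return nil }),
	)
}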