Skip to content

Commit

Permalink
Merge pull request #9 from RuiFG/master
Browse files Browse the repository at this point in the history
feat: add checkpoint locker
  • Loading branch information
RuiFG authored Jan 5, 2023
2 parents 1be0f8d + 9147745 commit 5c706c6
Show file tree
Hide file tree
Showing 7 changed files with 163 additions and 203 deletions.
289 changes: 140 additions & 149 deletions streaming-connector/kafka-connector/source/source.go
Original file line number Diff line number Diff line change
@@ -1,151 +1,142 @@
package source

import (
"fmt"
"github.com/RuiFG/streaming/streaming-core/element"
. "github.com/RuiFG/streaming/streaming-core/operator"
"github.com/RuiFG/streaming/streaming-core/stream"
"github.com/Shopify/sarama"
"sync"
)

type topicAndPartition struct {
Topic string
Partition int32
}

type Config struct {
SaramaConfig *sarama.Config
Addresses []string
Topics []string
GroupId string
}

type FormatFn[OUT any] func(message *sarama.ConsumerMessage) OUT

type source[OUT any] struct {
BaseOperator[any, any, OUT]
FormatFn[OUT]
config Config
consumerGroup sarama.ConsumerGroup
state *map[topicAndPartition]int64

offsetMap *sync.Map
offsetsToCommit map[int64]map[topicAndPartition]int64
doneChan chan struct{}
commitChan chan map[topicAndPartition]int64
}

func (s *source[OUT]) Open(ctx Context, collector element.Collector[OUT]) (err error) {
if err = s.BaseOperator.Open(ctx, collector); err != nil {
return err
}
s.consumerGroup, err = sarama.NewConsumerGroup(s.config.Addresses, s.config.GroupId, s.config.SaramaConfig)
if err != nil {
return err
}
return nil
}

func (s *source[OUT]) Run() {
for {
var err error
select {
case <-s.doneChan:
return
default:
err = s.consumerGroup.Consume(s.Ctx, s.config.Topics, s)
if err != nil {
fmt.Println(err)
//s.ctx.Logger().Warnw("can't consume kafka.", "err", err)
}

}
}

}

func (s *source[OUT]) Close() error {
close(s.doneChan)
return nil
}

// ----------------------------------barrier.Listener----------------------------------

func (s *source[OUT]) NotifyBarrierCome(detail element.Detail) {
m := map[topicAndPartition]int64{}
s.offsetMap.Range(func(key, value any) bool {
realKey := key.(topicAndPartition)
realValue := value.(int64)
m[realKey] = realValue
return true
})
s.offsetsToCommit[detail.Id] = m
s.state.Update(m)
}

func (s *source[OUT]) NotifyBarrierComplete(detail element.Detail) {
if commit, ok := s.offsetsToCommit[detail.Id]; ok {
s.commitChan <- commit
}
delete(s.offsetsToCommit, detail.Id)
}

func (s *source[OUT]) NotifyBarrierCancel(detail element.Detail) {
delete(s.offsetsToCommit, detail.Id)
}

// ----------------------------------ConsumerGroupHandler----------------------------------

func (s *source[OUT]) Setup(session sarama.ConsumerGroupSession) error {
s.offsetMap.Range(func(key, value any) bool {
realKey := key.(topicAndPartition)
realValue := value.(int64)
session.ResetOffset(realKey.Topic, realKey.Partition, realValue, "")
return true
})

return nil
}

func (s *source[OUT]) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }

func (s *source[OUT]) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for {
select {
case message := <-claim.Messages():
s.Collector.EmitEvent(&element.Event[OUT]{
Value: s.FormatFn(message),
Timestamp: 0,
HasTimestamp: false,
})
s.offsetMap.Store(topicAndPartition{Topic: message.Topic, Partition: message.Partition}, message.Offset)
case <-s.doneChan:
return nil
case commit := <-s.commitChan:
for tp, offset := range commit {
session.MarkOffset(tp.Topic, tp.Partition, offset, "")
}
session.Commit()
}
}

}

func FromSource[OUT any](env *stream.Environment, config Config, formatFn FormatFn[OUT], name string) (*stream.SourceStream[OUT], error) {
stream.FormSource(env, stream.SourceStreamOptions[OUT]{
Options: stream.Options{Name: name},
New: func() Source[OUT] {
return &source[OUT]{
FormatFn: formatFn,
config: config,
offsetMap: &sync.Map{},
offsetsToCommit: map[int64]map[topicAndPartition]int64{},
doneChan: make(chan struct{}),
commitChan: make(chan map[topicAndPartition]int64),
}
},
ElementListeners: nil,
})

}
//type topicAndPartition struct {
// Topic string
// Partition int32
//}
//
//type Config struct {
// SaramaConfig *sarama.Config
// Addresses []string
// Topics []string
// GroupId string
//}
//
//type FormatFn[OUT any] func(message *sarama.ConsumerMessage) OUT
//
//type source[OUT any] struct {
// BaseOperator[any, any, OUT]
// FormatFn[OUT]
// config Config
// consumerGroup sarama.ConsumerGroup
// state *map[topicAndPartition]int64
//
// offsetMap *sync.Map
// offsetsToCommit map[int64]map[topicAndPartition]int64
// doneChan chan struct{}
// commitChan chan map[topicAndPartition]int64
//}
//
//func (s *source[OUT]) Open(ctx Context, collector element.Collector[OUT]) (err error) {
// if err = s.BaseOperator.Open(ctx, collector); err != nil {
// return err
// }
// s.consumerGroup, err = sarama.NewConsumerGroup(s.config.Addresses, s.config.GroupId, s.config.SaramaConfig)
// if err != nil {
// return err
// }
// return nil
//}
//
//func (s *source[OUT]) Run() {
// for {
// var err error
// select {
// case <-s.doneChan:
// return
// default:
// err = s.consumerGroup.Consume(s.Ctx, s.config.Topics, s)
// if err != nil {
// fmt.Println(err)
// //s.ctx.Logger().Warnw("can't consume kafka.", "err", err)
// }
//
// }
// }
//
//}
//
//func (s *source[OUT]) Close() error {
// close(s.doneChan)
// return nil
//}
//
//// ----------------------------------barrier.Listener----------------------------------
//
//func (s *source[OUT]) NotifyBarrierCome(detail element.Detail) {
// m := map[topicAndPartition]int64{}
// s.offsetMap.Range(func(key, value any) bool {
// realKey := key.(topicAndPartition)
// realValue := value.(int64)
// m[realKey] = realValue
// return true
// })
// s.offsetsToCommit[detail.Id] = m
// s.state.Update(m)
//}
//
//func (s *source[OUT]) NotifyBarrierComplete(detail element.Detail) {
// if commit, ok := s.offsetsToCommit[detail.Id]; ok {
// s.commitChan <- commit
// }
// delete(s.offsetsToCommit, detail.Id)
//}
//
//func (s *source[OUT]) NotifyBarrierCancel(detail element.Detail) {
// delete(s.offsetsToCommit, detail.Id)
//}
//
//// ----------------------------------ConsumerGroupHandler----------------------------------
//
//func (s *source[OUT]) Setup(session sarama.ConsumerGroupSession) error {
// s.offsetMap.Range(func(key, value any) bool {
// realKey := key.(topicAndPartition)
// realValue := value.(int64)
// session.ResetOffset(realKey.Topic, realKey.Partition, realValue, "")
// return true
// })
//
// return nil
//}
//
//func (s *source[OUT]) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
//
//func (s *source[OUT]) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
// for {
// select {
// case message := <-claim.Messages():
// s.Collector.EmitEvent(&element.Event[OUT]{
// Value: s.FormatFn(message),
// Timestamp: 0,
// HasTimestamp: false,
// })
// s.offsetMap.Store(topicAndPartition{Topic: message.Topic, Partition: message.Partition}, message.Offset)
// case <-s.doneChan:
// return nil
// case commit := <-s.commitChan:
// for tp, offset := range commit {
// session.MarkOffset(tp.Topic, tp.Partition, offset, "")
// }
// session.Commit()
// }
// }
//
//}
//
//func FromSource[OUT any](env *stream.Environment, config Config, formatFn FormatFn[OUT], name string) (*stream.SourceStream[OUT], error) {
// stream.FormSource(env, stream.SourceStreamOptions[OUT]{
// Options: stream.Options{Name: name},
// New: func() Source[OUT] {
// return &source[OUT]{
// FormatFn: formatFn,
// config: config,
// offsetMap: &sync.Map{},
// offsetsToCommit: map[int64]map[topicAndPartition]int64{},
// doneChan: make(chan struct{}),
// commitChan: make(chan map[topicAndPartition]int64),
// }
// },
// ElementListeners: nil,
// })
//
//}
19 changes: 0 additions & 19 deletions streaming-core/common/safe/go.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,3 @@ func Run(fn func() error) (err error) {
err = fn()
return err
}

func Go(fn func() error) <-chan error {
c := make(chan error)
go func() {
if err := Run(fn); err != nil {
c <- err
close(c)
}
}()
return c
}

func GoChannel(fn func() error, errorChan chan<- error) {
go func() {
if err := Run(fn); err != nil {
errorChan <- err
}
}()
}
11 changes: 1 addition & 10 deletions streaming-core/operator/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package operator

import (
"fmt"
"github.com/RuiFG/streaming/streaming-core/common/safe"
"github.com/RuiFG/streaming/streaming-core/element"
)

Expand All @@ -16,15 +15,7 @@ func (o *SourceOperatorWrap[OUT]) Open(ctx Context, collector element.Collector[
if err := o.Source.Open(ctx, collector); err != nil {
return fmt.Errorf("failed to open source operator: %w", err)
}
go func() {
if err := safe.Run(func() error {
o.Source.Run()
return nil
}); err == nil {
return
}
ctx.Logger().Warn("source operator exited unexpectedly, restarting.")
}()
go o.Source.Run()
return nil
}

Expand Down
8 changes: 8 additions & 0 deletions streaming-core/operator/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import (
"github.com/RuiFG/streaming/streaming-core/store"
"github.com/uber-go/tally/v4"
"go.uber.org/zap"
"sync"
)

type context struct {
storeController store.Controller
logger *zap.Logger
locker sync.Locker
timerManager *TimerManager
callerChan chan *executor.Executor
scope tally.Scope
Expand Down Expand Up @@ -37,18 +39,24 @@ func (c *context) TimerManager() *TimerManager {
return c.timerManager
}

func (c *context) Locker() sync.Locker {
return c.locker
}

func NewContext(
logger *zap.Logger,
scope tally.Scope,
controller store.Controller,
callerChan chan *executor.Executor,
manager *TimerManager,
locker sync.Locker,
) Context {
return &context{
storeController: controller,
logger: logger,
scope: scope,
timerManager: manager,
callerChan: callerChan,
locker: locker,
}
}
2 changes: 2 additions & 0 deletions streaming-core/operator/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ import (
"github.com/RuiFG/streaming/streaming-core/store"
"github.com/uber-go/tally/v4"
"go.uber.org/zap"
"sync"
)

type Context interface {
Logger() *zap.Logger
Scope() tally.Scope
Store() store.Controller
Locker() sync.Locker
TimerManager() *TimerManager
//Exec will call func that are mutually exclusive
Exec(func()) *executor.Executor
Expand Down
Loading

0 comments on commit 5c706c6

Please sign in to comment.