diff --git a/.travis.yml b/.travis.yml index e1e6193..64a2687 100644 --- a/.travis.yml +++ b/.travis.yml @@ -7,7 +7,7 @@ go: before_install: - curl -L -s https://github.com/golang/dep/releases/download/v0.4.1/dep-linux-amd64 -o $GOPATH/bin/dep - chmod +x $GOPATH/bin/dep - - go get -u github.com/golang/lint/golint + - go get -u golang.org/x/lint/golint - go get github.com/mattn/goveralls - go get golang.org/x/tools/cmd/cover diff --git a/Gopkg.lock b/Gopkg.lock index 213f098..6831435 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -2,54 +2,79 @@ [[projects]] + digest = "1:f11e03e8297265765272534835027251cc808ed6dab124f5f5dbc37f7c908abb" name = "github.com/Shopify/sarama" packages = ["."] - revision = "35324cf48e33d8260e1c7c18854465a904ade249" - version = "v1.17.0" + pruneopts = "" + revision = "ec843464b50d4c8b56403ec9d589cf41ea30e722" + version = "v1.19.0" [[projects]] branch = "master" + digest = "1:c0bec5f9b98d0bc872ff5e834fac186b807b656683bd29cb82fb207a1513fabb" + name = "github.com/beorn7/perks" + packages = ["quantile"] + pruneopts = "" + revision = "3a771d992973f24aa725d07868b467d1ddfceafb" + +[[projects]] + branch = "master" + digest = "1:339bac0d398920a11fd7e8a7fbd4f133d4edd4faa1c2cbaba256d3684a266019" name = "github.com/bradfitz/gomemcache" packages = ["memcache"] - revision = "1952afaa557dc08e8e0d89eafab110fb501c1a2b" + pruneopts = "" + revision = "bc664df9673713a0ccf26e3b55a673ec7301088b" [[projects]] + digest = "1:6da5545112f73dbad12895d25e39818c1c3e8040ebba488d4d3fe43bc8685eb6" name = "github.com/bsm/sarama-cluster" packages = ["."] - revision = "cf455bc755fe41ac9bb2861e7a961833d9c2ecc3" - version = "v2.1.13" + pruneopts = "" + revision = "c618e605e15c0d7535f6c96ff8efbb0dba4fd66c" + version = "v2.1.15" [[projects]] + digest = "1:47fe89a242ccbae03d31b4c665d3d983786acd316c0d7c51bcfa0d019b205004" name = "github.com/cactus/go-statsd-client" packages = ["statsd"] + pruneopts = "" revision = "138b925ccdf617776955904ba7759fce64406cec" version = "v3.1.1" [[projects]] + digest = "1:0deddd908b6b4b768cfc272c16ee61e7088a60f7fe2f06c547bd3d8e1f8b8e77" name = "github.com/davecgh/go-spew" packages = ["spew"] - revision = "346938d642f2ec3594ed81d874461961cd0faa76" - version = "v1.1.0" + pruneopts = "" + revision = "8991bc29aa16c548c550c7ff78260e27b9ab7c73" + version = "v1.1.1" [[projects]] + digest = "1:6d6672f85a84411509885eaa32f597577873de00e30729b9bb0eb1e1faa49c12" name = "github.com/eapache/go-resiliency" packages = ["breaker"] + pruneopts = "" revision = "ea41b0fad31007accc7f806884dcdf3da98b79ce" version = "v1.1.0" [[projects]] branch = "master" + digest = "1:6643c01e619a68f80ac12ad81223275df653528c6d7e3788291c1fd6f1d622f6" name = "github.com/eapache/go-xerial-snappy" packages = ["."] - revision = "bb955e01b9346ac19dc29eb16586c90ded99a98c" + pruneopts = "" + revision = "776d5712da21bc4762676d614db1d8a64f4238b0" [[projects]] + digest = "1:d8d46d21073d0f65daf1740ebf4629c65e04bf92e14ce93c2201e8624843c3d3" name = "github.com/eapache/queue" packages = ["."] + pruneopts = "" revision = "44cc805cf13205b55f69e14bcb69867d1ae92f98" version = "v1.1.0" [[projects]] + digest = "1:52735cb54d5122db78b96f7c8980fcb91dba9a65497bae324996d1ea152ad7d4" name = "github.com/go-redis/redis" packages = [ ".", @@ -59,114 +84,207 @@ "internal/pool", "internal/proto", "internal/singleflight", - "internal/util" + "internal/util", ] - revision = "83fb42932f6145ce52df09860384a4653d2d332a" - version = "v6.12.0" + pruneopts = "" + revision = "f3bba01df2026fc865f7782948845db9cf44cf23" + version = "v6.14.1" [[projects]] + digest = "1:a01080d20c45c031c13f3828c56e58f4f51d926a482ad10cc0316225097eb7ea" name = "github.com/go-stack/stack" packages = ["."] - revision = "259ab82a6cad3992b4e21ff5cac294ccb06474bc" - version = "v1.7.0" + pruneopts = "" + revision = "2fee6af1a9795aafbe0253a0cfbdf668e1fb8a9a" + version = "v1.8.0" + +[[projects]] + digest = "1:3dd078fda7500c341bc26cfbc6c6a34614f295a2457149fc1045cab767cbcf18" + name = "github.com/golang/protobuf" + packages = ["proto"] + pruneopts = "" + revision = "aa810b61a9c79d51363740d207bb46cf8e620ed5" + version = "v1.2.0" [[projects]] branch = "master" + digest = "1:2a5888946cdbc8aa360fd43301f9fc7869d663f60d5eedae7d4e6e5e4f06f2bf" name = "github.com/golang/snappy" packages = ["."] + pruneopts = "" revision = "2e65f85255dbc3072edf28d6b5b8efc472979f5a" [[projects]] + digest = "1:9ea83adf8e96d6304f394d40436f2eb44c1dc3250d223b74088cc253a6cd0a1c" name = "github.com/mattn/go-colorable" packages = ["."] + pruneopts = "" revision = "167de6bfdfba052fa6b2d3664c8f5272e23c9072" version = "v0.0.9" [[projects]] + digest = "1:3140e04675a6a91d2a20ea9d10bdadf6072085502e6def6768361260aee4b967" name = "github.com/mattn/go-isatty" packages = ["."] - revision = "0360b2af4f38e8d38c7fce2a9f4e702702d73a39" - version = "v0.0.3" + pruneopts = "" + revision = "6ca4dbf54d38eea1a992b3c722a76a5d1c4cb25c" + version = "v0.0.4" + +[[projects]] + digest = "1:63722a4b1e1717be7b98fc686e0b30d5e7f734b9e93d7dee86293b6deab7ea28" + name = "github.com/matttproud/golang_protobuf_extensions" + packages = ["pbutil"] + pruneopts = "" + revision = "c12348ce28de40eed0136aa2b644d0ee0650e56c" + version = "v1.0.1" [[projects]] + digest = "1:4d5b9abaf46aabf48c8a84084247b6a4e6e14dc46b1b93a56c262ac97970e3ee" name = "github.com/msales/pkg" packages = [ "cache", "log", - "stats" + "stats", ] - revision = "17f08a8e63b432dc9a202d4fdf7479e7a6b7e31c" - version = "v2.0.0" + pruneopts = "" + revision = "5cf7c801763778eaab3421be1a824ca4a367c600" + version = "v2.4.1" [[projects]] + digest = "1:b1df71d0b2287062b90c6b4c8d3c934440aa0d2eb201d03f22be0f045860b4aa" name = "github.com/pierrec/lz4" packages = [ ".", - "internal/xxh32" + "internal/xxh32", ] - revision = "6b9367c9ff401dbc54fabce3fb8d972e799b702d" - version = "v2.0.2" + pruneopts = "" + revision = "635575b42742856941dbc767b44905bb9ba083f6" + version = "v2.0.7" [[projects]] + digest = "1:7365acd48986e205ccb8652cc746f09c8b7876030d53710ea6ef7d0bd0dcd7ca" name = "github.com/pkg/errors" packages = ["."] + pruneopts = "" revision = "645ef00459ed84a119197bfb8d8205042c6df63d" version = "v0.8.0" [[projects]] + digest = "1:256484dbbcd271f9ecebc6795b2df8cad4c458dd0f5fd82a8c2fa0c29f233411" name = "github.com/pmezard/go-difflib" packages = ["difflib"] + pruneopts = "" revision = "792786c7400a136282c1664665ae0a8db921c6c2" version = "v1.0.0" +[[projects]] + digest = "1:4142d94383572e74b42352273652c62afec5b23f325222ed09198f46009022d1" + name = "github.com/prometheus/client_golang" + packages = [ + "prometheus", + "prometheus/promhttp", + ] + pruneopts = "" + revision = "c5b7fccd204277076155f10851dad72b76a49317" + version = "v0.8.0" + +[[projects]] + branch = "master" + digest = "1:185cf55b1f44a1bf243558901c3f06efa5c64ba62cfdcbb1bf7bbe8c3fb68561" + name = "github.com/prometheus/client_model" + packages = ["go"] + pruneopts = "" + revision = "5c3871d89910bfb32f5fcab2aa4b9ec68e65a99f" + [[projects]] branch = "master" + digest = "1:d1b5970f2a453e7c4be08117fb683b5d096bad9d17f119a6e58d4c561ca205dd" + name = "github.com/prometheus/common" + packages = [ + "expfmt", + "internal/bitbucket.org/ww/goautoneg", + "model", + ] + pruneopts = "" + revision = "bcb74de08d37a417cb6789eec1d6c810040f0470" + +[[projects]] + branch = "master" + digest = "1:1f62ed2c173c42c1edad2e94e127318ea11b0d28c62590c82a8d2d3cde189afe" + name = "github.com/prometheus/procfs" + packages = [ + ".", + "internal/util", + "nfs", + "xfs", + ] + pruneopts = "" + revision = "185b4288413d2a0dd0806f78c90dde719829e5ae" + +[[projects]] + branch = "master" + digest = "1:15bcdc717654ef21128e8af3a63eec39a6d08a830e297f93d65163f87c8eb523" name = "github.com/rcrowley/go-metrics" packages = ["."] + pruneopts = "" revision = "e2704e165165ec55d062f5919b4b29494e9fa790" [[projects]] + digest = "1:711eebe744c0151a9d09af2315f0bb729b2ec7637ef4c410fa90a18ef74b65b6" name = "github.com/stretchr/objx" packages = ["."] + pruneopts = "" revision = "477a77ecc69700c7cdeb1fa9e129548e1c1c393c" version = "v0.1.1" [[projects]] + digest = "1:c587772fb8ad29ad4db67575dad25ba17a51f072ff18a22b4f0257a4d9c24f75" name = "github.com/stretchr/testify" packages = [ "assert", - "mock" + "mock", ] + pruneopts = "" revision = "f35b8ab0b5a2cef36673838d662e249dd9c94686" version = "v1.2.2" [[projects]] branch = "master" - name = "github.com/tevino/abool" - packages = ["."] - revision = "9b9efcf221b50905aab9bbabd3daed56dc10f339" - -[[projects]] - branch = "master" + digest = "1:2ed0bf267e44950120acd95570227e28184573ffb099bd85b529ee148e004ddb" name = "golang.org/x/sys" packages = ["unix"] - revision = "fc8bd948cf46f9c7af0f07d34151ce25fe90e477" + pruneopts = "" + revision = "fa43e7bc11baaae89f3f902b2b4d832b68234844" [[projects]] + digest = "1:4014584c076f25aaf35d9de36c79ae2d208bba32c780f405e3395dad79292e22" name = "gopkg.in/DATA-DOG/go-sqlmock.v1" packages = ["."] + pruneopts = "" revision = "d76b18b42f285b792bf985118980ce9eacea9d10" version = "v1.3.0" [[projects]] + digest = "1:a4fcebba8c1b8ae25dd31f1cfa6c6ed5b1e5719ab2e9e1c18e8c98969ebde215" name = "gopkg.in/inconshreveable/log15.v2" packages = ["."] - revision = "0decfc6c20d9ca0ad143b0e89dcaa20f810b4fb3" - version = "v2.13" + pruneopts = "" + revision = "67afb5ed74ec82fd7ac8f49d27c509ac6f991970" + version = "v2.14" [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "1db35b4cd812295cd019cdba2884b74ff048756df9005a6c0c222a0150d5fa51" + input-imports = [ + "github.com/Shopify/sarama", + "github.com/bsm/sarama-cluster", + "github.com/msales/pkg/cache", + "github.com/msales/pkg/stats", + "github.com/pkg/errors", + "github.com/stretchr/testify/assert", + "github.com/stretchr/testify/mock", + "gopkg.in/DATA-DOG/go-sqlmock.v1", + "gopkg.in/inconshreveable/log15.v2", + ] solver-name = "gps-cdcl" solver-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml index 2f737a4..b3cd1bb 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -14,10 +14,6 @@ name = "github.com/bsm/sarama-cluster" version = "2.*" -[[constraint]] - branch = "master" - name = "github.com/tevino/abool" - [[constraint]] name = "github.com/stretchr/testify" version = "1.*" diff --git a/cache/sink.go b/cache/sink.go index 5fa9bb1..5d2d8a7 100644 --- a/cache/sink.go +++ b/cache/sink.go @@ -7,6 +7,7 @@ import ( "github.com/msales/streams" ) +// Sink represents a Cache streams sink. type Sink struct { pipe streams.Pipe diff --git a/example/benchmark/main.go b/example/benchmark/main.go new file mode 100644 index 0000000..6b94002 --- /dev/null +++ b/example/benchmark/main.go @@ -0,0 +1,85 @@ +package main + +import ( + "context" + "log" + "net/http" + "os" + "os/signal" + "syscall" + + "github.com/msales/pkg/stats" + "github.com/msales/streams" +) + +import _ "net/http/pprof" + +func main() { + ctx := context.Background() + + go func() { + log.Println(http.ListenAndServe("localhost:6060", nil)) + }() + + client, err := stats.NewBufferedStatsd("localhost:8125", "streams.example") + if err != nil { + log.Fatal(err.Error()) + } + ctx = stats.WithStats(ctx, client) + + task, err := task(ctx) + if err != nil { + log.Fatal(err.Error()) + } + task.Start() + defer task.Close() + + // Wait for SIGTERM + <-waitForSignals() +} + +func task(ctx context.Context) (streams.Task, error) { + builder := streams.NewStreamBuilder() + builder.Source("nil-source", newNilSource(ctx)). + Map("do-nothing", nothingMapper) + + task := streams.NewTask(builder.Build()) + task.OnError(func(err error) { + log.Fatal(err.Error()) + }) + + return task, nil +} + +type nilSource struct { + ctx context.Context +} + +func newNilSource(ctx context.Context) streams.Source { + return &nilSource{ + ctx: ctx, + } +} + +func (s *nilSource) Consume() (*streams.Message, error) { + return streams.NewMessageWithContext(s.ctx, nil, nil), nil +} + +func (s *nilSource) Commit(v interface{}) error { + return nil +} + +func (s *nilSource) Close() error { + return nil +} + +func nothingMapper(msg *streams.Message) (*streams.Message, error) { + return msg, nil +} + +func waitForSignals() chan os.Signal { + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + + return sigs +} diff --git a/example/branch/main.go b/example/branch/main.go index 9869696..848a845 100644 --- a/example/branch/main.go +++ b/example/branch/main.go @@ -28,14 +28,14 @@ func main() { builder := streams.NewStreamBuilder() - s := builder.Source("rand-source", NewRandIntSource(ctx)). - Branch("branch", BranchEvenNumberFilter, BranchOddNumberFilter) + s := builder.Source("rand-source", newRandIntSource(ctx)). + Branch("branch", branchEvenNumberFilter, branchOddNumberFilter) // Event numbers s[0].Print("print-event") // Odd Numbers - s[1].Map("negative-mapper", NegativeMapper). + s[1].Map("negative-mapper", negativeMapper). Print("print-negative") task := streams.NewTask(builder.Build()) @@ -43,67 +43,58 @@ func main() { log.Fatal(err.Error()) }) task.Start() + defer task.Close() // Wait for SIGTERM - done := listenForSignals() - <-done - - task.Close() + <-waitForSignals() } -type RandIntSource struct { +type randIntSource struct { ctx context.Context rand *rand.Rand } -func NewRandIntSource(ctx context.Context) streams.Source { - return &RandIntSource{ +func newRandIntSource(ctx context.Context) streams.Source { + return &randIntSource{ ctx: ctx, rand: rand.New(rand.NewSource(1234)), } } -func (s *RandIntSource) Consume() (*streams.Message, error) { +func (s *randIntSource) Consume() (*streams.Message, error) { return streams.NewMessageWithContext(s.ctx, nil, s.rand.Intn(100)), nil } -func (s *RandIntSource) Commit(v interface{}) error { +func (s *randIntSource) Commit(v interface{}) error { return nil } -func (s *RandIntSource) Close() error { +func (s *randIntSource) Close() error { return nil } -func BranchOddNumberFilter(msg *streams.Message) (bool, error) { +func branchOddNumberFilter(msg *streams.Message) (bool, error) { num := msg.Value.(int) return num%2 == 1, nil } -func BranchEvenNumberFilter(msg *streams.Message) (bool, error) { +func branchEvenNumberFilter(msg *streams.Message) (bool, error) { num := msg.Value.(int) return num%2 == 0, nil } -func NegativeMapper(msg *streams.Message) (*streams.Message, error) { +func negativeMapper(msg *streams.Message) (*streams.Message, error) { num := msg.Value.(int) msg.Value = num * -1 return msg, nil } -func listenForSignals() chan bool { +func waitForSignals() chan os.Signal { sigs := make(chan os.Signal, 1) - done := make(chan bool, 1) - signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) - go func() { - <-sigs - done <- true - }() - - return done + return sigs } diff --git a/example/kafka/main.go b/example/kafka/main.go index 9556fe5..c67b8cd 100644 --- a/example/kafka/main.go +++ b/example/kafka/main.go @@ -39,13 +39,11 @@ func main() { p.Start() c.Start() + defer c.Close() + defer p.Close() // Wait for SIGTERM - done := listenForSignals() - <-done - - p.Close() - c.Close() + <-waitForSignals() } func producerTask(ctx context.Context, brokers []string, c *sarama.Config) (streams.Task, error) { @@ -61,8 +59,8 @@ func producerTask(ctx context.Context, brokers []string, c *sarama.Config) (stre } builder := streams.NewStreamBuilder() - builder.Source("rand-source", NewRandIntSource(ctx)). - Map("to-string", StringMapper). + builder.Source("rand-source", newRandIntSource(ctx)). + Map("to-string", stringMapper). Process("kafka-sink", sink) task := streams.NewTask(builder.Build()) @@ -89,7 +87,7 @@ func consumerTask(ctx context.Context, brokers []string, c *sarama.Config) (stre builder := streams.NewStreamBuilder() builder.Source("kafka-source", src). - Map("to-int", IntMapper). + Map("to-int", intMapper). Print("print") task := streams.NewTask(builder.Build()) @@ -100,38 +98,38 @@ func consumerTask(ctx context.Context, brokers []string, c *sarama.Config) (stre return task, nil } -type RandIntSource struct { +type randIntSource struct { ctx context.Context rand *rand.Rand } -func NewRandIntSource(ctx context.Context) streams.Source { - return &RandIntSource{ +func newRandIntSource(ctx context.Context) streams.Source { + return &randIntSource{ ctx: ctx, rand: rand.New(rand.NewSource(1234)), } } -func (s *RandIntSource) Consume() (*streams.Message, error) { +func (s *randIntSource) Consume() (*streams.Message, error) { return streams.NewMessageWithContext(s.ctx, nil, s.rand.Intn(100)), nil } -func (s *RandIntSource) Commit(v interface{}) error { +func (s *randIntSource) Commit(v interface{}) error { return nil } -func (s *RandIntSource) Close() error { +func (s *randIntSource) Close() error { return nil } -func StringMapper(msg *streams.Message) (*streams.Message, error) { +func stringMapper(msg *streams.Message) (*streams.Message, error) { i := msg.Value.(int) msg.Value = strconv.Itoa(i) return msg, nil } -func IntMapper(msg *streams.Message) (*streams.Message, error) { +func intMapper(msg *streams.Message) (*streams.Message, error) { s := msg.Value.(string) i, err := strconv.Atoi(s) if err != nil { @@ -143,16 +141,9 @@ func IntMapper(msg *streams.Message) (*streams.Message, error) { return msg, nil } -func listenForSignals() chan bool { +func waitForSignals() chan os.Signal { sigs := make(chan os.Signal, 1) - done := make(chan bool, 1) - signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) - go func() { - <-sigs - done <- true - }() - - return done + return sigs } diff --git a/example/merge/main.go b/example/merge/main.go index 2cd5524..fa16c28 100644 --- a/example/merge/main.go +++ b/example/merge/main.go @@ -13,12 +13,12 @@ import ( func main() { builder := streams.NewStreamBuilder() - stream1 := builder.Source("rand1-source", NewRandIntSource()). - Filter("filter1", LowNumberFilter) + stream1 := builder.Source("rand1-source", newRandIntSource()). + Filter("filter1", lowNumberFilter) - builder.Source("rand2-source", NewRandIntSource()). - Filter("filter2", HighNumberFilter). - Map("add-hundedred-mapper", AddHundredMapper). + builder.Source("rand2-source", newRandIntSource()). + Filter("filter2", highNumberFilter). + Map("add-hundedred-mapper", addHundredMapper). Merge("merge", stream1). Print("print") @@ -27,65 +27,56 @@ func main() { log.Fatal(err.Error()) }) task.Start() + defer task.Close() // Wait for SIGTERM - done := listenForSignals() - <-done - - task.Close() + <-waitForSignals() } -type RandIntSource struct { +type randIntSource struct { rand *rand.Rand } -func NewRandIntSource() streams.Source { - return &RandIntSource{ +func newRandIntSource() streams.Source { + return &randIntSource{ rand: rand.New(rand.NewSource(1234)), } } -func (s *RandIntSource) Consume() (*streams.Message, error) { +func (s *randIntSource) Consume() (*streams.Message, error) { return streams.NewMessage(nil, s.rand.Intn(100)), nil } -func (s *RandIntSource) Commit(v interface{}) error { +func (s *randIntSource) Commit(v interface{}) error { return nil } -func (s *RandIntSource) Close() error { +func (s *randIntSource) Close() error { return nil } -func LowNumberFilter(msg *streams.Message) (bool, error) { +func lowNumberFilter(msg *streams.Message) (bool, error) { num := msg.Value.(int) return num < 50, nil } -func HighNumberFilter(msg *streams.Message) (bool, error) { +func highNumberFilter(msg *streams.Message) (bool, error) { num := msg.Value.(int) return num >= 50, nil } -func AddHundredMapper(msg *streams.Message) (*streams.Message, error) { +func addHundredMapper(msg *streams.Message) (*streams.Message, error) { num := msg.Value.(int) msg.Value = num + 100 return msg, nil } -func listenForSignals() chan bool { +func waitForSignals() chan os.Signal { sigs := make(chan os.Signal, 1) - done := make(chan bool, 1) - signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) - go func() { - <-sigs - done <- true - }() - - return done + return sigs } diff --git a/example/simple/main.go b/example/simple/main.go index ed5f189..07f92a8 100644 --- a/example/simple/main.go +++ b/example/simple/main.go @@ -12,9 +12,9 @@ import ( func main() { builder := streams.NewStreamBuilder() - builder.Source("rand-source", NewRandIntSource()). - Filter("odd-filter", OddNumberFilter). - Map("double-mapper", DoubleMapper). + builder.Source("rand-source", newRandIntSource()). + Filter("odd-filter", oddNumberFilter). + Map("double-mapper", doubleMapper). Print("print") task := streams.NewTask(builder.Build()) @@ -22,59 +22,50 @@ func main() { log.Fatal(err.Error()) }) task.Start() + defer task.Close() // Wait for SIGTERM - done := listenForSignals() - <-done - - task.Close() + <-waitForSignals() } -type RandomIntSource struct { +type randIntSource struct { rand *rand.Rand } -func NewRandIntSource() streams.Source { - return &RandomIntSource{ +func newRandIntSource() streams.Source { + return &randIntSource{ rand: rand.New(rand.NewSource(1234)), } } -func (s *RandomIntSource) Consume() (*streams.Message, error) { +func (s *randIntSource) Consume() (*streams.Message, error) { return streams.NewMessage(nil, s.rand.Intn(100)), nil } -func (s *RandomIntSource) Commit(v interface{}) error { +func (s *randIntSource) Commit(v interface{}) error { return nil } -func (s *RandomIntSource) Close() error { +func (s *randIntSource) Close() error { return nil } -func OddNumberFilter(msg *streams.Message) (bool, error) { +func oddNumberFilter(msg *streams.Message) (bool, error) { num := msg.Value.(int) return num%2 == 1, nil } -func DoubleMapper(msg *streams.Message) (*streams.Message, error) { +func doubleMapper(msg *streams.Message) (*streams.Message, error) { num := msg.Value.(int) msg.Value = num * 2 return msg, nil } -func listenForSignals() chan bool { +func waitForSignals() chan os.Signal { sigs := make(chan os.Signal, 1) - done := make(chan bool, 1) - signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) - go func() { - <-sigs - done <- true - }() - - return done + return sigs } diff --git a/kafka/encoder.go b/kafka/encoder.go index ab01e1a..420ac37 100644 --- a/kafka/encoder.go +++ b/kafka/encoder.go @@ -1,21 +1,29 @@ package kafka +// Decoder represents a Kafka data decoder. type Decoder interface { + // Decode transforms byte data to the desired type. Decode([]byte) (interface{}, error) } +// Encoder represents a Kafka data encoder. type Encoder interface { + // Encode transforms the typed data to bytes. Encode(interface{}) ([]byte, error) } +// ByteDecoder represents a byte decoder. type ByteDecoder struct{} +// Decode transforms byte data to the desired type. func (d ByteDecoder) Decode(b []byte) (interface{}, error) { return b, nil } +// ByteEncoder represents a byte encoder. type ByteEncoder struct{} +// Encode transforms the typed data to bytes. func (e ByteEncoder) Encode(v interface{}) ([]byte, error) { if v == nil { return nil, nil @@ -24,14 +32,18 @@ func (e ByteEncoder) Encode(v interface{}) ([]byte, error) { return v.([]byte), nil } +// StringDecoder represents a string decoder. type StringDecoder struct{} +// Decode transforms byte data to a string. func (d StringDecoder) Decode(b []byte) (interface{}, error) { return string(b), nil } +// StringEncoder represents a string encoder. type StringEncoder struct{} +// Encode transforms the string data to bytes. func (e StringEncoder) Encode(v interface{}) ([]byte, error) { if v == nil { return nil, nil diff --git a/kafka/sink.go b/kafka/sink.go index 6009140..0ffbb1c 100644 --- a/kafka/sink.go +++ b/kafka/sink.go @@ -5,6 +5,7 @@ import ( "github.com/msales/streams" ) +// SinkConfig represents the configuration of a Sink. type SinkConfig struct { sarama.Config @@ -17,6 +18,7 @@ type SinkConfig struct { BatchSize int } +// NewSinkConfig creates a new SinkConfig. func NewSinkConfig() *SinkConfig { c := &SinkConfig{ Config: *sarama.NewConfig(), @@ -39,7 +41,7 @@ func (c *SinkConfig) Validate() error { switch { case c.Brokers == nil || len(c.Brokers) == 0: - return sarama.ConfigurationError("Brokers mut have at least one broker") + return sarama.ConfigurationError("Brokers must have at least one broker") case c.KeyEncoder == nil: return sarama.ConfigurationError("KeyEncoder must be an instance of Encoder") case c.ValueEncoder == nil: @@ -51,6 +53,7 @@ func (c *SinkConfig) Validate() error { return nil } +// Sink represents a Kafka streams sink. type Sink struct { pipe streams.Pipe @@ -65,6 +68,7 @@ type Sink struct { buf []*sarama.ProducerMessage } +// NewSink creates a new Kafka sink. func NewSink(c *SinkConfig) (*Sink, error) { if err := c.Validate(); err != nil { return nil, err diff --git a/kafka/source.go b/kafka/source.go index 9f27738..8f8c37d 100644 --- a/kafka/source.go +++ b/kafka/source.go @@ -54,7 +54,7 @@ func (c *SourceConfig) Validate() error { switch { case c.Brokers == nil || len(c.Brokers) == 0: - return sarama.ConfigurationError("Brokers mut have at least one broker") + return sarama.ConfigurationError("Brokers must have at least one broker") case c.KeyDecoder == nil: return sarama.ConfigurationError("KeyDecoder must be an instance of Decoder") case c.ValueDecoder == nil: diff --git a/message.go b/message.go index d2d07d3..32dc5e8 100644 --- a/message.go +++ b/message.go @@ -4,6 +4,7 @@ import ( "context" ) +// Message represents data the flows through the stream. type Message struct { metadata map[Source]interface{} @@ -12,20 +13,24 @@ type Message struct { Value interface{} } +// Metadata returns the Message metadata. func (m *Message) Metadata() map[Source]interface{} { return m.metadata } +// WithMetadata add metadata to the Message from a Source. func (m *Message) WithMetadata(s Source, v interface{}) *Message { m.metadata[s] = v return m } +// Empty determines if the Message is empty. func (m Message) Empty() bool { return m.Key == nil && m.Value == nil } +// NewMessage creates a Message. func NewMessage(k, v interface{}) *Message { return &Message{ metadata: map[Source]interface{}{}, @@ -35,6 +40,7 @@ func NewMessage(k, v interface{}) *Message { } } +// NewMessageWithContext creates a Message with the given context. func NewMessageWithContext(ctx context.Context, k, v interface{}) *Message { return &Message{ metadata: map[Source]interface{}{}, @@ -43,8 +49,3 @@ func NewMessageWithContext(ctx context.Context, k, v interface{}) *Message { Value: v, } } - -type NodeMessage struct { - Node Node - Msg *Message -} diff --git a/message_test.go b/message_test.go index 6906d0d..e225e65 100644 --- a/message_test.go +++ b/message_test.go @@ -34,8 +34,10 @@ func TestNewMessage(t *testing.T) { assert.Equal(t, "test", msg.Value) } +type ctxKey string + func TestNewMessageWithContext(t *testing.T) { - ctx := context.WithValue(context.Background(), "1", "2") + ctx := context.WithValue(context.Background(), ctxKey("1"), "2") msg := NewMessageWithContext(ctx, "test", "test") assert.Equal(t, ctx, msg.Ctx) diff --git a/mocks/pipe.go b/mocks/pipe.go index 5ecf27c..4b33fbf 100644 --- a/mocks/pipe.go +++ b/mocks/pipe.go @@ -13,11 +13,19 @@ type record struct { index int } +// ChildMessage represents a message forwarded to a child index. +type ChildMessage struct { + Index int + Msg *streams.Message +} + +var _ = (streams.Pipe)(&Pipe{}) + // Pipe is a mock Pipe. type Pipe struct { t *testing.T - queue []streams.NodeMessage + msgs []ChildMessage shouldError bool @@ -29,14 +37,14 @@ type Pipe struct { func NewPipe(t *testing.T) *Pipe { return &Pipe{ t: t, - queue: []streams.NodeMessage{}, + msgs: []ChildMessage{}, expectForward: []record{}, } } -// Queue gets the queued Messages for each Node. -func (p *Pipe) Queue() []streams.NodeMessage { - return p.queue +// Messages gets the queued Messages for each Node. +func (p *Pipe) Messages() []ChildMessage { + return p.msgs } // Forward queues the data to all processor children in the topology. @@ -58,12 +66,12 @@ func (p *Pipe) Forward(msg *streams.Message) error { return errors.New("test") } - p.queue = append(p.queue, streams.NodeMessage{Node: nil, Msg: msg}) + p.msgs = append(p.msgs, ChildMessage{Index: -1, Msg: msg}) return nil } -// Forward queues the data to the the given processor(s) child in the topology. +// ForwardToChild queues the data to the the given processor(s) child in the topology. func (p *Pipe) ForwardToChild(msg *streams.Message, index int) error { if len(p.expectForward) == 0 { p.t.Error("streams: mock: Unexpected call to ForwardToChild") @@ -83,7 +91,7 @@ func (p *Pipe) ForwardToChild(msg *streams.Message, index int) error { return errors.New("test") } - p.queue = append(p.queue, streams.NodeMessage{Node: nil, Msg: msg}) + p.msgs = append(p.msgs, ChildMessage{Index: index, Msg: msg}) return nil } @@ -103,22 +111,28 @@ func (p *Pipe) Commit(msg *streams.Message) error { return nil } +// ShouldError indicates that an error should be returned on the +// next operation. func (p *Pipe) ShouldError() { p.shouldError = true } +// ExpectForward registers an expectation of a Forward of the Pipe. func (p *Pipe) ExpectForward(k, v interface{}) { p.expectForward = append(p.expectForward, record{k, v, -1}) } +// ExpectForwardToChild registers an expectation of a ForwardToChild the Pipe. func (p *Pipe) ExpectForwardToChild(k, v interface{}, index int) { p.expectForward = append(p.expectForward, record{k, v, index}) } +// ExpectCommit registers an expectation of a Commit the Pipe. func (p *Pipe) ExpectCommit() { p.expectCommit = true } +// AssertExpectations asserts that the expectations were met. func (p *Pipe) AssertExpectations() { if len(p.expectForward) > 0 { p.t.Error("streams: mock: Expected a call to Forward or ForwardToChild but got none") diff --git a/mocks/pipe_test.go b/mocks/pipe_test.go index 1bfd2cb..7f6e625 100644 --- a/mocks/pipe_test.go +++ b/mocks/pipe_test.go @@ -16,30 +16,30 @@ func TestPipe_ImplementsPipeInterface(t *testing.T) { } } -func TestPipe_QueueForForward(t *testing.T) { +func TestPipe_MessagesForForward(t *testing.T) { msg := streams.NewMessage("test", "test") p := mocks.NewPipe(t) p.ExpectForward("test", "test") p.Forward(msg) - queue := p.Queue() - assert.Len(t, queue, 1) - assert.Nil(t, queue[0].Node) - assert.Exactly(t, msg, queue[0].Msg) + msgs := p.Messages() + assert.Len(t, msgs, 1) + assert.Exactly(t, -1, msgs[0].Index) + assert.Exactly(t, msg, msgs[0].Msg) } -func TestPipe_QueueForForwardToChild(t *testing.T) { +func TestPipe_MessagesForForwardToChild(t *testing.T) { msg := streams.NewMessage("test", "test") p := mocks.NewPipe(t) p.ExpectForwardToChild("test", "test", 0) p.ForwardToChild(msg, 0) - queue := p.Queue() - assert.Len(t, queue, 1) - assert.Nil(t, queue[0].Node) - assert.Exactly(t, msg, queue[0].Msg) + msgs := p.Messages() + assert.Len(t, msgs, 1) + assert.Exactly(t, 0, msgs[0].Index) + assert.Exactly(t, msg, msgs[0].Msg) } func TestPipe_HandlesExpectations(t *testing.T) { diff --git a/mocks_test.go b/mocks_test.go index 445b3ea..f451b7d 100644 --- a/mocks_test.go +++ b/mocks_test.go @@ -1,10 +1,14 @@ package streams_test import ( + "time" + "github.com/msales/streams" "github.com/stretchr/testify/mock" ) +var _ = (streams.Node)(&MockNode{}) + type MockNode struct { mock.Mock } @@ -14,10 +18,6 @@ func (mn *MockNode) Name() string { return args.String(0) } -func (mn *MockNode) WithPipe(pipe streams.Pipe) { - mn.Called(pipe) -} - func (mn *MockNode) AddChild(n streams.Node) { mn.Called(n) } @@ -27,16 +27,44 @@ func (mn *MockNode) Children() []streams.Node { return args.Get(0).([]streams.Node) } -func (mn *MockNode) Process(msg *streams.Message) ([]streams.NodeMessage, error) { - args := mn.Called(msg) - return args.Get(0).([]streams.NodeMessage), args.Error(1) +func (mn *MockNode) Processor() (streams.Processor) { + args := mn.Called() + return args.Get(0).(streams.Processor) } -func (mn *MockNode) Close() error { - args := mn.Called() +var _ = (streams.TimedPipe)(&MockTimedPipe{}) + +type MockTimedPipe struct { + mock.Mock +} + +func (p *MockTimedPipe) Reset() { + p.Called() +} + +func (p *MockTimedPipe) Duration() time.Duration { + args := p.Called() + return args.Get(0).(time.Duration) +} + +var _ = (streams.Pump)(&MockPump{}) + +type MockPump struct { + mock.Mock +} + +func (p *MockPump) Process(msg *streams.Message) error { + args := p.Called(msg) return args.Error(0) } +func (p *MockPump) Close() error { + args := p.Called() + return args.Error(0) +} + +var _ = (streams.Processor)(&MockProcessor{}) + type MockProcessor struct { mock.Mock } @@ -55,6 +83,8 @@ func (p *MockProcessor) Close() error { return args.Error(0) } +var _ = (streams.Source)(&MockSource{}) + type MockSource struct { mock.Mock } diff --git a/pipe.go b/pipe.go index 6e126af..9b46393 100644 --- a/pipe.go +++ b/pipe.go @@ -1,15 +1,21 @@ package streams import ( + "time" + "github.com/pkg/errors" ) +// TimedPipe represents a pipe that can accumulate execution time. +type TimedPipe interface { + // Reset resets the accumulative pipe duration. + Reset() + // Duration returns the accumulative pipe duration. + Duration() time.Duration +} + // Pipe allows messages to flow through the processors. type Pipe interface { - // Queue gets the queued Messages for each Node. - // - // This method should not be used by Processors. - Queue() []NodeMessage // Forward queues the data to all processor children in the topology. Forward(*Message) error // Forward queues the data to the the given processor(s) child in the topology. @@ -18,54 +24,61 @@ type Pipe interface { Commit(*Message) error } -// ProcessorPipe represents the pipe for processors. -type ProcessorPipe struct { - node Node - queue []NodeMessage +var _ = (TimedPipe)(&processorPipe{}) + +// processorPipe represents the pipe for processors. +type processorPipe struct { + children []Pump + + duration time.Duration } -// NewProcessorPipe create a new ProcessorPipe instance. -func NewProcessorPipe(node Node) *ProcessorPipe { - return &ProcessorPipe{ - node: node, - queue: []NodeMessage{}, +// NewPipe create a new processorPipe instance. +func NewPipe(children []Pump) Pipe { + return &processorPipe{ + children: children, } } -// Queue gets the queued Messages for each Node. -// -// Reading the node message queue will reset the queue. -func (p *ProcessorPipe) Queue() []NodeMessage { - defer func() { - p.queue = p.queue[:0] - }() +// Reset resets the accumulative pipe duration. +func (p *processorPipe) Reset() { + p.duration = 0 +} - return p.queue +// Duration returns the accumulative pipe duration. +func (p *processorPipe) Duration() time.Duration { + return p.duration } // Forward queues the data to all processor children in the topology. -func (p *ProcessorPipe) Forward(msg *Message) error { - for _, child := range p.node.Children() { - p.queue = append(p.queue, NodeMessage{child, msg}) +func (p *processorPipe) Forward(msg *Message) error { + defer p.time(time.Now()) + + for _, child := range p.children { + if err := child.Process(msg); err != nil { + return err + } } return nil } // Forward queues the data to the the given processor(s) child in the topology. -func (p *ProcessorPipe) ForwardToChild(msg *Message, index int) error { - if index > len(p.node.Children())-1 { +func (p *processorPipe) ForwardToChild(msg *Message, index int) error { + defer p.time(time.Now()) + + if index > len(p.children)-1 { return errors.New("streams: child index out of bounds") } - child := p.node.Children()[index] - p.queue = append(p.queue, NodeMessage{child, msg}) - - return nil + child := p.children[index] + return child.Process(msg) } // Commit commits the current state in the sources. -func (p *ProcessorPipe) Commit(msg *Message) error { +func (p *processorPipe) Commit(msg *Message) error { + defer p.time(time.Now()) + for s, v := range msg.Metadata() { if err := s.Commit(v); err != nil { return err @@ -74,3 +87,8 @@ func (p *ProcessorPipe) Commit(msg *Message) error { return nil } + +// time adds the duration of the function to the pipe accumulative duration. +func (p *processorPipe) time(t time.Time) { + p.duration += time.Since(t) +} diff --git a/pipe_test.go b/pipe_test.go index 92f0e21..e94f81f 100644 --- a/pipe_test.go +++ b/pipe_test.go @@ -8,44 +8,99 @@ import ( "github.com/stretchr/testify/assert" ) +func TestProcessorPipe_Duration(t *testing.T) { + msg := streams.NewMessage("test", "test") + child1 := new(MockPump) + child1.On("Process", msg).Return(nil) + child2 := new(MockPump) + child2.On("Process", msg).Return(nil) + pipe := streams.NewPipe([]streams.Pump{child1, child2}) + tPipe := pipe.(streams.TimedPipe) + + err := pipe.Forward(msg) + assert.NoError(t, err) + + d := tPipe.Duration() + + if d == 0 { + assert.Fail(t, "Pipe Duration returned 0") + } +} + +func TestProcessorPipe_Reset(t *testing.T) { + msg := streams.NewMessage("test", "test") + child1 := new(MockPump) + child1.On("Process", msg).Return(nil) + child2 := new(MockPump) + child2.On("Process", msg).Return(nil) + pipe := streams.NewPipe([]streams.Pump{child1, child2}) + tPipe := pipe.(streams.TimedPipe) + + err := pipe.Forward(msg) + assert.NoError(t, err) + tPipe.Reset() + + d := tPipe.Duration() + + if d != 0 { + assert.Fail(t, "Pipe Duration did not return 0") + } +} + func TestProcessorPipe_Forward(t *testing.T) { msg := streams.NewMessage("test", "test") - child1 := new(MockNode) - child2 := new(MockNode) - parent := new(MockNode) - parent.On("Children").Return([]streams.Node{child1, child2}) - pipe := streams.NewProcessorPipe(parent) - - pipe.Forward(msg) - - queue := pipe.Queue() - assert.Len(t, queue, 2) - assert.Exactly(t, child1, queue[0].Node) - assert.Exactly(t, msg, queue[0].Msg) - assert.Exactly(t, child2, queue[1].Node) - assert.Exactly(t, msg, queue[1].Msg) + child1 := new(MockPump) + child1.On("Process", msg).Return(nil) + child2 := new(MockPump) + child2.On("Process", msg).Return(nil) + pipe := streams.NewPipe([]streams.Pump{child1, child2}) + + err := pipe.Forward(msg) + + assert.NoError(t, err) + child1.AssertExpectations(t) + child2.AssertExpectations(t) +} + +func TestProcessorPipe_ForwardError(t *testing.T) { + msg := streams.NewMessage("test", "test") + child1 := new(MockPump) + child1.On("Process", msg).Return(errors.New("test")) + pipe := streams.NewPipe([]streams.Pump{child1}) + + err := pipe.Forward(msg) + + assert.Error(t, err) + child1.AssertExpectations(t) } func TestProcessorPipe_ForwardToChild(t *testing.T) { msg := streams.NewMessage("test", "test") - child := new(MockNode) - parent := new(MockNode) - parent.On("Children").Return([]streams.Node{nil, child}) - pipe := streams.NewProcessorPipe(parent) + child1 := new(MockPump) + child2 := new(MockPump) + child2.On("Process", msg).Return(nil) + pipe := streams.NewPipe([]streams.Pump{child1, child2}) - pipe.ForwardToChild(msg, 1) + err := pipe.ForwardToChild(msg, 1) - queue := pipe.Queue() - assert.Len(t, queue, 1) - assert.Exactly(t, child, queue[0].Node) - assert.Exactly(t, msg, queue[0].Msg) + assert.NoError(t, err) + child2.AssertExpectations(t) } func TestProcessorPipe_ForwardToChildIndexError(t *testing.T) { msg := streams.NewMessage("test", "test") - parent := new(MockNode) - parent.On("Children").Return([]streams.Node{}) - pipe := streams.NewProcessorPipe(parent) + child1 := new(MockPump) + child1.On("Process", msg).Return(errors.New("test")) + pipe := streams.NewPipe([]streams.Pump{child1}) + + err := pipe.ForwardToChild(msg, 1) + + assert.Error(t, err) +} + +func TestProcessorPipe_ForwardToChildError(t *testing.T) { + msg := streams.NewMessage("test", "test") + pipe := streams.NewPipe([]streams.Pump{}) err := pipe.ForwardToChild(msg, 1) @@ -55,9 +110,8 @@ func TestProcessorPipe_ForwardToChildIndexError(t *testing.T) { func TestProcessorPipe_Commit(t *testing.T) { src := new(MockSource) src.On("Commit", interface{}("test")).Return(nil) - node := new(MockNode) msg := streams.NewMessage(nil, nil).WithMetadata(src, "test") - pipe := streams.NewProcessorPipe(node) + pipe := streams.NewPipe([]streams.Pump{}) err := pipe.Commit(msg) @@ -68,9 +122,8 @@ func TestProcessorPipe_Commit(t *testing.T) { func TestProcessorPipe_CommitWithError(t *testing.T) { src := new(MockSource) src.On("Commit", interface{}("test")).Return(errors.New("test")) - node := new(MockNode) msg := streams.NewMessage(nil, nil).WithMetadata(src, "test") - pipe := streams.NewProcessorPipe(node) + pipe := streams.NewPipe([]streams.Pump{}) err := pipe.Commit(msg) diff --git a/processor_test.go b/processor_test.go index dbfb9c9..cbc6932 100644 --- a/processor_test.go +++ b/processor_test.go @@ -220,9 +220,9 @@ func TestMergeProcessor_ProcessMergesMetadata(t *testing.T) { p.Process(streams.NewMessage(nil, "test").WithMetadata(src1, "test1")) p.Process(streams.NewMessage("test", "test").WithMetadata(src2, "test2")) - queue := pipe.Queue() - assert.Len(t, queue, 2) - msg := queue[1].Msg + msgs := pipe.Messages() + assert.Len(t, msgs, 2) + msg := msgs[1].Msg assert.Len(t, msg.Metadata(), 2) assert.Equal(t, "test1", msg.Metadata()[src1]) assert.Equal(t, "test2", msg.Metadata()[src2]) diff --git a/pump.go b/pump.go new file mode 100644 index 0000000..870113c --- /dev/null +++ b/pump.go @@ -0,0 +1,192 @@ +package streams + +import ( + "context" + "sync" + "time" + + "github.com/msales/pkg/stats" +) + +// Pump represent a Message pump. +type Pump interface { + // Process processes a message in the Pump. + Process(*Message) error + // Close closes the pump. + Close() error +} + +// processorPump is an asynchronous Message Pump. +type processorPump struct { + name string + processor Processor + pipe TimedPipe + errFn ErrorFunc + + ch chan *Message + + wg sync.WaitGroup +} + +// NewPump creates a new processorPump instance. +func NewPump(node Node, pipe TimedPipe, errFn ErrorFunc) Pump { + p := &processorPump{ + name: node.Name(), + processor: node.Processor(), + pipe: pipe, + errFn: errFn, + ch: make(chan *Message, 1000), + } + + go p.run() + + return p +} + +func (p *processorPump) run() { + p.wg.Add(1) + defer p.wg.Done() + + for msg := range p.ch { + p.pipe.Reset() + start := time.Now() + + if err := p.processor.Process(msg); err != nil { + p.errFn(err) + return + } + + latency := time.Since(start) - p.pipe.Duration() + withStats(msg.Ctx, func(s stats.Stats) { + s.Timing("node.latency", latency, 0.1, "name", p.name) + s.Inc("node.throughput", 1, 0.1, "name", p.name) + s.Gauge("node.back-pressure", pressure(p.ch), 0.1, "name", p.name) + }) + } +} + +// Process processes a message in the Pump. +func (p *processorPump) Process(msg *Message) error { + p.ch <- msg + + return nil +} + +// Close closes the pump. +func (p *processorPump) Close() error { + close(p.ch) + + p.wg.Wait() + + return p.processor.Close() +} + +// pressure calculates how full a channel is. +func pressure(ch chan *Message) float64 { + l := float64(len(ch)) + c := float64(cap(ch)) + + return l / c * 100 +} + +// SourcePump represents a Message pump for sources. +type SourcePump interface { + // Stop stops the source pump from running. + Stop() + // Close closed the source pump. + Close() error +} + +// SourcePumps represents a set of source pumps. +type SourcePumps []SourcePump + +// StopAll stops all source pumps. +func (p SourcePumps) StopAll() { + for _, sp := range p { + sp.Stop() + } +} + +// sourcePump represents a Message pump for sources. +type sourcePump struct { + name string + source Source + pumps []Pump + errFn ErrorFunc + + quit chan struct{} + wg sync.WaitGroup +} + +// NewSourcePump creates a new SourcePump. +func NewSourcePump(name string, source Source, pumps []Pump, errFn ErrorFunc) SourcePump { + p := &sourcePump{ + name: name, + source: source, + pumps: pumps, + errFn: errFn, + quit: make(chan struct{}, 2), + } + + go p.run() + + return p +} + +func (p *sourcePump) run() { + p.wg.Add(1) + defer p.wg.Done() + + for { + select { + case <-p.quit: + return + default: + start := time.Now() + + msg, err := p.source.Consume() + if err != nil { + go p.errFn(err) + return + } + + if msg.Empty() { + continue + } + + latency := time.Since(start) + withStats(msg.Ctx, func(s stats.Stats) { + s.Timing("node.latency", latency, 0.1, "name", p.name) + s.Inc("node.throughput", 1, 0.1, "name", p.name) + }) + + for _, pump := range p.pumps { + err = pump.Process(msg) + if err != nil { + go p.errFn(err) + return + } + } + } + } +} + +// Stop stops the source pump from running. +func (p *sourcePump) Stop() { + p.quit <- struct{}{} + + p.wg.Wait() +} + +// Close closes the source pump. +func (p *sourcePump) Close() error { + close(p.quit) + + return p.source.Close() +} + +func withStats(ctx context.Context, fn func(s stats.Stats)) { + if s, ok := stats.FromContext(ctx); ok { + fn(s) + } +} diff --git a/pump_test.go b/pump_test.go new file mode 100644 index 0000000..a54d434 --- /dev/null +++ b/pump_test.go @@ -0,0 +1,142 @@ +package streams_test + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/msales/pkg/stats" + "github.com/msales/streams" + "github.com/stretchr/testify/assert" +) + +func TestProcessorPump_Process(t *testing.T) { + ctx := stats.WithStats(context.Background(), stats.Null) + msg := streams.NewMessageWithContext(ctx, "test", "test") + processor := new(MockProcessor) + processor.On("Process", msg).Return(nil) + processor.On("Close").Maybe().Return(nil) + node := streams.NewProcessorNode("test", processor) + pipe := new(MockTimedPipe) + pipe.On("Reset") + pipe.On("Duration").Return(time.Duration(0)) + p := streams.NewPump(node, pipe, func(error) {}) + defer p.Close() + + err := p.Process(msg) + + time.Sleep(time.Millisecond) + + assert.NoError(t, err) + processor.AssertExpectations(t) +} + +func TestProcessorPump_ProcessError(t *testing.T) { + var err error + + msg := streams.NewMessage("test", "test") + processor := new(MockProcessor) + processor.On("Process", msg).Return(errors.New("test")) + processor.On("Close").Return(nil) + node := streams.NewProcessorNode("test", processor) + pipe := new(MockTimedPipe) + pipe.On("Reset") + pipe.On("Duration").Return(time.Duration(0)) + p := streams.NewPump(node, pipe, func(e error) { + err = e + }) + defer p.Close() + + p.Process(msg) + + time.Sleep(time.Millisecond) + + assert.Error(t, err) +} + +func TestProcessorPump_Close(t *testing.T) { + processor := new(MockProcessor) + processor.On("Close").Return(nil) + node := streams.NewProcessorNode("test", processor) + pipe := new(MockTimedPipe) + p := streams.NewPump(node, pipe, func(error) {}) + + err := p.Close() + + assert.NoError(t, err) + processor.AssertExpectations(t) +} + +func TestProcessorPump_CloseError(t *testing.T) { + processor := new(MockProcessor) + processor.On("Close").Return(errors.New("test")) + node := streams.NewProcessorNode("test", processor) + pipe := new(MockTimedPipe) + p := streams.NewPump(node, pipe, func(error) {}) + + err := p.Close() + + assert.Error(t, err) +} + +func TestSourcePump_CanConsume(t *testing.T) { + ctx := stats.WithStats(context.Background(), stats.Null) + msg := streams.NewMessageWithContext(ctx, "test", "test") + source := new(MockSource) + source.On("Consume").Maybe().Return(msg, nil) + source.On("Close").Return(nil) + pump := new(MockPump) + pump.On("Process", msg).Return(nil) + p := streams.NewSourcePump("test", source, []streams.Pump{pump}, func(error) {}) + defer p.Close() + defer p.Stop() + + time.Sleep(time.Millisecond) + + pump.AssertExpectations(t) +} + +func TestSourcePump_HandlesPumpError(t *testing.T) { + gotError := false + msg := streams.NewMessage("test", "test") + source := new(MockSource) + source.On("Consume").Maybe().Return(msg, nil) + source.On("Close").Return(nil) + pump := new(MockPump) + pump.On("Process", msg).Return(errors.New("test")) + p := streams.NewSourcePump("test", source, []streams.Pump{pump}, func(error) { + gotError = true + }) + defer p.Close() + defer p.Stop() + + time.Sleep(time.Millisecond) + + assert.True(t, gotError) +} + +func TestSourcePump_Close(t *testing.T) { + source := new(MockSource) + source.On("Consume").Maybe().Return(streams.NewMessage("test", "test"), nil) + source.On("Close").Return(nil) + p := streams.NewSourcePump("test", source, []streams.Pump{}, func(error) {}) + p.Stop() + + err := p.Close() + + assert.NoError(t, err) + source.AssertExpectations(t) +} + +func TestSourcePump_CloseError(t *testing.T) { + source := new(MockSource) + source.On("Consume").Return(streams.NewMessage("test", "test"), nil) + source.On("Close").Return(errors.New("test")) + p := streams.NewSourcePump("test", source, []streams.Pump{}, func(error) {}) + p.Stop() + + err := p.Close() + + assert.Error(t, err) +} diff --git a/sql/sink.go b/sql/sink.go index 53c0d39..6d96764 100644 --- a/sql/sink.go +++ b/sql/sink.go @@ -25,7 +25,7 @@ func WithBatchMessages(messages int) SinkFunc { } } -// WithBatchMessages configures the frequency to send a batch +// WithBatchFrequency configures the frequency to send a batch // on the Sink. func WithBatchFrequency(freq time.Duration) SinkFunc { return func(s *Sink) { diff --git a/stream.go b/stream.go index 252ffda..b0e8c31 100644 --- a/stream.go +++ b/stream.go @@ -1,25 +1,30 @@ package streams +// StreamBuilder represents a stream builder. type StreamBuilder struct { tp *TopologyBuilder } +// NewStreamBuilder creates a new StreamBuilder. func NewStreamBuilder() *StreamBuilder { return &StreamBuilder{ tp: NewTopologyBuilder(), } } +// Source adds a Source to the stream, returning the Stream. func (sb *StreamBuilder) Source(name string, source Source) *Stream { n := sb.tp.AddSource(name, source) return newStream(sb.tp, []Node{n}) } +// Build builds the stream Topology. func (sb *StreamBuilder) Build() *Topology { return sb.tp.Build() } +// Stream represents a stream of data. type Stream struct { tp *TopologyBuilder parents []Node @@ -32,6 +37,7 @@ func newStream(tp *TopologyBuilder, parents []Node) *Stream { } } +// Filter filters the stream using a Predicate. func (s *Stream) Filter(name string, pred Predicate) *Stream { p := NewFilterProcessor(pred) n := s.tp.AddProcessor(name, p, s.parents) @@ -39,6 +45,7 @@ func (s *Stream) Filter(name string, pred Predicate) *Stream { return newStream(s.tp, []Node{n}) } +// Branch branches a stream based in the given Predcates. func (s *Stream) Branch(name string, preds ...Predicate) []*Stream { p := NewBranchProcessor(preds) n := s.tp.AddProcessor(name, p, s.parents) @@ -50,6 +57,7 @@ func (s *Stream) Branch(name string, preds ...Predicate) []*Stream { return streams } +// Map runs a Mapper on the stream. func (s *Stream) Map(name string, mapper Mapper) *Stream { p := NewMapProcessor(mapper) n := s.tp.AddProcessor(name, p, s.parents) @@ -57,6 +65,7 @@ func (s *Stream) Map(name string, mapper Mapper) *Stream { return newStream(s.tp, []Node{n}) } +// FlatMap runs a flat mapper on the stream. func (s *Stream) FlatMap(name string, mapper FlatMapper) *Stream { p := NewFlatMapProcessor(mapper) n := s.tp.AddProcessor(name, p, s.parents) @@ -64,6 +73,7 @@ func (s *Stream) FlatMap(name string, mapper FlatMapper) *Stream { return newStream(s.tp, []Node{n}) } +// Merge merges one or more streams into this stream. func (s *Stream) Merge(name string, streams ...*Stream) *Stream { parents := []Node{} parents = append(parents, s.parents...) @@ -78,10 +88,12 @@ func (s *Stream) Merge(name string, streams ...*Stream) *Stream { return newStream(s.tp, []Node{n}) } +// Print prints the data in the stream. func (s *Stream) Print(name string) *Stream { return s.Process(name, NewPrintProcessor()) } +// Process runs a custom processor on the stream. func (s *Stream) Process(name string, p Processor) *Stream { n := s.tp.AddProcessor(name, p, s.parents) diff --git a/task.go b/task.go index bb22635..e3c17c8 100644 --- a/task.go +++ b/task.go @@ -1,47 +1,48 @@ package streams import ( - "sync" - "time" - - "github.com/msales/pkg/stats" - "github.com/pkg/errors" - "github.com/tevino/abool" + "errors" ) +// ErrorFunc represents a streams error handling function. type ErrorFunc func(error) +// Task represents a streams task. type Task interface { + // Start starts the streams processors. Start() error + // OnError sets the error handler. OnError(fn ErrorFunc) + // Close stops and closes the streams processors. Close() error } type streamTask struct { topology *Topology - running *abool.AtomicBool + running bool errorFn ErrorFunc - sourceWg sync.WaitGroup - procChs map[Node]chan *Message - procSigs map[Node]chan bool + srcPumps SourcePumps + pumps map[Node]Pump } +// NewTask creates a new streams task. func NewTask(topology *Topology) Task { return &streamTask{ topology: topology, - running: abool.New(), - procChs: make(map[Node]chan *Message), - procSigs: make(map[Node]chan bool), + srcPumps: SourcePumps{}, + pumps: map[Node]Pump{}, } } +// Start starts the streams processors. func (t *streamTask) Start() error { // If we are already running, exit - if !t.running.SetToIf(false, true) { + if t.running { return errors.New("streams: task already started") } + t.running = true t.setupTopology() @@ -49,120 +50,48 @@ func (t *streamTask) Start() error { } func (t *streamTask) setupTopology() { - for _, node := range t.topology.Processors() { - pipe := NewProcessorPipe(node) - node.WithPipe(pipe) - - ch := make(chan *Message, 1000) - t.procChs[node] = ch - t.procSigs[node] = t.runProcessor(node, ch) + nodes := flattenNodeTree(t.topology.Sources()) + reverseNodes(nodes) + for _, node := range nodes { + pipe := NewPipe(t.resolvePumps(node.Children())) + node.Processor().WithPipe(pipe) + + pump := NewPump(node, pipe.(TimedPipe), t.handleError) + t.pumps[node] = pump } for source, node := range t.topology.Sources() { - pipe := NewProcessorPipe(node) - node.WithPipe(pipe) - - t.runSource(source, node) + srcPump := NewSourcePump(node.Name(), source, t.resolvePumps(node.Children()), t.handleError) + t.srcPumps = append(t.srcPumps, srcPump) } } -func (t *streamTask) runProcessor(node Node, ch chan *Message) chan bool { - done := make(chan bool, 1) - - go func() { - for msg := range ch { - start := time.Now() - - nodeMsgs, err := node.Process(msg) - if err != nil { - t.handleError(err) - } - - stats.Timing(msg.Ctx, "node.latency", time.Since(start), 1.0, "name", node.Name()) - stats.Inc(msg.Ctx, "node.throughput", 1, 1.0, "name", node.Name()) - stats.Gauge(msg.Ctx, "node.back-pressure", pressure(ch), 0.1, "name", node.Name()) - - for _, nodeMsg := range nodeMsgs { - ch := t.procChs[nodeMsg.Node] - ch <- nodeMsg.Msg - } - } - - done <- true - }() - - return done -} - -func (t *streamTask) runSource(source Source, node Node) { - go func() { - t.sourceWg.Add(1) - defer t.sourceWg.Done() - - for t.running.IsSet() { - start := time.Now() - - msg, err := source.Consume() - if err != nil { - t.handleError(err) - } - - if msg.Empty() { - continue - } - - stats.Timing(msg.Ctx, "node.latency", time.Since(start), 1.0, "name", node.Name()) - stats.Inc(msg.Ctx, "node.throughput", 1, 1.0, "name", node.Name()) - - nodeMsgs, err := node.Process(msg) - if err != nil { - t.handleError(err) - } - - for _, nodeMsg := range nodeMsgs { - ch := t.procChs[nodeMsg.Node] - ch <- nodeMsg.Msg - } - } - }() +func (t *streamTask) resolvePumps(nodes []Node) []Pump { + var pumps []Pump + for _, node := range nodes { + pumps = append(pumps, t.pumps[node]) + } + return pumps } +// Close stops and closes the streams processors. func (t *streamTask) Close() error { - t.running.UnSet() - t.sourceWg.Wait() + t.running = false + t.srcPumps.StopAll() return t.closeTopology() } func (t *streamTask) closeTopology() error { - for _, node := range t.topology.Sources() { - if err := t.closeNode(node); err != nil { + nodes := flattenNodeTree(t.topology.Sources()) + for _, node := range nodes { + if err := t.pumps[node].Close(); err != nil { return err } } - for source := range t.topology.Sources() { - if err := source.Close(); err != nil { - return err - } - } - - return nil -} - -func (t *streamTask) closeNode(n Node) error { - if done, ok := t.procSigs[n]; ok { - ch := t.procChs[n] - close(ch) - <-done - } - - if err := n.Close(); err != nil { - return err - } - - for _, child := range n.Children() { - if err := t.closeNode(child); err != nil { + for _, srcPump := range t.srcPumps { + if err := srcPump.Close(); err != nil { return err } } @@ -171,18 +100,13 @@ func (t *streamTask) closeNode(n Node) error { } func (t *streamTask) handleError(err error) { - t.running.UnSet() + t.running = false + t.srcPumps.StopAll() t.errorFn(err) } +// OnError sets the error handler. func (t *streamTask) OnError(fn ErrorFunc) { t.errorFn = fn } - -func pressure(ch chan *Message) float64 { - l := float64(len(ch)) - c := float64(cap(ch)) - - return l / c * 100 -} diff --git a/task_test.go b/task_test.go index 6e3b70c..ab83566 100644 --- a/task_test.go +++ b/task_test.go @@ -39,7 +39,10 @@ func TestStreamTask_ConsumesMessages(t *testing.T) { t.FailNow() }) - task.Start() + err := task.Start() + if err != nil { + assert.FailNow(t, err.Error()) + } msgs <- msg @@ -69,7 +72,10 @@ func TestStreamTask_Throughput(t *testing.T) { t.FailNow() }) - task.Start() + err := task.Start() + if err != nil { + assert.FailNow(t, err.Error()) + } for i := 0; i < 100; i++ { msgs <- msg @@ -163,7 +169,7 @@ func TestStreamTask_HandleCloseWithProcessorError(t *testing.T) { s.On("Close").Return(nil) p := new(MockProcessor) - p.On("WithPipe", mock.Anything).Return(nil) + p.On("WithPipe", mock.Anything) p.On("Close").Return(errors.New("test error")) b := streams.NewStreamBuilder() diff --git a/topology.go b/topology.go index 507ea3c..d294b7b 100644 --- a/topology.go +++ b/topology.go @@ -1,63 +1,65 @@ package streams +// Node represents a topology node. type Node interface { + // Name gets the node name. Name() string - WithPipe(Pipe) + // AddChild adds a child node to the node. AddChild(n Node) + // Children gets the nodes children. Children() []Node - Process(*Message) ([]NodeMessage, error) - Close() error + // Processor gets the nodes processor. + Processor() Processor } +var _ = (Node)(&SourceNode{}) + +// SourceNode represents a node between the source +// and the rest of the node tree. type SourceNode struct { name string - pipe Pipe children []Node } +// NewSourceNode create a new SourceNode. func NewSourceNode(name string) *SourceNode { return &SourceNode{ name: name, } } +// Name gets the node name. func (n *SourceNode) Name() string { return n.name } -func (n *SourceNode) WithPipe(pipe Pipe) { - n.pipe = pipe -} - +// AddChild adds a child node to the node. func (n *SourceNode) AddChild(node Node) { n.children = append(n.children, node) } +// Children gets the nodes children. func (n *SourceNode) Children() []Node { return n.children } -func (n *SourceNode) Process(msg *Message) ([]NodeMessage, error) { - if err := n.pipe.Forward(msg); err != nil { - return nil, err - } - - return n.pipe.Queue(), nil -} - -func (n *SourceNode) Close() error { +// Processor gets the nodes processor. +func (n *SourceNode) Processor() Processor { return nil } +var _ = (Node)(&ProcessorNode{}) + +// ProcessorNode represents the topology node for a processor. type ProcessorNode struct { name string - pipe Pipe processor Processor children []Node } +// NewProcessorNode creates a new ProcessorNode. func NewProcessorNode(name string, p Processor) *ProcessorNode { return &ProcessorNode{ name: name, @@ -65,53 +67,49 @@ func NewProcessorNode(name string, p Processor) *ProcessorNode { } } +// Name gets the node name. func (n *ProcessorNode) Name() string { return n.name } -func (n *ProcessorNode) WithPipe(pipe Pipe) { - n.pipe = pipe - n.processor.WithPipe(pipe) -} - +// AddChild adds a child node to the node. func (n *ProcessorNode) AddChild(node Node) { n.children = append(n.children, node) } +// Children gets the nodes children. func (n *ProcessorNode) Children() []Node { return n.children } -func (n *ProcessorNode) Process(msg *Message) ([]NodeMessage, error) { - if err := n.processor.Process(msg); err != nil { - return nil, err - } - - return n.pipe.Queue(), nil -} - -func (n *ProcessorNode) Close() error { - return n.processor.Close() +// Processor gets the nodes processor. +func (n *ProcessorNode) Processor() Processor { + return n.processor } +// Topology represents the streams topology. type Topology struct { sources map[Source]Node processors []Node } +// Sources get the topology Sources. func (t Topology) Sources() map[Source]Node { return t.sources } +// Processors gets the topology Processors. func (t Topology) Processors() []Node { return t.processors } +// TopologyBuilder represents a topology builder. type TopologyBuilder struct { sources map[Source]Node processors []Node } +// NewTopologyBuilder creates a new TopologyBuilder. func NewTopologyBuilder() *TopologyBuilder { return &TopologyBuilder{ sources: map[Source]Node{}, @@ -119,6 +117,7 @@ func NewTopologyBuilder() *TopologyBuilder { } } +// AddSource adds a Source to the builder, returning the created Node. func (tb *TopologyBuilder) AddSource(name string, source Source) Node { n := NewSourceNode(name) @@ -127,6 +126,7 @@ func (tb *TopologyBuilder) AddSource(name string, source Source) Node { return n } +// AddProcessor adds a Processor to the builder, returning the created Node. func (tb *TopologyBuilder) AddProcessor(name string, processor Processor, parents []Node) Node { n := NewProcessorNode(name, processor) for _, parent := range parents { @@ -138,9 +138,55 @@ func (tb *TopologyBuilder) AddProcessor(name string, processor Processor, parent return n } +// Build creates an immutable Topology. func (tb *TopologyBuilder) Build() *Topology { return &Topology{ sources: tb.sources, processors: tb.processors, } } + +func flattenNodeTree(roots map[Source]Node) []Node { + var nodes []Node + var visit []Node + + for _, node := range roots { + visit = append(visit, node) + } + + for len(visit) > 0 { + var n Node + n, visit = visit[0], visit[1:] + + if n.Processor() != nil { + nodes = append(nodes, n) + } + + for _, c := range n.Children() { + if contains(c, visit) { + continue + } + + visit = append(visit, c) + } + } + + return nodes +} + +func reverseNodes(nodes []Node) { + for i := len(nodes)/2 - 1; i >= 0; i-- { + opp := len(nodes) - 1 - i + nodes[i], nodes[opp] = nodes[opp], nodes[i] + } +} + +func contains(n Node, nodes []Node) bool { + for _, node := range nodes { + if node == n { + return true + } + } + + return false +} diff --git a/topology_internal_test.go b/topology_internal_test.go new file mode 100644 index 0000000..d5da67a --- /dev/null +++ b/topology_internal_test.go @@ -0,0 +1,127 @@ +package streams + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestFlattenNodeTree(t *testing.T) { + node7 := &testNode{processor: &testProcessor{}} + node6 := &testNode{processor: &testProcessor{}} + node5 := &testNode{ + children: []Node{node6, node7}, + processor: &testProcessor{}, + } + node3 := &testNode{ + children: []Node{node5}, + processor: &testProcessor{}, + } + node1 := &testNode{ + children: []Node{node3}, + } + node4 := &testNode{ + children: []Node{node5}, + processor: &testProcessor{}, + } + node2 := &testNode{ + children: []Node{node4}, + } + + nodes := flattenNodeTree(map[Source]Node{ + testSource(1): node1, + testSource(2): node2, + }) + + assert.Equal(t, []Node{node3, node4, node5, node6, node7}, nodes) +} + +func TestReverse(t *testing.T) { + node1 := &testNode{} + node2 := &testNode{} + node3 := &testNode{} + node4 := &testNode{} + nodes := []Node{node1, node2, node3, node4} + + reverseNodes(nodes) + + assert.Equal(t, []Node{node4, node3, node2, node1}, nodes) +} + +func TestContains(t *testing.T) { + node1 := &testNode{} + node2 := &testNode{} + node3 := &testNode{} + node4 := &testNode{} + + tests := []struct { + node Node + nodes []Node + found bool + }{ + { + node: node1, + nodes: []Node{node1, node2, node3}, + found: true, + }, + { + node: node4, + nodes: []Node{node1, node2, node3}, + found: false, + }, + } + + for _, tt := range tests { + found := contains(tt.node, tt.nodes) + + assert.Equal(t, tt.found, found) + } +} + +type testNode struct { + name string + children []Node + processor Processor +} + +func (t *testNode) Name() string { + return "" +} + +func (t *testNode) AddChild(n Node) { + t.children = append(t.children, n) +} + +func (t *testNode) Children() []Node { + return t.children +} + +func (t *testNode) Processor() Processor { + return t.processor +} + +type testSource int + +func (s testSource) Consume() (*Message, error) { + return nil, nil +} + +func (s testSource) Commit(v interface{}) error { + return nil +} + +func (s testSource) Close() error { + return nil +} + +type testProcessor struct{} + +func (p testProcessor) WithPipe(Pipe) {} + +func (p testProcessor) Process(msg *Message) error { + return nil +} + +func (p testProcessor) Close() error { + return nil +} diff --git a/topology_test.go b/topology_test.go index c93f73d..5bba9dc 100644 --- a/topology_test.go +++ b/topology_test.go @@ -4,8 +4,6 @@ import ( "testing" "github.com/msales/streams" - "github.com/msales/streams/mocks" - "github.com/pkg/errors" "github.com/stretchr/testify/assert" ) @@ -36,47 +34,12 @@ func TestSourceNode_Children(t *testing.T) { assert.Equal(t, child, children[0]) } -func TestSourceNode_Process(t *testing.T) { - key := "test" - value := "test" - - pipe := mocks.NewPipe(t) - pipe.ExpectForward(key, value) - - n := streams.SourceNode{} - n.WithPipe(pipe) - - _, err := n.Process(streams.NewMessage(key, value)) - - assert.NoError(t, err) - - pipe.AssertExpectations() -} - -func TestSourceNode_ProcessWithForwardError(t *testing.T) { - key := "test" - value := "test" - - pipe := mocks.NewPipe(t) - pipe.ExpectForward(key, value) - pipe.ShouldError() - - n := streams.SourceNode{} - n.WithPipe(pipe) - - _, err := n.Process(streams.NewMessage(key, value)) - - assert.Error(t, err) - - pipe.AssertExpectations() -} - -func TestSourceNode_Close(t *testing.T) { +func TestSourceNode_Processor(t *testing.T) { n := streams.SourceNode{} - err := n.Close() + processor := n.Processor() - assert.NoError(t, err) + assert.Nil(t, processor) } func TestNewProcessorNode(t *testing.T) { @@ -107,45 +70,13 @@ func TestProcessorNode_Children(t *testing.T) { assert.Equal(t, child, children[0]) } -func TestProcessorNode_Process(t *testing.T) { - msg := streams.NewMessage("test", "test") - pipe := mocks.NewPipe(t) - p := new(MockProcessor) - p.On("WithPipe", pipe).Return(nil) - p.On("Process", msg).Return(nil, nil) - n := streams.NewProcessorNode("test", p) - n.WithPipe(pipe) - - _, err := n.Process(msg) - - assert.NoError(t, err) - p.AssertExpectations(t) -} - -func TestProcessorNode_ProcessWithError(t *testing.T) { - msg := streams.NewMessage("test", "test") - pipe := mocks.NewPipe(t) - p := new(MockProcessor) - p.On("WithPipe", pipe).Return(nil) - p.On("Process", msg).Return(errors.New("test")) - n := streams.NewProcessorNode("test", p) - n.WithPipe(pipe) - - _, err := n.Process(msg) - - assert.Error(t, err) - p.AssertExpectations(t) -} - -func TestProcessorNode_Close(t *testing.T) { +func TestProcessorNode_Processor(t *testing.T) { p := new(MockProcessor) - p.On("Close").Return(nil) n := streams.NewProcessorNode("test", p) - err := n.Close() + processor := n.Processor() - assert.NoError(t, err) - p.AssertExpectations(t) + assert.Exactly(t, p, processor) } func TestTopologyBuilder_AddSource(t *testing.T) {