Skip to content

Commit

Permalink
Merge pull request #19 from lovoo/goka-update
Browse files Browse the repository at this point in the history
update goka to fix logging dependencies
  • Loading branch information
frairon authored Oct 5, 2021
2 parents db4b091 + d47d5ab commit e147d7e
Show file tree
Hide file tree
Showing 7 changed files with 34 additions and 97 deletions.
2 changes: 1 addition & 1 deletion cmd/schedbench/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func main() {

startExperiment(ctx, errg)

if err := errg.Wait().NilOrError(); err != nil {
if err := errg.Wait().ErrorOrNil(); err != nil {
log.Fatalf("Error running processors")
}

Expand Down
13 changes: 2 additions & 11 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,18 @@ require (
github.com/facebookgo/stack v0.0.0-20160209184415-751773369052 // indirect
github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4 // indirect
github.com/golang/protobuf v1.4.2
github.com/klauspost/compress v1.10.10 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/lovoo/goka v1.0.5-0.20201126132004-639835ca4e74
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/lovoo/goka v1.1.0-beta.1
github.com/onsi/ginkgo v1.13.0 // indirect
github.com/pierrec/lz4 v2.5.2+incompatible // indirect
github.com/prometheus/client_golang v1.7.1
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 // indirect
github.com/rs/xid v1.2.1
github.com/rs/xid v1.3.0
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.6.1 // indirect
go.opencensus.io v0.22.4 // indirect
golang.org/x/crypto v0.0.0-20200709230013-948cd5f35899 // indirect
golang.org/x/net v0.0.0-20200707034311-ab3426394381
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208 // indirect
golang.org/x/text v0.3.3 // indirect
golang.org/x/tools v0.0.0-20200618112559-5fddd300b653 // indirect
google.golang.org/api v0.27.0 // indirect
google.golang.org/genproto v0.0.0-20200709005830-7a2ca40e9dc3 // indirect
google.golang.org/grpc v1.30.0 // indirect
google.golang.org/protobuf v1.25.0
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 // indirect
)
83 changes: 13 additions & 70 deletions go.sum

Large diffs are not rendered by default.

14 changes: 9 additions & 5 deletions mock/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ func NewGokaMockContext(t *testing.T) *GokaMockContext {
}
}

func (g *GokaMockContext) Group() goka.Group {
return goka.Group("mock-group")
}

func (g *GokaMockContext) Reset() {
g.emits = make(map[goka.Stream][]KeyedMessage)
}
Expand All @@ -64,13 +68,13 @@ func (g *GokaMockContext) Offset() int64 {
func (g *GokaMockContext) Value() interface{} {
return g.value
}
func (g *GokaMockContext) Headers() map[string][]byte {
func (g *GokaMockContext) Headers() goka.Headers {
return g.headers
}
func (g *GokaMockContext) SetValue(value interface{}) {
func (g *GokaMockContext) SetValue(value interface{}, options ...goka.ContextOption) {
g.value = value
}
func (g *GokaMockContext) Delete() {
func (g *GokaMockContext) Delete(options ...goka.ContextOption) {
g.value = nil
}
func (g *GokaMockContext) Timestamp() time.Time {
Expand All @@ -82,13 +86,13 @@ func (g *GokaMockContext) Join(topic goka.Table) interface{} {
func (g *GokaMockContext) Lookup(topic goka.Table, key string) interface{} {
return g.lookups[topic][key]
}
func (g *GokaMockContext) Emit(topic goka.Stream, key string, value interface{}) {
func (g *GokaMockContext) Emit(topic goka.Stream, key string, value interface{}, options ...goka.ContextOption) {
g.emits[topic] = append(g.emits[topic], KeyedMessage{
Key: key,
Value: value,
})
}
func (g *GokaMockContext) Loopback(key string, value interface{}) {
func (g *GokaMockContext) Loopback(key string, value interface{}, options ...goka.ContextOption) {
g.loopbacks = append(g.loopbacks, KeyedMessage{
Key: key,
Value: value,
Expand Down
13 changes: 6 additions & 7 deletions scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (

"github.com/lovoo/goka"
"github.com/lovoo/goka/codec"
"github.com/lovoo/goka/logger"
"github.com/lovoo/goka/multierr"
)

Expand Down Expand Up @@ -52,7 +51,7 @@ func (pt prefixAndTime) get(prefix string, timeDef int64) string {
}

var (
schedLog = logger.Default().Prefix("goka-tools-scheduler")
schedLog = goka.DefaultLogger()
)

// Emitter is a generic emitter, but probably a *goka.Emitter
Expand Down Expand Up @@ -240,12 +239,12 @@ func makeSortedIntervalList(intervals []time.Duration) []time.Duration {
interval = interval.Truncate(time.Millisecond)

if interval == 0 {
schedLog.Printf("WARNING: Wait times smaller than 1ms are not supported. Ignoring wait time.")
schedLog.Printf("Scheduler (goka-tools) WARNING: Wait times smaller than 1ms are not supported. Ignoring wait time.")
continue
}

if intervalSet[interval] {
schedLog.Printf("WARNING: The interval list contains a duplicate for %dms", interval.Milliseconds())
schedLog.Printf("Scheduler (goka-tools) WARNING: The interval list contains a duplicate for %dms", interval.Milliseconds())
continue
}
intervalSet[interval] = true
Expand Down Expand Up @@ -296,7 +295,7 @@ func (s *Scheduler) placeOrder(ctx goka.Context, msg interface{}) {
newOrder := msg.(*Order)
// validate order and drop if in valid
if err := newOrder.validate(); err != nil {
schedLog.Printf("Ignoring request for invalid order: %v", err)
schedLog.Printf("Scheduler (goka-tools) Ignoring request for invalid order: %v", err)
return
}

Expand All @@ -321,7 +320,7 @@ func (s *Scheduler) placeOrder(ctx goka.Context, msg interface{}) {
switch newOrder.OrderType {
case OrderType_Delay:
if order != nil {
schedLog.Printf("duplicate DELAY order. Each delay needs a unique key. Dropping the new order.")
schedLog.Printf("Scheduler (goka-tools) duplicate DELAY order. Each delay needs a unique key. Dropping the new order.")
return
}
if clk.Now().Sub(newOrder.ExecutionTime.AsTime()) > s.config.orderCatchupTimeout {
Expand Down Expand Up @@ -435,7 +434,7 @@ func (s *Scheduler) executeOrder(ctx goka.Context, msg interface{}) {
order := msg.(*Order)

if err := order.validate(); err != nil {
schedLog.Printf("Ignoring execution of invalid order: %v", err)
schedLog.Printf("Scheduler (goka-tools) Ignoring execution of invalid order: %v", err)
return
}

Expand Down
2 changes: 1 addition & 1 deletion scheduler/scheduler_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func TestScheduler_Integration(t *testing.T) {
cancel()

defer errg.Wait()
if err := errg.Wait().NilOrError(); err != nil {
if err := errg.Wait().ErrorOrNil(); err != nil {
t.Errorf("Error running scheduler: %v", err)
}
}
Expand Down
4 changes: 2 additions & 2 deletions scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func TestSchedulerIntegration(t *testing.T) {
defer func() {
cancel()

if err := errg.Wait().NilOrError(); err != nil {
if err := errg.Wait().ErrorOrNil(); err != nil {
t.Errorf("Error running processors: %v", err)
}

Expand Down Expand Up @@ -357,7 +357,7 @@ func ExampleScheduler() {
})
}

if err := errg.Wait().NilOrError(); err != nil {
if err := errg.Wait().ErrorOrNil(); err != nil {
log.Printf("Error running scheduler: %v", err)
}

Expand Down

0 comments on commit e147d7e

Please sign in to comment.