Skip to content

Commit

Permalink
Reduce log spam from consumer group (#53)
Browse files Browse the repository at this point in the history
  • Loading branch information
pavius authored Feb 18, 2020
1 parent 29e890d commit f116c6a
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 3 deletions.
36 changes: 33 additions & 3 deletions pkg/dataplane/streamconsumergroup/streamconsumergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,12 @@ func (scg *streamConsumerGroup) setState(modifier stateModifier) (*State, error)
return true, errors.Wrap(err, "Failed modifying state")
}

scg.logger.DebugWith("Modified state, saving",
"previousState", previousState,
"modifiedState", modifiedState)
// log only on change
if !scg.statesEqual(previousState, modifiedState) {
scg.logger.DebugWith("Modified state, saving",
"previousState", previousState,
"modifiedState", modifiedState)
}

err = scg.setStateInPersistency(modifiedState, mtime)
if err != nil {
Expand Down Expand Up @@ -312,3 +315,30 @@ func (scg *streamConsumerGroup) setShardSequenceNumberInPersistency(shardID int,
},
})
}

// returns true if the states are equal, ignoring heartbeat times
func (scg *streamConsumerGroup) statesEqual(state0 *State, state1 *State) bool {
if state0.SchemasVersion != state1.SchemasVersion {
return false
}

if len(state0.SessionStates) != len(state1.SessionStates) {
return false
}

// since we compared lengths, we can only do this from state0
for _, state0SessionState := range state0.SessionStates {
session1SessionState := state1.findSessionStateByMemberID(state0SessionState.MemberID)

// if couldn't find session state
if session1SessionState == nil {
return false
}

if !common.IntSlicesEqual(state0SessionState.Shards, session1SessionState.Shards) {
return false
}
}

return true
}
4 changes: 4 additions & 0 deletions pkg/dataplane/test/streamconsumergroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ type streamConsumerGroupTestSuite struct {
func (suite *streamConsumerGroupTestSuite) SetupSuite() {
suite.streamTestSuite.SetupSuite()
suite.createContainer()
}

func (suite *streamConsumerGroupTestSuite) SetupTest() {
suite.streamTestSuite.SetupTest()
suite.streamPath = fmt.Sprintf("%s/test-stream-0/", suite.testPath)
}

Expand Down

0 comments on commit f116c6a

Please sign in to comment.