diff --git a/kafka/source.go b/kafka/source.go index 10a16b1..b8440c1 100644 --- a/kafka/source.go +++ b/kafka/source.go @@ -265,9 +265,9 @@ func (s *Source) Commit(v interface{}) error { // If commit strategy is not CommitAuto, session should perform global, synchronous commit of current marked offsets. if s.commitStrategy != CommitAuto { s.session.Commit() + runtime.Gosched() // If any error from consumer side happens after Commiting, it will be read and s.lastErr will be set. } - runtime.Gosched() // If any error from consumer side happens after Commiting, it will be read and s.lastErr will be set. return s.lastErr }