Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Skandalik committed Nov 1, 2023
1 parent 858b52c commit e99133b
Show file tree
Hide file tree
Showing 3 changed files with 3 additions and 5 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ jobs:
run: go vet ./...

- name: Test
run: go test -bench=. -covermode=count -coverprofile=profile_full.cov -coverpkg=github.com/msales/streams/... ./...
run: go test -race -bench=. -covermode=count -coverprofile=profile_full.cov -coverpkg=github.com/msales/streams/... ./...

- name: Goveralls
run: |
Expand Down
4 changes: 2 additions & 2 deletions kafka/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ func (s *Source) Commit(v interface{}) error {
// If commit strategy is not CommitAuto, session should perform global, synchronous commit of current marked offsets.
if s.commitStrategy != CommitAuto {
s.session.Commit()
runtime.Gosched() // If any error from consumer side happens after Commiting, it will be read and s.lastErr will be set.
runtime.Gosched() // If any error from consumer side happens after Committing, it will be read and sent to errors channel.
}

select {
Expand Down Expand Up @@ -388,7 +388,7 @@ func (s *Source) runConsumerGroup(ctx context.Context, topic string) {

for {
err := s.consumer.Consume(ctx, []string{topic}, s)
if ctx.Err() == context.Canceled { // This is the proper way to end the consumption.
if errors.Is(ctx.Err(), context.Canceled) { // This is the proper way to end the consumption.
return
}
if err == nil {
Expand Down
2 changes: 0 additions & 2 deletions kafka/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,8 +545,6 @@ func TestSource_Commit_Manual_ReturnError(t *testing.T) {
_, err = s.Consume()
require.NoError(t, err)

time.Sleep(100 * time.Millisecond)

err = s.Commit(meta)

assert.Error(t, err)
Expand Down

0 comments on commit e99133b

Please sign in to comment.