diff --git a/kafka/txn_integration_test.go b/kafka/txn_integration_test.go index 4d2c57461..4cd298465 100644 --- a/kafka/txn_integration_test.go +++ b/kafka/txn_integration_test.go @@ -297,6 +297,19 @@ func TestTransactionalSendOffsets(t *testing.T) { } } + // Create consumer (to read committed offsets) prior to closing the + // consumer to trigger the race condition where the transaction is + // not fully committed by the time consumer.Committed() is called. + // Prior to KIP-447 this would result in the committed offsets not + // showing up, but with KIP-447 the consumer automatically retries + // the offset retrieval. + t.Logf("Creating consumer for (later) offset retrieval\n") + consumer, err := NewConsumer(consumerConfig) + if err != nil { + t.Fatalf("Failed to create Consumer client: %s\n", err) + } + + // Close producer // signal go-routine to finish close(termChan) // wait for go-routine to finish @@ -304,27 +317,7 @@ func TestTransactionalSendOffsets(t *testing.T) { producer.Close() - // Until KIP-447 is implemented we need to call - // InitTransactions() after transactions are committed to make - // sure the committed offsets are made available to consumers. - producer2, err := NewProducer(config) - if err != nil { - t.Fatalf("Failed to create Producer client: %s\n", err) - } - - err = producer2.InitTransactions(nil) - if err != nil { - t.Fatalf("InitTransactions() failed: %v\n", err) - } - - producer2.Close() - - // Read committed offsets. - consumer, err := NewConsumer(consumerConfig) - if err != nil { - t.Fatalf("Failed to create Consumer client: %s\n", err) - } - + t.Logf("Retrieving committed offsets\n") committed, err := consumer.Committed(partitions, -1) if err != nil { t.Fatalf("Failed to get committed offsets: %s\n", err)