Skip to content

Commit

Permalink
Fix calls with empty lists (#1084)
Browse files Browse the repository at this point in the history
* Empty topic collection and
increased test coverage

* Empty partition map returns empty ResultInfos
Supports request timeout
ResultsInfos -> ResultInfos
Increased test coverage

* Update documentation

---------

Co-authored-by: Milind L <[email protected]>
Co-authored-by: Milind L <[email protected]>
  • Loading branch information
3 people authored Oct 25, 2023
1 parent 1c0a3cd commit cd781ba
Show file tree
Hide file tree
Showing 5 changed files with 219 additions and 35 deletions.
2 changes: 1 addition & 1 deletion examples/admin_list_offsets/admin_list_offsets.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func main() {
}
// map[TopicPartition]ListOffsetsResultInfo
// Print results
for tp, info := range results.ResultsInfos {
for tp, info := range results.ResultInfos {
fmt.Printf("Topic: %s Partition: %d\n", *tp.Topic, tp.Partition)
if info.Error.Code() != kafka.ErrNoError {
fmt.Printf(" ErrorCode: %d ErrorMessage: %s\n\n", info.Error.Code(), info.Error.String())
Expand Down
26 changes: 17 additions & 9 deletions kafka/adminapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -1022,7 +1022,7 @@ type ListOffsetsResultInfo struct {

// ListOffsetsResult holds the map of TopicPartition to ListOffsetsResultInfo for a request.
type ListOffsetsResult struct {
ResultsInfos map[TopicPartition]ListOffsetsResultInfo
ResultInfos map[TopicPartition]ListOffsetsResultInfo
}

// waitResult waits for a result event on cQueue or the ctx to be cancelled, whichever happens
Expand Down Expand Up @@ -1140,9 +1140,9 @@ func (a *AdminClient) cToAuthorizedOperations(
// cToUUID converts a C rd_kafka_Uuid_t to a Go UUID.
func (a *AdminClient) cToUUID(cUUID *C.rd_kafka_Uuid_t) UUID {
uuid := UUID{
mostSignificantBits: int64(C.rd_kafka_Uuid_most_significant_bits(cUUID)),
mostSignificantBits: int64(C.rd_kafka_Uuid_most_significant_bits(cUUID)),
leastSignificantBits: int64(C.rd_kafka_Uuid_least_significant_bits(cUUID)),
base64str: C.GoString(C.rd_kafka_Uuid_base64str(cUUID)),
base64str: C.GoString(C.rd_kafka_Uuid_base64str(cUUID)),
}
return uuid
}
Expand Down Expand Up @@ -1420,7 +1420,7 @@ func cToDescribeUserScramCredentialsResult(
// cToListOffsetsResult converts a C
// rd_kafka_ListOffsets_result_t to a Go ListOffsetsResult
func cToListOffsetsResult(cRes *C.rd_kafka_ListOffsets_result_t) (result ListOffsetsResult) {
result = ListOffsetsResult{ResultsInfos: make(map[TopicPartition]ListOffsetsResultInfo)}
result = ListOffsetsResult{ResultInfos: make(map[TopicPartition]ListOffsetsResultInfo)}
var cPartitionCount C.size_t
cResultInfos := C.rd_kafka_ListOffsets_result_infos(cRes, &cPartitionCount)
for itr := 0; itr < int(cPartitionCount); itr++ {
Expand All @@ -1436,7 +1436,7 @@ func cToListOffsetsResult(cRes *C.rd_kafka_ListOffsets_result_t) (result ListOff
resultInfo.LeaderEpoch = &cLeaderEpoch
}
resultInfo.Error = newError(cPartition.err)
result.ResultsInfos[Partition] = resultInfo
result.ResultInfos[Partition] = resultInfo
}
return result
}
Expand Down Expand Up @@ -2728,7 +2728,8 @@ func (a *AdminClient) DescribeConsumerGroups(
// Parameters:
// - `ctx` - context with the maximum amount of time to block, or nil for
// indefinite.
// - `topics` - Collection of topics to describe. This should not be nil/empty.
// - `topics` - Collection of topics to describe. This should not have nil
// topic names.
// - `options` - DescribeTopicsAdminOption options.
//
// Returns DescribeTopicsResult, which contains a slice of
Expand All @@ -2751,6 +2752,11 @@ func (a *AdminClient) DescribeTopics(
cTopicNameList := make([]*C.char, len(topics.topicNames))
cTopicNameCount := C.size_t(len(topics.topicNames))

if topics.topicNames == nil {
return describeResult, newErrorFromString(ErrInvalidArg,
"TopicCollection of topic names cannot be nil")
}

for idx, topic := range topics.topicNames {
cTopicNameList[idx] = C.CString(topic)
defer C.free(unsafe.Pointer(cTopicNameList[idx]))
Expand Down Expand Up @@ -3216,18 +3222,20 @@ func (a *AdminClient) DescribeUserScramCredentials(
// specified TopicPartiton based on an OffsetSpec.
//
// Parameters:
//
// - `ctx` - context with the maximum amount of time to block, or nil for
// indefinite.
// - `topicPartitionOffsets` - a map from TopicPartition to OffsetSpec, it holds either the OffsetSpec enum value or timestamp.
// - `topicPartitionOffsets` - a map from TopicPartition to OffsetSpec, it
// holds either the OffsetSpec enum value or timestamp. Must not be nil.
// - `options` - ListOffsetsAdminOption options.
//
// Returns a ListOffsetsResult.
// Each TopicPartition's ListOffset can have an individual error.
func (a *AdminClient) ListOffsets(
ctx context.Context, topicPartitionOffsets map[TopicPartition]OffsetSpec,
options ...ListOffsetsAdminOption) (result ListOffsetsResult, err error) {
if len(topicPartitionOffsets) < 1 || topicPartitionOffsets == nil {
return result, newErrorFromString(ErrInvalidArg, "expected topicPartitionOffsets of size greater or equal 1.")
if topicPartitionOffsets == nil {
return result, newErrorFromString(ErrInvalidArg, "expected topicPartitionOffsets parameter.")
}

topicPartitions := C.rd_kafka_topic_partition_list_new(C.int(len(topicPartitionOffsets)))
Expand Down
218 changes: 196 additions & 22 deletions kafka/adminapi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,29 +487,73 @@ func testAdminAPIsDescribeConsumerGroups(

func testAdminAPIsDescribeTopics(
what string, a *AdminClient, expDuration time.Duration, t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), expDuration)
defer cancel()
descres, err := a.DescribeTopics(
ctx, NewTopicCollectionOfTopicNames(nil), SetAdminRequestTimeout(time.Second))
if descres.TopicDescriptions != nil || err == nil {
t.Fatalf("Expected DescribeTopics to fail, but got result: %v, err: %v",
descres, err)
}
if err.(Error).Code() != ErrInvalidArg {
t.Fatalf("Expected ErrInvalidArg with empty topics list, but got %s", err)
}
requestTimeout := SetAdminRequestTimeout(time.Second)
for _, options := range [][]DescribeTopicsAdminOption{
{},
{requestTimeout},
{requestTimeout, SetAdminOptionIncludeAuthorizedOperations(true)},
{SetAdminOptionIncludeAuthorizedOperations(false)},
} {

// nil slice gives error
ctx, cancel := context.WithTimeout(context.Background(), expDuration)
defer cancel()
descres, err := a.DescribeTopics(
ctx, NewTopicCollectionOfTopicNames(nil), options...)
if descres.TopicDescriptions != nil || err == nil {
t.Fatalf("Expected DescribeTopics to fail, but got result: %v, err: %v",
descres, err)
}
if err.(Error).Code() != ErrInvalidArg {
t.Fatalf("Expected ErrInvalidArg with nil slice, but got %s", err)
}

ctx, cancel = context.WithTimeout(context.Background(), expDuration)
defer cancel()
descres, err = a.DescribeTopics(
ctx, NewTopicCollectionOfTopicNames([]string{"test"}),
SetAdminRequestTimeout(time.Second))
if descres.TopicDescriptions != nil || err == nil {
t.Fatalf("Expected DescribeTopics to fail, but got result: %v, err: %v",
descres, err)
}
if ctx.Err() != context.DeadlineExceeded {
t.Fatalf("Expected DeadlineExceeded, not %s %v", err.(Error).Code(), ctx.Err())
// Empty slice returns empty TopicDescription slice
ctx, cancel = context.WithTimeout(context.Background(), expDuration)
defer cancel()
descres, err = a.DescribeTopics(
ctx, NewTopicCollectionOfTopicNames([]string{}), options...)
if descres.TopicDescriptions == nil || err != nil {
t.Fatalf("Expected DescribeTopics to succeed, but got result: %v, err: %v",
descres, err)
}
if len(descres.TopicDescriptions) > 0 {
t.Fatalf("Expected an empty TopicDescription slice, but got %d elements",
len(descres.TopicDescriptions))
}

// Empty topic names
for _, topicCollection := range []TopicCollection{
NewTopicCollectionOfTopicNames([]string{""}),
NewTopicCollectionOfTopicNames([]string{"correct", ""}),
} {
ctx, cancel = context.WithTimeout(context.Background(), expDuration)
defer cancel()
descres, err = a.DescribeTopics(
ctx, topicCollection,
options...)
if descres.TopicDescriptions != nil || err == nil {
t.Fatalf("Expected DescribeTopics to fail, but got result: %v, err: %v",
descres, err)
}
if err.(Error).Code() != ErrInvalidArg {
t.Fatalf("Expected ErrInvalidArg, not %d %v", err.(Error).Code(), err.Error())
}
}

// Normal call
ctx, cancel = context.WithTimeout(context.Background(), expDuration)
defer cancel()
descres, err = a.DescribeTopics(
ctx, NewTopicCollectionOfTopicNames([]string{"test"}),
options...)
if descres.TopicDescriptions != nil || err == nil {
t.Fatalf("Expected DescribeTopics to fail, but got result: %v, err: %v",
descres, err)
}
if ctx.Err() != context.DeadlineExceeded {
t.Fatalf("Expected DeadlineExceeded, not %s %v", err.(Error).Code(), ctx.Err())
}
}
}

Expand Down Expand Up @@ -638,6 +682,135 @@ func testAdminAPIsAlterConsumerGroupOffsets(
t.Fatalf("Expected DeadlineExceeded, not %v", ctx.Err())
}
}

func testAdminAPIsListOffsets(
what string, a *AdminClient, expDuration time.Duration, t *testing.T) {
topic := "test"
invalidTopic := ""
requestTimeout := SetAdminRequestTimeout(time.Second)

// Invalid option value
ctx, cancel := context.WithTimeout(context.Background(), expDuration)
defer cancel()
result, err := a.ListOffsets(
ctx,
map[TopicPartition]OffsetSpec{}, SetAdminRequestTimeout(-1))
if result.ResultInfos != nil || err == nil {
t.Fatalf("Expected ListOffsets to fail, but got result: %v, err: %v",
result, err)
}
if err.(Error).Code() != ErrInvalidArg {
t.Fatalf("Expected ErrInvalidArg, not %v", err)
}

for _, options := range [][]ListOffsetsAdminOption{
{},
{requestTimeout},
{requestTimeout, SetAdminIsolationLevel(IsolationLevelReadUncommitted)},
{SetAdminIsolationLevel(IsolationLevelReadCommitted)},
} {
// nil argument should fail, not being treated as empty
ctx, cancel := context.WithTimeout(context.Background(), expDuration)
defer cancel()
result, err := a.ListOffsets(
ctx,
nil, options...)
if result.ResultInfos != nil || err == nil {
t.Fatalf("Expected ListOffsets to fail, but got result: %v, err: %v",
result, err)
}
if err.(Error).Code() != ErrInvalidArg {
t.Fatalf("Expected ErrInvalidArg, not %v", err)
}

// Empty map returns empty ResultInfos
ctx, cancel = context.WithTimeout(context.Background(), expDuration)
defer cancel()
result, err = a.ListOffsets(
ctx,
map[TopicPartition]OffsetSpec{}, options...)
if result.ResultInfos == nil || err != nil {
t.Fatalf("Expected ListOffsets to succeed, but got result: %v, err: %v",
result, err)
}
if len(result.ResultInfos) > 0 {
t.Fatalf("Expected empty ResultInfos, not %v", result.ResultInfos)
}

// Invalid TopicPartition
for _, topicPartitionOffsets := range []map[TopicPartition]OffsetSpec{
{{Topic: &invalidTopic, Partition: 0}: EarliestOffsetSpec},
{{Topic: &topic, Partition: -1}: EarliestOffsetSpec},
} {
ctx, cancel = context.WithTimeout(context.Background(), expDuration)
defer cancel()
result, err = a.ListOffsets(
ctx,
topicPartitionOffsets, options...)
if result.ResultInfos != nil || err == nil {
t.Fatalf("Expected ListOffsets to fail, but got result: %v, err: %v",
result, err)
}
if err.(Error).Code() != ErrInvalidArg {
t.Fatalf("Expected ErrInvalidArg, not %v", err)
}
}

// Same partition with different offsets
ctx, cancel = context.WithTimeout(context.Background(), expDuration)
defer cancel()
topicPartitionOffsets := map[TopicPartition]OffsetSpec{
{Topic: &topic, Partition: 0, Offset: 10}: EarliestOffsetSpec,
{Topic: &topic, Partition: 0, Offset: 20}: LatestOffsetSpec,
}
result, err = a.ListOffsets(
ctx,
topicPartitionOffsets, options...)
if result.ResultInfos != nil || err == nil {
t.Fatalf("Expected ListOffsets to fail, but got result: %v, err: %v",
result, err)
}
if err.(Error).Code() != ErrInvalidArg {
t.Fatalf("Expected ErrInvalidArg, not %v", err)
}

// Two different partitions
ctx, cancel = context.WithTimeout(context.Background(), expDuration)
defer cancel()
topicPartitionOffsets = map[TopicPartition]OffsetSpec{
{Topic: &topic, Partition: 0, Offset: 10}: EarliestOffsetSpec,
{Topic: &topic, Partition: 1, Offset: 20}: EarliestOffsetSpec,
}
result, err = a.ListOffsets(
ctx,
topicPartitionOffsets, options...)
if result.ResultInfos != nil || err == nil {
t.Fatalf("Expected ListOffsets to fail, but got result: %v, err: %v",
result, err)
}
if ctx.Err() != context.DeadlineExceeded {
t.Fatalf("Expected DeadlineExceeded, not %v", ctx.Err())
}

// Single partition
ctx, cancel = context.WithTimeout(context.Background(), expDuration)
defer cancel()
topicPartitionOffsets = map[TopicPartition]OffsetSpec{
{Topic: &topic, Partition: 0}: EarliestOffsetSpec,
}
result, err = a.ListOffsets(
ctx,
topicPartitionOffsets, options...)
if result.ResultInfos != nil || err == nil {
t.Fatalf("Expected ListOffsets to fail, but got result: %v, err: %v",
result, err)
}
if ctx.Err() != context.DeadlineExceeded {
t.Fatalf("Expected DeadlineExceeded, not %v", ctx.Err())
}
}
}

func testAdminAPIsUserScramCredentials(what string, a *AdminClient, expDuration time.Duration, t *testing.T) {
var users []string

Expand Down Expand Up @@ -958,6 +1131,7 @@ func testAdminAPIs(what string, a *AdminClient, t *testing.T) {

testAdminAPIsListConsumerGroupOffsets(what, a, expDuration, t)
testAdminAPIsAlterConsumerGroupOffsets(what, a, expDuration, t)
testAdminAPIsListOffsets(what, a, expDuration, t)

testAdminAPIsUserScramCredentials(what, a, expDuration, t)
}
Expand Down
2 changes: 2 additions & 0 deletions kafka/adminoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ func (ao AdminOptionRequestTimeout) supportsListConsumerGroupOffsets() {
}
func (ao AdminOptionRequestTimeout) supportsAlterConsumerGroupOffsets() {
}
func (ao AdminOptionRequestTimeout) supportsListOffsets() {
}
func (ao AdminOptionRequestTimeout) supportsDescribeUserScramCredentials() {
}
func (ao AdminOptionRequestTimeout) supportsAlterUserScramCredentials() {
Expand Down
6 changes: 3 additions & 3 deletions kafka/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3206,7 +3206,7 @@ func (its *IntegrationTestSuite) TestAdminClient_ListOffsets() {
results, err = a.ListOffsets(ctx, topicPartitionOffsets, SetAdminIsolationLevel(IsolationLevelReadCommitted))
assert.Nil(err, "ListOffsets should not fail.")

for _, info := range results.ResultsInfos {
for _, info := range results.ResultInfos {
assert.Equal(info.Error.Code(), ErrNoError, "Error code should be ErrNoError.")
assert.Equal(info.Offset, int64(0), "Offset should be ErrNoError.")
}
Expand All @@ -3215,7 +3215,7 @@ func (its *IntegrationTestSuite) TestAdminClient_ListOffsets() {
results, err = a.ListOffsets(ctx, topicPartitionOffsets, SetAdminIsolationLevel(IsolationLevelReadCommitted))
assert.Nil(err, "ListOffsets should not fail.")

for _, info := range results.ResultsInfos {
for _, info := range results.ResultInfos {
assert.Equal(info.Error.Code(), ErrNoError, "Error code should be ErrNoError.")
assert.Equal(info.Offset, int64(3), "Offset should be 3.")
}
Expand All @@ -3224,7 +3224,7 @@ func (its *IntegrationTestSuite) TestAdminClient_ListOffsets() {
results, err = a.ListOffsets(ctx, topicPartitionOffsets, SetAdminIsolationLevel(IsolationLevelReadCommitted))
assert.Nil(err, "ListOffsets should not fail.")

for _, info := range results.ResultsInfos {
for _, info := range results.ResultInfos {
assert.Equal(info.Error.Code(), ErrNoError, "Error code should be ErrNoError.")
assert.Equal(info.Offset, int64(1), "Offset should be 1.")
}
Expand Down

0 comments on commit cd781ba

Please sign in to comment.