diff --git a/.changelog/6cb6bc26277c41be84cb8aa523a78989.json b/.changelog/6cb6bc26277c41be84cb8aa523a78989.json new file mode 100644 index 00000000000..cf2ffa15a73 --- /dev/null +++ b/.changelog/6cb6bc26277c41be84cb8aa523a78989.json @@ -0,0 +1,11 @@ +{ + "id": "6cb6bc26-277c-41be-84cb-8aa523a78989", + "type": "feature", + "collapse": true, + "description": "Add handwritten paginators that were present in some services in the v1 SDK.", + "modules": [ + "service/amplifybackend", + "service/dynamodb", + "service/kinesis" + ] +} \ No newline at end of file diff --git a/service/amplifybackend/handwritten_paginators.go b/service/amplifybackend/handwritten_paginators.go new file mode 100644 index 00000000000..d0c14a85671 --- /dev/null +++ b/service/amplifybackend/handwritten_paginators.go @@ -0,0 +1,96 @@ +package amplifybackend + +import ( + "context" + "fmt" +) + +// ListBackendJobsPaginatorOptions is the paginator options for ListBackendJobs +type ListBackendJobsPaginatorOptions struct { + // (Optional) The maximum number of backend jobs to return in a single call + Limit int32 + + // Set to true if pagination should stop if the service returns a pagination token + // that matches the most recent token provided to the service. + StopOnDuplicateToken bool +} + +// ListBackendJobsPaginator is a paginator for ListBackendJobs +type ListBackendJobsPaginator struct { + options ListBackendJobsPaginatorOptions + client ListBackendJobsAPIClient + params *ListBackendJobsInput + firstPage bool + nextToken *string + isTruncated bool +} + +// ListBackendJobsAPIClient is a client that implements the ListBackendJobs operation. 
+type ListBackendJobsAPIClient interface { + ListBackendJobs(context.Context, *ListBackendJobsInput, ...func(*Options)) (*ListBackendJobsOutput, error) +} + +// NewListBackendJobsPaginator returns a new ListBackendJobsPaginator +func NewListBackendJobsPaginator(client ListBackendJobsAPIClient, params *ListBackendJobsInput, optFns ...func(options *ListBackendJobsPaginatorOptions)) *ListBackendJobsPaginator { + if params == nil { + params = &ListBackendJobsInput{} + } + + options := ListBackendJobsPaginatorOptions{} + options.Limit = params.MaxResults + + for _, fn := range optFns { + fn(&options) + } + + return &ListBackendJobsPaginator{ + options: options, + client: client, + params: params, + firstPage: true, + nextToken: params.NextToken, + } +} + +// HasMorePages returns a boolean indicating whether more pages are available +func (p *ListBackendJobsPaginator) HasMorePages() bool { + return p.firstPage || p.isTruncated +} + +// NextPage retrieves the next ListBackendJobs page. +func (p *ListBackendJobsPaginator) NextPage(ctx context.Context, optFns ...func(*Options)) (*ListBackendJobsOutput, error) { + if !p.HasMorePages() { + return nil, fmt.Errorf("no more pages available") + } + + params := *p.params + params.NextToken = p.nextToken + + var limit int32 + if p.options.Limit > 0 { + limit = p.options.Limit + } + params.MaxResults = limit + + result, err := p.client.ListBackendJobs(ctx, ¶ms, optFns...) 
+ if err != nil { + return nil, err + } + p.firstPage = false + + prevToken := p.nextToken + p.isTruncated = result.NextToken != nil + p.nextToken = nil + if result.NextToken != nil { + p.nextToken = result.NextToken + } + + if p.options.StopOnDuplicateToken && + prevToken != nil && + p.nextToken != nil && + *prevToken == *p.nextToken { + p.isTruncated = false + } + + return result, nil +} diff --git a/service/amplifybackend/handwritten_paginators_test.go b/service/amplifybackend/handwritten_paginators_test.go new file mode 100644 index 00000000000..37e8e6eb884 --- /dev/null +++ b/service/amplifybackend/handwritten_paginators_test.go @@ -0,0 +1,241 @@ +package amplifybackend + +import ( + "context" + "github.com/aws/aws-sdk-go-v2/service/amplifybackend/types" + "testing" + + "github.com/aws/aws-sdk-go-v2/aws" +) + +type mockListBackendJobsClient struct { + outputs []*ListBackendJobsOutput + inputs []*ListBackendJobsInput + t *testing.T + limit int32 +} + +func (c *mockListBackendJobsClient) ListBackendJobs(ctx context.Context, input *ListBackendJobsInput, optFns ...func(*Options)) (*ListBackendJobsOutput, error) { + c.inputs = append(c.inputs, input) + requestCnt := len(c.inputs) + testCurRequest(len(c.outputs), requestCnt, c.limit, input.MaxResults, c.t) + return c.outputs[requestCnt-1], nil +} + +type listBackendJobsTestCase struct { + limit int32 + requestCnt int + stopOnDuplicationToken bool + outputs []*ListBackendJobsOutput +} + +func TestListBackendJobsPaginator(t *testing.T) { + cases := map[string]listBackendJobsTestCase{ + "page limit 3": { + limit: 3, + requestCnt: 3, + outputs: []*ListBackendJobsOutput{ + { + Jobs: []types.BackendJobRespObj{ + { + AppId: aws.String("App"), + JobId: aws.String("Job1"), + }, + { + AppId: aws.String("App"), + JobId: aws.String("Job2"), + }, + { + AppId: aws.String("App"), + JobId: aws.String("Job3"), + }, + }, + NextToken: aws.String("token1"), + }, + { + Jobs: []types.BackendJobRespObj{ + { + AppId: aws.String("App"), + 
JobId: aws.String("Job4"), + }, + { + AppId: aws.String("App"), + JobId: aws.String("Job5"), + }, + { + AppId: aws.String("App"), + JobId: aws.String("Job6"), + }, + }, + NextToken: aws.String("token2"), + }, + { + Jobs: []types.BackendJobRespObj{ + { + AppId: aws.String("App"), + JobId: aws.String("Job7"), + }, + { + AppId: aws.String("App"), + JobId: aws.String("Job8"), + }, + { + AppId: aws.String("App"), + JobId: aws.String("Job9"), + }, + }, + }, + }, + }, + "total count 2 due to nil nextToken": { + limit: 3, + requestCnt: 2, + outputs: []*ListBackendJobsOutput{ + { + Jobs: []types.BackendJobRespObj{ + { + AppId: aws.String("App"), + JobId: aws.String("Job1"), + }, + { + AppId: aws.String("App"), + JobId: aws.String("Job2"), + }, + { + AppId: aws.String("App"), + JobId: aws.String("Job3"), + }, + }, + NextToken: aws.String("token1"), + }, + { + Jobs: []types.BackendJobRespObj{ + { + AppId: aws.String("App"), + JobId: aws.String("Job4"), + }, + { + AppId: aws.String("App"), + JobId: aws.String("Job5"), + }, + { + AppId: aws.String("App"), + JobId: aws.String("Job6"), + }, + }, + }, + { + Jobs: []types.BackendJobRespObj{ + { + AppId: aws.String("App"), + JobId: aws.String("Job7"), + }, + }, + }, + }, + }, + "total count 2 due to duplicate nextToken": { + limit: 3, + requestCnt: 2, + stopOnDuplicationToken: true, + outputs: []*ListBackendJobsOutput{ + { + Jobs: []types.BackendJobRespObj{ + { + AppId: aws.String("App"), + JobId: aws.String("Job1"), + }, + { + AppId: aws.String("App"), + JobId: aws.String("Job2"), + }, + { + AppId: aws.String("App"), + JobId: aws.String("Job3"), + }, + }, + NextToken: aws.String("token1"), + }, + { + Jobs: []types.BackendJobRespObj{ + { + AppId: aws.String("App"), + JobId: aws.String("Job4"), + }, + { + AppId: aws.String("App"), + JobId: aws.String("Job5"), + }, + { + AppId: aws.String("App"), + JobId: aws.String("Job6"), + }, + }, + NextToken: aws.String("token1"), + }, + { + Jobs: []types.BackendJobRespObj{ + { + AppId: 
aws.String("App"), + JobId: aws.String("Job7"), + }, + { + AppId: aws.String("App"), + JobId: aws.String("Job8"), + }, + { + AppId: aws.String("App"), + JobId: aws.String("Job9"), + }, + }, + }, + }, + }, + } + + for name, c := range cases { + t.Run(name, func(t *testing.T) { + client := mockListBackendJobsClient{ + t: t, + outputs: c.outputs, + inputs: []*ListBackendJobsInput{}, + limit: c.limit, + } + paginator := NewListBackendJobsPaginator(&client, &ListBackendJobsInput{}, func(options *ListBackendJobsPaginatorOptions) { + options.Limit = c.limit + options.StopOnDuplicateToken = c.stopOnDuplicationToken + }) + + for paginator.HasMorePages() { + _, err := paginator.NextPage(context.TODO()) + if err != nil { + t.Errorf("error: %v", err) + } + } + + inputLen := len(client.inputs) + testTotalRequests(c.requestCnt, inputLen, t) + for i := 1; i < inputLen; i++ { + if *client.inputs[i].NextToken != *c.outputs[i-1].NextToken { + t.Errorf("Expect next input's nextToken to be equal to %s, got %s", + *c.outputs[i-1].NextToken, *client.inputs[i].NextToken) + } + } + }) + } +} + +func testCurRequest(maxReqCnt, actualReqCnt int, expectLimit, actualLimit int32, t *testing.T) { + if actualReqCnt > maxReqCnt { + t.Errorf("Paginator calls client more than expected %d times", maxReqCnt) + } + if expectLimit != actualLimit { + t.Errorf("Expect page limit to be %d, got %d", expectLimit, actualLimit) + } +} + +func testTotalRequests(expect, actual int, t *testing.T) { + if actual != expect { + t.Errorf("Expect total request number to be %d, got %d", expect, actual) + } +} diff --git a/service/dynamodb/handwritten_paginators.go b/service/dynamodb/handwritten_paginators.go new file mode 100644 index 00000000000..399b13e7ad9 --- /dev/null +++ b/service/dynamodb/handwritten_paginators.go @@ -0,0 +1,88 @@ +package dynamodb + +import ( + "context" + "fmt" + "github.com/aws/aws-sdk-go-v2/internal/awsutil" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" +) + +// 
BatchGetItemPaginatorOptions is the paginator options for BatchGetItem +type BatchGetItemPaginatorOptions struct { + // Set to true if pagination should stop if the service returns a pagination token + // that matches the most recent token provided to the service. + StopOnDuplicateToken bool +} + +// BatchGetItemPaginator is a paginator for BatchGetItem +type BatchGetItemPaginator struct { + options BatchGetItemPaginatorOptions + client BatchGetItemAPIClient + params *BatchGetItemInput + firstPage bool + requestItems map[string]types.KeysAndAttributes + isTruncated bool +} + +// BatchGetItemAPIClient is a client that implements the BatchGetItem operation. +type BatchGetItemAPIClient interface { + BatchGetItem(context.Context, *BatchGetItemInput, ...func(*Options)) (*BatchGetItemOutput, error) +} + +// NewBatchGetItemPaginator returns a new BatchGetItemPaginator +func NewBatchGetItemPaginator(client BatchGetItemAPIClient, params *BatchGetItemInput, optFns ...func(*BatchGetItemPaginatorOptions)) *BatchGetItemPaginator { + if params == nil { + params = &BatchGetItemInput{} + } + + options := BatchGetItemPaginatorOptions{} + + for _, fn := range optFns { + fn(&options) + } + + return &BatchGetItemPaginator{ + options: options, + client: client, + params: params, + firstPage: true, + requestItems: params.RequestItems, + } +} + +// HasMorePages returns a boolean indicating whether more pages are available +func (p *BatchGetItemPaginator) HasMorePages() bool { + return p.firstPage || p.isTruncated +} + +// NextPage retrieves the next BatchGetItem page. +func (p *BatchGetItemPaginator) NextPage(ctx context.Context, optFns ...func(*Options)) (*BatchGetItemOutput, error) { + if !p.HasMorePages() { + return nil, fmt.Errorf("no more pages available") + } + + params := *p.params + params.RequestItems = p.requestItems + + result, err := p.client.BatchGetItem(ctx, ¶ms, optFns...) 
+ if err != nil { + return nil, err + } + p.firstPage = false + + prevToken := p.requestItems + p.isTruncated = len(result.UnprocessedKeys) != 0 + p.requestItems = nil + if p.isTruncated { + p.requestItems = result.UnprocessedKeys + } + + if p.options.StopOnDuplicateToken && + prevToken != nil && + p.requestItems != nil && + awsutil.DeepEqual(prevToken, p.requestItems) { + p.isTruncated = false + } + + return result, nil +} diff --git a/service/dynamodb/handwritten_paginators_test.go b/service/dynamodb/handwritten_paginators_test.go new file mode 100644 index 00000000000..2557502f725 --- /dev/null +++ b/service/dynamodb/handwritten_paginators_test.go @@ -0,0 +1,139 @@ +package dynamodb + +import ( + "context" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/internal/awsutil" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" + "testing" +) + +type mockBatchGetItemClient struct { + inputs []*BatchGetItemInput + outputs []*BatchGetItemOutput + t *testing.T +} + +func (c *mockBatchGetItemClient) BatchGetItem(ctx context.Context, input *BatchGetItemInput, optFns ...func(*Options)) (*BatchGetItemOutput, error) { + c.inputs = append(c.inputs, input) + requestCnt := len(c.inputs) + if len(c.outputs) < requestCnt { + c.t.Errorf("Paginator calls client more than expected %d times", len(c.outputs)) + } + return c.outputs[requestCnt-1], nil +} + +type batchGetItemTestCase struct { + requestCnt int + stopOnDuplicationToken bool + outputs []*BatchGetItemOutput +} + +func TestBatchGetItemPaginator(t *testing.T) { + cases := map[string]batchGetItemTestCase{ + "total count 3": { + requestCnt: 3, + outputs: []*BatchGetItemOutput{ + { + UnprocessedKeys: map[string]types.KeysAndAttributes{ + "key1": { + AttributesToGet: []string{ + "attr1", + }, + ConsistentRead: aws.Bool(true), + ProjectionExpression: aws.String("attr2, attr3"), + }, + }, + }, + { + UnprocessedKeys: map[string]types.KeysAndAttributes{ + "key2": { + AttributesToGet: []string{ + "attr4", + }, 
+ ConsistentRead: aws.Bool(true), + ProjectionExpression: aws.String("attr5, attr6"), + }, + }, + }, + { + UnprocessedKeys: map[string]types.KeysAndAttributes{}, + }, + }, + }, + "total count 2 due to duplicate token": { + requestCnt: 2, + stopOnDuplicationToken: true, + outputs: []*BatchGetItemOutput{ + { + UnprocessedKeys: map[string]types.KeysAndAttributes{ + "key1": { + AttributesToGet: []string{ + "attr1", + }, + ConsistentRead: aws.Bool(true), + ProjectionExpression: aws.String("attr2, attr3"), + }, + }, + }, + { + UnprocessedKeys: map[string]types.KeysAndAttributes{ + "key1": { + AttributesToGet: []string{ + "attr1", + }, + ConsistentRead: aws.Bool(true), + ProjectionExpression: aws.String("attr2, attr3"), + }, + }, + }, + { + UnprocessedKeys: map[string]types.KeysAndAttributes{ + "key2": { + AttributesToGet: []string{ + "attr4", + }, + ConsistentRead: aws.Bool(true), + ProjectionExpression: aws.String("attr5, attr6"), + }, + }, + }, + }, + }, + } + + for name, c := range cases { + t.Run(name, func(t *testing.T) { + client := mockBatchGetItemClient{ + t: t, + outputs: c.outputs, + inputs: []*BatchGetItemInput{}, + } + paginator := NewBatchGetItemPaginator(&client, &BatchGetItemInput{}, func(options *BatchGetItemPaginatorOptions) { + options.StopOnDuplicateToken = c.stopOnDuplicationToken + }) + + for paginator.HasMorePages() { + _, err := paginator.NextPage(context.TODO()) + if err != nil { + t.Errorf("error: %v", err) + } + } + + inputLen := len(client.inputs) + testTotalRequests(c.requestCnt, inputLen, t) + for i := 1; i < inputLen; i++ { + if !awsutil.DeepEqual(client.inputs[i].RequestItems, c.outputs[i-1].UnprocessedKeys) { + t.Errorf("Expect next input's request items to be equal to %v, got %v", + c.outputs[i-1].UnprocessedKeys, client.inputs[i].RequestItems) + } + } + }) + } +} + +func testTotalRequests(expect, actual int, t *testing.T) { + if actual != expect { + t.Errorf("Expect total request number to be %d, got %d", expect, actual) + } +} diff 
--git a/service/kinesis/handwritten_paginators.go b/service/kinesis/handwritten_paginators.go new file mode 100644 index 00000000000..85b103fd6d5 --- /dev/null +++ b/service/kinesis/handwritten_paginators.go @@ -0,0 +1,94 @@ +package kinesis + +import ( + "context" + "fmt" + + "github.com/aws/aws-sdk-go-v2/aws" +) + +// DescribeStreamPaginatorOptions is the paginator options for DescribeStream +type DescribeStreamPaginatorOptions struct { + // (Optional) The maximum number of shards to return in a single call + Limit *int32 + + // Set to true if pagination should stop if the service returns a pagination token + // that matches the most recent token provided to the service. + StopOnDuplicateToken bool +} + +// DescribeStreamPaginator is a paginator for DescribeStream +type DescribeStreamPaginator struct { + options DescribeStreamPaginatorOptions + client DescribeStreamAPIClient + params *DescribeStreamInput + firstPage bool + exclusiveStartShardID *string + isTruncated *bool +} + +// NewDescribeStreamPaginator returns a new DescribeStreamPaginator +func NewDescribeStreamPaginator(client DescribeStreamAPIClient, params *DescribeStreamInput, optFns ...func(*DescribeStreamPaginatorOptions)) *DescribeStreamPaginator { + if params == nil { + params = &DescribeStreamInput{} + } + + options := DescribeStreamPaginatorOptions{} + options.Limit = params.Limit + + for _, fn := range optFns { + fn(&options) + } + + return &DescribeStreamPaginator{ + options: options, + client: client, + params: params, + firstPage: true, + exclusiveStartShardID: params.ExclusiveStartShardId, + } +} + +// HasMorePages returns a boolean indicating whether more pages are available +func (p *DescribeStreamPaginator) HasMorePages() bool { + return p.firstPage || *p.isTruncated +} + +// NextPage retrieves the next DescribeStream page. 
+func (p *DescribeStreamPaginator) NextPage(ctx context.Context, optFns ...func(*Options)) (*DescribeStreamOutput, error) { + if !p.HasMorePages() { + return nil, fmt.Errorf("no more pages available") + } + + params := *p.params + params.ExclusiveStartShardId = p.exclusiveStartShardID + + var limit *int32 + if p.options.Limit != nil && *p.options.Limit > 0 { + limit = p.options.Limit + } + params.Limit = limit + + result, err := p.client.DescribeStream(ctx, &params, optFns...) + if err != nil { + return nil, err + } + p.firstPage = false + + prevToken := p.exclusiveStartShardID + p.isTruncated = aws.Bool(aws.ToBool(result.StreamDescription.HasMoreShards)) + p.exclusiveStartShardID = nil + if *p.isTruncated { + shardsLength := len(result.StreamDescription.Shards) + p.exclusiveStartShardID = result.StreamDescription.Shards[shardsLength-1].ShardId + } + + if p.options.StopOnDuplicateToken && + prevToken != nil && + p.exclusiveStartShardID != nil && + *prevToken == *p.exclusiveStartShardID { + p.isTruncated = aws.Bool(false) + } + + return result, nil +} diff --git a/service/kinesis/handwritten_paginators_test.go b/service/kinesis/handwritten_paginators_test.go new file mode 100644 index 00000000000..b5cba484bf0 --- /dev/null +++ b/service/kinesis/handwritten_paginators_test.go @@ -0,0 +1,227 @@ +package kinesis + +import ( + "context" + "testing" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/kinesis/types" +) + +type mockDescribeStreamClient struct { + outputs []*DescribeStreamOutput + inputs []*DescribeStreamInput + t *testing.T + limit int32 +} + +func (c *mockDescribeStreamClient) DescribeStream(ctx context.Context, input *DescribeStreamInput, optFns ...func(*Options)) (*DescribeStreamOutput, error) { + c.inputs = append(c.inputs, input) + requestCnt := len(c.inputs) + testCurRequest(len(c.outputs), requestCnt, c.limit, *input.Limit, c.t) + return c.outputs[requestCnt-1], nil +} + +type describeStreamTestCase struct { + limit int32 + requestCnt int + 
stopOnDuplicationToken bool + outputs []*DescribeStreamOutput +} + +func TestDescribeStreamPaginator(t *testing.T) { + cases := map[string]describeStreamTestCase{ + "page limit 3": { + limit: 3, + requestCnt: 3, + outputs: []*DescribeStreamOutput{ + { + StreamDescription: &types.StreamDescription{ + Shards: []types.Shard{ + { + ShardId: aws.String("shard1"), + }, + { + ShardId: aws.String("shard2"), + }, + { + ShardId: aws.String("shard3"), + }, + }, + HasMoreShards: aws.Bool(true), + }, + }, + { + StreamDescription: &types.StreamDescription{ + Shards: []types.Shard{ + { + ShardId: aws.String("shard4"), + }, + { + ShardId: aws.String("shard5"), + }, + { + ShardId: aws.String("shard6"), + }, + }, + HasMoreShards: aws.Bool(true), + }, + }, + { + StreamDescription: &types.StreamDescription{ + Shards: []types.Shard{ + { + ShardId: aws.String("shard7"), + }, + }, + HasMoreShards: aws.Bool(false), + }, + }, + }, + }, + "total count 2 due to no more shards marker": { + limit: 3, + requestCnt: 2, + outputs: []*DescribeStreamOutput{ + { + StreamDescription: &types.StreamDescription{ + Shards: []types.Shard{ + { + ShardId: aws.String("shard1"), + }, + { + ShardId: aws.String("shard2"), + }, + { + ShardId: aws.String("shard3"), + }, + }, + HasMoreShards: aws.Bool(true), + }, + }, + { + StreamDescription: &types.StreamDescription{ + Shards: []types.Shard{ + { + ShardId: aws.String("shard4"), + }, + { + ShardId: aws.String("shard5"), + }, + { + ShardId: aws.String("shard6"), + }, + }, + HasMoreShards: aws.Bool(false), + }, + }, + { + StreamDescription: &types.StreamDescription{ + Shards: []types.Shard{ + { + ShardId: aws.String("shard7"), + }, + }, + HasMoreShards: aws.Bool(false), + }, + }, + }, + }, + "total count 2 due to duplicate shard ID": { + limit: 3, + requestCnt: 2, + stopOnDuplicationToken: true, + outputs: []*DescribeStreamOutput{ + { + StreamDescription: &types.StreamDescription{ + Shards: []types.Shard{ + { + ShardId: aws.String("shard1"), + }, + { + ShardId: 
aws.String("shard2"), + }, + { + ShardId: aws.String("shard3"), + }, + }, + HasMoreShards: aws.Bool(true), + }, + }, + { + StreamDescription: &types.StreamDescription{ + Shards: []types.Shard{ + { + ShardId: aws.String("shard4"), + }, + { + ShardId: aws.String("shard5"), + }, + { + ShardId: aws.String("shard3"), + }, + }, + HasMoreShards: aws.Bool(true), + }, + }, + { + StreamDescription: &types.StreamDescription{ + Shards: []types.Shard{ + { + ShardId: aws.String("shard7"), + }, + }, + HasMoreShards: aws.Bool(false), + }, + }, + }, + }, + } + + for name, c := range cases { + t.Run(name, func(t *testing.T) { + client := mockDescribeStreamClient{ + t: t, + outputs: c.outputs, + inputs: []*DescribeStreamInput{}, + limit: c.limit, + } + paginator := NewDescribeStreamPaginator(&client, &DescribeStreamInput{}, func(options *DescribeStreamPaginatorOptions) { + options.Limit = &c.limit + options.StopOnDuplicateToken = c.stopOnDuplicationToken + }) + + for paginator.HasMorePages() { + _, err := paginator.NextPage(context.TODO()) + if err != nil { + t.Errorf("error: %v", err) + } + } + + inputLen := len(client.inputs) + testTotalRequests(c.requestCnt, inputLen, t) + for i := 1; i < inputLen; i++ { + shardsLength := len(c.outputs[i-1].StreamDescription.Shards) + if *client.inputs[i].ExclusiveStartShardId != *c.outputs[i-1].StreamDescription.Shards[shardsLength-1].ShardId { + t.Errorf("Expect next input's exclusive start shard ID to be equal to %s, got %s", + *c.outputs[i-1].StreamDescription.Shards[shardsLength-1].ShardId, *client.inputs[i].ExclusiveStartShardId) + } + } + }) + } +} + +func testCurRequest(maxReqCnt, actualReqCnt int, expectLimit, actualLimit int32, t *testing.T) { + if actualReqCnt > maxReqCnt { + t.Errorf("Paginator calls client more than expected %d times", maxReqCnt) + } + if expectLimit != actualLimit { + t.Errorf("Expect page limit to be %d, got %d", expectLimit, actualLimit) + } +} + +func testTotalRequests(expect, actual int, t *testing.T) { + if 
actual != expect { + t.Errorf("Expect total request number to be %d, got %d", expect, actual) + } +}