From e3d8fa3b7f12f3765e73ff801289ba5d822b84bb Mon Sep 17 00:00:00 2001 From: Eran Duchan Date: Sun, 6 Sep 2020 10:57:06 +0300 Subject: [PATCH] timeout --- pkg/dataplane/streamconsumergroup/claim.go | 12 ++++--- .../streamconsumergroup.go | 35 ++++++++++++++++--- .../test/streamconsumergroup_test.go | 5 ++- 3 files changed, 41 insertions(+), 11 deletions(-) diff --git a/pkg/dataplane/streamconsumergroup/claim.go b/pkg/dataplane/streamconsumergroup/claim.go index bfeed2e..9905f10 100644 --- a/pkg/dataplane/streamconsumergroup/claim.go +++ b/pkg/dataplane/streamconsumergroup/claim.go @@ -142,9 +142,10 @@ func (c *claim) fetchRecordBatches(stopChannel chan struct{}, fetchInterval time func (c *claim) fetchRecordBatch(location string) (string, error) { getRecordsInput := v3io.GetRecordsInput{ - Path: path.Join(c.member.streamConsumerGroup.streamPath, strconv.Itoa(c.shardID)), - Location: location, - Limit: c.member.streamConsumerGroup.config.Claim.RecordBatchFetch.NumRecordsInBatch, + DataPlaneInput: *c.member.streamConsumerGroup.GetDataplaneInput(), + Path: path.Join(c.member.streamConsumerGroup.streamPath, strconv.Itoa(c.shardID)), + Location: location, + Limit: c.member.streamConsumerGroup.config.Claim.RecordBatchFetch.NumRecordsInBatch, } response, err := c.member.streamConsumerGroup.container.GetRecordsSync(&getRecordsInput) @@ -254,8 +255,9 @@ func (c *claim) recoverLocationAfterIllegalLocationErr() (string, error) { } location, err := c.member.streamConsumerGroup.getShardLocationWithSeek(&v3io.SeekShardInput{ - Path: streamPath, - Type: v3io.SeekShardInputTypeEarliest, + DataPlaneInput: *c.member.streamConsumerGroup.GetDataplaneInput(), + Path: streamPath, + Type: v3io.SeekShardInputTypeEarliest, }) if err != nil { diff --git a/pkg/dataplane/streamconsumergroup/streamconsumergroup.go b/pkg/dataplane/streamconsumergroup/streamconsumergroup.go index c1733a0..f3e911e 100644 --- a/pkg/dataplane/streamconsumergroup/streamconsumergroup.go +++ b/pkg/dataplane/streamconsumergroup/streamconsumergroup.go @@ -7,6 +7,7 @@ import ( "net/http" "path" "strconv" + "time" "github.com/v3io/v3io-go/pkg/common" "github.com/v3io/v3io-go/pkg/dataplane" @@ -24,6 +25,7 @@ type streamConsumerGroup struct { streamPath string maxReplicas int totalNumShards int + dataplaneInput v3io.DataPlaneInput } func NewStreamConsumerGroup(parentLogger logger.Logger, @@ -31,7 +33,8 @@ func NewStreamConsumerGroup(parentLogger logger.Logger, config *Config, container v3io.Container, streamPath string, - maxReplicas int) (StreamConsumerGroup, error) { + maxReplicas int, + dataplaneTimeout *time.Duration) (StreamConsumerGroup, error) { var err error if config == nil { @@ -47,12 +50,23 @@ func NewStreamConsumerGroup(parentLogger logger.Logger, maxReplicas: maxReplicas, } + if dataplaneTimeout != nil { + newStreamConsumerGroup.dataplaneInput.Timeout = *dataplaneTimeout + } + // get the total number of shards for this stream newStreamConsumerGroup.totalNumShards, err = newStreamConsumerGroup.getTotalNumberOfShards() if err != nil { return nil, errors.Wrap(err, "Failed to get total number of shards") } + newStreamConsumerGroup.logger.DebugWith("StreamConsumerGroup created", + "name", name, + "config", config, + "path", streamPath, + "maxReplicas", maxReplicas, + "dataplaneTimeout", newStreamConsumerGroup.dataplaneInput.Timeout) + return &newStreamConsumerGroup, nil } @@ -70,13 +84,18 @@ func (scg *streamConsumerGroup) GetNumShards() (int, error) { return scg.totalNumShards, nil } +func (scg *streamConsumerGroup) GetDataplaneInput() *v3io.DataPlaneInput { + return &scg.dataplaneInput +} + func (scg *streamConsumerGroup) getShardPath(shardID int) (string, error) { return path.Join(scg.streamPath, strconv.Itoa(shardID)), nil } func (scg *streamConsumerGroup) getTotalNumberOfShards() (int, error) { response, err := scg.container.DescribeStreamSync(&v3io.DescribeStreamInput{ - Path: scg.streamPath, + DataPlaneInput: *scg.GetDataplaneInput(), + Path: scg.streamPath, }) if err != nil { return 0, errors.Wrapf(err, "Failed describing stream: %s", scg.streamPath) @@ -148,8 +167,9 @@ func (scg *streamConsumerGroup) setStateInPersistency(state *State, mtime *int) } _, err = scg.container.UpdateItemSync(&v3io.UpdateItemInput{ - Path: scg.getStateFilePath(), - Condition: condition, + DataPlaneInput: *scg.GetDataplaneInput(), + Path: scg.getStateFilePath(), + Condition: condition, Attributes: map[string]interface{}{ stateContentsAttributeKey: string(stateContents), }, @@ -163,6 +183,7 @@ func (scg *streamConsumerGroup) setStateInPersistency(state *State, mtime *int) func (scg *streamConsumerGroup) getStateFromPersistency() (*State, *int, error) { response, err := scg.container.GetItemSync(&v3io.GetItemInput{ + DataPlaneInput: *scg.GetDataplaneInput(), Path: scg.getStateFilePath(), AttributeNames: []string{"__mtime_nsecs", stateContentsAttributeKey}, }) @@ -211,7 +232,9 @@ func (scg *streamConsumerGroup) getStateFilePath() string { func (scg *streamConsumerGroup) getShardLocationFromPersistency(shardID int) (string, error) { scg.logger.DebugWith("Getting shard sequenceNumber from persistency", "shardID", shardID) - seekShardInput := v3io.SeekShardInput{} + seekShardInput := v3io.SeekShardInput{ + DataPlaneInput: *scg.GetDataplaneInput(), + } // get the shard sequenceNumber from the item shardSequenceNumber, err := scg.getShardSequenceNumberFromPersistency(shardID) @@ -247,6 +270,7 @@ func (scg *streamConsumerGroup) getShardSequenceNumberFromPersistency(shardID in } response, err := scg.container.GetItemSync(&v3io.GetItemInput{ + DataPlaneInput: *scg.GetDataplaneInput(), Path: shardPath, AttributeNames: []string{scg.getShardCommittedSequenceNumberAttributeName()}, }) @@ -309,6 +333,7 @@ func (scg *streamConsumerGroup) setShardSequenceNumberInPersistency(shardID int, } _, err = scg.container.UpdateItemSync(&v3io.UpdateItemInput{ + DataPlaneInput: *scg.GetDataplaneInput(), Path: shardPath, Attributes: map[string]interface{}{ scg.getShardCommittedSequenceNumberAttributeName(): sequenceNumber, diff --git a/pkg/dataplane/test/streamconsumergroup_test.go b/pkg/dataplane/test/streamconsumergroup_test.go index 08de08d..bf1812e 100644 --- a/pkg/dataplane/test/streamconsumergroup_test.go +++ b/pkg/dataplane/test/streamconsumergroup_test.go @@ -129,12 +129,15 @@ func (suite *streamConsumerGroupTestSuite) createStreamConsumerGroup(maxReplicas streamConsumerGroupConfig.Claim.RecordBatchFetch.NumRecordsInBatch = 10 streamConsumerGroupConfig.Claim.RecordBatchFetch.Interval = 50 * time.Millisecond + timeout := 2000 * time.Millisecond + streamConsumerGroup, err := streamconsumergroup.NewStreamConsumerGroup(suite.logger, consumerGroupName, streamConsumerGroupConfig, suite.container, suite.streamPath, - maxReplicas) + maxReplicas, + &timeout) suite.Require().NoError(err, "Failed creating stream consumer group") return streamConsumerGroup