Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support timeout (defaults to 90s) #71

Open
wants to merge 1 commit into
base: 0.1.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions pkg/dataplane/streamconsumergroup/claim.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,10 @@ func (c *claim) fetchRecordBatches(stopChannel chan struct{}, fetchInterval time

func (c *claim) fetchRecordBatch(location string) (string, error) {
getRecordsInput := v3io.GetRecordsInput{
Path: path.Join(c.member.streamConsumerGroup.streamPath, strconv.Itoa(c.shardID)),
Location: location,
Limit: c.member.streamConsumerGroup.config.Claim.RecordBatchFetch.NumRecordsInBatch,
DataPlaneInput: *c.member.streamConsumerGroup.GetDataplaneInput(),
Path: path.Join(c.member.streamConsumerGroup.streamPath, strconv.Itoa(c.shardID)),
Location: location,
Limit: c.member.streamConsumerGroup.config.Claim.RecordBatchFetch.NumRecordsInBatch,
}

response, err := c.member.streamConsumerGroup.container.GetRecordsSync(&getRecordsInput)
Expand Down Expand Up @@ -254,8 +255,9 @@ func (c *claim) recoverLocationAfterIllegalLocationErr() (string, error) {
}

location, err := c.member.streamConsumerGroup.getShardLocationWithSeek(&v3io.SeekShardInput{
Path: streamPath,
Type: v3io.SeekShardInputTypeEarliest,
DataPlaneInput: *c.member.streamConsumerGroup.GetDataplaneInput(),
Path: streamPath,
Type: v3io.SeekShardInputTypeEarliest,
})

if err != nil {
Expand Down
35 changes: 30 additions & 5 deletions pkg/dataplane/streamconsumergroup/streamconsumergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net/http"
"path"
"strconv"
"time"

"github.com/v3io/v3io-go/pkg/common"
"github.com/v3io/v3io-go/pkg/dataplane"
Expand All @@ -24,14 +25,16 @@ type streamConsumerGroup struct {
streamPath string
maxReplicas int
totalNumShards int
dataplaneInput v3io.DataPlaneInput
}

func NewStreamConsumerGroup(parentLogger logger.Logger,
name string,
config *Config,
container v3io.Container,
streamPath string,
maxReplicas int) (StreamConsumerGroup, error) {
maxReplicas int,
dataplaneTimeout *time.Duration) (StreamConsumerGroup, error) {
var err error

if config == nil {
Expand All @@ -47,12 +50,23 @@ func NewStreamConsumerGroup(parentLogger logger.Logger,
maxReplicas: maxReplicas,
}

if dataplaneTimeout != nil {
newStreamConsumerGroup.dataplaneInput.Timeout = *dataplaneTimeout
}

// get the total number of shards for this stream
newStreamConsumerGroup.totalNumShards, err = newStreamConsumerGroup.getTotalNumberOfShards()
if err != nil {
return nil, errors.Wrap(err, "Failed to get total number of shards")
}

newStreamConsumerGroup.logger.DebugWith("StreamConsumerGroup created",
"name", name,
"config", config,
"path", streamPath,
"maxReplicas", maxReplicas,
"dataplaneTimeout", newStreamConsumerGroup.dataplaneInput.Timeout)

return &newStreamConsumerGroup, nil
}

Expand All @@ -70,13 +84,18 @@ func (scg *streamConsumerGroup) GetNumShards() (int, error) {
return scg.totalNumShards, nil
}

func (scg *streamConsumerGroup) GetDataplaneInput() *v3io.DataPlaneInput {
return &scg.dataplaneInput
}

func (scg *streamConsumerGroup) getShardPath(shardID int) (string, error) {
return path.Join(scg.streamPath, strconv.Itoa(shardID)), nil
}

func (scg *streamConsumerGroup) getTotalNumberOfShards() (int, error) {
response, err := scg.container.DescribeStreamSync(&v3io.DescribeStreamInput{
Path: scg.streamPath,
DataPlaneInput: *scg.GetDataplaneInput(),
Path: scg.streamPath,
})
if err != nil {
return 0, errors.Wrapf(err, "Failed describing stream: %s", scg.streamPath)
Expand Down Expand Up @@ -148,8 +167,9 @@ func (scg *streamConsumerGroup) setStateInPersistency(state *State, mtime *int)
}

_, err = scg.container.UpdateItemSync(&v3io.UpdateItemInput{
Path: scg.getStateFilePath(),
Condition: condition,
DataPlaneInput: *scg.GetDataplaneInput(),
Path: scg.getStateFilePath(),
Condition: condition,
Attributes: map[string]interface{}{
stateContentsAttributeKey: string(stateContents),
},
Expand All @@ -163,6 +183,7 @@ func (scg *streamConsumerGroup) setStateInPersistency(state *State, mtime *int)

func (scg *streamConsumerGroup) getStateFromPersistency() (*State, *int, error) {
response, err := scg.container.GetItemSync(&v3io.GetItemInput{
DataPlaneInput: *scg.GetDataplaneInput(),
Path: scg.getStateFilePath(),
AttributeNames: []string{"__mtime_nsecs", stateContentsAttributeKey},
})
Expand Down Expand Up @@ -211,7 +232,9 @@ func (scg *streamConsumerGroup) getStateFilePath() string {
func (scg *streamConsumerGroup) getShardLocationFromPersistency(shardID int) (string, error) {
scg.logger.DebugWith("Getting shard sequenceNumber from persistency", "shardID", shardID)

seekShardInput := v3io.SeekShardInput{}
seekShardInput := v3io.SeekShardInput{
DataPlaneInput: *scg.GetDataplaneInput(),
}

// get the shard sequenceNumber from the item
shardSequenceNumber, err := scg.getShardSequenceNumberFromPersistency(shardID)
Expand Down Expand Up @@ -247,6 +270,7 @@ func (scg *streamConsumerGroup) getShardSequenceNumberFromPersistency(shardID in
}

response, err := scg.container.GetItemSync(&v3io.GetItemInput{
DataPlaneInput: *scg.GetDataplaneInput(),
Path: shardPath,
AttributeNames: []string{scg.getShardCommittedSequenceNumberAttributeName()},
})
Expand Down Expand Up @@ -309,6 +333,7 @@ func (scg *streamConsumerGroup) setShardSequenceNumberInPersistency(shardID int,
}

_, err = scg.container.UpdateItemSync(&v3io.UpdateItemInput{
DataPlaneInput: *scg.GetDataplaneInput(),
Path: shardPath,
Attributes: map[string]interface{}{
scg.getShardCommittedSequenceNumberAttributeName(): sequenceNumber,
Expand Down
5 changes: 4 additions & 1 deletion pkg/dataplane/test/streamconsumergroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,15 @@ func (suite *streamConsumerGroupTestSuite) createStreamConsumerGroup(maxReplicas
streamConsumerGroupConfig.Claim.RecordBatchFetch.NumRecordsInBatch = 10
streamConsumerGroupConfig.Claim.RecordBatchFetch.Interval = 50 * time.Millisecond

timeout := 2000 * time.Millisecond

streamConsumerGroup, err := streamconsumergroup.NewStreamConsumerGroup(suite.logger,
consumerGroupName,
streamConsumerGroupConfig,
suite.container,
suite.streamPath,
maxReplicas)
maxReplicas,
&timeout)
suite.Require().NoError(err, "Failed creating stream consumer group")

return streamConsumerGroup
Expand Down