diff --git a/pkg/dataplane/streamconsumergroup/sequencenumberhandler.go b/pkg/dataplane/streamconsumergroup/sequencenumberhandler.go index 44b6e6f..a22b671 100644 --- a/pkg/dataplane/streamconsumergroup/sequencenumberhandler.go +++ b/pkg/dataplane/streamconsumergroup/sequencenumberhandler.go @@ -72,6 +72,10 @@ func (snh *sequenceNumberHandler) stop() error { } func (snh *sequenceNumberHandler) markShardSequenceNumber(shardID int, sequenceNumber uint64) error { + err := snh.member.streamConsumerGroup.checkShardExists(shardID, sequenceNumber) + if err != nil { + return errors.Wrapf(err, "Failed checking shard exists. Current sequenceNumber %v", snh.markedShardSequenceNumbers[shardID]) + } snh.markedShardSequenceNumbers[shardID] = sequenceNumber return nil diff --git a/pkg/dataplane/streamconsumergroup/streamconsumergroup.go b/pkg/dataplane/streamconsumergroup/streamconsumergroup.go index 8a21087..f0a0642 100644 --- a/pkg/dataplane/streamconsumergroup/streamconsumergroup.go +++ b/pkg/dataplane/streamconsumergroup/streamconsumergroup.go @@ -367,10 +367,36 @@ func (scg *streamConsumerGroup) setShardSequenceNumberInPersistency(shardID int, Attributes: map[string]interface{}{ scg.getShardCommittedSequenceNumberAttributeName(): sequenceNumber, }, + Condition: "__obj_type == 3", }) return err } +func (scg *streamConsumerGroup) checkShardExists(shardID int, sequenceNumber uint64) error { + shardPath, err := scg.getShardPath(shardID) + if err != nil { + return errors.Wrapf(err, "Failed getting shard path: %v", shardID) + } + + response, err := scg.container.GetItemSync(&v3io.GetItemInput{ + Path: shardPath, + AttributeNames: []string{ + "__obj_type", + }, + }) + defer response.Release() + if err != nil { + return errors.Wrapf(err, "Updating shard counter %v to %v failed.Failed to fetch shard object type", shardPath, sequenceNumber) + } + getItemOutput := response.Output.(*v3io.GetItemOutput) + + objType, err2 := getItemOutput.Item.GetFieldInt("__obj_type") + if err2 != nil || objType != 3 { + return errors.Wrapf(err2, "Updating shard counter %v to %v failed. Shard is a regular file (%v)", shardPath, sequenceNumber, objType) + } + return nil +} + // returns true if the states are equal, ignoring heartbeat times func (scg *streamConsumerGroup) statesEqual(state0 *State, state1 *State) bool { if state0.SchemasVersion != state1.SchemasVersion {