diff --git a/pkg/dataplane/streamconsumergroup/sequencenumberhandler.go b/pkg/dataplane/streamconsumergroup/sequencenumberhandler.go index a22b671..44b6e6f 100644 --- a/pkg/dataplane/streamconsumergroup/sequencenumberhandler.go +++ b/pkg/dataplane/streamconsumergroup/sequencenumberhandler.go @@ -72,10 +72,6 @@ func (snh *sequenceNumberHandler) stop() error { } func (snh *sequenceNumberHandler) markShardSequenceNumber(shardID int, sequenceNumber uint64) error { - err := snh.member.streamConsumerGroup.checkShardExists(shardID, sequenceNumber) - if err != nil { - return errors.Wrapf(err, "Failed checking shard exists. Current sequenceNumber %v", snh.markedShardSequenceNumbers[shardID]) - } snh.markedShardSequenceNumbers[shardID] = sequenceNumber return nil diff --git a/pkg/dataplane/streamconsumergroup/streamconsumergroup.go b/pkg/dataplane/streamconsumergroup/streamconsumergroup.go index f0a0642..8a21087 100644 --- a/pkg/dataplane/streamconsumergroup/streamconsumergroup.go +++ b/pkg/dataplane/streamconsumergroup/streamconsumergroup.go @@ -367,36 +367,10 @@ func (scg *streamConsumerGroup) setShardSequenceNumberInPersistency(shardID int, Attributes: map[string]interface{}{ scg.getShardCommittedSequenceNumberAttributeName(): sequenceNumber, }, - Condition: "__obj_type == 3", }) return err } -func (scg *streamConsumerGroup) checkShardExists(shardID int, sequenceNumber uint64) error { - shardPath, err := scg.getShardPath(shardID) - if err != nil { - return errors.Wrapf(err, "Failed getting shard path: %v", shardID) - } - - response, err := scg.container.GetItemSync(&v3io.GetItemInput{ - Path: shardPath, - AttributeNames: []string{ - "__obj_type", - }, - }) - defer response.Release() - if err != nil { - return errors.Wrapf(err, "Updating shard counter %v to %v failed.Failed to fetch shard object type", shardPath, sequenceNumber) - } - getItemOutput := response.Output.(*v3io.GetItemOutput) - - objType, err2 := getItemOutput.Item.GetFieldInt("__obj_type") - if err2 != nil || objType != 3 { - return errors.Wrapf(err2, "Updating shard counter %v to %v failed. Shard is a regular file (%v)", shardPath, sequenceNumber, objType) - } - return nil -} - // returns true if the states are equal, ignoring heartbeat times func (scg *streamConsumerGroup) statesEqual(state0 *State, state1 *State) bool { if state0.SchemasVersion != state1.SchemasVersion {