diff --git a/pkg/dataplane/streamconsumergroup/statehandler.go b/pkg/dataplane/streamconsumergroup/statehandler.go index a711da8..f670690 100644 --- a/pkg/dataplane/streamconsumergroup/statehandler.go +++ b/pkg/dataplane/streamconsumergroup/statehandler.go @@ -58,7 +58,7 @@ func (sh *stateHandler) start() error { // stops on stop() go func() { if err := sh.refreshStatePeriodically(); err != nil { - if errors.RootCause(err) == errShardRetention { + if errors.Is(errors.RootCause(err), errShardRetention) { // signal that the Handler needs to be restarted sh.logger.ErrorWith("Aborting member", "memberID", sh.member.id) @@ -127,7 +127,7 @@ func (sh *stateHandler) refreshStatePeriodically() error { if err != nil { // in case of shard retention error we want to signal the member to restart - if errors.RootCause(err) == errShardRetention { + if errors.Is(errors.RootCause(err), errShardRetention) { sh.logger.WarnWith("Failed getting state on shard retention (requested by member)", "err", errors.GetErrorStackString(err, 10)) return errors.Wrap(err, "Failed refreshing state by demand") @@ -145,7 +145,7 @@ func (sh *stateHandler) refreshStatePeriodically() error { if err != nil { // in case of shard retention error we want to signal the member to restart - if errors.RootCause(err) == errShardRetention { + if errors.Is(errors.RootCause(err), errShardRetention) { sh.logger.WarnWith("Failed getting state on shard retention (periodic refresh)", "err", errors.GetErrorStackString(err, 10)) return errors.Wrap(err, "Failed refreshing state periodically") diff --git a/pkg/dataplane/streamconsumergroup/streamconsumergroup.go b/pkg/dataplane/streamconsumergroup/streamconsumergroup.go index a3d0418..8a21087 100644 --- a/pkg/dataplane/streamconsumergroup/streamconsumergroup.go +++ b/pkg/dataplane/streamconsumergroup/streamconsumergroup.go @@ -115,7 +115,7 @@ func (scg *streamConsumerGroup) setState(modifier stateModifier, err := common.RetryFunc(context.TODO(), scg.logger, attempts, nil, &backoff, func(attempt int) (bool, error) { state, stateMtimeNanoSeconds, stateMtimeSeconds, err := scg.getStateFromPersistency() - if err != nil && errors.Is(err, v3ioerrors.ErrNotFound) { + if err != nil && !errors.Is(err, v3ioerrors.ErrNotFound) { return true, errors.Wrap(err, "Failed getting current state from persistency") } @@ -131,7 +131,7 @@ func (scg *streamConsumerGroup) setState(modifier stateModifier, modifiedState, err = modifier(state) if err != nil { - if errors.RootCause(err) == errShardRetention { + if errors.Is(errors.RootCause(err), errShardRetention) { // if shard retention failed the member needs to be aborted, so we can stop retrying return false, errors.Wrap(err, "Failed modifying state") @@ -273,7 +273,7 @@ func (scg *streamConsumerGroup) getShardLocationFromPersistency(shardID int, // if the error is that the attribute wasn't found, but the shard was found - seek the shard // according to the configuration - if errors.Is(err, ErrShardSequenceNumberAttributeNotFound) { + if !errors.Is(err, ErrShardSequenceNumberAttributeNotFound) { return "", errors.Wrap(err, "Failed to get shard sequenceNumber from item attributes") }