From 1706a198e90e05725b0ca1a5c6b836d81e068459 Mon Sep 17 00:00:00 2001
From: guy-har <60321938+guy-har@users.noreply.github.com>
Date: Tue, 12 Mar 2024 11:30:40 +0200
Subject: [PATCH] Feature/delete sensor (#7523)

---
 cmd/lakectl/cmd/abuse_random_delete.go |  84 +++++++++++
 docs/reference/cli.md                  |  20 +++
 pkg/catalog/catalog.go                 |  19 ++-
 pkg/config/config.go                   |   1 +
 pkg/graveler/delete_sensor.go          | 136 +++++++++++++++++++
 pkg/graveler/delete_sensor_test.go     | 176 +++++++++++++++++++++++++
 pkg/graveler/graveler.go               |  40 ++++--
 pkg/graveler/graveler_test.go          |   2 +-
 pkg/graveler/testutil/graveler_mock.go |   2 +-
 9 files changed, 464 insertions(+), 16 deletions(-)
 create mode 100644 cmd/lakectl/cmd/abuse_random_delete.go
 create mode 100644 pkg/graveler/delete_sensor.go
 create mode 100644 pkg/graveler/delete_sensor_test.go

diff --git a/cmd/lakectl/cmd/abuse_random_delete.go b/cmd/lakectl/cmd/abuse_random_delete.go
new file mode 100644
index 00000000000..a0c051c45b4
--- /dev/null
+++ b/cmd/lakectl/cmd/abuse_random_delete.go
@@ -0,0 +1,84 @@
+package cmd
+
+import (
+	"fmt"
+	"math/rand"
+	"net/http"
+	"os"
+	"syscall"
+	"time"
+
+	"github.com/spf13/cobra"
+	"github.com/treeverse/lakefs/pkg/api/apigen"
+	"github.com/treeverse/lakefs/pkg/api/helpers"
+	"github.com/treeverse/lakefs/pkg/testutil/stress"
+)
+
+// abuseRandomDeletesCmd issues DeleteObject requests for keys picked at random
+// from a key file against a single ref, driven by the stress generator.
+var abuseRandomDeletesCmd = &cobra.Command{
+	Use:               "random-delete <source ref URI>",
+	Short:             "Read keys from a file and generate random deletes from the source ref for those keys.",
+	Hidden:            false,
+	Args:              cobra.ExactArgs(1),
+	ValidArgsFunction: ValidArgsRepository,
+	Run: func(cmd *cobra.Command, args []string) {
+		u := MustParseRefURI("source ref URI", args[0])
+		amount := Must(cmd.Flags().GetInt("amount"))
+		parallelism := Must(cmd.Flags().GetInt("parallelism"))
+		fromFile := Must(cmd.Flags().GetString("from-file"))
+
+		fmt.Println("Source ref:", u)
+		// read the input file
+		keys, err := readLines(fromFile)
+		if err != nil {
+			DieErr(err)
+		}
+		// rand.Intn panics when its argument is zero, so refuse to run without keys
+		if len(keys) == 0 {
+			DieErr(fmt.Errorf("no keys found in key file %q", fromFile))
+		}
+		fmt.Printf("read a total of %d keys from key file\n", len(keys))
+
+		generator := stress.NewGenerator("delete", parallelism, stress.WithSignalHandlersFor(os.Interrupt, syscall.SIGTERM))
+
+		// generate randomly selected keys as input
+		generator.Setup(func(add stress.GeneratorAddFn) {
+			for i := 0; i < amount; i++ {
+				//nolint:gosec
+				add(keys[rand.Intn(len(keys))])
+			}
+		})
+
+		// execute the things!
+		generator.Run(func(input chan string, output chan stress.Result) {
+			ctx := cmd.Context()
+			client := getClient()
+			for work := range input {
+				start := time.Now()
+				resp, err := client.DeleteObject(ctx, u.Repository, u.Ref, &apigen.DeleteObjectParams{
+					Path: work,
+				})
+				if err == nil {
+					if resp.StatusCode != http.StatusOK {
+						err = helpers.ResponseAsError(resp)
+					}
+					// always release the body so the underlying connection can be reused
+					_ = resp.Body.Close()
+				}
+				output <- stress.Result{
+					Error: err,
+					Took:  time.Since(start),
+				}
+			}
+		})
+	},
+}
+
+//nolint:gochecknoinits
+func init() {
+	abuseCmd.AddCommand(abuseRandomDeletesCmd)
+	abuseRandomDeletesCmd.Flags().String("from-file", "", "read keys from this file (\"-\" for stdin)")
+	abuseRandomDeletesCmd.Flags().Int("amount", abuseDefaultAmount, "amount of deletes to do")
+	abuseRandomDeletesCmd.Flags().Int("parallelism", abuseDefaultParallelism, "amount of deletes to do in parallel")
+}
diff --git a/docs/reference/cli.md b/docs/reference/cli.md
index b1a59dbefc4..484252adc31 100644
--- a/docs/reference/cli.md
+++ b/docs/reference/cli.md
@@ -229,6 +229,26 @@ lakectl abuse list [flags]
 
 
 
+### lakectl abuse random-delete
+
+Read keys from a file and generate random deletes from the source ref for those keys.
+
+```
+lakectl abuse random-delete <source ref URI> [flags]
+```
+
+#### Options
+{:.no_toc}
+
+```
+      --amount int         amount of deletes to do (default 1000000)
+      --from-file string   read keys from this file ("-" for stdin)
+  -h, --help               help for random-delete
+      --parallelism int    amount of deletes to do in parallel (default 100)
+```
+
+
+
 ### lakectl abuse random-read
 
 Read keys from a file and generate random reads from the source ref for those keys.
diff --git a/pkg/catalog/catalog.go b/pkg/catalog/catalog.go
index 0c520ceee1c..da871dba6c1 100644
--- a/pkg/catalog/catalog.go
+++ b/pkg/catalog/catalog.go
@@ -222,6 +222,7 @@ type Catalog struct {
 	KVStore               kv.Store
 	KVStoreLimited        kv.Store
 	addressProvider       *ident.HexAddressProvider
+	deleteSensor          *graveler.DeleteSensor
 	UGCPrepareMaxFileSize int64
 	UGCPrepareInterval    time.Duration
 }
@@ -366,7 +367,19 @@ func New(ctx context.Context, cfg Config) (*Catalog, error) {
 
 	protectedBranchesManager := branch.NewProtectionManager(settingManager)
 	stagingManager := staging.NewManager(ctx, cfg.KVStore, storeLimiter, cfg.Config.Graveler.BatchDBIOTransactionMarkers, executor)
-	gStore := graveler.NewGraveler(committedManager, stagingManager, refManager, gcManager, protectedBranchesManager)
+	var deleteSensor *graveler.DeleteSensor // stays nil (sensor disabled) unless compaction_sensor_threshold is positive
+	if cfg.Config.Graveler.CompactionSensorThreshold > 0 {
+		cb := func(repositoryID graveler.RepositoryID, branchID graveler.BranchID, stagingTokenID graveler.StagingToken, inGrace bool) {
+			logging.FromContext(ctx).WithFields(logging.Fields{
+				"repositoryID":   repositoryID,
+				"branchID":       branchID,
+				"stagingTokenID": stagingTokenID,
+				"inGrace":        inGrace,
+			}).Info("Delete sensor callback")
+		}
+		deleteSensor = graveler.NewDeleteSensor(cfg.Config.Graveler.CompactionSensorThreshold, cb)
+	}
+	gStore := graveler.NewGraveler(committedManager, stagingManager, refManager, gcManager, protectedBranchesManager, deleteSensor)
 
 	// The size of the workPool is determined by the number of workers and the number of desired pending tasks for each worker.
 	workPool := pond.New(sharedWorkers, sharedWorkers*pendingTasksPerWorker, pond.Context(ctx))
@@ -384,6 +397,7 @@ func New(ctx context.Context, cfg Config) (*Catalog, error) {
 		managers:              []io.Closer{sstableManager, sstableMetaManager, &ctxCloser{cancelFn}},
 		KVStoreLimited:        storeLimiter,
 		addressProvider:       addressProvider,
+		deleteSensor:          deleteSensor,
 	}, nil
 }
 
@@ -2763,6 +2777,9 @@ func (c *Catalog) Close() error {
 		}
 	}
 	c.workPool.StopAndWaitFor(workersMaxDrainDuration)
+	if c.deleteSensor != nil {
+		c.deleteSensor.Close()
+	}
 	return errs
 }
 
diff --git a/pkg/config/config.go b/pkg/config/config.go
index db29d6e7905..294692599dd 100644
--- a/pkg/config/config.go
+++ b/pkg/config/config.go
@@ -288,6 +288,7 @@ type Config struct {
 	Graveler struct {
 		EnsureReadableRootNamespace bool `mapstructure:"ensure_readable_root_namespace"`
 		BatchDBIOTransactionMarkers bool `mapstructure:"batch_dbio_transaction_markers"`
+		CompactionSensorThreshold   int  `mapstructure:"compaction_sensor_threshold"`
 		RepositoryCache             struct {
 			Size   int           `mapstructure:"size"`
 			Expiry time.Duration `mapstructure:"expiry"`
diff --git a/pkg/graveler/delete_sensor.go b/pkg/graveler/delete_sensor.go
new file mode 100644
index 00000000000..74864fc2be8
--- /dev/null
+++ b/pkg/graveler/delete_sensor.go
@@ -0,0 +1,136 @@
+package graveler
+
+import (
+	"context"
+	"sync"
+
+	"github.com/treeverse/lakefs/pkg/logging"
+)
+
+const (
+	callbackChannelSize = 1000 // default capacity of the DeleteSensor callback queue
+)
+
+type DeleteSensorCB func(repositoryID RepositoryID, branchID BranchID, stagingTokenID StagingToken, inGrace bool)
+
+// StagingTokenCounter holds the number of deletes counted against a specific staging token.
+type StagingTokenCounter struct {
+	StagingTokenID StagingToken
+	Counter        int
+}
+
+// stagingTokenData identifies a staging token together with the repository and branch it belongs to.
+type stagingTokenData struct {
+	repositoryID   RepositoryID
+	branchID       BranchID
+	stagingTokenID StagingToken
+}
+
+type DeleteSensor struct {
+	cb        DeleteSensorCB        // invoked by the processCallbacks goroutine for every queued trigger
+	triggerAt int                   // per-staging-token delete count at which a trigger is queued
+	callbacks chan stagingTokenData // buffered queue of pending triggers; closed by Close
+	wg        sync.WaitGroup        // tracks the processCallbacks goroutine so Close can wait for it
+	mutex     sync.Mutex            // guards stopped and branchTombstoneCounter
+	// stopped is set by Close; once true, CountDelete calls are ignored.
+	stopped                bool
+	branchTombstoneCounter map[RepositoryID]map[BranchID]*StagingTokenCounter // delete counters per repository and branch
+}
+
+type DeleteSensorOpts func(s *DeleteSensor)
+
+func WithCBBufferSize(bufferSize int) DeleteSensorOpts {
+	return func(s *DeleteSensor) {
+		s.callbacks = make(chan stagingTokenData, bufferSize)
+	}
+}
+
+func NewDeleteSensor(triggerAt int, cb DeleteSensorCB, opts ...DeleteSensorOpts) *DeleteSensor {
+	ds := &DeleteSensor{
+		cb:                     cb,
+		triggerAt:              triggerAt,
+		stopped:                false,
+		mutex:                  sync.Mutex{},
+		branchTombstoneCounter: make(map[RepositoryID]map[BranchID]*StagingTokenCounter),
+		callbacks:              make(chan stagingTokenData, callbackChannelSize),
+	}
+	for _, opt := range opts {
+		opt(ds)
+	}
+	ds.wg.Add(1)
+	go ds.processCallbacks()
+	return ds
+}
+
+// triggerTombstone counts one delete against the branch's current staging token. It is a no-op once the sensor is stopped.
+// The counter restarts at 1 on the first delete for a branch and whenever the staging token changes; when it reaches triggerAt an event is queued and the counter is zeroed.
+// If the callback channel is full the event is dropped, the counter stays at triggerAt, and the next delete retries the send.
+func (s *DeleteSensor) triggerTombstone(ctx context.Context, st stagingTokenData) {
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+	if s.stopped {
+		return
+	}
+	if _, ok := s.branchTombstoneCounter[st.repositoryID]; !ok {
+		s.branchTombstoneCounter[st.repositoryID] = make(map[BranchID]*StagingTokenCounter)
+	}
+	stCounter, ok := s.branchTombstoneCounter[st.repositoryID][st.branchID]
+	if !ok {
+		stCounter = &StagingTokenCounter{
+			StagingTokenID: st.stagingTokenID,
+			Counter:        1,
+		}
+		s.branchTombstoneCounter[st.repositoryID][st.branchID] = stCounter
+		return
+	}
+	// Reset the counter if the staging token has changed, under the assumption that staging tokens are updated only during sealing processes, which occur during a commit or compaction.
+	if stCounter.StagingTokenID != st.stagingTokenID {
+		stCounter.StagingTokenID = st.stagingTokenID
+		stCounter.Counter = 1
+		return
+	}
+	if stCounter.Counter < s.triggerAt {
+		stCounter.Counter++
+	}
+	if stCounter.Counter >= s.triggerAt {
+		select {
+		case s.callbacks <- st:
+			stCounter.Counter = 0
+		default:
+			logging.FromContext(ctx).WithFields(logging.Fields{"repositoryID": st.repositoryID, "branchID": st.branchID, "stagingTokenID": st.stagingTokenID}).Info("delete sensor callback channel is full, dropping delete event")
+		}
+		return
+	}
+}
+
+func (s *DeleteSensor) processCallbacks() {
+	defer s.wg.Done()
+	for cb := range s.callbacks {
+		s.mutex.Lock()
+		isStopped := s.stopped // re-read per event: a snapshot taken when the goroutine starts is always false
+		s.mutex.Unlock()
+		s.cb(cb.repositoryID, cb.branchID, cb.stagingTokenID, isStopped)
+	}
+}
+
+func (s *DeleteSensor) CountDelete(ctx context.Context, repositoryID RepositoryID, branchID BranchID, stagingTokenID StagingToken) {
+	st := stagingTokenData{
+		repositoryID:   repositoryID,
+		branchID:       branchID,
+		stagingTokenID: stagingTokenID,
+	}
+	s.triggerTombstone(ctx, st)
+}
+
+func (s *DeleteSensor) Close() {
+	s.mutex.Lock()
+	if s.stopped {
+		s.mutex.Unlock()
+		return
+	}
+	s.stopped = true
+	s.mutex.Unlock()
+
+	close(s.callbacks) // safe: senders hold the mutex and check stopped before sending
+	s.wg.Wait()
+}
diff --git a/pkg/graveler/delete_sensor_test.go b/pkg/graveler/delete_sensor_test.go
new file mode 100644
index 00000000000..c812667ed2e
--- /dev/null
+++ b/pkg/graveler/delete_sensor_test.go
@@ -0,0 +1,176 @@
+package graveler_test
+
+import (
+	"context"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/treeverse/lakefs/pkg/graveler"
+)
+
+func TestDeletedSensor(t *testing.T) {
+	type commandFlow struct {
+		repositoryID   graveler.RepositoryID
+		branchID       graveler.BranchID
+		stagingTokenID graveler.StagingToken
+		count          int
+	}
+	tt := []struct {
+		name                           string
+		triggerAt                      int
+		commandFlow                    []commandFlow
+		expectedTriggeredBranchesCount map[string]int
+	}{
+		{
+			name:      "trigger after 10",
+			triggerAt: 10,
+			commandFlow: []commandFlow{
+				{repositoryID: "repo1", branchID: "branch1", stagingTokenID: "100-example-uuid", count: 2},
+				{repositoryID: "repo1", branchID: "branch2", stagingTokenID: "100-example-uuid", count: 10},
+			},
+			expectedTriggeredBranchesCount: map[string]int{"repo1-branch2": 1},
+		},
+		{
+			name:      "trigger two",
+			triggerAt: 10,
+			commandFlow: []commandFlow{
+				{repositoryID: "repo1", branchID: "branch1", stagingTokenID: "555-example-uuid", count: 10},
+				{repositoryID: "repo1", branchID: "branch2", stagingTokenID: "555-example-uuid", count: 11},
+			},
+			expectedTriggeredBranchesCount: map[string]int{"repo1-branch1": 1, "repo1-branch2": 1},
+		},
+		{
+			name:      "trigger twice after 20",
+			triggerAt: 10,
+			commandFlow: []commandFlow{
+				{repositoryID: "repo1", branchID: "branch1", stagingTokenID: "100-example-uuid", count: 20},
+			},
+			expectedTriggeredBranchesCount: map[string]int{"repo1-branch1": 2},
+		},
+		{
+			name:      "trigger once before 20",
+			triggerAt: 10,
+			commandFlow: []commandFlow{
+				{repositoryID: "repo1", branchID: "branch1", stagingTokenID: "444-example-uuid", count: 19},
+			},
+			expectedTriggeredBranchesCount: map[string]int{"repo1-branch1": 1},
+		},
+		{
+			name:      "different repos no trigger",
+			triggerAt: 10,
+			commandFlow: []commandFlow{
+				{repositoryID: "repo1", branchID: "branch1", stagingTokenID: "100-example-uuid", count: 9},
+				{repositoryID: "repo2", branchID: "branch1", stagingTokenID: "100-example-uuid", count: 9},
+				{repositoryID: "repo3", branchID: "branch1", stagingTokenID: "100-example-uuid", count: 9},
+			},
+		},
+		{
+			name:      "different repos trigger once",
+			triggerAt: 10,
+			commandFlow: []commandFlow{
+				{repositoryID: "repo1", branchID: "branch1", stagingTokenID: "100-example-uuid", count: 8},
+				{repositoryID: "repo2", branchID: "branch1", stagingTokenID: "100-example-uuid", count: 8},
+				{repositoryID: "repo3", branchID: "branch1", stagingTokenID: "100-example-uuid", count: 8},
+				{repositoryID: "repo2", branchID: "branch1", stagingTokenID: "100-example-uuid", count: 8},
+			},
+			expectedTriggeredBranchesCount: map[string]int{"repo2-branch1": 1},
+		},
+		{
+			name:      "different staging token id trigger once",
+			triggerAt: 10,
+			commandFlow: []commandFlow{
+				{repositoryID: "repo1", branchID: "branch1", stagingTokenID: "uuid-token-1", count: 8},
+				{repositoryID: "repo1", branchID: "branch1", stagingTokenID: "uuid-token-2", count: 8},
+				{repositoryID: "repo1", branchID: "branch1", stagingTokenID: "uuid-token-3", count: 8},
+				{repositoryID: "repo1", branchID: "branch1", stagingTokenID: "uuid-token-3", count: 8},
+			},
+			expectedTriggeredBranchesCount: map[string]int{"repo1-branch1": 1},
+		},
+		{
+			name:      "different staging token id no trigger",
+			triggerAt: 10,
+			commandFlow: []commandFlow{
+				{repositoryID: "repo1", branchID: "branch1", stagingTokenID: "uuid-token-1", count: 8},
+				{repositoryID: "repo1", branchID: "branch1", stagingTokenID: "uuid-token-2", count: 8},
+				{repositoryID: "repo1", branchID: "branch1", stagingTokenID: "uuid-token-3", count: 8},
+				{repositoryID: "repo1", branchID: "branch1", stagingTokenID: "uuid-token-4", count: 8},
+			},
+		},
+	}
+	ctx := context.Background()
+	for _, tc := range tt {
+		t.Run(tc.name, func(t *testing.T) {
+			triggeredBranches := make(map[string]int)
+			cb := func(repositoryID graveler.RepositoryID, branchID graveler.BranchID, stagingTokenID graveler.StagingToken, inGrace bool) {
+				triggeredBranches[string(repositoryID)+"-"+string(branchID)]++
+			}
+			sensor := graveler.NewDeleteSensor(tc.triggerAt, cb)
+			for _, flow := range tc.commandFlow {
+				for i := 0; i < flow.count; i++ {
+					sensor.CountDelete(ctx, flow.repositoryID, flow.branchID, flow.stagingTokenID)
+				}
+			}
+			sensor.Close() // waits for the callback goroutine, so the map is safe to read below
+			if len(triggeredBranches) != len(tc.expectedTriggeredBranchesCount) {
+				t.Errorf("expected %d branches to be triggered, got %d", len(tc.expectedTriggeredBranchesCount), len(triggeredBranches))
+			}
+			for branchID, count := range triggeredBranches {
+				if count != tc.expectedTriggeredBranchesCount[branchID] {
+					t.Errorf("expected %s to be triggered %d times, got %d", branchID, tc.expectedTriggeredBranchesCount[branchID], count)
+				}
+			}
+		})
+	}
+
+}
+
+func TestDeletedSensor_Close(t *testing.T) {
+	cb := func(repositoryID graveler.RepositoryID, branchID graveler.BranchID, stagingTokenID graveler.StagingToken, inGrace bool) {
+	}
+	sensor := graveler.NewDeleteSensor(10, cb)
+	sensor.Close()
+}
+
+func TestDeletedSensor_CloseTwice(t *testing.T) {
+	cb := func(repositoryID graveler.RepositoryID, branchID graveler.BranchID, stagingTokenID graveler.StagingToken, inGrace bool) {
+	}
+	sensor := graveler.NewDeleteSensor(10, cb)
+	sensor.Close()
+	sensor.Close()
+}
+
+func TestDeletedSensor_CountAfterClose(t *testing.T) {
+	cb := func(repositoryID graveler.RepositoryID, branchID graveler.BranchID, stagingTokenID graveler.StagingToken, inGrace bool) {
+	}
+	sensor := graveler.NewDeleteSensor(10, cb)
+	sensor.Close()
+	ctx := context.Background()
+	sensor.CountDelete(ctx, "repo1", "branch1", "uuid")
+}
+
+func TestDeletedSensor_CheckNonBlocking(t *testing.T) {
+	closerCall := sync.Once{}
+	closerCh := make(chan struct{})
+
+	cb := func(repositoryID graveler.RepositoryID, branchID graveler.BranchID, stagingTokenID graveler.StagingToken, inGrace bool) {
+		if inGrace {
+			return
+		}
+		time.Sleep(5 * time.Second)
+		closerCall.Do(func() {
+			close(closerCh)
+		})
+	}
+	sensor := graveler.NewDeleteSensor(1, cb, graveler.WithCBBufferSize(1))
+	ctx := context.Background()
+	for i := 0; i < 11; i++ {
+		select {
+		case <-closerCh:
+			t.Fatal("should not block")
+			return
+		default:
+			sensor.CountDelete(ctx, "repo1", "branch1", "uuid")
+		}
+	}
+}
diff --git a/pkg/graveler/graveler.go b/pkg/graveler/graveler.go
index 5ce40e9e2dc..051ad7f0c2f 100644
--- a/pkg/graveler/graveler.go
+++ b/pkg/graveler/graveler.go
@@ -1050,9 +1050,10 @@ type Graveler struct {
 	// available.
 	logger              logging.Logger
 	BranchUpdateBackOff backoff.BackOff
+	deleteSensor        *DeleteSensor
 }
 
-func NewGraveler(committedManager CommittedManager, stagingManager StagingManager, refManager RefManager, gcManager GarbageCollectionManager, protectedBranchesManager ProtectedBranchesManager) *Graveler {
+func NewGraveler(committedManager CommittedManager, stagingManager StagingManager, refManager RefManager, gcManager GarbageCollectionManager, protectedBranchesManager ProtectedBranchesManager, deleteSensor *DeleteSensor) *Graveler {
 	branchUpdateBackOff := backoff.NewExponentialBackOff()
 	branchUpdateBackOff.MaxInterval = BranchUpdateMaxInterval
 
@@ -1065,6 +1066,7 @@ func NewGraveler(committedManager CommittedManager, stagingManager StagingManage
 		protectedBranchesManager: protectedBranchesManager,
 		garbageCollectionManager: gcManager,
 		logger:                   logging.ContextUnavailable().WithField("service_name", "graveler_graveler"),
+		deleteSensor:             deleteSensor,
 	}
 }
 
@@ -1799,7 +1801,7 @@ func (g *Graveler) Delete(ctx context.Context, repository *RepositoryRecord, bra
 
 	log := g.log(ctx).WithFields(logging.Fields{"key": key, "operation": "delete"})
 	err = g.safeBranchWrite(ctx, log, repository, branchID, safeBranchWriteOptions{}, func(branch *Branch) error {
-		return g.deleteUnsafe(ctx, repository, branch, key, nil)
+		return g.deleteUnsafe(ctx, repository, key, nil, BranchRecord{branchID, branch})
 	}, "delete")
 	return err
 }
@@ -1832,7 +1834,7 @@ func (g *Graveler) DeleteBatch(ctx context.Context, repository *RepositoryRecord
 	err = g.safeBranchWrite(ctx, log, repository, branchID, safeBranchWriteOptions{}, func(branch *Branch) error {
 		var cachedMetaRangeID MetaRangeID // used to cache the committed branch metarange ID
 		for _, key := range keys {
-			err := g.deleteUnsafe(ctx, repository, branch, key, &cachedMetaRangeID)
+			err := g.deleteUnsafe(ctx, repository, key, &cachedMetaRangeID, BranchRecord{branchID, branch})
 			if err != nil {
 				m = multierror.Append(m, &DeleteError{Key: key, Err: err})
 			}
@@ -1842,9 +1844,9 @@ func (g *Graveler) DeleteBatch(ctx context.Context, repository *RepositoryRecord
 	return err
 }
 
-func (g *Graveler) deleteUnsafe(ctx context.Context, repository *RepositoryRecord, branch *Branch, key Key, cachedMetaRangeID *MetaRangeID) error {
+func (g *Graveler) deleteUnsafe(ctx context.Context, repository *RepositoryRecord, key Key, cachedMetaRangeID *MetaRangeID, branchRecord BranchRecord) error {
 	// First attempt to update on staging token
-	err := g.StagingManager.Set(ctx, branch.StagingToken, key, nil, true)
+	err := g.deleteAndNotify(ctx, repository.RepositoryID, branchRecord, key, true)
 	if !errors.Is(err, kv.ErrPredicateFailed) {
 		return err
 	}
@@ -1854,7 +1856,7 @@ func (g *Graveler) deleteUnsafe(ctx context.Context, repository *RepositoryRecor
 	if cachedMetaRangeID != nil && *cachedMetaRangeID != "" {
 		metaRangeID = *cachedMetaRangeID
 	} else {
-		commit, err := g.RefManager.GetCommit(ctx, repository, branch.CommitID)
+		commit, err := g.RefManager.GetCommit(ctx, repository, branchRecord.Branch.CommitID)
 		if err != nil {
 			return err
 		}
@@ -1864,7 +1866,7 @@ func (g *Graveler) deleteUnsafe(ctx context.Context, repository *RepositoryRecor
 	_, err = g.CommittedManager.Get(ctx, repository.StorageNamespace, metaRangeID, key)
 	if err == nil {
 		// found in committed, set tombstone
-		return g.StagingManager.Set(ctx, branch.StagingToken, key, nil, false)
+		return g.deleteAndNotify(ctx, repository.RepositoryID, branchRecord, key, false)
 	}
 	if !errors.Is(err, ErrNotFound) {
 		// unknown error
@@ -1873,14 +1875,14 @@ func (g *Graveler) deleteUnsafe(ctx context.Context, repository *RepositoryRecor
 	// else key is nowhere to be found - continue to staged
 
 	// check staging for entry or tombstone
-	val, err := g.getFromStagingArea(ctx, branch, key)
+	val, err := g.getFromStagingArea(ctx, branchRecord.Branch, key)
 	if err == nil {
 		if val == nil {
 			// found tombstone in staging, do nothing
 			return nil
 		}
 		// found in staging, set tombstone
-		return g.StagingManager.Set(ctx, branch.StagingToken, key, nil, false)
+		return g.deleteAndNotify(ctx, repository.RepositoryID, branchRecord, key, false)
 	}
 	if !errors.Is(err, ErrNotFound) {
 		return fmt.Errorf("reading from staging: %w", err)
@@ -2381,10 +2383,22 @@ func (g *Graveler) Reset(ctx context.Context, repository *RepositoryRecord, bran
 	return nil
 }
 
+// deleteAndNotify writes a tombstone for key on the record's staging token and, only if that succeeds, reports the delete to the delete sensor (when one is configured)
+func (g *Graveler) deleteAndNotify(ctx context.Context, repositoryID RepositoryID, branchRecord BranchRecord, key Key, requireExists bool) error {
+	err := g.StagingManager.Set(ctx, branchRecord.Branch.StagingToken, key, nil, requireExists)
+	if err != nil {
+		return err
+	}
+	if g.deleteSensor != nil {
+		g.deleteSensor.CountDelete(ctx, repositoryID, branchRecord.BranchID, branchRecord.Branch.StagingToken)
+	}
+	return nil
+}
+
 // resetKey resets given key on branch
 // Since we cannot (will not) modify sealed tokens data, we overwrite changes done on entry on a new staging token, effectively reverting it
 // to the current state in the branch committed data. If entry is not committed return an error
-func (g *Graveler) resetKey(ctx context.Context, repository *RepositoryRecord, branch *Branch, key Key, stagedValue *Value, st StagingToken) error {
+func (g *Graveler) resetKey(ctx context.Context, repository *RepositoryRecord, branchID BranchID, branch *Branch, key Key, stagedValue *Value, st StagingToken) error {
 	isCommitted := true
 	committed, err := g.Get(ctx, repository, branch.CommitID.Ref(), key)
 	if err != nil {
@@ -2402,7 +2416,7 @@ func (g *Graveler) resetKey(ctx context.Context, repository *RepositoryRecord, b
 		// entry not committed and changed in staging area => override with tombstone
 		// If not committed and staging == tombstone => ignore
 	} else if !isCommitted && stagedValue != nil {
-		return g.StagingManager.Set(ctx, st, key, nil, false)
+		return g.deleteAndNotify(ctx, repository.RepositoryID, BranchRecord{branchID, &Branch{CommitID: branch.CommitID, StagingToken: st}}, key, false) // the tombstone must land on st, like the Set call it replaces
 	}
 
 	return nil
@@ -2438,7 +2452,7 @@ func (g *Graveler) ResetKey(ctx context.Context, repository *RepositoryRecord, b
 		return err
 	}
 
-	err = g.resetKey(ctx, repository, branch, key, staged, branch.StagingToken)
+	err = g.resetKey(ctx, repository, branchID, branch, key, staged, branch.StagingToken)
 	if err != nil {
 		if !errors.Is(err, ErrNotFound) { // Not found in staging => ignore
 			return err
@@ -2491,7 +2505,7 @@ func (g *Graveler) ResetPrefix(ctx context.Context, repository *RepositoryRecord
 			break
 		}
 		wg.Go(func() error {
-			err = g.resetKey(ctx, repository, branch, value.Key, value.Value, newStagingToken)
+			err = g.resetKey(ctx, repository, branchID, branch, value.Key, value.Value, newStagingToken)
 			if err != nil {
 				return err
 			}
diff --git a/pkg/graveler/graveler_test.go b/pkg/graveler/graveler_test.go
index 5c4af6b08e0..0d9b63dca7c 100644
--- a/pkg/graveler/graveler_test.go
+++ b/pkg/graveler/graveler_test.go
@@ -148,7 +148,7 @@ func newGraveler(t *testing.T, committedManager graveler.CommittedManager, stagi
 ) catalog.Store {
 	t.Helper()
 
-	return graveler.NewGraveler(committedManager, stagingManager, refManager, gcManager, protectedBranchesManager)
+	return graveler.NewGraveler(committedManager, stagingManager, refManager, gcManager, protectedBranchesManager, nil)
 }
 
 func TestGraveler_List(t *testing.T) {
diff --git a/pkg/graveler/testutil/graveler_mock.go b/pkg/graveler/testutil/graveler_mock.go
index a6ccfa8873a..e70203f5df7 100644
--- a/pkg/graveler/testutil/graveler_mock.go
+++ b/pkg/graveler/testutil/graveler_mock.go
@@ -33,7 +33,7 @@ func InitGravelerTest(t *testing.T) *GravelerTest {
 		KVStore:                  kvmock.NewMockStore(ctrl),
 	}
 
-	test.Sut = graveler.NewGraveler(test.CommittedManager, test.StagingManager, test.RefManager, test.GarbageCollectionManager, test.ProtectedBranchesManager)
+	test.Sut = graveler.NewGraveler(test.CommittedManager, test.StagingManager, test.RefManager, test.GarbageCollectionManager, test.ProtectedBranchesManager, nil)
 
 	return test
 }