Feature/delete sensor (#7523)
guy-har authored Mar 12, 2024
1 parent 1c38573 commit 1706a19
Showing 9 changed files with 454 additions and 16 deletions.
74 changes: 74 additions & 0 deletions cmd/lakectl/cmd/abuse_random_delete.go
@@ -0,0 +1,74 @@
package cmd

import (
	"fmt"
	"math/rand"
	"net/http"
	"os"
	"syscall"
	"time"

	"github.com/spf13/cobra"
	"github.com/treeverse/lakefs/pkg/api/apigen"
	"github.com/treeverse/lakefs/pkg/api/helpers"
	"github.com/treeverse/lakefs/pkg/testutil/stress"
)

var abuseRandomDeletesCmd = &cobra.Command{
	Use:               "random-delete <source ref URI>",
	Short:             "Read keys from a file and generate random delete requests on the source ref for those keys.",
	Hidden:            false,
	Args:              cobra.ExactArgs(1),
	ValidArgsFunction: ValidArgsRepository,
	Run: func(cmd *cobra.Command, args []string) {
		u := MustParseRefURI("source ref URI", args[0])
		amount := Must(cmd.Flags().GetInt("amount"))
		parallelism := Must(cmd.Flags().GetInt("parallelism"))
		fromFile := Must(cmd.Flags().GetString("from-file"))

		fmt.Println("Source ref:", u)
		// read the input file
		keys, err := readLines(fromFile)
		if err != nil {
			DieErr(err)
		}
		fmt.Printf("read a total of %d keys from key file\n", len(keys))

		generator := stress.NewGenerator("delete", parallelism, stress.WithSignalHandlersFor(os.Interrupt, syscall.SIGTERM))

		// generate randomly selected keys as input
		generator.Setup(func(add stress.GeneratorAddFn) {
			for i := 0; i < amount; i++ {
				//nolint:gosec
				add(keys[rand.Intn(len(keys))])
			}
		})

		// execute the delete requests and report each result
		generator.Run(func(input chan string, output chan stress.Result) {
			ctx := cmd.Context()
			client := getClient()
			for work := range input {
				start := time.Now()
				resp, err := client.DeleteObject(ctx, u.Repository, u.Ref, &apigen.DeleteObjectParams{
					Path: work,
				})
				// a successful object delete returns 204 No Content
				if err == nil && resp.StatusCode != http.StatusNoContent {
					err = helpers.ResponseAsError(resp)
				}
				output <- stress.Result{
					Error: err,
					Took:  time.Since(start),
				}
			}
		})
	},
}

//nolint:gochecknoinits
func init() {
	abuseCmd.AddCommand(abuseRandomDeletesCmd)
	abuseRandomDeletesCmd.Flags().String("from-file", "", "read keys from this file (\"-\" for stdin)")
	abuseRandomDeletesCmd.Flags().Int("amount", abuseDefaultAmount, "amount of deletes to do")
	abuseRandomDeletesCmd.Flags().Int("parallelism", abuseDefaultParallelism, "amount of deletes to do in parallel")
}
20 changes: 20 additions & 0 deletions docs/reference/cli.md
@@ -229,6 +229,26 @@ lakectl abuse list <source ref URI> [flags]



### lakectl abuse random-delete

Read keys from a file and generate random delete requests on the source ref for those keys.

```
lakectl abuse random-delete <source ref URI> [flags]
```

#### Options
{:.no_toc}

```
      --amount int          amount of deletes to do (default 1000000)
      --from-file string    read keys from this file ("-" for stdin)
  -h, --help                help for random-delete
      --parallelism int     amount of deletes to do in parallel (default 100)
```
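
For example, assuming keys were previously collected into a local file (the repository, branch, and file name below are illustrative):

```
lakectl abuse random-delete lakefs://example-repo/main --from-file keys.txt --amount 100000 --parallelism 64
```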



### lakectl abuse random-read

Read keys from a file and generate random reads from the source ref for those keys.
19 changes: 18 additions & 1 deletion pkg/catalog/catalog.go
@@ -222,6 +222,7 @@ type Catalog struct {
	KVStore               kv.Store
	KVStoreLimited        kv.Store
	addressProvider       *ident.HexAddressProvider
	deleteSensor          *graveler.DeleteSensor
	UGCPrepareMaxFileSize int64
	UGCPrepareInterval    time.Duration
}
@@ -366,7 +367,19 @@ func New(ctx context.Context, cfg Config) (*Catalog, error) {

	protectedBranchesManager := branch.NewProtectionManager(settingManager)
	stagingManager := staging.NewManager(ctx, cfg.KVStore, storeLimiter, cfg.Config.Graveler.BatchDBIOTransactionMarkers, executor)
	// deleteSensor stays nil unless a positive compaction sensor threshold is
	// configured, which keeps the sensor disabled by default.
	var deleteSensor *graveler.DeleteSensor
	if cfg.Config.Graveler.CompactionSensorThreshold > 0 {
		cb := func(repositoryID graveler.RepositoryID, branchID graveler.BranchID, stagingTokenID graveler.StagingToken, inGrace bool) {
			logging.FromContext(ctx).WithFields(logging.Fields{
				"repositoryID":   repositoryID,
				"branchID":       branchID,
				"stagingTokenID": stagingTokenID,
				"inGrace":        inGrace,
			}).Info("Delete sensor callback")
		}
		deleteSensor = graveler.NewDeleteSensor(cfg.Config.Graveler.CompactionSensorThreshold, cb)
	}
	gStore := graveler.NewGraveler(committedManager, stagingManager, refManager, gcManager, protectedBranchesManager, deleteSensor)

	// The size of the workPool is determined by the number of workers and the number of desired pending tasks for each worker.
	workPool := pond.New(sharedWorkers, sharedWorkers*pendingTasksPerWorker, pond.Context(ctx))
@@ -384,6 +397,7 @@ func New(ctx context.Context, cfg Config) (*Catalog, error) {
		managers:        []io.Closer{sstableManager, sstableMetaManager, &ctxCloser{cancelFn}},
		KVStoreLimited:  storeLimiter,
		addressProvider: addressProvider,
		deleteSensor:    deleteSensor,
	}, nil
}

@@ -2763,6 +2777,9 @@ func (c *Catalog) Close() error {
		}
	}
	c.workPool.StopAndWaitFor(workersMaxDrainDuration)
	if c.deleteSensor != nil {
		c.deleteSensor.Close()
	}
	return errs
}

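The sensor is handed to `graveler.NewGraveler` above; the remaining changed files (not all shown on this page) hook it into the delete path. As a rough, hypothetical sketch of what such a hook looks like — the method context and field names here are assumptions, not the commit's actual code:

```go
// hypothetical: inside a Graveler delete operation, after the tombstone
// has been written to the branch's staging token
if g.deleteSensor != nil {
	g.deleteSensor.CountDelete(ctx, repository.RepositoryID, branchID, branch.StagingToken)
}
```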
1 change: 1 addition & 0 deletions pkg/config/config.go
@@ -288,6 +288,7 @@ type Config struct {
	Graveler struct {
		EnsureReadableRootNamespace bool `mapstructure:"ensure_readable_root_namespace"`
		BatchDBIOTransactionMarkers bool `mapstructure:"batch_dbio_transaction_markers"`
		CompactionSensorThreshold   int  `mapstructure:"compaction_sensor_threshold"`
		RepositoryCache             struct {
			Size   int           `mapstructure:"size"`
			Expiry time.Duration `mapstructure:"expiry"`
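For reference, a sketch of how this setting might appear in the lakeFS server configuration — the key name follows the mapstructure tag above, while the surrounding YAML layout and the chosen threshold are illustrative:

```yaml
# illustrative lakeFS server configuration snippet
graveler:
  # invoke the delete-sensor callback once this many tombstones accumulate
  # on a single staging token; 0 (the default) leaves the sensor disabled
  compaction_sensor_threshold: 100000
```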
136 changes: 136 additions & 0 deletions pkg/graveler/delete_sensor.go
@@ -0,0 +1,136 @@
package graveler

import (
	"context"
	"sync"

	"github.com/treeverse/lakefs/pkg/logging"
)

const (
	callbackChannelSize = 1000
)

type DeleteSensorCB func(repositoryID RepositoryID, branchID BranchID, stagingTokenID StagingToken, inGrace bool)

// StagingTokenCounter holds a counter for a specific staging token.
type StagingTokenCounter struct {
	StagingTokenID StagingToken
	Counter        int
}

// stagingTokenData holds data regarding a staging token.
type stagingTokenData struct {
	repositoryID   RepositoryID
	branchID       BranchID
	stagingTokenID StagingToken
}

type DeleteSensor struct {
	cb        DeleteSensorCB
	triggerAt int
	callbacks chan stagingTokenData
	wg        sync.WaitGroup
	mutex     sync.Mutex
	// stopped marks the sensor as closed; once set, CountDelete events are no longer counted.
	stopped                bool
	branchTombstoneCounter map[RepositoryID]map[BranchID]*StagingTokenCounter
}

type DeleteSensorOpts func(s *DeleteSensor)

// WithCBBufferSize overrides the default buffer size of the callbacks channel.
func WithCBBufferSize(bufferSize int) DeleteSensorOpts {
	return func(s *DeleteSensor) {
		s.callbacks = make(chan stagingTokenData, bufferSize)
	}
}

// NewDeleteSensor creates a DeleteSensor that invokes cb whenever triggerAt tombstones
// accumulate on a single staging token, and starts the goroutine that delivers callbacks.
func NewDeleteSensor(triggerAt int, cb DeleteSensorCB, opts ...DeleteSensorOpts) *DeleteSensor {
	ds := &DeleteSensor{
		cb:                     cb,
		triggerAt:              triggerAt,
		stopped:                false,
		mutex:                  sync.Mutex{},
		branchTombstoneCounter: make(map[RepositoryID]map[BranchID]*StagingTokenCounter),
		callbacks:              make(chan stagingTokenData, callbackChannelSize),
	}
	for _, opt := range opts {
		opt(ds)
	}
	ds.wg.Add(1)
	go ds.processCallbacks()
	return ds
}

// triggerTombstone counts a tombstone event for a specific staging token. If the sensor is
// stopped, the event is ignored. If the staging token has changed, the counter is reset; once
// the counter reaches triggerAt, an event is sent to the callback channel. If that channel is
// full, the event is dropped and another attempt will be made on a subsequent call.
func (s *DeleteSensor) triggerTombstone(ctx context.Context, st stagingTokenData) {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	if s.stopped {
		return
	}
	if _, ok := s.branchTombstoneCounter[st.repositoryID]; !ok {
		s.branchTombstoneCounter[st.repositoryID] = make(map[BranchID]*StagingTokenCounter)
	}
	stCounter, ok := s.branchTombstoneCounter[st.repositoryID][st.branchID]
	if !ok {
		stCounter = &StagingTokenCounter{
			StagingTokenID: st.stagingTokenID,
			Counter:        1,
		}
		s.branchTombstoneCounter[st.repositoryID][st.branchID] = stCounter
		return
	}
	// Reset the counter if the staging token has changed, under the assumption that staging
	// tokens are replaced only when the branch is sealed, i.e. during commit or compaction.
	if stCounter.StagingTokenID != st.stagingTokenID {
		stCounter.StagingTokenID = st.stagingTokenID
		stCounter.Counter = 1
		return
	}
	if stCounter.Counter < s.triggerAt {
		stCounter.Counter++
	}
	if stCounter.Counter >= s.triggerAt {
		select {
		case s.callbacks <- st:
			stCounter.Counter = 0
		default:
			logging.FromContext(ctx).WithFields(logging.Fields{
				"repositoryID":   st.repositoryID,
				"branchID":       st.branchID,
				"stagingTokenID": st.stagingTokenID,
			}).Info("delete sensor callback channel is full, dropping delete event")
		}
	}
}

// processCallbacks delivers queued events to the callback until the channel is closed.
func (s *DeleteSensor) processCallbacks() {
	defer s.wg.Done()
	for cb := range s.callbacks {
		// Read the stopped flag per event, so that callbacks drained after
		// Close are delivered with inGrace set to true.
		s.mutex.Lock()
		inGrace := s.stopped
		s.mutex.Unlock()
		s.cb(cb.repositoryID, cb.branchID, cb.stagingTokenID, inGrace)
	}
}

// CountDelete records a single delete (tombstone) event for the given staging token.
func (s *DeleteSensor) CountDelete(ctx context.Context, repositoryID RepositoryID, branchID BranchID, stagingTokenID StagingToken) {
	st := stagingTokenData{
		repositoryID:   repositoryID,
		branchID:       branchID,
		stagingTokenID: stagingTokenID,
	}
	s.triggerTombstone(ctx, st)
}

// Close marks the sensor as stopped, closes the callbacks channel, and waits for the
// delivery goroutine to drain any pending events.
func (s *DeleteSensor) Close() {
	s.mutex.Lock()
	if s.stopped {
		s.mutex.Unlock()
		return
	}
	s.stopped = true
	s.mutex.Unlock()

	close(s.callbacks)
	s.wg.Wait()
}
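
To illustrate the lifecycle end to end, here is a minimal, self-contained sketch that drives the sensor directly rather than through the Catalog wiring above; the repository, branch, and staging-token values are made up for the example:

```go
package main

import (
	"context"
	"fmt"

	"github.com/treeverse/lakefs/pkg/graveler"
)

func main() {
	// Fire the callback once 3 deletes accumulate on the same staging token.
	cb := func(repo graveler.RepositoryID, branch graveler.BranchID, st graveler.StagingToken, inGrace bool) {
		fmt.Printf("compaction hint: repo=%s branch=%s token=%s inGrace=%v\n", repo, branch, st, inGrace)
	}
	sensor := graveler.NewDeleteSensor(3, cb, graveler.WithCBBufferSize(10))
	defer sensor.Close() // drains pending callbacks before returning

	ctx := context.Background()
	for i := 0; i < 3; i++ {
		sensor.CountDelete(ctx, "example-repo", "main", "st-0001")
	}
	// Changing the staging token (e.g. after a commit) resets the branch counter.
	sensor.CountDelete(ctx, "example-repo", "main", "st-0002")
}
```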