Experiment: Improve concurrent merge performance by weakly owning branch updates (#8268)

* Add "WeakOwner", a KV-based weak ownership mechanism

Weak ownership is a best-effort lock that occasionally fails.  This
implementation can fail when a goroutine is delayed for a long interval.

This is fine if the calling code relies on ownership for performance but not
for correctness, e.g. merges and commits.
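As a rough illustration, here is a self-contained sketch of a weak owner on
top of a compare-and-set KV.  Every name in it is invented for the example;
none of this is the actual lakeFS API:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

type entry struct {
	owner   string
	expires time.Time
}

// store is a stand-in for the lakeFS KV store; only its compare-and-set
// semantics matter here.
type store struct {
	mu sync.Mutex
	m  map[string]entry
}

var errOwned = errors.New("owned by someone else")

// setIf takes key only if it is unowned, expired, or already ours,
// emulating a KV SetIf predicate.
func (s *store) setIf(key, owner string, ttl time.Duration) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	cur, ok := s.m[key]
	if ok && cur.owner != owner && time.Now().Before(cur.expires) {
		return errOwned
	}
	s.m[key] = entry{owner: owner, expires: time.Now().Add(ttl)}
	return nil
}

// own polls until it acquires key or ctx ends.  Ownership is weak: if the
// owning goroutine stalls past ttl without refreshing, another caller may
// take the key - acceptable when ownership is only a performance hint.
func (s *store) own(ctx context.Context, key, owner string, ttl time.Duration) error {
	for {
		if err := s.setIf(key, owner, ttl); err == nil {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(ttl / 3): // poll again after a short delay
		}
	}
}

func main() {
	s := &store{m: map[string]entry{}}
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	fmt.Println(s.own(ctx, "branch/main", "req-123", 400*time.Millisecond))
}
```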

* Obtain weak ownership of branch on all `BranchUpdate` operations

This includes merges.  Only one concurrent `BranchUpdate` operation on a
branch can succeed, so unless many such long-lived operations can fail,
there is little point in running multiple updates concurrently.
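Conceptually, the update path just brackets its critical section with
best-effort ownership.  A minimal sketch, with invented signatures rather
than the real graveler ones:

```go
package main

import "context"

// branchUpdate sketches how a BranchUpdate operation can bracket its work
// with best-effort ownership.  Failing to acquire is deliberately not an
// error: correctness still comes from the optimistic update itself, and
// ownership only cuts down wasted retries under contention.
func branchUpdate(ctx context.Context,
	own func(context.Context) (release func(), err error),
	update func() error,
) error {
	if release, err := own(ctx); err == nil {
		defer release()
	}
	return update()
}

func main() {
	_ = branchUpdate(context.Background(),
		func(context.Context) (func(), error) { return func() {}, nil },
		func() error { return nil },
	)
}
```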

* Better default parameters for branch ownership

* Add `lakectl abuse merge` command

Performs multiple small merges in parallel; an example invocation is shown
below.
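For example, the invocation used for the measurements below:

```sh
go run ./cmd/lakectl abuse merge --amount 1000 --parallelism 50 lakefs://abuse/main
```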

* Make branch weak ownership configurable

This shows results, even on local!

When I run lakeFS with weak ownership OFF (the default) I get 6.6% errors
at concurrency 50.  The rate is <50/s, and the latency tail is _extremely_
long.

When I switch weak ownership ON, using the default parameters, I get **0**
errors at concurrency 50.  The rate is about the same, except that the
tail (when load drops) is _short_.

See the difference [here][merge-abuse-speed-chart]: it's faster _and_
returns 0 errors.  The distribution of actual successful merge times is
somewhat slower, possibly because of the time to lock, and possibly because
in the really slow cases errors cause those slow merges to drop out of the
distribution.

Finally, note that because we do not queue, some merges take a *long* time
under sustained load.  We could improve weak ownership to hold an actual
queue of work.  This would make merges _fair_: they would occur roughly in
order of request arrival (as sketched below).
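For illustration only, such a queue could be as simple as a ticket channel.
This is a sketch of the idea, not something this commit implements:

```go
package main

import (
	"context"
	"fmt"
)

// fairOwner hands ownership to waiters roughly in arrival order, using a
// one-ticket channel as the queue.  (In practice the Go runtime wakes
// channel receivers in FIFO order, though that is not a documented
// guarantee.)
type fairOwner struct{ ticket chan struct{} }

func newFairOwner() *fairOwner {
	f := &fairOwner{ticket: make(chan struct{}, 1)}
	f.ticket <- struct{}{} // one ticket: at most one owner at a time
	return f
}

func (f *fairOwner) own(ctx context.Context) (release func(), err error) {
	select {
	case <-f.ticket:
		return func() { f.ticket <- struct{}{} }, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

func main() {
	f := newFairOwner()
	release, err := f.own(context.Background())
	fmt.Println(err)
	release()
}
```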

==== Weak ownership OFF ====

```sh
❯ go run ./cmd/lakectl abuse merge --amount 1000 --parallelism 50 lakefs://abuse/main
Source branch: lakefs://abuse/main
merge - completed: 34, errors: 0, current rate: 33.81 done/second
merge - completed: 80, errors: 0, current rate: 45.98 done/second
merge - completed: 128, errors: 0, current rate: 48.02 done/second
merge - completed: 177, errors: 0, current rate: 49.03 done/second
merge - completed: 222, errors: 0, current rate: 44.97 done/second
merge - completed: 265, errors: 3, current rate: 43.03 done/second
merge - completed: 308, errors: 9, current rate: 42.97 done/second
merge - completed: 357, errors: 15, current rate: 49.01 done/second
merge - completed: 406, errors: 21, current rate: 49.03 done/second
merge - completed: 451, errors: 22, current rate: 44.97 done/second
merge - completed: 499, errors: 29, current rate: 48.01 done/second
merge - completed: 545, errors: 30, current rate: 46.01 done/second
merge - completed: 585, errors: 31, current rate: 39.97 done/second
merge - completed: 632, errors: 33, current rate: 47.04 done/second
merge - completed: 679, errors: 37, current rate: 47.00 done/second
merge - completed: 728, errors: 46, current rate: 48.96 done/second
merge - completed: 768, errors: 49, current rate: 40.04 done/second
merge - completed: 808, errors: 53, current rate: 39.98 done/second
merge - completed: 854, errors: 57, current rate: 45.99 done/second
merge - completed: 891, errors: 58, current rate: 37.00 done/second
merge - completed: 935, errors: 64, current rate: 44.00 done/second
merge - completed: 972, errors: 66, current rate: 36.98 done/second
merge - completed: 990, errors: 66, current rate: 18.00 done/second
merge - completed: 995, errors: 66, current rate: 5.00 done/second
merge - completed: 996, errors: 66, current rate: 1.00 done/second
merge - completed: 998, errors: 66, current rate: 2.00 done/second
merge - completed: 999, errors: 66, current rate: 1.00 done/second
merge - completed: 999, errors: 66, current rate: 0.00 done/second
merge - completed: 999, errors: 66, current rate: 0.00 done/second
completed: 1000, errors: 66, current rate: 5.27 done/second

Histogram (ms):
1       0
2       0
5       0
7       0
10      0
15      0
25      0
50      0
75      601
100     671
250     672
350     672
500     696
750     740
1000    765
5000    896
min     54
max     12022
total   934
```

==== Weak ownership ON ====

```sh
❯ go run ./cmd/lakectl abuse merge --amount 1000 --parallelism 50 lakefs://abuse/main
Source branch: lakefs://abuse/main
merge - completed: 36, errors: 0, current rate: 35.23 done/second
merge - completed: 86, errors: 0, current rate: 49.98 done/second
merge - completed: 136, errors: 0, current rate: 50.03 done/second
merge - completed: 185, errors: 0, current rate: 48.99 done/second
merge - completed: 236, errors: 0, current rate: 51.02 done/second
merge - completed: 286, errors: 0, current rate: 49.99 done/second
merge - completed: 337, errors: 0, current rate: 50.97 done/second
merge - completed: 390, errors: 0, current rate: 53.03 done/second
merge - completed: 438, errors: 0, current rate: 48.01 done/second
merge - completed: 487, errors: 0, current rate: 49.00 done/second
merge - completed: 534, errors: 0, current rate: 46.98 done/second
merge - completed: 581, errors: 0, current rate: 46.99 done/second
merge - completed: 632, errors: 0, current rate: 51.00 done/second
merge - completed: 680, errors: 0, current rate: 48.04 done/second
merge - completed: 725, errors: 0, current rate: 44.98 done/second
merge - completed: 771, errors: 0, current rate: 45.99 done/second
merge - completed: 815, errors: 0, current rate: 44.02 done/second
merge - completed: 861, errors: 0, current rate: 46.01 done/second
merge - completed: 905, errors: 0, current rate: 43.98 done/second
merge - completed: 947, errors: 0, current rate: 42.00 done/second
merge - completed: 977, errors: 0, current rate: 30.01 done/second
merge - completed: 997, errors: 0, current rate: 19.99 done/second
completed: 1000, errors: 0, current rate: 4.92 done/second

Histogram (ms):
1       0
2       0
5       0
7       0
10      0
15      0
25      0
50      0
75      457
100     464
250     468
350     468
500     642
750     647
1000    729
5000    952
min     54
max     13744
total   1000
```

* Straighten out interval handling and fix checks-validator

- Add some jitter when acquiring ownership on a branch
- Refresh _for_ a full refresh interval, but refresh _twice every_ interval,
  so a single missed refresh does not lose ownership (see the sketch below)
- Add `nolint` for unjustified warnings
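A sketch of that refresh cadence, with illustrative names only:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// refreshLoop sketches the cadence: each refresh extends ownership for a
// full refreshInterval, but runs every refreshInterval/2, so one delayed
// or lost refresh does not by itself lose ownership.
func refreshLoop(ctx context.Context, refreshInterval time.Duration, refresh func(ttl time.Duration) error) {
	ticker := time.NewTicker(refreshInterval / 2)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if err := refresh(refreshInterval); err != nil {
				return // ownership lost; stop refreshing
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	refreshLoop(ctx, 400*time.Millisecond, func(ttl time.Duration) error {
		fmt.Println("refresh for", ttl)
		return nil
	})
}
```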

* [CR] Bug: Ensure single owner succeeds the first time a key is owned

Use SetIf with a nil predicate.
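The point is "create only if absent" semantics.  A self-contained sketch of
those semantics; the `kvStore` interface here is a local stand-in, and the
nil-predicate convention is an assumption to verify against `pkg/kv`:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// kvStore is a local stand-in for the lakeFS KV interface.
type kvStore interface {
	SetIf(ctx context.Context, partition, key, value []byte, pred any) error
}

var errPredicateFailed = errors.New("predicate failed")

// acquireFirstTime succeeds only when nobody has ever owned key: a SetIf
// with a nil predicate writes only if the key does not exist yet.
func acquireFirstTime(ctx context.Context, s kvStore, key, owner []byte) error {
	err := s.SetIf(ctx, []byte("branch-ownership"), key, owner, nil)
	if errors.Is(err, errPredicateFailed) {
		return fmt.Errorf("key %q already owned: %w", key, err)
	}
	return err
}

// memKV is a toy implementation, just enough to run the example.
type memKV struct{ m map[string][]byte }

func (k *memKV) SetIf(_ context.Context, _, key, value []byte, pred any) error {
	if _, exists := k.m[string(key)]; exists && pred == nil {
		return errPredicateFailed
	}
	k.m[string(key)] = value
	return nil
}

func main() {
	kv := &memKV{m: map[string][]byte{}}
	fmt.Println(acquireFirstTime(context.Background(), kv, []byte("main"), []byte("req-1")))
	fmt.Println(acquireFirstTime(context.Background(), kv, []byte("main"), []byte("req-2"))) // fails: owned
}
```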

* Remove log print from test - confusing in a codebase

* [CR] Fix comments, error phrasing, and command descriptions

* [CR] Clarify request ID handling when missing, rename own -> release

* [CR] Remove finished sentinel and break ownership update loop on error

* [CR] Run Esti test with branch ownership

This does not depend on KV implementation, so just add a matrix to one of
the AWS/S3 flavours.

* Add log line to indicate ref manager started with weak ownership

Otherwise there is no way to tell whether Esti _really_ ran with ownership.

* [CR] Only reset if owned when cancelling weak ownership

If ownership has already been lost to another thread, do NOT delete
ownership when released.

- KV does not provide a DeleteIf operation.  Instead, use SetIf with an
  always-expired timestamp (sketched below).
- Along the way, ensure the "owner" string is truly unique by appending a
  nanoid to it.  Currently the owner is the request ID, which should be
  unique - but adding randomness ensures it will always be unique regardless
  of the calling system.
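A sketch of that release path, with illustrative types rather than the
actual lakeFS ones:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// ownership mirrors the stored record: an owner plus an expiry.
type ownership struct {
	Owner   string
	Expires time.Time
}

// release emulates the missing DeleteIf on a KV that only has SetIf:
// rather than deleting, overwrite the record with an already-expired
// timestamp, predicated on the record still being ours.  If another
// thread took ownership meanwhile, the predicate fails and their record
// is left untouched.
func release(ctx context.Context,
	setIf func(ctx context.Context, key string, val, ifCurrent ownership) error,
	key string, mine ownership,
) error {
	expired := ownership{Owner: mine.Owner, Expires: time.Unix(0, 0)}
	return setIf(ctx, key, expired, mine)
}

func main() {
	err := release(context.Background(),
		func(_ context.Context, _ string, _, _ ownership) error { return nil },
		"branch/main", ownership{Owner: "req-1-nanoid", Expires: time.Now()})
	fmt.Println(err)
}
```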

* Add totals line to lakectl abuse statistics

Otherwise it doesn't even say how long it took - which is the most
interesting part for `abuse merge`.

* lakectl abuse merge: clean up branches before exiting

Only report errors.  Obviously, if not all branches were deleted then we
left a mess, which is too bad.  But the performance test itself succeeded,
which is the more important thing.

* [CR] Correctly count KV ops in comments, and some minor cleanups

* Rename basic "ownership" class and move it to pkg/distributed/

1. It's not a KV util, so move it out into `pkg/distributed/`.
1. "Weak" is overloaded, so instead call it "mostly correct" ownership.
   That precisely describes what it is, which makes it a good term of art
   for what we do here.
1. Use the term "branch approximate ownership" at a higher level in the ref
   manager.  That context doesn't particularly care about the specific
   properties of the ownership, and "approximate" is a good fit there.
arielshaqed authored Oct 28, 2024
1 parent 90e57f6 commit ae4bc01
Showing 17 changed files with 1,043 additions and 15 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/esti.yaml
@@ -825,6 +825,9 @@ jobs:
    name: Run latest lakeFS app on AWS S3
    needs: [deploy-image, login-to-amazon-ecr]
    runs-on: ubuntu-22.04
    strategy:
      matrix:
        branch_ownership: [false, true]
    env:
      TAG: ${{ needs.deploy-image.outputs.tag }}
      REPO: ${{ needs.login-to-amazon-ecr.outputs.registry }}
@@ -866,6 +869,7 @@ jobs:
      LAKEFS_BLOCKSTORE_TYPE: s3
      LAKEFS_BLOCKSTORE_S3_CREDENTIALS_ACCESS_KEY_ID: ${{ secrets.ESTI_AWS_ACCESS_KEY_ID }}
      LAKEFS_BLOCKSTORE_S3_CREDENTIALS_SECRET_ACCESS_KEY: ${{ secrets.ESTI_AWS_SECRET_ACCESS_KEY }}
      LAKEFS_GRAVELER_BRANCH_OWNERSHIP_ENABLED: ${{ matrix.branch_ownership }}
      LAKEFS_DATABASE_TYPE: postgres
      DOCKER_REG: ${{ needs.login-to-amazon-ecr.outputs.registry }}
      ESTI_BLOCKSTORE_TYPE: s3
1 change: 1 addition & 0 deletions Makefile
@@ -206,6 +206,7 @@ gen-code: gen-api ## Run the generator for inline commands
		./pkg/actions \
		./pkg/auth/ \
		./pkg/authentication \
		./pkg/distributed \
		./pkg/graveler \
		./pkg/graveler/committed \
		./pkg/graveler/sstable \
1 change: 0 additions & 1 deletion buf.gen.yaml
@@ -5,4 +5,3 @@ plugins:
    out: pkg
    opt:
      - paths=source_relative

197 changes: 197 additions & 0 deletions cmd/lakectl/cmd/abuse_merge.go
@@ -0,0 +1,197 @@
package cmd

import (
	"context"
	"fmt"
	"net/http"
	"os"
	"sync"
	"syscall"
	"time"

	nanoid "github.com/matoous/go-nanoid/v2"
	"github.com/spf13/cobra"
	"github.com/treeverse/lakefs/pkg/api/apigen"
	"github.com/treeverse/lakefs/pkg/api/apiutil"
	"github.com/treeverse/lakefs/pkg/api/helpers"
	"github.com/treeverse/lakefs/pkg/testutil/stress"
	"github.com/treeverse/lakefs/pkg/uri"
)

// removeBranches removes all branches whose names start with prefix, in
// parallel.  It reports (all) failures but does not fail.
func removeBranches(ctx context.Context, client *apigen.ClientWithResponses, parallelism int, repo, prefix string) {
	toDelete := make(chan string)
	pfx := apigen.PaginationPrefix(prefix)
	after := apigen.PaginationAfter("")
	go func() {
		defer close(toDelete)
		for {
			resp, err := client.ListBranchesWithResponse(ctx, repo, &apigen.ListBranchesParams{
				Prefix: &pfx,
				After:  &after,
			})
			if err != nil {
				fmt.Printf("Failed to request to list branches %s/%s after %s: %s\n", repo, pfx, after, err)
				break // resp is unusable after a transport error; stop listing
			}
			if resp.JSON200 == nil {
				fmt.Printf("Failed to list branches %s/%s after %s: %s\n", repo, pfx, after, resp.Status())
				break
			}
			for _, result := range resp.JSON200.Results {
				toDelete <- result.Id
			}
			if !resp.JSON200.Pagination.HasMore {
				break
			}
			after = apigen.PaginationAfter(resp.JSON200.Pagination.NextOffset)
		}
	}()

	wg := &sync.WaitGroup{}
	wg.Add(parallelism)
	for i := 0; i < parallelism; i++ {
		go func() {
			for branch := range toDelete {
				resp, err := client.DeleteBranchWithResponse(ctx, repo, branch, &apigen.DeleteBranchParams{})
				if err != nil {
					fmt.Printf("Failed to request %s deletion: %s\n", branch, err)
					continue
				}
				if resp.StatusCode() != http.StatusNoContent {
					fmt.Printf("Failed to delete %s: %s\n", branch, resp.Status())
					continue
				}
			}
			wg.Done()
		}()
	}
	wg.Wait()
}

var abuseMergeCmd = &cobra.Command{
	Use:               "merge <branch URI>",
	Short:             "Merge non-conflicting objects to the source branch in parallel",
	Hidden:            false,
	Args:              cobra.ExactArgs(1),
	ValidArgsFunction: ValidArgsRepository,
	Run: func(cmd *cobra.Command, args []string) {
		u := MustParseBranchURI("branch URI", args[0])
		amount := Must(cmd.Flags().GetInt("amount"))
		parallelism := Must(cmd.Flags().GetInt("parallelism"))

		fmt.Println("Source branch: ", u)

		branchPrefix := "merge-" + nanoid.Must()
		fmt.Println("Branch prefix: ", branchPrefix)

		generator := stress.NewGenerator("merge", parallelism, stress.WithSignalHandlersFor(os.Interrupt, syscall.SIGTERM))

		client := getClient()

		// generate branch names as input
		generator.Setup(func(add stress.GeneratorAddFn) {
			for i := 0; i < amount; i++ {
				add(fmt.Sprintf("%s-%04d", branchPrefix, i+1))
			}
		})

		defer removeBranches(cmd.Context(), client, parallelism, u.Repository, branchPrefix)

		resp, err := client.GetRepositoryWithResponse(cmd.Context(), u.Repository)
		DieOnErrorOrUnexpectedStatusCode(resp, err, http.StatusOK)
		if resp.JSON200 == nil {
			DieFmt("Bad response from server: %+v", resp)
		}

		ctx := cmd.Context()

		// execute ALL the things!
		generator.Run(func(input chan string, output chan stress.Result) {
			client := getClient()
			for work := range input {
				start := time.Now()
				err := mergeSomething(ctx, client, u, work)
				output <- stress.Result{
					Error: err,
					Took:  time.Since(start),
				}
				// Don't block or sleep to maximise parallel load.
			}
		})
	},
}

func mergeSomething(ctx context.Context, client *apigen.ClientWithResponses, base *uri.URI, name string) error {
	createBranchResponse, err := client.CreateBranchWithResponse(ctx, base.Repository,
		apigen.CreateBranchJSONRequestBody{
			Name:   name,
			Source: base.Ref,
		},
	)
	if err != nil || !apiutil.IsStatusCodeOK(createBranchResponse.StatusCode()) {
		if err == nil {
			err = helpers.ResponseAsError(createBranchResponse)
		}
		return fmt.Errorf("create branch %s: %w", name, err)
	}

	u := base.WithRef(name)
	// Use a different name on each branch, to avoid conflicts.
	path := fmt.Sprintf("object-%s", name)
	u.Path = &path

	getResponse, err := client.GetPhysicalAddressWithResponse(ctx, u.Repository, u.Ref, &apigen.GetPhysicalAddressParams{Path: *u.Path})
	if err != nil || getResponse.JSON200 == nil {
		if err == nil {
			err = helpers.ResponseAsError(getResponse)
		}
		return fmt.Errorf("get physical address for %s: %w", name, err)
	}
	// Link the object but do not actually upload anything - it is not
	// important for merging, and would only reduce load.
	stagingLocation := getResponse.JSON200
	linkResponse, err := client.LinkPhysicalAddressWithResponse(ctx, u.Repository, u.Ref,
		&apigen.LinkPhysicalAddressParams{
			Path: *u.Path,
		},
		apigen.LinkPhysicalAddressJSONRequestBody{
			Checksum: "deadbeef0000cafe",
			Staging: apigen.StagingLocation{
				PhysicalAddress: stagingLocation.PhysicalAddress,
			},
			UserMetadata: nil,
		})
	if err != nil || linkResponse.JSON200 == nil {
		if err == nil {
			err = helpers.ResponseAsError(linkResponse)
		}
		return fmt.Errorf("link physical address for %s: %w", name, err)
	}

	commitResponse, err := client.CommitWithResponse(ctx, u.Repository, u.Ref, &apigen.CommitParams{}, apigen.CommitJSONRequestBody{Message: fmt.Sprintf("commit %s", name)})
	if err != nil || commitResponse.JSON201 == nil {
		if err == nil {
			err = helpers.ResponseAsError(commitResponse)
		}
		return fmt.Errorf("commit for %s: %w", name, err)
	}

	mergeResponse, err := client.MergeIntoBranchWithResponse(ctx, u.Repository, u.Ref, base.Ref, apigen.MergeIntoBranchJSONRequestBody{})
	if err != nil || mergeResponse.JSON200 == nil {
		if err == nil {
			err = helpers.ResponseAsError(mergeResponse)
		}
		return fmt.Errorf("merge from %s: %w", name, err)
	}

	return nil
}

//nolint:gochecknoinits,mnd
func init() {
	abuseMergeCmd.Flags().Int("amount", 1000, "amount of merges to perform")
	abuseMergeCmd.Flags().Int("parallelism", abuseDefaultParallelism, "number of merges to perform in parallel")

	abuseCmd.AddCommand(abuseMergeCmd)
}
19 changes: 19 additions & 0 deletions docs/reference/cli.md
@@ -3032,6 +3032,25 @@ lakectl abuse list <source ref URI> [flags]



### lakectl abuse merge

Merge non-conflicting objects to the source branch in parallel

```
lakectl abuse merge <branch URI> [flags]
```

#### Options
{:.no_toc}

```
--amount int amount of merges to perform (default 1000)
-h, --help help for merge
--parallelism int number of merges to perform in parallel (default 100)
```



### lakectl abuse random-delete

Delete keys from a file and generate random delete from the source ref for those keys.
26 changes: 19 additions & 7 deletions pkg/catalog/catalog.go
@@ -294,6 +294,17 @@ func (c *ctxCloser) Close() error {
	return nil
}

func makeBranchApproximateOwnershipParams(cfg config.ApproximatelyCorrectOwnership) ref.BranchApproximateOwnershipParams {
	if !cfg.Enabled {
		// zero Durations => no branch ownership
		return ref.BranchApproximateOwnershipParams{}
	}
	return ref.BranchApproximateOwnershipParams{
		AcquireInterval: cfg.Acquire,
		RefreshInterval: cfg.Refresh,
	}
}

func New(ctx context.Context, cfg Config) (*Catalog, error) {
	ctx, cancelFn := context.WithCancel(ctx)
	adapter, err := factory.BuildBlockAdapter(ctx, nil, cfg.Config)
@@ -364,13 +375,14 @@ func New(ctx context.Context, cfg Config) (*Catalog, error) {
	addressProvider := ident.NewHexAddressProvider()
	refManager := ref.NewRefManager(
		ref.ManagerConfig{
			Executor:              executor,
			KVStore:               cfg.KVStore,
			KVStoreLimited:        storeLimiter,
			AddressProvider:       addressProvider,
			RepositoryCacheConfig: ref.CacheConfig(cfg.Config.Graveler.RepositoryCache),
			CommitCacheConfig:     ref.CacheConfig(cfg.Config.Graveler.CommitCache),
			MaxBatchDelay:         cfg.Config.Graveler.MaxBatchDelay,
			Executor:                         executor,
			KVStore:                          cfg.KVStore,
			KVStoreLimited:                   storeLimiter,
			AddressProvider:                  addressProvider,
			RepositoryCacheConfig:            ref.CacheConfig(cfg.Config.Graveler.RepositoryCache),
			CommitCacheConfig:                ref.CacheConfig(cfg.Config.Graveler.CommitCache),
			MaxBatchDelay:                    cfg.Config.Graveler.MaxBatchDelay,
			BranchApproximateOwnershipParams: makeBranchApproximateOwnershipParams(cfg.Config.Graveler.BranchOwnership),
		})
	gcManager := retention.NewGarbageCollectionManager(tierFSParams.Adapter, refManager, cfg.Config.Committed.BlockStoragePrefix)
	settingManager := settings.NewManager(refManager, cfg.KVStore)
14 changes: 14 additions & 0 deletions pkg/config/config.go
@@ -142,6 +142,13 @@ type Database struct {
	} `mapstructure:"cosmosdb"`
}

// ApproximatelyCorrectOwnership configures an approximate ("mostly
// correct") ownership.
type ApproximatelyCorrectOwnership struct {
	Enabled bool          `mapstructure:"enabled"`
	Refresh time.Duration `mapstructure:"refresh"`
	Acquire time.Duration `mapstructure:"acquire"`
}

// Config - Output struct of configuration, used to validate. If you read a key using a viper accessor
// rather than accessing a field of this struct, that key will *not* be validated. So don't
// do that.
@@ -330,6 +337,13 @@ type Config struct {
			RateLimit int `mapstructure:"rate_limit"`
		} `mapstructure:"background"`
		MaxBatchDelay time.Duration `mapstructure:"max_batch_delay"`
		// Parameters for tuning the performance of concurrent branch
		// update operations.  These do not affect correctness or
		// liveness.  Internally this is "*mostly correct* branch
		// ownership", because this ownership may safely fail.  That
		// distinction is unimportant during configuration, so use a
		// shorter name.
		BranchOwnership ApproximatelyCorrectOwnership `mapstructure:"branch_ownership"`
	} `mapstructure:"graveler"`
	Gateways struct {
		S3 struct {
20 changes: 20 additions & 0 deletions pkg/config/defaults.go
@@ -147,6 +147,26 @@ func setDefaults(cfgType string) {
	// 3ms of delay with ~300 requests/second per resource sounds like a reasonable tradeoff.
	viper.SetDefault("graveler.max_batch_delay", 3*time.Millisecond)

	viper.SetDefault("graveler.branch_ownership.enabled", false)
	// ... but if branch ownership is enabled, set up some useful defaults!

	// With these settings, the single concurrent branch updater places
	// the following load on the KV store:
	//
	// - Cleanly acquiring ownership performs 1 read operation and 1
	//   write operation.  Releasing ownership performs another 1 read
	//   operation and 1 write operation.
	//
	// - While ownership is held, add 2.5 read and 2.5 write operations
	//   per second, an additional ~7 read operations per second per
	//   branch operation waiting to acquire ownership, and an
	//   additional write operation per branch operation acquiring
	//   ownership.
	//
	// See additional comments on MostlyCorrectOwner for how to compute these numbers.
	viper.SetDefault("graveler.branch_ownership.refresh", 400*time.Millisecond)
	viper.SetDefault("graveler.branch_ownership.acquire", 150*time.Millisecond)

	viper.SetDefault("ugc.prepare_interval", time.Minute)
	viper.SetDefault("ugc.prepare_max_file_size", 20*1024*1024)

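For reference, enabling the experiment in a lakeFS configuration file would
look roughly like this.  The key names follow the `mapstructure` tags and
`viper` defaults in this diff; the inline comments are editorial gloss, not
official documentation:

```yaml
graveler:
  branch_ownership:
    enabled: true   # off by default; this is the experiment's switch
    refresh: 400ms  # hold interval; ownership is refreshed twice per interval
    acquire: 150ms  # polling cadence while waiting to become owner
```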