Skip to content

Commit

Permalink
Fix: lakectl local diff slowness (#7842)
Browse files Browse the repository at this point in the history
* Fix: lakectl local diff slowness

* Fix upload

* Update pkg/local/diff.go

Co-authored-by: Ariel Shaqed (Scolnicov) <[email protected]>

* Add unit tests

* Add a fuzzer for walking POSIX directories in S3 order

Looks like it found something almost immediately :-/

AFAIU tests will run _existing_ failures in testdata/fuzz.  You can fuzz
more by running something like:

```sh
$ go test -v ./pkg/local/ -fuzz FuzzWalkS3 --fuzztime 5m
=== RUN   TestDiffLocal
=== RUN   TestDiffLocal/t1_no_diff
=== RUN   TestDiffLocal/t1_modified
=== RUN   TestDiffLocal/t1_local_before
=== RUN   TestDiffLocal/t1_local_after
=== RUN   TestDiffLocal/t1_hidden_changed
--- PASS: TestDiffLocal (0.00s)
    --- PASS: TestDiffLocal/t1_no_diff (0.00s)
    --- PASS: TestDiffLocal/t1_modified (0.00s)
    --- PASS: TestDiffLocal/t1_local_before (0.00s)
    --- PASS: TestDiffLocal/t1_local_after (0.00s)
    --- PASS: TestDiffLocal/t1_hidden_changed (0.00s)
=== RUN   TestWalkS3
=== RUN   TestWalkS3/reverse_order
=== RUN   TestWalkS3/file_in_the_middle
=== RUN   TestWalkS3/dirs_at_the_end
=== RUN   TestWalkS3/files_mixed_with_directories
--- PASS: TestWalkS3 (0.00s)
    --- PASS: TestWalkS3/reverse_order (0.00s)
    --- PASS: TestWalkS3/file_in_the_middle (0.00s)
    --- PASS: TestWalkS3/dirs_at_the_end (0.00s)
    --- PASS: TestWalkS3/files_mixed_with_directories (0.00s)
=== RUN   TestWriteIndex
--- PASS: TestWriteIndex (0.00s)
=== RUN   TestReadIndex
--- PASS: TestReadIndex (0.00s)
=== RUN   TestFindIndices
--- PASS: TestFindIndices (0.00s)
=== RUN   FuzzWalkS3
fuzz: elapsed: 0s, gathering baseline coverage: 0/47 completed
failure while testing seed corpus entry: FuzzWalkS3/0345f16af6907ab1
fuzz: elapsed: 0s, gathering baseline coverage: 0/47 completed
--- FAIL: FuzzWalkS3 (0.03s)
    --- FAIL: FuzzWalkS3 (0.00s)
        diff_test.go:323:
                Error Trace:    /home/ariels/dev/lakeFS/pkg/local/diff_test.go:323
                                                        /home/ariels/sdk/go1.21.3/src/reflect/value.go:596
                                                        /home/ariels/sdk/go1.21.3/src/reflect/value.go:380
                                                        /home/ariels/sdk/go1.21.3/src/testing/fuzz.go:335
                Error:          Not equal:
                                expected: []string{"imported 0", "imported/0"}
                                actual  : []string{"imported/0", "imported 0"}

                                Diff:
                                --- Expected
                                +++ Actual
                                @@ -1,4 +1,4 @@
                                 ([]string) (len=2) {
                                - (string) (len=10) "imported 0",
                                - (string) (len=10) "imported/0"
                                + (string) (len=10) "imported/0",
                                + (string) (len=10) "imported 0"
                                 }
                Test:           FuzzWalkS3

=== NAME
FAIL
exit status 1
FAIL    github.com/treeverse/lakefs/pkg/local   0.038s
```

Note that Go only finds a _first_ bug - but at least it tries to minimize it.

* Add sorted queue

* CR fixes

* CR Fixes last

---------

Co-authored-by: Ariel Shaqed (Scolnicov) <[email protected]>
  • Loading branch information
N-o-Z and arielshaqed authored Jun 18, 2024
1 parent 65918ac commit 5630d85
Show file tree
Hide file tree
Showing 7 changed files with 285 additions and 31 deletions.
7 changes: 6 additions & 1 deletion cmd/lakectl/cmd/fs_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,12 @@ var fsUploadCmd = &cobra.Command{
ctx, stop := signal.NotifyContext(ctx, os.Interrupt, os.Kill)
defer stop()

if !recursive { // Assume source is a single file
stat, err := os.Stat(source)
if err != nil {
Die("failed to stat source", 1)
}

if !recursive || !stat.IsDir() { // Ignore recursive if source is a file and not a directory
if strings.HasSuffix(remotePath, uri.PathSeparator) {
Die("target path is not a valid URI", 1)
}
Expand Down
13 changes: 6 additions & 7 deletions esti/lakectl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,13 +557,8 @@ func TestLakectlFsUpload(t *testing.T) {
})
t.Run("single_file_with_recursive", func(t *testing.T) {
vars["FILE_PATH"] = "data/ro/ro_1k.0"
sanitizedResult := runCmd(t, Lakectl()+" fs upload --recursive -s files/ro_1k lakefs://"+repoName+"/"+mainBranch+"/"+vars["FILE_PATH"], false, false, vars)
require.Contains(t, sanitizedResult, "diff 'local://files/ro_1k/' <--> 'lakefs://"+repoName+"/"+mainBranch+"/"+vars["FILE_PATH"]+"'...")
require.Contains(t, sanitizedResult, "upload .")
require.Contains(t, sanitizedResult, "Upload Summary:")
require.Contains(t, sanitizedResult, "Downloaded: 0")
require.Contains(t, sanitizedResult, "Uploaded: 1")
require.Contains(t, sanitizedResult, "Removed: 0")
RunCmdAndVerifySuccessWithFile(t, Lakectl()+" fs upload --recursive -s files/ro_1k lakefs://"+repoName+"/"+mainBranch+"/"+vars["FILE_PATH"]+" -s files/ro_1k", false, "lakectl_fs_upload", vars)

})
t.Run("dir", func(t *testing.T) {
vars["FILE_PATH"] = "data/ro/"
Expand All @@ -589,6 +584,10 @@ func TestLakectlFsUpload(t *testing.T) {
vars["FILE_PATH"] = "data/ro/"
RunCmdAndVerifyFailure(t, Lakectl()+" fs upload -s files/ lakefs://"+repoName+"/"+mainBranch+"/"+vars["FILE_PATH"], false, "target path is not a valid URI\nError executing command.\n", vars)
})
t.Run("dir_without_recursive_to_file", func(t *testing.T) {
vars["FILE_PATH"] = "data/ro/1.txt"
RunCmdAndVerifyFailureContainsText(t, Lakectl()+" fs upload -s files/ lakefs://"+repoName+"/"+mainBranch+"/"+vars["FILE_PATH"], false, "read files/: is a directory", vars)
})
}

func getStorageConfig(t *testing.T) *apigen.StorageConfig {
Expand Down
84 changes: 61 additions & 23 deletions pkg/local/diff.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
package local

import (
"container/heap"
"context"
"fmt"
"io/fs"
"net/http"
"net/url"
"os"
"path/filepath"
"strings"

"github.com/go-openapi/swag"
"github.com/treeverse/lakefs/pkg/api/apigen"
"github.com/treeverse/lakefs/pkg/block"
"github.com/treeverse/lakefs/pkg/block/local"
"github.com/treeverse/lakefs/pkg/block/params"
"github.com/treeverse/lakefs/pkg/gateway/path"
"github.com/treeverse/lakefs/pkg/uri"
)

Expand Down Expand Up @@ -188,6 +186,57 @@ func Undo(c Changes) Changes {
return reversed
}

// WalkS3 walks the file tree rooted at root and calls callbackFunc for every
// non-directory entry, in the order an S3 object listing would return them.
//
// A regular file-system walk visits entries level by level. S3 has no
// directory hierarchy: keys are full paths, so the path separator takes part
// in the lexicographical comparison like any other character. To reproduce
// that order, directories found at the current level are not entered
// immediately. They are queued, with a trailing separator appended, in a
// min-heap and descended into only once the next file at this level no longer
// sorts before them.
//
// Directories themselves are never passed to callbackFunc. An error reported
// by the underlying walk, or returned by callbackFunc, is returned by WalkS3.
func WalkS3(root string, callbackFunc func(p string, info fs.FileInfo, err error) error) error {
	// Directories seen at this level that have not been walked yet.
	var pendingDirs StringHeap

	visit := func(p string, info fs.FileInfo, err error) error {
		switch {
		case err != nil:
			return err
		case p == root:
			// The root itself is not part of the listing.
			return nil
		case info.IsDir():
			// TODO: We don't return dir results for the listing, how will this affect directory markers, and can we even support directory markers?
			// Queue the directory with a trailing separator so it is ordered the same way its contents would be.
			heap.Push(&pendingDirs, p+path.Separator)
			return filepath.SkipDir
		}

		// Before reporting this file, walk every queued directory that sorts at or before it.
		for pendingDirs.Len() > 0 {
			next := pendingDirs.Peek().(string)
			if p < next { // the file precedes the smallest queued directory
				break
			}
			heap.Pop(&pendingDirs)
			if walkErr := WalkS3(next, callbackFunc); walkErr != nil {
				return walkErr
			}
		}
		return callbackFunc(p, info, nil)
	}

	if err := filepath.Walk(root, visit); err != nil {
		return err
	}

	// No files are left at this level: walk whatever directories are still queued.
	for pendingDirs.Len() > 0 {
		next := heap.Pop(&pendingDirs).(string)
		if err := WalkS3(next, callbackFunc); err != nil {
			return err
		}
	}
	return nil
}

// DiffLocalWithHead Checks changes between a local directory and the head it is pointing to. The diff check assumes the remote
// is an immutable set so any changes found resulted from changes in the local directory
// left is an object channel which contains results from a remote source. rightPath is the local directory to diff with
Expand All @@ -198,25 +247,14 @@ func DiffLocalWithHead(left <-chan apigen.ObjectStats, rightPath string) (Change
currentRemoteFile apigen.ObjectStats
hasMore bool
)
absPath, err := filepath.Abs(rightPath)
if err != nil {
return nil, err
}
uri := url.URL{Scheme: "local", Path: absPath}
reader := local.NewLocalWalker(params.Local{
ImportEnabled: false,
ImportHidden: true,
AllowedExternalPrefixes: []string{absPath},
})
err = reader.Walk(context.Background(), &uri, block.WalkOptions{}, func(e block.ObjectStoreEntry) error {
info, err := os.Stat(e.FullKey)
err := WalkS3(rightPath, func(p string, info fs.FileInfo, err error) error {
if err != nil {
return err
}
if info.IsDir() || diffShouldIgnore(info.Name()) {
if diffShouldIgnore(info.Name()) {
return nil
}
localPath := e.RelativeKey
localPath := strings.TrimPrefix(p, rightPath)
localPath = strings.TrimPrefix(localPath, string(filepath.Separator))
localPath = filepath.ToSlash(localPath) // normalize to use "/" always

Expand Down Expand Up @@ -288,18 +326,18 @@ func ListRemote(ctx context.Context, client apigen.ClientWithResponsesInterface,
return fmt.Errorf("list remote failed. HTTP %d: %w", listResp.StatusCode(), ErrRemoteFailure)
}
for _, o := range listResp.JSON200.Results {
path := strings.TrimPrefix(o.Path, loc.GetPath())
p := strings.TrimPrefix(o.Path, loc.GetPath())
// skip directory markers
if path == "" || (strings.HasSuffix(path, uri.PathSeparator) && swag.Int64Value(o.SizeBytes) == 0) {
if p == "" || (strings.HasSuffix(p, uri.PathSeparator) && swag.Int64Value(o.SizeBytes) == 0) {
continue
}
path = strings.TrimPrefix(path, uri.PathSeparator)
p = strings.TrimPrefix(p, uri.PathSeparator)
objects <- apigen.ObjectStats{
Checksum: o.Checksum,
ContentType: o.ContentType,
Metadata: o.Metadata,
Mtime: o.Mtime,
Path: path,
Path: p,
PathType: o.PathType,
PhysicalAddress: o.PhysicalAddress,
SizeBytes: o.SizeBytes,
Expand Down
138 changes: 138 additions & 0 deletions pkg/local/diff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"io/fs"
"os"
"path/filepath"
"slices"
"sort"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -189,3 +191,139 @@ func fixTime(t *testing.T, localPath string) {
})
require.NoError(t, err)
}

// hasPrefix returns true if slice starts with prefix.
func hasPrefix(slice, prefix []string) bool {
	if len(slice) < len(prefix) {
		return false
	}
	return slices.Equal(slice[:len(prefix)], prefix)
}

// cleanPrefixes normalizes fileList and returns the subset of paths that can
// all be created together as regular files under a single POSIX directory.
//
// Every entry is cleaned with filepath.Clean (a leading "/" is made relative
// first) and fileList is sorted; both happen in place, so the caller's slice
// is modified. An entry is then dropped when:
//   - it cleans to "." or ends with an empty component: it names a directory
//     that already exists rather than a file;
//   - it starts with a ".." component: it would be created outside the
//     directory under test and could never be reported by a walk of it;
//   - a path kept earlier is a component-wise prefix of it (or equal to it):
//     the kept path will be a file, so it cannot also be a directory.
//
// The result is in ascending string order.
func cleanPrefixes(t testing.TB, fileList []string) []string {
	t.Helper()
	for i, file := range fileList {
		if file != "" && file[0] == '/' {
			file = "." + file
		}
		fileList[i] = filepath.Clean(file)
	}
	sort.Strings(fileList)

	ret := make([]string, 0, len(fileList))
	// Component form of every path kept so far.
	var kept [][]string

	for _, file := range fileList {
		// Split on "/" explicitly: this is how we construct our
		// test data.
		//
		// TODO(ariels): Fuzzing on Windows will probably fail
		// because of this, and we will need to translate
		// backslashes to slashes above.
		next := strings.Split(file, "/")
		if file == "." || next[len(next)-1] == "" {
			continue
		}
		if next[0] == ".." {
			// After Clean, ".." can only be a leading component.
			continue
		}
		// A path that is a prefix of file always sorts before it, so it
		// has already been seen. It is not necessarily the most recently
		// kept path ("a" < "a b" < "a/b"), so check all of them.
		if slices.ContainsFunc(kept, func(k []string) bool { return hasPrefix(next, k) }) {
			continue
		}
		kept = append(kept, next)
		ret = append(ret, file)
	}
	return ret
}

// setupFiles creates an empty file for every entry of fileList, along with
// any missing parent directories, under a fresh temporary directory. It
// returns that directory's path with a trailing path separator. fileList is
// expected to have been passed through cleanPrefixes first, so that no entry
// needs another entry to be a directory.
func setupFiles(t testing.TB, fileList []string) string {
	t.Helper()
	root := t.TempDir() + string(filepath.Separator)
	for _, name := range fileList {
		target := filepath.Join(root, name)
		parent := filepath.Dir(target)
		require.NoError(t, os.MkdirAll(parent, os.ModePerm))

		created, err := os.Create(target)
		require.NoError(t, err)
		// Only the file's existence matters here, so a close error is ignored.
		_ = created.Close()
	}
	return root
}

// TestWalkS3 creates each file list on disk and checks that WalkS3 reports
// the files in plain string-sorted order of their relative paths.
func TestWalkS3(t *testing.T) {
	tests := []struct {
		Name     string
		FileList []string
	}{
		{
			Name:     "reverse order",
			FileList: []string{"imported/new-prefix/prefix-1/file000001", "imported./new-prefix/prefix-1/file002100"},
		},
		{
			Name:     "file in the middle",
			FileList: []string{"imported/file000001", "imported/new-prefix/prefix-1/file000001", "imported./new-prefix/prefix-1/file002100"},
		},
		{
			Name:     "dirs at the end",
			FileList: []string{"imported/new-prefix/prefix-1/file000001", "imported./new-prefix/prefix-1/file002100", "file000001"},
		},
		{
			Name:     "files mixed with directories",
			FileList: []string{"imported/0000/1", "imported/00000", "imported/00010/1"},
		},
		{
			Name:     "file in between",
			FileList: []string{"imported,/0000/1", "imported.", "imported/00010/1"},
		},
	}
	for _, tc := range tests {
		t.Run(tc.Name, func(t *testing.T) {
			root := setupFiles(t, tc.FileList)

			// Record every visited path relative to the temporary root.
			var visited []string
			collect := func(p string, _ fs.FileInfo, _ error) error {
				visited = append(visited, strings.TrimPrefix(p, root))
				return nil
			}
			require.NoError(t, local.WalkS3(root, collect))

			// The expected walk order is the sorted list of relative paths.
			sort.Strings(tc.FileList)
			require.Equal(t, tc.FileList, visited)
		})
	}
}

// FuzzWalkS3 fuzzes WalkS3 with arbitrary sets of file names and checks that
// the files are always visited in string-sorted order.
func FuzzWalkS3(f *testing.F) {
	seeds := [][]string{
		{"imported/file000001", "imported/new-prefix/prefix-1/file000001", "imported./new-prefix/prefix-1/file002100"},
		{"imported/new-prefix/prefix-1/file000001", "imported./new-prefix/prefix-1/file002100", "file000001"},
		{"imported/0000/1", "imported/00000", "imported/00010/1"},
	}

	// Go fuzzing only supports string test cases. Since \0 is not
	// valid in POSIX filenames, it is used to join (and later split)
	// the individual file names.
	for _, seed := range seeds {
		f.Add(strings.Join(seed, "\x00"))
	}

	f.Fuzz(func(t *testing.T, tc string) {
		files := cleanPrefixes(t, strings.Split(tc, "\x00"))
		root := setupFiles(t, files)

		// Record every visited path relative to the temporary root.
		var visited []string
		err := local.WalkS3(root, func(p string, _ fs.FileInfo, _ error) error {
			visited = append(visited, strings.TrimPrefix(p, root))
			return nil
		})
		require.NoError(t, err)

		sort.Strings(files)
		if len(files) == 0 {
			// require.Equal tells an empty slice apart from a nil one,
			// and visited stays nil when nothing was walked.
			files = nil
		}
		require.Equal(t, files, visited)
	})
}
33 changes: 33 additions & 0 deletions pkg/local/string_heap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package local

// StringHeap is a min-heap of strings. Its methods satisfy heap.Interface
// from container/heap; use heap.Push and heap.Pop on a *StringHeap to add and
// remove elements so that the heap ordering is maintained.
type StringHeap []string

// Len returns the number of strings currently held.
func (h *StringHeap) Len() int {
	return len(*h)
}

// Less orders elements ascending, so that heap.Pop yields the smallest string.
func (h *StringHeap) Less(i, j int) bool {
	items := *h
	return items[i] < items[j]
}

// Swap exchanges the elements at indexes i and j.
func (h *StringHeap) Swap(i, j int) {
	items := *h
	items[i], items[j] = items[j], items[i]
}

// Push appends x, which must be a string, at the end of the underlying slice.
func (h *StringHeap) Push(x any) {
	*h = append(*h, x.(string))
}

// Pop removes and returns the last element of the underlying slice.
func (h *StringHeap) Pop() any {
	items := *h
	last := len(items) - 1
	item := items[last]
	*h = items[:last]
	return item
}

// Peek returns the smallest string without removing it. Per the container/heap
// documentation (https://pkg.go.dev/container/heap), "The minimum element in
// the tree is the root, at index 0." It panics when the heap is empty.
func (h *StringHeap) Peek() any {
	return (*h)[0]
}
39 changes: 39 additions & 0 deletions pkg/local/string_heap_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package local_test

import (
"container/heap"
"fmt"
"sort"
"testing"

"github.com/stretchr/testify/require"
"github.com/treeverse/lakefs/pkg/local"
)

// TestStringHeap checks that strings pushed onto a StringHeap are popped in
// ascending string order.
func TestStringHeap(t *testing.T) {
	// Paths differing only around the separator, where ordering is most subtle.
	expected := []string{"imported/0000/1", "imported../00000", "imported/00010/1"}

	// Start from an empty heap and push the initial items through heap.Push.
	queue := local.StringHeap{}
	heap.Init(&queue)
	for i := range expected {
		heap.Push(&queue, expected[i])
	}

	// Push one more item after the initial batch.
	const extra = "imported./0000/1"
	expected = append(expected, extra)
	heap.Push(&queue, extra)

	// Popping must yield the items smallest first, i.e. in sorted order.
	sort.Strings(expected)
	fmt.Println(expected)
	for idx := 0; queue.Len() > 0; idx++ {
		popped := heap.Pop(&queue).(string)
		require.Equal(t, expected[idx], popped)
	}
}
Loading

0 comments on commit 5630d85

Please sign in to comment.