From 6a71759a0e2a3c79420cb01dcd172d8e3f10a184 Mon Sep 17 00:00:00 2001 From: Itai Admi Date: Thu, 7 Sep 2023 14:45:57 +0300 Subject: [PATCH 1/8] Fix SeekGE in DDB --- pkg/kv/dynamodb/store.go | 1 + pkg/kv/kvtest/iterators.go | 18 ++++++++++++++++-- 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/pkg/kv/dynamodb/store.go b/pkg/kv/dynamodb/store.go index 45dfa76139e..fd857c6f4e5 100644 --- a/pkg/kv/dynamodb/store.go +++ b/pkg/kv/dynamodb/store.go @@ -380,6 +380,7 @@ func (s *Store) DropTable() error { func (e *EntriesIterator) SeekGE(key []byte) { if !e.isInRange(key) { e.startKey = key + e.exclusiveStartKey = nil e.runQuery() return } diff --git a/pkg/kv/kvtest/iterators.go b/pkg/kv/kvtest/iterators.go index fac88722f9a..2deddc7e650 100644 --- a/pkg/kv/kvtest/iterators.go +++ b/pkg/kv/kvtest/iterators.go @@ -35,7 +35,21 @@ func testPartitionIterator(t *testing.T, ms MakeStore) { store := ms(t, ctx) // prepare data - modelNames := []string{"a", "aa", "b", "c", "d"} + modelNames := []string{"a", "aa", "b", "c", "d", + "d1", "d2", "d3", "d4", "d5", "d6", "d7", "d8", "d9", + "d10", "d11", "d12", "d13", "d14", "d15", "d16", "d17", "d18", "d19", + "d20", "d21", "d22", "d23", "d24", "d25", "d26", "d27", "d28", "d29", + "d30", "d31", "d32", "d33", "d34", "d35", "d36", "d37", "d38", "d39", + "d40", "d41", "d42", "d43", "d44", "d45", "d46", "d47", "d48", "d49", + "d50", "d51", "d52", "d53", "d54", "d55", "d56", "d57", "d58", "d59", + "d60", "d61", "d62", "d63", "d64", "d65", "d66", "d67", "d68", "d69", + "d70", "d71", "d72", "d73", "d74", "d75", "d76", "d77", "d78", "d79", + "d80", "d81", "d82", "d83", "d84", "d85", "d86", "d87", "d88", "d89", + "d90", "d91", "d92", "d93", "d94", "d95", "d96", "d97", "d98", "d99", + "d100", "d101", "d102", "d103", "d104", "d105", "d106", "d107", "d108", "d109", + "d110", "d111", "d112", "d113", "d114", "d115", "d116", "d117", "d118", "d119", + "d120", "d121", "d122", "d123", "d124", "d125", "d126", "d127", "d128", "d129", + "z"} for _, name := range modelNames { model := TestModel{Name: []byte(name)} for _, partitionKey := range []string{firstPartitionKey, secondPartitionKey} { @@ -64,7 +78,7 @@ func testPartitionIterator(t *testing.T, ms MakeStore) { t.Fatalf("failed to create partition iterator") } defer itr.Close() - for _, seekValue := range []string{"b", "aaa", "b"} { + for _, seekValue := range []string{"b", "aaa", "b", "z"} { itr.SeekGE([]byte(seekValue)) names := scanPartitionIterator(t, itr, func(_ []byte, model *TestModel) string { return string(model.Name) }) if diffs := deep.Equal(names, []string{"b", "c", "d"}); diffs != nil { From 90f26fa5073e0f5dccd1c986bcfd55621b892966 Mon Sep 17 00:00:00 2001 From: Itai Admi Date: Thu, 7 Sep 2023 14:49:41 +0300 Subject: [PATCH 2/8] Fix test --- pkg/kv/kvtest/iterators.go | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/pkg/kv/kvtest/iterators.go b/pkg/kv/kvtest/iterators.go index 2deddc7e650..93d1180293a 100644 --- a/pkg/kv/kvtest/iterators.go +++ b/pkg/kv/kvtest/iterators.go @@ -78,13 +78,10 @@ func testPartitionIterator(t *testing.T, ms MakeStore) { t.Fatalf("failed to create partition iterator") } defer itr.Close() - for _, seekValue := range []string{"b", "aaa", "b", "z"} { - itr.SeekGE([]byte(seekValue)) - names := scanPartitionIterator(t, itr, func(_ []byte, model *TestModel) string { return string(model.Name) }) - if diffs := deep.Equal(names, []string{"b", "c", "d"}); diffs != nil { - t.Fatalf("got wrong list of names: %v", diffs) - } - } + seekValues := []string{"b", "aaa", "b"} + seekAndCompare(t, seekValues, itr, []string{"b", "c", "d"}) + seekAndCompare(t, []string{"z"}, itr, []string{"z"}) + seekAndCompare(t, seekValues, itr, []string{"b", "c", "d"}) }) t.Run("count scans on successive SeekGE operations", func(t *testing.T) { @@ -186,6 +183,16 @@ func testPartitionIterator(t *testing.T, ms MakeStore) { }) } +func seekAndCompare(t *testing.T, seekValues []string, itr *kv.PartitionIterator, expected []string) { + for _, seekValue := range seekValues { + itr.SeekGE([]byte(seekValue)) + names := scanPartitionIterator(t, itr, func(_ []byte, model *TestModel) string { return string(model.Name) }) + if diffs := deep.Equal(names, expected); diffs != nil { + t.Fatalf("got wrong list of names: %v", diffs) + } + } +} + // scanPartitionIterator scans the iterator and returns a slice of the results of applying fn to each model. // slice element type is based on the callback 'fn' function return type. func scanPartitionIterator[E any](t *testing.T, itr kv.MessageIterator, fn func(key []byte, model *TestModel) E) []E { From ef384dd8b446571e4a8adad2c95d72850bce3d10 Mon Sep 17 00:00:00 2001 From: Itai Admi Date: Thu, 7 Sep 2023 19:04:28 +0300 Subject: [PATCH 3/8] Use callbacks in kv store creation for tests isolation --- pkg/gateway/testutil/gateway_setup.go | 5 +- pkg/kv/cosmosdb/store_test.go | 16 ++++-- pkg/kv/dynamodb/main_test.go | 15 ++---- pkg/kv/dynamodb/store_test.go | 24 ++++++++- pkg/kv/kvtest/iterators.go | 76 ++++++++++++++++++--------- pkg/kv/kvtest/store.go | 16 +----- pkg/kv/local/store_test.go | 22 +++++--- pkg/kv/mem/store_test.go | 14 ++++- pkg/kv/msg_test.go | 2 +- pkg/kv/postgres/main_test.go | 12 ++--- pkg/kv/postgres/store_test.go | 19 ++++++- 11 files changed, 146 insertions(+), 75 deletions(-) diff --git a/pkg/gateway/testutil/gateway_setup.go b/pkg/gateway/testutil/gateway_setup.go index 7d247d44dbf..70d2ae0d9cf 100644 --- a/pkg/gateway/testutil/gateway_setup.go +++ b/pkg/gateway/testutil/gateway_setup.go @@ -2,6 +2,7 @@ package testutil import ( "context" + "github.com/treeverse/lakefs/pkg/kv" "net/http" "os" "testing" @@ -16,7 +17,6 @@ import ( "github.com/treeverse/lakefs/pkg/gateway" "github.com/treeverse/lakefs/pkg/gateway/multipart" "github.com/treeverse/lakefs/pkg/kv/kvparams" - "github.com/treeverse/lakefs/pkg/kv/kvtest" _ "github.com/treeverse/lakefs/pkg/kv/mem" "github.com/treeverse/lakefs/pkg/logging" "github.com/treeverse/lakefs/pkg/stats" @@ -34,7 +34,8 @@ func GetBasicHandler(t *testing.T, authService *FakeAuthService, repoName string ctx := context.Background() viper.Set(config.BlockstoreTypeKey, block.BlockstoreTypeMem) - store := kvtest.MakeStoreByName("mem", kvparams.Config{})(t, ctx) + store, err := kv.Open(ctx, kvparams.Config{Type: "mem"}) + testutil.MustDo(t, "open kv store", err) defer store.Close() multipartTracker := multipart.NewTracker(store) diff --git a/pkg/kv/cosmosdb/store_test.go b/pkg/kv/cosmosdb/store_test.go index 7d3839314f3..c2fe665b018 100644 --- a/pkg/kv/cosmosdb/store_test.go +++ b/pkg/kv/cosmosdb/store_test.go @@ -1,15 +1,25 @@ package cosmosdb_test import ( - "testing" - + "context" + "github.com/treeverse/lakefs/pkg/kv" "github.com/treeverse/lakefs/pkg/kv/cosmosdb" "github.com/treeverse/lakefs/pkg/kv/kvparams" + "testing" + "github.com/treeverse/lakefs/pkg/kv/kvtest" ) func TestCosmosDB(t *testing.T) { t.Skip("CosmosDB tests are flaky due to the emulator. If you plan on running those, make sure to assign at least 3CPUs and" + " 4GB of memory to the container running the emulator.") - kvtest.DriverTest(t, cosmosdb.DriverName, kvparams.Config{CosmosDB: testParams}) + kvtest.DriverTest(t, func(t testing.TB, ctx context.Context) kv.Store { + t.Helper() + store, err := kv.Open(ctx, kvparams.Config{CosmosDB: testParams, Type: cosmosdb.DriverName}) + if err != nil { + t.Fatalf("failed to open kv '%s' store: %s", cosmosdb.DriverName, err) + } + t.Cleanup(store.Close) + return store + }) } diff --git a/pkg/kv/dynamodb/main_test.go b/pkg/kv/dynamodb/main_test.go index 7ba3b5b18d5..fad7372b6ea 100644 --- a/pkg/kv/dynamodb/main_test.go +++ b/pkg/kv/dynamodb/main_test.go @@ -10,22 +10,15 @@ import ( ) var testParams *kvparams.DynamoDB +var databaseURI string func TestMain(m *testing.M) { - databaseURI, cleanupFunc, err := testutil.GetDynamoDBInstance() + var err error + var cleanupFunc func() + databaseURI, cleanupFunc, err = testutil.GetDynamoDBInstance() if err != nil { log.Fatalf("Could not connect to Docker: %s", err) } - - testParams = &kvparams.DynamoDB{ - TableName: testutil.UniqueKVTableName(), - ScanLimit: 10, - Endpoint: databaseURI, - AwsRegion: "us-east-1", - AwsAccessKeyID: "fakeMyKeyId", - AwsSecretAccessKey: "fakeSecretAccessKey", - } - code := m.Run() cleanupFunc() os.Exit(code) diff --git a/pkg/kv/dynamodb/store_test.go b/pkg/kv/dynamodb/store_test.go index e1f3aa00b8e..982497f38dd 100644 --- a/pkg/kv/dynamodb/store_test.go +++ b/pkg/kv/dynamodb/store_test.go @@ -1,13 +1,33 @@ package dynamodb_test import ( + "context" + "github.com/treeverse/lakefs/pkg/kv" + "github.com/treeverse/lakefs/pkg/kv/dynamodb" + "github.com/treeverse/lakefs/pkg/testutil" "testing" - "github.com/treeverse/lakefs/pkg/kv/dynamodb" "github.com/treeverse/lakefs/pkg/kv/kvparams" "github.com/treeverse/lakefs/pkg/kv/kvtest" ) func TestDynamoKV(t *testing.T) { - kvtest.DriverTest(t, dynamodb.DriverName, kvparams.Config{DynamoDB: testParams}) + kvtest.DriverTest(t, func(t testing.TB, ctx context.Context) kv.Store { + t.Helper() + testParams = &kvparams.DynamoDB{ + TableName: testutil.UniqueKVTableName(), + ScanLimit: 10, + Endpoint: databaseURI, + AwsRegion: "us-east-1", + AwsAccessKeyID: "fakeMyKeyId", + AwsSecretAccessKey: "fakeSecretAccessKey", + } + + store, err := kv.Open(ctx, kvparams.Config{DynamoDB: testParams, Type: dynamodb.DriverName}) + if err != nil { + t.Fatalf("failed to open kv '%s' store: %s", dynamodb.DriverName, err) + } + t.Cleanup(store.Close) + return store + }) } diff --git a/pkg/kv/kvtest/iterators.go b/pkg/kv/kvtest/iterators.go index 93d1180293a..4457b2f0007 100644 --- a/pkg/kv/kvtest/iterators.go +++ b/pkg/kv/kvtest/iterators.go @@ -3,6 +3,7 @@ package kvtest import ( "context" "errors" + "github.com/stretchr/testify/require" "sync/atomic" "testing" @@ -35,21 +36,7 @@ func testPartitionIterator(t *testing.T, ms MakeStore) { store := ms(t, ctx) // prepare data - modelNames := []string{"a", "aa", "b", "c", "d", - "d1", "d2", "d3", "d4", "d5", "d6", "d7", "d8", "d9", - "d10", "d11", "d12", "d13", "d14", "d15", "d16", "d17", "d18", "d19", - "d20", "d21", "d22", "d23", "d24", "d25", "d26", "d27", "d28", "d29", - "d30", "d31", "d32", "d33", "d34", "d35", "d36", "d37", "d38", "d39", - "d40", "d41", "d42", "d43", "d44", "d45", "d46", "d47", "d48", "d49", - "d50", "d51", "d52", "d53", "d54", "d55", "d56", "d57", "d58", "d59", - "d60", "d61", "d62", "d63", "d64", "d65", "d66", "d67", "d68", "d69", - "d70", "d71", "d72", "d73", "d74", "d75", "d76", "d77", "d78", "d79", - "d80", "d81", "d82", "d83", "d84", "d85", "d86", "d87", "d88", "d89", - "d90", "d91", "d92", "d93", "d94", "d95", "d96", "d97", "d98", "d99", - "d100", "d101", "d102", "d103", "d104", "d105", "d106", "d107", "d108", "d109", - "d110", "d111", "d112", "d113", "d114", "d115", "d116", "d117", "d118", "d119", - "d120", "d121", "d122", "d123", "d124", "d125", "d126", "d127", "d128", "d129", - "z"} + modelNames := []string{"a", "aa", "b", "c", "d"} for _, name := range modelNames { model := TestModel{Name: []byte(name)} for _, partitionKey := range []string{firstPartitionKey, secondPartitionKey} { @@ -78,10 +65,13 @@ func testPartitionIterator(t *testing.T, ms MakeStore) { t.Fatalf("failed to create partition iterator") } defer itr.Close() - seekValues := []string{"b", "aaa", "b"} - seekAndCompare(t, seekValues, itr, []string{"b", "c", "d"}) - seekAndCompare(t, []string{"z"}, itr, []string{"z"}) - seekAndCompare(t, seekValues, itr, []string{"b", "c", "d"}) + for _, seekValue := range []string{"b", "aaa", "b"} { + itr.SeekGE([]byte(seekValue)) + names := scanPartitionIterator(t, itr, func(_ []byte, model *TestModel) string { return string(model.Name) }) + if diffs := deep.Equal(names, []string{"b", "c", "d"}); diffs != nil { + t.Fatalf("got wrong list of names: %v", diffs) + } + } }) t.Run("count scans on successive SeekGE operations", func(t *testing.T) { @@ -181,16 +171,52 @@ func testPartitionIterator(t *testing.T, ms MakeStore) { t.Fatalf("expected value a from iterator") } }) -} -func seekAndCompare(t *testing.T, seekValues []string, itr *kv.PartitionIterator, expected []string) { - for _, seekValue := range seekValues { - itr.SeekGE([]byte(seekValue)) + t.Run("listing values SeekGE with pagination", func(t *testing.T) { + // load much more data + moreModelNames := []string{ + "da", "db", "dc", "dd", "de", "df", "dg", "dh", "di", "dj", + "dk", "dl", "dm", "dn", "do", "dp", "dq", "dr", "ds", "dt", + "du", "dv", "dw", "dx", "dy", "dz", + "ea", "eb", "ec", "ed", "ee", "ef", "eg", "eh", "ei", "ej", + "ek", "el", "em", "en", "eo", "ep", "eq", "er", "es", "et", + "eu", "ev", "ew", "ex", "ey", "ez", + "fa", "fb", "fc", "fd", "fe", "ff", "fg", "fh", "fi", "fj", + "fk", "fl", "fm", "fn", "fo", "fp", "fq", "fr", "fs", "ft", + "fu", "fv", "fw", "fx", "fy", "fz", + "ga", "gb", "gc", "gd", "ge", "gf", "gg", "gh", "gi", "gj", + "gk", "gl", "gm", "gn", "go", "gp", "gq", "gr", "gs", "gt", + "gu", "gv", "gw", "gx", "gy", "gz", + "z", + } + for _, name := range moreModelNames { + model := TestModel{Name: []byte(name)} + for _, partitionKey := range []string{firstPartitionKey, secondPartitionKey} { + err := kv.SetMsg(ctx, store, partitionKey, model.Name, &model) + if err != nil { + t.Fatalf("failed to set model (partition %s, name %s): %s", partitionKey, name, err) + } + } + } + + itr := kv.NewPartitionIterator(ctx, store, (&TestModel{}).ProtoReflect().Type(), secondPartitionKey, 0) + if itr == nil { + t.Fatalf("failed to create partition iterator") + } + defer itr.Close() + + itr.SeekGE([]byte("b")) + require.True(t, itr.Next()) + + itr.SeekGE([]byte("z")) + require.True(t, itr.Next()) + + itr.SeekGE([]byte("d1")) names := scanPartitionIterator(t, itr, func(_ []byte, model *TestModel) string { return string(model.Name) }) - if diffs := deep.Equal(names, expected); diffs != nil { + if diffs := deep.Equal(names, moreModelNames); diffs != nil { t.Fatalf("got wrong list of names: %v", diffs) } - } + }) } // scanPartitionIterator scans the iterator and returns a slice of the results of applying fn to each model. diff --git a/pkg/kv/kvtest/store.go b/pkg/kv/kvtest/store.go index 37fcdb099a6..09ffba34e73 100644 --- a/pkg/kv/kvtest/store.go +++ b/pkg/kv/kvtest/store.go @@ -50,9 +50,8 @@ func sampleEntry(prefix string, n int) kv.Entry { return kv.Entry{Key: []byte(k), Value: []byte(v)} } -func DriverTest(t *testing.T, name string, params kvparams.Config) { +func DriverTest(t *testing.T, ms MakeStore) { t.Helper() - ms := MakeStoreByName(name, params) t.Run("Driver_Open", func(t *testing.T) { testDriverOpen(t, ms) }) t.Run("Store_SetGet", func(t *testing.T) { testStoreSetGet(t, ms) }) t.Run("Store_SetIf", func(t *testing.T) { testStoreSetIf(t, ms) }) @@ -457,19 +456,6 @@ func testStoreScan(t *testing.T, ms MakeStore) { }) } -func MakeStoreByName(name string, kvParams kvparams.Config) MakeStore { - return func(t testing.TB, ctx context.Context) kv.Store { - t.Helper() - kvParams.Type = name - store, err := kv.Open(ctx, kvParams) - if err != nil { - t.Fatalf("failed to open kv '%s' store: %s", name, err) - } - t.Cleanup(store.Close) - return store - } -} - func testStoreMissingArgument(t *testing.T, ms MakeStore) { ctx := context.Background() store := ms(t, ctx) diff --git a/pkg/kv/local/store_test.go b/pkg/kv/local/store_test.go index 29cd1f6aebc..78ea6307e86 100644 --- a/pkg/kv/local/store_test.go +++ b/pkg/kv/local/store_test.go @@ -1,6 +1,8 @@ package local_test import ( + "context" + "github.com/treeverse/lakefs/pkg/kv" "testing" "github.com/treeverse/lakefs/pkg/kv/kvparams" @@ -9,11 +11,19 @@ import ( ) func TestLocalKV(t *testing.T) { - kvtest.DriverTest(t, local.DriverName, kvparams.Config{ - Type: local.DriverName, - Local: &kvparams.Local{ - Path: t.TempDir(), - EnableLogging: true, - }, + kvtest.DriverTest(t, func(t testing.TB, ctx context.Context) kv.Store { + t.Helper() + store, err := kv.Open(ctx, kvparams.Config{ + Type: local.DriverName, + Local: &kvparams.Local{ + Path: t.TempDir(), + EnableLogging: true, + }, + }) + if err != nil { + t.Fatalf("failed to open kv '%s' store: %s", local.DriverName, err) + } + t.Cleanup(store.Close) + return store }) } diff --git a/pkg/kv/mem/store_test.go b/pkg/kv/mem/store_test.go index 02b5e3ebf31..c1189dabffd 100644 --- a/pkg/kv/mem/store_test.go +++ b/pkg/kv/mem/store_test.go @@ -1,6 +1,8 @@ package mem_test import ( + "context" + "github.com/treeverse/lakefs/pkg/kv" "testing" "github.com/treeverse/lakefs/pkg/kv/kvparams" @@ -9,5 +11,15 @@ import ( ) func TestMemKV(t *testing.T) { - kvtest.DriverTest(t, mem.DriverName, kvparams.Config{}) + kvtest.DriverTest(t, func(t testing.TB, ctx context.Context) kv.Store { + t.Helper() + store, err := kv.Open(ctx, kvparams.Config{ + Type: mem.DriverName, + }) + if err != nil { + t.Fatalf("failed to open kv '%s' store: %s", mem.DriverName, err) + } + t.Cleanup(store.Close) + return store + }) } diff --git a/pkg/kv/msg_test.go b/pkg/kv/msg_test.go index 0e3d2fb5d0c..ccd6f9324fe 100644 --- a/pkg/kv/msg_test.go +++ b/pkg/kv/msg_test.go @@ -146,7 +146,7 @@ func BenchmarkDrivers(b *testing.B) { defer closer() dynamoStore := testutil.GetDynamoDBProd(ctx, b) - postgresStore := kvtest.MakeStoreByName(postgres.DriverName, kvparams.Config{Postgres: &kvparams.Postgres{ConnectionString: databaseURI}})(b, ctx) + postgresStore, err := kv.Open(ctx, kvparams.Config{Type: postgres.DriverName, Postgres: &kvparams.Postgres{ConnectionString: databaseURI}}) defer postgresStore.Close() tests := []struct { diff --git a/pkg/kv/postgres/main_test.go b/pkg/kv/postgres/main_test.go index 57cc8525c6c..cee79d6a578 100644 --- a/pkg/kv/postgres/main_test.go +++ b/pkg/kv/postgres/main_test.go @@ -17,16 +17,15 @@ const ( ) var ( - pool *dockertest.Pool - databaseURI string + pool *dockertest.Pool ) -func runDBInstance(dockerPool *dockertest.Pool) (string, func()) { +func runDBInstance(dockerPool *dockertest.Pool, dbName string) (string, func()) { ctx := context.Background() resource, err := dockerPool.Run("postgres", "11", []string{ "POSTGRES_USER=lakefs", "POSTGRES_PASSWORD=lakefs", - "POSTGRES_DB=lakefs_db", + "POSTGRES_DB=" + dbName, }) if err != nil { panic("Could not start postgresql: " + err.Error()) @@ -49,7 +48,7 @@ func runDBInstance(dockerPool *dockertest.Pool) (string, func()) { // create connection var pgPool *pgxpool.Pool port := resource.GetPort("5432/tcp") - uri := fmt.Sprintf("postgres://lakefs:lakefs@localhost:%s/lakefs_db?sslmode=disable", port) + uri := fmt.Sprintf("postgres://lakefs:lakefs@localhost:%s/%s?sslmode=disable", port, dbName) err = dockerPool.Retry(func() error { var err error pgPool, err = pgxpool.New(ctx, uri) @@ -73,9 +72,6 @@ func TestMain(m *testing.M) { if err != nil { log.Fatalf("Could not connect to Docker: %s", err) } - var cleanup func() - databaseURI, cleanup = runDBInstance(pool) code := m.Run() - cleanup() os.Exit(code) } diff --git a/pkg/kv/postgres/store_test.go b/pkg/kv/postgres/store_test.go index 93d2f10e224..3ff58811a56 100644 --- a/pkg/kv/postgres/store_test.go +++ b/pkg/kv/postgres/store_test.go @@ -1,6 +1,9 @@ package postgres_test import ( + "context" + "github.com/treeverse/lakefs/pkg/kv" + "github.com/treeverse/lakefs/pkg/testutil" "testing" "github.com/treeverse/lakefs/pkg/kv/kvparams" @@ -9,5 +12,19 @@ import ( ) func TestPostgresKV(t *testing.T) { - kvtest.DriverTest(t, postgres.DriverName, kvparams.Config{Postgres: &kvparams.Postgres{ConnectionString: databaseURI, ScanPageSize: 10}}) + kvtest.DriverTest(t, func(t testing.TB, ctx context.Context) kv.Store { + t.Helper() + databaseURI, cleanup := runDBInstance(pool, testutil.UniqueKVTableName()) + + store, err := kv.Open(ctx, kvparams.Config{ + Type: postgres.DriverName, + Postgres: &kvparams.Postgres{ConnectionString: databaseURI, ScanPageSize: 10}, + }) + if err != nil { + t.Fatalf("failed to open kv '%s' store: %s", postgres.DriverName, err) + } + t.Cleanup(store.Close) + t.Cleanup(cleanup) + return store + }) } From e4f7c960cd4a80d4c0518d6dfd7b413fb8ceb5da Mon Sep 17 00:00:00 2001 From: Itai Admi Date: Fri, 8 Sep 2023 23:04:13 +0300 Subject: [PATCH 4/8] make linter happy --- pkg/gateway/testutil/gateway_setup.go | 2 +- pkg/kv/kvtest/iterators.go | 234 ++++++++++++++------------ 2 files changed, 129 insertions(+), 107 deletions(-) diff --git a/pkg/gateway/testutil/gateway_setup.go b/pkg/gateway/testutil/gateway_setup.go index 70d2ae0d9cf..595817a7ede 100644 --- a/pkg/gateway/testutil/gateway_setup.go +++ b/pkg/gateway/testutil/gateway_setup.go @@ -2,7 +2,6 @@ package testutil import ( "context" - "github.com/treeverse/lakefs/pkg/kv" "net/http" "os" "testing" @@ -16,6 +15,7 @@ import ( "github.com/treeverse/lakefs/pkg/config" "github.com/treeverse/lakefs/pkg/gateway" "github.com/treeverse/lakefs/pkg/gateway/multipart" + "github.com/treeverse/lakefs/pkg/kv" "github.com/treeverse/lakefs/pkg/kv/kvparams" _ "github.com/treeverse/lakefs/pkg/kv/mem" "github.com/treeverse/lakefs/pkg/logging" diff --git a/pkg/kv/kvtest/iterators.go b/pkg/kv/kvtest/iterators.go index 4457b2f0007..00a3296532c 100644 --- a/pkg/kv/kvtest/iterators.go +++ b/pkg/kv/kvtest/iterators.go @@ -3,11 +3,11 @@ package kvtest import ( "context" "errors" - "github.com/stretchr/testify/require" "sync/atomic" "testing" "github.com/go-test/deep" + "github.com/stretchr/testify/require" "github.com/treeverse/lakefs/pkg/graveler" "github.com/treeverse/lakefs/pkg/kv" ) @@ -47,81 +47,94 @@ func testPartitionIterator(t *testing.T, ms MakeStore) { } } - t.Run("listing all values of partition", func(t *testing.T) { - itr := kv.NewPartitionIterator(ctx, store, (&TestModel{}).ProtoReflect().Type(), firstPartitionKey, 0) - if itr == nil { - t.Fatalf("failed to create partition iterator") - } - defer itr.Close() - names := scanPartitionIterator(t, itr, func(_ []byte, model *TestModel) string { return string(model.Name) }) - if diffs := deep.Equal(names, []string{"a", "aa", "b", "c", "d"}); diffs != nil { - t.Fatalf("got wrong list of names: %v", diffs) - } - }) + t.Run("listing all values of partition", testPartitionIteratorListAll(ctx, store)) + t.Run("listing values SeekGE", listPartitionIteratorWithSeekGE(ctx, store)) + t.Run("count scans on successive SeekGE operations", testPartitionIteratorCountScansOnSeekGE(store, ctx)) + t.Run("failed SeekGE partition not found", testPartitionIteratorSeekGEOnPartitionNotFound(ctx, store)) + t.Run("SeekGE past end", testPartitionIteratorSeekGEPastEnd(ctx, store)) + t.Run("SeekGE seek back", testPartitionIteratorSeekGESeekBack(ctx, store)) + t.Run("listing values SeekGE with pagination", testPartitionIteratorSeekGEWithPagination(ctx, store)) +} - t.Run("listing values SeekGE", func(t *testing.T) { - itr := kv.NewPartitionIterator(ctx, store, (&TestModel{}).ProtoReflect().Type(), secondPartitionKey, 0) - if itr == nil { - t.Fatalf("failed to create partition iterator") +func testPartitionIteratorSeekGEWithPagination(ctx context.Context, store kv.Store) func(t *testing.T) { + return func(t *testing.T) { + // load much more data + moreModelNames := []string{ + "da", "db", "dc", "dd", "de", "df", "dg", "dh", "di", "dj", + "dk", "dl", "dm", "dn", "do", "dp", "dq", "dr", "ds", "dt", + "du", "dv", "dw", "dx", "dy", "dz", + "ea", "eb", "ec", "ed", "ee", "ef", "eg", "eh", "ei", "ej", + "ek", "el", "em", "en", "eo", "ep", "eq", "er", "es", "et", + "eu", "ev", "ew", "ex", "ey", "ez", + "fa", "fb", "fc", "fd", "fe", "ff", "fg", "fh", "fi", "fj", + "fk", "fl", "fm", "fn", "fo", "fp", "fq", "fr", "fs", "ft", + "fu", "fv", "fw", "fx", "fy", "fz", + "ga", "gb", "gc", "gd", "ge", "gf", "gg", "gh", "gi", "gj", + "gk", "gl", "gm", "gn", "go", "gp", "gq", "gr", "gs", "gt", + "gu", "gv", "gw", "gx", "gy", "gz", + "z", } - defer itr.Close() - for _, seekValue := range []string{"b", "aaa", "b"} { - itr.SeekGE([]byte(seekValue)) - names := scanPartitionIterator(t, itr, func(_ []byte, model *TestModel) string { return string(model.Name) }) - if diffs := deep.Equal(names, []string{"b", "c", "d"}); diffs != nil { - t.Fatalf("got wrong list of names: %v", diffs) + for _, name := range moreModelNames { + model := TestModel{Name: []byte(name)} + for _, partitionKey := range []string{firstPartitionKey, secondPartitionKey} { + err := kv.SetMsg(ctx, store, partitionKey, model.Name, &model) + if err != nil { + t.Fatalf("failed to set model (partition %s, name %s): %s", partitionKey, name, err) + } } } - }) - t.Run("count scans on successive SeekGE operations", func(t *testing.T) { - store := NewStoreWithCounter(store) itr := kv.NewPartitionIterator(ctx, store, (&TestModel{}).ProtoReflect().Type(), secondPartitionKey, 0) if itr == nil { t.Fatalf("failed to create partition iterator") } defer itr.Close() - for _, seekValue := range []string{"b", "c", "d"} { - itr.SeekGE([]byte(seekValue)) - if !itr.Next() { - t.Fatalf("Expected iterator to have a value") - } - if err := itr.Err(); err != nil { - t.Fatalf("unexpected error: %v", err) - } - k := itr.Entry().Key - if string(k) != seekValue { - t.Fatalf("Expected to find value %s. Found %s", seekValue, k) - } - } - scanCalls := atomic.LoadInt64(&store.ScanCalls) - if scanCalls != 1 { - t.Fatalf("Expected exactly 1 call to Scan. got: %d", scanCalls) + + itr.SeekGE([]byte("b")) + require.True(t, itr.Next()) + + itr.SeekGE([]byte("z")) + require.True(t, itr.Next()) + + itr.SeekGE([]byte("d1")) + names := scanPartitionIterator(t, itr, func(_ []byte, model *TestModel) string { return string(model.Name) }) + if diffs := deep.Equal(names, moreModelNames); diffs != nil { + t.Fatalf("got wrong list of names: %v", diffs) } - }) + } +} - t.Run("failed SeekGE partition not found", func(t *testing.T) { - itr := kv.NewPartitionIterator(ctx, store, (&TestModel{}).ProtoReflect().Type(), "", 0) +func testPartitionIteratorSeekGESeekBack(ctx context.Context, store kv.Store) func(t *testing.T) { + return func(t *testing.T) { + itr := kv.NewPartitionIterator(ctx, store, (&TestModel{}).ProtoReflect().Type(), firstPartitionKey, 0) if itr == nil { t.Fatalf("failed to create partition iterator") } defer itr.Close() - itr.SeekGE([]byte("d")) + itr.SeekGE([]byte("z")) if itr.Next() { - t.Fatalf("next after seekGE expected to be false") + t.Fatal("expected Next to be false") } - - itr.SeekGE([]byte("d")) - if itr.Next() { - t.Fatalf("next after seekGE expected to be false") + if err := itr.Err(); err != nil { + t.Fatalf("unexpected error: %s", err) } - - if err := itr.Err(); !errors.Is(err, kv.ErrMissingPartitionKey) { - t.Fatalf("expected error: %s, got %v", kv.ErrMissingPartitionKey, err) + itr.SeekGE([]byte("a")) + if !itr.Next() { + t.Fatalf("expected Next to be true") } - }) + if err := itr.Err(); err != nil { + t.Fatalf("unexpected error: %s", err) + } + e := itr.Entry() + model := e.Value.(*TestModel) + if string(model.Name) != "a" { + t.Fatalf("expected value a from iterator") + } + } +} - t.Run("SeekGE past end", func(t *testing.T) { +func testPartitionIteratorSeekGEPastEnd(ctx context.Context, store kv.Store) func(t *testing.T) { + return func(t *testing.T) { itr := kv.NewPartitionIterator(ctx, store, (&TestModel{}).ProtoReflect().Type(), firstPartitionKey, 0) if itr == nil { t.Fatalf("failed to create partition iterator") @@ -143,80 +156,89 @@ func testPartitionIterator(t *testing.T, ms MakeStore) { if err := itr.Err(); err != nil { t.Fatalf("unexpected error: %v", err) } - }) + } +} - t.Run("SeekGE seek back", func(t *testing.T) { - itr := kv.NewPartitionIterator(ctx, store, (&TestModel{}).ProtoReflect().Type(), firstPartitionKey, 0) +func testPartitionIteratorSeekGEOnPartitionNotFound(ctx context.Context, store kv.Store) func(t *testing.T) { + return func(t *testing.T) { + itr := kv.NewPartitionIterator(ctx, store, (&TestModel{}).ProtoReflect().Type(), "", 0) if itr == nil { t.Fatalf("failed to create partition iterator") } defer itr.Close() - itr.SeekGE([]byte("z")) + itr.SeekGE([]byte("d")) if itr.Next() { - t.Fatal("expected Next to be false") - } - if err := itr.Err(); err != nil { - t.Fatalf("unexpected error: %s", err) - } - itr.SeekGE([]byte("a")) - if !itr.Next() { - t.Fatalf("expected Next to be true") + t.Fatalf("next after seekGE expected to be false") } - if err := itr.Err(); err != nil { - t.Fatalf("unexpected error: %s", err) + + itr.SeekGE([]byte("d")) + if itr.Next() { + t.Fatalf("next after seekGE expected to be false") } - e := itr.Entry() - model := e.Value.(*TestModel) - if string(model.Name) != "a" { - t.Fatalf("expected value a from iterator") + + if err := itr.Err(); !errors.Is(err, kv.ErrMissingPartitionKey) { + t.Fatalf("expected error: %s, got %v", kv.ErrMissingPartitionKey, err) } - }) + } +} - t.Run("listing values SeekGE with pagination", func(t *testing.T) { - // load much more data - moreModelNames := []string{ - "da", "db", "dc", "dd", "de", "df", "dg", "dh", "di", "dj", - "dk", "dl", "dm", "dn", "do", "dp", "dq", "dr", "ds", "dt", - "du", "dv", "dw", "dx", "dy", "dz", - "ea", "eb", "ec", "ed", "ee", "ef", "eg", "eh", "ei", "ej", - "ek", "el", "em", "en", "eo", "ep", "eq", "er", "es", "et", - "eu", "ev", "ew", "ex", "ey", "ez", - "fa", "fb", "fc", "fd", "fe", "ff", "fg", "fh", "fi", "fj", - "fk", "fl", "fm", "fn", "fo", "fp", "fq", "fr", "fs", "ft", - "fu", "fv", "fw", "fx", "fy", "fz", - "ga", "gb", "gc", "gd", "ge", "gf", "gg", "gh", "gi", "gj", - "gk", "gl", "gm", "gn", "go", "gp", "gq", "gr", "gs", "gt", - "gu", "gv", "gw", "gx", "gy", "gz", - "z", +func testPartitionIteratorCountScansOnSeekGE(store kv.Store, ctx context.Context) func(t *testing.T) { + return func(t *testing.T) { + store := NewStoreWithCounter(store) + itr := kv.NewPartitionIterator(ctx, store, (&TestModel{}).ProtoReflect().Type(), secondPartitionKey, 0) + if itr == nil { + t.Fatalf("failed to create partition iterator") } - for _, name := range moreModelNames { - model := TestModel{Name: []byte(name)} - for _, partitionKey := range []string{firstPartitionKey, secondPartitionKey} { - err := kv.SetMsg(ctx, store, partitionKey, model.Name, &model) - if err != nil { - t.Fatalf("failed to set model (partition %s, name %s): %s", partitionKey, name, err) - } + defer itr.Close() + for _, seekValue := range []string{"b", "c", "d"} { + itr.SeekGE([]byte(seekValue)) + if !itr.Next() { + t.Fatalf("Expected iterator to have a value") + } + if err := itr.Err(); err != nil { + t.Fatalf("unexpected error: %v", err) + } + k := itr.Entry().Key + if string(k) != seekValue { + t.Fatalf("Expected to find value %s. Found %s", seekValue, k) } } + scanCalls := atomic.LoadInt64(&store.ScanCalls) + if scanCalls != 1 { + t.Fatalf("Expected exactly 1 call to Scan. got: %d", scanCalls) + } + } +} +func listPartitionIteratorWithSeekGE(ctx context.Context, store kv.Store) func(t *testing.T) { + return func(t *testing.T) { itr := kv.NewPartitionIterator(ctx, store, (&TestModel{}).ProtoReflect().Type(), secondPartitionKey, 0) if itr == nil { t.Fatalf("failed to create partition iterator") } defer itr.Close() + for _, seekValue := range []string{"b", "aaa", "b"} { + itr.SeekGE([]byte(seekValue)) + names := scanPartitionIterator(t, itr, func(_ []byte, model *TestModel) string { return string(model.Name) }) + if diffs := deep.Equal(names, []string{"b", "c", "d"}); diffs != nil { + t.Fatalf("got wrong list of names: %v", diffs) + } + } + } +} - itr.SeekGE([]byte("b")) - require.True(t, itr.Next()) - - itr.SeekGE([]byte("z")) - require.True(t, itr.Next()) - - itr.SeekGE([]byte("d1")) +func testPartitionIteratorListAll(ctx context.Context, store kv.Store) func(t *testing.T) { + return func(t *testing.T) { + itr := kv.NewPartitionIterator(ctx, store, (&TestModel{}).ProtoReflect().Type(), firstPartitionKey, 0) + if itr == nil { + t.Fatalf("failed to create partition iterator") + } + defer itr.Close() names := scanPartitionIterator(t, itr, func(_ []byte, model *TestModel) string { return string(model.Name) }) - if diffs := deep.Equal(names, moreModelNames); diffs != nil { + if diffs := deep.Equal(names, []string{"a", "aa", "b", "c", "d"}); diffs != nil { t.Fatalf("got wrong list of names: %v", diffs) } - }) + } } // scanPartitionIterator scans the iterator and returns a slice of the results of applying fn to each model. From 477f0ebfb405596ab0ee8b8d1b633ad13aa1480a Mon Sep 17 00:00:00 2001 From: Itai Admi Date: Sat, 9 Sep 2023 21:26:19 +0300 Subject: [PATCH 5/8] Make the tests really fail --- pkg/kv/dynamodb/store_test.go | 2 +- pkg/kv/kvtest/iterators.go | 19 +++++++++---------- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/pkg/kv/dynamodb/store_test.go b/pkg/kv/dynamodb/store_test.go index 982497f38dd..f61afc9daf2 100644 --- a/pkg/kv/dynamodb/store_test.go +++ b/pkg/kv/dynamodb/store_test.go @@ -16,7 +16,7 @@ func TestDynamoKV(t *testing.T) { t.Helper() testParams = &kvparams.DynamoDB{ TableName: testutil.UniqueKVTableName(), - ScanLimit: 10, + ScanLimit: kvtest.MaxPageSize, Endpoint: databaseURI, AwsRegion: "us-east-1", AwsAccessKeyID: "fakeMyKeyId", diff --git a/pkg/kv/kvtest/iterators.go b/pkg/kv/kvtest/iterators.go index 00a3296532c..590f31bed9a 100644 --- a/pkg/kv/kvtest/iterators.go +++ b/pkg/kv/kvtest/iterators.go @@ -22,6 +22,9 @@ type StoreWithCounter struct { ScanCalls int64 } +// MaxPageSize is the maximum page size for pagination tests +const MaxPageSize = 10 + func NewStoreWithCounter(store kv.Store) *StoreWithCounter { return &StoreWithCounter{Store: store} } @@ -63,15 +66,6 @@ func testPartitionIteratorSeekGEWithPagination(ctx context.Context, store kv.Sto "da", "db", "dc", "dd", "de", "df", "dg", "dh", "di", "dj", "dk", "dl", "dm", "dn", "do", "dp", "dq", "dr", "ds", "dt", "du", "dv", "dw", "dx", "dy", "dz", - "ea", "eb", "ec", "ed", "ee", "ef", "eg", "eh", "ei", "ej", - "ek", "el", "em", "en", "eo", "ep", "eq", "er", "es", "et", - "eu", "ev", "ew", "ex", "ey", "ez", - "fa", "fb", "fc", "fd", "fe", "ff", "fg", "fh", "fi", "fj", - "fk", "fl", "fm", "fn", "fo", "fp", "fq", "fr", "fs", "ft", - "fu", "fv", "fw", "fx", "fy", "fz", - "ga", "gb", "gc", "gd", "ge", "gf", "gg", "gh", "gi", "gj", - "gk", "gl", "gm", "gn", "go", "gp", "gq", "gr", "gs", "gt", - "gu", "gv", "gw", "gx", "gy", "gz", "z", } for _, name := range moreModelNames { @@ -90,9 +84,14 @@ func testPartitionIteratorSeekGEWithPagination(ctx context.Context, store kv.Sto } defer itr.Close() - itr.SeekGE([]byte("b")) require.True(t, itr.Next()) + itr.SeekGE([]byte("b")) + for i := 0; i < MaxPageSize+1; i++ { + // force + require.True(t, itr.Next()) + } + itr.SeekGE([]byte("z")) require.True(t, itr.Next()) From af49acf221f9240314995fd74a299ed7b80cc0ba Mon Sep 17 00:00:00 2001 From: Itai Admi Date: Sun, 10 Sep 2023 11:00:45 +0300 Subject: [PATCH 6/8] Fix comments --- pkg/kv/cosmosdb/main_test.go | 18 ++++++++++++++++++ pkg/kv/cosmosdb/store_test.go | 25 ------------------------- pkg/kv/kvtest/iterators.go | 15 ++++++++++----- pkg/kv/local/store_test.go | 2 +- pkg/kv/mem/store_test.go | 2 +- pkg/kv/postgres/store_test.go | 26 +++++++++++++++++++++----- pkg/testutil/dynamodb.go | 6 +++++- 7 files changed, 56 insertions(+), 38 deletions(-) delete mode 100644 pkg/kv/cosmosdb/store_test.go diff --git a/pkg/kv/cosmosdb/main_test.go b/pkg/kv/cosmosdb/main_test.go index 43f6baa6a9b..6a9abebd463 100644 --- a/pkg/kv/cosmosdb/main_test.go +++ b/pkg/kv/cosmosdb/main_test.go @@ -1,14 +1,32 @@ package cosmosdb_test import ( + "context" "os" "testing" + "github.com/treeverse/lakefs/pkg/kv" + "github.com/treeverse/lakefs/pkg/kv/cosmosdb" "github.com/treeverse/lakefs/pkg/kv/kvparams" + "github.com/treeverse/lakefs/pkg/kv/kvtest" ) var testParams *kvparams.CosmosDB +func TestCosmosDB(t *testing.T) { + t.Skip("CosmosDB tests are flaky due to the emulator. If you plan on running those, make sure to assign at least 3CPUs and" + + " 4GB of memory to the container running the emulator.") + kvtest.DriverTest(t, func(t testing.TB, ctx context.Context) kv.Store { + t.Helper() + store, err := kv.Open(ctx, kvparams.Config{CosmosDB: testParams, Type: cosmosdb.DriverName}) + if err != nil { + t.Fatalf("failed to open kv '%s' store: %s", cosmosdb.DriverName, err) + } + t.Cleanup(store.Close) + return store + }) +} + func TestMain(m *testing.M) { code := m.Run() os.Exit(code) diff --git a/pkg/kv/cosmosdb/store_test.go b/pkg/kv/cosmosdb/store_test.go deleted file mode 100644 index c2fe665b018..00000000000 --- a/pkg/kv/cosmosdb/store_test.go +++ /dev/null @@ -1,25 +0,0 @@ -package cosmosdb_test - -import ( - "context" - "github.com/treeverse/lakefs/pkg/kv" - "github.com/treeverse/lakefs/pkg/kv/cosmosdb" - "github.com/treeverse/lakefs/pkg/kv/kvparams" - "testing" - - "github.com/treeverse/lakefs/pkg/kv/kvtest" -) - -func TestCosmosDB(t *testing.T) { - t.Skip("CosmosDB tests are flaky due to the emulator. If you plan on running those, make sure to assign at least 3CPUs and" + - " 4GB of memory to the container running the emulator.") - kvtest.DriverTest(t, func(t testing.TB, ctx context.Context) kv.Store { - t.Helper() - store, err := kv.Open(ctx, kvparams.Config{CosmosDB: testParams, Type: cosmosdb.DriverName}) - if err != nil { - t.Fatalf("failed to open kv '%s' store: %s", cosmosdb.DriverName, err) - } - t.Cleanup(store.Close) - return store - }) -} diff --git a/pkg/kv/kvtest/iterators.go b/pkg/kv/kvtest/iterators.go index 590f31bed9a..8ac2bbf4063 100644 --- a/pkg/kv/kvtest/iterators.go +++ b/pkg/kv/kvtest/iterators.go @@ -7,7 +7,6 @@ import ( "testing" "github.com/go-test/deep" - "github.com/stretchr/testify/require" "github.com/treeverse/lakefs/pkg/graveler" "github.com/treeverse/lakefs/pkg/kv" ) @@ -84,16 +83,22 @@ func testPartitionIteratorSeekGEWithPagination(ctx context.Context, store kv.Sto } defer itr.Close() - require.True(t, itr.Next()) + if !itr.Next() { + t.Fatal("expected Next to be true") + } itr.SeekGE([]byte("b")) for i := 0; i < MaxPageSize+1; i++ { - // force - require.True(t, itr.Next()) + // force pagination + if !itr.Next() { + t.Fatal("expected Next to be true") + } } itr.SeekGE([]byte("z")) - require.True(t, itr.Next()) + if !itr.Next() { + t.Fatal("expected Next to be true") + } itr.SeekGE([]byte("d1")) names := scanPartitionIterator(t, itr, func(_ []byte, model *TestModel) string { return string(model.Name) }) diff --git a/pkg/kv/local/store_test.go b/pkg/kv/local/store_test.go index 78ea6307e86..79299032a35 100644 --- a/pkg/kv/local/store_test.go +++ b/pkg/kv/local/store_test.go @@ -2,9 +2,9 @@ package local_test import ( "context" - "github.com/treeverse/lakefs/pkg/kv" "testing" + "github.com/treeverse/lakefs/pkg/kv" "github.com/treeverse/lakefs/pkg/kv/kvparams" "github.com/treeverse/lakefs/pkg/kv/kvtest" "github.com/treeverse/lakefs/pkg/kv/local" diff --git a/pkg/kv/mem/store_test.go b/pkg/kv/mem/store_test.go index c1189dabffd..cda2128da76 100644 --- a/pkg/kv/mem/store_test.go +++ b/pkg/kv/mem/store_test.go @@ -2,9 +2,9 @@ package mem_test import ( "context" - "github.com/treeverse/lakefs/pkg/kv" "testing" + "github.com/treeverse/lakefs/pkg/kv" "github.com/treeverse/lakefs/pkg/kv/kvparams" "github.com/treeverse/lakefs/pkg/kv/kvtest" "github.com/treeverse/lakefs/pkg/kv/mem" diff --git a/pkg/kv/postgres/store_test.go b/pkg/kv/postgres/store_test.go index 3ff58811a56..0517649565d 100644 --- a/pkg/kv/postgres/store_test.go +++ b/pkg/kv/postgres/store_test.go @@ -2,29 +2,45 @@ package postgres_test import ( "context" - "github.com/treeverse/lakefs/pkg/kv" - "github.com/treeverse/lakefs/pkg/testutil" + "fmt" "testing" + "github.com/jackc/pgx/v5" + "github.com/treeverse/lakefs/pkg/kv" "github.com/treeverse/lakefs/pkg/kv/kvparams" "github.com/treeverse/lakefs/pkg/kv/kvtest" "github.com/treeverse/lakefs/pkg/kv/postgres" + "github.com/treeverse/lakefs/pkg/testutil" ) func TestPostgresKV(t *testing.T) { + databaseURI, cleanup := runDBInstance(pool, testutil.UniqueKVTableName()) + t.Cleanup(cleanup) + kvtest.DriverTest(t, func(t testing.TB, ctx context.Context) kv.Store { t.Helper() - databaseURI, cleanup := runDBInstance(pool, testutil.UniqueKVTableName()) + + conn, err := pgx.Connect(ctx, databaseURI) + if err != nil { + t.Fatalf("Unable to connect to database: %v", err) + } + defer conn.Close(context.Background()) + + // create a new schema per test + schemaName := "test_schema" + testutil.UniqueName() + _, err = conn.Exec(context.Background(), fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %s;", schemaName)) + if err != nil { + t.Fatalf("Error creating schema: %v", err) + } store, err := kv.Open(ctx, kvparams.Config{ Type: postgres.DriverName, - Postgres: &kvparams.Postgres{ConnectionString: databaseURI, ScanPageSize: 10}, + Postgres: &kvparams.Postgres{ConnectionString: fmt.Sprintf("%s&search_path=%s", databaseURI, schemaName), ScanPageSize: kvtest.MaxPageSize}, }) if err != nil { t.Fatalf("failed to open kv '%s' store: %s", postgres.DriverName, err) } t.Cleanup(store.Close) - t.Cleanup(cleanup) return store }) } diff --git a/pkg/testutil/dynamodb.go b/pkg/testutil/dynamodb.go index 5e99e053bd6..670b9657af3 100644 --- a/pkg/testutil/dynamodb.go +++ b/pkg/testutil/dynamodb.go @@ -78,7 +78,11 @@ func GetDynamoDBInstance() (string, func(), error) { } func UniqueKVTableName() string { - return "kvstore_" + nanoid.MustGenerate(chars, charsSize) + return "kvstore_" + UniqueName() +} + +func UniqueName() string { + return nanoid.MustGenerate(chars, charsSize) } func GetDynamoDBProd(ctx context.Context, tb testing.TB) kv.Store { From 58cb65824edfb3428f267b4a96a4e28a557fa030 Mon Sep 17 00:00:00 2001 From: itaiad200 Date: Sun, 10 Sep 2023 12:03:51 +0300 Subject: [PATCH 7/8] Update pkg/kv/postgres/store_test.go Co-authored-by: Barak Amar --- pkg/kv/postgres/store_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/kv/postgres/store_test.go b/pkg/kv/postgres/store_test.go index 0517649565d..2502ac3aa91 100644 --- a/pkg/kv/postgres/store_test.go +++ b/pkg/kv/postgres/store_test.go @@ -24,7 +24,7 @@ func TestPostgresKV(t *testing.T) { if err != nil { t.Fatalf("Unable to connect to database: %v", err) } - defer conn.Close(context.Background()) + defer conn.Close(ctx) // create a new schema per test schemaName := "test_schema" + testutil.UniqueName() From 146413a61e45f0e9fded28ffb79ea872591f95e6 Mon Sep 17 00:00:00 2001 From: Itai Admi Date: Sun, 10 Sep 2023 12:18:28 +0300 Subject: [PATCH 8/8] More fixes --- pkg/kv/kvtest/iterators.go | 2 +- pkg/kv/postgres/main_test.go | 3 ++- pkg/kv/postgres/store_test.go | 6 ++++-- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/pkg/kv/kvtest/iterators.go b/pkg/kv/kvtest/iterators.go index 8ac2bbf4063..c3466c34c09 100644 --- a/pkg/kv/kvtest/iterators.go +++ b/pkg/kv/kvtest/iterators.go @@ -55,7 +55,7 @@ func testPartitionIterator(t *testing.T, ms MakeStore) { t.Run("failed SeekGE partition not found", testPartitionIteratorSeekGEOnPartitionNotFound(ctx, store)) t.Run("SeekGE past end", testPartitionIteratorSeekGEPastEnd(ctx, store)) t.Run("SeekGE seek back", testPartitionIteratorSeekGESeekBack(ctx, store)) - t.Run("listing values SeekGE with pagination", testPartitionIteratorSeekGEWithPagination(ctx, store)) + t.Run("listing values SeekGE after pagination", testPartitionIteratorSeekGEWithPagination(ctx, store)) } func testPartitionIteratorSeekGEWithPagination(ctx context.Context, store kv.Store) func(t *testing.T) { diff --git a/pkg/kv/postgres/main_test.go b/pkg/kv/postgres/main_test.go index cee79d6a578..d441a390064 100644 --- a/pkg/kv/postgres/main_test.go +++ b/pkg/kv/postgres/main_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "log" + "net/url" "os" "testing" @@ -48,7 +49,7 @@ func runDBInstance(dockerPool *dockertest.Pool, dbName string) (string, func()) // create connection var pgPool *pgxpool.Pool port := resource.GetPort("5432/tcp") - uri := fmt.Sprintf("postgres://lakefs:lakefs@localhost:%s/%s?sslmode=disable", port, dbName) + uri := fmt.Sprintf("postgres://lakefs:lakefs@localhost:%s/%s?sslmode=disable", port, url.PathEscape(dbName)) err = dockerPool.Retry(func() error { var err error pgPool, err = pgxpool.New(ctx, uri) diff --git a/pkg/kv/postgres/store_test.go b/pkg/kv/postgres/store_test.go index 0517649565d..a4c195dadb6 100644 --- a/pkg/kv/postgres/store_test.go +++ b/pkg/kv/postgres/store_test.go @@ -3,6 +3,7 @@ package postgres_test import ( "context" "fmt" + "net/url" "testing" "github.com/jackc/pgx/v5" @@ -28,14 +29,15 @@ func TestPostgresKV(t *testing.T) { // create a new schema per test schemaName := "test_schema" + testutil.UniqueName() + _, err = conn.Exec(ctx, "CREATE SCHEMA IF NOT EXISTS "+url.PathEscape(schemaName)) _, err = conn.Exec(context.Background(), fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %s;", schemaName)) if err != nil { - t.Fatalf("Error creating schema: %v", err) + t.Fatalf("Error creating schema '%s': %s", schemaName, err) } store, err := kv.Open(ctx, kvparams.Config{ Type: postgres.DriverName, - Postgres: &kvparams.Postgres{ConnectionString: fmt.Sprintf("%s&search_path=%s", databaseURI, schemaName), ScanPageSize: kvtest.MaxPageSize}, + Postgres: &kvparams.Postgres{ConnectionString: fmt.Sprintf("%s&search_path=%s", databaseURI, url.PathEscape(schemaName)), ScanPageSize: kvtest.MaxPageSize}, }) if err != nil { t.Fatalf("failed to open kv '%s' store: %s", postgres.DriverName, err)