Skip to content

Commit

Permalink
Remove KV flag (#4010)
Browse files Browse the repository at this point in the history
  • Loading branch information
itaiad200 authored Sep 4, 2022
1 parent 249c3e2 commit 71db8f3
Show file tree
Hide file tree
Showing 53 changed files with 445 additions and 1,597 deletions.
118 changes: 40 additions & 78 deletions .github/workflows/esti.yaml

Large diffs are not rendered by default.

15 changes: 0 additions & 15 deletions cmd/lakefs-loadtest/cmd/db.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,9 @@
package cmd

import (
"context"
"fmt"
"os"
"time"

"github.com/treeverse/lakefs/pkg/db/params"

"github.com/spf13/cobra"
"github.com/treeverse/lakefs/pkg/db"
)

type ReqResult struct {
Expand All @@ -23,15 +17,6 @@ var dbCmd = &cobra.Command{
Short: "Load test database actions",
}

func connectToDB(ctx context.Context, connectionString string) db.Database {
database, err := db.ConnectDB(ctx, params.Database{Driver: db.DatabaseDriver, ConnectionString: connectionString})
if err != nil {
fmt.Printf("Failed connecting to database: %s\n", err)
os.Exit(1)
}
return database
}

//nolint:gochecknoinits,gomnd
func init() {
rootCmd.AddCommand(dbCmd)
Expand Down
23 changes: 6 additions & 17 deletions cmd/lakefs-loadtest/cmd/entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ var entryCmd = &cobra.Command{
fmt.Printf("Invalid 'ref': %s", uri.ErrInvalidRefURI)
os.Exit(1)
}
connectionString, _ := cmd.Flags().GetString("db")
requests, _ := cmd.Flags().GetInt("requests")
concurrency, _ := cmd.Flags().GetInt("concurrency")
sampleRatio, _ := cmd.Flags().GetFloat64("sample")
Expand All @@ -51,10 +50,6 @@ var entryCmd = &cobra.Command{
rand.Seed(time.Now().UTC().UnixNano()) // make it special

ctx := cmd.Context()
database := connectToDB(ctx, connectionString)
defer database.Close()
lockDB := connectToDB(ctx, connectionString)
defer lockDB.Close()

conf, err := config.NewConfig()
if err != nil {
Expand All @@ -65,22 +60,16 @@ var entryCmd = &cobra.Command{
fmt.Printf("invalid config: %s\n", err)
}

var storeMessage *kv.StoreMessage
dbParams := conf.GetDatabaseParams()
if dbParams.KVEnabled {
kvParams := conf.GetKVParams()
kvStore, err := kv.Open(ctx, kvParams)
if err != nil {
logging.Default().WithError(err).Fatal("failed to open KV store")
}
defer kvStore.Close()
storeMessage = &kv.StoreMessage{Store: kvStore}
kvParams := conf.GetKVParams()
kvStore, err := kv.Open(ctx, kvParams)
if err != nil {
logging.Default().WithError(err).Fatal("failed to open KV store")
}
defer kvStore.Close()
storeMessage := &kv.StoreMessage{Store: kvStore}

c, err := catalog.New(ctx, catalog.Config{
Config: conf,
DB: database,
LockDB: lockDB,
KVStore: storeMessage,
})
if err != nil {
Expand Down
28 changes: 8 additions & 20 deletions cmd/lakefs/cmd/diagnostics.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"github.com/spf13/cobra"
"github.com/treeverse/lakefs/pkg/block/factory"
"github.com/treeverse/lakefs/pkg/catalog"
"github.com/treeverse/lakefs/pkg/db"
"github.com/treeverse/lakefs/pkg/diagnostics"
"github.com/treeverse/lakefs/pkg/kv"
"github.com/treeverse/lakefs/pkg/stats"
Expand All @@ -22,24 +21,14 @@ var diagnosticsCmd = &cobra.Command{
ctx := cmd.Context()
output, _ := cmd.Flags().GetString("output")

dbParams := cfg.GetDatabaseParams()
var (
dbPool db.Database
storeMessage *kv.StoreMessage
)
if dbParams.KVEnabled {
kvParams := cfg.GetKVParams()
kvStore, err := kv.Open(ctx, kvParams)
if err != nil {
log.Fatalf("Failed to open KV store: %s", err)
}
defer kvStore.Close()
storeMessage = &kv.StoreMessage{
Store: kvStore,
}
} else {
dbPool = db.BuildDatabaseConnection(ctx, cfg.GetDatabaseParams())
defer dbPool.Close()
kvParams := cfg.GetKVParams()
kvStore, err := kv.Open(ctx, kvParams)
if err != nil {
log.Fatalf("Failed to open KV store: %s", err)
}
defer kvStore.Close()
storeMessage := &kv.StoreMessage{
Store: kvStore,
}

adapter, err := factory.BuildBlockAdapter(ctx, &stats.NullCollector{}, cfg)
Expand All @@ -48,7 +37,6 @@ var diagnosticsCmd = &cobra.Command{
}
c, err := catalog.New(ctx, catalog.Config{
Config: cfg,
DB: dbPool,
KVStore: storeMessage,
})
if err != nil {
Expand Down
45 changes: 9 additions & 36 deletions cmd/lakefs/cmd/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package cmd
import (
"bufio"
"context"
"errors"
"fmt"
"net/url"
"os"
Expand All @@ -15,7 +14,6 @@ import (
"github.com/treeverse/lakefs/pkg/block/factory"
"github.com/treeverse/lakefs/pkg/catalog"
"github.com/treeverse/lakefs/pkg/cmdutils"
"github.com/treeverse/lakefs/pkg/db"
"github.com/treeverse/lakefs/pkg/graveler"
"github.com/treeverse/lakefs/pkg/kv"
"github.com/treeverse/lakefs/pkg/logging"
Expand Down Expand Up @@ -71,40 +69,16 @@ func runImport(cmd *cobra.Command, args []string) (statusCode int) {
cfg := loadConfig()
ctx := cmd.Context()
logger := logging.FromContext(ctx)
dbParams := cfg.GetDatabaseParams()
var (
idGen actions.IDGenerator
actionsStore actions.Store
storeMessage *kv.StoreMessage
dbPool db.Database
)
if dbParams.KVEnabled {
kvParams := cfg.GetKVParams()
kvStore, err := kv.Open(ctx, kvParams)
if err != nil {
logger.WithError(err).Fatal("failed to open KV store")
}
defer kvStore.Close()
storeMessage = &kv.StoreMessage{Store: kvStore}

actionsStore = actions.NewActionsKVStore(*storeMessage)
idGen = &actions.DecreasingIDGenerator{}
} else {
dbPool = db.BuildDatabaseConnection(ctx, dbParams)
defer dbPool.Close()
err := db.ValidateSchemaUpToDate(ctx, dbPool, dbParams)
if errors.Is(err, db.ErrSchemaNotCompatible) {
fmt.Println("Migration version mismatch, for more information see https://docs.lakefs.io/deploying-aws/upgrade.html")
return 1
}
if err != nil {
fmt.Printf("%s\n", err)
return 1
}

actionsStore = actions.NewActionsDBStore(dbPool)
idGen = &actions.IncreasingIDGenerator{}
kvParams := cfg.GetKVParams()
kvStore, err := kv.Open(ctx, kvParams)
if err != nil {
logger.WithError(err).Fatal("failed to open KV store")
}
defer kvStore.Close()
storeMessage := &kv.StoreMessage{Store: kvStore}

actionsStore := actions.NewActionsKVStore(*storeMessage)
idGen := &actions.DecreasingIDGenerator{}

u := uri.Must(uri.Parse(args[0]))
if !u.IsRepository() {
Expand Down Expand Up @@ -134,7 +108,6 @@ func runImport(cmd *cobra.Command, args []string) (statusCode int) {

c, err := catalog.New(ctx, catalog.Config{
Config: cfg,
DB: dbPool,
KVStore: storeMessage,
})
if err != nil {
Expand Down
26 changes: 12 additions & 14 deletions cmd/lakefs/cmd/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,15 @@ var versionCmd = &cobra.Command{
cfg := loadConfig()
ctx := cmd.Context()
dbParams := cfg.GetDatabaseParams()
if dbParams.KVEnabled {
kvStore, err := kv.Open(ctx, cfg.GetKVParams())
if err != nil {
fmt.Printf("Failed to open KV store: %s\n", err)
return
}
defer kvStore.Close()
if kv.ValidateSchemaVersion(ctx, kvStore, false) == nil {
// Migration already occurred in KV or setup is required
return
}
kvStore, err := kv.Open(ctx, cfg.GetKVParams())
if err != nil {
fmt.Printf("Failed to open KV store: %s\n", err)
return
}
defer kvStore.Close()
if kv.ValidateSchemaVersion(ctx, kvStore, false) == nil {
// Migration already occurred in KV or setup is required
return
}
dbPool := db.BuildDatabaseConnection(ctx, dbParams)
defer dbPool.Close()
Expand All @@ -55,7 +53,7 @@ var upCmd = &cobra.Command{
Short: "Apply all up migrations",
Run: func(cmd *cobra.Command, args []string) {
cfg := loadConfig()
err := db.MigrateUp(cfg.GetDatabaseParams())
err := db.MigrateUp(cfg.GetDatabaseParams(), cfg, cfg.GetKVParams())
if err != nil {
fmt.Printf("Failed to setup DB: %s\n", err)
os.Exit(1)
Expand All @@ -70,8 +68,8 @@ var gotoCmd = &cobra.Command{
cfg := loadConfig()
ctx := cmd.Context()
dbParams := cfg.GetDatabaseParams()
if dbParams.KVEnabled {
fmt.Printf("Unsupported command for KV migration version\n")
if dbParams.ConnectionString != "" {
fmt.Printf("Missing database.connection_string for migration\n")
os.Exit(1)
}
version, err := cmd.Flags().GetUint("version")
Expand Down
3 changes: 0 additions & 3 deletions cmd/lakefs/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,6 @@ func initConfig() {
}

logger.WithFields(cfg.ToLoggerFields()).Info("Config")
if cfg.GetDatabaseParams().KVEnabled {
logger.Warn("USING KV EXPERIMENTAL FLAG!!! USE AT YOUR OWN RISK!!!")
}
}

// getHomeDir find and return the home directory
Expand Down
Loading

0 comments on commit 71db8f3

Please sign in to comment.