diff --git a/catalog/factory/build.go b/catalog/factory/build.go index daa510af1c3..d3f2c4a938f 100644 --- a/catalog/factory/build.go +++ b/catalog/factory/build.go @@ -13,8 +13,8 @@ func BuildCataloger(db db.Database, c *config.Config) (catalog.Cataloger, error) return mvcc.NewCataloger(db, mvcc.WithCacheEnabled(false)), nil } catType := c.GetCatalogerType() - if catType == "rocks" { - return rocks.NewCataloger(db, c) + if catType == "mvcc" { + return mvcc.NewCataloger(db, mvcc.WithParams(c.GetMvccCatalogerCatalogParams())), nil } - return mvcc.NewCataloger(db, mvcc.WithParams(c.GetMvccCatalogerCatalogParams())), nil + return rocks.NewCataloger(db, c) } diff --git a/catalog/migrate/migrate.go b/catalog/migrate/migrate.go index 61bd16a99e0..7ab302a9fa1 100644 --- a/catalog/migrate/migrate.go +++ b/catalog/migrate/migrate.go @@ -54,6 +54,8 @@ type Reporter interface { const ( initialCommitMessage = "Create empty new branch for migrate" + + migrateTimestampKeyName = "migrate_timestamp" ) func (m *Migrate) Close() error { @@ -353,8 +355,18 @@ func (m *Migrate) selectRepoCommits(ctx context.Context, repo *graveler.Reposito } func (m *Migrate) postMigrate() error { + m.log.Info("update db migrate timestamp") + migrateTimestamp := time.Now().UTC().Format(time.RFC3339) + _, err := m.db.Exec(`INSERT INTO auth_installation_metadata(key_name, key_value) + VALUES ($1,$2) + ON CONFLICT (key_name) DO UPDATE SET key_value=EXCLUDED.key_value`, + migrateTimestampKeyName, migrateTimestamp) + if err != nil { + return err + } + m.log.Info("start analyze db") - _, err := m.db.Exec(` + _, err = m.db.Exec(` analyze graveler_staging_kv; analyze graveler_commits; analyze graveler_tags; @@ -365,7 +377,6 @@ func (m *Migrate) postMigrate() error { func (m *Migrate) SetReporter(r Reporter) { m.reporter = r } - func (c *commitRecord) Scan(rows pgx.Row) error { return rows.Scan(&c.BranchID, &c.BranchName, &c.CommitID, &c.PreviousCommitID, &c.Committer, &c.Message, &c.CreationDate, &c.Metadata, 
@@ -379,3 +390,25 @@ func (n *nullReporter) BeginRepository(string) {} func (n *nullReporter) BeginCommit(string, string, string, string) {} func (n *nullReporter) EndRepository(error) {} + +func CheckMigrationRequired(conn db.Database) bool { + // check if we already run migration + res, err := conn.Transact(func(tx db.Tx) (interface{}, error) { + var migrationRun bool + err := tx.GetPrimitive(&migrationRun, `SELECT EXISTS(SELECT 1 FROM auth_installation_metadata WHERE key_name=$1)`, migrateTimestampKeyName) + if err != nil || migrationRun { + return false, err + } + // check if we have catalog repositories + var repoCount int + err = tx.GetPrimitive(&repoCount, `SELECT COUNT(*) FROM catalog_repositories`) + if err != nil { + return false, err + } + return repoCount > 0, nil + }, db.ReadOnly()) + if err != nil { + return false + } + return res.(bool) +} diff --git a/cmd/lakefs/cmd/migrate_db.go b/cmd/lakefs/cmd/migrate_db.go index 5181083069b..69055a6b660 100644 --- a/cmd/lakefs/cmd/migrate_db.go +++ b/cmd/lakefs/cmd/migrate_db.go @@ -2,7 +2,6 @@ package cmd import ( "context" - "errors" "fmt" "os" @@ -20,13 +19,10 @@ var migrateDBCmd = &cobra.Command{ Long: `Migrate database content from MVCC model to the current format`, Run: func(cmd *cobra.Command, args []string) { dbParams := cfg.GetDatabaseParams() - err := db.ValidateSchemaUpToDate(dbParams) - if errors.Is(err, db.ErrSchemaNotCompatible) { - fmt.Println("Migration version mismatch, for more information see https://docs.lakefs.io/deploying/upgrade.html") - os.Exit(1) - } + + err := db.MigrateUp(dbParams) if err != nil { - fmt.Println(err) + fmt.Printf("Failed to setup DB: %s\n", err) os.Exit(1) } diff --git a/cmd/lakefs/cmd/run.go b/cmd/lakefs/cmd/run.go index 4903ceeb50c..193a29c9019 100644 --- a/cmd/lakefs/cmd/run.go +++ b/cmd/lakefs/cmd/run.go @@ -21,6 +21,7 @@ import ( "github.com/treeverse/lakefs/auth/crypt" "github.com/treeverse/lakefs/block/factory" catalogfactory 
"github.com/treeverse/lakefs/catalog/factory" + catalogmigrate "github.com/treeverse/lakefs/catalog/migrate" "github.com/treeverse/lakefs/config" "github.com/treeverse/lakefs/db" "github.com/treeverse/lakefs/dedup" @@ -83,6 +84,16 @@ var runCmd = &cobra.Command{ logger.WithError(err).Fatal("Failed to create block adapter") } + // Migrate old MVCC - if cataloger type is not set, + // warn the user in case migrate didn't run and there are MVCC repositories + if cfg.GetCatalogerType() == "" { + migrationRequired := catalogmigrate.CheckMigrationRequired(dbPool) + if migrationRequired { + logger.Fatal("Data migration is required") + fmt.Println(migrateRequiredMsg) + } + } + // init authentication authService := auth.NewDBAuthService( dbPool, @@ -92,8 +103,10 @@ var runCmd = &cobra.Command{ cloudMetadataProvider := stats.BuildMetadataProvider(logger, cfg) metadata := stats.NewMetadata(logger, cfg, authMetadataManager, cloudMetadataProvider) bufferedCollector := stats.NewBufferedCollector(metadata.InstallationID, cfg) + // send metadata bufferedCollector.CollectMetadata(metadata) + // update health info with installation ID httputil.SetHealthHandlerInfo(metadata.InstallationID) @@ -180,6 +193,25 @@ var runCmd = &cobra.Command{ }, } +const migrateRequiredMsg = `Data migration is required - https://docs.lakefs.io/deploying/upgrade.html. +Starting version 0.30.0, lakeFS handles your committed metadata in a new way, +which is more robust and has better performance. +To move your existing data, you will need to run the following upgrade command: + + $ lakefs migrate db + +If you want to start over, discarding your existing data, you need to explicitly state this in your lakeFS configuration file. 
+To do so, add the following to your configuration: + +cataloger: + type: rocks + +And run: + + $ lakefs migrate up + +` + const runBanner = ` ██╗ █████╗ ██╗ ██╗███████╗███████╗███████╗ diff --git a/docs/deploying/upgrade.md b/docs/deploying/upgrade.md index 2579f608d89..68126f9337d 100644 --- a/docs/deploying/upgrade.md +++ b/docs/deploying/upgrade.md @@ -12,7 +12,8 @@ has_children: false Upgrading lakeFS from a previous version usually just requires re-deploying with the latest image (or downloading the latest version, if you're using the binary). There are cases where the database will require a migration - check whether the [release](https://github.com/treeverse/lakeFS/releases) you are upgrading to requires that. -# Migrating + +# Migrating from lakeFS >= 0.30.0 In case a migration is required, first stop the running lakeFS service. Using the `lakefs` binary for the new version, run the following: @@ -24,3 +25,29 @@ lakefs migrate up Deploy (or run) the new version of lakeFS. Note that an older version of lakeFS cannot run on a migrated database. + + +# Migrating from lakeFS < 0.30.0 + +Starting with version 0.30.0, lakeFS handles your committed metadata in a [new way](https://docs.google.com/document/d/1jzD7-jun-tdU5BGapmnMBe9ovSzBvTNjXCcVztV07A4/edit?usp=sharing){: target="_blank" }, which is more robust and has better performance. +To move your existing data, you will need to run the following upgrade commands: + +Verify that the lakeFS version is >= 0.30.0: + +```shell +lakefs --version +``` + +Migrate data from the previous format: + +```shell +lakefs migrate db +``` + +If you want to start over, discarding your existing data, you need to explicitly state this in your lakeFS configuration file. +To do so, add the following to your configuration: + +```yaml +cataloger: + type: rocks +```