From 5c7e857b9d8369946c9584273dc70a92e2bb6117 Mon Sep 17 00:00:00 2001
From: Peter-Jan Brone
Date: Tue, 19 Nov 2024 16:10:24 +0100
Subject: [PATCH] Remove `directories` table (#1683)

This PR merges the removal of directories into `master`. Please review
carefully: merging it wasn't straightforward, since `master` is quite far
behind `dev` at this point.

~~I'm still in the process of (re)testing the migrations.~~ edit: tested
the migration on both SQLite and MySQL
---
 internal/sql/migrations.go                    |   6 +
 internal/test/e2e/cluster_test.go             |   2 +-
 stores/bench_test.go                          | 123 ++++++---
 stores/metadata.go                            |  47 +---
 stores/metadata_test.go                       | 197 ++------------
 stores/sql/database.go                        |  17 +-
 stores/sql/main.go                            | 250 +++++-------------
 stores/sql/mysql/main.go                      | 157 ++++-------
 .../migration_00020_remove_directories.sql    |   3 +
 stores/sql/mysql/migrations/main/schema.sql   |  20 +-
 stores/sql/sqlite/main.go                     |  96 ++-----
 .../migration_00020_remove_directories.sql    |  18 ++
 stores/sql/sqlite/migrations/main/schema.sql  |   8 +-
 13 files changed, 299 insertions(+), 645 deletions(-)
 create mode 100644 stores/sql/mysql/migrations/main/migration_00020_remove_directories.sql
 create mode 100644 stores/sql/sqlite/migrations/main/migration_00020_remove_directories.sql

diff --git a/internal/sql/migrations.go b/internal/sql/migrations.go
index 936518c42..105406cea 100644
--- a/internal/sql/migrations.go
+++ b/internal/sql/migrations.go
@@ -294,6 +294,12 @@ var (
 				return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00019_scan_reset", log)
 			},
 		},
+		{
+			ID: "00020_remove_directories",
+			Migrate: func(tx Tx) error {
+				return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00020_remove_directories", log)
+			},
+		},
 	}
 }
 MetricsMigrations = func(ctx context.Context, migrationsFs embed.FS, log *zap.SugaredLogger) []Migration {
diff --git a/internal/test/e2e/cluster_test.go b/internal/test/e2e/cluster_test.go
index 12f603203..8c7227fab 100644
--- a/internal/test/e2e/cluster_test.go
+++ b/internal/test/e2e/cluster_test.go
@@ -399,7 +399,7 @@ func TestObjectEntries(t *testing.T) {
 			entries[i].ModTime = api.TimeRFC3339{}
 
 			// assert mime type
-			isDir := strings.HasSuffix(entries[i].Name, "/") && entries[i].Name != "//double/" // double is a file
+			isDir := strings.HasSuffix(entries[i].Name, "/")
 			if (isDir && entries[i].MimeType != "") || (!isDir && entries[i].MimeType == "") {
 				t.Fatal("unexpected mime type", entries[i].MimeType)
 			}
diff --git a/stores/bench_test.go b/stores/bench_test.go
index 8c17d40e1..6762d723a 100644
--- a/stores/bench_test.go
+++ b/stores/bench_test.go
@@ -5,6 +5,7 @@ import (
 	"errors"
 	"fmt"
 	"path/filepath"
+	"strings"
 	"testing"
 	"time"
 
@@ -15,34 +16,59 @@ import (
 	"go.sia.tech/renterd/stores/sql"
 	"go.sia.tech/renterd/stores/sql/sqlite"
 	"go.uber.org/zap"
+	"lukechampine.com/frand"
 )
 
-// BenchmarkRenameDirectories benchmarks renaming a directory.
+// BenchmarkObjects benchmarks the performance of various object-related
+// database operations.
 //
-// M1 Max | 54057 ns/op | 10418 B/op | 251 allocs/op
-func BenchmarkRenameDirectories(b *testing.B) {
-	// create database
+// cpu: Apple M1 Max
+// BenchmarkObjects/ObjectEntries-10    11618    102732 ns/op    7074 B/op    99 allocs/op
+// BenchmarkObjects/RenameObjects-10    12705     94236 ns/op    3506 B/op    81 allocs/op
+func BenchmarkObjects(b *testing.B) {
 	db, err := newTestDB(context.Background(), b.TempDir())
 	if err != nil {
 		b.Fatal(err)
 	}
 
+	// test parameters
+	objects := int(1e2)
+	bucket := "bucket"
+
 	// prepare database
-	if err := insertDirectories(db, b.Name()); err != nil {
+	dirs, err := insertObjects(db.DB(), bucket, objects)
+	if err != nil {
 		b.Fatal(err)
 	}
 
-	// start benchmark
-	b.ResetTimer()
-	for i := 0; i < b.N; i++ {
-		if err := db.Transaction(context.Background(), func(tx sql.DatabaseTx) error {
-			_, err := tx.RenameDirectories(context.Background(), b.Name(), "/a/b/c/", "/c/b/a/")
-			return err
-		}); err != nil {
-			b.Fatal(err)
+	b.Run("ObjectEntries", func(b *testing.B) {
+		b.ResetTimer()
+		for i := 0; i < b.N; i++ {
+			if err := db.Transaction(context.Background(), func(tx sql.DatabaseTx) error {
+				_, _, err := tx.ObjectEntries(context.Background(), bucket, dirs[i%len(dirs)], "", "", "", "", 0, -1)
+				return err
+			}); err != nil {
+				b.Fatal(err)
+			}
 		}
-	}
+	})
+
+	// start rename benchmark
+	b.Run("RenameObjects", func(b *testing.B) {
+		b.ResetTimer()
+		for i := 0; i < b.N; i++ {
+			if err := db.Transaction(context.Background(), func(tx sql.DatabaseTx) error {
+				err := tx.RenameObjects(context.Background(), bucket, dirs[frand.Intn(i+1)%len(dirs)], dirs[frand.Intn(i+1)%len(dirs)], true)
+				if err != nil && !errors.Is(err, api.ErrObjectNotFound) {
+					return err
+				}
+				return nil
+			}); err != nil {
+				b.Fatal(err)
+			}
+		}
+	})
 }
 
 // BenchmarkPrunableContractRoots benchmarks diffing the roots of a contract
@@ -93,22 +119,51 @@ func BenchmarkPrunableContractRoots(b *testing.B) {
 	}
 }
 
-func insertContractSectors(db *isql.DB, fcid types.FileContractID, n int) (roots []types.Hash256, _ error) {
-	// insert host
-	hk := types.PublicKey{1}
-	res, err := db.Exec(context.Background(), `
-INSERT INTO hosts (public_key) VALUES (?)`, sql.PublicKey(hk))
+func insertObjects(db *isql.DB, bucket string, n int) (dirs []string, _ error) {
+	var bucketID int64
+	res, err := db.Exec(context.Background(), "INSERT INTO buckets (created_at, name) VALUES (?, ?)", time.Now(), bucket)
 	if err != nil {
 		return nil, err
+	} else if bucketID, err = res.LastInsertId(); err != nil {
+		return nil, err
 	}
-	hostID, err := res.LastInsertId()
+
+	stmt, err := db.Prepare(context.Background(), "INSERT INTO objects (created_at, object_id, db_bucket_id, size, mime_type, etag) VALUES (?, ?, ?, ?, '', '')")
 	if err != nil {
 		return nil, err
 	}
+	defer stmt.Close()
 
-	// insert contract
-	res, err = db.Exec(context.Background(), `
-INSERT INTO contracts (host_id, fcid,start_height) VALUES (?, ?, ?)`, hostID, sql.FileContractID(fcid), 0)
+	var path string
+	seen := make(map[string]struct{})
+	for i := 0; i < n; i++ {
+		for {
+			path = generateRandomPath(6)
+			if _, used := seen[path]; !used {
+				break
+			}
+		}
+		seen[path] = struct{}{}
+
+		size := frand.Intn(1e3)
+		if frand.Intn(10) == 0 {
+			path += "/"
+			size = 0
+			dirs = append(dirs, path)
+		}
+		_, err := stmt.Exec(context.Background(), time.Now(), path, bucketID, size)
+		if err != nil {
+			return nil, err
+		}
+	}
+	return dirs, nil
+}
+
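With the `directories` table gone, a directory is purely a key convention: `insertObjects` above stores an explicit empty directory as a zero-size object whose `object_id` carries a trailing slash, the same convention the updated `TestObjectEntries` assertion checks with `strings.HasSuffix(name, "/")`. A minimal self-contained illustration of that convention (the helper is hypothetical, not part of this PR):

package main

import (
	"fmt"
	"strings"
)

// isDirMarker reports whether an objects-table row represents an explicit
// (possibly empty) directory rather than file data: a trailing slash in the
// key and a size of zero, which is exactly what insertObjects writes above.
func isDirMarker(objectID string, size int64) bool {
	return strings.HasSuffix(objectID, "/") && size == 0
}

func main() {
	fmt.Println(isDirMarker("/a/b/", 0)) // true: explicit empty directory
	fmt.Println(isDirMarker("/a/b", 42)) // false: regular object
}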
+func insertContractSectors(db *isql.DB, fcid types.FileContractID, n int) (roots []types.Hash256, _ error) {
+	// insert contract
+	hk := types.PublicKey{1}
+	res, err := db.Exec(context.Background(), `
+INSERT INTO contracts (fcid, host_key, start_height, v2) VALUES (?, ?, ?, ?)`, sql.FileContractID(fcid), sql.PublicKey(hk), 0, false)
 	if err != nil {
 		return nil, err
 	}
@@ -131,7 +186,7 @@ INSERT INTO slabs (created_at, `+"`key`"+`) VALUES (?, ?)`, time.Now(), sql.Encr
 	// insert sectors
 	insertSectorStmt, err := db.Prepare(context.Background(), `
-INSERT INTO sectors (db_slab_id, slab_index, latest_host, root) VALUES (?, ?, ?, ?) RETURNING id`)
+INSERT INTO sectors (db_slab_id, slab_index, root) VALUES (?, ?, ?) RETURNING id`)
 	if err != nil {
 		return nil, fmt.Errorf("failed to prepare statement to insert sector: %w", err)
 	}
@@ -140,7 +195,7 @@ INSERT INTO sectors (db_slab_id, slab_index, latest_host, root) VALUES (?, ?, ?,
 	for i := 0; i < n; i++ {
 		var sectorID int64
 		roots = append(roots, frand.Entropy256())
-		err := insertSectorStmt.QueryRow(context.Background(), slabID, i, sql.PublicKey(hk), sql.Hash256(roots[i])).Scan(&sectorID)
+		err := insertSectorStmt.QueryRow(context.Background(), slabID, i, sql.Hash256(roots[i])).Scan(&sectorID)
 		if err != nil {
 			return nil, fmt.Errorf("failed to insert sector: %w", err)
 		}
@@ -175,14 +230,16 @@ WHERE c.fcid = ?`, sql.FileContractID(fcid)).Scan(&cnt)
 	return
 }
 
-func insertDirectories(db *sqlite.MainDatabase, bucket string) error {
-	return db.Transaction(context.Background(), func(tx sql.DatabaseTx) error {
-		if err := tx.CreateBucket(context.Background(), bucket, api.BucketPolicy{}); err != nil {
-			return err
-		}
-		_, err := tx.InsertDirectories(context.Background(), bucket, "/a/b/c/d/e/f/g/h/i/j/k/l/m/n/o/p/q/r/s/t/u/v/w/x/y/z")
-		return err
-	})
+func generateRandomPath(maxLevels int) string {
+	numLevels := frand.Intn(maxLevels) + 1
+	letters := "abcdef"
+
+	var path []string
+	for i := 0; i < numLevels; i++ {
+		path = append(path, string(letters[frand.Intn(len(letters))]))
+	}
+
+	return "/" + strings.Join(path, "/")
 }
 
 func newTestDB(ctx context.Context, dir string) (*sqlite.MainDatabase, error) {
diff --git a/stores/metadata.go b/stores/metadata.go
index bfa3b79fc..64e971ae9 100644
--- a/stores/metadata.go
+++ b/stores/metadata.go
@@ -40,7 +40,6 @@ const (
 
 var (
 	pruneSlabsAlertID = frand.Entropy256()
-	pruneDirsAlertID  = frand.Entropy256()
 )
 
 var objectDeleteBatchSizes = []int64{10, 50, 100, 200, 500, 1000, 5000, 10000, 50000, 100000}
@@ -421,17 +420,9 @@ func (s *SQLStore) RecordContractSpending(ctx context.Context, records []api.Con
 
 func (s *SQLStore) RenameObject(ctx context.Context, bucket, keyOld, keyNew string, force bool) error {
 	return s.db.Transaction(ctx, func(tx sql.DatabaseTx) error {
-		// create new dir
-		dirID, err := tx.InsertDirectories(ctx, bucket, keyNew)
-		if err != nil {
-			return err
-		}
-		// update object
-		err = tx.RenameObject(ctx, bucket, keyOld, keyNew, dirID, force)
-		if err != nil {
+		if err := tx.RenameObject(ctx, bucket, keyOld, keyNew, force); err != nil {
 			return err
 		}
-		// delete old dir if empty
 		s.triggerSlabPruning()
 		return nil
 	})
 }
@@ -439,14 +430,9 @@ func (s *SQLStore) RenameObject(ctx context.Context, bucket, keyOld, keyNew stri
 
 func (s *SQLStore) RenameObjects(ctx context.Context, bucket, prefixOld, prefixNew string, force bool) error {
 	return s.db.Transaction(ctx, func(tx sql.DatabaseTx) error {
-		// create new dir
-		dirID, err := tx.RenameDirectories(ctx, bucket, prefixOld, prefixNew)
-		if err != nil {
-			return fmt.Errorf("RenameObjects: failed to create new directory: %w", err)
-		} else if err := tx.RenameObjects(ctx, bucket, prefixOld, prefixNew, dirID, force); err != nil {
+		if
err := tx.RenameObjects(ctx, bucket, prefixOld, prefixNew, force); err != nil { return err } - // prune old dirs s.triggerSlabPruning() return nil }) @@ -512,14 +498,8 @@ func (s *SQLStore) UpdateObject(ctx context.Context, bucket, path, contractSet, return fmt.Errorf("UpdateObject: failed to delete object: %w", err) } - // create the dir - dirID, err := tx.InsertDirectories(ctx, bucket, path) - if err != nil { - return fmt.Errorf("failed to create directories for path '%s': %w", path, err) - } - // Insert a new object. - err = tx.InsertObject(ctx, bucket, path, contractSet, dirID, o, mimeType, eTag, metadata) + err = tx.InsertObject(ctx, bucket, path, contractSet, o, mimeType, eTag, metadata) if err != nil { return fmt.Errorf("failed to insert object: %w", err) } @@ -753,27 +733,6 @@ func (s *SQLStore) pruneSlabsLoop() { } } - // prune dirs - err := s.db.Transaction(s.shutdownCtx, func(dt sql.DatabaseTx) error { - return dt.PruneEmptydirs(s.shutdownCtx) - }) - if err != nil { - s.logger.Errorw("dir pruning failed", zap.Error(err)) - s.alerts.RegisterAlert(s.shutdownCtx, alerts.Alert{ - ID: pruneDirsAlertID, - Severity: alerts.SeverityWarning, - Message: "Failed to prune dirs", - Timestamp: time.Now(), - Data: map[string]interface{}{ - "error": err.Error(), - "hint": "This might happen when your database is under a lot of load due to deleting objects rapidly. This alert will disappear the next time slabs are pruned successfully.", - }, - }) - pruneSuccess = false - } else { - s.alerts.DismissAlerts(s.shutdownCtx, pruneDirsAlertID) - } - // mark the last prune time where both slabs and dirs were pruned if pruneSuccess { s.mu.Lock() diff --git a/stores/metadata_test.go b/stores/metadata_test.go index 2e732b7a2..8c38e70d5 100644 --- a/stores/metadata_test.go +++ b/stores/metadata_test.go @@ -1665,7 +1665,7 @@ func TestObjectEntriesExplicitDir(t *testing.T) { }{ {"/dir/", 0}, // empty dir - created first {"/dir/file", 1}, // file uploaded to dir - {"/dir2/", 2}, // empty dir - remains empty + {"/dir2/", 0}, // empty dir - remains empty } ctx := context.Background() @@ -1698,7 +1698,7 @@ func TestObjectEntriesExplicitDir(t *testing.T) { }{ {"/", "", "", "", []api.ObjectMetadata{ {Name: "/dir/", Size: 1, Health: 0.5}, - {ETag: "d34db33f", Name: "/dir2/", Size: 2, Health: 1, MimeType: testMimeType}, // has MimeType and ETag since it's a file + {Name: "/dir2/", Size: 0, Health: 1}, }}, {"/dir/", "", "", "", []api.ObjectMetadata{{ETag: "d34db33f", Name: "/dir/file", Size: 1, Health: 0.5, MimeType: testMimeType}}}, } @@ -2640,64 +2640,6 @@ func TestRenameObjects(t *testing.T) { t.Fatal("unexpected path", obj.Name) } } - - // Assert directories are correct - expectedDirs := []struct { - id int64 - parentID int64 - name string - }{ - { - id: 1, - parentID: 0, - name: "/", - }, - { - id: 2, - parentID: 1, - name: "/fileÅ›/", - }, - { - id: 5, - parentID: 2, - name: "/fileÅ›/foo/", - }, - } - - var n int64 - if err := ss.DB().QueryRow(ctx, "SELECT COUNT(*) FROM directories").Scan(&n); err != nil { - t.Fatal(err) - } else if n != int64(len(expectedDirs)) { - t.Fatalf("unexpected number of directories, %v != %v", n, len(expectedDirs)) - } - - type row struct { - ID int64 - ParentID int64 - Name string - } - rows, err := ss.DB().Query(context.Background(), "SELECT id, COALESCE(db_parent_id, 0), name FROM directories ORDER BY id ASC") - if err != nil { - t.Fatal(err) - } - defer rows.Close() - var i int - for rows.Next() { - var dir row - if err := rows.Scan(&dir.ID, &dir.ParentID, &dir.Name); err != nil { 
- t.Fatal(err) - } else if dir.ID != expectedDirs[i].id { - t.Fatalf("unexpected directory id, %v != %v", dir.ID, expectedDirs[i].id) - } else if dir.ParentID != expectedDirs[i].parentID { - t.Fatalf("unexpected directory parent id, %v != %v", dir.ParentID, expectedDirs[i].parentID) - } else if dir.Name != expectedDirs[i].name { - t.Fatalf("unexpected directory name, %v != %v", dir.Name, expectedDirs[i].name) - } - i++ - } - if len(expectedDirs) != i { - t.Fatalf("expected %v dirs, got %v", len(expectedDirs), i) - } } func TestRenameObjectsRegression(t *testing.T) { @@ -2787,6 +2729,25 @@ func TestRenameObjectsRegression(t *testing.T) { assertNumObjects("/video/", 2) assertNumObjects("/video/thriller/", 2) assertNumObjects("/", 4) + + // assert we can move a folder up + if err := ss.RenameObjects(ctx, testBucket, "/video/thriller/", "/thriller/", false); err != nil { + t.Fatal(err) + } + + assertNumObjects("/video/", 1) + assertNumObjects("/thriller/", 2) + assertNumObjects("/", 5) + + // assert we can move a folder down + if err := ss.RenameObjects(ctx, testBucket, "/thriller/", "/audio/thriller/", false); err != nil { + t.Fatal(err) + } + + assertNumObjects("/video/", 1) + assertNumObjects("/audio/thriller/", 2) + assertNumObjects("/audio/", 2) + assertNumObjects("/", 4) } // TestObjectsStats is a unit test for ObjectsStats. @@ -4372,29 +4333,19 @@ func TestSlabCleanup(t *testing.T) { t.Fatal(err) } - var dirID int64 - err = ss.db.Transaction(context.Background(), func(tx sql.DatabaseTx) error { - var err error - dirID, err = tx.InsertDirectories(context.Background(), api.DefaultBucketName, "/") - return err - }) - if err != nil { - t.Fatal(err) - } - // create objects - insertObjStmt, err := ss.DB().Prepare(context.Background(), "INSERT INTO objects (db_directory_id, object_id, db_bucket_id, health) VALUES (?, ?, ?, ?);") + insertObjStmt, err := ss.DB().Prepare(context.Background(), "INSERT INTO objects (object_id, db_bucket_id, health) VALUES (?, ?, ?);") if err != nil { t.Fatal(err) } defer insertObjStmt.Close() var obj1ID, obj2ID int64 - if res, err := insertObjStmt.Exec(context.Background(), dirID, "/1", ss.DefaultBucketID(), 1); err != nil { + if res, err := insertObjStmt.Exec(context.Background(), "/1", ss.DefaultBucketID(), 1); err != nil { t.Fatal(err) } else if obj1ID, err = res.LastInsertId(); err != nil { t.Fatal(err) - } else if res, err := insertObjStmt.Exec(context.Background(), dirID, "/2", ss.DefaultBucketID(), 1); err != nil { + } else if res, err := insertObjStmt.Exec(context.Background(), "/2", ss.DefaultBucketID(), 1); err != nil { t.Fatal(err) } else if obj2ID, err = res.LastInsertId(); err != nil { t.Fatal(err) @@ -4450,7 +4401,7 @@ func TestSlabCleanup(t *testing.T) { } var obj3ID int64 - if res, err := insertObjStmt.Exec(context.Background(), dirID, "3", ss.DefaultBucketID(), 1); err != nil { + if res, err := insertObjStmt.Exec(context.Background(), "3", ss.DefaultBucketID(), 1); err != nil { t.Fatal(err) } else if obj3ID, err = res.LastInsertId(); err != nil { t.Fatal(err) @@ -4905,101 +4856,3 @@ func TestUpdateObjectParallel(t *testing.T) { close(c) wg.Wait() } - -func TestDirectories(t *testing.T) { - ss := newTestSQLStore(t, defaultTestSQLStoreConfig) - defer ss.Close() - - paths := []string{ - "/foo", - "/bar/baz", - "///somefile", - "/dir/fakedir/", - "/", - "/bar/fileinsamedirasbefore", - } - - for _, p := range paths { - var dirID int64 - err := ss.db.Transaction(context.Background(), func(tx sql.DatabaseTx) error { - var err error - dirID, err = 
tx.InsertDirectories(context.Background(), api.DefaultBucketName, p)
-			return err
-		})
-		if err != nil {
-			t.Fatal(err)
-		} else if dirID == 0 {
-			t.Fatalf("unexpected dir id %v", dirID)
-		}
-	}
-
-	expectedDirs := []struct {
-		name     string
-		id       int64
-		parentID int64
-	}{
-		{
-			name:     "/",
-			id:       1,
-			parentID: 0,
-		},
-		{
-			name:     "/bar/",
-			id:       2,
-			parentID: 1,
-		},
-		{
-			name:     "//",
-			id:       3,
-			parentID: 1,
-		},
-		{
-			name:     "///",
-			id:       4,
-			parentID: 3,
-		},
-		{
-			name:     "/dir/",
-			id:       5,
-			parentID: 1,
-		},
-	}
-
-	type row struct {
-		ID       int64
-		ParentID int64
-		Name     string
-	}
-	rows, err := ss.DB().Query(context.Background(), "SELECT id, COALESCE(db_parent_id, 0), name FROM directories ORDER BY id ASC")
-	if err != nil {
-		t.Fatal(err)
-	}
-	defer rows.Close()
-	var nDirs int
-	for i := 0; rows.Next(); i++ {
-		var dir row
-		if err := rows.Scan(&dir.ID, &dir.ParentID, &dir.Name); err != nil {
-			t.Fatal(err)
-		} else if dir.ID != expectedDirs[i].id {
-			t.Fatalf("unexpected id %v", dir.ID)
-		} else if dir.ParentID != expectedDirs[i].parentID {
-			t.Fatalf("unexpected parent id %v", dir.ParentID)
-		} else if dir.Name != expectedDirs[i].name {
-			t.Fatalf("unexpected name '%v' != '%v'", dir.Name, expectedDirs[i].name)
-		}
-		nDirs++
-	}
-	if len(expectedDirs) != nDirs {
-		t.Fatalf("expected %v dirs, got %v", len(expectedDirs), nDirs)
-	}
-
-	now := time.Now()
-	ss.Retry(100, 100*time.Millisecond, func() error {
-		ss.triggerSlabPruning()
-		return ss.waitForPruneLoop(now)
-	})
-
-	if n := ss.Count("directories"); n != 1 {
-		t.Fatal("expected 1 dir, got", n)
-	}
-}
diff --git a/stores/sql/database.go b/stores/sql/database.go
index 17ac2291c..5d6013e3a 100644
--- a/stores/sql/database.go
+++ b/stores/sql/database.go
@@ -182,12 +182,8 @@ type (
 		// HostBlocklist returns the list of host addresses on the blocklist.
 		HostBlocklist(ctx context.Context) ([]string, error)
 
-		// InsertDirectories inserts the directories for the given path and
-		// returns the ID of the immediate parent directory.
-		InsertDirectories(ctx context.Context, bucket, path string) (int64, error)
-
 		// InsertObject inserts a new object into the database.
-		InsertObject(ctx context.Context, bucket, key, contractSet string, dirID int64, o object.Object, mimeType, eTag string, md api.ObjectUserMetadata) error
+		InsertObject(ctx context.Context, bucket, key, contractSet string, o object.Object, mimeType, eTag string, md api.ObjectUserMetadata) error
 
 		// HostsForScanning returns a list of hosts to scan which haven't been
 		// scanned since at least maxLastScan.
@@ -254,9 +250,6 @@ type (
 		// the contract.
 		PrunableContractRoots(ctx context.Context, fcid types.FileContractID, roots []types.Hash256) (indices []uint64, err error)
 
-		// PruneEmptydirs prunes any directories that are empty.
-		PruneEmptydirs(ctx context.Context) error
-
 		// PruneSlabs deletes slabs that are no longer referenced by any slice
 		// or slab buffer.
 		PruneSlabs(ctx context.Context, limit int64) (int64, error)
@@ -285,17 +278,13 @@ type (
 		// times. The contracts of those hosts are also removed.
 		RemoveOfflineHosts(ctx context.Context, minRecentFailures uint64, maxDownTime time.Duration) (int64, error)
 
-		// RenameDirectories renames all directories in the database with the
-		// given prefix to the new prefix.
-		RenameDirectories(ctx context.Context, bucket, prefixOld, prefixNew string) (int64, error)
-
 		// RenameObject renames an object in the database from keyOld to keyNew.
 		// Returns api.ErrObjectExists if an object already exists at the target
		// location or api.ErrObjectNotFound if the object at keyOld doesn't
 		// exist. If force is true, instead of returning api.ErrObjectExists,
 		// the existing object will be deleted.
-		RenameObject(ctx context.Context, bucket, keyOld, keyNew string, dirID int64, force bool) error
+		RenameObject(ctx context.Context, bucket, keyOld, keyNew string, force bool) error
 
 		// RenameObjects renames all objects in the database with the given
 		// prefix to the new prefix. If 'force' is true, it will overwrite any
@@ -303,7 +292,6 @@ type (
 		// `api.ErrObjectNotFound` is returned. If 'force' is false and an
 		// object already exists with the new prefix, `api.ErrObjectExists` is
 		// returned.
-		RenameObjects(ctx context.Context, bucket, prefixOld, prefixNew string, dirID int64, force bool) error
+		RenameObjects(ctx context.Context, bucket, prefixOld, prefixNew string, force bool) error
 
 		// RenewContract renews the contract in the database. That means the
 		// contract with the ID of 'renewedFrom' will be moved to the archived
diff --git a/stores/sql/main.go b/stores/sql/main.go
index c743e0000..e1ccf3716 100644
--- a/stores/sql/main.go
+++ b/stores/sql/main.go
@@ -362,7 +362,7 @@ func ContractSizes(ctx context.Context, tx sql.Tx) (map[types.FileContractID]api
 	return sizes, nil
 }
 
-func CopyObject(ctx context.Context, tx sql.Tx, srcBucket, dstBucket, srcKey, dstKey, mimeType string, metadata api.ObjectUserMetadata, dstDirID int64) (api.ObjectMetadata, error) {
+func CopyObject(ctx context.Context, tx sql.Tx, srcBucket, dstBucket, srcKey, dstKey, mimeType string, metadata api.ObjectUserMetadata) (api.ObjectMetadata, error) {
 	// stmt to fetch bucket id
 	bucketIDStmt, err := tx.Prepare(ctx, "SELECT id FROM buckets WHERE name = ?")
 	if err != nil {
@@ -420,10 +420,10 @@ func CopyObject(ctx context.Context, tx sql.Tx, srcBucket, dstBucket, srcKey, ds
 	}
 
 	// copy object
-	res, err := tx.Exec(ctx, `INSERT INTO objects (created_at, object_id, db_directory_id, db_bucket_id,`+"`key`"+`, size, mime_type, etag)
-				SELECT ?, ?, ?, ?, `+"`key`"+`, size, ?, etag
+	res, err := tx.Exec(ctx, `INSERT INTO objects (created_at, object_id, db_bucket_id,`+"`key`"+`, size, mime_type, etag)
+				SELECT ?, ?, ?, `+"`key`"+`, size, ?, etag
 				FROM objects
-				WHERE id = ?`, time.Now(), dstKey, dstDirID, dstBID, mimeType, srcObjID)
+				WHERE id = ?`, time.Now(), dstKey, dstBID, mimeType, srcObjID)
 	if err != nil {
 		return api.ObjectMetadata{}, fmt.Errorf("failed to insert object: %w", err)
 	}
@@ -465,10 +465,6 @@ func DeleteBucket(ctx context.Context, tx sql.Tx, bucket string) error {
 	} else if !empty {
 		return api.ErrBucketNotEmpty
 	}
-	_, err = tx.Exec(ctx, "DELETE FROM directories WHERE db_bucket_id = ?", id)
-	if err != nil {
-		return fmt.Errorf("failed to delete bucket: %w", err)
-	}
 	_, err = tx.Exec(ctx, "DELETE FROM buckets WHERE id = ?", id)
 	if err != nil {
 		return fmt.Errorf("failed to delete bucket: %w", err)
@@ -801,12 +797,11 @@ func InsertMultipartUpload(ctx context.Context, tx sql.Tx, bucket, key string, e
 	return uploadID, nil
 }
 
-func InsertObject(ctx context.Context, tx sql.Tx, key string, dirID, bucketID, size int64, ec object.EncryptionKey, mimeType, eTag string) (int64, error) {
-	res, err := tx.Exec(ctx, `INSERT INTO objects (created_at, object_id, db_directory_id, db_bucket_id, `+"`key`"+`, size, mime_type, etag)
-						VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
+func InsertObject(ctx context.Context, tx sql.Tx, key string, bucketID, size int64, ec object.EncryptionKey, mimeType,
eTag string) (int64, error) { + res, err := tx.Exec(ctx, `INSERT INTO objects (created_at, object_id, db_bucket_id, `+"`key`"+`, size, mime_type, etag) + VALUES (?, ?, ?, ?, ?, ?, ?)`, time.Now(), key, - dirID, bucketID, EncryptionKey(ec), size, @@ -1339,22 +1334,6 @@ func NormalizePeer(peer string) (string, error) { return normalized.String(), nil } -func dirID(ctx context.Context, tx sql.Tx, bucket, path string) (int64, error) { - if bucket == "" { - return 0, fmt.Errorf("bucket must be set") - } else if !strings.HasPrefix(path, "/") { - return 0, fmt.Errorf("path must start with /") - } else if !strings.HasSuffix(path, "/") { - return 0, fmt.Errorf("path must end with /") - } - - var id int64 - if err := tx.QueryRow(ctx, "SELECT id FROM directories WHERE db_bucket_id = (SELECT id FROM buckets WHERE name = ?) AND name = ?", bucket, path).Scan(&id); err != nil { - return 0, fmt.Errorf("failed to fetch directory: %w", err) - } - return id, nil -} - func ObjectEntries(ctx context.Context, tx Tx, bucket, path, prefix, sortBy, sortDir, marker string, offset, limit int) ([]api.ObjectMetadata, bool, error) { // sanity check we are passing a directory if !strings.HasSuffix(path, "/") { @@ -1383,38 +1362,23 @@ func ObjectEntries(ctx context.Context, tx Tx, bucket, path, prefix, sortBy, sor sortDir = api.ObjectSortDirAsc } - // fetch directory id - dirID, err := dirID(ctx, tx, bucket, path) - if errors.Is(err, dsql.ErrNoRows) { - return []api.ObjectMetadata{}, false, nil - } else if err != nil { - return nil, false, fmt.Errorf("failed to fetch directory id: %w", err) - } - + // add object query args args := []any{ - path, - dirID, bucket, - } - - // apply prefix - var prefixExpr string - if prefix != "" { - prefixExpr = "AND SUBSTR(o.object_id, 1, ?) = ?" - args = append(args, - utf8.RuneCountInString(path+prefix), path+prefix, - utf8.RuneCountInString(path+prefix), path+prefix, - ) + path + "%", utf8.RuneCountInString(path), path, // case-sensitive object_id LIKE + path, // exclude exact path + utf8.RuneCountInString(path) + 1, // exclude dirs } + // add directory query args args = append(args, - bucket, - path+"%", - utf8.RuneCountInString(path), path, - dirID, + utf8.RuneCountInString(path), utf8.RuneCountInString(path)+1, + path+"%", utf8.RuneCountInString(path), path, // case-sensitive object_id LIKE + utf8.RuneCountInString(path), utf8.RuneCountInString(path)+1, path, + utf8.RuneCountInString(path), utf8.RuneCountInString(path)+1, ) // apply marker - var whereExpr string + var whereExprs []string markerExprs, markerArgs, err := whereObjectMarker(marker, sortBy, sortDir, func(dst any, marker, col string) error { var groupFn string switch col { @@ -1425,19 +1389,33 @@ func ObjectEntries(ctx context.Context, tx Tx, bucket, path, prefix, sortBy, sor default: return fmt.Errorf("unknown column: %v", col) } + + markerExprsObj := []string{"o.object_id = ?"} + markerArgsObj := []any{marker} + if bucket != "" { + markerExprsObj = append(markerExprsObj, "b.name = ?") + markerArgsObj = append(markerArgsObj, bucket) + } + + markerExprsDir := []string{"SUBSTR(o.object_id, 1, ?) = ?"} + markerArgsDir := []any{utf8.RuneCountInString(marker), marker} + if bucket != "" { + markerExprsDir = append(markerExprsDir, "b.name = ?") + markerArgsDir = append(markerArgsDir, bucket) + } + err := tx.QueryRow(ctx, fmt.Sprintf(` SELECT o.%s FROM objects o INNER JOIN buckets b ON o.db_bucket_id = b.id - WHERE b.name = ? AND o.object_id = ? 
+ WHERE %s UNION ALL SELECT %s(o.%s) FROM objects o INNER JOIN buckets b ON o.db_bucket_id = b.id - INNER JOIN directories d ON SUBSTR(o.object_id, 1, %s(d.name)) = d.name - WHERE b.name = ? AND d.name = ? - GROUP BY d.id - `, col, groupFn, col, tx.CharLengthExpr()), bucket, marker, bucket, marker).Scan(dst) + WHERE %s + GROUP BY o.db_bucket_id + `, col, strings.Join(markerExprsObj, " AND "), groupFn, col, strings.Join(markerExprsDir, " AND ")), append(markerArgsObj, markerArgsDir...)...).Scan(dst) if errors.Is(err, dsql.ErrNoRows) { return api.ErrMarkerNotFound } else { @@ -1447,10 +1425,24 @@ func ObjectEntries(ctx context.Context, tx Tx, bucket, path, prefix, sortBy, sor if err != nil { return nil, false, fmt.Errorf("failed to query marker: %w", err) } else if len(markerExprs) > 0 { - whereExpr = "WHERE " + strings.Join(markerExprs, " AND ") + whereExprs = append(whereExprs, markerExprs...) } args = append(args, markerArgs...) + // apply bucket + if bucket != "" { + whereExprs = append(whereExprs, "b.name = ?") + args = append(args, bucket) + } + + // apply prefix + if prefix != "" { + whereExprs = append(whereExprs, "SUBSTR(object_id, 1, ?) = ?") + args = append(args, + utf8.RuneCountInString(path+prefix), path+prefix, + ) + } + // apply sorting orderByExprs, err := orderByObject(sortBy, sortDir) if err != nil { @@ -1460,35 +1452,41 @@ func ObjectEntries(ctx context.Context, tx Tx, bucket, path, prefix, sortBy, sor // apply offset and limit args = append(args, limit, offset) + // build where expression + var whereExpr string + if len(whereExprs) > 0 { + whereExpr = fmt.Sprintf("WHERE %s", strings.Join(whereExprs, " AND ")) + } + // objectsQuery consists of 2 parts // 1. fetch all objects in requested directory // 2. fetch all sub-directories rows, err := tx.Query(ctx, fmt.Sprintf(` SELECT %s FROM ( - SELECT o.object_id, o.size, o.health, o.mime_type, o.created_at, o.etag + SELECT o.db_bucket_id, o.object_id, o.size, o.health, o.mime_type, o.created_at, o.etag FROM objects o - LEFT JOIN directories d ON d.name = o.object_id - WHERE o.object_id != ? AND o.db_directory_id = ? AND o.db_bucket_id = (SELECT id FROM buckets b WHERE b.name = ?) %s - AND d.id IS NULL + WHERE + o.object_id LIKE ? AND SUBSTR(o.object_id, 1, ?) = ? AND + o.object_id != ? AND + INSTR(SUBSTR(o.object_id, ?), "/") = 0 + AND SUBSTR(o.object_id, -1, 1) != "/" + UNION ALL - SELECT d.name as object_id, SUM(o.size), MIN(o.health), '' as mime_type, MAX(o.created_at) as created_at, '' as etag + + SELECT MIN(o.db_bucket_id), MIN(SUBSTR(o.object_id, 1, ?+INSTR(SUBSTR(o.object_id, ?), "/"))) as object_id, SUM(o.size), MIN(o.health), '' as mime_type, MAX(o.created_at) as created_at, '' as etag FROM objects o - INNER JOIN directories d ON SUBSTR(o.object_id, 1, %s(d.name)) = d.name %s - WHERE o.db_bucket_id = (SELECT id FROM buckets b WHERE b.name = ?) - AND o.object_id LIKE ? - AND SUBSTR(o.object_id, 1, ?) = ? - AND d.db_parent_id = ? - GROUP BY d.id + WHERE + o.object_id LIKE ? AND SUBSTR(o.object_id, 1, ?) = ? AND + SUBSTR(o.object_id, 1, ?+INSTR(SUBSTR(o.object_id, ?), "/")) != ? + GROUP BY SUBSTR(o.object_id, 1, ?+INSTR(SUBSTR(o.object_id, ?), "/")) ) AS o + INNER JOIN buckets b ON b.id = o.db_bucket_id %s ORDER BY %s LIMIT ? OFFSET ? `, tx.SelectObjectMetadataExpr(), - prefixExpr, - tx.CharLengthExpr(), - prefixExpr, whereExpr, strings.Join(orderByExprs, ", "), ), args...) 
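The query above replaces the former join against `directories`: sub-directories are derived on the fly by cutting each `object_id` at the first slash after the requested path and grouping on that prefix, while plain objects are those whose remaining suffix contains no further slash. A small sketch of the same classification rule in Go (the helper name is illustrative, not part of this PR):

package main

import (
	"fmt"
	"strings"
)

// entryFor mirrors the SUBSTR/INSTR arithmetic in the SQL above: given a
// directory and an object key beneath it, it yields the immediate child
// entry, i.e. either the object itself or its first-level sub-directory
// prefix (which the query then GROUPs BY to aggregate size and health).
func entryFor(dir, objectID string) (entry string, isDir bool) {
	suffix := strings.TrimPrefix(objectID, dir)
	if i := strings.Index(suffix, "/"); i >= 0 {
		return dir + suffix[:i+1], true
	}
	return objectID, false
}

func main() {
	for _, key := range []string{"/foo/bar", "/foo/baz/", "/foo/baz/qux", "/foo/a/b/c"} {
		entry, isDir := entryFor("/foo/", key)
		fmt.Printf("%-13s -> %-10s dir=%t\n", key, entry, isDir)
	}
}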
@@ -1860,112 +1858,6 @@ func RemoveOfflineHosts(ctx context.Context, tx sql.Tx, minRecentFailures uint64 return res.RowsAffected() } -// RenameDirectories renames all directories in the database with the given -// prefix to the new prefix. -func RenameDirectories(ctx context.Context, tx sql.Tx, bucket, prefixOld, prefixNew string) (int64, error) { - // sanity check input - if !strings.HasPrefix(prefixNew, "/") { - return 0, errors.New("paths has to have a leading slash") - } else if bucket == "" { - return 0, errors.New("bucket cannot be empty") - } - - // prepare statements - queryDirStmt, err := tx.Prepare(ctx, "SELECT id FROM directories WHERE name = ? AND db_bucket_id = ?") - if err != nil { - return 0, err - } - defer queryDirStmt.Close() - - insertStmt, err := tx.Prepare(ctx, "INSERT INTO directories (created_at, db_bucket_id, db_parent_id, name) VALUES (?, ?, ?, ?)") - if err != nil { - return 0, fmt.Errorf("failed to prepare statement: %w", err) - } - defer insertStmt.Close() - - updateNameStmt, err := tx.Prepare(ctx, "UPDATE directories SET name = ? WHERE id = ?") - if err != nil { - return 0, err - } - defer updateNameStmt.Close() - - updateParentStmt, err := tx.Prepare(ctx, "UPDATE directories SET db_parent_id = ? WHERE db_parent_id = ?") - if err != nil { - return 0, err - } - defer updateParentStmt.Close() - - // fetch bucket id - var bucketID int64 - err = tx.QueryRow(ctx, "SELECT id FROM buckets WHERE buckets.name = ?", bucket).Scan(&bucketID) - if errors.Is(err, dsql.ErrNoRows) { - return 0, fmt.Errorf("bucket '%v' not found: %w", bucket, api.ErrBucketNotFound) - } else if err != nil { - return 0, fmt.Errorf("failed to fetch bucket id: %w", err) - } - - // fetch destination directories - directories := make(map[int64]string) - rows, err := tx.Query(ctx, "SELECT id, name FROM directories WHERE name LIKE ? AND db_bucket_id = ? 
ORDER BY LENGTH(name) - LENGTH(REPLACE(name, '/', '')) ASC", prefixOld+"%", bucketID) - if err != nil { - return 0, err - } - defer rows.Close() - for rows.Next() { - var id int64 - var name string - if err := rows.Scan(&id, &name); err != nil { - return 0, err - } - directories[id] = strings.Replace(name, prefixOld, prefixNew, 1) - } - - // update existing directories - for id, name := range directories { - var existingID int64 - if err := queryDirStmt.QueryRow(ctx, name, bucketID).Scan(&existingID); err != nil && !errors.Is(err, dsql.ErrNoRows) { - return 0, err - } else if existingID > 0 { - if _, err := updateParentStmt.Exec(ctx, existingID, id); err != nil { - return 0, err - } - } else { - if _, err := updateNameStmt.Exec(ctx, name, id); err != nil { - return 0, err - } - } - } - - // insert new directories - var dirID *int64 - dirs := object.Directories(prefixNew) - if strings.HasSuffix(prefixNew, "/") { - dirs = append(dirs, prefixNew) - } - for _, dir := range dirs { - // check if the directory exists - var existingID int64 - if err := queryDirStmt.QueryRow(ctx, dir, bucketID).Scan(&existingID); err != nil && !errors.Is(err, dsql.ErrNoRows) { - return 0, fmt.Errorf("failed to fetch directory id %v: %w", dir, err) - } else if existingID > 0 { - dirID = &existingID - continue - } - - // insert directory - if res, err := insertStmt.Exec(ctx, time.Now(), bucketID, dirID, dir); err != nil { - return 0, fmt.Errorf("failed to create directory %v %v: %w", dirID, dir, err) - } else if insertedID, err := res.LastInsertId(); err != nil { - return 0, fmt.Errorf("failed to fetch directory id %v: %w", dir, err) - } else if insertedID == 0 { - return 0, fmt.Errorf("dir we just created doesn't exist - shouldn't happen") - } else { - dirID = &insertedID - } - } - return *dirID, nil -} - func RenewContract(ctx context.Context, tx sql.Tx, rev rhpv2.ContractRevision, contractPrice, totalCost types.Currency, startHeight uint64, renewedFrom types.FileContractID, state string) (api.ContractMetadata, error) { var contractState ContractState if err := contractState.LoadString(state); err != nil { diff --git a/stores/sql/mysql/main.go b/stores/sql/mysql/main.go index dc7f2062c..18e5d8d2b 100644 --- a/stores/sql/mysql/main.go +++ b/stores/sql/mysql/main.go @@ -74,7 +74,7 @@ func (b *MainDatabase) LoadSlabBuffers(ctx context.Context) ([]ssql.LoadedSlabBu func (b *MainDatabase) InsertDirectories(ctx context.Context, tx sql.Tx, bucket, path string) (int64, error) { mtx := b.wrapTxn(tx) - return mtx.InsertDirectories(ctx, bucket, path) + return mtx.InsertDirectoriesDeprecated(ctx, bucket, path) } func (b *MainDatabase) MakeDirsForPath(ctx context.Context, tx sql.Tx, path string) (int64, error) { @@ -227,14 +227,8 @@ func (tx *MainDatabaseTx) CompleteMultipartUpload(ctx context.Context, bucket, k return "", fmt.Errorf("failed to fetch multipart upload: %w", err) } - // create the directory. 
- dirID, err := tx.InsertDirectories(ctx, bucket, key) - if err != nil { - return "", fmt.Errorf("failed to create directory for key %s: %w", key, err) - } - // create the object - objID, err := ssql.InsertObject(ctx, tx, key, dirID, mpu.BucketID, size, mpu.EC, mpu.MimeType, eTag) + objID, err := ssql.InsertObject(ctx, tx, key, mpu.BucketID, size, mpu.EC, mpu.MimeType, eTag) if err != nil { return "", fmt.Errorf("failed to insert object: %w", err) } @@ -313,11 +307,7 @@ func (tx *MainDatabaseTx) ContractSizes(ctx context.Context) (map[types.FileCont } func (tx *MainDatabaseTx) CopyObject(ctx context.Context, srcBucket, dstBucket, srcKey, dstKey, mimeType string, metadata api.ObjectUserMetadata) (api.ObjectMetadata, error) { - dstDirID, err := tx.InsertDirectories(ctx, dstBucket, dstKey) - if err != nil { - return api.ObjectMetadata{}, err - } - return ssql.CopyObject(ctx, tx, srcBucket, dstBucket, srcKey, dstKey, mimeType, metadata, dstDirID) + return ssql.CopyObject(ctx, tx, srcBucket, dstBucket, srcKey, dstKey, mimeType, metadata) } func (tx *MainDatabaseTx) CreateBucket(ctx context.Context, bucket string, bp api.BucketPolicy) error { @@ -416,7 +406,39 @@ func (tx *MainDatabaseTx) InsertContract(ctx context.Context, rev rhpv2.Contract return ssql.InsertContract(ctx, tx, rev, contractPrice, totalCost, startHeight, renewedFrom, state) } -func (tx *MainDatabaseTx) InsertDirectories(ctx context.Context, bucket, path string) (int64, error) { +func (tx *MainDatabaseTx) InsertMultipartUpload(ctx context.Context, bucket, key string, ec object.EncryptionKey, mimeType string, metadata api.ObjectUserMetadata) (string, error) { + return ssql.InsertMultipartUpload(ctx, tx, bucket, key, ec, mimeType, metadata) +} + +func (tx *MainDatabaseTx) InsertObject(ctx context.Context, bucket, key, contractSet string, o object.Object, mimeType, eTag string, md api.ObjectUserMetadata) error { + // get bucket id + var bucketID int64 + err := tx.QueryRow(ctx, "SELECT id FROM buckets WHERE buckets.name = ?", bucket).Scan(&bucketID) + if errors.Is(err, dsql.ErrNoRows) { + return api.ErrBucketNotFound + } else if err != nil { + return fmt.Errorf("failed to fetch bucket id: %w", err) + } + + // insert object + objID, err := ssql.InsertObject(ctx, tx, key, bucketID, o.TotalSize(), o.Key, mimeType, eTag) + if err != nil { + return fmt.Errorf("failed to insert object: %w", err) + } + + // insert slabs + if err := tx.insertSlabs(ctx, &objID, nil, contractSet, o.Slabs); err != nil { + return fmt.Errorf("failed to insert slabs: %w", err) + } + + // insert metadata + if err := ssql.InsertMetadata(ctx, tx, &objID, nil, md); err != nil { + return fmt.Errorf("failed to insert object metadata: %w", err) + } + return nil +} + +func (tx *MainDatabaseTx) InsertDirectoriesDeprecated(ctx context.Context, bucket, path string) (int64, error) { // sanity check input if !strings.HasPrefix(path, "/") { return 0, errors.New("path has to have a leading slash") @@ -472,38 +494,6 @@ func (tx *MainDatabaseTx) InsertDirectories(ctx context.Context, bucket, path st return *dirID, nil } -func (tx *MainDatabaseTx) InsertMultipartUpload(ctx context.Context, bucket, key string, ec object.EncryptionKey, mimeType string, metadata api.ObjectUserMetadata) (string, error) { - return ssql.InsertMultipartUpload(ctx, tx, bucket, key, ec, mimeType, metadata) -} - -func (tx *MainDatabaseTx) InsertObject(ctx context.Context, bucket, key, contractSet string, dirID int64, o object.Object, mimeType, eTag string, md api.ObjectUserMetadata) error { - // get bucket 
id - var bucketID int64 - err := tx.QueryRow(ctx, "SELECT id FROM buckets WHERE buckets.name = ?", bucket).Scan(&bucketID) - if errors.Is(err, dsql.ErrNoRows) { - return api.ErrBucketNotFound - } else if err != nil { - return fmt.Errorf("failed to fetch bucket id: %w", err) - } - - // insert object - objID, err := ssql.InsertObject(ctx, tx, key, dirID, bucketID, o.TotalSize(), o.Key, mimeType, eTag) - if err != nil { - return fmt.Errorf("failed to insert object: %w", err) - } - - // insert slabs - if err := tx.insertSlabs(ctx, &objID, nil, contractSet, o.Slabs); err != nil { - return fmt.Errorf("failed to insert slabs: %w", err) - } - - // insert metadata - if err := ssql.InsertMetadata(ctx, tx, &objID, nil, md); err != nil { - return fmt.Errorf("failed to insert object metadata: %w", err) - } - return nil -} - func (tx *MainDatabaseTx) InvalidateSlabHealthByFCID(ctx context.Context, fcids []types.FileContractID, limit int64) (int64, error) { if len(fcids) == 0 { return 0, nil @@ -691,30 +681,6 @@ CREATE INDEX %s_idx ON %s (root(32));`, tmpTable, tmpTable, tmpTable, tmpTable)) return } -func (tx *MainDatabaseTx) PruneEmptydirs(ctx context.Context) error { - stmt, err := tx.Prepare(ctx, ` - DELETE - FROM directories - WHERE directories.id != 1 - AND NOT EXISTS (SELECT 1 FROM objects WHERE objects.db_directory_id = directories.id) - AND NOT EXISTS (SELECT 1 FROM (SELECT 1 FROM directories AS d WHERE d.db_parent_id = directories.id) i) - `) - if err != nil { - return err - } - defer stmt.Close() - for { - res, err := stmt.Exec(ctx) - if err != nil { - return err - } else if n, err := res.RowsAffected(); err != nil { - return err - } else if n == 0 { - return nil - } - } -} - func (tx *MainDatabaseTx) PruneSlabs(ctx context.Context, limit int64) (int64, error) { res, err := tx.Exec(ctx, ` DELETE FROM slabs @@ -756,11 +722,7 @@ func (tx *MainDatabaseTx) RemoveOfflineHosts(ctx context.Context, minRecentFailu return ssql.RemoveOfflineHosts(ctx, tx, minRecentFailures, maxDownTime) } -func (tx *MainDatabaseTx) RenameDirectories(ctx context.Context, bucket, prefixOld, prefixNew string) (int64, error) { - return ssql.RenameDirectories(ctx, tx, bucket, prefixOld, prefixNew) -} - -func (tx *MainDatabaseTx) RenameObject(ctx context.Context, bucket, keyOld, keyNew string, dirID int64, force bool) error { +func (tx *MainDatabaseTx) RenameObject(ctx context.Context, bucket, keyOld, keyNew string, force bool) error { if force { // delete potentially existing object at destination if _, err := tx.DeleteObject(ctx, bucket, keyNew); err != nil { @@ -774,7 +736,7 @@ func (tx *MainDatabaseTx) RenameObject(ctx context.Context, bucket, keyOld, keyN return api.ErrObjectExists } } - resp, err := tx.Exec(ctx, `UPDATE objects SET object_id = ?, db_directory_id = ? WHERE object_id = ? AND db_bucket_id = (SELECT id FROM buckets WHERE buckets.name = ?)`, keyNew, dirID, keyOld, bucket) + resp, err := tx.Exec(ctx, `UPDATE objects SET object_id = ? WHERE object_id = ? 
AND db_bucket_id = (SELECT id FROM buckets WHERE buckets.name = ?)`, keyNew, keyOld, bucket) if err != nil { return err } else if n, err := resp.RowsAffected(); err != nil { @@ -785,33 +747,27 @@ func (tx *MainDatabaseTx) RenameObject(ctx context.Context, bucket, keyOld, keyN return nil } -func (tx *MainDatabaseTx) RenameObjects(ctx context.Context, bucket, prefixOld, prefixNew string, dirID int64, force bool) error { +func (tx *MainDatabaseTx) RenameObjects(ctx context.Context, bucket, prefixOld, prefixNew string, force bool) error { if force { - // delete where bucket matches, where object_id is prefixed by the old - // prefix (case sensitive) and directories that exactly match the new - // prefix, otherwise the update conflicts + // to avoid a conflict on update, we delete objects that would conflict + // with objects being renamed, within the scope of the bucket of course query := ` DELETE FROM objects WHERE db_bucket_id = (SELECT id FROM buckets WHERE buckets.name = ?) AND - ( - object_id IN ( - SELECT * - FROM ( - SELECT CONCAT(?, SUBSTR(object_id, ?)) - FROM objects - WHERE object_id LIKE ? - AND SUBSTR(object_id, 1, ?) = ? - ) as i - ) OR (object_id = ? AND size = 0) + object_id IN ( + SELECT * + FROM ( + SELECT CONCAT(?, SUBSTR(object_id, ?)) + FROM objects + WHERE object_id LIKE ? AND SUBSTR(object_id, 1, ?) = ? + ) as i )` args := []any{ bucket, prefixNew, utf8.RuneCountInString(prefixOld) + 1, - prefixOld + "%", - utf8.RuneCountInString(prefixOld), prefixOld, - prefixOld, + prefixOld + "%", utf8.RuneCountInString(prefixOld), prefixOld, } _, err := tx.Exec(ctx, query, args...) if err != nil { @@ -825,20 +781,15 @@ func (tx *MainDatabaseTx) RenameObjects(ctx context.Context, bucket, prefixOld, // only when the object is an immediate child (no slash in suffix) query := ` UPDATE objects - SET object_id = CONCAT(?, SUBSTR(object_id, ?)), - db_directory_id = CASE INSTR(SUBSTR(object_id, ?), "/") WHEN 0 THEN ? ELSE db_directory_id END - WHERE object_id LIKE ? - AND SUBSTR(object_id, 1, ?) = ? - AND object_id != ? - AND db_bucket_id = (SELECT id FROM buckets WHERE buckets.name = ?)` + SET object_id = CONCAT(?, SUBSTR(object_id, ?)) + WHERE + db_bucket_id = (SELECT id FROM buckets WHERE buckets.name = ?) AND + object_id LIKE ? AND SUBSTR(object_id, 1, ?) = ?` args := []any{ prefixNew, utf8.RuneCountInString(prefixOld) + 1, - utf8.RuneCountInString(prefixOld) + 1, dirID, - prefixOld + "%", - utf8.RuneCountInString(prefixOld), prefixOld, - prefixNew, bucket, + prefixOld + "%", utf8.RuneCountInString(prefixOld), prefixOld, } resp, err := tx.Exec(ctx, query, args...) 
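+	// a unique index on (db_bucket_id, object_id) (idx_object_bucket in the
+	// SQLite migration below; the MySQL schema presumably enforces the
+	// equivalent) makes a colliding rename surface as a duplicate-key
+	// error, which is mapped to api.ErrObjectExists below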
if err != nil && strings.Contains(err.Error(), "Duplicate entry") { diff --git a/stores/sql/mysql/migrations/main/migration_00020_remove_directories.sql b/stores/sql/mysql/migrations/main/migration_00020_remove_directories.sql new file mode 100644 index 000000000..b0622f8ac --- /dev/null +++ b/stores/sql/mysql/migrations/main/migration_00020_remove_directories.sql @@ -0,0 +1,3 @@ +ALTER TABLE objects DROP FOREIGN KEY fk_objects_db_directory_id; +ALTER TABLE objects DROP COLUMN db_directory_id; +DROP TABLE directories; \ No newline at end of file diff --git a/stores/sql/mysql/migrations/main/schema.sql b/stores/sql/mysql/migrations/main/schema.sql index 516a942ca..53ad716b4 100644 --- a/stores/sql/mysql/migrations/main/schema.sql +++ b/stores/sql/mysql/migrations/main/schema.sql @@ -315,27 +315,11 @@ CREATE TABLE `multipart_parts` ( CONSTRAINT `fk_multipart_uploads_parts` FOREIGN KEY (`db_multipart_upload_id`) REFERENCES `multipart_uploads` (`id`) ON DELETE CASCADE ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; --- dbDirectory -CREATE TABLE `directories` ( - `id` bigint unsigned NOT NULL AUTO_INCREMENT, - `created_at` datetime(3) DEFAULT NULL, - `db_bucket_id` bigint unsigned NOT NULL, - `db_parent_id` bigint unsigned, - `name` varchar(766) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL, - PRIMARY KEY (`id`), - KEY `idx_directories_parent_id` (`db_parent_id`), - KEY `idx_directories_name` (`name`), - UNIQUE KEY `idx_directories_bucket_name` (`db_bucket_id`,`name`), - CONSTRAINT `fk_directories_db_directories` FOREIGN KEY (`db_parent_id`) REFERENCES `directories` (`id`), - CONSTRAINT `fk_directories_db_bucket` FOREIGN KEY (`db_bucket_id`) REFERENCES `buckets` (`id`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; - -- dbObject CREATE TABLE `objects` ( `id` bigint unsigned NOT NULL AUTO_INCREMENT, `created_at` datetime(3) DEFAULT NULL, `db_bucket_id` bigint unsigned NOT NULL, - `db_directory_id` bigint unsigned NOT NULL, `object_id` varchar(766) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL, `key` longblob, `health` double NOT NULL DEFAULT '1', @@ -350,9 +334,7 @@ CREATE TABLE `objects` ( KEY `idx_objects_etag` (`etag`), KEY `idx_objects_size` (`size`), KEY `idx_objects_created_at` (`created_at`), - KEY `idx_objects_db_directory_id` (`db_directory_id`), - CONSTRAINT `fk_objects_db_bucket` FOREIGN KEY (`db_bucket_id`) REFERENCES `buckets` (`id`), - CONSTRAINT `fk_objects_db_directory_id` FOREIGN KEY (`db_directory_id`) REFERENCES `directories` (`id`) + CONSTRAINT `fk_objects_db_bucket` FOREIGN KEY (`db_bucket_id`) REFERENCES `buckets` (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; -- dbSetting diff --git a/stores/sql/sqlite/main.go b/stores/sql/sqlite/main.go index 3cfb679bb..62d3298a1 100644 --- a/stores/sql/sqlite/main.go +++ b/stores/sql/sqlite/main.go @@ -73,7 +73,7 @@ func (b *MainDatabase) LoadSlabBuffers(ctx context.Context) ([]ssql.LoadedSlabBu func (b *MainDatabase) InsertDirectories(ctx context.Context, tx sql.Tx, bucket, path string) (int64, error) { mtx := b.wrapTxn(tx) - return mtx.InsertDirectories(ctx, bucket, path) + return mtx.InsertDirectoriesDeprecated(ctx, bucket, path) } func (b *MainDatabase) MakeDirsForPath(ctx context.Context, tx sql.Tx, path string) (int64, error) { @@ -226,14 +226,8 @@ func (tx *MainDatabaseTx) CompleteMultipartUpload(ctx context.Context, bucket, k return "", fmt.Errorf("failed to fetch multipart upload: %w", err) } - // create the directory. 
- dirID, err := tx.InsertDirectories(ctx, bucket, key) - if err != nil { - return "", fmt.Errorf("failed to create directory for key %s: %w", key, err) - } - // create the object - objID, err := ssql.InsertObject(ctx, tx, key, dirID, mpu.BucketID, size, mpu.EC, mpu.MimeType, eTag) + objID, err := ssql.InsertObject(ctx, tx, key, mpu.BucketID, size, mpu.EC, mpu.MimeType, eTag) if err != nil { return "", fmt.Errorf("failed to insert object: %w", err) } @@ -317,11 +311,7 @@ func (tx *MainDatabaseTx) ContractSizes(ctx context.Context) (map[types.FileCont } func (tx *MainDatabaseTx) CopyObject(ctx context.Context, srcBucket, dstBucket, srcKey, dstKey, mimeType string, metadata api.ObjectUserMetadata) (api.ObjectMetadata, error) { - dstDirID, err := tx.InsertDirectories(ctx, dstBucket, dstKey) - if err != nil { - return api.ObjectMetadata{}, err - } - return ssql.CopyObject(ctx, tx, srcBucket, dstBucket, srcKey, dstKey, mimeType, metadata, dstDirID) + return ssql.CopyObject(ctx, tx, srcBucket, dstBucket, srcKey, dstKey, mimeType, metadata) } func (tx *MainDatabaseTx) CreateBucket(ctx context.Context, bucket string, bp api.BucketPolicy) error { @@ -405,7 +395,7 @@ func (tx *MainDatabaseTx) InsertContract(ctx context.Context, rev rhpv2.Contract return ssql.InsertContract(ctx, tx, rev, contractPrice, totalCost, startHeight, renewedFrom, state) } -func (tx *MainDatabaseTx) InsertDirectories(ctx context.Context, bucket, path string) (int64, error) { +func (tx *MainDatabaseTx) InsertDirectoriesDeprecated(ctx context.Context, bucket, path string) (int64, error) { // sanity check input if !strings.HasPrefix(path, "/") { return 0, errors.New("path has to have a leading slash") @@ -465,7 +455,7 @@ func (tx *MainDatabaseTx) InsertMultipartUpload(ctx context.Context, bucket, key return ssql.InsertMultipartUpload(ctx, tx, bucket, key, ec, mimeType, metadata) } -func (tx *MainDatabaseTx) InsertObject(ctx context.Context, bucket, key, contractSet string, dirID int64, o object.Object, mimeType, eTag string, md api.ObjectUserMetadata) error { +func (tx *MainDatabaseTx) InsertObject(ctx context.Context, bucket, key, contractSet string, o object.Object, mimeType, eTag string, md api.ObjectUserMetadata) error { // get bucket id var bucketID int64 err := tx.QueryRow(ctx, "SELECT id FROM buckets WHERE buckets.name = ?", bucket).Scan(&bucketID) @@ -476,7 +466,7 @@ func (tx *MainDatabaseTx) InsertObject(ctx context.Context, bucket, key, contrac } // insert object - objID, err := ssql.InsertObject(ctx, tx, key, dirID, bucketID, o.TotalSize(), o.Key, mimeType, eTag) + objID, err := ssql.InsertObject(ctx, tx, key, bucketID, o.TotalSize(), o.Key, mimeType, eTag) if err != nil { return fmt.Errorf("failed to insert object: %w", err) } @@ -701,30 +691,6 @@ CREATE INDEX %s_idx ON %s (root);`, tmpTable, tmpTable, tmpTable, tmpTable)) return } -func (tx *MainDatabaseTx) PruneEmptydirs(ctx context.Context) error { - stmt, err := tx.Prepare(ctx, ` - DELETE - FROM directories - WHERE directories.id != 1 - AND NOT EXISTS (SELECT 1 FROM objects WHERE objects.db_directory_id = directories.id) - AND NOT EXISTS (SELECT 1 FROM (SELECT 1 FROM directories AS d WHERE d.db_parent_id = directories.id) i) - `) - if err != nil { - return err - } - defer stmt.Close() - for { - res, err := stmt.Exec(ctx) - if err != nil { - return err - } else if n, err := res.RowsAffected(); err != nil { - return err - } else if n == 0 { - return nil - } - } -} - func (tx *MainDatabaseTx) PruneSlabs(ctx context.Context, limit int64) (int64, error) { res, err := 
tx.Exec(ctx, ` DELETE FROM slabs @@ -766,13 +732,7 @@ func (tx *MainDatabaseTx) RemoveOfflineHosts(ctx context.Context, minRecentFailu return ssql.RemoveOfflineHosts(ctx, tx, minRecentFailures, maxDownTime) } -// RenameDirectories renames all directories in the database with the -// given prefix to the new prefix. -func (tx *MainDatabaseTx) RenameDirectories(ctx context.Context, bucket, prefixOld, prefixNew string) (int64, error) { - return ssql.RenameDirectories(ctx, tx, bucket, prefixOld, prefixNew) -} - -func (tx *MainDatabaseTx) RenameObject(ctx context.Context, bucket, keyOld, keyNew string, dirID int64, force bool) error { +func (tx *MainDatabaseTx) RenameObject(ctx context.Context, bucket, keyOld, keyNew string, force bool) error { if force { // delete potentially existing object at destination if _, err := tx.DeleteObject(ctx, bucket, keyNew); err != nil { @@ -786,7 +746,7 @@ func (tx *MainDatabaseTx) RenameObject(ctx context.Context, bucket, keyOld, keyN return api.ErrObjectExists } } - resp, err := tx.Exec(ctx, `UPDATE objects SET object_id = ?, db_directory_id = ? WHERE object_id = ? AND db_bucket_id = (SELECT id FROM buckets WHERE buckets.name = ?)`, keyNew, dirID, keyOld, bucket) + resp, err := tx.Exec(ctx, `UPDATE objects SET object_id = ? WHERE object_id = ? AND db_bucket_id = (SELECT id FROM buckets WHERE buckets.name = ?)`, keyNew, keyOld, bucket) if err != nil { return err } else if n, err := resp.RowsAffected(); err != nil { @@ -797,30 +757,24 @@ func (tx *MainDatabaseTx) RenameObject(ctx context.Context, bucket, keyOld, keyN return nil } -func (tx *MainDatabaseTx) RenameObjects(ctx context.Context, bucket, prefixOld, prefixNew string, dirID int64, force bool) error { +func (tx *MainDatabaseTx) RenameObjects(ctx context.Context, bucket, prefixOld, prefixNew string, force bool) error { if force { - // delete where bucket matches, where object_id is prefixed by the old - // prefix (case sensitive) and directories that exactly match the new - // prefix, otherwise the update conflicts + // to avoid a conflict on update, we delete objects that would conflict + // with objects being renamed, within the scope of the bucket of course query := ` DELETE FROM objects WHERE db_bucket_id = (SELECT id FROM buckets WHERE buckets.name = ?) AND - ( - object_id IN ( - SELECT ? || SUBSTR(object_id, ?) - FROM objects - WHERE object_id LIKE ? - AND SUBSTR(object_id, 1, ?) = ? - ) OR (object_id = ? AND size = 0) + object_id IN ( + SELECT ? || SUBSTR(object_id, ?) + FROM objects + WHERE object_id LIKE ? AND SUBSTR(object_id, 1, ?) = ? )` args := []any{ bucket, prefixNew, utf8.RuneCountInString(prefixOld) + 1, - prefixOld + "%", - utf8.RuneCountInString(prefixOld), prefixOld, - prefixOld, + prefixOld + "%", utf8.RuneCountInString(prefixOld), prefixOld, } _, err := tx.Exec(ctx, query, args...) if err != nil { @@ -834,20 +788,16 @@ func (tx *MainDatabaseTx) RenameObjects(ctx context.Context, bucket, prefixOld, // only when the object is an immediate child (no slash in suffix) query := ` UPDATE objects - SET object_id = ? || SUBSTR(object_id, ?), - db_directory_id = CASE INSTR(SUBSTR(object_id, ?), "/") WHEN 0 THEN ? ELSE db_directory_id END - WHERE object_id LIKE ? - AND SUBSTR(object_id, 1, ?) = ? - AND object_id != ? - AND db_bucket_id = (SELECT id FROM buckets WHERE buckets.name = ?)` + SET object_id = ? || SUBSTR(object_id, ?) + WHERE + db_bucket_id = (SELECT id FROM buckets WHERE buckets.name = ?) AND + object_id LIKE ? AND SUBSTR(object_id, 1, ?) 
= ?` args := []any{ prefixNew, utf8.RuneCountInString(prefixOld) + 1, - utf8.RuneCountInString(prefixOld) + 1, dirID, - prefixOld + "%", - utf8.RuneCountInString(prefixOld), prefixOld, - prefixNew, - bucket} + bucket, + prefixOld + "%", utf8.RuneCountInString(prefixOld), prefixOld, + } resp, err := tx.Exec(ctx, query, args...) if err != nil && strings.Contains(err.Error(), "UNIQUE constraint failed") { return api.ErrObjectExists diff --git a/stores/sql/sqlite/migrations/main/migration_00020_remove_directories.sql b/stores/sql/sqlite/migrations/main/migration_00020_remove_directories.sql new file mode 100644 index 000000000..dcfa5f031 --- /dev/null +++ b/stores/sql/sqlite/migrations/main/migration_00020_remove_directories.sql @@ -0,0 +1,18 @@ +DROP TABLE IF EXISTS `objects_temp`; +CREATE TABLE `objects_temp` (`id` integer PRIMARY KEY AUTOINCREMENT,`created_at` datetime,`db_bucket_id` integer NOT NULL, `object_id` text,`key` blob,`health` real NOT NULL DEFAULT 1,`size` integer,`mime_type` text,`etag` text,CONSTRAINT `fk_objects_db_bucket` FOREIGN KEY (`db_bucket_id`) REFERENCES `buckets`(`id`)); + +INSERT INTO `objects_temp` (`id`, `created_at`, `db_bucket_id`, `object_id`, `key`, `health`, `size`, `mime_type`, `etag`) +SELECT `id`, `created_at`, `db_bucket_id`, `object_id`, `key`, `health`, `size`, `mime_type`, `etag` +FROM `objects`; +DROP TABLE `objects`; +ALTER TABLE `objects_temp` RENAME TO `objects`; + +CREATE INDEX `idx_objects_db_bucket_id` ON `objects`(`db_bucket_id`); +CREATE INDEX `idx_objects_etag` ON `objects`(`etag`); +CREATE INDEX `idx_objects_health` ON `objects`(`health`); +CREATE INDEX `idx_objects_object_id` ON `objects`(`object_id`); +CREATE INDEX `idx_objects_size` ON `objects`(`size`); +CREATE UNIQUE INDEX `idx_object_bucket` ON `objects`(`db_bucket_id`,`object_id`); +CREATE INDEX `idx_objects_created_at` ON `objects`(`created_at`); + +DROP TABLE IF EXISTS `directories`; \ No newline at end of file diff --git a/stores/sql/sqlite/migrations/main/schema.sql b/stores/sql/sqlite/migrations/main/schema.sql index a7e2ddf7c..7b70a8b61 100644 --- a/stores/sql/sqlite/migrations/main/schema.sql +++ b/stores/sql/sqlite/migrations/main/schema.sql @@ -44,14 +44,8 @@ CREATE INDEX `idx_contract_set_contracts_db_contract_id` ON `contract_set_contra CREATE TABLE `buckets` (`id` integer PRIMARY KEY AUTOINCREMENT,`created_at` datetime,`policy` text,`name` text NOT NULL UNIQUE); CREATE INDEX `idx_buckets_name` ON `buckets`(`name`); --- dbDirectory -CREATE TABLE `directories` (`id` integer PRIMARY KEY AUTOINCREMENT,`created_at` datetime,`db_bucket_id` integer NOT NULL,`db_parent_id` integer,`name` text,CONSTRAINT `fk_directories_db_bucket` FOREIGN KEY (`db_bucket_id`) REFERENCES `buckets`(`id`),CONSTRAINT `fk_directories_db_directories` FOREIGN KEY (`db_parent_id`) REFERENCES `directories`(`id`)); -CREATE INDEX `idx_directories_parent_id` ON `directories`(`db_parent_id`); -CREATE INDEX `idx_directories_name` ON `directories`(`name`); -CREATE UNIQUE INDEX `idx_directories_bucket_name` ON `directories`(`db_bucket_id`,`name`); - -- dbObject -CREATE TABLE `objects` (`id` integer PRIMARY KEY AUTOINCREMENT,`created_at` datetime,`db_bucket_id` integer NOT NULL, `db_directory_id` integer NOT NULL, `object_id` text,`key` blob,`health` real NOT NULL DEFAULT 1,`size` integer,`mime_type` text,`etag` text,CONSTRAINT `fk_objects_db_bucket` FOREIGN KEY (`db_bucket_id`) REFERENCES `buckets`(`id`),CONSTRAINT `fk_objects_db_directories` FOREIGN KEY (`db_directory_id`) REFERENCES `directories`(`id`)); +CREATE 
TABLE `objects` (`id` integer PRIMARY KEY AUTOINCREMENT,`created_at` datetime,`db_bucket_id` integer NOT NULL, `object_id` text,`key` blob,`health` real NOT NULL DEFAULT 1,`size` integer,`mime_type` text,`etag` text,CONSTRAINT `fk_objects_db_bucket` FOREIGN KEY (`db_bucket_id`) REFERENCES `buckets`(`id`)); CREATE INDEX `idx_objects_db_bucket_id` ON `objects`(`db_bucket_id`); CREATE INDEX `idx_objects_etag` ON `objects`(`etag`); CREATE INDEX `idx_objects_health` ON `objects`(`health`);
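A closing note on the SQLite migration above: SQLite cannot simply drop `db_directory_id`, because the column participates in a foreign-key constraint, hence the create-copy-drop-rename dance through `objects_temp`. Below is a minimal sketch of driving such a rebuild from Go, assuming a database/sql handle backed by the mattn/go-sqlite3 driver; renterd's actual migration runner differs.

package migrations

import (
	"context"
	"database/sql"

	_ "github.com/mattn/go-sqlite3" // sqlite3 driver
)

// applyRebuildMigration executes a create-copy-drop-rename migration such as
// 00020_remove_directories. PRAGMA foreign_keys is a no-op inside a
// transaction, so it is toggled before the transaction starts; with
// enforcement left on, the intermediate DROP TABLE `objects` could trip
// foreign keys referencing the table.
func applyRebuildMigration(ctx context.Context, db *sql.DB, migrationSQL string) error {
	// pin one connection so the PRAGMAs and the transaction are guaranteed
	// to run on the same underlying SQLite connection
	conn, err := db.Conn(ctx)
	if err != nil {
		return err
	}
	defer conn.Close()

	if _, err := conn.ExecContext(ctx, "PRAGMA foreign_keys = OFF"); err != nil {
		return err
	}
	defer conn.ExecContext(ctx, "PRAGMA foreign_keys = ON")

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	if _, err := tx.ExecContext(ctx, migrationSQL); err != nil {
		tx.Rollback()
		return err
	}
	return tx.Commit()
}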