Skip to content

Commit

Permalink
Remove directories table (#1683)
Browse files Browse the repository at this point in the history
This PR merges the removal of directories into `master`. Please review
carefully because merging it wasn't straightforward because `master` is
quite far behind `dev` at this point. ~~I'm still in the process of
(re)testing the migrations.~~

edit: tested the migration on both SQLite and MySQL
  • Loading branch information
peterjan authored Nov 19, 2024
1 parent 149f2fe commit 5c7e857
Show file tree
Hide file tree
Showing 13 changed files with 299 additions and 645 deletions.
6 changes: 6 additions & 0 deletions internal/sql/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,12 @@ var (
return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00019_scan_reset", log)
},
},
{
ID: "00020_remove_directories",
Migrate: func(tx Tx) error {
return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00020_remove_directories", log)
},
},
}
}
MetricsMigrations = func(ctx context.Context, migrationsFs embed.FS, log *zap.SugaredLogger) []Migration {
Expand Down
2 changes: 1 addition & 1 deletion internal/test/e2e/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ func TestObjectEntries(t *testing.T) {
entries[i].ModTime = api.TimeRFC3339{}

// assert mime type
isDir := strings.HasSuffix(entries[i].Name, "/") && entries[i].Name != "//double/" // double is a file
isDir := strings.HasSuffix(entries[i].Name, "/")
if (isDir && entries[i].MimeType != "") || (!isDir && entries[i].MimeType == "") {
t.Fatal("unexpected mime type", entries[i].MimeType)
}
Expand Down
123 changes: 90 additions & 33 deletions stores/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"path/filepath"
"strings"
"testing"
"time"

Expand All @@ -15,34 +16,59 @@ import (
"go.sia.tech/renterd/stores/sql"
"go.sia.tech/renterd/stores/sql/sqlite"
"go.uber.org/zap"

"lukechampine.com/frand"
)

// BenchmarkRenameDirectories benchmarks renaming a directory.
// BenchmarkObjects benchmarks the performance of various object-related
// database operations.
//
// M1 Max | 54057 ns/op | 10418 B/op | 251 allocs/op
func BenchmarkRenameDirectories(b *testing.B) {
// create database
// cpu: Apple M1 Max
// BenchmarkObjects/ObjectEntries-10 11618 102732 ns/op 7074 B/op 99 allocs/op
// BenchmarkObjects/RenameObjects-10 12705 94236 ns/op 3506 B/op 81 allocs/op
func BenchmarkObjects(b *testing.B) {
db, err := newTestDB(context.Background(), b.TempDir())
if err != nil {
b.Fatal(err)
}

// test parameters
objects := int(1e2)
bucket := "bucket"

// prepare database
if err := insertDirectories(db, b.Name()); err != nil {
dirs, err := insertObjects(db.DB(), bucket, objects)
if err != nil {
b.Fatal(err)
}

// start benchmark
b.ResetTimer()
for i := 0; i < b.N; i++ {
if err := db.Transaction(context.Background(), func(tx sql.DatabaseTx) error {
_, err := tx.RenameDirectories(context.Background(), b.Name(), "/a/b/c/", "/c/b/a/")
return err
}); err != nil {
b.Fatal(err)
b.Run("ObjectEntries", func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
if err := db.Transaction(context.Background(), func(tx sql.DatabaseTx) error {
_, _, err := tx.ObjectEntries(context.Background(), bucket, dirs[i%len(dirs)], "", "", "", "", 0, -1)
return err
}); err != nil {
b.Fatal(err)
}
}
}
})

// start rename benchmark
b.Run("RenameObjects", func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
if err := db.Transaction(context.Background(), func(tx sql.DatabaseTx) error {
err := tx.RenameObjects(context.Background(), bucket, dirs[frand.Intn(i+1)%len(dirs)], dirs[frand.Intn(i+1)%len(dirs)], true)
if err != nil && !errors.Is(err, api.ErrObjectNotFound) {
return err
}
return nil
}); err != nil {
b.Fatal(err)
}
}
})
}

// BenchmarkPrunableContractRoots benchmarks diffing the roots of a contract
Expand Down Expand Up @@ -93,22 +119,51 @@ func BenchmarkPrunableContractRoots(b *testing.B) {
}
}

func insertContractSectors(db *isql.DB, fcid types.FileContractID, n int) (roots []types.Hash256, _ error) {
// insert host
hk := types.PublicKey{1}
res, err := db.Exec(context.Background(), `
INSERT INTO hosts (public_key) VALUES (?)`, sql.PublicKey(hk))
func insertObjects(db *isql.DB, bucket string, n int) (dirs []string, _ error) {
var bucketID int64
res, err := db.Exec(context.Background(), "INSERT INTO buckets (created_at, name) VALUES (?, ?)", time.Now(), bucket)
if err != nil {
return nil, err
} else if bucketID, err = res.LastInsertId(); err != nil {
return nil, err
}
hostID, err := res.LastInsertId()

stmt, err := db.Prepare(context.Background(), "INSERT INTO objects (created_at,object_id, db_bucket_id, size, mime_type, etag) VALUES (?, ?, ?, ?, '', '')")
if err != nil {
return nil, err
}
defer stmt.Close()

// insert contract
res, err = db.Exec(context.Background(), `
INSERT INTO contracts (host_id, fcid,start_height) VALUES (?, ?, ?)`, hostID, sql.FileContractID(fcid), 0)
var path string
seen := make(map[string]struct{})
for i := 0; i < n; i++ {
for {
path = generateRandomPath(6)
if _, used := seen[path]; !used {
break
}
}
seen[path] = struct{}{}

size := frand.Intn(1e3)
if frand.Intn(10) == 0 {
path += "/"
size = 0
dirs = append(dirs, path)
}
_, err := stmt.Exec(context.Background(), time.Now(), path, bucketID, size)
if err != nil {
return nil, err
}
}
return dirs, nil
}

func insertContractSectors(db *isql.DB, fcid types.FileContractID, n int) (roots []types.Hash256, _ error) {
// insert host
hk := types.PublicKey{1}
res, err := db.Exec(context.Background(), `
INSERT INTO contracts (fcid, host_key, start_height, v2) VALUES (?, ?, ?, ?)`, sql.PublicKey(hk), sql.FileContractID(fcid), 0, false)
if err != nil {
return nil, err
}
Expand All @@ -131,7 +186,7 @@ INSERT INTO slabs (created_at, `+"`key`"+`) VALUES (?, ?)`, time.Now(), sql.Encr

// insert sectors
insertSectorStmt, err := db.Prepare(context.Background(), `
INSERT INTO sectors (db_slab_id, slab_index, latest_host, root) VALUES (?, ?, ?, ?) RETURNING id`)
INSERT INTO sectors (db_slab_id, slab_index, root) VALUES (?, ?, ?) RETURNING id`)
if err != nil {
return nil, fmt.Errorf("failed to prepare statement to insert sector: %w", err)
}
Expand All @@ -140,7 +195,7 @@ INSERT INTO sectors (db_slab_id, slab_index, latest_host, root) VALUES (?, ?, ?,
for i := 0; i < n; i++ {
var sectorID int64
roots = append(roots, frand.Entropy256())
err := insertSectorStmt.QueryRow(context.Background(), slabID, i, sql.PublicKey(hk), sql.Hash256(roots[i])).Scan(&sectorID)
err := insertSectorStmt.QueryRow(context.Background(), slabID, i, sql.Hash256(roots[i])).Scan(&sectorID)
if err != nil {
return nil, fmt.Errorf("failed to insert sector: %w", err)
}
Expand Down Expand Up @@ -175,14 +230,16 @@ WHERE c.fcid = ?`, sql.FileContractID(fcid)).Scan(&cnt)
return
}

func insertDirectories(db *sqlite.MainDatabase, bucket string) error {
return db.Transaction(context.Background(), func(tx sql.DatabaseTx) error {
if err := tx.CreateBucket(context.Background(), bucket, api.BucketPolicy{}); err != nil {
return err
}
_, err := tx.InsertDirectories(context.Background(), bucket, "/a/b/c/d/e/f/g/h/i/j/k/l/m/n/o/p/q/r/s/t/u/v/w/x/y/z")
return err
})
func generateRandomPath(maxLevels int) string {
numLevels := frand.Intn(maxLevels) + 1
letters := "abcdef"

var path []string
for i := 0; i < numLevels; i++ {
path = append(path, string(letters[frand.Intn(len(letters))]))
}

return "/" + strings.Join(path, "/")
}

func newTestDB(ctx context.Context, dir string) (*sqlite.MainDatabase, error) {
Expand Down
47 changes: 3 additions & 44 deletions stores/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ const (

var (
pruneSlabsAlertID = frand.Entropy256()
pruneDirsAlertID = frand.Entropy256()
)

var objectDeleteBatchSizes = []int64{10, 50, 100, 200, 500, 1000, 5000, 10000, 50000, 100000}
Expand Down Expand Up @@ -421,32 +420,19 @@ func (s *SQLStore) RecordContractSpending(ctx context.Context, records []api.Con

func (s *SQLStore) RenameObject(ctx context.Context, bucket, keyOld, keyNew string, force bool) error {
return s.db.Transaction(ctx, func(tx sql.DatabaseTx) error {
// create new dir
dirID, err := tx.InsertDirectories(ctx, bucket, keyNew)
if err != nil {
return err
}
// update object
err = tx.RenameObject(ctx, bucket, keyOld, keyNew, dirID, force)
if err != nil {
if err := tx.RenameObject(ctx, bucket, keyOld, keyNew, force); err != nil {
return err
}
// delete old dir if empty
s.triggerSlabPruning()
return nil
})
}

func (s *SQLStore) RenameObjects(ctx context.Context, bucket, prefixOld, prefixNew string, force bool) error {
return s.db.Transaction(ctx, func(tx sql.DatabaseTx) error {
// create new dir
dirID, err := tx.RenameDirectories(ctx, bucket, prefixOld, prefixNew)
if err != nil {
return fmt.Errorf("RenameObjects: failed to create new directory: %w", err)
} else if err := tx.RenameObjects(ctx, bucket, prefixOld, prefixNew, dirID, force); err != nil {
if err := tx.RenameObjects(ctx, bucket, prefixOld, prefixNew, force); err != nil {
return err
}
// prune old dirs
s.triggerSlabPruning()
return nil
})
Expand Down Expand Up @@ -512,14 +498,8 @@ func (s *SQLStore) UpdateObject(ctx context.Context, bucket, path, contractSet,
return fmt.Errorf("UpdateObject: failed to delete object: %w", err)
}

// create the dir
dirID, err := tx.InsertDirectories(ctx, bucket, path)
if err != nil {
return fmt.Errorf("failed to create directories for path '%s': %w", path, err)
}

// Insert a new object.
err = tx.InsertObject(ctx, bucket, path, contractSet, dirID, o, mimeType, eTag, metadata)
err = tx.InsertObject(ctx, bucket, path, contractSet, o, mimeType, eTag, metadata)
if err != nil {
return fmt.Errorf("failed to insert object: %w", err)
}
Expand Down Expand Up @@ -753,27 +733,6 @@ func (s *SQLStore) pruneSlabsLoop() {
}
}

// prune dirs
err := s.db.Transaction(s.shutdownCtx, func(dt sql.DatabaseTx) error {
return dt.PruneEmptydirs(s.shutdownCtx)
})
if err != nil {
s.logger.Errorw("dir pruning failed", zap.Error(err))
s.alerts.RegisterAlert(s.shutdownCtx, alerts.Alert{
ID: pruneDirsAlertID,
Severity: alerts.SeverityWarning,
Message: "Failed to prune dirs",
Timestamp: time.Now(),
Data: map[string]interface{}{
"error": err.Error(),
"hint": "This might happen when your database is under a lot of load due to deleting objects rapidly. This alert will disappear the next time slabs are pruned successfully.",
},
})
pruneSuccess = false
} else {
s.alerts.DismissAlerts(s.shutdownCtx, pruneDirsAlertID)
}

// mark the last prune time where both slabs and dirs were pruned
if pruneSuccess {
s.mu.Lock()
Expand Down
Loading

0 comments on commit 5c7e857

Please sign in to comment.