Skip to content

Commit

Permalink
allow carstore to use multiple directories, round robin style (#812)
Browse files Browse the repository at this point in the history
  • Loading branch information
whyrusleeping authored Nov 16, 2024
2 parents 0fc4c7e + ffe7fb6 commit 05b8751
Show file tree
Hide file tree
Showing 11 changed files with 55 additions and 28 deletions.
33 changes: 20 additions & 13 deletions carstore/bs.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,21 +62,23 @@ type CarStore interface {
}

type FileCarStore struct {
meta *CarStoreGormMeta
rootDir string
meta *CarStoreGormMeta
rootDirs []string

lscLk sync.Mutex
lastShardCache map[models.Uid]*CarShard
}

func NewCarStore(meta *gorm.DB, root string) (CarStore, error) {
if _, err := os.Stat(root); err != nil {
if !os.IsNotExist(err) {
return nil, err
}
func NewCarStore(meta *gorm.DB, roots []string) (CarStore, error) {
for _, root := range roots {
if _, err := os.Stat(root); err != nil {
if !os.IsNotExist(err) {
return nil, err
}

if err := os.Mkdir(root, 0775); err != nil {
return nil, err
if err := os.Mkdir(root, 0775); err != nil {
return nil, err
}
}
}
if err := meta.AutoMigrate(&CarShard{}, &blockRef{}); err != nil {
Expand All @@ -88,7 +90,7 @@ func NewCarStore(meta *gorm.DB, root string) (CarStore, error) {

return &FileCarStore{
meta: &CarStoreGormMeta{meta: meta},
rootDir: root,
rootDirs: roots,
lastShardCache: make(map[models.Uid]*CarShard),
}, nil
}
Expand Down Expand Up @@ -541,9 +543,14 @@ func (ds *DeltaSession) GetSize(ctx context.Context, c cid.Cid) (int, error) {
func fnameForShard(user models.Uid, seq int) string {
return fmt.Sprintf("sh-%d-%d", user, seq)
}

func (cs *FileCarStore) dirForUser(user models.Uid) string {
return cs.rootDirs[int(user)%len(cs.rootDirs)]
}

func (cs *FileCarStore) openNewShardFile(ctx context.Context, user models.Uid, seq int) (*os.File, string, error) {
// TODO: some overwrite protections
fname := filepath.Join(cs.rootDir, fnameForShard(user, seq))
fname := filepath.Join(cs.dirForUser(user), fnameForShard(user, seq))
fi, err := os.Create(fname)
if err != nil {
return nil, "", err
Expand All @@ -557,7 +564,7 @@ func (cs *FileCarStore) writeNewShardFile(ctx context.Context, user models.Uid,
defer span.End()

// TODO: some overwrite protections
fname := filepath.Join(cs.rootDir, fnameForShard(user, seq))
fname := filepath.Join(cs.dirForUser(user), fnameForShard(user, seq))
if err := os.WriteFile(fname, data, 0664); err != nil {
return "", err
}
Expand Down Expand Up @@ -982,7 +989,7 @@ func (cs *FileCarStore) openNewCompactedShardFile(ctx context.Context, user mode
// TODO: some overwrite protections
// NOTE CreateTemp is used for creating a non-colliding file, but we keep it and don't delete it so don't think of it as "temporary".
// This creates "sh-%d-%d%s" with some random stuff in the last position
fi, err := os.CreateTemp(cs.rootDir, fnameForShard(user, seq))
fi, err := os.CreateTemp(cs.dirForUser(user), fnameForShard(user, seq))
if err != nil {
return nil, "", err
}
Expand Down
11 changes: 8 additions & 3 deletions carstore/repo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,13 @@ func testCarStore() (CarStore, func(), error) {
return nil, nil, err
}

sharddir := filepath.Join(tempdir, "shards")
if err := os.MkdirAll(sharddir, 0775); err != nil {
sharddir1 := filepath.Join(tempdir, "shards1")
if err := os.MkdirAll(sharddir1, 0775); err != nil {
return nil, nil, err
}

sharddir2 := filepath.Join(tempdir, "shards2")
if err := os.MkdirAll(sharddir2, 0775); err != nil {
return nil, nil, err
}

Expand All @@ -45,7 +50,7 @@ func testCarStore() (CarStore, func(), error) {
return nil, nil, err
}

cs, err := NewCarStore(db, sharddir)
cs, err := NewCarStore(db, []string{sharddir1, sharddir2})
if err != nil {
return nil, nil, err
}
Expand Down
19 changes: 17 additions & 2 deletions cmd/bigsky/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,11 @@ func run(args []string) error {
EnvVars: []string{"RELAY_NUM_COMPACTION_WORKERS"},
Value: 2,
},
&cli.StringSliceFlag{
Name: "carstore-shard-dirs",
Usage: "specify list of shard directories for carstore storage, overrides default storage within datadir",
EnvVars: []string{"RELAY_CARSTORE_SHARD_DIRS"},
},
}

app.Action = runBigsky
Expand Down Expand Up @@ -312,8 +317,18 @@ func runBigsky(cctx *cli.Context) error {
}
}

os.MkdirAll(filepath.Dir(csdir), os.ModePerm)
cstore, err := carstore.NewCarStore(csdb, csdir)
csdirs := []string{csdir}
if paramDirs := cctx.StringSlice("carstore-shard-dirs"); len(paramDirs) > 0 {
csdirs = paramDirs
}

for _, csd := range csdirs {
if err := os.MkdirAll(filepath.Dir(csd), os.ModePerm); err != nil {
return err
}
}

cstore, err := carstore.NewCarStore(csdb, csdirs)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/laputa/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func run(args []string) {
}
}

cstore, err := carstore.NewCarStore(csdb, csdir)
cstore, err := carstore.NewCarStore(csdb, []string{csdir})
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/supercollider/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,7 @@ func initSpeedyRepoMan(key *godid.PrivKey) (*repomgr.RepoManager, *godid.PrivKey
return nil, nil, err
}

cs, err := carstore.NewCarStore(cardb, cspath)
cs, err := carstore.NewCarStore(cardb, []string{cspath})
if err != nil {
return nil, nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion events/dbpersist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ func setupDBs(t testing.TB) (*gorm.DB, *gorm.DB, carstore.CarStore, string, erro
return nil, nil, nil, "", err
}

cs, err := carstore.NewCarStore(cardb, cspath)
cs, err := carstore.NewCarStore(cardb, []string{cspath})
if err != nil {
return nil, nil, nil, "", err
}
Expand Down
2 changes: 1 addition & 1 deletion indexer/posts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func testIndexer(t *testing.T) *testIx {
t.Fatal(err)
}

cs, err := carstore.NewCarStore(cardb, cspath)
cs, err := carstore.NewCarStore(cardb, []string{cspath})
if err != nil {
t.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion pds/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func testCarStore(t *testing.T, db *gorm.DB) (carstore.CarStore, func()) {
t.Fatal(err)
}

cs, err := carstore.NewCarStore(db, sharddir)
cs, err := carstore.NewCarStore(db, []string{sharddir})
if err != nil {
t.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion repomgr/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func BenchmarkRepoMgrCreates(b *testing.B) {
b.Fatal(err)
}

cs, err := carstore.NewCarStore(cardb, cspath)
cs, err := carstore.NewCarStore(cardb, []string{cspath})
if err != nil {
b.Fatal(err)
}
Expand Down
4 changes: 2 additions & 2 deletions repomgr/ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestLoadNewRepo(t *testing.T) {
t.Fatal(err)
}

cs, err := carstore.NewCarStore(cardb, cspath)
cs, err := carstore.NewCarStore(cardb, []string{cspath})
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -80,7 +80,7 @@ func testCarstore(t *testing.T, dir string) carstore.CarStore {
t.Fatal(err)
}

cs, err := carstore.NewCarStore(cardb, cspath)
cs, err := carstore.NewCarStore(cardb, []string{cspath})
if err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 2 additions & 2 deletions testing/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func SetupPDS(ctx context.Context, suffix string, plc plc.PLCClient) (*TestPDS,
return nil, err
}

cs, err := carstore.NewCarStore(cardb, cspath)
cs, err := carstore.NewCarStore(cardb, []string{cspath})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -550,7 +550,7 @@ func SetupRelay(ctx context.Context, didr plc.PLCClient) (*TestRelay, error) {
return nil, err
}

cs, err := carstore.NewCarStore(cardb, cspath)
cs, err := carstore.NewCarStore(cardb, []string{cspath})
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 05b8751

Please sign in to comment.