Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

do shard deletions in batches #321

Merged
merged 1 commit into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions bgs/bgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,11 @@ func (bgs *BGS) CreateAdminToken(tok string) error {

func (bgs *BGS) checkAdminAuth(next echo.HandlerFunc) echo.HandlerFunc {
return func(e echo.Context) error {
ctx, span := otel.Tracer("bgs").Start(e.Request().Context(), "checkAdminAuth")
defer span.End()

e.SetRequest(e.Request().WithContext(ctx))

authheader := e.Request().Header.Get("Authorization")
pref := "Bearer "
if !strings.HasPrefix(authheader, pref) {
Expand Down
52 changes: 35 additions & 17 deletions carstore/bs.go
Original file line number Diff line number Diff line change
Expand Up @@ -947,39 +947,54 @@ func (cs *CarStore) Stat(ctx context.Context, usr models.Uid) ([]UserStat, error
}

func (cs *CarStore) TakeDownRepo(ctx context.Context, user models.Uid) error {
var shards []CarShard
var shards []*CarShard
if err := cs.meta.Find(&shards, "usr = ?", user).Error; err != nil {
return err
}

for _, sh := range shards {
if err := cs.deleteShard(ctx, &sh); err != nil {
if !os.IsNotExist(err) {
return err
}
if err := cs.deleteShards(ctx, shards); err != nil {
if !os.IsNotExist(err) {
return err
}
}

if err := cs.meta.Delete(&CarShard{}, "usr = ?", user).Error; err != nil {
return err
}

return nil
}

func (cs *CarStore) deleteShard(ctx context.Context, sh *CarShard) error {
func (cs *CarStore) deleteShards(ctx context.Context, shs []*CarShard) error {
ctx, span := otel.Tracer("carstore").Start(ctx, "deleteShard")
defer span.End()

if err := cs.meta.Delete(&CarShard{}, "id = ?", sh.ID).Error; err != nil {
var ids []uint
for _, sh := range shs {
ids = append(ids, sh.ID)
}

txn := cs.meta.Begin()

if err := txn.Delete(&CarShard{}, "id in (?)", ids).Error; err != nil {
return err
}

if err := cs.meta.Delete(&blockRef{}, "shard = ?", sh.ID).Error; err != nil {
if err := txn.Delete(&blockRef{}, "shard in (?)", ids).Error; err != nil {
return err
}

return cs.deleteShardFile(ctx, sh)
if err := txn.Commit().Error; err != nil {
return err
}

var outErr error
for _, sh := range shs {
if err := cs.deleteShardFile(ctx, sh); err != nil {
if !os.IsNotExist(err) {
return err
}
outErr = err
}
}

return outErr
Comment on lines +988 to +997
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this bit just to return at least one IsNotExist if any of the shards end up already having been deleted but we don't wanna let that stop us from deleting the others, however if there's some other error we're interested in stopping immediately and returning?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exactly, yeah. i want to at least try to delete everything, if we fail because the file is already gone, we continue (but still let our caller know it happened). if we fail for some other error i think it makes sense to bail out immediately

}

type shardStat struct {
Expand Down Expand Up @@ -1210,16 +1225,19 @@ func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) erro
return err
}

var todelete []*CarShard
for _, s := range b.shards {
removedShards[s.ID] = true
sh, ok := shardsById[s.ID]
if !ok {
return fmt.Errorf("missing shard to delete")
}

if err := cs.deleteShard(ctx, &sh); err != nil {
return fmt.Errorf("deleting shard: %w", err)
}
todelete = append(todelete, &sh)
}

if err := cs.deleteShards(ctx, todelete); err != nil {
return fmt.Errorf("deleting shards: %w", err)
}
}

Expand Down
39 changes: 39 additions & 0 deletions cmd/gosky/bgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ var bgsAdminCmd = &cli.Command{
bgsTakedownRepoCmd,
bgsSetNewSubsEnabledCmd,
bgsCompactRepo,
bgsCompactAll,
},
}

Expand Down Expand Up @@ -342,3 +343,41 @@ var bgsCompactRepo = &cli.Command{
return nil
},
}

var bgsCompactAll = &cli.Command{
Name: "compact-all",
Action: func(cctx *cli.Context) error {
url := cctx.String("bgs") + "/admin/repo/compactAll"

req, err := http.NewRequest("POST", url, nil)
if err != nil {
return err
}

auth := cctx.String("key")
req.Header.Set("Authorization", "Bearer "+auth)

resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}

if resp.StatusCode != 200 {
var e xrpc.XRPCError
if err := json.NewDecoder(resp.Body).Decode(&e); err != nil {
return err
}

return &e
}

var out map[string]any
if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
return err
}

fmt.Println(out)

return nil
},
}
Loading