Skip to content

Commit

Permalink
add a way to limit getRepo streaming sizes
Browse files Browse the repository at this point in the history
  • Loading branch information
whyrusleeping committed Nov 16, 2024
1 parent 0fc4c7e commit 6ea5707
Show file tree
Hide file tree
Showing 7 changed files with 34 additions and 20 deletions.
2 changes: 1 addition & 1 deletion bgs/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (s *BGS) handleComAtprotoSyncGetRepo(ctx context.Context, did string, since

// TODO: stream the response
buf := new(bytes.Buffer)
if err := s.repoman.ReadRepo(ctx, u.ID, since, buf); err != nil {
if err := s.repoman.ReadRepo(ctx, u.ID, since, buf, 2<<30); err != nil {
log.Errorw("failed to read repo into buffer", "err", err, "did", did)
return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to read repo into buffer")
}
Expand Down
32 changes: 23 additions & 9 deletions carstore/bs.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"fmt"
"io"
"math"
"os"
"path/filepath"
"sort"
Expand Down Expand Up @@ -56,7 +57,7 @@ type CarStore interface {
ImportSlice(ctx context.Context, uid models.Uid, since *string, carslice []byte) (cid.Cid, *DeltaSession, error)
NewDeltaSession(ctx context.Context, user models.Uid, since *string) (*DeltaSession, error)
ReadOnlySession(user models.Uid) (*DeltaSession, error)
ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, w io.Writer) error
ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, w io.Writer, maxBytes int64) error
Stat(ctx context.Context, usr models.Uid) ([]UserStat, error)
WipeUserData(ctx context.Context, user models.Uid) error
}
Expand Down Expand Up @@ -366,10 +367,14 @@ func (cs *FileCarStore) ReadOnlySession(user models.Uid) (*DeltaSession, error)
}

// TODO: incremental is only ever called true, remove the param
func (cs *FileCarStore) ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, w io.Writer) error {
func (cs *FileCarStore) ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, w io.Writer, maxBytes int64) error {
ctx, span := otel.Tracer("carstore").Start(ctx, "ReadUserCar")
defer span.End()

if maxBytes == -1 {
maxBytes = math.MaxInt64 - 1
}

var earlySeq int
if sinceRev != "" {
var err error
Expand Down Expand Up @@ -403,38 +408,47 @@ func (cs *FileCarStore) ReadUserCar(ctx context.Context, user models.Uid, sinceR
return err
}

var nr int64
for _, sh := range shards {
if err := cs.writeShardBlocks(ctx, &sh, w); err != nil {
n, err := cs.writeShardBlocks(ctx, &sh, w, maxBytes-nr)
if err != nil {
return err
}
nr += n
}

return nil
}

// inner loop part of ReadUserCar
// copy shard blocks from disk to Writer
func (cs *FileCarStore) writeShardBlocks(ctx context.Context, sh *CarShard, w io.Writer) error {
func (cs *FileCarStore) writeShardBlocks(ctx context.Context, sh *CarShard, w io.Writer, maxBytes int64) (int64, error) {
ctx, span := otel.Tracer("carstore").Start(ctx, "writeShardBlocks")
defer span.End()

fi, err := os.Open(sh.Path)
if err != nil {
return err
return 0, err
}
defer fi.Close()

_, err = fi.Seek(sh.DataStart, io.SeekStart)
if err != nil {
return err
return 0, err
}

_, err = io.Copy(w, fi)
limr := io.LimitReader(fi, maxBytes+1)

n, err := io.Copy(w, limr)
if err != nil {
return err
return 0, err
}

return nil
if n == maxBytes+1 {
return n, fmt.Errorf("read too many bytes in shard: %d - %d - %d", sh.Usr, sh.ID, sh.Seq)
}

return n, nil
}

// inner loop part of compactBucket
Expand Down
10 changes: 5 additions & 5 deletions carstore/repo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func TestBasicOperation(t *testing.T) {
}

buf := new(bytes.Buffer)
if err := cs.ReadUserCar(ctx, 1, "", true, buf); err != nil {
if err := cs.ReadUserCar(ctx, 1, "", true, buf, -1); err != nil {
t.Fatal(err)
}
checkRepo(t, cs, buf, recs)
Expand All @@ -148,7 +148,7 @@ func TestBasicOperation(t *testing.T) {
}

buf = new(bytes.Buffer)
if err := cs.ReadUserCar(ctx, 1, "", true, buf); err != nil {
if err := cs.ReadUserCar(ctx, 1, "", true, buf, -1); err != nil {
t.Fatal(err)
}
checkRepo(t, cs, buf, recs)
Expand Down Expand Up @@ -237,14 +237,14 @@ func TestRepeatedCompactions(t *testing.T) {
fmt.Printf("%#v\n", st)

buf := new(bytes.Buffer)
if err := cs.ReadUserCar(ctx, 1, "", true, buf); err != nil {
if err := cs.ReadUserCar(ctx, 1, "", true, buf, -1); err != nil {
t.Fatal(err)
}
checkRepo(t, cs, buf, recs)
}

buf := new(bytes.Buffer)
if err := cs.ReadUserCar(ctx, 1, "", true, buf); err != nil {
if err := cs.ReadUserCar(ctx, 1, "", true, buf, -1); err != nil {
t.Fatal(err)
}
checkRepo(t, cs, buf, recs)
Expand Down Expand Up @@ -576,7 +576,7 @@ func TestDuplicateBlockAcrossShards(t *testing.T) {
}

buf := new(bytes.Buffer)
if err := cs.ReadUserCar(ctx, 2, "", true, buf); err != nil {
if err := cs.ReadUserCar(ctx, 2, "", true, buf, -1); err != nil {
t.Fatal(err)
}
checkRepo(t, cs, buf, recs)
Expand Down
2 changes: 1 addition & 1 deletion events/dbpersist.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,7 @@ func (p *DbPersistence) hydrateCommit(ctx context.Context, rer *RepoEventRecord)
func (p *DbPersistence) readCarSlice(ctx context.Context, rer *RepoEventRecord) ([]byte, error) {

buf := new(bytes.Buffer)
if err := p.cs.ReadUserCar(ctx, rer.Repo, rer.Rev, true, buf); err != nil {
if err := p.cs.ReadUserCar(ctx, rer.Repo, rer.Rev, true, buf, -1); err != nil {
return nil, err
}

Expand Down
2 changes: 1 addition & 1 deletion pds/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ func (s *Server) handleComAtprotoSyncGetRepo(ctx context.Context, did string, si
}

buf := new(bytes.Buffer)
if err := s.repoman.ReadRepo(ctx, targetUser.ID, since, buf); err != nil {
if err := s.repoman.ReadRepo(ctx, targetUser.ID, since, buf, -1); err != nil {
return nil, err
}

Expand Down
2 changes: 1 addition & 1 deletion repomgr/ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func TestIngestWithGap(t *testing.T) {
}

buf := new(bytes.Buffer)
if err := cs2.ReadUserCar(ctx, 1, "", true, buf); err != nil {
if err := cs2.ReadUserCar(ctx, 1, "", true, buf, -1); err != nil {
t.Fatal(err)
}

Expand Down
4 changes: 2 additions & 2 deletions repomgr/repomgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,8 +417,8 @@ func (rm *RepoManager) GetRepoRev(ctx context.Context, user models.Uid) (string,
return rm.cs.GetUserRepoRev(ctx, user)
}

func (rm *RepoManager) ReadRepo(ctx context.Context, user models.Uid, since string, w io.Writer) error {
return rm.cs.ReadUserCar(ctx, user, since, true, w)
func (rm *RepoManager) ReadRepo(ctx context.Context, user models.Uid, since string, w io.Writer, maxBytes int64) error {
return rm.cs.ReadUserCar(ctx, user, since, true, w, maxBytes)
}

func (rm *RepoManager) GetRecord(ctx context.Context, user models.Uid, collection string, rkey string, maybeCid cid.Cid) (cid.Cid, cbg.CBORMarshaler, error) {
Expand Down

0 comments on commit 6ea5707

Please sign in to comment.