diff --git a/bgs/handlers.go b/bgs/handlers.go index da87c9521..7eebb4f47 100644 --- a/bgs/handlers.go +++ b/bgs/handlers.go @@ -113,7 +113,7 @@ func (s *BGS) handleComAtprotoSyncGetRepo(ctx context.Context, did string, since // TODO: stream the response buf := new(bytes.Buffer) - if err := s.repoman.ReadRepo(ctx, u.ID, since, buf); err != nil { + if err := s.repoman.ReadRepo(ctx, u.ID, since, buf, 2<<30); err != nil { log.Errorw("failed to read repo into buffer", "err", err, "did", did) return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to read repo into buffer") } diff --git a/carstore/bs.go b/carstore/bs.go index e7af35d12..ab8089fb2 100644 --- a/carstore/bs.go +++ b/carstore/bs.go @@ -6,6 +6,7 @@ import ( "context" "fmt" "io" + "math" "os" "path/filepath" "sort" @@ -56,7 +57,7 @@ type CarStore interface { ImportSlice(ctx context.Context, uid models.Uid, since *string, carslice []byte) (cid.Cid, *DeltaSession, error) NewDeltaSession(ctx context.Context, user models.Uid, since *string) (*DeltaSession, error) ReadOnlySession(user models.Uid) (*DeltaSession, error) - ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, w io.Writer) error + ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, w io.Writer, maxBytes int64) error Stat(ctx context.Context, usr models.Uid) ([]UserStat, error) WipeUserData(ctx context.Context, user models.Uid) error } @@ -366,10 +367,14 @@ func (cs *FileCarStore) ReadOnlySession(user models.Uid) (*DeltaSession, error) } // TODO: incremental is only ever called true, remove the param -func (cs *FileCarStore) ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, w io.Writer) error { +func (cs *FileCarStore) ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, w io.Writer, maxBytes int64) error { ctx, span := otel.Tracer("carstore").Start(ctx, "ReadUserCar") defer span.End() + if maxBytes == -1 { + maxBytes = math.MaxInt64 - 1 + } + var earlySeq int if sinceRev != "" { var err error @@ -403,10 +408,13 @@ func (cs *FileCarStore) ReadUserCar(ctx context.Context, user models.Uid, sinceR return err } + var nr int64 for _, sh := range shards { - if err := cs.writeShardBlocks(ctx, &sh, w); err != nil { + n, err := cs.writeShardBlocks(ctx, &sh, w, maxBytes-nr) + if err != nil { return err } + nr += n } return nil @@ -414,27 +422,33 @@ func (cs *FileCarStore) ReadUserCar(ctx context.Context, user models.Uid, sinceR // inner loop part of ReadUserCar // copy shard blocks from disk to Writer -func (cs *FileCarStore) writeShardBlocks(ctx context.Context, sh *CarShard, w io.Writer) error { +func (cs *FileCarStore) writeShardBlocks(ctx context.Context, sh *CarShard, w io.Writer, maxBytes int64) (int64, error) { ctx, span := otel.Tracer("carstore").Start(ctx, "writeShardBlocks") defer span.End() fi, err := os.Open(sh.Path) if err != nil { - return err + return 0, err } defer fi.Close() _, err = fi.Seek(sh.DataStart, io.SeekStart) if err != nil { - return err + return 0, err } - _, err = io.Copy(w, fi) + limr := io.LimitReader(fi, maxBytes+1) + + n, err := io.Copy(w, limr) if err != nil { - return err + return 0, err } - return nil + if n == maxBytes+1 { + return n, fmt.Errorf("read too many bytes in shard: %d - %d - %d", sh.Usr, sh.ID, sh.Seq) + } + + return n, nil } // inner loop part of compactBucket diff --git a/carstore/repo_test.go b/carstore/repo_test.go index a4d2c8cb8..069130818 100644 --- a/carstore/repo_test.go +++ b/carstore/repo_test.go @@ -138,7 +138,7 @@ func TestBasicOperation(t *testing.T) { } buf := new(bytes.Buffer) - if err := cs.ReadUserCar(ctx, 1, "", true, buf); err != nil { + if err := cs.ReadUserCar(ctx, 1, "", true, buf, -1); err != nil { t.Fatal(err) } checkRepo(t, cs, buf, recs) @@ -148,7 +148,7 @@ func TestBasicOperation(t *testing.T) { } buf = new(bytes.Buffer) - if err := cs.ReadUserCar(ctx, 1, "", true, buf); err != nil { + if err := cs.ReadUserCar(ctx, 1, "", true, buf, -1); err != nil { t.Fatal(err) } checkRepo(t, cs, buf, recs) @@ -237,14 +237,14 @@ func TestRepeatedCompactions(t *testing.T) { fmt.Printf("%#v\n", st) buf := new(bytes.Buffer) - if err := cs.ReadUserCar(ctx, 1, "", true, buf); err != nil { + if err := cs.ReadUserCar(ctx, 1, "", true, buf, -1); err != nil { t.Fatal(err) } checkRepo(t, cs, buf, recs) } buf := new(bytes.Buffer) - if err := cs.ReadUserCar(ctx, 1, "", true, buf); err != nil { + if err := cs.ReadUserCar(ctx, 1, "", true, buf, -1); err != nil { t.Fatal(err) } checkRepo(t, cs, buf, recs) @@ -576,7 +576,7 @@ func TestDuplicateBlockAcrossShards(t *testing.T) { } buf := new(bytes.Buffer) - if err := cs.ReadUserCar(ctx, 2, "", true, buf); err != nil { + if err := cs.ReadUserCar(ctx, 2, "", true, buf, -1); err != nil { t.Fatal(err) } checkRepo(t, cs, buf, recs) diff --git a/events/dbpersist.go b/events/dbpersist.go index a9d6288f2..4d75975e8 100644 --- a/events/dbpersist.go +++ b/events/dbpersist.go @@ -650,7 +650,7 @@ func (p *DbPersistence) hydrateCommit(ctx context.Context, rer *RepoEventRecord) func (p *DbPersistence) readCarSlice(ctx context.Context, rer *RepoEventRecord) ([]byte, error) { buf := new(bytes.Buffer) - if err := p.cs.ReadUserCar(ctx, rer.Repo, rer.Rev, true, buf); err != nil { + if err := p.cs.ReadUserCar(ctx, rer.Repo, rer.Rev, true, buf, -1); err != nil { return nil, err } diff --git a/pds/handlers.go b/pds/handlers.go index 73827a429..0db142b0b 100644 --- a/pds/handlers.go +++ b/pds/handlers.go @@ -343,7 +343,7 @@ func (s *Server) handleComAtprotoSyncGetRepo(ctx context.Context, did string, si } buf := new(bytes.Buffer) - if err := s.repoman.ReadRepo(ctx, targetUser.ID, since, buf); err != nil { + if err := s.repoman.ReadRepo(ctx, targetUser.ID, since, buf, -1); err != nil { return nil, err } diff --git a/repomgr/ingest_test.go b/repomgr/ingest_test.go index dcb9097ac..75feb541a 100644 --- a/repomgr/ingest_test.go +++ b/repomgr/ingest_test.go @@ -142,7 +142,7 @@ func TestIngestWithGap(t *testing.T) { } buf := new(bytes.Buffer) - if err := cs2.ReadUserCar(ctx, 1, "", true, buf); err != nil { + if err := cs2.ReadUserCar(ctx, 1, "", true, buf, -1); err != nil { t.Fatal(err) } diff --git a/repomgr/repomgr.go b/repomgr/repomgr.go index ad90a43b2..788cf47ff 100644 --- a/repomgr/repomgr.go +++ b/repomgr/repomgr.go @@ -417,8 +417,8 @@ func (rm *RepoManager) GetRepoRev(ctx context.Context, user models.Uid) (string, return rm.cs.GetUserRepoRev(ctx, user) } -func (rm *RepoManager) ReadRepo(ctx context.Context, user models.Uid, since string, w io.Writer) error { - return rm.cs.ReadUserCar(ctx, user, since, true, w) +func (rm *RepoManager) ReadRepo(ctx context.Context, user models.Uid, since string, w io.Writer, maxBytes int64) error { + return rm.cs.ReadUserCar(ctx, user, since, true, w, maxBytes) } func (rm *RepoManager) GetRecord(ctx context.Context, user models.Uid, collection string, rkey string, maybeCid cid.Cid) (cid.Cid, cbg.CBORMarshaler, error) {