Skip to content

Commit

Permalink
implement sync.GetRecord correctly (#424)
Browse files Browse the repository at this point in the history
  • Loading branch information
whyrusleeping authored Nov 8, 2023
2 parents d3d9ea3 + bcef4e2 commit fa7db1c
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 5 deletions.
21 changes: 16 additions & 5 deletions bgs/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,15 @@ import (
atproto "github.com/bluesky-social/indigo/api/atproto"
comatprototypes "github.com/bluesky-social/indigo/api/atproto"
"github.com/bluesky-social/indigo/blobs"
"github.com/bluesky-social/indigo/carstore"
"github.com/bluesky-social/indigo/mst"
"gorm.io/gorm"

"github.com/bluesky-social/indigo/util"
"github.com/bluesky-social/indigo/xrpc"
"github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor"
"github.com/ipld/go-car"
"github.com/labstack/echo/v4"
)

Expand All @@ -40,7 +43,7 @@ func (s *BGS) handleComAtprotoSyncGetRecord(ctx context.Context, collection stri
return nil, fmt.Errorf("account was taken down")
}

_, record, err := s.repoman.GetRecord(ctx, u.ID, collection, rkey, cid.Undef)
root, blocks, err := s.repoman.GetRecordProof(ctx, u.ID, collection, rkey)
if err != nil {
if errors.Is(err, mst.ErrNotFound) {
return nil, echo.NewHTTPError(http.StatusNotFound, "record not found in repo")
Expand All @@ -50,10 +53,18 @@ func (s *BGS) handleComAtprotoSyncGetRecord(ctx context.Context, collection stri
}

buf := new(bytes.Buffer)
err = record.MarshalCBOR(buf)
if err != nil {
log.Errorw("failed to marshal record to CBOR", "err", err, "did", did, "collection", collection, "rkey", rkey)
return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to marshal record to CBOR")
hb, err := cbor.DumpObject(&car.CarHeader{
Roots: []cid.Cid{root},
Version: 1,
})
if _, err := carstore.LdWrite(buf, hb); err != nil {
return nil, err
}

for _, blk := range blocks {
if _, err := carstore.LdWrite(buf, blk.Cid().Bytes(), blk.RawData()); err != nil {
return nil, err
}
}

return buf, nil
Expand Down
28 changes: 28 additions & 0 deletions repomgr/repomgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ import (
"github.com/bluesky-social/indigo/models"
"github.com/bluesky-social/indigo/mst"
"github.com/bluesky-social/indigo/repo"
"github.com/bluesky-social/indigo/util"

blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
blockstore "github.com/ipfs/go-ipfs-blockstore"
Expand Down Expand Up @@ -447,6 +449,32 @@ func (rm *RepoManager) GetRecord(ctx context.Context, user models.Uid, collectio
return ocid, val, nil
}

func (rm *RepoManager) GetRecordProof(ctx context.Context, user models.Uid, collection string, rkey string) (cid.Cid, []blocks.Block, error) {
robs, err := rm.cs.ReadOnlySession(user)
if err != nil {
return cid.Undef, nil, err
}

bs := util.NewLoggingBstore(robs)

head, err := rm.cs.GetUserRepoHead(ctx, user)
if err != nil {
return cid.Undef, nil, err
}

r, err := repo.OpenRepo(ctx, bs, head, true)
if err != nil {
return cid.Undef, nil, err
}

_, _, err = r.GetRecord(ctx, collection+"/"+rkey)
if err != nil {
return cid.Undef, nil, err
}

return head, bs.GetLoggedBlocks(), nil
}

func (rm *RepoManager) GetProfile(ctx context.Context, uid models.Uid) (*bsky.ActorProfile, error) {
bs, err := rm.cs.ReadOnlySession(uid)
if err != nil {
Expand Down
72 changes: 72 additions & 0 deletions util/readrecordbs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package util

import (
"context"
"fmt"

blockformat "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
)

type LoggingBstore struct {
base blockstore.Blockstore

set map[cid.Cid]blockformat.Block
}

func NewLoggingBstore(base blockstore.Blockstore) *LoggingBstore {
return &LoggingBstore{
base: base,
set: make(map[cid.Cid]blockformat.Block),
}
}

var _ blockstore.Blockstore = (*LoggingBstore)(nil)

func (bs *LoggingBstore) GetLoggedBlocks() []blockformat.Block {
var out []blockformat.Block
for _, v := range bs.set {
out = append(out, v)
}
return out
}

func (bs *LoggingBstore) Has(ctx context.Context, c cid.Cid) (bool, error) {
return bs.base.Has(ctx, c)
}

func (bs *LoggingBstore) Get(ctx context.Context, c cid.Cid) (blockformat.Block, error) {
blk, err := bs.base.Get(ctx, c)
if err != nil {
return nil, err
}

bs.set[c] = blk

return blk, nil
}

func (bs *LoggingBstore) GetSize(ctx context.Context, c cid.Cid) (int, error) {
return bs.base.GetSize(ctx, c)
}

func (bs *LoggingBstore) DeleteBlock(ctx context.Context, c cid.Cid) error {
return fmt.Errorf("deletes not allowed on logging blockstore")
}

func (bs *LoggingBstore) Put(context.Context, blockformat.Block) error {
return fmt.Errorf("writes not allowed on logging blockstore")
}

func (bs *LoggingBstore) PutMany(context.Context, []blockformat.Block) error {
return fmt.Errorf("writes not allowed on logging blockstore")
}

func (bs *LoggingBstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
return nil, fmt.Errorf("iteration not allowed on logging blockstore")
}

func (bs *LoggingBstore) HashOnRead(enabled bool) {

}

0 comments on commit fa7db1c

Please sign in to comment.