Skip to content

Commit

Permalink
add missing file
Browse files Browse the repository at this point in the history
  • Loading branch information
whyrusleeping committed Dec 16, 2024
1 parent 5c7f937 commit cc76378
Show file tree
Hide file tree
Showing 5 changed files with 307 additions and 6 deletions.
250 changes: 250 additions & 0 deletions carstore/nonarchive.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
package carstore

import (
"bytes"
"context"
"fmt"
"io"
"sync"

"github.com/bluesky-social/indigo/models"
blockformat "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
blockstore "github.com/ipfs/go-ipfs-blockstore"
car "github.com/ipld/go-car"
"go.opentelemetry.io/otel"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)

type NonArchivalCarstore struct {
db *gorm.DB

lk sync.Mutex
lastCommitCache map[models.Uid]*commitRefInfo
}

func NewNonArchivalCarstore(db *gorm.DB) (*NonArchivalCarstore, error) {
if err := db.AutoMigrate(&commitRefInfo{}); err != nil {
return nil, err
}

return &NonArchivalCarstore{
db: db,
lastCommitCache: make(map[models.Uid]*commitRefInfo),
}, nil
}

type commitRefInfo struct {
ID uint `gorm:"primarykey"`
Uid models.Uid `gorm:"uniqueIndex"`
Rev string
Root models.DbCID
}

func (cs *NonArchivalCarstore) checkLastShardCache(user models.Uid) *commitRefInfo {
cs.lk.Lock()
defer cs.lk.Unlock()

ls, ok := cs.lastCommitCache[user]
if ok {
return ls
}

return nil
}

func (cs *NonArchivalCarstore) removeLastShardCache(user models.Uid) {
cs.lk.Lock()
defer cs.lk.Unlock()

delete(cs.lastCommitCache, user)
}

func (cs *NonArchivalCarstore) putLastShardCache(ls *commitRefInfo) {
cs.lk.Lock()
defer cs.lk.Unlock()

cs.lastCommitCache[ls.Uid] = ls
}

func (cs *NonArchivalCarstore) loadCommitRefInfo(ctx context.Context, user models.Uid) (*commitRefInfo, error) {
var out commitRefInfo
if err := cs.db.Find(&out, "uid = ?", user).Error; err != nil {
return nil, err
}

return &out, nil
}

func (cs *NonArchivalCarstore) getCommitRefInfo(ctx context.Context, user models.Uid) (*commitRefInfo, error) {
ctx, span := otel.Tracer("carstore").Start(ctx, "getCommitRefInfo")
defer span.End()

maybeLs := cs.checkLastShardCache(user)
if maybeLs != nil {
return maybeLs, nil
}

lastShard, err := cs.loadCommitRefInfo(ctx, user)
if err != nil {
return nil, err
}

cs.putLastShardCache(lastShard)
return lastShard, nil
}

func (cs *NonArchivalCarstore) updateLastCommit(ctx context.Context, uid models.Uid, rev string, cid cid.Cid) error {
cri := &commitRefInfo{
Uid: uid,
Rev: rev,
Root: models.DbCID{cid},
}

if err := cs.db.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "uid"}},
UpdateAll: true,
}).Create(cri).Error; err != nil {
return fmt.Errorf("update or set last commit info: %w", err)
}

cs.putLastShardCache(cri)

return nil
}

func (cs *NonArchivalCarstore) NewDeltaSession(ctx context.Context, user models.Uid, since *string) (*DeltaSession, error) {
ctx, span := otel.Tracer("carstore").Start(ctx, "NewSession")
defer span.End()

// TODO: ensure that we don't write updates on top of the wrong head
// this needs to be a compare and swap type operation
lastShard, err := cs.getCommitRefInfo(ctx, user)
if err != nil {
return nil, err
}

if since != nil && *since != lastShard.Rev {
return nil, fmt.Errorf("revision mismatch: %s != %s: %w", *since, lastShard.Rev, ErrRepoBaseMismatch)
}

return &DeltaSession{
fresh: blockstore.NewBlockstore(datastore.NewMapDatastore()),
blks: make(map[cid.Cid]blockformat.Block),
base: &userView{
user: user,
cs: cs,
prefetch: true,
cache: make(map[cid.Cid]blockformat.Block),
},
user: user,
baseCid: lastShard.Root.CID,
cs: cs,
seq: 0,
lastRev: lastShard.Rev,
}, nil
}

func (cs *NonArchivalCarstore) ReadOnlySession(user models.Uid) (*DeltaSession, error) {
return &DeltaSession{
base: &userView{
user: user,
cs: cs,
prefetch: false,
cache: make(map[cid.Cid]blockformat.Block),
},
readonly: true,
user: user,
cs: cs,
}, nil
}

// TODO: incremental is only ever called true, remove the param
func (cs *NonArchivalCarstore) ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, w io.Writer) error {
return fmt.Errorf("not supported in non-archival mode")
}

func (cs *NonArchivalCarstore) ImportSlice(ctx context.Context, uid models.Uid, since *string, carslice []byte) (cid.Cid, *DeltaSession, error) {
ctx, span := otel.Tracer("carstore").Start(ctx, "ImportSlice")
defer span.End()

carr, err := car.NewCarReader(bytes.NewReader(carslice))
if err != nil {
return cid.Undef, nil, err
}

if len(carr.Header.Roots) != 1 {
return cid.Undef, nil, fmt.Errorf("invalid car file, header must have a single root (has %d)", len(carr.Header.Roots))
}

ds, err := cs.NewDeltaSession(ctx, uid, since)
if err != nil {
return cid.Undef, nil, fmt.Errorf("new delta session failed: %w", err)
}

var cids []cid.Cid
for {
blk, err := carr.Next()
if err != nil {
if err == io.EOF {
break
}
return cid.Undef, nil, err
}

cids = append(cids, blk.Cid())

if err := ds.Put(ctx, blk); err != nil {
return cid.Undef, nil, err
}
}

return carr.Header.Roots[0], ds, nil
}

func (cs *NonArchivalCarstore) GetUserRepoHead(ctx context.Context, user models.Uid) (cid.Cid, error) {
lastShard, err := cs.getCommitRefInfo(ctx, user)
if err != nil {
return cid.Undef, err
}
if lastShard.ID == 0 {
return cid.Undef, nil
}

return lastShard.Root.CID, nil
}

func (cs *NonArchivalCarstore) GetUserRepoRev(ctx context.Context, user models.Uid) (string, error) {
lastShard, err := cs.getCommitRefInfo(ctx, user)
if err != nil {
return "", err
}
if lastShard.ID == 0 {
return "", nil
}

return lastShard.Rev, nil
}

func (cs *NonArchivalCarstore) Stat(ctx context.Context, usr models.Uid) ([]UserStat, error) {
return nil, nil
}

func (cs *NonArchivalCarstore) WipeUserData(ctx context.Context, user models.Uid) error {
if err := cs.db.Raw("DELETE from commit_ref_infos WHERE uid = ?", user).Error; err != nil {
return err
}

cs.removeLastShardCache(user)
return nil
}

func (cs *NonArchivalCarstore) GetCompactionTargets(ctx context.Context, shardCount int) ([]CompactionTarget, error) {
return nil, fmt.Errorf("compaction not supported on non-archival")
}

func (cs *NonArchivalCarstore) CompactUserShards(ctx context.Context, user models.Uid, skipBigShards bool) (*CompactionStats, error) {
return nil, fmt.Errorf("compaction not supported in non-archival")
}
1 change: 1 addition & 0 deletions indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,7 @@ func (ix *Indexer) handleRecordCreate(ctx context.Context, evt *repomgr.RepoEven
case *bsky.ActorProfile:
ix.log.Debug("TODO: got actor profile record creation, need to do something with this")
default:
log.Warnw("unrecognized record", "record", op.Record, "collection", op.Collection)
return nil, fmt.Errorf("unrecognized record type (creation): %s", op.Collection)
}

Expand Down
55 changes: 51 additions & 4 deletions repomgr/repomgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,21 +578,68 @@ func (rm *RepoManager) handleExternalUserEventNoArchive(ctx context.Context, pds
return fmt.Errorf("invalid rpath in mst diff, must have collection and rkey")
}

/*
switch EventKind(op.Action) {
case EvtKindCreateRecord:
evtops = append(evtops, RepoOp{
Kind: EvtKindCreateRecord,
Collection: parts[0],
Rkey: parts[1],
RecCid: (*cid.Cid)(op.Cid),
})
case EvtKindUpdateRecord:
evtops = append(evtops, RepoOp{
Kind: EvtKindUpdateRecord,
Collection: parts[0],
Rkey: parts[1],
RecCid: (*cid.Cid)(op.Cid),
})
case EvtKindDeleteRecord:
evtops = append(evtops, RepoOp{
Kind: EvtKindDeleteRecord,
Collection: parts[0],
Rkey: parts[1],
})
default:
return fmt.Errorf("unrecognized external user event kind: %q", op.Action)
}
*/
switch EventKind(op.Action) {
case EvtKindCreateRecord:
evtops = append(evtops, RepoOp{
rop := RepoOp{
Kind: EvtKindCreateRecord,
Collection: parts[0],
Rkey: parts[1],
RecCid: (*cid.Cid)(op.Cid),
})
}

if rm.hydrateRecords {
_, rec, err := r.GetRecord(ctx, op.Path)
if err != nil {
return fmt.Errorf("reading changed record from car slice: %w", err)
}
rop.Record = rec
}

evtops = append(evtops, rop)
case EvtKindUpdateRecord:
evtops = append(evtops, RepoOp{
rop := RepoOp{
Kind: EvtKindUpdateRecord,
Collection: parts[0],
Rkey: parts[1],
RecCid: (*cid.Cid)(op.Cid),
})
}

if rm.hydrateRecords {
_, rec, err := r.GetRecord(ctx, op.Path)
if err != nil {
return fmt.Errorf("reading changed record from car slice: %w", err)
}

rop.Record = rec
}

evtops = append(evtops, rop)
case EvtKindDeleteRecord:
evtops = append(evtops, RepoOp{
Kind: EvtKindDeleteRecord,
Expand Down
3 changes: 2 additions & 1 deletion testing/integ_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,7 @@ func TestRelayHandleEmptyEvent(t *testing.T) {
e1 := evts.Next()
assert.NotNil(e1.RepoCommit)
assert.Equal(e1.RepoCommit.Repo, bob.DID())
fmt.Println(e1.RepoCommit.Ops[0])

ctx := context.TODO()
rm := p1.server.Repoman()
Expand All @@ -544,7 +545,7 @@ func TestRelayHandleEmptyEvent(t *testing.T) {
}

e2 := evts.Next()
fmt.Println(e2.RepoCommit.Ops)
//fmt.Println(e2.RepoCommit.Ops[0])
assert.Equal(len(e2.RepoCommit.Ops), 0)
assert.Equal(e2.RepoCommit.Repo, bob.DID())
}
4 changes: 3 additions & 1 deletion testing/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,11 +555,13 @@ func SetupRelay(ctx context.Context, didr plc.PLCClient) (*TestRelay, error) {
if err != nil {
return nil, err
}
*/
//*/
//*
cs, err := carstore.NewNonArchivalCarstore(cardb)
if err != nil {
return nil, err
}
//*/

//kmgr := indexer.NewKeyManager(didr, nil)
kmgr := &bsutil.FakeKeyManager{}
Expand Down

0 comments on commit cc76378

Please sign in to comment.