From 71531bbb305644e5bd80be5cce2221773336ef0c Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Tue, 19 Nov 2024 14:33:05 -0800 Subject: [PATCH] add missing file --- carstore/nonarchive.go | 250 +++++++++++++++++++++++++++++++++++++++++ indexer/indexer.go | 1 + repomgr/repomgr.go | 55 ++++++++- testing/integ_test.go | 3 +- testing/utils.go | 4 +- 5 files changed, 307 insertions(+), 6 deletions(-) create mode 100644 carstore/nonarchive.go diff --git a/carstore/nonarchive.go b/carstore/nonarchive.go new file mode 100644 index 000000000..64b0a32f7 --- /dev/null +++ b/carstore/nonarchive.go @@ -0,0 +1,250 @@ +package carstore + +import ( + "bytes" + "context" + "fmt" + "io" + "sync" + + "github.com/bluesky-social/indigo/models" + blockformat "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" + blockstore "github.com/ipfs/go-ipfs-blockstore" + car "github.com/ipld/go-car" + "go.opentelemetry.io/otel" + "gorm.io/gorm" + "gorm.io/gorm/clause" +) + +type NonArchivalCarstore struct { + db *gorm.DB + + lk sync.Mutex + lastCommitCache map[models.Uid]*commitRefInfo +} + +func NewNonArchivalCarstore(db *gorm.DB) (*NonArchivalCarstore, error) { + if err := db.AutoMigrate(&commitRefInfo{}); err != nil { + return nil, err + } + + return &NonArchivalCarstore{ + db: db, + lastCommitCache: make(map[models.Uid]*commitRefInfo), + }, nil +} + +type commitRefInfo struct { + ID uint `gorm:"primarykey"` + Uid models.Uid `gorm:"uniqueIndex"` + Rev string + Root models.DbCID +} + +func (cs *NonArchivalCarstore) checkLastShardCache(user models.Uid) *commitRefInfo { + cs.lk.Lock() + defer cs.lk.Unlock() + + ls, ok := cs.lastCommitCache[user] + if ok { + return ls + } + + return nil +} + +func (cs *NonArchivalCarstore) removeLastShardCache(user models.Uid) { + cs.lk.Lock() + defer cs.lk.Unlock() + + delete(cs.lastCommitCache, user) +} + +func (cs *NonArchivalCarstore) putLastShardCache(ls *commitRefInfo) { + cs.lk.Lock() + defer cs.lk.Unlock() + + cs.lastCommitCache[ls.Uid] = ls +} + +func (cs *NonArchivalCarstore) loadCommitRefInfo(ctx context.Context, user models.Uid) (*commitRefInfo, error) { + var out commitRefInfo + if err := cs.db.Find(&out, "uid = ?", user).Error; err != nil { + return nil, err + } + + return &out, nil +} + +func (cs *NonArchivalCarstore) getCommitRefInfo(ctx context.Context, user models.Uid) (*commitRefInfo, error) { + ctx, span := otel.Tracer("carstore").Start(ctx, "getCommitRefInfo") + defer span.End() + + maybeLs := cs.checkLastShardCache(user) + if maybeLs != nil { + return maybeLs, nil + } + + lastShard, err := cs.loadCommitRefInfo(ctx, user) + if err != nil { + return nil, err + } + + cs.putLastShardCache(lastShard) + return lastShard, nil +} + +func (cs *NonArchivalCarstore) updateLastCommit(ctx context.Context, uid models.Uid, rev string, cid cid.Cid) error { + cri := &commitRefInfo{ + Uid: uid, + Rev: rev, + Root: models.DbCID{cid}, + } + + if err := cs.db.Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "uid"}}, + UpdateAll: true, + }).Create(cri).Error; err != nil { + return fmt.Errorf("update or set last commit info: %w", err) + } + + cs.putLastShardCache(cri) + + return nil +} + +func (cs *NonArchivalCarstore) NewDeltaSession(ctx context.Context, user models.Uid, since *string) (*DeltaSession, error) { + ctx, span := otel.Tracer("carstore").Start(ctx, "NewSession") + defer span.End() + + // TODO: ensure that we don't write updates on top of the wrong head + // this needs to be a compare and swap type operation + lastShard, err := cs.getCommitRefInfo(ctx, user) + if err != nil { + return nil, err + } + + if since != nil && *since != lastShard.Rev { + return nil, fmt.Errorf("revision mismatch: %s != %s: %w", *since, lastShard.Rev, ErrRepoBaseMismatch) + } + + return &DeltaSession{ + fresh: blockstore.NewBlockstore(datastore.NewMapDatastore()), + blks: make(map[cid.Cid]blockformat.Block), + base: &userView{ + user: user, + cs: cs, + prefetch: true, + cache: make(map[cid.Cid]blockformat.Block), + }, + user: user, + baseCid: lastShard.Root.CID, + cs: cs, + seq: 0, + lastRev: lastShard.Rev, + }, nil +} + +func (cs *NonArchivalCarstore) ReadOnlySession(user models.Uid) (*DeltaSession, error) { + return &DeltaSession{ + base: &userView{ + user: user, + cs: cs, + prefetch: false, + cache: make(map[cid.Cid]blockformat.Block), + }, + readonly: true, + user: user, + cs: cs, + }, nil +} + +// TODO: incremental is only ever called true, remove the param +func (cs *NonArchivalCarstore) ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, w io.Writer) error { + return fmt.Errorf("not supported in non-archival mode") +} + +func (cs *NonArchivalCarstore) ImportSlice(ctx context.Context, uid models.Uid, since *string, carslice []byte) (cid.Cid, *DeltaSession, error) { + ctx, span := otel.Tracer("carstore").Start(ctx, "ImportSlice") + defer span.End() + + carr, err := car.NewCarReader(bytes.NewReader(carslice)) + if err != nil { + return cid.Undef, nil, err + } + + if len(carr.Header.Roots) != 1 { + return cid.Undef, nil, fmt.Errorf("invalid car file, header must have a single root (has %d)", len(carr.Header.Roots)) + } + + ds, err := cs.NewDeltaSession(ctx, uid, since) + if err != nil { + return cid.Undef, nil, fmt.Errorf("new delta session failed: %w", err) + } + + var cids []cid.Cid + for { + blk, err := carr.Next() + if err != nil { + if err == io.EOF { + break + } + return cid.Undef, nil, err + } + + cids = append(cids, blk.Cid()) + + if err := ds.Put(ctx, blk); err != nil { + return cid.Undef, nil, err + } + } + + return carr.Header.Roots[0], ds, nil +} + +func (cs *NonArchivalCarstore) GetUserRepoHead(ctx context.Context, user models.Uid) (cid.Cid, error) { + lastShard, err := cs.getCommitRefInfo(ctx, user) + if err != nil { + return cid.Undef, err + } + if lastShard.ID == 0 { + return cid.Undef, nil + } + + return lastShard.Root.CID, nil +} + +func (cs *NonArchivalCarstore) GetUserRepoRev(ctx context.Context, user models.Uid) (string, error) { + lastShard, err := cs.getCommitRefInfo(ctx, user) + if err != nil { + return "", err + } + if lastShard.ID == 0 { + return "", nil + } + + return lastShard.Rev, nil +} + +func (cs *NonArchivalCarstore) Stat(ctx context.Context, usr models.Uid) ([]UserStat, error) { + return nil, nil +} + +func (cs *NonArchivalCarstore) WipeUserData(ctx context.Context, user models.Uid) error { + if err := cs.db.Raw("DELETE from commit_ref_infos WHERE uid = ?", user).Error; err != nil { + return err + } + + cs.removeLastShardCache(user) + return nil +} + +func (cs *NonArchivalCarstore) GetCompactionTargets(ctx context.Context, shardCount int) ([]CompactionTarget, error) { + return nil, fmt.Errorf("compaction not supported on non-archival") +} + +func (cs *NonArchivalCarstore) CompactUserShards(ctx context.Context, user models.Uid, skipBigShards bool) (*CompactionStats, error) { + return nil, fmt.Errorf("compaction not supported in non-archival") +} diff --git a/indexer/indexer.go b/indexer/indexer.go index 18c9d3625..e16918bd0 100644 --- a/indexer/indexer.go +++ b/indexer/indexer.go @@ -531,6 +531,7 @@ func (ix *Indexer) handleRecordCreate(ctx context.Context, evt *repomgr.RepoEven case *bsky.ActorProfile: log.Debugf("TODO: got actor profile record creation, need to do something with this") default: + log.Warnw("unrecognized record", "record", op.Record, "collection", op.Collection) return nil, fmt.Errorf("unrecognized record type (creation): %s", op.Collection) } diff --git a/repomgr/repomgr.go b/repomgr/repomgr.go index 41dbc7974..b1111aec0 100644 --- a/repomgr/repomgr.go +++ b/repomgr/repomgr.go @@ -578,21 +578,68 @@ func (rm *RepoManager) handleExternalUserEventNoArchive(ctx context.Context, pds return fmt.Errorf("invalid rpath in mst diff, must have collection and rkey") } + /* + switch EventKind(op.Action) { + case EvtKindCreateRecord: + evtops = append(evtops, RepoOp{ + Kind: EvtKindCreateRecord, + Collection: parts[0], + Rkey: parts[1], + RecCid: (*cid.Cid)(op.Cid), + }) + case EvtKindUpdateRecord: + evtops = append(evtops, RepoOp{ + Kind: EvtKindUpdateRecord, + Collection: parts[0], + Rkey: parts[1], + RecCid: (*cid.Cid)(op.Cid), + }) + case EvtKindDeleteRecord: + evtops = append(evtops, RepoOp{ + Kind: EvtKindDeleteRecord, + Collection: parts[0], + Rkey: parts[1], + }) + default: + return fmt.Errorf("unrecognized external user event kind: %q", op.Action) + } + */ switch EventKind(op.Action) { case EvtKindCreateRecord: - evtops = append(evtops, RepoOp{ + rop := RepoOp{ Kind: EvtKindCreateRecord, Collection: parts[0], Rkey: parts[1], RecCid: (*cid.Cid)(op.Cid), - }) + } + + if rm.hydrateRecords { + _, rec, err := r.GetRecord(ctx, op.Path) + if err != nil { + return fmt.Errorf("reading changed record from car slice: %w", err) + } + rop.Record = rec + } + + evtops = append(evtops, rop) case EvtKindUpdateRecord: - evtops = append(evtops, RepoOp{ + rop := RepoOp{ Kind: EvtKindUpdateRecord, Collection: parts[0], Rkey: parts[1], RecCid: (*cid.Cid)(op.Cid), - }) + } + + if rm.hydrateRecords { + _, rec, err := r.GetRecord(ctx, op.Path) + if err != nil { + return fmt.Errorf("reading changed record from car slice: %w", err) + } + + rop.Record = rec + } + + evtops = append(evtops, rop) case EvtKindDeleteRecord: evtops = append(evtops, RepoOp{ Kind: EvtKindDeleteRecord, diff --git a/testing/integ_test.go b/testing/integ_test.go index 9bc1b713b..efcff3985 100644 --- a/testing/integ_test.go +++ b/testing/integ_test.go @@ -541,6 +541,7 @@ func TestRelayHandleEmptyEvent(t *testing.T) { e1 := evts.Next() assert.NotNil(e1.RepoCommit) assert.Equal(e1.RepoCommit.Repo, bob.DID()) + fmt.Println(e1.RepoCommit.Ops[0]) ctx := context.TODO() rm := p1.server.Repoman() @@ -549,7 +550,7 @@ func TestRelayHandleEmptyEvent(t *testing.T) { } e2 := evts.Next() - fmt.Println(e2.RepoCommit.Ops) + //fmt.Println(e2.RepoCommit.Ops[0]) assert.Equal(len(e2.RepoCommit.Ops), 0) assert.Equal(e2.RepoCommit.Repo, bob.DID()) } diff --git a/testing/utils.go b/testing/utils.go index 55cced5bd..1d3b4d682 100644 --- a/testing/utils.go +++ b/testing/utils.go @@ -555,11 +555,13 @@ func SetupRelay(ctx context.Context, didr plc.PLCClient) (*TestRelay, error) { if err != nil { return nil, err } - */ + //*/ + //* cs, err := carstore.NewNonArchivalCarstore(cardb) if err != nil { return nil, err } + //*/ //kmgr := indexer.NewKeyManager(didr, nil) kmgr := &bsutil.FakeKeyManager{}