Skip to content

Commit

Permalink
hack: non-archival relay work
Browse files Browse the repository at this point in the history
  • Loading branch information
whyrusleeping committed Dec 16, 2024
1 parent 749cc07 commit 07dc376
Show file tree
Hide file tree
Showing 8 changed files with 157 additions and 18 deletions.
3 changes: 3 additions & 0 deletions bgs/bgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1188,6 +1188,7 @@ func (bgs *BGS) handleRepoTombstone(ctx context.Context, pds *models.PDS, evt *a
}).Error; err != nil {
return err
}
u.SetTombstoned(true)

if err := bgs.db.Model(&models.ActorInfo{}).Where("uid = ?", u.ID).UpdateColumns(map[string]any{
"handle": nil,
Expand Down Expand Up @@ -1416,6 +1417,8 @@ func (s *BGS) createExternalUser(ctx context.Context, did string) (*models.Actor
if err := s.db.Create(&u).Error; err != nil {
return nil, fmt.Errorf("failed to create user after handle conflict: %w", err)
}

s.userCache.Remove(did)
} else {
return nil, fmt.Errorf("failed to create other pds user: %w", err)
}
Expand Down
33 changes: 28 additions & 5 deletions carstore/bs.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func NewCarStore(meta *gorm.DB, roots []string) (CarStore, error) {
}

type userView struct {
cs *FileCarStore
cs CarStore
user models.Uid

cache map[cid.Cid]blockformat.Block
Expand All @@ -111,13 +111,24 @@ func (uv *userView) HashOnRead(hor bool) {
}

func (uv *userView) Has(ctx context.Context, k cid.Cid) (bool, error) {
return uv.cs.meta.HasUidCid(ctx, uv.user, k)
_, have := uv.cache[k]
if have {
return have, nil
}

fcd, ok := uv.cs.(*FileCarStore)
if !ok {
return false, nil
}

return fcd.meta.HasUidCid(ctx, uv.user, k)
}

var CacheHits int64
var CacheMiss int64

func (uv *userView) Get(ctx context.Context, k cid.Cid) (blockformat.Block, error) {

if !k.Defined() {
return nil, fmt.Errorf("attempted to 'get' undefined cid")
}
Expand All @@ -132,7 +143,12 @@ func (uv *userView) Get(ctx context.Context, k cid.Cid) (blockformat.Block, erro
}
atomic.AddInt64(&CacheMiss, 1)

path, offset, user, err := uv.cs.meta.LookupBlockRef(ctx, k)
fcd, ok := uv.cs.(*FileCarStore)
if !ok {
return nil, ipld.ErrNotFound{Cid: k}
}

path, offset, user, err := fcd.meta.LookupBlockRef(ctx, k)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -272,7 +288,7 @@ type DeltaSession struct {
baseCid cid.Cid
seq int
readonly bool
cs *FileCarStore
cs CarStore
lastRev string
}

Expand Down Expand Up @@ -587,7 +603,14 @@ func (ds *DeltaSession) CloseWithRoot(ctx context.Context, root cid.Cid, rev str
return nil, fmt.Errorf("cannot write to readonly deltaSession")
}

return ds.cs.writeNewShard(ctx, root, rev, ds.user, ds.seq, ds.blks, ds.rmcids)
switch ocs := ds.cs.(type) {
case *FileCarStore:
return ocs.writeNewShard(ctx, root, rev, ds.user, ds.seq, ds.blks, ds.rmcids)
case *NonArchivalCarstore:
return nil, ocs.updateLastCommit(ctx, ds.user, rev, root)
default:
return nil, fmt.Errorf("unsupported carstore type")
}
}

func WriteCarHeader(w io.Writer, root cid.Cid) (int64, error) {
Expand Down
19 changes: 10 additions & 9 deletions pds/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (

"github.com/bluesky-social/indigo/api/atproto"
comatproto "github.com/bluesky-social/indigo/api/atproto"
bsky "github.com/bluesky-social/indigo/api/bsky"
"github.com/bluesky-social/indigo/carstore"
"github.com/bluesky-social/indigo/events"
"github.com/bluesky-social/indigo/indexer"
Expand Down Expand Up @@ -202,14 +201,16 @@ func (s *Server) createExternalUser(ctx context.Context, did string) (*models.Ac
handle = hurl.Host
}

profile, err := bsky.ActorGetProfile(ctx, c, did)
if err != nil {
return nil, err
}
/*
profile, err := bsky.ActorGetProfile(ctx, c, did)
if err != nil {
return nil, err
}
if handle != profile.Handle {
return nil, fmt.Errorf("mismatch in handle between did document and pds profile (%s != %s)", handle, profile.Handle)
}
if handle != profile.Handle {
return nil, fmt.Errorf("mismatch in handle between did document and pds profile (%s != %s)", handle, profile.Handle)
}
*/

// TODO: request this users info from their server to fill out our data...
u := User{
Expand All @@ -227,7 +228,7 @@ func (s *Server) createExternalUser(ctx context.Context, did string) (*models.Ac
subj := &models.ActorInfo{
Uid: u.ID,
Handle: sql.NullString{String: handle, Valid: true},
DisplayName: *profile.DisplayName,
DisplayName: "missing display name",
Did: did,
Type: "",
PDS: peering.ID,
Expand Down
9 changes: 8 additions & 1 deletion repomgr/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,19 @@ func BenchmarkRepoMgrCreates(b *testing.B) {
b.Fatal(err)
}

cs, err := carstore.NewCarStore(cardb, []string{cspath})
/*
cs, err := carstore.NewCarStore(cardb, []string{cspath})
if err != nil {
b.Fatal(err)
}
*/
cs, err := carstore.NewNonArchivalCarstore(cardb)
if err != nil {
b.Fatal(err)
}

repoman := NewRepoManager(cs, &util.FakeKeyManager{})
repoman.noArchive = true

ctx := context.TODO()
if err := repoman.InitNewActor(ctx, 1, "hello.world", "did:foo:bar", "catdog", "", ""); err != nil {
Expand Down
9 changes: 8 additions & 1 deletion repomgr/ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,14 @@ func testCarstore(t *testing.T, dir string) carstore.CarStore {
t.Fatal(err)
}

cs, err := carstore.NewCarStore(cardb, []string{cspath})
/*
cs, err := carstore.NewCarStore(cardb, []string{cspath})
if err != nil {
t.Fatal(err)
}
*/

cs, err := carstore.NewNonArchivalCarstore(cardb)
if err != nil {
t.Fatal(err)
}
Expand Down
93 changes: 92 additions & 1 deletion repomgr/repomgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,17 @@ import (

func NewRepoManager(cs carstore.CarStore, kmgr KeyManager) *RepoManager {

var noArchive bool
if _, ok := cs.(*carstore.NonArchivalCarstore); ok {
noArchive = true
}

return &RepoManager{
cs: cs,
userLocks: make(map[models.Uid]*userLock),
kmgr: kmgr,
log: slog.Default().With("system", "repomgr"),
noArchive: noArchive,
}
}

Expand All @@ -62,7 +68,8 @@ type RepoManager struct {
events func(context.Context, *RepoEvent)
hydrateRecords bool

log *slog.Logger
log *slog.Logger
noArchive bool
}

type ActorInfo struct {
Expand Down Expand Up @@ -530,6 +537,90 @@ func (rm *RepoManager) CheckRepoSig(ctx context.Context, r *repo.Repo, expdid st
}

func (rm *RepoManager) HandleExternalUserEvent(ctx context.Context, pdsid uint, uid models.Uid, did string, since *string, nrev string, carslice []byte, ops []*atproto.SyncSubscribeRepos_RepoOp) error {
if rm.noArchive {
return rm.handleExternalUserEventNoArchive(ctx, pdsid, uid, did, since, nrev, carslice, ops)
} else {
return rm.handleExternalUserEventArchive(ctx, pdsid, uid, did, since, nrev, carslice, ops)
}
}

func (rm *RepoManager) handleExternalUserEventNoArchive(ctx context.Context, pdsid uint, uid models.Uid, did string, since *string, nrev string, carslice []byte, ops []*atproto.SyncSubscribeRepos_RepoOp) error {
ctx, span := otel.Tracer("repoman").Start(ctx, "HandleExternalUserEvent")
defer span.End()

span.SetAttributes(attribute.Int64("uid", int64(uid)))

log.Debugw("HandleExternalUserEvent", "pds", pdsid, "uid", uid, "since", since, "nrev", nrev)

unlock := rm.lockUser(ctx, uid)
defer unlock()

start := time.Now()
root, ds, err := rm.cs.ImportSlice(ctx, uid, since, carslice)
if err != nil {
return fmt.Errorf("importing external carslice: %w", err)
}

r, err := repo.OpenRepo(ctx, ds, root)
if err != nil {
return fmt.Errorf("opening external user repo (%d, root=%s): %w", uid, root, err)
}

if err := rm.CheckRepoSig(ctx, r, did); err != nil {
return fmt.Errorf("check repo sig: %w", err)
}
openAndSigCheckDuration.Observe(time.Since(start).Seconds())

evtops := make([]RepoOp, 0, len(ops))
for _, op := range ops {
parts := strings.SplitN(op.Path, "/", 2)
if len(parts) != 2 {
return fmt.Errorf("invalid rpath in mst diff, must have collection and rkey")
}

switch EventKind(op.Action) {
case EvtKindCreateRecord:
evtops = append(evtops, RepoOp{
Kind: EvtKindCreateRecord,
Collection: parts[0],
Rkey: parts[1],
RecCid: (*cid.Cid)(op.Cid),
})
case EvtKindUpdateRecord:
evtops = append(evtops, RepoOp{
Kind: EvtKindUpdateRecord,
Collection: parts[0],
Rkey: parts[1],
RecCid: (*cid.Cid)(op.Cid),
})
case EvtKindDeleteRecord:
evtops = append(evtops, RepoOp{
Kind: EvtKindDeleteRecord,
Collection: parts[0],
Rkey: parts[1],
})
default:
return fmt.Errorf("unrecognized external user event kind: %q", op.Action)
}
}

if rm.events != nil {
rm.events(ctx, &RepoEvent{
User: uid,
//OldRoot: prev,
NewRoot: root,
Rev: nrev,
Since: since,
Ops: evtops,
RepoSlice: carslice,
PDS: pdsid,
})
}

return nil
}

func (rm *RepoManager) handleExternalUserEventArchive(ctx context.Context, pdsid uint, uid models.Uid, did string, since *string, nrev string, carslice []byte, ops []*atproto.SyncSubscribeRepos_RepoOp) error {
ctx, span := otel.Tracer("repoman").Start(ctx, "HandleExternalUserEvent")
defer span.End()

Expand Down
1 change: 1 addition & 0 deletions testing/integ_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,7 @@ func TestRelayHandleEmptyEvent(t *testing.T) {
}

e2 := evts.Next()
fmt.Println(e2.RepoCommit.Ops)
assert.Equal(len(e2.RepoCommit.Ops), 0)
assert.Equal(e2.RepoCommit.Repo, bob.DID())
}
8 changes: 7 additions & 1 deletion testing/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,13 @@ func SetupRelay(ctx context.Context, didr plc.PLCClient) (*TestRelay, error) {
return nil, err
}

cs, err := carstore.NewCarStore(cardb, []string{cspath})
/*
cs, err := carstore.NewCarStore(cardb, []string{cspath})
if err != nil {
return nil, err
}
*/
cs, err := carstore.NewNonArchivalCarstore(cardb)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 07dc376

Please sign in to comment.