Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

non-archival relay #827

Merged
merged 12 commits into from
Dec 16, 2024
3 changes: 3 additions & 0 deletions bgs/bgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1188,6 +1188,7 @@ func (bgs *BGS) handleRepoTombstone(ctx context.Context, pds *models.PDS, evt *a
}).Error; err != nil {
return err
}
u.SetTombstoned(true)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is an irrelevant fix of some other thing


if err := bgs.db.Model(&models.ActorInfo{}).Where("uid = ?", u.ID).UpdateColumns(map[string]any{
"handle": nil,
Expand Down Expand Up @@ -1416,6 +1417,8 @@ func (s *BGS) createExternalUser(ctx context.Context, did string) (*models.Actor
if err := s.db.Create(&u).Error; err != nil {
return nil, fmt.Errorf("failed to create user after handle conflict: %w", err)
}

s.userCache.Remove(did)
} else {
return nil, fmt.Errorf("failed to create other pds user: %w", err)
}
Expand Down
54 changes: 49 additions & 5 deletions carstore/bs.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func NewCarStore(meta *gorm.DB, roots []string) (CarStore, error) {
}

type userView struct {
cs *FileCarStore
cs CarStore
user models.Uid

cache map[cid.Cid]blockformat.Block
Expand All @@ -111,13 +111,24 @@ func (uv *userView) HashOnRead(hor bool) {
}

func (uv *userView) Has(ctx context.Context, k cid.Cid) (bool, error) {
return uv.cs.meta.HasUidCid(ctx, uv.user, k)
_, have := uv.cache[k]
if have {
return have, nil
}

fcd, ok := uv.cs.(*FileCarStore)
if !ok {
return false, nil
}

return fcd.meta.HasUidCid(ctx, uv.user, k)
}

var CacheHits int64
var CacheMiss int64

func (uv *userView) Get(ctx context.Context, k cid.Cid) (blockformat.Block, error) {

if !k.Defined() {
return nil, fmt.Errorf("attempted to 'get' undefined cid")
}
Expand All @@ -132,7 +143,12 @@ func (uv *userView) Get(ctx context.Context, k cid.Cid) (blockformat.Block, erro
}
atomic.AddInt64(&CacheMiss, 1)

path, offset, user, err := uv.cs.meta.LookupBlockRef(ctx, k)
fcd, ok := uv.cs.(*FileCarStore)
if !ok {
return nil, ipld.ErrNotFound{Cid: k}
}

path, offset, user, err := fcd.meta.LookupBlockRef(ctx, k)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -272,7 +288,7 @@ type DeltaSession struct {
baseCid cid.Cid
seq int
readonly bool
cs *FileCarStore
cs CarStore
lastRev string
}

Expand Down Expand Up @@ -587,7 +603,18 @@ func (ds *DeltaSession) CloseWithRoot(ctx context.Context, root cid.Cid, rev str
return nil, fmt.Errorf("cannot write to readonly deltaSession")
}

return ds.cs.writeNewShard(ctx, root, rev, ds.user, ds.seq, ds.blks, ds.rmcids)
switch ocs := ds.cs.(type) {
case *FileCarStore:
return ocs.writeNewShard(ctx, root, rev, ds.user, ds.seq, ds.blks, ds.rmcids)
case *NonArchivalCarstore:
slice, err := blocksToCar(ctx, root, rev, ds.blks)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

case on type should probably be a new interface method that both implement

if err != nil {
return nil, err
}
return slice, ocs.updateLastCommit(ctx, ds.user, rev, root)
default:
return nil, fmt.Errorf("unsupported carstore type")
}
}

func WriteCarHeader(w io.Writer, root cid.Cid) (int64, error) {
Expand All @@ -608,6 +635,23 @@ func WriteCarHeader(w io.Writer, root cid.Cid) (int64, error) {
return hnw, nil
}

func blocksToCar(ctx context.Context, root cid.Cid, rev string, blks map[cid.Cid]blockformat.Block) ([]byte, error) {
buf := new(bytes.Buffer)
_, err := WriteCarHeader(buf, root)
if err != nil {
return nil, fmt.Errorf("failed to write car header: %w", err)
}

for k, blk := range blks {
_, err := LdWrite(buf, k.Bytes(), blk.RawData())
if err != nil {
return nil, fmt.Errorf("failed to write block: %w", err)
}
}

return buf.Bytes(), nil
}

func (cs *FileCarStore) writeNewShard(ctx context.Context, root cid.Cid, rev string, user models.Uid, seq int, blks map[cid.Cid]blockformat.Block, rmcids map[cid.Cid]bool) ([]byte, error) {

buf := new(bytes.Buffer)
Expand Down
254 changes: 254 additions & 0 deletions carstore/nonarchive.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
package carstore

import (
"bytes"
"context"
"fmt"
"io"
"log/slog"
"sync"

"github.com/bluesky-social/indigo/models"
blockformat "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
blockstore "github.com/ipfs/go-ipfs-blockstore"
car "github.com/ipld/go-car"
"go.opentelemetry.io/otel"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)

type NonArchivalCarstore struct {
db *gorm.DB

lk sync.Mutex
lastCommitCache map[models.Uid]*commitRefInfo

log *slog.Logger
}

func NewNonArchivalCarstore(db *gorm.DB) (*NonArchivalCarstore, error) {
if err := db.AutoMigrate(&commitRefInfo{}); err != nil {
return nil, err
}

return &NonArchivalCarstore{
db: db,
lastCommitCache: make(map[models.Uid]*commitRefInfo),
log: slog.Default().With("system", "carstorena"),
}, nil
}

type commitRefInfo struct {
ID uint `gorm:"primarykey"`
Uid models.Uid `gorm:"uniqueIndex"`
Rev string
Root models.DbCID
}

func (cs *NonArchivalCarstore) checkLastShardCache(user models.Uid) *commitRefInfo {
cs.lk.Lock()
defer cs.lk.Unlock()

ls, ok := cs.lastCommitCache[user]
if ok {
return ls
}

return nil
}

func (cs *NonArchivalCarstore) removeLastShardCache(user models.Uid) {
cs.lk.Lock()
defer cs.lk.Unlock()

delete(cs.lastCommitCache, user)
}

func (cs *NonArchivalCarstore) putLastShardCache(ls *commitRefInfo) {
cs.lk.Lock()
defer cs.lk.Unlock()

cs.lastCommitCache[ls.Uid] = ls
}

func (cs *NonArchivalCarstore) loadCommitRefInfo(ctx context.Context, user models.Uid) (*commitRefInfo, error) {
var out commitRefInfo
if err := cs.db.Find(&out, "uid = ?", user).Error; err != nil {
return nil, err
}

return &out, nil
}

func (cs *NonArchivalCarstore) getCommitRefInfo(ctx context.Context, user models.Uid) (*commitRefInfo, error) {
ctx, span := otel.Tracer("carstore").Start(ctx, "getCommitRefInfo")
defer span.End()

maybeLs := cs.checkLastShardCache(user)
if maybeLs != nil {
return maybeLs, nil
}

lastShard, err := cs.loadCommitRefInfo(ctx, user)
if err != nil {
return nil, err
}

cs.putLastShardCache(lastShard)
return lastShard, nil
}

func (cs *NonArchivalCarstore) updateLastCommit(ctx context.Context, uid models.Uid, rev string, cid cid.Cid) error {
cri := &commitRefInfo{
Uid: uid,
Rev: rev,
Root: models.DbCID{CID: cid},
}

if err := cs.db.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "uid"}},
UpdateAll: true,
}).Create(cri).Error; err != nil {
return fmt.Errorf("update or set last commit info: %w", err)
}

cs.putLastShardCache(cri)

return nil
}

func (cs *NonArchivalCarstore) NewDeltaSession(ctx context.Context, user models.Uid, since *string) (*DeltaSession, error) {
ctx, span := otel.Tracer("carstore").Start(ctx, "NewSession")
defer span.End()

// TODO: ensure that we don't write updates on top of the wrong head
// this needs to be a compare and swap type operation
lastShard, err := cs.getCommitRefInfo(ctx, user)
if err != nil {
return nil, err
}

if since != nil && *since != lastShard.Rev {
cs.log.Warn("revision mismatch: %s != %s: %s", *since, lastShard.Rev, ErrRepoBaseMismatch)
}

return &DeltaSession{
fresh: blockstore.NewBlockstore(datastore.NewMapDatastore()),
blks: make(map[cid.Cid]blockformat.Block),
base: &userView{
user: user,
cs: cs,
prefetch: true,
cache: make(map[cid.Cid]blockformat.Block),
},
user: user,
baseCid: lastShard.Root.CID,
cs: cs,
seq: 0,
lastRev: lastShard.Rev,
}, nil
}

func (cs *NonArchivalCarstore) ReadOnlySession(user models.Uid) (*DeltaSession, error) {
return &DeltaSession{
base: &userView{
user: user,
cs: cs,
prefetch: false,
cache: make(map[cid.Cid]blockformat.Block),
},
readonly: true,
user: user,
cs: cs,
}, nil
}

// TODO: incremental is only ever called true, remove the param
func (cs *NonArchivalCarstore) ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, w io.Writer) error {
return fmt.Errorf("not supported in non-archival mode")
}

func (cs *NonArchivalCarstore) ImportSlice(ctx context.Context, uid models.Uid, since *string, carslice []byte) (cid.Cid, *DeltaSession, error) {
ctx, span := otel.Tracer("carstore").Start(ctx, "ImportSlice")
defer span.End()

carr, err := car.NewCarReader(bytes.NewReader(carslice))
if err != nil {
return cid.Undef, nil, err
}

if len(carr.Header.Roots) != 1 {
return cid.Undef, nil, fmt.Errorf("invalid car file, header must have a single root (has %d)", len(carr.Header.Roots))
}

ds, err := cs.NewDeltaSession(ctx, uid, since)
if err != nil {
return cid.Undef, nil, fmt.Errorf("new delta session failed: %w", err)
}

var cids []cid.Cid
for {
blk, err := carr.Next()
if err != nil {
if err == io.EOF {
break
}
return cid.Undef, nil, err
}

cids = append(cids, blk.Cid())

if err := ds.Put(ctx, blk); err != nil {
return cid.Undef, nil, err
}
}

return carr.Header.Roots[0], ds, nil
}

func (cs *NonArchivalCarstore) GetUserRepoHead(ctx context.Context, user models.Uid) (cid.Cid, error) {
lastShard, err := cs.getCommitRefInfo(ctx, user)
if err != nil {
return cid.Undef, err
}
if lastShard.ID == 0 {
return cid.Undef, nil
}

return lastShard.Root.CID, nil
}

func (cs *NonArchivalCarstore) GetUserRepoRev(ctx context.Context, user models.Uid) (string, error) {
lastShard, err := cs.getCommitRefInfo(ctx, user)
if err != nil {
return "", err
}
if lastShard.ID == 0 {
return "", nil
}

return lastShard.Rev, nil
}

func (cs *NonArchivalCarstore) Stat(ctx context.Context, usr models.Uid) ([]UserStat, error) {
return nil, nil
}

func (cs *NonArchivalCarstore) WipeUserData(ctx context.Context, user models.Uid) error {
if err := cs.db.Raw("DELETE from commit_ref_infos WHERE uid = ?", user).Error; err != nil {
return err
}

cs.removeLastShardCache(user)
return nil
}

func (cs *NonArchivalCarstore) GetCompactionTargets(ctx context.Context, shardCount int) ([]CompactionTarget, error) {
return nil, fmt.Errorf("compaction not supported on non-archival")
}

func (cs *NonArchivalCarstore) CompactUserShards(ctx context.Context, user models.Uid, skipBigShards bool) (*CompactionStats, error) {
return nil, fmt.Errorf("compaction not supported in non-archival")
}
Loading
Loading