diff --git a/bgs/bgs.go b/bgs/bgs.go index 245afe404..c3501ff41 100644 --- a/bgs/bgs.go +++ b/bgs/bgs.go @@ -744,6 +744,11 @@ func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *event ctx, span := otel.Tracer("bgs").Start(ctx, "handleFedEvent") defer span.End() + start := time.Now() + defer func() { + eventsHandleDuration.WithLabelValues(host.Host).Observe(time.Since(start).Seconds()) + }() + eventsReceivedCounter.WithLabelValues(host.Host).Add(1) switch { @@ -1222,7 +1227,7 @@ func (bgs *BGS) runRepoCompaction(ctx context.Context, lim int, dry bool) (*comp results := make(map[models.Uid]*carstore.CompactionStats) for _, r := range repos { - st, err := bgs.repoman.CarStore().CompactUserShards(ctx, r.Usr) + st, err := bgs.repoman.CarStore().CompactUserShards(context.Background(), r.Usr) if err != nil { log.Errorf("failed to compact shards for user %d: %s", r.Usr, err) continue diff --git a/bgs/metrics.go b/bgs/metrics.go index b45f06b0d..97f444c48 100644 --- a/bgs/metrics.go +++ b/bgs/metrics.go @@ -16,6 +16,12 @@ var eventsReceivedCounter = promauto.NewCounterVec(prometheus.CounterOpts{ Help: "The total number of events received", }, []string{"pds"}) +var eventsHandleDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ + Name: "events_handle_duration", + Help: "A histogram of handleFedEvent latencies", + Buckets: prometheus.ExponentialBuckets(0.001, 2, 15), +}, []string{"pds"}) + var repoCommitsReceivedCounter = promauto.NewCounterVec(prometheus.CounterOpts{ Name: "repo_commits_received_counter", Help: "The total number of events received", diff --git a/carstore/bs.go b/carstore/bs.go index 497fd3373..2adb7e3ce 100644 --- a/carstore/bs.go +++ b/carstore/bs.go @@ -164,13 +164,30 @@ func (uv *userView) Get(ctx context.Context, k cid.Cid) (blockformat.Block, erro } } +const prefetchThreshold = 512 << 10 + func (uv *userView) prefetchRead(ctx context.Context, k cid.Cid, path string, offset int64) (blockformat.Block, error) { + ctx, span := otel.Tracer("carstore").Start(ctx, "getLastShard") + defer span.End() + fi, err := os.Open(path) if err != nil { return nil, err } defer fi.Close() + st, err := fi.Stat() + if err != nil { + return nil, fmt.Errorf("stat file for prefetch: %w", err) + } + + span.SetAttributes(attribute.Int64("shard_size", st.Size())) + + if st.Size() > prefetchThreshold { + span.SetAttributes(attribute.Bool("no_prefetch", true)) + return doBlockRead(fi, k, offset) + } + cr, err := car.NewCarReader(fi) if err != nil { return nil, err @@ -203,6 +220,10 @@ func (uv *userView) singleRead(ctx context.Context, k cid.Cid, path string, offs } defer fi.Close() + return doBlockRead(fi, k, offset) +} + +func doBlockRead(fi *os.File, k cid.Cid, offset int64) (blockformat.Block, error) { seeked, err := fi.Seek(offset, io.SeekStart) if err != nil { return nil, err @@ -724,7 +745,7 @@ func createBlockRefs(ctx context.Context, tx *gorm.DB, brefs []map[string]any) e ctx, span := otel.Tracer("carstore").Start(ctx, "createBlockRefs") defer span.End() - if err := createInBatches(ctx, tx, brefs, 1000); err != nil { + if err := createInBatches(ctx, tx, brefs, 2000); err != nil { return err } @@ -1008,7 +1029,7 @@ func (cs *CarStore) deleteShards(ctx context.Context, shs []*CarShard) error { return outErr } - chunkSize := 100 + chunkSize := 10000 for i := 0; i < len(shs); i += chunkSize { sl := shs[i:] if len(sl) > chunkSize { @@ -1163,6 +1184,8 @@ func (cs *CarStore) getBlockRefsForShards(ctx context.Context, shardIds []uint) ctx, span := otel.Tracer("carstore").Start(ctx, "getBlockRefsForShards") defer span.End() + span.SetAttributes(attribute.Int("shards", len(shardIds))) + chunkSize := 10000 out := make([]blockRef, 0, len(shardIds)) for i := 0; i < len(shardIds); i += chunkSize { @@ -1179,6 +1202,8 @@ func (cs *CarStore) getBlockRefsForShards(ctx context.Context, shardIds []uint) out = append(out, brefs...) } + span.SetAttributes(attribute.Int("refs", len(out))) + return out, nil } @@ -1396,7 +1421,7 @@ func (cs *CarStore) deleteStaleRefs(ctx context.Context, brefs []blockRef, stale } } - chunkSize := 500 + chunkSize := 10000 for i := 0; i < len(staleToDelete); i += chunkSize { sl := staleToDelete[i:] if len(sl) > chunkSize { diff --git a/cmd/gosky/main.go b/cmd/gosky/main.go index 87f18bedb..30ba23edd 100644 --- a/cmd/gosky/main.go +++ b/cmd/gosky/main.go @@ -199,7 +199,7 @@ var postCmd = &cli.Command{ Repo: auth.Did, Record: &lexutil.LexiconTypeDecoder{&appbsky.FeedPost{ Text: text, - CreatedAt: time.Now().Format(util.ISO8601), + CreatedAt: time.Now().UTC().Format(util.ISO8601), }}, }) if err != nil { diff --git a/repomgr/repomgr.go b/repomgr/repomgr.go index e9ffece46..1a5d07d5b 100644 --- a/repomgr/repomgr.go +++ b/repomgr/repomgr.go @@ -25,6 +25,7 @@ import ( "github.com/ipld/go-car" cbg "github.com/whyrusleeping/cbor-gen" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" "gorm.io/gorm" ) @@ -490,6 +491,8 @@ func (rm *RepoManager) HandleExternalUserEvent(ctx context.Context, pdsid uint, ctx, span := otel.Tracer("repoman").Start(ctx, "HandleExternalUserEvent") defer span.End() + span.SetAttributes(attribute.Int64("uid", int64(uid))) + log.Infow("HandleExternalUserEvent", "pds", pdsid, "uid", uid, "since", since, "nrev", nrev) unlock := rm.lockUser(ctx, uid)