Skip to content

Commit

Permalink
more trace attributes and some tweaks (#367)
Browse files Browse the repository at this point in the history
a small grab-bag of changes, most things are either tweaking a batch
size or adding an attribute to a trace.
The one 'real' change here is falling back to an individual block read
instead of a prefetch if the shard in question is over a size limit.
  • Loading branch information
whyrusleeping authored Oct 3, 2023
2 parents 2222be4 + 5339281 commit b1591bc
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 5 deletions.
7 changes: 6 additions & 1 deletion bgs/bgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -744,6 +744,11 @@ func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *event
ctx, span := otel.Tracer("bgs").Start(ctx, "handleFedEvent")
defer span.End()

start := time.Now()
defer func() {
eventsHandleDuration.WithLabelValues(host.Host).Observe(time.Since(start).Seconds())
}()

eventsReceivedCounter.WithLabelValues(host.Host).Add(1)

switch {
Expand Down Expand Up @@ -1222,7 +1227,7 @@ func (bgs *BGS) runRepoCompaction(ctx context.Context, lim int, dry bool) (*comp

results := make(map[models.Uid]*carstore.CompactionStats)
for _, r := range repos {
st, err := bgs.repoman.CarStore().CompactUserShards(ctx, r.Usr)
st, err := bgs.repoman.CarStore().CompactUserShards(context.Background(), r.Usr)
if err != nil {
log.Errorf("failed to compact shards for user %d: %s", r.Usr, err)
continue
Expand Down
6 changes: 6 additions & 0 deletions bgs/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ var eventsReceivedCounter = promauto.NewCounterVec(prometheus.CounterOpts{
Help: "The total number of events received",
}, []string{"pds"})

var eventsHandleDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "events_handle_duration",
Help: "A histogram of handleFedEvent latencies",
Buckets: prometheus.ExponentialBuckets(0.001, 2, 15),
}, []string{"pds"})

var repoCommitsReceivedCounter = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "repo_commits_received_counter",
Help: "The total number of events received",
Expand Down
31 changes: 28 additions & 3 deletions carstore/bs.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,13 +164,30 @@ func (uv *userView) Get(ctx context.Context, k cid.Cid) (blockformat.Block, erro
}
}

const prefetchThreshold = 512 << 10

func (uv *userView) prefetchRead(ctx context.Context, k cid.Cid, path string, offset int64) (blockformat.Block, error) {
ctx, span := otel.Tracer("carstore").Start(ctx, "getLastShard")
defer span.End()

fi, err := os.Open(path)
if err != nil {
return nil, err
}
defer fi.Close()

st, err := fi.Stat()
if err != nil {
return nil, fmt.Errorf("stat file for prefetch: %w", err)
}

span.SetAttributes(attribute.Int64("shard_size", st.Size()))

if st.Size() > prefetchThreshold {
span.SetAttributes(attribute.Bool("no_prefetch", true))
return doBlockRead(fi, k, offset)
}

cr, err := car.NewCarReader(fi)
if err != nil {
return nil, err
Expand Down Expand Up @@ -203,6 +220,10 @@ func (uv *userView) singleRead(ctx context.Context, k cid.Cid, path string, offs
}
defer fi.Close()

return doBlockRead(fi, k, offset)
}

func doBlockRead(fi *os.File, k cid.Cid, offset int64) (blockformat.Block, error) {
seeked, err := fi.Seek(offset, io.SeekStart)
if err != nil {
return nil, err
Expand Down Expand Up @@ -724,7 +745,7 @@ func createBlockRefs(ctx context.Context, tx *gorm.DB, brefs []map[string]any) e
ctx, span := otel.Tracer("carstore").Start(ctx, "createBlockRefs")
defer span.End()

if err := createInBatches(ctx, tx, brefs, 1000); err != nil {
if err := createInBatches(ctx, tx, brefs, 2000); err != nil {
return err
}

Expand Down Expand Up @@ -1008,7 +1029,7 @@ func (cs *CarStore) deleteShards(ctx context.Context, shs []*CarShard) error {
return outErr
}

chunkSize := 100
chunkSize := 10000
for i := 0; i < len(shs); i += chunkSize {
sl := shs[i:]
if len(sl) > chunkSize {
Expand Down Expand Up @@ -1163,6 +1184,8 @@ func (cs *CarStore) getBlockRefsForShards(ctx context.Context, shardIds []uint)
ctx, span := otel.Tracer("carstore").Start(ctx, "getBlockRefsForShards")
defer span.End()

span.SetAttributes(attribute.Int("shards", len(shardIds)))

chunkSize := 10000
out := make([]blockRef, 0, len(shardIds))
for i := 0; i < len(shardIds); i += chunkSize {
Expand All @@ -1179,6 +1202,8 @@ func (cs *CarStore) getBlockRefsForShards(ctx context.Context, shardIds []uint)
out = append(out, brefs...)
}

span.SetAttributes(attribute.Int("refs", len(out)))

return out, nil
}

Expand Down Expand Up @@ -1396,7 +1421,7 @@ func (cs *CarStore) deleteStaleRefs(ctx context.Context, brefs []blockRef, stale
}
}

chunkSize := 500
chunkSize := 10000
for i := 0; i < len(staleToDelete); i += chunkSize {
sl := staleToDelete[i:]
if len(sl) > chunkSize {
Expand Down
2 changes: 1 addition & 1 deletion cmd/gosky/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ var postCmd = &cli.Command{
Repo: auth.Did,
Record: &lexutil.LexiconTypeDecoder{&appbsky.FeedPost{
Text: text,
CreatedAt: time.Now().Format(util.ISO8601),
CreatedAt: time.Now().UTC().Format(util.ISO8601),
}},
})
if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions repomgr/repomgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/ipld/go-car"
cbg "github.com/whyrusleeping/cbor-gen"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"gorm.io/gorm"
)

Expand Down Expand Up @@ -490,6 +491,8 @@ func (rm *RepoManager) HandleExternalUserEvent(ctx context.Context, pdsid uint,
ctx, span := otel.Tracer("repoman").Start(ctx, "HandleExternalUserEvent")
defer span.End()

span.SetAttributes(attribute.Int64("uid", int64(uid)))

log.Infow("HandleExternalUserEvent", "pds", pdsid, "uid", uid, "since", since, "nrev", nrev)

unlock := rm.lockUser(ctx, uid)
Expand Down

0 comments on commit b1591bc

Please sign in to comment.