Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bolson/log experiment #858

Closed
wants to merge 10 commits into from
2 changes: 1 addition & 1 deletion automod/consumer/firehose.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (fc *FirehoseConsumer) Run(ctx context.Context) error {
fc.Logger.Info("hepa scheduler configured", "scheduler", "autoscaling", "initial", scaleSettings.Concurrency, "max", scaleSettings.MaxConcurrency)
}

return events.HandleRepoStream(ctx, con, scheduler)
return events.HandleRepoStream(ctx, con, scheduler, fc.Logger)
}

// NOTE: for now, this function basically never errors, just logs and returns nil. Should think through error processing better.
Expand Down
2 changes: 1 addition & 1 deletion bgs/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ func (bgs *BGS) handleAdminPostResyncPDS(e echo.Context) error {
ctx := context.Background()
err := bgs.ResyncPDS(ctx, pds)
if err != nil {
log.Errorw("failed to resync PDS", "err", err, "pds", pds.Host)
log.Error("failed to resync PDS", "err", err, "pds", pds.Host)
}
}()

Expand Down
116 changes: 60 additions & 56 deletions bgs/bgs.go

Large diffs are not rendered by default.

12 changes: 6 additions & 6 deletions bgs/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (c *Compactor) Start(bgs *BGS) {
}
if c.requeueInterval > 0 {
go func() {
log.Infow("starting compactor requeue routine",
log.Info("starting compactor requeue routine",
"interval", c.requeueInterval,
"limit", c.requeueLimit,
"shardCount", c.requeueShardCount,
Expand All @@ -226,7 +226,7 @@ func (c *Compactor) Start(bgs *BGS) {
ctx := context.Background()
ctx, span := otel.Tracer("compactor").Start(ctx, "RequeueRoutine")
if err := c.EnqueueAllRepos(ctx, bgs, c.requeueLimit, c.requeueShardCount, c.requeueFast); err != nil {
log.Errorw("failed to enqueue all repos", "err", err)
log.Error("failed to enqueue all repos", "err", err)
}
span.End()
}
Expand Down Expand Up @@ -262,7 +262,7 @@ func (c *Compactor) doWork(bgs *BGS, strategy NextStrategy) {
time.Sleep(time.Second * 5)
continue
}
log.Errorw("failed to compact repo",
log.Error("failed to compact repo",
"err", err,
"uid", state.latestUID,
"repo", state.latestDID,
Expand All @@ -273,7 +273,7 @@ func (c *Compactor) doWork(bgs *BGS, strategy NextStrategy) {
// Pause for a bit to avoid spamming failed compactions
time.Sleep(time.Millisecond * 100)
} else {
log.Infow("compacted repo",
log.Info("compacted repo",
"uid", state.latestUID,
"repo", state.latestDID,
"status", state.status,
Expand Down Expand Up @@ -352,7 +352,7 @@ func (c *Compactor) compactNext(ctx context.Context, bgs *BGS, strategy NextStra
func (c *Compactor) EnqueueRepo(ctx context.Context, user *User, fast bool) {
ctx, span := otel.Tracer("compactor").Start(ctx, "EnqueueRepo")
defer span.End()
log.Infow("enqueueing compaction for repo", "repo", user.Did, "uid", user.ID, "fast", fast)
log.Info("enqueueing compaction for repo", "repo", user.Did, "uid", user.ID, "fast", fast)
c.q.Append(user.ID, fast)
}

Expand Down Expand Up @@ -396,7 +396,7 @@ func (c *Compactor) EnqueueAllRepos(ctx context.Context, bgs *BGS, lim int, shar
c.q.Append(r.Usr, fast)
}

log.Infow("done enqueueing all repos", "repos_enqueued", len(repos))
log.Info("done enqueueing all repos", "repos_enqueued", len(repos))

return nil
}
49 changes: 26 additions & 23 deletions bgs/fedmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"log/slog"
"math/rand"
"strings"
"sync"
Expand All @@ -22,6 +23,8 @@ import (
"gorm.io/gorm"
)

var log = slog.Default().With("system", "bgs")

type IndexCallback func(context.Context, *models.PDS, *events.XRPCStreamEvent) error

// TODO: rename me
Expand Down Expand Up @@ -129,7 +132,7 @@ func NewSlurper(db *gorm.DB, cb IndexCallback, opts *SlurperOptions) (*Slurper,
var errs []error
if errs = s.flushCursors(ctx); len(errs) > 0 {
for _, err := range errs {
log.Errorf("failed to flush cursors on shutdown: %s", err)
log.Error("failed to flush cursors on shutdown", "err", err)
}
}
log.Info("done flushing PDS cursors on shutdown")
Expand All @@ -142,7 +145,7 @@ func NewSlurper(db *gorm.DB, cb IndexCallback, opts *SlurperOptions) (*Slurper,
defer span.End()
if errs := s.flushCursors(ctx); len(errs) > 0 {
for _, err := range errs {
log.Errorf("failed to flush cursors: %s", err)
log.Error("failed to flush cursors", "err", err)
}
}
log.Debug("done flushing PDS cursors")
Expand Down Expand Up @@ -210,7 +213,7 @@ func (s *Slurper) Shutdown() []error {
errs := <-s.shutdownResult
if len(errs) > 0 {
for _, err := range errs {
log.Errorf("shutdown error: %s", err)
log.Error("shutdown error", "err", err)
}
}
log.Info("slurper shutdown complete")
Expand Down Expand Up @@ -490,14 +493,14 @@ func (s *Slurper) subscribeWithRedialer(ctx context.Context, host *models.PDS, s
url := fmt.Sprintf("%s://%s/xrpc/com.atproto.sync.subscribeRepos?cursor=%d", protocol, host.Host, cursor)
con, res, err := d.DialContext(ctx, url, nil)
if err != nil {
log.Warnw("dialing failed", "pdsHost", host.Host, "err", err, "backoff", backoff)
log.Warn("dialing failed", "pdsHost", host.Host, "err", err, "backoff", backoff)
time.Sleep(sleepForBackoff(backoff))
backoff++

if backoff > 15 {
log.Warnw("pds does not appear to be online, disabling for now", "pdsHost", host.Host)
log.Warn("pds does not appear to be online, disabling for now", "pdsHost", host.Host)
if err := s.db.Model(&models.PDS{}).Where("id = ?", host.ID).Update("registered", false).Error; err != nil {
log.Errorf("failed to unregister failing pds: %w", err)
log.Error("failed to unregister failing pds", "err", err)
}

return
Expand All @@ -506,15 +509,15 @@ func (s *Slurper) subscribeWithRedialer(ctx context.Context, host *models.PDS, s
continue
}

log.Info("event subscription response code: ", res.StatusCode)
log.Info("event subscription response", "code", res.StatusCode)

curCursor := cursor
if err := s.handleConnection(ctx, host, con, &cursor, sub); err != nil {
if errors.Is(err, ErrTimeoutShutdown) {
log.Infof("shutting down pds subscription to %s, no activity after %s", host.Host, EventsTimeout)
log.Info("shutting down pds subscription after timeout", "host", host.Host, "time", EventsTimeout)
return
}
log.Warnf("connection to %q failed: %s", host.Host, err)
log.Warn("connection to failed", "host", host.Host, "err", err)
}

if cursor > curCursor {
Expand Down Expand Up @@ -545,11 +548,11 @@ func (s *Slurper) handleConnection(ctx context.Context, host *models.PDS, con *w

rsc := &events.RepoStreamCallbacks{
RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error {
log.Debugw("got remote repo event", "pdsHost", host.Host, "repo", evt.Repo, "seq", evt.Seq)
log.Debug("got remote repo event", "pdsHost", host.Host, "repo", evt.Repo, "seq", evt.Seq)
if err := s.cb(context.TODO(), host, &events.XRPCStreamEvent{
RepoCommit: evt,
}); err != nil {
log.Errorf("failed handling event from %q (%d): %s", host.Host, evt.Seq, err)
log.Error("failed handling event", "host", host.Host, "seq", evt.Seq, "err", err)
}
*lastCursor = evt.Seq

Expand All @@ -560,11 +563,11 @@ func (s *Slurper) handleConnection(ctx context.Context, host *models.PDS, con *w
return nil
},
RepoHandle: func(evt *comatproto.SyncSubscribeRepos_Handle) error {
log.Infow("got remote handle update event", "pdsHost", host.Host, "did", evt.Did, "handle", evt.Handle)
log.Info("got remote handle update event", "pdsHost", host.Host, "did", evt.Did, "handle", evt.Handle)
if err := s.cb(context.TODO(), host, &events.XRPCStreamEvent{
RepoHandle: evt,
}); err != nil {
log.Errorf("failed handling event from %q (%d): %s", host.Host, evt.Seq, err)
log.Error("failed handling event", "host", host.Host, "seq", evt.Seq, "err", err)
}
*lastCursor = evt.Seq

Expand All @@ -575,11 +578,11 @@ func (s *Slurper) handleConnection(ctx context.Context, host *models.PDS, con *w
return nil
},
RepoMigrate: func(evt *comatproto.SyncSubscribeRepos_Migrate) error {
log.Infow("got remote repo migrate event", "pdsHost", host.Host, "did", evt.Did, "migrateTo", evt.MigrateTo)
log.Info("got remote repo migrate event", "pdsHost", host.Host, "did", evt.Did, "migrateTo", evt.MigrateTo)
if err := s.cb(context.TODO(), host, &events.XRPCStreamEvent{
RepoMigrate: evt,
}); err != nil {
log.Errorf("failed handling event from %q (%d): %s", host.Host, evt.Seq, err)
log.Error("failed handling event", "host", host.Host, "seq", evt.Seq, "err", err)
}
*lastCursor = evt.Seq

Expand All @@ -590,11 +593,11 @@ func (s *Slurper) handleConnection(ctx context.Context, host *models.PDS, con *w
return nil
},
RepoTombstone: func(evt *comatproto.SyncSubscribeRepos_Tombstone) error {
log.Infow("got remote repo tombstone event", "pdsHost", host.Host, "did", evt.Did)
log.Info("got remote repo tombstone event", "pdsHost", host.Host, "did", evt.Did)
if err := s.cb(context.TODO(), host, &events.XRPCStreamEvent{
RepoTombstone: evt,
}); err != nil {
log.Errorf("failed handling event from %q (%d): %s", host.Host, evt.Seq, err)
log.Error("failed handling event", "host", host.Host, "seq", evt.Seq, "err", err)
}
*lastCursor = evt.Seq

Expand All @@ -605,15 +608,15 @@ func (s *Slurper) handleConnection(ctx context.Context, host *models.PDS, con *w
return nil
},
RepoInfo: func(info *comatproto.SyncSubscribeRepos_Info) error {
log.Infow("info event", "name", info.Name, "message", info.Message, "pdsHost", host.Host)
log.Info("info event", "name", info.Name, "message", info.Message, "pdsHost", host.Host)
return nil
},
RepoIdentity: func(ident *comatproto.SyncSubscribeRepos_Identity) error {
log.Infow("identity event", "did", ident.Did)
log.Info("identity event", "did", ident.Did)
if err := s.cb(context.TODO(), host, &events.XRPCStreamEvent{
RepoIdentity: ident,
}); err != nil {
log.Errorf("failed handling event from %q (%d): %s", host.Host, ident.Seq, err)
log.Error("failed handling event", "host", host.Host, "seq", ident.Seq, "err", err)
}
*lastCursor = ident.Seq

Expand All @@ -624,11 +627,11 @@ func (s *Slurper) handleConnection(ctx context.Context, host *models.PDS, con *w
return nil
},
RepoAccount: func(acct *comatproto.SyncSubscribeRepos_Account) error {
log.Infow("account event", "did", acct.Did, "status", acct.Status)
log.Info("account event", "did", acct.Did, "status", acct.Status)
if err := s.cb(context.TODO(), host, &events.XRPCStreamEvent{
RepoAccount: acct,
}); err != nil {
log.Errorf("failed handling event from %q (%d): %s", host.Host, acct.Seq, err)
log.Error("failed handling event", "host", host.Host, "seq", acct.Seq, "err", err)
}
*lastCursor = acct.Seq

Expand Down Expand Up @@ -671,7 +674,7 @@ func (s *Slurper) handleConnection(ctx context.Context, host *models.PDS, con *w
con.RemoteAddr().String(),
instrumentedRSC.EventHandler,
)
return events.HandleRepoStream(ctx, con, pool)
return events.HandleRepoStream(ctx, con, pool, nil)
}

func (s *Slurper) updateCursor(sub *activeSub, curs int64) error {
Expand Down
26 changes: 13 additions & 13 deletions bgs/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (s *BGS) handleComAtprotoSyncGetRecord(ctx context.Context, collection stri
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, echo.NewHTTPError(http.StatusNotFound, "user not found")
}
log.Errorw("failed to lookup user", "err", err, "did", did)
log.Error("failed to lookup user", "err", err, "did", did)
return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to lookup user")
}

Expand Down Expand Up @@ -61,7 +61,7 @@ func (s *BGS) handleComAtprotoSyncGetRecord(ctx context.Context, collection stri
if errors.Is(err, mst.ErrNotFound) {
return nil, echo.NewHTTPError(http.StatusNotFound, "record not found in repo")
}
log.Errorw("failed to get record from repo", "err", err, "did", did, "collection", collection, "rkey", rkey)
log.Error("failed to get record from repo", "err", err, "did", did, "collection", collection, "rkey", rkey)
return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to get record from repo")
}

Expand Down Expand Up @@ -89,7 +89,7 @@ func (s *BGS) handleComAtprotoSyncGetRepo(ctx context.Context, did string, since
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, echo.NewHTTPError(http.StatusNotFound, "user not found")
}
log.Errorw("failed to lookup user", "err", err, "did", did)
log.Error("failed to lookup user", "err", err, "did", did)
return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to lookup user")
}

Expand Down Expand Up @@ -117,7 +117,7 @@ func (s *BGS) handleComAtprotoSyncGetRepo(ctx context.Context, did string, since
// TODO: stream the response
buf := new(bytes.Buffer)
if err := s.repoman.ReadRepo(ctx, u.ID, since, buf); err != nil {
log.Errorw("failed to read repo into buffer", "err", err, "did", did)
log.Error("failed to read repo into buffer", "err", err, "did", did)
return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to read repo into buffer")
}

Expand Down Expand Up @@ -170,7 +170,7 @@ func (s *BGS) handleComAtprotoSyncRequestCrawl(ctx context.Context, body *comatp
return echo.NewHTTPError(http.StatusUnauthorized, "domain is banned")
}

log.Warnf("TODO: better host validation for crawl requests")
log.Warn("TODO: better host validation for crawl requests")

clientHost := fmt.Sprintf("%s://%s", u.Scheme, host)

Expand All @@ -191,7 +191,7 @@ func (s *BGS) handleComAtprotoSyncRequestCrawl(ctx context.Context, body *comatp
if len(s.nextCrawlers) != 0 {
blob, err := json.Marshal(body)
if err != nil {
log.Warnw("could not forward requestCrawl, json err", "err", err)
log.Warn("could not forward requestCrawl, json err", "err", err)
} else {
go func(bodyBlob []byte) {
for _, rpu := range s.nextCrawlers {
Expand All @@ -201,11 +201,11 @@ func (s *BGS) handleComAtprotoSyncRequestCrawl(ctx context.Context, body *comatp
response.Body.Close()
}
if err != nil || response == nil {
log.Warnw("requestCrawl forward failed", "host", rpu, "err", err)
log.Warn("requestCrawl forward failed", "host", rpu, "err", err)
} else if response.StatusCode != http.StatusOK {
log.Warnw("requestCrawl forward failed", "host", rpu, "status", response.Status)
log.Warn("requestCrawl forward failed", "host", rpu, "status", response.Status)
} else {
log.Infow("requestCrawl forward successful", "host", rpu)
log.Info("requestCrawl forward successful", "host", rpu)
}
}
}(blob)
Expand All @@ -231,7 +231,7 @@ func (s *BGS) handleComAtprotoSyncListRepos(ctx context.Context, cursor int64, l
if err == gorm.ErrRecordNotFound {
return &comatprototypes.SyncListRepos_Output{}, nil
}
log.Errorw("failed to query users", "err", err)
log.Error("failed to query users", "err", err)
return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to query users")
}

Expand All @@ -252,7 +252,7 @@ func (s *BGS) handleComAtprotoSyncListRepos(ctx context.Context, cursor int64, l

root, err := s.repoman.GetRepoRoot(ctx, user.ID)
if err != nil {
log.Errorw("failed to get repo root", "err", err, "did", user.Did)
log.Error("failed to get repo root", "err", err, "did", user.Did)
return nil, echo.NewHTTPError(http.StatusInternalServerError, fmt.Sprintf("failed to get repo root for (%s): %v", user.Did, err.Error()))
}

Expand Down Expand Up @@ -303,13 +303,13 @@ func (s *BGS) handleComAtprotoSyncGetLatestCommit(ctx context.Context, did strin

root, err := s.repoman.GetRepoRoot(ctx, u.ID)
if err != nil {
log.Errorw("failed to get repo root", "err", err, "did", u.Did)
log.Error("failed to get repo root", "err", err, "did", u.Did)
return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to get repo root")
}

rev, err := s.repoman.GetRepoRev(ctx, u.ID)
if err != nil {
log.Errorw("failed to get repo rev", "err", err, "did", u.Did)
log.Error("failed to get repo rev", "err", err, "did", u.Did)
return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to get repo rev")
}

Expand Down
Loading
Loading