From 5cf0703417fa3640a4ced35e931fe6462b1ebea0 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Fri, 10 Nov 2023 09:29:32 -0800 Subject: [PATCH 1/2] make testing slow consumers easy --- cmd/gosky/main.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/cmd/gosky/main.go b/cmd/gosky/main.go index d2e231241..3c2fefa88 100644 --- a/cmd/gosky/main.go +++ b/cmd/gosky/main.go @@ -143,6 +143,10 @@ var readRepoStreamCmd = &cli.Command{ &cli.BoolFlag{ Name: "resolve-handles", }, + &cli.DurationFlag{ + Name: "read-delay", + Usage: "make handling each event take at least this long (debug utility)", + }, }, ArgsUsage: `[ [cursor]]`, Action: func(cctx *cli.Context) error { @@ -204,8 +208,14 @@ var readRepoStreamCmd = &cli.Command{ return h, nil } + rr := cctx.Duration("read-delay") + rsc := &events.RepoStreamCallbacks{ RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error { + if rr != 0 { + time.Sleep(rr) + } + if jsonfmt { b, err := json.Marshal(evt) if err != nil { From ab48aed75834bbd8ecfd66ceeb019f108e8077b5 Mon Sep 17 00:00:00 2001 From: Jaz Volpert Date: Fri, 10 Nov 2023 21:24:29 +0000 Subject: [PATCH 2/2] Use a rate.limit --- cmd/gosky/main.go | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/cmd/gosky/main.go b/cmd/gosky/main.go index 3c2fefa88..fd572282b 100644 --- a/cmd/gosky/main.go +++ b/cmd/gosky/main.go @@ -25,6 +25,7 @@ import ( "github.com/bluesky-social/indigo/util" "github.com/bluesky-social/indigo/util/cliutil" "github.com/bluesky-social/indigo/xrpc" + "golang.org/x/time/rate" "github.com/gorilla/websocket" lru "github.com/hashicorp/golang-lru" @@ -143,9 +144,9 @@ var readRepoStreamCmd = &cli.Command{ &cli.BoolFlag{ Name: "resolve-handles", }, - &cli.DurationFlag{ - Name: "read-delay", - Usage: "make handling each event take at least this long (debug utility)", + &cli.Float64Flag{ + Name: "max-throughput", + Usage: "limit event consumption to a given # of req/sec (debug utility)", }, }, ArgsUsage: `[ [cursor]]`, @@ -207,13 +208,15 @@ var readRepoStreamCmd = &cli.Command{ return h, nil } - - rr := cctx.Duration("read-delay") + var limiter *rate.Limiter + if cctx.Float64("max-throughput") > 0 { + limiter = rate.NewLimiter(rate.Limit(cctx.Float64("max-throughput")), 1) + } rsc := &events.RepoStreamCallbacks{ RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error { - if rr != 0 { - time.Sleep(rr) + if limiter != nil { + limiter.Wait(ctx) } if jsonfmt {