add pacer based on wall clock instead of relative differences in duration (#12)

* add pacer based on wall clock instead of relative differences in duration

The reason for the change is that pacing errors accumulate if we pace each request
relative to the previous one, but they do not accumulate if we always calculate
relative to the start of the phase.

It is like measuring 1 meter by stacking 2-millimeter measurements 500 times:
the result will not be as accurate as measuring the full meter in one go.

The source of error is simply execution time.
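
For intuition, here is a minimal, self-contained sketch of the wall-clock approach
(illustrative only; it is not the code in pkg/pace.go further down, and the variable
names are made up for the example):

package main

import (
	"fmt"
	"time"
)

func main() {
	rate := 2.0 // replay at twice the recorded speed
	phaseStartLogTime := time.Date(2024, 7, 26, 12, 0, 0, 0, time.UTC)
	phaseStartWallTime := time.Now()

	// Every request is scheduled against the start of the phase, so execution
	// time spent between iterations never accumulates as pacing drift.
	for i := 1; i <= 3; i++ {
		logTime := phaseStartLogTime.Add(time.Duration(i) * time.Second)
		elapsedInLog := logTime.Sub(phaseStartLogTime)
		target := phaseStartWallTime.Add(time.Duration(float64(elapsedInLog) / rate))
		fmt.Printf("request %d: wait %v\n", i, time.Until(target))
	}
}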

* fix last request's wall time

* dummyweb with latency, worker tuning

* add idle/max connections

* print stats from pacer

* spelling

---------

Co-authored-by: George Malamidis <[email protected]>
szmglh and georgemalamidis-lh authored Jul 26, 2024
1 parent 2c8eb69 commit 92ce3fd
Showing 6 changed files with 101 additions and 19 deletions.
5 changes: 4 additions & 1 deletion etc/dummyweb.go
@@ -1,7 +1,9 @@
package main

import (
"fmt"
"log"
"math/rand"
"net/http"
"time"
)
@@ -10,14 +12,15 @@ var count int

func handler(w http.ResponseWriter, r *http.Request) {
count++
time.Sleep(time.Duration(rand.Intn(250)) * time.Millisecond)
w.Write([]byte("hi\n"))
}

func main() {
// Crude RPS reporting
go func() {
for {
println(count)
fmt.Printf("rps: %d\n", count)
count = 0
time.Sleep(time.Second)
}
32 changes: 30 additions & 2 deletions main.go
@@ -35,10 +35,38 @@ func main() {
silent := flag.Bool("silent", false, "Suppress output")
dryRun := flag.Bool("dry-run", false, "Consume input but do not send HTTP requests to targets")
timeout := flag.Int("timeout", 10, "HTTP client timeout in seconds")
connections := flag.Int("connections", 10000, "Max open idle connections per target host")
maxConnections := flag.Int("max-connections", 0, "Max connections per target host (default unlimited)")
strict := flag.Bool("strict", false, "Panic on bad input")
memprofile := flag.String("memprofile", "", "Write memory profile to `file` before exit")
cpuprofile := flag.String("cpuprofile", "", "Write cpu profile to `file` before exit")
numWorkers := flag.Int("workers", 1000, "Number of client workers to use")
numWorkers := flag.Int("workers", runtime.NumCPU()*2, "Number of client workers to use")
printStatsInterval := flag.Duration("print-stats", 0, `Statistics report interval, e.g., "1m"
Each report line is printed to stderr with the following fields in logfmt format:
report_time
The calculated wall time for when this line should be printed in RFC3339 format.
skew_seconds
Difference between "report_time" and current time in seconds. When the absolute
value of this is higher than about 100ms, it shows that ripley cannot generate
enough load. Consider increasing workers, max connections, and/or CPU and IO requests.
last_request_time
Original request time of the last request in RFC3339 format.
rate
Current rate of playback as specified in "pace" flag.
expected_rps
Expected requests per second since the last report. This will differ from the
actual requests per second if the system is unable to drive that many requests.
If that is the case, consider increasing workers, max connections, and/or
CPU and IO requests.
When 0 (default) or negative, reporting is switched off.
`)

flag.Parse()

@@ -58,7 +86,7 @@ func main() {
defer pprof.StopCPUProfile()
}

exitCode = ripley.Replay(*paceStr, *silent, *dryRun, *timeout, *strict, *numWorkers)
exitCode = ripley.Replay(*paceStr, *silent, *dryRun, *timeout, *strict, *numWorkers, *connections, *maxConnections, *printStatsInterval)

if *memprofile != "" {
f, err := os.Create(*memprofile)
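
For illustration, a hypothetical invocation exercising the new flags (the binary
name, pace value and input file are assumptions, not taken from this commit):

    ripley -pace "1m@1 5m@2" -workers 200 -connections 500 -max-connections 1000 -print-stats 1m < requests.jsonl

With -print-stats 1m, one report line per minute is written to stderr in the logfmt
format produced by pkg/pace.go below; the values here are made up:

    report_time=2024-07-26T12:01:00Z skew_seconds=0.003215 last_request_time=2024-07-26T10:31:00Z rate=2.000000 expected_rps=150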
8 changes: 6 additions & 2 deletions pkg/client.go
@@ -31,15 +31,19 @@ type Result struct {
ErrorMsg string `json:"error"`
}

func startClientWorkers(numWorkers int, requests <-chan *request, results chan<- *Result, dryRun bool, timeout int) {
func startClientWorkers(numWorkers int, requests <-chan *request, results chan<- *Result, dryRun bool, timeout, connections, maxConnections int) {
client := &http.Client{
Timeout: time.Duration(timeout) * time.Second,
CheckRedirect: func(req *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse
},
Transport: &http.Transport{
MaxIdleConnsPerHost: connections,
MaxConnsPerHost: maxConnections,
},
}

for i := 0; i <= numWorkers; i++ {
for i := 0; i < numWorkers; i++ {
go doHttpRequest(client, requests, results, dryRun)
}
}
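
For context on the two http.Transport fields used above: MaxIdleConnsPerHost sets
how many keep-alive connections are retained per target host (the net/http default
is only 2, which forces frequent re-dialing under sustained load), while
MaxConnsPerHost caps the total connections per host, where 0 means no limit, which
matches the "(default unlimited)" wording of the -max-connections flag.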
53 changes: 47 additions & 6 deletions pkg/pace.go
@@ -19,15 +19,23 @@ along with this program. If not, see <https://www.gnu.org/licenses/>.
package ripley

import (
"fmt"
"os"
"strconv"
"strings"
"time"
)

type pacer struct {
phases []*phase
lastRequestTime time.Time
done bool
ReportInterval time.Duration
phases []*phase
lastRequestTime time.Time // last request that we already replayed in "log time"
lastRequestWallTime time.Time // last request that we already replayed in "wall time"
phaseStartRequestTime time.Time
phaseStartWallTime time.Time
done bool
requestCounter int
nextReport time.Time
}

type phase struct {
@@ -48,11 +56,16 @@ func newPacer(phasesStr string) (*pacer, error) {
func (p *pacer) start() {
// Run a timer for the first phase's duration
time.AfterFunc(p.phases[0].duration, p.onPhaseElapsed)
if p.ReportInterval > 0 {
p.nextReport = time.Now().Add(p.ReportInterval)
}
}

func (p *pacer) onPhaseElapsed() {
// Pop phase
p.phases = p.phases[1:]
p.phaseStartRequestTime = p.lastRequestTime
p.phaseStartWallTime = p.lastRequestWallTime

if len(p.phases) == 0 {
p.done = true
@@ -63,14 +76,42 @@ }
}

func (p *pacer) waitDuration(t time.Time) time.Duration {
// If there are no more phases left, continue with the last phase's rate
now := time.Now()

if p.lastRequestTime.IsZero() {
p.lastRequestTime = t
p.lastRequestWallTime = now
p.phaseStartRequestTime = p.lastRequestTime
p.phaseStartWallTime = p.lastRequestWallTime
}

duration := t.Sub(p.lastRequestTime)
originalDurationFromPhaseStart := t.Sub(p.phaseStartRequestTime)
expectedDurationFromPhaseStart := time.Duration(float64(originalDurationFromPhaseStart) / p.phases[0].rate)
expectedWallTime := p.phaseStartWallTime.Add(expectedDurationFromPhaseStart)

p.reportStats(now, expectedWallTime)

duration := expectedWallTime.Sub(now)
p.lastRequestTime = t
return time.Duration(float64(duration) / p.phases[0].rate)
p.lastRequestWallTime = expectedWallTime
return duration
}

func (p *pacer) reportStats(now, expectedWallTime time.Time) {
if p.ReportInterval <= 0 {
return
}
for p.nextReport.Before(expectedWallTime) {
fmt.Fprintf(os.Stderr, "report_time=%s skew_seconds=%f last_request_time=%s rate=%f expected_rps=%d\n",
p.nextReport.Format(time.RFC3339),
now.Sub(p.nextReport).Seconds(),
p.lastRequestTime.Format(time.RFC3339),
p.phases[0].rate, // note that this is correct... the phase change itself is incorrect, but its error is minimal with enough requests, and it is simpler
p.requestCounter/int(p.ReportInterval.Seconds()))
p.nextReport = p.nextReport.Add(p.ReportInterval)
p.requestCounter = 0
}
p.requestCounter++
}

// Format is [duration]@[rate] [duration]@[rate]..."
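
As a worked example of the arithmetic in waitDuration (numbers are hypothetical):
with a phase rate of 2.0 and a request logged 10s after the phase's first request,
expectedDurationFromPhaseStart = 10s / 2.0 = 5s, so expectedWallTime is the phase's
wall-clock start plus 5s. If only 4.9s of wall time have elapsed, the returned wait
is 100ms; if 5.2s have elapsed, it is -200ms, i.e. replay is behind and the request
fires immediately. Because lastRequestWallTime is set to expectedWallTime rather
than to the current time, any such lag is not carried into the next phase's
starting point.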
17 changes: 11 additions & 6 deletions pkg/pace_test.go
@@ -19,6 +19,7 @@ along with this program. If not, see <https://www.gnu.org/licenses/>.
package ripley

import (
"math"
"testing"
"time"
)
@@ -83,15 +84,15 @@ func TestWaitDuration(t *testing.T) {
now := time.Now()
duration := pacer.waitDuration(now)

if duration != 0 {
t.Errorf("duration = %v; want 0", duration)
if duration > 0 {
t.Errorf("duration = %v; want 0 or negative", duration)
}

now = now.Add(2 * time.Second)
duration = pacer.waitDuration(now)
expected := 2 * time.Second

if duration != expected {
if !equalsWithinThreshold(duration, expected, 10*time.Microsecond) {
t.Errorf("duration = %v; want %v", duration, expected)
}
}
@@ -106,15 +107,15 @@ func TestWaitDuration10X(t *testing.T) {
now := time.Now()
duration := pacer.waitDuration(now)

if duration != 0 {
t.Errorf("duration = %v; want 0", duration)
if duration > 0 {
t.Errorf("duration = %v; want 0 or negative", duration)
}

now = now.Add(1 * time.Second)
duration = pacer.waitDuration(now)
expected := time.Second / 10

if duration != expected {
if !equalsWithinThreshold(duration, expected, 10*time.Microsecond) {
t.Errorf("duration = %v; want %v", duration, expected)
}
}
@@ -136,3 +137,7 @@ func TestPacerDoneOnLastPhaseElapsed(t *testing.T) {
t.Errorf("pacer.done = %v; want true", pacer.done)
}
}

func equalsWithinThreshold(d1, d2, threshold time.Duration) bool {
return math.Abs(float64(d1-d2)) <= float64(threshold)
}
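
The equalsWithinThreshold helper is needed because waitDuration now reads the wall
clock internally: a little real time elapses between successive calls in the tests,
so the returned durations differ from the ideal values by a few microseconds and
exact equality can no longer be asserted.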
5 changes: 3 additions & 2 deletions pkg/replay.go
@@ -27,7 +27,7 @@ import (
"time"
)

func Replay(phasesStr string, silent, dryRun bool, timeout int, strict bool, numWorkers int) int {
func Replay(phasesStr string, silent, dryRun bool, timeout int, strict bool, numWorkers, connections, maxConnections int, printStatsInterval time.Duration) int {
// Default exit code
var exitCode int = 0
// Ensures we have handled all HTTP request results before exiting
@@ -43,6 +43,7 @@ func Replay(phasesStr string, silent, dryRun bool, timeout int, strict bool, num

// The pacer controls the rate of replay
pacer, err := newPacer(phasesStr)
pacer.ReportInterval = printStatsInterval

if err != nil {
panic(err)
@@ -52,7 +53,7 @@
scanner := bufio.NewScanner(bufio.NewReaderSize(os.Stdin, 32*1024*1024))

// Start HTTP client goroutine pool
startClientWorkers(numWorkers, requests, results, dryRun, timeout)
startClientWorkers(numWorkers, requests, results, dryRun, timeout, connections, maxConnections)
pacer.start()

// Goroutine to handle the HTTP client result
