-
Notifications
You must be signed in to change notification settings - Fork 164
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
sources/config_reader.c: new default client_max_routing (#715)
Default per worker 16 value should be increased to achive more CPU utilization and client connections rate. After testing some client connection parallelism i learn that there should be ~ 60-80 client connections per worker to get 100% of cpu loading (with non-localhost connections) So this patch increases 16 to 64 benchmarks/client_max_routing might be used for future learning Signed-off-by: rkhapov <[email protected]> Co-authored-by: rkhapov <[email protected]>
- Loading branch information
Showing
5 changed files
with
358 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
module catchuploadtest | ||
|
||
go 1.23.3 | ||
|
||
require ( | ||
github.com/lib/pq v1.10.9 | ||
golang.org/x/term v0.26.0 | ||
) | ||
|
||
require golang.org/x/sys v0.27.0 // indirect |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= | ||
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= | ||
golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s= | ||
golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= | ||
golang.org/x/term v0.26.0 h1:WEQa6V3Gja/BhNxg540hBip/kkaYtRg3cxg4oXSw4AU= | ||
golang.org/x/term v0.26.0/go.mod h1:Si5m1o57C5nBNQo5z1iq+XDijt21BDBDp2bK0QI8e3E= |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,340 @@ | ||
package main | ||
|
||
import ( | ||
"context" | ||
"database/sql" | ||
"encoding/csv" | ||
"flag" | ||
"fmt" | ||
"os" | ||
"strconv" | ||
"strings" | ||
"sync" | ||
"sync/atomic" | ||
"time" | ||
|
||
_ "github.com/lib/pq" | ||
"golang.org/x/term" | ||
) | ||
|
||
type IntArrayFlags []int | ||
|
||
func newIntArrayFlags(defaults []int) *IntArrayFlags { | ||
i := IntArrayFlags(defaults) | ||
return &i | ||
} | ||
|
||
func (i *IntArrayFlags) String() string { | ||
return fmt.Sprintf("%v", *i) | ||
} | ||
|
||
func (i *IntArrayFlags) Set(value string) error { | ||
*i = nil | ||
|
||
for _, n := range strings.FieldsFunc(value, func(r rune) bool { return r == ',' || r == ' ' }) { | ||
v, err := strconv.Atoi(n) | ||
if err != nil { | ||
return err | ||
} | ||
*i = append(*i, v) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
type DurationArrayFlags []time.Duration | ||
|
||
func newDurationArrayFlags(defaults []time.Duration) *DurationArrayFlags { | ||
d := DurationArrayFlags(defaults) | ||
return &d | ||
} | ||
|
||
func (d *DurationArrayFlags) String() string { | ||
return fmt.Sprintf("%v", *d) | ||
} | ||
|
||
func (d *DurationArrayFlags) Set(value string) error { | ||
*d = nil | ||
|
||
for _, n := range strings.FieldsFunc(value, func(r rune) bool { return r == ',' || r == ' ' }) { | ||
v, err := time.ParseDuration(n) | ||
if err != nil { | ||
return err | ||
} | ||
*d = append(*d, v) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
type Config struct { | ||
ConnectionString string | ||
BenchmarkDuration time.Duration | ||
ClientParallels []int | ||
ConnectTimeouts []time.Duration | ||
SelectTimeouts []time.Duration | ||
PauseDuration time.Duration | ||
OutputFile string | ||
} | ||
|
||
type Result struct { | ||
Connections int64 | ||
Selects int64 | ||
Errors int64 | ||
Clients int | ||
TotalTime time.Duration | ||
ConnectTimeout time.Duration | ||
SelectTimeout time.Duration | ||
} | ||
|
||
var ( | ||
connectionsCounter int64 = 0 | ||
selectCounter int64 = 0 | ||
errorsCounter int64 = 0 | ||
) | ||
|
||
func doConnect(cfg *Config, connectTimeout time.Duration) { | ||
db, err := sql.Open("postgres", cfg.ConnectionString) | ||
if err != nil { | ||
fmt.Printf("connect error: %v\n", err) | ||
atomic.AddInt64(&errorsCounter, 1) | ||
return | ||
} | ||
|
||
defer db.Close() | ||
|
||
ctx, cancel := context.WithTimeout(context.Background(), connectTimeout) | ||
defer cancel() | ||
|
||
err = db.PingContext(ctx) | ||
if err != nil { | ||
fmt.Printf("ping error: %v\n", err) | ||
atomic.AddInt64(&errorsCounter, 1) | ||
return | ||
} | ||
|
||
atomic.AddInt64(&connectionsCounter, 1) | ||
} | ||
|
||
func doConnectInf(ctx context.Context, cfg *Config, connectTimeout time.Duration, wg *sync.WaitGroup, syncStart *sync.WaitGroup) { | ||
syncStart.Wait() | ||
|
||
for { | ||
select { | ||
case <-ctx.Done(): | ||
wg.Done() | ||
return | ||
default: | ||
doConnect(cfg, connectTimeout) | ||
} | ||
} | ||
} | ||
|
||
func doSelect(db *sql.DB, timeout time.Duration) { | ||
ctx, cancel := context.WithTimeout(context.Background(), timeout) | ||
defer cancel() | ||
|
||
var val int | ||
err := db.QueryRowContext(ctx, "select 1 + 2").Scan(&val) | ||
if err != nil { | ||
fmt.Printf("select error: %v\n", err) | ||
atomic.AddInt64(&errorsCounter, 1) | ||
return | ||
} | ||
|
||
if val != 3 { | ||
panic("wrong value") | ||
} | ||
|
||
atomic.AddInt64(&selectCounter, 1) | ||
} | ||
|
||
func doSelectInf(ctx context.Context, cfg *Config, selectTimeout time.Duration, wg *sync.WaitGroup) { | ||
db, err := sql.Open("postgres", cfg.ConnectionString) | ||
noerror(err) | ||
|
||
defer db.Close() | ||
|
||
for { | ||
select { | ||
case <-ctx.Done(): | ||
wg.Done() | ||
return | ||
default: | ||
doSelect(db, selectTimeout) | ||
} | ||
|
||
} | ||
} | ||
|
||
func doMeasure(cfg *Config, nparallel int, connectTimeout time.Duration, selectTimeout time.Duration) *Result { | ||
atomic.StoreInt64(&connectionsCounter, 0) | ||
atomic.StoreInt64(&selectCounter, 0) | ||
atomic.StoreInt64(&errorsCounter, 0) | ||
|
||
ctx, cancel := context.WithTimeout(context.Background(), cfg.BenchmarkDuration) | ||
defer cancel() | ||
|
||
wg := &sync.WaitGroup{} | ||
syncStart := &sync.WaitGroup{} | ||
|
||
syncStart.Add(1) | ||
|
||
for i := 0; i < nparallel; i++ { | ||
wg.Add(1) | ||
go doSelectInf(ctx, cfg, selectTimeout, wg) | ||
} | ||
|
||
for i := 0; i < nparallel; i++ { | ||
wg.Add(1) | ||
go doConnectInf(ctx, cfg, connectTimeout, wg, syncStart) | ||
} | ||
|
||
startTime := time.Now() | ||
syncStart.Done() | ||
wg.Wait() | ||
totalTime := time.Since(startTime) | ||
|
||
return &Result{ | ||
Connections: atomic.LoadInt64(&connectionsCounter), | ||
Selects: atomic.LoadInt64(&selectCounter), | ||
Errors: atomic.LoadInt64(&errorsCounter), | ||
TotalTime: totalTime, | ||
Clients: nparallel, | ||
ConnectTimeout: connectTimeout, | ||
SelectTimeout: selectTimeout, | ||
} | ||
} | ||
|
||
func printResultHeaderLine() { | ||
fmt.Printf("clients\ttime\tconn to\tc/s\ts/s\te/s\n") | ||
} | ||
|
||
func printResultLine(r *Result) { | ||
fmt.Printf("%d\t%v\t%v\t%v\t%f\t%f\t%f\n", | ||
r.Clients, | ||
r.TotalTime, | ||
r.ConnectTimeout, | ||
r.SelectTimeout, | ||
float64(r.Connections)/r.TotalTime.Seconds(), | ||
float64(r.Selects)/r.TotalTime.Seconds(), | ||
float64(r.Errors)/r.TotalTime.Seconds(), | ||
) | ||
} | ||
|
||
func printResultConsole(results []*Result) { | ||
printResultHeaderLine() | ||
|
||
for _, r := range results { | ||
printResultLine(r) | ||
} | ||
} | ||
|
||
func printResultCsv(filename string, results []*Result) { | ||
f, err := os.Create(filename) | ||
noerror(err) | ||
defer f.Close() | ||
|
||
w := csv.NewWriter(f) | ||
defer w.Flush() | ||
|
||
w.Write([]string{"#", "clients", "time", "conn to", "select to", "c/s", "s/s", "e/s"}) | ||
for i, r := range results { | ||
err := w.Write([]string{ | ||
strconv.Itoa(i + 1), | ||
strconv.Itoa(r.Clients), | ||
r.TotalTime.String(), | ||
r.ConnectTimeout.String(), | ||
r.SelectTimeout.String(), | ||
fmt.Sprintf("%f", float64(r.Connections)/r.TotalTime.Seconds()), | ||
fmt.Sprintf("%f", float64(r.Selects)/r.TotalTime.Seconds()), | ||
fmt.Sprintf("%f", float64(r.Errors)/r.TotalTime.Seconds()), | ||
}) | ||
noerror(err) | ||
} | ||
|
||
fmt.Printf("Results saved to %s\n", filename) | ||
} | ||
|
||
func printResults(cfg *Config, results []*Result) { | ||
printResultConsole(results) | ||
printResultCsv(cfg.OutputFile, results) | ||
} | ||
|
||
func runBenchmarks(cfg *Config) []*Result { | ||
results := make([]*Result, 0, len(cfg.ClientParallels)) | ||
|
||
for _, np := range cfg.ClientParallels { | ||
for _, ct := range cfg.ConnectTimeouts { | ||
for _, st := range cfg.SelectTimeouts { | ||
r := doMeasure(cfg, np, ct, st) | ||
printResultLine(r) | ||
results = append(results, r) | ||
|
||
time.Sleep(cfg.PauseDuration) | ||
} | ||
|
||
} | ||
} | ||
|
||
return results | ||
} | ||
|
||
func readConfig() *Config { | ||
host := flag.String("host", "", "odyssey host") | ||
port := flag.Int("port", 6432, "odyssey port") | ||
user := flag.String("user", "user1", "user to connect") | ||
dbName := flag.String("db", "db1", "db to connect") | ||
sslRoot := flag.String("sslroot", "./root.crt", "ssl root.crt file path") | ||
benchDuration := flag.Duration("bench-duration", time.Second*5, "one benchmark run duration") | ||
pauseDuration := flag.Duration("pause-duration", time.Second*3, "pause between runs") | ||
outputFile := flag.String("output-file", "./result.csv", "result filename") | ||
|
||
clients := newIntArrayFlags([]int{10, 20, 30, 40, 50, 60, 70, 80, 90, 100}) | ||
flag.Var(clients, "clients", "numbers of parallel connecting clients") | ||
|
||
connectTimeouts := newDurationArrayFlags([]time.Duration{time.Second}) | ||
flag.Var(connectTimeouts, "connect-timeouts", "connect timeouts") | ||
|
||
selectTimeouts := newDurationArrayFlags([]time.Duration{time.Second}) | ||
flag.Var(selectTimeouts, "select-timeouts", "select timeouts") | ||
|
||
flag.Parse() | ||
|
||
if len(*host) == 0 { | ||
fmt.Printf("Error: the host parameter is empty\n") | ||
flag.Usage() | ||
return nil | ||
} | ||
|
||
fmt.Printf("%s's password (no echo):", *user) | ||
password, err := term.ReadPassword(0) | ||
noerror(err) | ||
|
||
return &Config{ | ||
ConnectionString: fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=verify-full sslrootcert=%s", *host, *port, *user, string(password), *dbName, *sslRoot), | ||
BenchmarkDuration: *benchDuration, | ||
ClientParallels: *clients, | ||
PauseDuration: *pauseDuration, | ||
OutputFile: *outputFile, | ||
ConnectTimeouts: *connectTimeouts, | ||
SelectTimeouts: *selectTimeouts, | ||
} | ||
} | ||
|
||
func main() { | ||
cfg := readConfig() | ||
if cfg == nil { | ||
return | ||
} | ||
|
||
results := runBenchmarks(cfg) | ||
|
||
printResults(cfg, results) | ||
} | ||
|
||
func noerror(err error) { | ||
if err != nil { | ||
panic(err) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters