Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support benchmarking of multiple HTTP(S) endpoints #76

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 158 additions & 0 deletions scripts/multiple-endpoints.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
-- This script extends wrk2 to handle multiple server addresses
-- as well as multiple paths (endpoints) per server

-----------------
-- main() context

-- main() globals
local threads = {}
local counter = 1

function setup(thread)
-- Fill global threads table with thread handles so done()
-- can process per-thread data
table.insert(threads, thread)
thread:set("id",counter)
counter = counter +1
math.randomseed(os.time())
end

-----------------
-- Thread context

function xtract(str, match, default, err_msg)
local ret, count = string.gsub(str, match, "%1", 1)
if count == 0 then
if not default then
print(string.format("Error parsing URL '%s': %s",str,err_msg))
os.exit(1)
end
ret = default
end
return ret
end

function init(args)
-- Thread globals used by done()
called_idxs = ""
urls = ""
-- Thread globals used by request(), response()
addrs = {}
idx = 0

-- table of lists; per entry:
-- proto, host, hostaddr, port, path + params
endpoints={}
-- tablre of prepared HTTP requests for endpoints above
reqs={}

-- parse command line URLs and prepare requests
for i=0, #args, 1 do
-- note that URL parsing does not support user/pass as
-- wrk2 does not support auth
local proto = xtract(args[i],
"^(http[s]?)://.*", nil, "missing or unsupported protocol")
local host = xtract(
args[i], "^http[s]?://([^/:]+)[:/]?.*", nil, "missing host")
local port = xtract(args[i], "^http[s]?://[^/:]+:(%d+).*", 80)
local path = xtract(args[i], "^http[s]?://[^/]+(/.*)","/")

-- get IP addr(s) from hostname, validate by connecting
local addr = nil
for k, v in ipairs(wrk.lookup(host, port)) do
if wrk.connect(v) then
addr = v
break
end
end
if not addr then
print(string.format(
"Error: Unable to connect to %s port %s.", host, port))
os.exit(2)
end

-- store the endpoint
endpoints[i] = {}
endpoints[i][0] = proto
endpoints[i][1] = host
endpoints[i][2] = addr
endpoints[i][3] = port
endpoints[i][4] = path
endpoints[i][5] = string.format(
"GET %s HTTP/1.1\r\nHost:%s:%s\r\n\r\n", path, host, port)
if urls == "" then
urls = args[i]
else
urls = string.format("%s,%s",urls,args[i])
end
end

urls = urls .. ","
-- initialize idx, assign req and addr
idx = math.random(0, #endpoints)
wrk.thread.addr = endpoints[idx][2]
end

function request()
local ret = endpoints[idx][5]
return ret
end


function response(status, headers, body)
-- add current index to string of endpointsi calle
local c = ","
if called_idxs == "" then c="" end
called_idxs = string.format("%s%s%s",called_idxs,c,idx)

-- Pick a new random endpoint for the next request
-- Also, update the thread's remote server addr if endpoint
-- is on a different server.
local prev_srv = endpoints[idx][2]
idx = math.random(0, #endpoints)
if prev_srv ~= endpoints[idx][2] then
-- Re-setting the thread's server address forces a reconnect
wrk.thread.addr = endpoints[idx][2]
end
end

-----------------
-- main() context

function done(summary, latency, requests)
print(string.format("Total Requests: %d", summary.requests))
print(string.format("HTTP errors: %d", summary.errors.status))
print(string.format("Requests timed out: %d", summary.errors.timeout))
print(string.format("Bytes received: %d", summary.bytes))
print(string.format("Socket connect errors: %d", summary.errors.connect))
print(string.format("Socket read errors: %d", summary.errors.read))
print(string.format("Socket write errors: %d", summary.errors.write))

-- generate table of URL strings from first thread's endpoints table
-- (all threads generate the same table in init())
local urls = {}
local counts = {}
local i = 0
t = unpack(threads,1,2)
t:get("urls"):gsub("([^,]+),",
function(u)
urls[i]=u
counts[i] = 0
i = i+1
end)

-- fetch url call counts of individual threads
local c = t:get("called_idxs")
c = c .. ","
for i, t in ipairs(threads) do
c:gsub("([0-9]+),", function(s)
i = tonumber(s)
counts[i] = counts[i] + 1
end)
end

print("\nURL call count")
for i=0, #urls, 1 do
print(string.format("%s : %d", urls[i], counts[i]))
end
end
4 changes: 2 additions & 2 deletions src/ae_epoll.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {

ee.events = 0;
mask |= eventLoop->events[fd].mask; /* Merge old events */
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_READABLE) ee.events |= (EPOLLIN|EPOLLHUP|EPOLLRDHUP);
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.u64 = 0; /* avoid valgrind warning */
ee.data.fd = fd;
Expand Down Expand Up @@ -114,7 +114,7 @@ static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
int mask = 0;
struct epoll_event *e = state->events+j;

if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & (EPOLLIN | EPOLLRDHUP | EPOLLHUP)) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
Expand Down
3 changes: 2 additions & 1 deletion src/net.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ status sock_close(connection *c) {
status sock_read(connection *c, size_t *n) {
ssize_t r = read(c->fd, c->buf, sizeof(c->buf));
*n = (size_t) r;
return r >= 0 ? OK : ERROR;
if (r == 0) return READ_EOF;
return r > 0 ? OK : ERROR;
}

status sock_write(connection *c, char *buf, size_t len, size_t *n) {
Expand Down
3 changes: 2 additions & 1 deletion src/net.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
typedef enum {
OK,
ERROR,
RETRY
RETRY,
READ_EOF
} status;

struct sock {
Expand Down
1 change: 1 addition & 0 deletions src/script.c
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,7 @@ static int script_thread_newindex(lua_State *L) {
if (t->addr) zfree(t->addr->ai_addr);
t->addr = zrealloc(t->addr, sizeof(*addr));
script_addr_copy(addr, t->addr);
if (t->reconnect_all) t->reconnect_all(t);
} else {
luaL_error(L, "cannot set '%s' on thread", luaL_typename(L, -1));
}
Expand Down
19 changes: 18 additions & 1 deletion src/wrk.c
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ static void usage() {
" Time arguments may include a time unit (2s, 2m, 2h)\n");
}

static void thread_reconnect_all(void*);

int main(int argc, char **argv) {
char *url, **headers = zmalloc(argc * sizeof(char *));
struct http_parser_url parts = {};
Expand Down Expand Up @@ -130,6 +132,7 @@ int main(int argc, char **argv) {
t->connections = connections;
t->throughput = throughput;
t->stop_at = stop_at;
t->reconnect_all = NULL;

t->L = script_create(cfg.script, url, headers);
script_init(L, t, argc - optind, &argv[optind]);
Expand Down Expand Up @@ -268,6 +271,7 @@ void *thread_main(void *arg) {
connection *c = thread->cs;

for (uint64_t i = 0; i < thread->connections; i++, c++) {
c->fd = -1;
c->thread = thread;
c->ssl = cfg.ctx ? SSL_new(cfg.ctx) : NULL;
c->request = request;
Expand All @@ -279,6 +283,7 @@ void *thread_main(void *arg) {
// Stagger connects 5 msec apart within thread:
aeCreateTimeEvent(loop, i * 5, delayed_initial_connect, c, NULL);
}
thread->reconnect_all = &thread_reconnect_all;

uint64_t calibrate_delay = CALIBRATE_DELAY_MS + (thread->connections * 5);
uint64_t timeout_delay = TIMEOUT_INTERVAL_MS + (thread->connections * 5);
Expand All @@ -295,6 +300,14 @@ void *thread_main(void *arg) {
return NULL;
}

static void thread_reconnect_all(void *_t) {
thread * t = (thread*)_t;
connection *c = t->cs;
for (uint64_t i = 0; i < t->connections; i++, c++) {
if (c && 0 < c->fd) reconnect_socket(t, c);
}
}

static int connect_socket(thread *thread, connection *c) {
struct addrinfo *addr = thread->addr;
struct aeEventLoop *loop = thread->loop;
Expand Down Expand Up @@ -653,22 +666,26 @@ static void socket_writeable(aeEventLoop *loop, int fd, void *data, int mask) {
static void socket_readable(aeEventLoop *loop, int fd, void *data, int mask) {
connection *c = data;
size_t n;
int read_status = OK;

do {
switch (sock.read(c, &n)) {
switch (read_status = sock.read(c, &n)) {
case OK: break;
case ERROR: goto error;
case RETRY: return;
case READ_EOF: break;
}

if (http_parser_execute(&c->parser, &parser_settings, c->buf, n) != n) goto error;
c->thread->bytes += n;
} while (n == RECVBUF && sock.readable(c) > 0);

if (read_status == READ_EOF) goto reconnect;
return;

error:
c->thread->errors.read++;
reconnect:
reconnect_socket(c->thread, c);
}

Expand Down
1 change: 1 addition & 0 deletions src/wrk.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ typedef struct {
aeEventLoop *loop;
struct addrinfo *addr;
uint64_t connections;
void (*reconnect_all)(void*);
int interval;
uint64_t stop_at;
uint64_t complete;
Expand Down