Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rebase heartbeat branch #428

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions src/hashkit/nc_ketama.c
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,8 @@ ketama_update(struct server_pool *pool)
struct server *server = array_get(&pool->server, server_index);

if (pool->auto_eject_hosts) {
if (server->next_retry <= now) {
server->next_retry = 0LL;
if (server->fail == FAIL_STATUS_NORMAL) {
nlive_server++;
} else if (pool->next_rebuild == 0LL ||
server->next_retry < pool->next_rebuild) {
pool->next_rebuild = server->next_retry;
}
} else {
nlive_server++;
Expand All @@ -104,7 +100,7 @@ ketama_update(struct server_pool *pool)
ASSERT(server->weight > 0);

/* count weight only for live servers */
if (!pool->auto_eject_hosts || server->next_retry <= now) {
if (!pool->auto_eject_hosts || server->fail == 0) {
total_weight += server->weight;
}
}
Expand Down
8 changes: 2 additions & 6 deletions src/hashkit/nc_modula.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,8 @@ modula_update(struct server_pool *pool)
struct server *server = array_get(&pool->server, server_index);

if (pool->auto_eject_hosts) {
if (server->next_retry <= now) {
server->next_retry = 0LL;
if (server->fail == FAIL_STATUS_NORMAL) {
nlive_server++;
} else if (pool->next_rebuild == 0LL ||
server->next_retry < pool->next_rebuild) {
pool->next_rebuild = server->next_retry;
}
} else {
nlive_server++;
Expand All @@ -68,7 +64,7 @@ modula_update(struct server_pool *pool)
ASSERT(server->weight > 0);

/* count weight only for live servers */
if (!pool->auto_eject_hosts || server->next_retry <= now) {
if (!pool->auto_eject_hosts || server->fail == FAIL_STATUS_NORMAL) {
total_weight += server->weight;
}
}
Expand Down
6 changes: 1 addition & 5 deletions src/hashkit/nc_random.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,8 @@ random_update(struct server_pool *pool)
struct server *server = array_get(&pool->server, server_index);

if (pool->auto_eject_hosts) {
if (server->next_retry <= now) {
server->next_retry = 0LL;
if (server->fail == FAIL_STATUS_NORMAL) {
nlive_server++;
} else if (pool->next_rebuild == 0LL ||
server->next_retry < pool->next_rebuild) {
pool->next_rebuild = server->next_retry;
}
} else {
nlive_server++;
Expand Down
5 changes: 5 additions & 0 deletions src/nc_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -187,3 +187,8 @@ client_close(struct context *ctx, struct conn *conn)

conn_put(conn);
}

void
client_restore(struct context *ctx, struct conn *conn)
{
}
1 change: 1 addition & 0 deletions src/nc_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@ bool client_active(struct conn *conn);
void client_ref(struct conn *conn, void *owner);
void client_unref(struct conn *conn);
void client_close(struct context *ctx, struct conn *conn);
void client_restore(struct context *ctx, struct conn *conn);

#endif
9 changes: 9 additions & 0 deletions src/nc_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ conf_server_each_transform(void *elem, void *data)

s->next_retry = 0LL;
s->failure_count = 0;
s->fail = FAIL_STATUS_NORMAL;

log_debug(LOG_VERB, "transform to server %"PRIu32" '%.*s'",
s->idx, s->pname.len, s->pname.data);
Expand Down Expand Up @@ -1136,6 +1137,14 @@ conf_pre_validate(struct conf *cf)
return NC_OK;
}

static int
conf_server_pname_cmp(const void *t1, const void *t2)
{
const struct conf_server *s1 = t1, *s2 = t2;

return string_compare(&s1->pname, &s2->pname);
}

static int
conf_server_name_cmp(const void *t1, const void *t2)
{
Expand Down
3 changes: 3 additions & 0 deletions src/nc_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ conn_get(void *owner, bool client, bool redis)

conn->close = client_close;
conn->active = client_active;
conn->restore = client_restore;

conn->ref = client_ref;
conn->unref = client_unref;
Expand Down Expand Up @@ -221,6 +222,7 @@ conn_get(void *owner, bool client, bool redis)

conn->close = server_close;
conn->active = server_active;
conn->restore = server_restore;

conn->ref = server_ref;
conn->unref = server_unref;
Expand Down Expand Up @@ -269,6 +271,7 @@ conn_get_proxy(void *owner)

conn->close = proxy_close;
conn->active = NULL;
conn->restore = proxy_restore;

conn->ref = proxy_ref;
conn->unref = proxy_unref;
Expand Down
3 changes: 3 additions & 0 deletions src/nc_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ typedef bool (*conn_active_t)(struct conn *);

typedef void (*conn_ref_t)(struct conn *, void *);
typedef void (*conn_unref_t)(struct conn *);
typedef void (*conn_restore_t)(struct context *, struct conn *);

typedef void (*conn_msgq_t)(struct context *, struct conn *, struct msg *);
typedef void (*conn_post_connect_t)(struct context *ctx, struct conn *, struct server *server);
Expand Down Expand Up @@ -60,6 +61,7 @@ struct conn {
conn_send_done_t send_done; /* write done handler */
conn_close_t close; /* close handler */
conn_active_t active; /* active? handler */
conn_restore_t restore; /* restore handler */
conn_post_connect_t post_connect; /* post connect handler */
conn_swallow_msg_t swallow_msg; /* react on messages to be swallowed */

Expand Down Expand Up @@ -105,5 +107,6 @@ uint32_t conn_ncurr_conn(void);
uint64_t conn_ntotal_conn(void);
uint32_t conn_ncurr_cconn(void);
bool conn_authenticated(struct conn *conn);
rstatus_t event_add_out_with_conn(struct context *ctx, struct conn *conn, struct msg *msg);

#endif
73 changes: 71 additions & 2 deletions src/nc_core.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,30 @@

static uint32_t ctx_id; /* context generation */

static void
core_failed_servers_init(struct context *ctx)
{
int i;

for (i = 0; i < 2; i++) {
array_init(&(ctx->failed_servers[i]), 10, sizeof(struct server *));
}
}

static void
core_failed_servers_deinit(struct context *ctx)
{
uint32_t i, n, nsize;

for (i = 0; i < 2; i++) {
nsize = array_n(&(ctx->failed_servers[i]));
for (n = 0; n < nsize; n++) {
array_pop(&(ctx->failed_servers[n]));
}
array_deinit(&(ctx->failed_servers[n]));
}
}

static rstatus_t
core_calc_connections(struct context *ctx)
{
Expand Down Expand Up @@ -60,6 +84,11 @@ core_ctx_create(struct instance *nci)
ctx->stats = NULL;
ctx->evb = NULL;
array_null(&ctx->pool);
array_null(&(ctx->failed_servers[0]));
array_null(&(ctx->failed_servers[1]));
ctx->failed_idx = 0;
ctx->fails = &(ctx->failed_servers[0]);

ctx->max_timeout = nci->stats_interval;
ctx->timeout = ctx->max_timeout;
ctx->max_nfd = 0;
Expand Down Expand Up @@ -93,6 +122,8 @@ core_ctx_create(struct instance *nci)
return NULL;
}

core_failed_servers_init(ctx);

/* create stats per server pool */
ctx->stats = stats_create(nci->stats_port, nci->stats_addr, nci->stats_interval,
nci->hostname, &ctx->pool);
Expand Down Expand Up @@ -261,6 +292,41 @@ core_error(struct context *ctx, struct conn *conn)
core_close(ctx, conn);
}

static void
retry_connection(struct context *ctx)
{
struct array *servers;
int idx;
struct server *server;
int64_t now;
uint32_t i, nsize;
rstatus_t status;

servers = ctx->fails;
idx = (ctx->failed_idx == 0) ? 1 : 0;

ctx->failed_idx = idx;
ctx->fails = &(ctx->failed_servers[idx]);

now = nc_usec_now();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can move this below the if (nsize == 0) to avoid the call to gettimeofday when no servers fail

nsize = array_n(servers);
if (nsize == 0) {
return;
}

for (i = 0; i < nsize; i++) {
server = *(struct server **)array_pop(servers);
if (server->next_retry == 0 || server->next_retry < now) {
status = server_reconnect(ctx, server);
if (status != NC_OK) {
add_failed_server(ctx, server);
}
} else {
add_failed_server(ctx, server);
}
}
}

static void
core_timeout(struct context *ctx)
{
Expand All @@ -272,14 +338,14 @@ core_timeout(struct context *ctx)
msg = msg_tmo_min();
if (msg == NULL) {
ctx->timeout = ctx->max_timeout;
return;
break;
}

/* skip over req that are in-error or done */

if (msg->error || msg->done) {
msg_tmo_delete(msg);
continue;
break;
Copy link
Collaborator

@TysonAndre TysonAndre Apr 28, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the intent of changing this from a continue to a break - It seems to work anyway, but if there are a large number of timeouts should this try to process those first for efficiency and close connections by continuing?

Should the return below also be a break;?

(it seems like in the typical case, msg is null or reprocessed because it's removed before it times out, so this doesn't matter too much)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This also seems possibly concerning - what would happen if these still belonged to a connection (e.g. multiple heartbeats), and we retried a connection before processing some requests that had also errored on that connection.

I guess that break; may make it less likely to call retry_connection on the same server twice, though that should be a different check.

I see that retry_connection will call add_failed_server. Should add_failed_server iterate over the array to check if something already marked the server as failed (e.g. if multiple heartbeats were sent and got multiple errors, e.g. a server that was connected to but immediately resulted in a protocol error)

}

/*
Expand All @@ -304,6 +370,8 @@ core_timeout(struct context *ctx)

core_close(ctx, conn);
}

retry_connection(ctx);
}

rstatus_t
Expand All @@ -324,6 +392,7 @@ core_core(void *arg, uint32_t events)
conn->client ? 'c' : (conn->proxy ? 'p' : 's'), conn->sd);

conn->events = events;
conn->restore(ctx, conn);

/* error takes precedence over read | write */
if (events & EVENT_ERR) {
Expand Down
4 changes: 4 additions & 0 deletions src/nc_core.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ struct context {
struct stats *stats; /* stats */

struct array pool; /* server_pool[] */
struct array failed_servers[2]; /* failed servers */
struct array *fails; /* ref of current fails server */

int failed_idx; /* current idx for failed servers */
struct event_base *evb; /* event base */
int max_timeout; /* max timeout in msec */
int timeout; /* timeout in msec */
Expand Down
5 changes: 5 additions & 0 deletions src/nc_proxy.c
Original file line number Diff line number Diff line change
Expand Up @@ -406,3 +406,8 @@ proxy_recv(struct context *ctx, struct conn *conn)

return NC_OK;
}

void
proxy_restore(struct context *ctx, struct conn *conn)
{
}
1 change: 1 addition & 0 deletions src/nc_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,6 @@ rstatus_t proxy_each_deinit(void *elem, void *data);
rstatus_t proxy_init(struct context *ctx);
void proxy_deinit(struct context *ctx);
rstatus_t proxy_recv(struct context *ctx, struct conn *conn);
void proxy_restore(struct context *ctx, struct conn *conn);

#endif
10 changes: 10 additions & 0 deletions src/nc_response.c
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,12 @@ static bool
rsp_filter(struct context *ctx, struct conn *conn, struct msg *msg)
{
struct msg *pmsg;
struct server *server;

ASSERT(!conn->client && !conn->proxy);

server = (struct server *)conn->owner;

if (msg_empty(msg)) {
ASSERT(conn->rmsg == NULL);
log_debug(LOG_VERB, "filter empty rsp %"PRIu64" on s %d", msg->id,
Expand Down Expand Up @@ -204,6 +207,13 @@ rsp_filter(struct context *ctx, struct conn *conn, struct msg *msg)
}

if (pmsg->swallow) {
if (server->fail == FAIL_STATUS_ERR_TRY_HEARTBEAT) {
struct conn *c_conn;

c_conn = pmsg->owner;
server_restore_from_heartbeat(server, c_conn);
}

conn->swallow_msg(conn, pmsg, msg);

conn->dequeue_outq(ctx, conn, pmsg);
Expand Down
Loading