Skip to content

Commit

Permalink
Improve redbean concurrency (#1332)
Browse files Browse the repository at this point in the history
In the course of playing with redbean I was confused about how the state
was behaving and then noticed that some stuff is maybe getting edited by
multiple processes. I tried to improve things by changing the definition
of the counter variables to be explicitly atomic. Claude assures me that
most modern Unixes support cross-process atomics, so I just went with it
on that front.

I also added some mutexes to the shared state to try to synchronize some
other things that might get written or read from workers but couldn't be
made atomic, mainly the rusage and time values. I could've probably been
less granular and just had a global shared-state lock, but I opted to be
fairly granular as a starting point.

This also reorders the resetting of the lastmeltdown timespec before the
SIGUSR2 signal is sent; hopefully this is okay.
  • Loading branch information
mrdomino authored Dec 2, 2024
1 parent 3142758 commit b40140e
Showing 1 changed file with 67 additions and 29 deletions.
96 changes: 67 additions & 29 deletions tool/net/redbean.c
Original file line number Diff line number Diff line change
Expand Up @@ -181,12 +181,8 @@ __static_yoink("blink_xnu_aarch64"); // is apple silicon
// Length of header H of the current message (cpm.msg), computed as the
// distance between that header's `a` and `b` offsets.
#define HeaderLength(H) (cpm.msg.headers[H].b - cpm.msg.headers[H].a)
// Compares the C string S against the data of header H by handing both
// (pointer, length) pairs to SlicesEqualCase. S is evaluated twice.
#define HeaderEqualCase(H, S) \
SlicesEqualCase(S, strlen(S), HeaderData(H), HeaderLength(H))
// Atomically adds +1 / -1 to the object P points at, using relaxed
// memory ordering, and evaluates to the value held *before* the update.
//
// P must already point to an _Atomic object; no cast is applied, so
// handing in a pointer to a plain integer is a compile-time error rather
// than a silently non-atomic access.
#define LockInc(P) atomic_fetch_add_explicit((P), +1, memory_order_relaxed)
#define LockDec(P) atomic_fetch_add_explicit((P), -1, memory_order_relaxed)

#define TRACE_BEGIN \
do { \
Expand Down Expand Up @@ -377,19 +373,21 @@ struct Blackhole {
} blackhole;

// State visible to every redbean process. RedBean() places the single
// instance in a MAP_SHARED | MAP_ANONYMOUS mapping, so non-atomic fields
// are accessed under one of the PTHREAD_PROCESS_SHARED mutexes declared
// at the bottom of the struct, and the counters are _Atomic.
//
// NOTE(review): this listing comes from a rendered diff, so `workers`,
// `nowish` and the `C(x)` macro each appear in both their superseded and
// their new form; confirm the final member list against the real file.
static struct Shared {
int workers;
struct timespec nowish;
struct timespec lastreindex;
// worker count; adjusted with atomic fetch-add / fetch-sub
_Atomic(int) workers;
// time of the most recent meltdown; guarded by lastmeltdown_mu
struct timespec lastmeltdown;
// time cached by UpdateCurrentDate(); guarded by datetime_mu
struct timespec nowish;
// HTTP date string formatted from the same time; guarded by datetime_mu
char currentdate[32];
// getrusage(RUSAGE_SELF) snapshot taken on heartbeat; guarded by server_mu
struct rusage server;
// rusage accumulated from exited workers; guarded by children_mu
struct rusage children;
// one counter per C(x) entry in counters.inc; ServeCounters() walks
// this struct as a flat array, so every member must have the same type
struct Counters {
#define C(x) long x;
#define C(x) _Atomic(long) x;
#include "tool/net/counters.inc"
#undef C
} c;
pthread_spinlock_t montermlock;
// the four mutexes below are initialized process-shared in RedBean()
pthread_mutex_t datetime_mu;
pthread_mutex_t server_mu;
pthread_mutex_t children_mu;
pthread_mutex_t lastmeltdown_mu;
} *shared;

static const char kCounterNames[] =
Expand Down Expand Up @@ -1350,8 +1348,8 @@ static void CallSimpleHookIfDefined(const char *s) {
}

static void ReportWorkerExit(int pid, int ws) {
int workers;
workers = atomic_fetch_sub((_Atomic(int) *)&shared->workers, 1) - 1;
int workers =
atomic_fetch_sub_explicit(&shared->workers, 1, memory_order_release);
if (WIFEXITED(ws)) {
if (WEXITSTATUS(ws)) {
LockInc(&shared->c.failedchildren);
Expand Down Expand Up @@ -1383,7 +1381,9 @@ static void ReportWorkerResources(int pid, struct rusage *ru) {

static void HandleWorkerExit(int pid, int ws, struct rusage *ru) {
LockInc(&shared->c.connectionshandled);
unassert(!pthread_mutex_lock(&shared->children_mu));
rusage_add(&shared->children, ru);
unassert(!pthread_mutex_unlock(&shared->children_mu));
ReportWorkerExit(pid, ws);
ReportWorkerResources(pid, ru);
if (hasonprocessdestroy) {
Expand Down Expand Up @@ -2129,9 +2129,11 @@ static void UpdateCurrentDate(struct timespec now) {
int64_t t;
struct tm tm;
t = now.tv_sec;
shared->nowish = now;
gmtime_r(&t, &tm);
unassert(!pthread_mutex_lock(&shared->datetime_mu));
shared->nowish = now;
FormatHttpDateTime(shared->currentdate, &tm);
unassert(!pthread_mutex_unlock(&shared->datetime_mu));
}

static int64_t GetGmtOffset(int64_t t) {
Expand Down Expand Up @@ -2364,7 +2366,10 @@ static char *AppendCache(char *p, int64_t seconds, char *directive) {
p = stpcpy(p, directive);
}
p = AppendCrlf(p);
return AppendExpires(p, shared->nowish.tv_sec + seconds);
unassert(!pthread_mutex_lock(&shared->datetime_mu));
long nowish_sec = shared->nowish.tv_sec;
unassert(!pthread_mutex_unlock(&shared->datetime_mu));
return AppendExpires(p, nowish_sec + seconds);
}

static inline char *AppendContentLength(char *p, size_t n) {
Expand Down Expand Up @@ -3103,9 +3108,12 @@ td { padding-right: 3em; }\r\n\
<td valign=\"top\">\r\n\
<a href=\"/statusz\">/statusz</a>\r\n\
");
if (shared->c.connectionshandled) {
if (atomic_load_explicit(&shared->c.connectionshandled,
memory_order_acquire)) {
appends(&cpm.outbuf, "says your redbean<br>\r\n");
unassert(!pthread_mutex_lock(&shared->children_mu));
AppendResourceReport(&cpm.outbuf, &shared->children, "<br>\r\n");
unassert(!pthread_mutex_unlock(&shared->children_mu));
}
appends(&cpm.outbuf, "<td valign=\"top\">\r\n");
and = "";
Expand All @@ -3127,12 +3135,12 @@ td { padding-right: 3em; }\r\n\
}
appendf(&cpm.outbuf, "%s%,ld second%s of operation<br>\r\n", and, y.rem,
y.rem == 1 ? "" : "s");
x = shared->c.messageshandled;
x = atomic_load_explicit(&shared->c.messageshandled, memory_order_relaxed);
appendf(&cpm.outbuf, "%,ld message%s handled<br>\r\n", x, x == 1 ? "" : "s");
x = shared->c.connectionshandled;
x = atomic_load_explicit(&shared->c.connectionshandled, memory_order_relaxed);
appendf(&cpm.outbuf, "%,ld connection%s handled<br>\r\n", x,
x == 1 ? "" : "s");
x = shared->workers;
x = atomic_load_explicit(&shared->workers, memory_order_relaxed);
appendf(&cpm.outbuf, "%,ld connection%s active<br>\r\n", x,
x == 1 ? "" : "s");
appends(&cpm.outbuf, "</table>\r\n");
Expand Down Expand Up @@ -3184,11 +3192,11 @@ static void AppendRusage(const char *a, struct rusage *ru) {
}

// Reports every counter in shared->c through AppendLong1().
//
// struct Counters is walked as a flat array of _Atomic(long), in
// lockstep with kCounterNames, which is read as a sequence of
// NUL-terminated names ended by an empty name. Each value is read with
// a relaxed atomic load, so the report is not a consistent snapshot
// across counters.
static void ServeCounters(void) {
  const _Atomic(long) *c = (const _Atomic(long) *)&shared->c;
  for (const char *s = kCounterNames; *s; ++c, s += strlen(s) + 1) {
    AppendLong1(s, atomic_load_explicit(c, memory_order_relaxed));
  }
}

Expand All @@ -3201,21 +3209,30 @@ static char *ServeStatusz(void) {
AppendLong1("pid", getpid());
AppendLong1("ppid", getppid());
AppendLong1("now", timespec_real().tv_sec);
unassert(!pthread_mutex_lock(&shared->datetime_mu));
AppendLong1("nowish", shared->nowish.tv_sec);
unassert(!pthread_mutex_unlock(&shared->datetime_mu));
AppendLong1("gmtoff", gmtoff);
AppendLong1("CLK_TCK", CLK_TCK);
AppendLong1("startserver", startserver.tv_sec);
unassert(!pthread_mutex_lock(&shared->lastmeltdown_mu));
AppendLong1("lastmeltdown", shared->lastmeltdown.tv_sec);
AppendLong1("workers", shared->workers);
unassert(!pthread_mutex_unlock(&shared->lastmeltdown_mu));
AppendLong1("workers",
atomic_load_explicit(&shared->workers, memory_order_relaxed));
AppendLong1("assets.n", assets.n);
#ifndef STATIC
lua_State *L = GL;
AppendLong1("lua.memory",
lua_gc(L, LUA_GCCOUNT) * 1024 + lua_gc(L, LUA_GCCOUNTB));
#endif
ServeCounters();
unassert(!pthread_mutex_lock(&shared->server_mu));
AppendRusage("server", &shared->server);
unassert(!pthread_mutex_unlock(&shared->server_mu));
unassert(!pthread_mutex_lock(&shared->children_mu));
AppendRusage("children", &shared->children);
unassert(!pthread_mutex_unlock(&shared->children_mu));
p = SetStatus(200, "OK");
p = AppendContentType(p, "text/plain");
if (cpm.msg.version >= 11) {
Expand Down Expand Up @@ -3980,7 +3997,9 @@ static int LuaNilTlsError(lua_State *L, const char *s, int r) {
#include "tool/net/fetch.inc"

// Lua binding: pushes the cached server time (shared->nowish), in
// seconds, onto the Lua stack and returns 1, the number of results.
static int LuaGetDate(lua_State *L) {
  int64_t seconds;
  // copy the value out under the lock, then hand it to Lua afterwards
  unassert(!pthread_mutex_lock(&shared->datetime_mu));
  seconds = shared->nowish.tv_sec;
  unassert(!pthread_mutex_unlock(&shared->datetime_mu));
  lua_pushinteger(L, seconds);
  return 1;
}

Expand Down Expand Up @@ -5034,7 +5053,7 @@ static int LuaProgramTokenBucket(lua_State *L) {
npassert(pid != -1);
if (!pid)
Replenisher();
++shared->workers;
atomic_fetch_add_explicit(&shared->workers, 1, memory_order_acquire);
return 0;
}

Expand Down Expand Up @@ -5679,7 +5698,8 @@ static void LogClose(const char *reason) {
if (amtread || meltdown || killed) {
LockInc(&shared->c.fumbles);
INFOF("(stat) %s %s with %,ld unprocessed and %,d handled (%,d workers)",
DescribeClient(), reason, amtread, messageshandled, shared->workers);
DescribeClient(), reason, amtread, messageshandled,
atomic_load_explicit(&shared->workers, memory_order_relaxed));
} else {
DEBUGF("(stat) %s %s with %,d messages handled", DescribeClient(), reason,
messageshandled);
Expand Down Expand Up @@ -5737,14 +5757,18 @@ Content-Length: 22\r\n\
}

// Signals a meltdown to the whole process group, at most once a second.
//
// If less than one second has passed since shared->lastmeltdown this is
// a no-op. Otherwise the timestamp is refreshed, a warning is logged,
// SIGUSR2 is sent to every process in the caller's process group
// (kill with pid 0), and the meltdowns counter is incremented.
static void EnterMeltdownMode(void) {
  unassert(!pthread_mutex_lock(&shared->lastmeltdown_mu));
  if (timespec_cmp(timespec_sub(timespec_real(), shared->lastmeltdown),
                   (struct timespec){1}) < 0) {
    unassert(!pthread_mutex_unlock(&shared->lastmeltdown_mu));
    return;
  }
  // Refresh the timestamp before signalling so the mutex is not held
  // while logging or while SIGUSR2 is being delivered.
  shared->lastmeltdown = timespec_real();
  unassert(!pthread_mutex_unlock(&shared->lastmeltdown_mu));
  WARNF("(srvr) server is melting down (%,d workers)",
        atomic_load_explicit(&shared->workers, memory_order_relaxed));
  LOGIFNEG1(kill(0, SIGUSR2));
  LockInc(&shared->c.meltdowns);
}

static char *HandlePayloadDisconnect(void) {
Expand Down Expand Up @@ -5861,7 +5885,9 @@ static void HandleHeartbeat(void) {
size_t i;
UpdateCurrentDate(timespec_real());
Reindex();
unassert(!pthread_mutex_lock(&shared->server_mu));
getrusage(RUSAGE_SELF, &shared->server);
unassert(!pthread_mutex_unlock(&shared->server_mu));
#ifndef STATIC
CallSimpleHookIfDefined("OnServerHeartbeat");
CollectGarbage();
Expand Down Expand Up @@ -6481,7 +6507,9 @@ static bool HandleMessageActual(void) {
DEBUGF("(clnt) could not synchronize message stream");
}
if (cpm.msg.version >= 10) {
unassert(!pthread_mutex_lock(&shared->datetime_mu));
p = AppendCrlf(stpcpy(stpcpy(p, "Date: "), shared->currentdate));
unassert(!pthread_mutex_unlock(&shared->datetime_mu));
if (!cpm.branded)
p = stpcpy(p, serverheader);
if (extrahdrs)
Expand Down Expand Up @@ -6751,7 +6779,9 @@ static int HandleConnection(size_t i) {
DEBUGF("(token) can't acquire accept() token for client");
}
startconnection = timespec_real();
if (UNLIKELY(maxworkers) && shared->workers >= maxworkers) {
if (UNLIKELY(maxworkers) &&
atomic_load_explicit(&shared->workers, memory_order_relaxed) >=
maxworkers) {
EnterMeltdownMode();
SendServiceUnavailable();
close(client);
Expand Down Expand Up @@ -7346,6 +7376,14 @@ void RedBean(int argc, char *argv[]) {
(shared = mmap(NULL, ROUNDUP(sizeof(struct Shared), getgransize()),
PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS,
-1, 0)));
pthread_mutexattr_t attr;
unassert(!pthread_mutexattr_init(&attr));
unassert(!pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED));
unassert(!pthread_mutex_init(&shared->datetime_mu, &attr));
unassert(!pthread_mutex_init(&shared->server_mu, &attr));
unassert(!pthread_mutex_init(&shared->children_mu, &attr));
unassert(!pthread_mutex_init(&shared->lastmeltdown_mu, &attr));
unassert(!pthread_mutexattr_destroy(&attr));
if (daemonize) {
for (int i = 0; i < 256; ++i) {
close(i);
Expand Down

0 comments on commit b40140e

Please sign in to comment.