From b40140e6c58272ffbeb2a9c26e4270f461773a68 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Steven=20Dee=20=28J=C5=8Dshin=29?= Date: Mon, 2 Dec 2024 17:05:38 -0500 Subject: [PATCH] Improve redbean concurrency (#1332) While playing with redbean I was confused by how the shared state was behaving, and then noticed that some of it may be modified by multiple processes concurrently. I tried to improve things by changing the definition of the counter variables to be explicitly atomic. Claude assures me that most modern Unixes support cross-process atomics, so I went with that approach. I also added some mutexes to the shared state to synchronize other fields that workers might read or write but that couldn't be made atomic, mainly the rusage and time values. I probably could have been less granular and used a single global shared-state lock, but I opted for fairly granular locks as a starting point. This also moves the reset of the lastmeltdown timespec to before the SIGUSR2 signal is sent; hopefully this is okay. 
--- tool/net/redbean.c | 96 ++++++++++++++++++++++++++++++++-------------- 1 file changed, 67 insertions(+), 29 deletions(-) diff --git a/tool/net/redbean.c b/tool/net/redbean.c index 64daf4bc450..e3b9ec65ae6 100644 --- a/tool/net/redbean.c +++ b/tool/net/redbean.c @@ -181,12 +181,8 @@ __static_yoink("blink_xnu_aarch64"); // is apple silicon #define HeaderLength(H) (cpm.msg.headers[H].b - cpm.msg.headers[H].a) #define HeaderEqualCase(H, S) \ SlicesEqualCase(S, strlen(S), HeaderData(H), HeaderLength(H)) -#define LockInc(P) \ - atomic_fetch_add_explicit((_Atomic(typeof(*(P))) *)(P), +1, \ - memory_order_relaxed) -#define LockDec(P) \ - atomic_fetch_add_explicit((_Atomic(typeof(*(P))) *)(P), -1, \ - memory_order_relaxed) +#define LockInc(P) atomic_fetch_add_explicit(P, +1, memory_order_relaxed) +#define LockDec(P) atomic_fetch_add_explicit(P, -1, memory_order_relaxed) #define TRACE_BEGIN \ do { \ @@ -377,19 +373,21 @@ struct Blackhole { } blackhole; static struct Shared { - int workers; - struct timespec nowish; - struct timespec lastreindex; + _Atomic(int) workers; struct timespec lastmeltdown; + struct timespec nowish; char currentdate[32]; struct rusage server; struct rusage children; struct Counters { -#define C(x) long x; +#define C(x) _Atomic(long) x; #include "tool/net/counters.inc" #undef C } c; - pthread_spinlock_t montermlock; + pthread_mutex_t datetime_mu; + pthread_mutex_t server_mu; + pthread_mutex_t children_mu; + pthread_mutex_t lastmeltdown_mu; } *shared; static const char kCounterNames[] = @@ -1350,8 +1348,8 @@ static void CallSimpleHookIfDefined(const char *s) { } static void ReportWorkerExit(int pid, int ws) { - int workers; - workers = atomic_fetch_sub((_Atomic(int) *)&shared->workers, 1) - 1; + int workers = + atomic_fetch_sub_explicit(&shared->workers, 1, memory_order_release); if (WIFEXITED(ws)) { if (WEXITSTATUS(ws)) { LockInc(&shared->c.failedchildren); @@ -1383,7 +1381,9 @@ static void ReportWorkerResources(int pid, struct rusage *ru) { 
static void HandleWorkerExit(int pid, int ws, struct rusage *ru) { LockInc(&shared->c.connectionshandled); + unassert(!pthread_mutex_lock(&shared->children_mu)); rusage_add(&shared->children, ru); + unassert(!pthread_mutex_unlock(&shared->children_mu)); ReportWorkerExit(pid, ws); ReportWorkerResources(pid, ru); if (hasonprocessdestroy) { @@ -2129,9 +2129,11 @@ static void UpdateCurrentDate(struct timespec now) { int64_t t; struct tm tm; t = now.tv_sec; - shared->nowish = now; gmtime_r(&t, &tm); + unassert(!pthread_mutex_lock(&shared->datetime_mu)); + shared->nowish = now; FormatHttpDateTime(shared->currentdate, &tm); + unassert(!pthread_mutex_unlock(&shared->datetime_mu)); } static int64_t GetGmtOffset(int64_t t) { @@ -2364,7 +2366,10 @@ static char *AppendCache(char *p, int64_t seconds, char *directive) { p = stpcpy(p, directive); } p = AppendCrlf(p); - return AppendExpires(p, shared->nowish.tv_sec + seconds); + unassert(!pthread_mutex_lock(&shared->datetime_mu)); + long nowish_sec = shared->nowish.tv_sec; + unassert(!pthread_mutex_unlock(&shared->datetime_mu)); + return AppendExpires(p, nowish_sec + seconds); } static inline char *AppendContentLength(char *p, size_t n) { @@ -3103,9 +3108,12 @@ td { padding-right: 3em; }\r\n\ \r\n\ /statusz\r\n\ "); - if (shared->c.connectionshandled) { + if (atomic_load_explicit(&shared->c.connectionshandled, + memory_order_acquire)) { appends(&cpm.outbuf, "says your redbean
\r\n"); + unassert(!pthread_mutex_lock(&shared->children_mu)); AppendResourceReport(&cpm.outbuf, &shared->children, "
\r\n"); + unassert(!pthread_mutex_unlock(&shared->children_mu)); } appends(&cpm.outbuf, "\r\n"); and = ""; @@ -3127,12 +3135,12 @@ td { padding-right: 3em; }\r\n\ } appendf(&cpm.outbuf, "%s%,ld second%s of operation
\r\n", and, y.rem, y.rem == 1 ? "" : "s"); - x = shared->c.messageshandled; + x = atomic_load_explicit(&shared->c.messageshandled, memory_order_relaxed); appendf(&cpm.outbuf, "%,ld message%s handled
\r\n", x, x == 1 ? "" : "s"); - x = shared->c.connectionshandled; + x = atomic_load_explicit(&shared->c.connectionshandled, memory_order_relaxed); appendf(&cpm.outbuf, "%,ld connection%s handled
\r\n", x, x == 1 ? "" : "s"); - x = shared->workers; + x = atomic_load_explicit(&shared->workers, memory_order_relaxed); appendf(&cpm.outbuf, "%,ld connection%s active
\r\n", x, x == 1 ? "" : "s"); appends(&cpm.outbuf, "\r\n"); @@ -3184,11 +3192,11 @@ static void AppendRusage(const char *a, struct rusage *ru) { } static void ServeCounters(void) { - const long *c; + const _Atomic(long) *c; const char *s; - for (c = (const long *)&shared->c, s = kCounterNames; *s; + for (c = (const _Atomic(long) *)&shared->c, s = kCounterNames; *s; ++c, s += strlen(s) + 1) { - AppendLong1(s, *c); + AppendLong1(s, atomic_load_explicit(c, memory_order_relaxed)); } } @@ -3201,12 +3209,17 @@ static char *ServeStatusz(void) { AppendLong1("pid", getpid()); AppendLong1("ppid", getppid()); AppendLong1("now", timespec_real().tv_sec); + unassert(!pthread_mutex_lock(&shared->datetime_mu)); AppendLong1("nowish", shared->nowish.tv_sec); + unassert(!pthread_mutex_unlock(&shared->datetime_mu)); AppendLong1("gmtoff", gmtoff); AppendLong1("CLK_TCK", CLK_TCK); AppendLong1("startserver", startserver.tv_sec); + unassert(!pthread_mutex_lock(&shared->lastmeltdown_mu)); AppendLong1("lastmeltdown", shared->lastmeltdown.tv_sec); - AppendLong1("workers", shared->workers); + unassert(!pthread_mutex_unlock(&shared->lastmeltdown_mu)); + AppendLong1("workers", + atomic_load_explicit(&shared->workers, memory_order_relaxed)); AppendLong1("assets.n", assets.n); #ifndef STATIC lua_State *L = GL; @@ -3214,8 +3227,12 @@ static char *ServeStatusz(void) { lua_gc(L, LUA_GCCOUNT) * 1024 + lua_gc(L, LUA_GCCOUNTB)); #endif ServeCounters(); + unassert(!pthread_mutex_lock(&shared->server_mu)); AppendRusage("server", &shared->server); + unassert(!pthread_mutex_unlock(&shared->server_mu)); + unassert(!pthread_mutex_lock(&shared->children_mu)); AppendRusage("children", &shared->children); + unassert(!pthread_mutex_unlock(&shared->children_mu)); p = SetStatus(200, "OK"); p = AppendContentType(p, "text/plain"); if (cpm.msg.version >= 11) { @@ -3980,7 +3997,9 @@ static int LuaNilTlsError(lua_State *L, const char *s, int r) { #include "tool/net/fetch.inc" static int LuaGetDate(lua_State *L) { + 
unassert(!pthread_mutex_lock(&shared->datetime_mu)); lua_pushinteger(L, shared->nowish.tv_sec); + unassert(!pthread_mutex_unlock(&shared->datetime_mu)); return 1; } @@ -5034,7 +5053,7 @@ static int LuaProgramTokenBucket(lua_State *L) { npassert(pid != -1); if (!pid) Replenisher(); - ++shared->workers; + atomic_fetch_add_explicit(&shared->workers, 1, memory_order_acquire); return 0; } @@ -5679,7 +5698,8 @@ static void LogClose(const char *reason) { if (amtread || meltdown || killed) { LockInc(&shared->c.fumbles); INFOF("(stat) %s %s with %,ld unprocessed and %,d handled (%,d workers)", - DescribeClient(), reason, amtread, messageshandled, shared->workers); + DescribeClient(), reason, amtread, messageshandled, + atomic_load_explicit(&shared->workers, memory_order_relaxed)); } else { DEBUGF("(stat) %s %s with %,d messages handled", DescribeClient(), reason, messageshandled); @@ -5737,14 +5757,18 @@ Content-Length: 22\r\n\ } static void EnterMeltdownMode(void) { + unassert(!pthread_mutex_lock(&shared->lastmeltdown_mu)); if (timespec_cmp(timespec_sub(timespec_real(), shared->lastmeltdown), (struct timespec){1}) < 0) { + unassert(!pthread_mutex_unlock(&shared->lastmeltdown_mu)); return; } - WARNF("(srvr) server is melting down (%,d workers)", shared->workers); - LOGIFNEG1(kill(0, SIGUSR2)); shared->lastmeltdown = timespec_real(); - ++shared->c.meltdowns; + pthread_mutex_unlock(&shared->lastmeltdown_mu); + WARNF("(srvr) server is melting down (%,d workers)", + atomic_load_explicit(&shared->workers, memory_order_relaxed)); + LOGIFNEG1(kill(0, SIGUSR2)); + LockInc(&shared->c.meltdowns); } static char *HandlePayloadDisconnect(void) { @@ -5861,7 +5885,9 @@ static void HandleHeartbeat(void) { size_t i; UpdateCurrentDate(timespec_real()); Reindex(); + unassert(!pthread_mutex_lock(&shared->server_mu)); getrusage(RUSAGE_SELF, &shared->server); + unassert(!pthread_mutex_unlock(&shared->server_mu)); #ifndef STATIC CallSimpleHookIfDefined("OnServerHeartbeat"); CollectGarbage(); @@ 
-6481,7 +6507,9 @@ static bool HandleMessageActual(void) { DEBUGF("(clnt) could not synchronize message stream"); } if (cpm.msg.version >= 10) { + unassert(!pthread_mutex_lock(&shared->datetime_mu)); p = AppendCrlf(stpcpy(stpcpy(p, "Date: "), shared->currentdate)); + unassert(!pthread_mutex_unlock(&shared->datetime_mu)); if (!cpm.branded) p = stpcpy(p, serverheader); if (extrahdrs) @@ -6751,7 +6779,9 @@ static int HandleConnection(size_t i) { DEBUGF("(token) can't acquire accept() token for client"); } startconnection = timespec_real(); - if (UNLIKELY(maxworkers) && shared->workers >= maxworkers) { + if (UNLIKELY(maxworkers) && + atomic_load_explicit(&shared->workers, memory_order_relaxed) >= + maxworkers) { EnterMeltdownMode(); SendServiceUnavailable(); close(client); @@ -7346,6 +7376,14 @@ void RedBean(int argc, char *argv[]) { (shared = mmap(NULL, ROUNDUP(sizeof(struct Shared), getgransize()), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0))); + pthread_mutexattr_t attr; + unassert(!pthread_mutexattr_init(&attr)); + unassert(!pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED)); + unassert(!pthread_mutex_init(&shared->datetime_mu, &attr)); + unassert(!pthread_mutex_init(&shared->server_mu, &attr)); + unassert(!pthread_mutex_init(&shared->children_mu, &attr)); + unassert(!pthread_mutex_init(&shared->lastmeltdown_mu, &attr)); + unassert(!pthread_mutexattr_destroy(&attr)); if (daemonize) { for (int i = 0; i < 256; ++i) { close(i);