From 58c6f24d9ddc6e773bbe86bc1a21ac59a96a1798 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Steven=20Dee=20=28J=C5=8Dshin=29?= Date: Sun, 1 Dec 2024 21:30:04 -0500 Subject: [PATCH 1/5] redbean: counters are _Atomic --- tool/net/redbean.c | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/tool/net/redbean.c b/tool/net/redbean.c index 64daf4bc450..660319f0aa6 100644 --- a/tool/net/redbean.c +++ b/tool/net/redbean.c @@ -182,11 +182,9 @@ __static_yoink("blink_xnu_aarch64"); // is apple silicon #define HeaderEqualCase(H, S) \ SlicesEqualCase(S, strlen(S), HeaderData(H), HeaderLength(H)) #define LockInc(P) \ - atomic_fetch_add_explicit((_Atomic(typeof(*(P))) *)(P), +1, \ - memory_order_relaxed) + atomic_fetch_add_explicit(P, +1, memory_order_relaxed) #define LockDec(P) \ - atomic_fetch_add_explicit((_Atomic(typeof(*(P))) *)(P), -1, \ - memory_order_relaxed) + atomic_fetch_add_explicit(P, -1, memory_order_relaxed) #define TRACE_BEGIN \ do { \ @@ -385,7 +383,7 @@ static struct Shared { struct rusage server; struct rusage children; struct Counters { -#define C(x) long x; +#define C(x) _Atomic(long) x; #include "tool/net/counters.inc" #undef C } c; From 493249a02f8b0c0f92118fe4901c29a161abfc84 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Steven=20Dee=20=28J=C5=8Dshin=29?= Date: Sun, 1 Dec 2024 22:03:04 -0500 Subject: [PATCH 2/5] workers is atomic, add some mutexes --- tool/net/redbean.c | 73 +++++++++++++++++++++++++++++++--------------- 1 file changed, 50 insertions(+), 23 deletions(-) diff --git a/tool/net/redbean.c b/tool/net/redbean.c index 660319f0aa6..23bb0b4a4fd 100644 --- a/tool/net/redbean.c +++ b/tool/net/redbean.c @@ -181,10 +181,8 @@ __static_yoink("blink_xnu_aarch64"); // is apple silicon #define HeaderLength(H) (cpm.msg.headers[H].b - cpm.msg.headers[H].a) #define HeaderEqualCase(H, S) \ SlicesEqualCase(S, strlen(S), HeaderData(H), HeaderLength(H)) -#define LockInc(P) \ - atomic_fetch_add_explicit(P, +1, memory_order_relaxed) -#define LockDec(P) \ - atomic_fetch_add_explicit(P, -1, memory_order_relaxed) +#define LockInc(P) atomic_fetch_add_explicit(P, +1, memory_order_relaxed) +#define LockDec(P) atomic_fetch_add_explicit(P, -1, memory_order_relaxed) #define TRACE_BEGIN \ do { \ @@ -375,7 +373,7 @@ struct Blackhole { } blackhole; static struct Shared { - int workers; + _Atomic(int) workers; struct timespec nowish; struct timespec lastreindex; struct timespec lastmeltdown; @@ -387,7 +385,9 @@ static struct Shared { #include "tool/net/counters.inc" #undef C } c; - pthread_spinlock_t montermlock; + pthread_mutex_t datetime_mu; + pthread_mutex_t server_mu; + pthread_mutex_t children_mu; } *shared; static const char kCounterNames[] = @@ -1348,8 +1348,8 @@ static void CallSimpleHookIfDefined(const char *s) { } static void ReportWorkerExit(int pid, int ws) { - int workers; - workers = atomic_fetch_sub((_Atomic(int) *)&shared->workers, 1) - 1; + int workers = + atomic_fetch_sub_explicit(&shared->workers, 1, memory_order_release); if (WIFEXITED(ws)) { if (WEXITSTATUS(ws)) { LockInc(&shared->c.failedchildren); @@ -1381,7 +1381,9 @@ static void ReportWorkerResources(int pid, struct rusage *ru) { static void HandleWorkerExit(int pid, int ws, struct rusage *ru) { LockInc(&shared->c.connectionshandled); + unassert(!pthread_mutex_lock(&shared->children_mu)); rusage_add(&shared->children, ru); + unassert(!pthread_mutex_unlock(&shared->children_mu)); ReportWorkerExit(pid, ws); ReportWorkerResources(pid, ru); if (hasonprocessdestroy) { @@ -2127,9 +2129,11 @@ static void UpdateCurrentDate(struct timespec now) { int64_t t; struct tm tm; t = now.tv_sec; - shared->nowish = now; gmtime_r(&t, &tm); + unassert(pthread_mutex_lock(&shared->datetime_mu)); + shared->nowish = now; FormatHttpDateTime(shared->currentdate, &tm); + unassert(pthread_mutex_unlock(&shared->datetime_mu)); } static int64_t GetGmtOffset(int64_t t) { @@ -2362,7 +2366,10 @@ static char *AppendCache(char *p, int64_t seconds, char *directive) { p = stpcpy(p, directive); } p = AppendCrlf(p); - return AppendExpires(p, shared->nowish.tv_sec + seconds); + unassert(pthread_mutex_lock(&shared->datetime_mu)); + long nowish_sec = shared->nowish.tv_sec; + unassert(pthread_mutex_unlock(&shared->datetime_mu)); + return AppendExpires(p, nowish_sec + seconds); } static inline char *AppendContentLength(char *p, size_t n) { @@ -3101,9 +3108,12 @@ td { padding-right: 3em; }\r\n\ \r\n\ /statusz\r\n\ "); - if (shared->c.connectionshandled) { + if (atomic_load_explicit(&shared->c.connectionshandled, + memory_order_acquire)) { appends(&cpm.outbuf, "says your redbean
\r\n"); + unassert(pthread_mutex_lock(&shared->children_mu)); AppendResourceReport(&cpm.outbuf, &shared->children, "
\r\n"); + unassert(pthread_mutex_unlock(&shared->children_mu)); } appends(&cpm.outbuf, "\r\n"); and = ""; @@ -3125,12 +3135,12 @@ td { padding-right: 3em; }\r\n\ } appendf(&cpm.outbuf, "%s%,ld second%s of operation
\r\n", and, y.rem, y.rem == 1 ? "" : "s"); - x = shared->c.messageshandled; + x = atomic_load_explicit(&shared->c.messageshandled, memory_order_relaxed); appendf(&cpm.outbuf, "%,ld message%s handled
\r\n", x, x == 1 ? "" : "s"); - x = shared->c.connectionshandled; + x = atomic_load_explicit(&shared->c.connectionshandled, memory_order_relaxed); appendf(&cpm.outbuf, "%,ld connection%s handled
\r\n", x, x == 1 ? "" : "s"); - x = shared->workers; + x = atomic_load_explicit(&shared->workers, memory_order_relaxed); appendf(&cpm.outbuf, "%,ld connection%s active
\r\n", x, x == 1 ? "" : "s"); appends(&cpm.outbuf, "\r\n"); @@ -3182,11 +3192,11 @@ static void AppendRusage(const char *a, struct rusage *ru) { } static void ServeCounters(void) { - const long *c; + const _Atomic(long) *c; const char *s; - for (c = (const long *)&shared->c, s = kCounterNames; *s; + for (c = (const _Atomic(long) *)&shared->c, s = kCounterNames; *s; ++c, s += strlen(s) + 1) { - AppendLong1(s, *c); + AppendLong1(s, atomic_load_explicit(c, memory_order_relaxed)); } } @@ -3199,12 +3209,15 @@ static char *ServeStatusz(void) { AppendLong1("pid", getpid()); AppendLong1("ppid", getppid()); AppendLong1("now", timespec_real().tv_sec); + unassert(pthread_mutex_lock(&shared->datetime_mu)); AppendLong1("nowish", shared->nowish.tv_sec); + unassert(pthread_mutex_unlock(&shared->datetime_mu)); AppendLong1("gmtoff", gmtoff); AppendLong1("CLK_TCK", CLK_TCK); AppendLong1("startserver", startserver.tv_sec); AppendLong1("lastmeltdown", shared->lastmeltdown.tv_sec); - AppendLong1("workers", shared->workers); + AppendLong1("workers", + atomic_load_explicit(&shared->workers, memory_order_relaxed)); AppendLong1("assets.n", assets.n); #ifndef STATIC lua_State *L = GL; @@ -3212,8 +3225,12 @@ static char *ServeStatusz(void) { lua_gc(L, LUA_GCCOUNT) * 1024 + lua_gc(L, LUA_GCCOUNTB)); #endif ServeCounters(); + unassert(pthread_mutex_lock(&shared->server_mu)); AppendRusage("server", &shared->server); + unassert(pthread_mutex_unlock(&shared->server_mu)); + unassert(pthread_mutex_lock(&shared->children_mu)); AppendRusage("children", &shared->children); + unassert(pthread_mutex_unlock(&shared->children_mu)); p = SetStatus(200, "OK"); p = AppendContentType(p, "text/plain"); if (cpm.msg.version >= 11) { @@ -3978,7 +3995,9 @@ static int LuaNilTlsError(lua_State *L, const char *s, int r) { #include "tool/net/fetch.inc" static int LuaGetDate(lua_State *L) { + unassert(pthread_mutex_lock(&shared->datetime_mu)); lua_pushinteger(L, shared->nowish.tv_sec); + unassert(pthread_mutex_unlock(&shared->datetime_mu)); return 1; } @@ -5032,7 +5051,7 @@ static int LuaProgramTokenBucket(lua_State *L) { npassert(pid != -1); if (!pid) Replenisher(); - ++shared->workers; + atomic_fetch_add_explicit(&shared->workers, 1, memory_order_acquire); return 0; } @@ -5677,7 +5696,8 @@ static void LogClose(const char *reason) { if (amtread || meltdown || killed) { LockInc(&shared->c.fumbles); INFOF("(stat) %s %s with %,ld unprocessed and %,d handled (%,d workers)", - DescribeClient(), reason, amtread, messageshandled, shared->workers); + DescribeClient(), reason, amtread, messageshandled, + atomic_load_explicit(&shared->workers, memory_order_relaxed)); } else { DEBUGF("(stat) %s %s with %,d messages handled", DescribeClient(), reason, messageshandled); @@ -5739,10 +5759,11 @@ static void EnterMeltdownMode(void) { (struct timespec){1}) < 0) { return; } - WARNF("(srvr) server is melting down (%,d workers)", shared->workers); + WARNF("(srvr) server is melting down (%,d workers)", + atomic_load_explicit(&shared->workers, memory_order_relaxed)); LOGIFNEG1(kill(0, SIGUSR2)); shared->lastmeltdown = timespec_real(); - ++shared->c.meltdowns; + LockInc(&shared->c.meltdowns); } static char *HandlePayloadDisconnect(void) { @@ -5859,7 +5880,9 @@ static void HandleHeartbeat(void) { size_t i; UpdateCurrentDate(timespec_real()); Reindex(); + unassert(pthread_mutex_lock(&shared->server_mu)); getrusage(RUSAGE_SELF, &shared->server); + unassert(pthread_mutex_unlock(&shared->server_mu)); #ifndef STATIC CallSimpleHookIfDefined("OnServerHeartbeat"); CollectGarbage(); @@ -6479,7 +6502,9 @@ static bool HandleMessageActual(void) { DEBUGF("(clnt) could not synchronize message stream"); } if (cpm.msg.version >= 10) { + unassert(pthread_mutex_lock(&shared->datetime_mu)); p = AppendCrlf(stpcpy(stpcpy(p, "Date: "), shared->currentdate)); + unassert(pthread_mutex_unlock(&shared->datetime_mu)); if (!cpm.branded) p = stpcpy(p, serverheader); if (extrahdrs) @@ -6749,7 +6774,9 @@ static int HandleConnection(size_t i) { DEBUGF("(token) can't acquire accept() token for client"); } startconnection = timespec_real(); - if (UNLIKELY(maxworkers) && shared->workers >= maxworkers) { + if (UNLIKELY(maxworkers) && + atomic_load_explicit(&shared->workers, memory_order_relaxed) >= + maxworkers) { EnterMeltdownMode(); SendServiceUnavailable(); close(client); From 11ec93ce17b6760b9126e8ecee86e7242b3e2fd7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Steven=20Dee=20=28J=C5=8Dshin=29?= Date: Sun, 1 Dec 2024 22:10:56 -0500 Subject: [PATCH 3/5] mutex for lastmeltdown --- tool/net/redbean.c | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/tool/net/redbean.c b/tool/net/redbean.c index 23bb0b4a4fd..a2f815e558b 100644 --- a/tool/net/redbean.c +++ b/tool/net/redbean.c @@ -374,9 +374,8 @@ struct Blackhole { static struct Shared { _Atomic(int) workers; - struct timespec nowish; - struct timespec lastreindex; struct timespec lastmeltdown; + struct timespec nowish; char currentdate[32]; struct rusage server; struct rusage children; @@ -388,6 +387,7 @@ static struct Shared { pthread_mutex_t datetime_mu; pthread_mutex_t server_mu; pthread_mutex_t children_mu; + pthread_mutex_t lastmeltdown_mu; } *shared; static const char kCounterNames[] = @@ -3215,7 +3215,9 @@ static char *ServeStatusz(void) { AppendLong1("gmtoff", gmtoff); AppendLong1("CLK_TCK", CLK_TCK); AppendLong1("startserver", startserver.tv_sec); + unassert(pthread_mutex_lock(&shared->lastmeltdown_mu)); AppendLong1("lastmeltdown", shared->lastmeltdown.tv_sec); + unassert(pthread_mutex_unlock(&shared->lastmeltdown_mu)); AppendLong1("workers", atomic_load_explicit(&shared->workers, memory_order_relaxed)); AppendLong1("assets.n", assets.n); @@ -5755,14 +5757,17 @@ Content-Length: 22\r\n\ } static void EnterMeltdownMode(void) { + unassert(pthread_mutex_lock(&shared->lastmeltdown_mu)); if (timespec_cmp(timespec_sub(timespec_real(), shared->lastmeltdown), (struct timespec){1}) < 0) { + unassert(pthread_mutex_unlock(&shared->lastmeltdown_mu)); return; } + shared->lastmeltdown = timespec_real(); + pthread_mutex_unlock(&shared->lastmeltdown_mu); WARNF("(srvr) server is melting down (%,d workers)", atomic_load_explicit(&shared->workers, memory_order_relaxed)); LOGIFNEG1(kill(0, SIGUSR2)); - shared->lastmeltdown = timespec_real(); LockInc(&shared->c.meltdowns); } From 0d65b692e1ff592b6848cfd73ca3c4f2f0a09814 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Steven=20Dee=20=28J=C5=8Dshin=29?= Date: Sun, 1 Dec 2024 22:33:11 -0500 Subject: [PATCH 4/5] cross-process mutexes --- tool/net/redbean.c | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tool/net/redbean.c b/tool/net/redbean.c index a2f815e558b..19ea17d1b1c 100644 --- a/tool/net/redbean.c +++ b/tool/net/redbean.c @@ -7376,6 +7376,14 @@ void RedBean(int argc, char *argv[]) { (shared = mmap(NULL, ROUNDUP(sizeof(struct Shared), getgransize()), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0))); + pthread_mutexattr_t attr; + unassert(pthread_mutexattr_init(&attr)); + unassert(pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED)); + unassert(pthread_mutex_init(&shared->datetime_mu, &attr)); + unassert(pthread_mutex_init(&shared->server_mu, &attr)); + unassert(pthread_mutex_init(&shared->children_mu, &attr)); + unassert(pthread_mutex_init(&shared->lastmeltdown_mu, &attr)); + unassert(pthread_mutexattr_destroy(&attr)); if (daemonize) { for (int i = 0; i < 256; ++i) { close(i); From 3fb56ecf07157d5b214a011e532b4a749d866a88 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Steven=20Dee=20=28J=C5=8Dshin=29?= Date: Sun, 1 Dec 2024 22:38:41 -0500 Subject: [PATCH 5/5] fix polarity of unasserts --- tool/net/redbean.c | 58 +++++++++++++++++++++++----------------------- 1 file changed, 29 insertions(+), 29 deletions(-) diff --git a/tool/net/redbean.c b/tool/net/redbean.c index 19ea17d1b1c..e3b9ec65ae6 100644 --- a/tool/net/redbean.c +++ b/tool/net/redbean.c @@ -2130,10 +2130,10 @@ static void UpdateCurrentDate(struct timespec now) { struct tm tm; t = now.tv_sec; gmtime_r(&t, &tm); - unassert(pthread_mutex_lock(&shared->datetime_mu)); + unassert(!pthread_mutex_lock(&shared->datetime_mu)); shared->nowish = now; FormatHttpDateTime(shared->currentdate, &tm); - unassert(pthread_mutex_unlock(&shared->datetime_mu)); + unassert(!pthread_mutex_unlock(&shared->datetime_mu)); } static int64_t GetGmtOffset(int64_t t) { @@ -2366,9 +2366,9 @@ static char *AppendCache(char *p, int64_t seconds, char *directive) { p = stpcpy(p, directive); } p = AppendCrlf(p); - unassert(pthread_mutex_lock(&shared->datetime_mu)); + unassert(!pthread_mutex_lock(&shared->datetime_mu)); long nowish_sec = shared->nowish.tv_sec; - unassert(pthread_mutex_unlock(&shared->datetime_mu)); + unassert(!pthread_mutex_unlock(&shared->datetime_mu)); return AppendExpires(p, nowish_sec + seconds); } @@ -3111,9 +3111,9 @@ td { padding-right: 3em; }\r\n\ if (atomic_load_explicit(&shared->c.connectionshandled, memory_order_acquire)) { appends(&cpm.outbuf, "says your redbean
\r\n"); - unassert(pthread_mutex_lock(&shared->children_mu)); + unassert(!pthread_mutex_lock(&shared->children_mu)); AppendResourceReport(&cpm.outbuf, &shared->children, "
\r\n"); - unassert(pthread_mutex_unlock(&shared->children_mu)); + unassert(!pthread_mutex_unlock(&shared->children_mu)); } appends(&cpm.outbuf, "\r\n"); and = ""; @@ -3209,15 +3209,15 @@ static char *ServeStatusz(void) { AppendLong1("pid", getpid()); AppendLong1("ppid", getppid()); AppendLong1("now", timespec_real().tv_sec); - unassert(pthread_mutex_lock(&shared->datetime_mu)); + unassert(!pthread_mutex_lock(&shared->datetime_mu)); AppendLong1("nowish", shared->nowish.tv_sec); - unassert(pthread_mutex_unlock(&shared->datetime_mu)); + unassert(!pthread_mutex_unlock(&shared->datetime_mu)); AppendLong1("gmtoff", gmtoff); AppendLong1("CLK_TCK", CLK_TCK); AppendLong1("startserver", startserver.tv_sec); - unassert(pthread_mutex_lock(&shared->lastmeltdown_mu)); + unassert(!pthread_mutex_lock(&shared->lastmeltdown_mu)); AppendLong1("lastmeltdown", shared->lastmeltdown.tv_sec); - unassert(pthread_mutex_unlock(&shared->lastmeltdown_mu)); + unassert(!pthread_mutex_unlock(&shared->lastmeltdown_mu)); AppendLong1("workers", atomic_load_explicit(&shared->workers, memory_order_relaxed)); AppendLong1("assets.n", assets.n); @@ -3227,12 +3227,12 @@ static char *ServeStatusz(void) { lua_gc(L, LUA_GCCOUNT) * 1024 + lua_gc(L, LUA_GCCOUNTB)); #endif ServeCounters(); - unassert(pthread_mutex_lock(&shared->server_mu)); + unassert(!pthread_mutex_lock(&shared->server_mu)); AppendRusage("server", &shared->server); - unassert(pthread_mutex_unlock(&shared->server_mu)); - unassert(pthread_mutex_lock(&shared->children_mu)); + unassert(!pthread_mutex_unlock(&shared->server_mu)); + unassert(!pthread_mutex_lock(&shared->children_mu)); AppendRusage("children", &shared->children); - unassert(pthread_mutex_unlock(&shared->children_mu)); + unassert(!pthread_mutex_unlock(&shared->children_mu)); p = SetStatus(200, "OK"); p = AppendContentType(p, "text/plain"); if (cpm.msg.version >= 11) { @@ -3997,9 +3997,9 @@ static int LuaNilTlsError(lua_State *L, const char *s, int r) { #include "tool/net/fetch.inc" static int LuaGetDate(lua_State *L) { - unassert(pthread_mutex_lock(&shared->datetime_mu)); + unassert(!pthread_mutex_lock(&shared->datetime_mu)); lua_pushinteger(L, shared->nowish.tv_sec); - unassert(pthread_mutex_unlock(&shared->datetime_mu)); + unassert(!pthread_mutex_unlock(&shared->datetime_mu)); return 1; } @@ -5757,10 +5757,10 @@ Content-Length: 22\r\n\ } static void EnterMeltdownMode(void) { - unassert(pthread_mutex_lock(&shared->lastmeltdown_mu)); + unassert(!pthread_mutex_lock(&shared->lastmeltdown_mu)); if (timespec_cmp(timespec_sub(timespec_real(), shared->lastmeltdown), (struct timespec){1}) < 0) { - unassert(pthread_mutex_unlock(&shared->lastmeltdown_mu)); + unassert(!pthread_mutex_unlock(&shared->lastmeltdown_mu)); return; } shared->lastmeltdown = timespec_real(); @@ -5885,9 +5885,9 @@ static void HandleHeartbeat(void) { size_t i; UpdateCurrentDate(timespec_real()); Reindex(); - unassert(pthread_mutex_lock(&shared->server_mu)); + unassert(!pthread_mutex_lock(&shared->server_mu)); getrusage(RUSAGE_SELF, &shared->server); - unassert(pthread_mutex_unlock(&shared->server_mu)); + unassert(!pthread_mutex_unlock(&shared->server_mu)); #ifndef STATIC CallSimpleHookIfDefined("OnServerHeartbeat"); CollectGarbage(); @@ -6507,9 +6507,9 @@ static bool HandleMessageActual(void) { DEBUGF("(clnt) could not synchronize message stream"); } if (cpm.msg.version >= 10) { - unassert(pthread_mutex_lock(&shared->datetime_mu)); + unassert(!pthread_mutex_lock(&shared->datetime_mu)); p = AppendCrlf(stpcpy(stpcpy(p, "Date: "), shared->currentdate)); - unassert(pthread_mutex_unlock(&shared->datetime_mu)); + unassert(!pthread_mutex_unlock(&shared->datetime_mu)); if (!cpm.branded) p = stpcpy(p, serverheader); if (extrahdrs) @@ -7377,13 +7377,13 @@ void RedBean(int argc, char *argv[]) { PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0))); pthread_mutexattr_t attr; - unassert(pthread_mutexattr_init(&attr)); - unassert(pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED)); - unassert(pthread_mutex_init(&shared->datetime_mu, &attr)); - unassert(pthread_mutex_init(&shared->server_mu, &attr)); - unassert(pthread_mutex_init(&shared->children_mu, &attr)); - unassert(pthread_mutex_init(&shared->lastmeltdown_mu, &attr)); - unassert(pthread_mutexattr_destroy(&attr)); + unassert(!pthread_mutexattr_init(&attr)); + unassert(!pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED)); + unassert(!pthread_mutex_init(&shared->datetime_mu, &attr)); + unassert(!pthread_mutex_init(&shared->server_mu, &attr)); + unassert(!pthread_mutex_init(&shared->children_mu, &attr)); + unassert(!pthread_mutex_init(&shared->lastmeltdown_mu, &attr)); + unassert(!pthread_mutexattr_destroy(&attr)); if (daemonize) { for (int i = 0; i < 256; ++i) { close(i);