diff --git a/tool/net/redbean.c b/tool/net/redbean.c index 64daf4bc450..e3b9ec65ae6 100644 --- a/tool/net/redbean.c +++ b/tool/net/redbean.c @@ -181,12 +181,8 @@ __static_yoink("blink_xnu_aarch64"); // is apple silicon #define HeaderLength(H) (cpm.msg.headers[H].b - cpm.msg.headers[H].a) #define HeaderEqualCase(H, S) \ SlicesEqualCase(S, strlen(S), HeaderData(H), HeaderLength(H)) -#define LockInc(P) \ - atomic_fetch_add_explicit((_Atomic(typeof(*(P))) *)(P), +1, \ - memory_order_relaxed) -#define LockDec(P) \ - atomic_fetch_add_explicit((_Atomic(typeof(*(P))) *)(P), -1, \ - memory_order_relaxed) +#define LockInc(P) atomic_fetch_add_explicit(P, +1, memory_order_relaxed) +#define LockDec(P) atomic_fetch_add_explicit(P, -1, memory_order_relaxed) #define TRACE_BEGIN \ do { \ @@ -377,19 +373,21 @@ struct Blackhole { } blackhole; static struct Shared { - int workers; - struct timespec nowish; - struct timespec lastreindex; + _Atomic(int) workers; struct timespec lastmeltdown; + struct timespec nowish; char currentdate[32]; struct rusage server; struct rusage children; struct Counters { -#define C(x) long x; +#define C(x) _Atomic(long) x; #include "tool/net/counters.inc" #undef C } c; - pthread_spinlock_t montermlock; + pthread_mutex_t datetime_mu; + pthread_mutex_t server_mu; + pthread_mutex_t children_mu; + pthread_mutex_t lastmeltdown_mu; } *shared; static const char kCounterNames[] = @@ -1350,8 +1348,8 @@ static void CallSimpleHookIfDefined(const char *s) { } static void ReportWorkerExit(int pid, int ws) { - int workers; - workers = atomic_fetch_sub((_Atomic(int) *)&shared->workers, 1) - 1; + int workers = + atomic_fetch_sub_explicit(&shared->workers, 1, memory_order_release); if (WIFEXITED(ws)) { if (WEXITSTATUS(ws)) { LockInc(&shared->c.failedchildren); @@ -1383,7 +1381,9 @@ static void ReportWorkerResources(int pid, struct rusage *ru) { static void HandleWorkerExit(int pid, int ws, struct rusage *ru) { LockInc(&shared->c.connectionshandled); + unassert(!pthread_mutex_lock(&shared->children_mu)); rusage_add(&shared->children, ru); + unassert(!pthread_mutex_unlock(&shared->children_mu)); ReportWorkerExit(pid, ws); ReportWorkerResources(pid, ru); if (hasonprocessdestroy) { @@ -2129,9 +2129,11 @@ static void UpdateCurrentDate(struct timespec now) { int64_t t; struct tm tm; t = now.tv_sec; - shared->nowish = now; gmtime_r(&t, &tm); + unassert(!pthread_mutex_lock(&shared->datetime_mu)); + shared->nowish = now; FormatHttpDateTime(shared->currentdate, &tm); + unassert(!pthread_mutex_unlock(&shared->datetime_mu)); } static int64_t GetGmtOffset(int64_t t) { @@ -2364,7 +2366,10 @@ static char *AppendCache(char *p, int64_t seconds, char *directive) { p = stpcpy(p, directive); } p = AppendCrlf(p); - return AppendExpires(p, shared->nowish.tv_sec + seconds); + unassert(!pthread_mutex_lock(&shared->datetime_mu)); + long nowish_sec = shared->nowish.tv_sec; + unassert(!pthread_mutex_unlock(&shared->datetime_mu)); + return AppendExpires(p, nowish_sec + seconds); } static inline char *AppendContentLength(char *p, size_t n) { @@ -3103,9 +3108,12 @@ td { padding-right: 3em; }\r\n\ \r\n\ /statusz\r\n\ "); - if (shared->c.connectionshandled) { + if (atomic_load_explicit(&shared->c.connectionshandled, + memory_order_acquire)) { appends(&cpm.outbuf, "says your redbean
\r\n"); + unassert(!pthread_mutex_lock(&shared->children_mu)); AppendResourceReport(&cpm.outbuf, &shared->children, "
\r\n"); + unassert(!pthread_mutex_unlock(&shared->children_mu)); } appends(&cpm.outbuf, "\r\n"); and = ""; @@ -3127,12 +3135,12 @@ td { padding-right: 3em; }\r\n\ } appendf(&cpm.outbuf, "%s%,ld second%s of operation
\r\n", and, y.rem, y.rem == 1 ? "" : "s"); - x = shared->c.messageshandled; + x = atomic_load_explicit(&shared->c.messageshandled, memory_order_relaxed); appendf(&cpm.outbuf, "%,ld message%s handled
\r\n", x, x == 1 ? "" : "s"); - x = shared->c.connectionshandled; + x = atomic_load_explicit(&shared->c.connectionshandled, memory_order_relaxed); appendf(&cpm.outbuf, "%,ld connection%s handled
\r\n", x, x == 1 ? "" : "s"); - x = shared->workers; + x = atomic_load_explicit(&shared->workers, memory_order_relaxed); appendf(&cpm.outbuf, "%,ld connection%s active
\r\n", x, x == 1 ? "" : "s"); appends(&cpm.outbuf, "\r\n"); @@ -3184,11 +3192,11 @@ static void AppendRusage(const char *a, struct rusage *ru) { } static void ServeCounters(void) { - const long *c; + const _Atomic(long) *c; const char *s; - for (c = (const long *)&shared->c, s = kCounterNames; *s; + for (c = (const _Atomic(long) *)&shared->c, s = kCounterNames; *s; ++c, s += strlen(s) + 1) { - AppendLong1(s, *c); + AppendLong1(s, atomic_load_explicit(c, memory_order_relaxed)); } } @@ -3201,12 +3209,17 @@ static char *ServeStatusz(void) { AppendLong1("pid", getpid()); AppendLong1("ppid", getppid()); AppendLong1("now", timespec_real().tv_sec); + unassert(!pthread_mutex_lock(&shared->datetime_mu)); AppendLong1("nowish", shared->nowish.tv_sec); + unassert(!pthread_mutex_unlock(&shared->datetime_mu)); AppendLong1("gmtoff", gmtoff); AppendLong1("CLK_TCK", CLK_TCK); AppendLong1("startserver", startserver.tv_sec); + unassert(!pthread_mutex_lock(&shared->lastmeltdown_mu)); AppendLong1("lastmeltdown", shared->lastmeltdown.tv_sec); - AppendLong1("workers", shared->workers); + unassert(!pthread_mutex_unlock(&shared->lastmeltdown_mu)); + AppendLong1("workers", + atomic_load_explicit(&shared->workers, memory_order_relaxed)); AppendLong1("assets.n", assets.n); #ifndef STATIC lua_State *L = GL; @@ -3214,8 +3227,12 @@ static char *ServeStatusz(void) { lua_gc(L, LUA_GCCOUNT) * 1024 + lua_gc(L, LUA_GCCOUNTB)); #endif ServeCounters(); + unassert(!pthread_mutex_lock(&shared->server_mu)); AppendRusage("server", &shared->server); + unassert(!pthread_mutex_unlock(&shared->server_mu)); + unassert(!pthread_mutex_lock(&shared->children_mu)); AppendRusage("children", &shared->children); + unassert(!pthread_mutex_unlock(&shared->children_mu)); p = SetStatus(200, "OK"); p = AppendContentType(p, "text/plain"); if (cpm.msg.version >= 11) { @@ -3980,7 +3997,9 @@ static int LuaNilTlsError(lua_State *L, const char *s, int r) { #include "tool/net/fetch.inc" static int LuaGetDate(lua_State *L) { + unassert(!pthread_mutex_lock(&shared->datetime_mu)); lua_pushinteger(L, shared->nowish.tv_sec); + unassert(!pthread_mutex_unlock(&shared->datetime_mu)); return 1; } @@ -5034,7 +5053,7 @@ static int LuaProgramTokenBucket(lua_State *L) { npassert(pid != -1); if (!pid) Replenisher(); - ++shared->workers; + atomic_fetch_add_explicit(&shared->workers, 1, memory_order_acquire); return 0; } @@ -5679,7 +5698,8 @@ static void LogClose(const char *reason) { if (amtread || meltdown || killed) { LockInc(&shared->c.fumbles); INFOF("(stat) %s %s with %,ld unprocessed and %,d handled (%,d workers)", - DescribeClient(), reason, amtread, messageshandled, shared->workers); + DescribeClient(), reason, amtread, messageshandled, + atomic_load_explicit(&shared->workers, memory_order_relaxed)); } else { DEBUGF("(stat) %s %s with %,d messages handled", DescribeClient(), reason, messageshandled); @@ -5737,14 +5757,18 @@ Content-Length: 22\r\n\ } static void EnterMeltdownMode(void) { + unassert(!pthread_mutex_lock(&shared->lastmeltdown_mu)); if (timespec_cmp(timespec_sub(timespec_real(), shared->lastmeltdown), (struct timespec){1}) < 0) { + unassert(!pthread_mutex_unlock(&shared->lastmeltdown_mu)); return; } - WARNF("(srvr) server is melting down (%,d workers)", shared->workers); - LOGIFNEG1(kill(0, SIGUSR2)); shared->lastmeltdown = timespec_real(); - ++shared->c.meltdowns; + pthread_mutex_unlock(&shared->lastmeltdown_mu); + WARNF("(srvr) server is melting down (%,d workers)", + atomic_load_explicit(&shared->workers, memory_order_relaxed)); + LOGIFNEG1(kill(0, SIGUSR2)); + LockInc(&shared->c.meltdowns); } static char *HandlePayloadDisconnect(void) { @@ -5861,7 +5885,9 @@ static void HandleHeartbeat(void) { size_t i; UpdateCurrentDate(timespec_real()); Reindex(); + unassert(!pthread_mutex_lock(&shared->server_mu)); getrusage(RUSAGE_SELF, &shared->server); + unassert(!pthread_mutex_unlock(&shared->server_mu)); #ifndef STATIC CallSimpleHookIfDefined("OnServerHeartbeat"); CollectGarbage(); @@ -6481,7 +6507,9 @@ static bool HandleMessageActual(void) { DEBUGF("(clnt) could not synchronize message stream"); } if (cpm.msg.version >= 10) { + unassert(!pthread_mutex_lock(&shared->datetime_mu)); p = AppendCrlf(stpcpy(stpcpy(p, "Date: "), shared->currentdate)); + unassert(!pthread_mutex_unlock(&shared->datetime_mu)); if (!cpm.branded) p = stpcpy(p, serverheader); if (extrahdrs) @@ -6751,7 +6779,9 @@ static int HandleConnection(size_t i) { DEBUGF("(token) can't acquire accept() token for client"); } startconnection = timespec_real(); - if (UNLIKELY(maxworkers) && shared->workers >= maxworkers) { + if (UNLIKELY(maxworkers) && + atomic_load_explicit(&shared->workers, memory_order_relaxed) >= + maxworkers) { EnterMeltdownMode(); SendServiceUnavailable(); close(client); @@ -7346,6 +7376,14 @@ void RedBean(int argc, char *argv[]) { (shared = mmap(NULL, ROUNDUP(sizeof(struct Shared), getgransize()), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0))); + pthread_mutexattr_t attr; + unassert(!pthread_mutexattr_init(&attr)); + unassert(!pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED)); + unassert(!pthread_mutex_init(&shared->datetime_mu, &attr)); + unassert(!pthread_mutex_init(&shared->server_mu, &attr)); + unassert(!pthread_mutex_init(&shared->children_mu, &attr)); + unassert(!pthread_mutex_init(&shared->lastmeltdown_mu, &attr)); + unassert(!pthread_mutexattr_destroy(&attr)); if (daemonize) { for (int i = 0; i < 256; ++i) { close(i);