Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

redbean: concurrency improvements #1332

Merged
merged 5 commits into from
Dec 2, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 67 additions & 29 deletions tool/net/redbean.c
Original file line number Diff line number Diff line change
Expand Up @@ -181,12 +181,8 @@ __static_yoink("blink_xnu_aarch64"); // is apple silicon
#define HeaderLength(H) (cpm.msg.headers[H].b - cpm.msg.headers[H].a)
#define HeaderEqualCase(H, S) \
SlicesEqualCase(S, strlen(S), HeaderData(H), HeaderLength(H))
#define LockInc(P) \
atomic_fetch_add_explicit((_Atomic(typeof(*(P))) *)(P), +1, \
memory_order_relaxed)
#define LockDec(P) \
atomic_fetch_add_explicit((_Atomic(typeof(*(P))) *)(P), -1, \
memory_order_relaxed)
#define LockInc(P) atomic_fetch_add_explicit(P, +1, memory_order_relaxed)
#define LockDec(P) atomic_fetch_add_explicit(P, -1, memory_order_relaxed)

#define TRACE_BEGIN \
do { \
Expand Down Expand Up @@ -377,19 +373,21 @@ struct Blackhole {
} blackhole;

static struct Shared {
int workers;
struct timespec nowish;
struct timespec lastreindex;
_Atomic(int) workers;
struct timespec lastmeltdown;
struct timespec nowish;
char currentdate[32];
struct rusage server;
struct rusage children;
struct Counters {
#define C(x) long x;
#define C(x) _Atomic(long) x;
#include "tool/net/counters.inc"
#undef C
} c;
pthread_spinlock_t montermlock;
pthread_mutex_t datetime_mu;
pthread_mutex_t server_mu;
pthread_mutex_t children_mu;
pthread_mutex_t lastmeltdown_mu;
} *shared;

static const char kCounterNames[] =
Expand Down Expand Up @@ -1350,8 +1348,8 @@ static void CallSimpleHookIfDefined(const char *s) {
}

static void ReportWorkerExit(int pid, int ws) {
int workers;
workers = atomic_fetch_sub((_Atomic(int) *)&shared->workers, 1) - 1;
int workers =
atomic_fetch_sub_explicit(&shared->workers, 1, memory_order_release);
if (WIFEXITED(ws)) {
if (WEXITSTATUS(ws)) {
LockInc(&shared->c.failedchildren);
Expand Down Expand Up @@ -1383,7 +1381,9 @@ static void ReportWorkerResources(int pid, struct rusage *ru) {

static void HandleWorkerExit(int pid, int ws, struct rusage *ru) {
LockInc(&shared->c.connectionshandled);
unassert(!pthread_mutex_lock(&shared->children_mu));
rusage_add(&shared->children, ru);
unassert(!pthread_mutex_unlock(&shared->children_mu));
ReportWorkerExit(pid, ws);
ReportWorkerResources(pid, ru);
if (hasonprocessdestroy) {
Expand Down Expand Up @@ -2129,9 +2129,11 @@ static void UpdateCurrentDate(struct timespec now) {
int64_t t;
struct tm tm;
t = now.tv_sec;
shared->nowish = now;
gmtime_r(&t, &tm);
unassert(!pthread_mutex_lock(&shared->datetime_mu));
shared->nowish = now;
FormatHttpDateTime(shared->currentdate, &tm);
unassert(!pthread_mutex_unlock(&shared->datetime_mu));
}

static int64_t GetGmtOffset(int64_t t) {
Expand Down Expand Up @@ -2364,7 +2366,10 @@ static char *AppendCache(char *p, int64_t seconds, char *directive) {
p = stpcpy(p, directive);
}
p = AppendCrlf(p);
return AppendExpires(p, shared->nowish.tv_sec + seconds);
unassert(!pthread_mutex_lock(&shared->datetime_mu));
long nowish_sec = shared->nowish.tv_sec;
unassert(!pthread_mutex_unlock(&shared->datetime_mu));
return AppendExpires(p, nowish_sec + seconds);
}

static inline char *AppendContentLength(char *p, size_t n) {
Expand Down Expand Up @@ -3103,9 +3108,12 @@ td { padding-right: 3em; }\r\n\
<td valign=\"top\">\r\n\
<a href=\"/statusz\">/statusz</a>\r\n\
");
if (shared->c.connectionshandled) {
if (atomic_load_explicit(&shared->c.connectionshandled,
memory_order_acquire)) {
appends(&cpm.outbuf, "says your redbean<br>\r\n");
unassert(!pthread_mutex_lock(&shared->children_mu));
AppendResourceReport(&cpm.outbuf, &shared->children, "<br>\r\n");
unassert(!pthread_mutex_unlock(&shared->children_mu));
}
appends(&cpm.outbuf, "<td valign=\"top\">\r\n");
and = "";
Expand All @@ -3127,12 +3135,12 @@ td { padding-right: 3em; }\r\n\
}
appendf(&cpm.outbuf, "%s%,ld second%s of operation<br>\r\n", and, y.rem,
y.rem == 1 ? "" : "s");
x = shared->c.messageshandled;
x = atomic_load_explicit(&shared->c.messageshandled, memory_order_relaxed);
appendf(&cpm.outbuf, "%,ld message%s handled<br>\r\n", x, x == 1 ? "" : "s");
x = shared->c.connectionshandled;
x = atomic_load_explicit(&shared->c.connectionshandled, memory_order_relaxed);
appendf(&cpm.outbuf, "%,ld connection%s handled<br>\r\n", x,
x == 1 ? "" : "s");
x = shared->workers;
x = atomic_load_explicit(&shared->workers, memory_order_relaxed);
appendf(&cpm.outbuf, "%,ld connection%s active<br>\r\n", x,
x == 1 ? "" : "s");
appends(&cpm.outbuf, "</table>\r\n");
Expand Down Expand Up @@ -3184,11 +3192,11 @@ static void AppendRusage(const char *a, struct rusage *ru) {
}

static void ServeCounters(void) {
const long *c;
const _Atomic(long) *c;
const char *s;
for (c = (const long *)&shared->c, s = kCounterNames; *s;
for (c = (const _Atomic(long) *)&shared->c, s = kCounterNames; *s;
++c, s += strlen(s) + 1) {
AppendLong1(s, *c);
AppendLong1(s, atomic_load_explicit(c, memory_order_relaxed));
}
}

Expand All @@ -3201,21 +3209,30 @@ static char *ServeStatusz(void) {
AppendLong1("pid", getpid());
AppendLong1("ppid", getppid());
AppendLong1("now", timespec_real().tv_sec);
unassert(!pthread_mutex_lock(&shared->datetime_mu));
AppendLong1("nowish", shared->nowish.tv_sec);
unassert(!pthread_mutex_unlock(&shared->datetime_mu));
AppendLong1("gmtoff", gmtoff);
AppendLong1("CLK_TCK", CLK_TCK);
AppendLong1("startserver", startserver.tv_sec);
unassert(!pthread_mutex_lock(&shared->lastmeltdown_mu));
AppendLong1("lastmeltdown", shared->lastmeltdown.tv_sec);
AppendLong1("workers", shared->workers);
unassert(!pthread_mutex_unlock(&shared->lastmeltdown_mu));
AppendLong1("workers",
atomic_load_explicit(&shared->workers, memory_order_relaxed));
AppendLong1("assets.n", assets.n);
#ifndef STATIC
lua_State *L = GL;
AppendLong1("lua.memory",
lua_gc(L, LUA_GCCOUNT) * 1024 + lua_gc(L, LUA_GCCOUNTB));
#endif
ServeCounters();
unassert(!pthread_mutex_lock(&shared->server_mu));
AppendRusage("server", &shared->server);
unassert(!pthread_mutex_unlock(&shared->server_mu));
unassert(!pthread_mutex_lock(&shared->children_mu));
AppendRusage("children", &shared->children);
unassert(!pthread_mutex_unlock(&shared->children_mu));
p = SetStatus(200, "OK");
p = AppendContentType(p, "text/plain");
if (cpm.msg.version >= 11) {
Expand Down Expand Up @@ -3980,7 +3997,9 @@ static int LuaNilTlsError(lua_State *L, const char *s, int r) {
#include "tool/net/fetch.inc"

static int LuaGetDate(lua_State *L) {
unassert(!pthread_mutex_lock(&shared->datetime_mu));
lua_pushinteger(L, shared->nowish.tv_sec);
unassert(!pthread_mutex_unlock(&shared->datetime_mu));
return 1;
}

Expand Down Expand Up @@ -5034,7 +5053,7 @@ static int LuaProgramTokenBucket(lua_State *L) {
npassert(pid != -1);
if (!pid)
Replenisher();
++shared->workers;
atomic_fetch_add_explicit(&shared->workers, 1, memory_order_acquire);
return 0;
}

Expand Down Expand Up @@ -5679,7 +5698,8 @@ static void LogClose(const char *reason) {
if (amtread || meltdown || killed) {
LockInc(&shared->c.fumbles);
INFOF("(stat) %s %s with %,ld unprocessed and %,d handled (%,d workers)",
DescribeClient(), reason, amtread, messageshandled, shared->workers);
DescribeClient(), reason, amtread, messageshandled,
atomic_load_explicit(&shared->workers, memory_order_relaxed));
} else {
DEBUGF("(stat) %s %s with %,d messages handled", DescribeClient(), reason,
messageshandled);
Expand Down Expand Up @@ -5737,14 +5757,18 @@ Content-Length: 22\r\n\
}

static void EnterMeltdownMode(void) {
unassert(!pthread_mutex_lock(&shared->lastmeltdown_mu));
if (timespec_cmp(timespec_sub(timespec_real(), shared->lastmeltdown),
(struct timespec){1}) < 0) {
unassert(!pthread_mutex_unlock(&shared->lastmeltdown_mu));
return;
}
WARNF("(srvr) server is melting down (%,d workers)", shared->workers);
LOGIFNEG1(kill(0, SIGUSR2));
shared->lastmeltdown = timespec_real();
++shared->c.meltdowns;
pthread_mutex_unlock(&shared->lastmeltdown_mu);
WARNF("(srvr) server is melting down (%,d workers)",
atomic_load_explicit(&shared->workers, memory_order_relaxed));
LOGIFNEG1(kill(0, SIGUSR2));
LockInc(&shared->c.meltdowns);
}

static char *HandlePayloadDisconnect(void) {
Expand Down Expand Up @@ -5861,7 +5885,9 @@ static void HandleHeartbeat(void) {
size_t i;
UpdateCurrentDate(timespec_real());
Reindex();
unassert(!pthread_mutex_lock(&shared->server_mu));
getrusage(RUSAGE_SELF, &shared->server);
unassert(!pthread_mutex_unlock(&shared->server_mu));
#ifndef STATIC
CallSimpleHookIfDefined("OnServerHeartbeat");
CollectGarbage();
Expand Down Expand Up @@ -6481,7 +6507,9 @@ static bool HandleMessageActual(void) {
DEBUGF("(clnt) could not synchronize message stream");
}
if (cpm.msg.version >= 10) {
unassert(!pthread_mutex_lock(&shared->datetime_mu));
p = AppendCrlf(stpcpy(stpcpy(p, "Date: "), shared->currentdate));
unassert(!pthread_mutex_unlock(&shared->datetime_mu));
if (!cpm.branded)
p = stpcpy(p, serverheader);
if (extrahdrs)
Expand Down Expand Up @@ -6751,7 +6779,9 @@ static int HandleConnection(size_t i) {
DEBUGF("(token) can't acquire accept() token for client");
}
startconnection = timespec_real();
if (UNLIKELY(maxworkers) && shared->workers >= maxworkers) {
if (UNLIKELY(maxworkers) &&
atomic_load_explicit(&shared->workers, memory_order_relaxed) >=
maxworkers) {
EnterMeltdownMode();
SendServiceUnavailable();
close(client);
Expand Down Expand Up @@ -7346,6 +7376,14 @@ void RedBean(int argc, char *argv[]) {
(shared = mmap(NULL, ROUNDUP(sizeof(struct Shared), getgransize()),
PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS,
-1, 0)));
pthread_mutexattr_t attr;
unassert(!pthread_mutexattr_init(&attr));
unassert(!pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED));
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice. The time for this has come, now that PTHREAD_PROCESS_SHARED is pretty much finally fully supported.

unassert(!pthread_mutex_init(&shared->datetime_mu, &attr));
unassert(!pthread_mutex_init(&shared->server_mu, &attr));
unassert(!pthread_mutex_init(&shared->children_mu, &attr));
unassert(!pthread_mutex_init(&shared->lastmeltdown_mu, &attr));
unassert(!pthread_mutexattr_destroy(&attr));
if (daemonize) {
for (int i = 0; i < 256; ++i) {
close(i);
Expand Down