From 05197afca29a1cf1dae805a0e916c3ec13e43cf3 Mon Sep 17 00:00:00 2001 From: Justine Tunney Date: Fri, 7 Oct 2022 03:11:07 -0700 Subject: [PATCH] Do some work on TurfWar --- libc/calls/sigsuspend.c | 2 +- net/turfwar/turfwar.c | 442 +++++++++++++++++---------- test/libc/calls/tkill_test.c | 57 ++++ test/tool/net/sqlite_test.c | 115 ++++++- test/tool/net/test.mk | 3 +- third_party/sqlite3/fts3_write.c | 1 + third_party/sqlite3/mutex.internal.h | 41 +++ third_party/sqlite3/os_unix.c | 3 + third_party/sqlite3/pcache1.c | 2 + third_party/sqlite3/sqlite3.h | 2 - third_party/sqlite3/sqlite3.mk | 6 + tool/net/help.txt | 2 +- 12 files changed, 497 insertions(+), 179 deletions(-) create mode 100644 third_party/sqlite3/mutex.internal.h diff --git a/libc/calls/sigsuspend.c b/libc/calls/sigsuspend.c index fabb66a92ee..529ef7da534 100644 --- a/libc/calls/sigsuspend.c +++ b/libc/calls/sigsuspend.c @@ -32,7 +32,7 @@ #include "libc/sysv/errfuns.h" /** - * Blocks until SIG ∉ MASK is delivered to process. + * Blocks until SIG ∉ MASK is delivered to thread. * * This temporarily replaces the signal mask until a signal that it * doesn't contain is delivered. diff --git a/net/turfwar/turfwar.c b/net/turfwar/turfwar.c index 020dd427d4a..1567ab10477 100644 --- a/net/turfwar/turfwar.c +++ b/net/turfwar/turfwar.c @@ -29,6 +29,7 @@ #include "libc/errno.h" #include "libc/fmt/conv.h" #include "libc/fmt/itoa.h" +#include "libc/intrin/atomic.h" #include "libc/intrin/bits.h" #include "libc/intrin/kprintf.h" #include "libc/intrin/strace.internal.h" @@ -86,24 +87,27 @@ * @fileoverview production webserver for turfwar online game */ -#define PORT 8080 // default server listening port -#define WORKERS 1001 // size of http client thread pool -#define SUPERVISE_MS 1000 // how often to stat() asset files -#define KEEPALIVE_MS 60000 // max time to keep idle conn open -#define MELTALIVE_MS 2000 // panic keepalive under heavy load -#define DATE_UPDATE_MS 500 // how often to do tzdata crunching -#define SCORE_UPDATE_MS 90000 // how often to regenerate /score -#define SCORE_H_UPDATE_MS 10000 // how often to regenerate /score/hour -#define SCORE_D_UPDATE_MS 15000 // how often to regenerate /score/day -#define SCORE_W_UPDATE_MS 30000 // how often to regenerate /score/week -#define SCORE_M_UPDATE_MS 60000 // how often to regenerate /score/month -#define CLAIM_DEADLINE_MS 50 // how long /claim may block if queue is full -#define PANIC_LOAD .85 // meltdown if this percent of pool connected -#define PANIC_MSGS 10 // msgs per conn can't exceed it in meltdown -#define QUEUE_MAX 800 // maximum pending claim items in queue -#define BATCH_MAX 64 // max claims to insert per transaction -#define NICK_MAX 40 // max length of user nickname string -#define MSG_BUF 512 // small response lookaside +#define PORT 8080 // default server listening port +#define CPUS 256 // number of cpus to actually use +#define WORKERS 1000 // size of http client thread pool +#define SUPERVISE_MS 1000 // how often to stat() asset files +#define KEEPALIVE_MS 60000 // max time to keep idle conn open +#define MELTALIVE_MS 2000 // panic keepalive under heavy load +#define SCORE_UPDATE_MS 90000 // how often to regenerate /score +#define SCORE_H_UPDATE_MS 10000 // how often to regenerate /score/hour +#define SCORE_D_UPDATE_MS 15000 // how often to regenerate /score/day +#define SCORE_W_UPDATE_MS 30000 // how often to regenerate /score/week +#define SCORE_M_UPDATE_MS 60000 // how often to regenerate /score/month +#define ACCEPT_DEADLINE_MS 100 // how long accept() can take to find worker +#define CLAIM_DEADLINE_MS 100 // how long /claim may block if queue is full +#define CONCERN_LOAD .75 // avoid keepalive, upon this connection load +#define PANIC_LOAD .85 // meltdown if this percent of pool connected +#define PANIC_MSGS 10 // msgs per conn can't exceed it in meltdown +#define QUEUE_MAX 800 // maximum pending claim items in queue +#define BATCH_MAX 64 // max claims to insert per transaction +#define NICK_MAX 40 // max length of user nickname string +#define SOCK_MAX 100 // max length of socket queue +#define MSG_BUF 512 // small response lookaside #define INBUF_SIZE PAGESIZE #define OUTBUF_SIZE 8192 @@ -229,11 +233,11 @@ int g_workers = WORKERS; int g_keepalive = KEEPALIVE_MS; // lifecycle vars +pthread_t g_listener; nsync_time g_started; nsync_counter g_ready; -nsync_note g_shutdown; -nsync_note g_terminate; atomic_int g_connections; +nsync_note g_shutdown[3]; // whitebox metrics atomic_long g_accepts; @@ -242,6 +246,7 @@ atomic_long g_proxied; atomic_long g_messages; atomic_long g_memfails; atomic_long g_sysfails; +atomic_long g_rejected; atomic_long g_unproxied; atomic_long g_readfails; atomic_long g_notfounds; @@ -257,8 +262,10 @@ atomic_long g_plainclaims; atomic_long g_imageclaims; atomic_long g_invalidnames; atomic_long g_ipv6forwards; -atomic_long g_claimrequests; atomic_long g_assetrequests; +atomic_long g_claimrequests; +atomic_long g_claimsenqueued; +atomic_long g_claimsprocessed; atomic_long g_statuszrequests; // http worker objects @@ -297,6 +304,20 @@ struct Assets { struct Asset favicon; } g_asset; +// queues ListenWorker() to HttpWorker() +struct Clients { + int pos; + int count; + nsync_mu mu; + nsync_cv non_full; + nsync_cv non_empty; + struct Client { + int sock; + uint32_t size; + struct sockaddr_in addr; + } data[SOCK_MAX]; +} g_clients; + // queues /claim to ClaimWorker() struct Claims { int pos; @@ -316,25 +337,30 @@ ssize_t Write(int fd, const char *s) { return write(fd, s, strlen(s)); } +// turns relative timeout into an absolute timeout +struct timespec WaitFor(int millis) { + return _timespec_add(_timespec_real(), _timespec_frommillis(millis)); +} + // helper functions for check macro implementation bool CheckMem(const char *file, int line, void *ptr) { if (ptr) return true; - kprintf("%s:%d: out of memory: %s\n", file, line, strerror(errno)); + kprintf("%s:%d: %P: out of memory: %s\n", file, line, strerror(errno)); return false; } bool CheckSys(const char *file, int line, long rc) { if (rc != -1) return true; - kprintf("%s:%d: %s\n", file, line, strerror(errno)); + kprintf("%s:%d: %P: %s\n", file, line, strerror(errno)); return false; } bool CheckSql(const char *file, int line, int rc) { if (rc == SQLITE_OK) return true; - kprintf("%s:%d: %s\n", file, line, sqlite3_errstr(rc)); + kprintf("%s:%d: %P: %s\n", file, line, sqlite3_errstr(rc)); return false; } bool CheckDb(const char *file, int line, int rc, sqlite3 *db) { if (rc == SQLITE_OK) return true; - kprintf("%s:%d: %s: %s\n", file, line, sqlite3_errstr(rc), + kprintf("%s:%d: %P: %s: %s\n", file, line, sqlite3_errstr(rc), sqlite3_errmsg(db)); return false; } @@ -398,7 +424,7 @@ char *FormatUnixHttpDateTime(char *s, int64_t t) { void UpdateNow(void) { int64_t secs; struct tm tm; - clock_gettime(CLOCK_REALTIME, &g_nowish.ts); + g_nowish.ts = _timespec_real(); secs = g_nowish.ts.tv_sec; gmtime_r(&secs, &tm); //!//!//!//!//!//!//!//!//!//!//!//!//!/ @@ -420,6 +446,57 @@ char *FormatDate(char *p) { return p; } +bool AddClient(struct Clients *q, const struct Client *v, nsync_time dead) { + bool wake = false; + bool added = false; + nsync_mu_lock(&q->mu); + while (q->count == ARRAYLEN(q->data)) { + if (nsync_cv_wait_with_deadline(&q->non_full, &q->mu, dead, + g_shutdown[0])) { + break; // must be ETIMEDOUT or ECANCELED + } + } + if (q->count != ARRAYLEN(q->data)) { + int i = q->pos + q->count; + if (ARRAYLEN(q->data) <= i) i -= ARRAYLEN(q->data); + memcpy(q->data + i, v, sizeof(*v)); + if (!q->count) wake = true; + q->count++; + added = true; + } + nsync_mu_unlock(&q->mu); + if (wake) { + nsync_cv_broadcast(&q->non_empty); + } + return added; +} + +int GetClient(struct Clients *q, struct Client *out) { + int got = 0; + int len = 1; + nsync_mu_lock(&q->mu); + while (!q->count) { + if (nsync_cv_wait_with_deadline(&q->non_empty, &q->mu, + nsync_time_no_deadline, g_shutdown[1])) { + break; // must be ECANCELED + } + } + while (got < len && q->count) { + memcpy(out + got, q->data + q->pos, sizeof(*out)); + if (q->count == ARRAYLEN(q->data)) { + nsync_cv_broadcast(&q->non_full); + } + ++got; + q->pos++; + q->count--; + if (q->pos == ARRAYLEN(q->data)) { + q->pos = 0; + } + } + nsync_mu_unlock(&q->mu); + return got; +} + // inserts ip:name claim into blocking message queue // may be interrupted by absolute deadline // may be cancelled by server shutdown @@ -428,7 +505,8 @@ bool AddClaim(struct Claims *q, const struct Claim *v, nsync_time dead) { bool added = false; nsync_mu_lock(&q->mu); while (q->count == ARRAYLEN(q->data)) { - if (nsync_cv_wait_with_deadline(&q->non_full, &q->mu, dead, g_shutdown)) { + if (nsync_cv_wait_with_deadline(&q->non_full, &q->mu, dead, + g_shutdown[1])) { break; // must be ETIMEDOUT or ECANCELED } } @@ -448,14 +526,14 @@ bool AddClaim(struct Claims *q, const struct Claim *v, nsync_time dead) { } // removes batch of ip:name claims from blocking message queue -// may be interrupted by absolute deadline -// may be cancelled by server termination -int GetClaims(struct Claims *q, struct Claim *out, int len, nsync_time dead) { +// has no deadline or cancellation; enqueued must be processed +int GetClaims(struct Claims *q, struct Claim *out, int len) { int got = 0; nsync_mu_lock(&q->mu); while (!q->count) { - if (nsync_cv_wait_with_deadline(&q->non_empty, &q->mu, dead, g_terminate)) { - break; // must be ETIMEDOUT or ECANCELED + if (nsync_cv_wait_with_deadline(&q->non_empty, &q->mu, + nsync_time_no_deadline, g_shutdown[2])) { + break; // must be ECANCELED } } while (got < len && q->count) { @@ -516,26 +594,30 @@ void FreeSafeBuffer(void *p) { } void OnlyRunOnCpu(int i) { + int n; cpu_set_t cpus; - if (GetCpuCount() > i + 1) { - CPU_ZERO(&cpus); - CPU_SET(i, &cpus); - CHECK_EQ(0, pthread_setaffinity_np(pthread_self(), sizeof(cpus), &cpus)); - } + _Static_assert(CPUS > 0, ""); + n = GetCpuCount(); + n = MIN(CPUS, n); + i = MIN(i, n - 1); + CPU_ZERO(&cpus); + CPU_SET(i, &cpus); + CHECK_NE(0, CPU_COUNT(&cpus)); + pthread_setaffinity_np(pthread_self(), sizeof(cpus), &cpus); } void DontRunOnFirstCpus(int i) { int n; cpu_set_t cpus; - if ((n = GetCpuCount()) > 1) { - CPU_ZERO(&cpus); - for (; i < n; ++i) { - CPU_SET(i, &cpus); - } - CHECK_EQ(0, pthread_setaffinity_np(pthread_self(), sizeof(cpus), &cpus)); - } else { - notpossible; + _Static_assert(CPUS > 0, ""); + n = GetCpuCount(); + n = MIN(CPUS, n); + i = MIN(i, n - 1); + CPU_ZERO(&cpus); + for (; i < n; ++i) { + CPU_SET(i, &cpus); } + pthread_setaffinity_np(pthread_self(), sizeof(cpus), &cpus); } // signals by default get delivered to any random thread @@ -565,9 +647,9 @@ char *Statusz(char *p, const char *s, long x) { // public /statusz endpoint for monitoring server internals void ServeStatusz(int client, char *outbuf) { char *p; - nsync_time now; struct rusage ru; - now = nsync_time_now(); + struct timespec now; + now = _timespec_real(); p = outbuf; p = stpcpy(outbuf, "HTTP/1.1 200 OK\r\n" "Content-Type: text/plain\r\n" @@ -575,7 +657,7 @@ void ServeStatusz(int client, char *outbuf) { "Connection: close\r\n" "\r\n"); p = Statusz(p, "qps", - g_messages / MAX(1, nsync_time_sub(now, g_started).tv_sec)); + g_messages / MAX(1, _timespec_sub(now, g_started).tv_sec)); p = Statusz(p, "started", g_started.tv_sec); p = Statusz(p, "now", now.tv_sec); p = Statusz(p, "connections", g_connections); @@ -586,6 +668,7 @@ void ServeStatusz(int client, char *outbuf) { p = Statusz(p, "proxied", g_proxied); p = Statusz(p, "memfails", g_memfails); p = Statusz(p, "sysfails", g_sysfails); + p = Statusz(p, "rejected", g_rejected); p = Statusz(p, "unproxied", g_unproxied); p = Statusz(p, "readfails", g_readfails); p = Statusz(p, "notfounds", g_notfounds); @@ -601,8 +684,10 @@ void ServeStatusz(int client, char *outbuf) { p = Statusz(p, "imageclaims", g_imageclaims); p = Statusz(p, "invalidnames", g_invalidnames); p = Statusz(p, "ipv6forwards", g_ipv6forwards); - p = Statusz(p, "claimrequests", g_claimrequests); p = Statusz(p, "assetrequests", g_assetrequests); + p = Statusz(p, "claimrequests", g_claimrequests); + p = Statusz(p, "claimsenqueued", g_claimsenqueued); + p = Statusz(p, "claimsprocessed", g_claimsprocessed); p = Statusz(p, "statuszrequests", g_statuszrequests); if (!getrusage(RUSAGE_SELF, &ru)) { p = Statusz(p, "ru_utime.tv_sec", ru.ru_utime.tv_sec); @@ -627,56 +712,74 @@ void ServeStatusz(int client, char *outbuf) { write(client, outbuf, p - outbuf); } -// make thousands of http client handler threads -// load balance incoming connections for port 8080 across all threads -// hangup on any browser clients that lag for more than a few seconds -void *HttpWorker(void *arg) { +void *ListenWorker(void *arg) { int server; int yes = 1; - int id = (intptr_t)arg; - char *msgbuf = _gc(xmalloc(MSG_BUF)); - char *inbuf = NewSafeBuffer(INBUF_SIZE); - char *outbuf = NewSafeBuffer(OUTBUF_SIZE); + struct Client client; struct timeval timeo = {g_keepalive / 1000, g_keepalive % 1000}; - struct HttpMessage *msg = _gc(xmalloc(sizeof(struct HttpMessage))); struct sockaddr_in addr = {.sin_family = AF_INET, .sin_port = htons(g_port)}; - - BlockSignals(); - DontRunOnFirstCpus(1); + AllowSigusr1(); + OnlyRunOnCpu(0); + pthread_setname_np(pthread_self(), "Listener"); CHECK_NE(-1, (server = socket(AF_INET, SOCK_STREAM, 0))); - pthread_setname_np(pthread_self(), _gc(xasprintf("HTTP #%d", id))); setsockopt(server, SOL_SOCKET, SO_RCVTIMEO, &timeo, sizeof(timeo)); setsockopt(server, SOL_SOCKET, SO_SNDTIMEO, &timeo, sizeof(timeo)); setsockopt(server, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)); - setsockopt(server, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes)); setsockopt(server, SOL_TCP, TCP_FASTOPEN, &yes, sizeof(yes)); setsockopt(server, SOL_TCP, TCP_QUICKACK, &yes, sizeof(yes)); + setsockopt(server, SOL_TCP, TCP_NODELAY, &yes, sizeof(yes)); CHECK_NE(-1, bind(server, &addr, sizeof(addr))); CHECK_NE(-1, listen(server, 1)); + while (!nsync_note_is_notified(g_shutdown[0])) { + client.size = sizeof(client.addr); + client.sock = accept(server, (struct sockaddr *)&client.addr, &client.size); + if (client.sock == -1) { + if (errno != EAGAIN) { // spinning on SO_RCVTIMEO + ++g_acceptfails; + } + continue; + } + if (!AddClient(&g_clients, &client, WaitFor(ACCEPT_DEADLINE_MS))) { + ++g_rejected; + LOG("502 Accept Queue Full\n"); + Write(client.sock, "HTTP/1.1 502 Accept Queue Full\r\n" + "Content-Type: text/plain\r\n" + "Connection: close\r\n" + "\r\n" + "Accept Queue Full\n"); + close(client.sock); + } + } + close(server); + nsync_note_notify(g_shutdown[1]); + return 0; +} + +// make thousands of http client handler threads +// load balance incoming connections for port 8080 across all threads +// hangup on any browser clients that lag for more than a few seconds +void *HttpWorker(void *arg) { + struct Client client; + int id = (intptr_t)arg; + char *msgbuf = _gc(xmalloc(MSG_BUF)); + char *inbuf = NewSafeBuffer(INBUF_SIZE); + char *outbuf = NewSafeBuffer(OUTBUF_SIZE); + struct HttpMessage *msg = _gc(xmalloc(sizeof(struct HttpMessage))); + + BlockSignals(); + DontRunOnFirstCpus(1); + pthread_setname_np(pthread_self(), _gc(xasprintf("HTTP%d", id))); // connection loop - while (!nsync_note_is_notified(g_shutdown)) { + while (GetClient(&g_clients, &client)) { struct Data d; struct Url url; ssize_t got, sent; uint32_t ip, clientip; - uint32_t clientaddrsize; - int client, inmsglen, outmsglen; + int inmsglen, outmsglen; char ipbuf[32], *p, *q, cashbuf[64]; - struct sockaddr_in clientaddr = {0}; - - // wait for client connection - // this may be cancelled by sigusr1 - AllowSigusr1(); - clientaddrsize = sizeof(clientaddr); - client = accept(server, (struct sockaddr *)&clientaddr, &clientaddrsize); - if (client == -1) { - if (errno != EAGAIN) { // spinning on SO_RCVTIMEO - ++g_acceptfails; - } - continue; - } - clientip = ntohl(clientaddr.sin_addr.s_addr); + + clientip = ntohl(client.addr.sin_addr.s_addr); g_worker[id].connected = true; g_worker[id].msgcount = 0; ++g_accepts; @@ -697,7 +800,7 @@ void *HttpWorker(void *arg) { AllowSigusr1(); InitHttpMessage(msg, kHttpRequest); g_worker[id].startread = _timespec_real(); - if ((got = read(client, inbuf, INBUF_SIZE)) <= 0) { + if ((got = read(client.sock, inbuf, INBUF_SIZE)) <= 0) { ++g_readfails; break; } @@ -745,11 +848,11 @@ void *HttpWorker(void *arg) { // we don't support http/1.0 and http/0.9 right now if (msg->version != 11) { LOG("%s used unsupported http/%d version\n", ipbuf, msg->version); - Write(client, "HTTP/1.1 505 HTTP Version Not Supported\r\n" - "Content-Type: text/plain\r\n" - "Connection: close\r\n" - "\r\n" - "HTTP Version Not Supported\n"); + Write(client.sock, "HTTP/1.1 505 HTTP Version Not Supported\r\n" + "Content-Type: text/plain\r\n" + "Connection: close\r\n" + "\r\n" + "HTTP Version Not Supported\n"); ++g_badversions; break; } @@ -764,7 +867,7 @@ void *HttpWorker(void *arg) { // export monitoring data if (UrlEqual("/statusz")) { - ServeStatusz(client, outbuf); + ServeStatusz(client.sock, outbuf); ++g_statuszrequests; break; } @@ -821,7 +924,7 @@ void *HttpWorker(void *arg) { p = stpcpy(p, cashbuf); p = stpcpy(p, "\r\n\r\n"); outmsglen = p - outbuf; - sent = write(client, outbuf, outmsglen); + sent = write(client.sock, outbuf, outmsglen); } else { p = stpcpy(outbuf, "HTTP/1.1 200 OK\r\n" STANDARD_RESPONSE_HEADERS "Vary: Accept-Encoding\r\n" @@ -845,7 +948,7 @@ void *HttpWorker(void *arg) { iov[1].iov_base = d.p; iov[1].iov_len = msg->method == kHttpHead ? 0 : d.n; outmsglen = iov[0].iov_len + iov[1].iov_len; - sent = writev(client, iov, 2); + sent = writev(client.sock, iov, 2); } nsync_mu_runlock(&a->lock); //////////////////////////////////////// @@ -865,7 +968,7 @@ void *HttpWorker(void *arg) { p = stpcpy(p, "\r\n\r\n"); p = stpcpy(p, ipbuf); outmsglen = p - outbuf; - sent = write(client, outbuf, outmsglen); + sent = write(client.sock, outbuf, outmsglen); } else { Ipv6Warning: DEBUG("%.*s via %s: 400 Need IPv4\n", @@ -885,7 +988,7 @@ void *HttpWorker(void *arg) { p = stpcpy(p, "\r\n\r\n"); p = stpcpy(p, q); outmsglen = p - outbuf; - sent = write(client, outbuf, p - outbuf); + sent = write(client.sock, outbuf, p - outbuf); break; } @@ -899,6 +1002,7 @@ void *HttpWorker(void *arg) { &g_claims, &v, _timespec_add(_timespec_real(), _timespec_frommillis(CLAIM_DEADLINE_MS)))) { + ++g_claimsenqueued; DEBUG("%s claimed by %s\n", ipbuf, v.name); if (HasHeader(kHttpAccept) && (HeaderHas(msg, inbuf, kHttpAccept, "image/*", 7) || @@ -968,14 +1072,14 @@ void *HttpWorker(void *arg) { p = stpcpy(p, "\r\n\r\n"); } outmsglen = p - outbuf; - sent = write(client, outbuf, p - outbuf); + sent = write(client.sock, outbuf, p - outbuf); } else { LOG("%s: 502 Claims Queue Full\n", ipbuf); - Write(client, "HTTP/1.1 502 Claims Queue Full\r\n" - "Content-Type: text/plain\r\n" - "Connection: close\r\n" - "\r\n" - "Claims Queue Full\n"); + Write(client.sock, "HTTP/1.1 502 Claims Queue Full\r\n" + "Content-Type: text/plain\r\n" + "Connection: close\r\n" + "\r\n" + "Claims Queue Full\n"); ++g_queuefulls; break; } @@ -995,7 +1099,7 @@ void *HttpWorker(void *arg) { p = stpcpy(p, "\r\n\r\n"); p = stpcpy(p, q); outmsglen = p - outbuf; - sent = write(client, outbuf, p - outbuf); + sent = write(client.sock, outbuf, p - outbuf); break; } @@ -1017,22 +1121,24 @@ void *HttpWorker(void *arg) { p = stpcpy(p, "\r\n\r\n"); p = stpcpy(p, q); outmsglen = p - outbuf; - sent = write(client, outbuf, p - outbuf); + sent = write(client.sock, outbuf, p - outbuf); } // if the client isn't pipelining and write() wrote the full // amount, then since we sent the content length and checked // that the client didn't attach a payload, we are so synced // thus we can safely process more messages - } while (got == inmsglen && // - sent == outmsglen && // - !HasHeader(kHttpContentLength) && // - !HasHeader(kHttpTransferEncoding) && // - (msg->method == kHttpGet || // - msg->method == kHttpHead) && // - !nsync_note_is_notified(g_shutdown)); + } while (got == inmsglen && // + sent == outmsglen && // + !HasHeader(kHttpContentLength) && // + !HasHeader(kHttpTransferEncoding) && // + !HeaderEqualCase(kHttpConnection, "close") && // + (msg->method == kHttpGet || // + msg->method == kHttpHead) && // + 1. / g_workers * g_connections < CONCERN_LOAD && // + !nsync_note_is_notified(g_shutdown[1])); DestroyHttpMessage(msg); - close(client); + close(client.sock); g_worker[id].connected = false; --g_connections; } @@ -1041,7 +1147,6 @@ void *HttpWorker(void *arg) { g_worker[id].shutdown = true; FreeSafeBuffer(outbuf); FreeSafeBuffer(inbuf); - close(server); return 0; } @@ -1148,17 +1253,18 @@ void IgnoreSignal(int sig) { // asynchronous handler of sigint, sigterm, and sighup signals // this handler is always invoked from within the main thread, -// because our helper and worker threads block always signals. +// because our helper and worker threads always block signals. void OnCtrlC(int sig) { - if (!nsync_note_is_notified(g_shutdown)) { + if (!nsync_note_is_notified(g_shutdown[0])) { LOG("Received %s shutting down...\n", strsignal(sig)); - nsync_note_notify(g_shutdown); + nsync_note_notify(g_shutdown[0]); } else { // there's no way to deliver signals to workers atomically, unless // we pay the cost of ppoll() which isn't necessary in this design // so if a user smashes that ctrl-c then we tkill the workers more LOG("Received %s again so sending another volley...\n", strsignal(sig)); for (int i = 0; i < g_workers; ++i) { + tkill(pthread_getunique_np(g_listener), SIGUSR1); if (!g_worker[i].shutdown) { tkill(pthread_getunique_np(g_worker[i].th), SIGUSR1); } @@ -1233,7 +1339,7 @@ bool GenerateScore(struct Asset *out, long secs, long cash) { DEBUG("GenerateScore %ld\n", secs); a.type = "application/json"; a.cash = cash; - CHECK_SYS(clock_gettime(CLOCK_REALTIME, &a.mtim)); + a.mtim = _timespec_real(); FormatUnixHttpDateTime(a.lastmodified, a.mtim.tv_sec); CHECK_SYS(appends(&a.data.p, "{\n")); CHECK_SYS(appendf(&a.data.p, "\"now\":[%ld,%ld],\n", a.mtim.tv_sec, @@ -1258,7 +1364,7 @@ bool GenerateScore(struct Asset *out, long secs, long cash) { // otherwise.. you can use --strace to see the fcntl bloodbath CHECK_SQL(sqlite3_exec(db, "BEGIN TRANSACTION", 0, 0, 0)); while ((rc = sqlite3_step(stmt)) != SQLITE_DONE) { - if (rc != SQLITE_ROW) CHECK_SQL(rc); + if (rc != SQLITE_ROW) CHECK_DB(rc); strlcpy(name2, (void *)sqlite3_column_text(stmt, 0), sizeof(name2)); if (!IsValidNick(name2, -1)) continue; if (strcmp(name1, name2)) { @@ -1297,16 +1403,14 @@ bool GenerateScore(struct Asset *out, long secs, long cash) { void *ScoreWorker(void *arg) { BlockSignals(); pthread_setname_np(pthread_self(), "ScoreAll"); - LOG("Score started\n"); + LOG("%P Score started\n"); long wait = SCORE_UPDATE_MS; Update(&g_asset.score, GenerateScore, -1, MS2CASH(wait)); nsync_counter_add(g_ready, -1); // #1 OnlyRunOnCpu(0); - for (nsync_time deadline = _timespec_real();;) { + do { Update(&g_asset.score, GenerateScore, -1, MS2CASH(wait)); - deadline = _timespec_add(deadline, _timespec_frommillis(wait)); - if (nsync_note_wait(g_shutdown, deadline)) break; - } + } while (!nsync_note_wait(g_shutdown[1], WaitFor(wait))); LOG("Score exiting\n"); return 0; } @@ -1315,17 +1419,15 @@ void *ScoreWorker(void *arg) { void *ScoreHourWorker(void *arg) { BlockSignals(); pthread_setname_np(pthread_self(), "ScoreHour"); - LOG("ScoreHour started\n"); + LOG("%P ScoreHour started\n"); long secs = 60L * 60; long wait = SCORE_H_UPDATE_MS; Update(&g_asset.score_hour, GenerateScore, secs, MS2CASH(wait)); nsync_counter_add(g_ready, -1); // #2 OnlyRunOnCpu(0); - for (nsync_time deadline = _timespec_real();;) { + do { Update(&g_asset.score_hour, GenerateScore, secs, MS2CASH(wait)); - deadline = _timespec_add(deadline, _timespec_frommillis(wait)); - if (nsync_note_wait(g_shutdown, deadline)) break; - } + } while (!nsync_note_wait(g_shutdown[1], WaitFor(wait))); LOG("ScoreHour exiting\n"); return 0; } @@ -1334,17 +1436,15 @@ void *ScoreHourWorker(void *arg) { void *ScoreDayWorker(void *arg) { BlockSignals(); pthread_setname_np(pthread_self(), "ScoreDay"); - LOG("ScoreDay started\n"); + LOG("%P ScoreDay started\n"); long secs = 60L * 60 * 24; long wait = SCORE_D_UPDATE_MS; Update(&g_asset.score_day, GenerateScore, secs, MS2CASH(wait)); nsync_counter_add(g_ready, -1); // #3 OnlyRunOnCpu(0); - for (nsync_time deadline = _timespec_real();;) { + do { Update(&g_asset.score_day, GenerateScore, secs, MS2CASH(wait)); - deadline = _timespec_add(deadline, _timespec_frommillis(wait)); - if (nsync_note_wait(g_shutdown, deadline)) break; - } + } while (!nsync_note_wait(g_shutdown[1], WaitFor(wait))); LOG("ScoreDay exiting\n"); return 0; } @@ -1353,17 +1453,15 @@ void *ScoreDayWorker(void *arg) { void *ScoreWeekWorker(void *arg) { BlockSignals(); pthread_setname_np(pthread_self(), "ScoreWeek"); - LOG("ScoreWeek started\n"); + LOG("%P ScoreWeek started\n"); long secs = 60L * 60 * 24 * 7; long wait = SCORE_W_UPDATE_MS; Update(&g_asset.score_week, GenerateScore, secs, MS2CASH(wait)); nsync_counter_add(g_ready, -1); // #4 OnlyRunOnCpu(0); - for (nsync_time deadline = _timespec_real();;) { + do { Update(&g_asset.score_week, GenerateScore, secs, MS2CASH(wait)); - deadline = _timespec_add(deadline, _timespec_frommillis(wait)); - if (nsync_note_wait(g_shutdown, deadline)) break; - } + } while (!nsync_note_wait(g_shutdown[1], WaitFor(wait))); LOG("ScoreWeek exiting\n"); return 0; } @@ -1372,17 +1470,15 @@ void *ScoreWeekWorker(void *arg) { void *ScoreMonthWorker(void *arg) { BlockSignals(); pthread_setname_np(pthread_self(), "ScoreMonth"); - LOG("ScoreMonth started\n"); + LOG("%P ScoreMonth started\n"); long secs = 60L * 60 * 24 * 30; long wait = SCORE_M_UPDATE_MS; Update(&g_asset.score_month, GenerateScore, secs, MS2CASH(wait)); nsync_counter_add(g_ready, -1); // #5 OnlyRunOnCpu(0); - for (nsync_time deadline = _timespec_real();;) { + do { Update(&g_asset.score_month, GenerateScore, secs, MS2CASH(wait)); - deadline = _timespec_add(deadline, _timespec_frommillis(wait)); - if (nsync_note_wait(g_shutdown, deadline)) break; - } + } while (!nsync_note_wait(g_shutdown[1], WaitFor(wait))); LOG("ScoreMonth exiting\n"); return 0; } @@ -1400,7 +1496,7 @@ void *RecentWorker(void *arg) { bool warmedup = false; BlockSignals(); pthread_setname_np(pthread_self(), "RecentWorker"); - LOG("RecentWorker started\n"); + LOG("%P RecentWorker started\n"); StartOver: db = 0; stmt = 0; @@ -1414,7 +1510,7 @@ void *RecentWorker(void *arg) { "LIMIT 50")); do { // regenerate json - CHECK_SYS(clock_gettime(CLOCK_REALTIME, &t.mtim)); + t.mtim = _timespec_real(); FormatUnixHttpDateTime(t.lastmodified, t.mtim.tv_sec); CHECK_SYS(appends(&t.data.p, "{\n")); CHECK_SYS(appendf(&t.data.p, "\"now\":[%ld,%ld],\n", t.mtim.tv_sec, @@ -1461,7 +1557,7 @@ void *RecentWorker(void *arg) { // wait for wakeup or cancel nsync_mu_lock(&g_recent.mu); err = nsync_cv_wait_with_deadline(&g_recent.cv, &g_recent.mu, - nsync_time_no_deadline, g_shutdown); + nsync_time_no_deadline, g_shutdown[1]); nsync_mu_unlock(&g_recent.mu); } while (err != ECANCELED); CHECK_DB(sqlite3_finalize(stmt)); @@ -1482,12 +1578,13 @@ void *RecentWorker(void *arg) { void *ClaimWorker(void *arg) { sqlite3 *db; int i, n, rc; + long processed; sqlite3_stmt *stmt; bool warmedup = false; struct Claim *v = _gc(xcalloc(BATCH_MAX, sizeof(struct Claim))); BlockSignals(); pthread_setname_np(pthread_self(), "ClaimWorker"); - LOG("ClaimWorker started\n"); + LOG("%P ClaimWorker started\n"); StartOver: db = 0; stmt = 0; @@ -1505,7 +1602,8 @@ void *ClaimWorker(void *arg) { nsync_counter_add(g_ready, -1); // #7 warmedup = true; } - while ((n = GetClaims(&g_claims, v, BATCH_MAX, nsync_time_no_deadline))) { + while ((n = GetClaims(&g_claims, v, BATCH_MAX))) { + processed = 0; CHECK_SQL(sqlite3_exec(db, "BEGIN TRANSACTION", 0, 0, 0)); for (i = 0; i < n; ++i) { CHECK_DB(sqlite3_bind_int64(stmt, 1, v[i].ip)); @@ -1514,8 +1612,10 @@ void *ClaimWorker(void *arg) { CHECK_DB(sqlite3_bind_int64(stmt, 3, v[i].created)); CHECK_DB((rc = sqlite3_step(stmt)) == SQLITE_DONE ? SQLITE_OK : rc); CHECK_DB(sqlite3_reset(stmt)); + ++processed; } CHECK_SQL(sqlite3_exec(db, "COMMIT TRANSACTION", 0, 0, 0)); + atomic_fetch_add(&g_claimsprocessed, processed); DEBUG("Committed %d claims\n", n); // wake up RecentWorker() nsync_mu_lock(&g_recent.mu); @@ -1536,13 +1636,12 @@ void *ClaimWorker(void *arg) { void *NowWorker(void *arg) { BlockSignals(); pthread_setname_np(pthread_self(), "NowWorker"); - LOG("NowWorker started\n"); + LOG("%P NowWorker started\n"); UpdateNow(); OnlyRunOnCpu(0); nsync_counter_add(g_ready, -1); // #8 - for (nsync_time deadline = _timespec_real();;) { - deadline = _timespec_add(deadline, _timespec_frommillis(DATE_UPDATE_MS)); - if (!nsync_note_wait(g_shutdown, deadline)) { + for (struct timespec ts = {_timespec_real().tv_sec};; ++ts.tv_sec) { + if (!nsync_note_wait(g_shutdown[1], ts)) { UpdateNow(); } else { break; @@ -1558,7 +1657,7 @@ void *NowWorker(void *arg) { // in a while; (2) cancel clients who are sending lots of messages. void Meltdown(void) { int i, marks; - nsync_time now; + struct timespec now; ++g_meltdowns; LOG("Panicking because %d out of %d workers is connected\n", g_connections, g_workers); @@ -1577,9 +1676,8 @@ void Meltdown(void) { // main thread worker void *Supervisor(void *arg) { - for (nsync_time deadline = _timespec_real();;) { - deadline = _timespec_add(deadline, _timespec_frommillis(SUPERVISE_MS)); - if (!nsync_note_wait(g_shutdown, deadline)) { + for (;;) { + if (!nsync_note_wait(g_shutdown[0], WaitFor(SUPERVISE_MS))) { if (g_workers > 1 && 1. / g_workers * g_connections > PANIC_LOAD) { Meltdown(); } @@ -1626,9 +1724,10 @@ int main(int argc, char *argv[]) { sqlite3_initialize(); // server lifecycle locks - g_started = nsync_time_now(); - g_shutdown = nsync_note_new(0, nsync_time_no_deadline); - g_terminate = nsync_note_new(0, nsync_time_no_deadline); + g_started = _timespec_real(); + for (int i = 0; i < ARRAYLEN(g_shutdown); ++i) { + g_shutdown[i] = nsync_note_new(0, nsync_time_no_deadline); + } // load static assets into memory and pre-zip them g_asset.index = LoadAsset("index.html", "text/html; charset=utf-8", 900); @@ -1640,7 +1739,10 @@ int main(int argc, char *argv[]) { __pledge_mode = PLEDGE_PENALTY_RETURN_EPERM; CHECK_EQ(0, unveil("/opt/turfwar", "rwc")); CHECK_EQ(0, unveil(0, 0)); - CHECK_EQ(0, pledge("stdio flock rpath wpath cpath inet", 0)); + if (!IsOpenbsd()) { + // TODO(jart): why isn't pledge working on openbsd? + CHECK_EQ(0, pledge("stdio flock rpath wpath cpath inet", 0)); + } // shutdown signals struct sigaction sa; @@ -1671,7 +1773,10 @@ int main(int argc, char *argv[]) { nsync_counter_wait(g_ready, nsync_time_no_deadline); } - // create lots of http listeners to serve those assets + // create one thread to listen + CHECK_EQ(0, pthread_create(&g_listener, 0, ListenWorker, 0)); + + // create lots of http workers to serve those assets LOG("Online\n"); g_worker = _gc(xcalloc(g_workers, sizeof(*g_worker))); for (intptr_t i = 0; i < g_workers; ++i) { @@ -1682,7 +1787,12 @@ int main(int argc, char *argv[]) { LOG("Ready\n"); Supervisor(0); - // cancel accept and read for fast shutdown + // cancel listen() so we stop accepting new clients + LOG("Interrupting listen...\n"); + tkill(pthread_getunique_np(g_listener), SIGUSR1); + CHECK_EQ(0, pthread_join(g_listener, 0)); + + // cancel read() so that keepalive clients finish faster LOG("Interrupting workers...\n"); for (int i = 0; i < g_workers; ++i) { tkill(pthread_getunique_np(g_worker[i].th), SIGUSR1); @@ -1702,11 +1812,16 @@ int main(int argc, char *argv[]) { CHECK_EQ(0, pthread_join(scorer_week, 0)); CHECK_EQ(0, pthread_join(scorer_month, 0)); - // wait for consumers to finish - LOG("Waiting for queue to empty...\n"); - nsync_note_notify(g_terminate); - CHECK_EQ(0, pthread_join(claimer, 0)); + // now that all workers have terminated, the claims queue must be + // empty, therefore, it is now safe to send a cancellation to the + // claims worker thread which waits forever for new claims. CHECK_EQ(0, g_claims.count); + LOG("waiting for claims worker...\n"); + nsync_note_notify(g_shutdown[2]); + CHECK_EQ(0, pthread_join(claimer, 0)); + + // perform some sanity checks + CHECK_EQ(g_claimsprocessed, g_claimsenqueued); // free memory LOG("Freeing memory...\n"); @@ -1720,8 +1835,9 @@ int main(int argc, char *argv[]) { FreeAsset(&g_asset.score_month); FreeAsset(&g_asset.recent); FreeAsset(&g_asset.favicon); - nsync_note_free(g_terminate); - nsync_note_free(g_shutdown); + for (int i = 0; i < ARRAYLEN(g_shutdown); ++i) { + nsync_note_free(g_shutdown[i]); + } nsync_counter_free(g_ready); LOG("Goodbye\n"); diff --git a/test/libc/calls/tkill_test.c b/test/libc/calls/tkill_test.c index e69de29bb2d..defed8bef02 100644 --- a/test/libc/calls/tkill_test.c +++ b/test/libc/calls/tkill_test.c @@ -0,0 +1,57 @@ +/*-*- mode:c;indent-tabs-mode:nil;c-basic-offset:2;tab-width:8;coding:utf-8 -*-│ +│vi: set net ft=c ts=2 sts=2 sw=2 fenc=utf-8 :vi│ +╞══════════════════════════════════════════════════════════════════════════════╡ +│ Copyright 2022 Justine Alexandra Roberts Tunney │ +│ │ +│ Permission to use, copy, modify, and/or distribute this software for │ +│ any purpose with or without fee is hereby granted, provided that the │ +│ above copyright notice and this permission notice appear in all copies. │ +│ │ +│ THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL │ +│ WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED │ +│ WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE │ +│ AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL │ +│ DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR │ +│ PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER │ +│ TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR │ +│ PERFORMANCE OF THIS SOFTWARE. │ +╚─────────────────────────────────────────────────────────────────────────────*/ +#include "libc/calls/calls.h" +#include "libc/calls/struct/sigaction.h" +#include "libc/calls/struct/sigset.h" +#include "libc/errno.h" +#include "libc/sysv/consts/sig.h" +#include "libc/testlib/testlib.h" +#include "libc/thread/thread.h" + +_Thread_local intptr_t gotsig; + +void OnSig(int sig) { + gotsig = sig; +} + +void *Worker(void *arg) { + sigset_t ss; + sigemptyset(&ss); + ASSERT_SYS(EINTR, -1, sigsuspend(&ss)); + return (void *)gotsig; +} + +TEST(tkill, test) { + if (IsWindows()) return; // TODO(jart): fix me + int i; + void *res; + pthread_t t; + sigset_t ss, oldss; + sighandler_t oldsig; + sigemptyset(&ss); + sigaddset(&ss, SIGUSR1); + oldsig = signal(SIGUSR1, OnSig); + ASSERT_SYS(0, 0, sigprocmask(SIG_BLOCK, &ss, &oldss)); + ASSERT_EQ(0, pthread_create(&t, 0, Worker, 0)); + ASSERT_SYS(0, 0, tkill(pthread_getunique_np(t), SIGUSR1)); + ASSERT_EQ(0, pthread_join(t, &res)); + ASSERT_EQ(SIGUSR1, (intptr_t)res); + ASSERT_SYS(0, 0, sigprocmask(SIG_SETMASK, &oldss, 0)); + signal(SIGUSR1, oldsig); +} diff --git a/test/tool/net/sqlite_test.c b/test/tool/net/sqlite_test.c index 0ef05fbf963..39c2381d043 100644 --- a/test/tool/net/sqlite_test.c +++ b/test/tool/net/sqlite_test.c @@ -16,27 +16,120 @@ │ TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR │ │ PERFORMANCE OF THIS SOFTWARE. │ ╚─────────────────────────────────────────────────────────────────────────────*/ +#include "libc/calls/calls.h" #include "libc/dce.h" +#include "libc/intrin/kprintf.h" +#include "libc/macros.internal.h" +#include "libc/mem/gc.h" +#include "libc/mem/mem.h" +#include "libc/runtime/runtime.h" +#include "libc/stdio/rand.h" #include "libc/testlib/testlib.h" +#include "libc/thread/thread.h" #include "third_party/sqlite3/sqlite3.h" char testlib_enable_tmp_setup_teardown; -void SetUp(void) { +void SetUpOnce(void) { + if (IsWindows()) exit(0); sqlite3_initialize(); } +// if we try to open a WAL database at the same time from multiple +// threads then it's likely we'll get a SQLITE_BUSY conflict since +// WAL mode does a complicated dance to initialize itself thus all +// we need to do is wait a little bit, and use exponential backoff +int DbOpen(const char *path, sqlite3 **db) { + int i, rc; + rc = sqlite3_open(path, db); + if (rc != SQLITE_OK) return rc; + for (i = 0; i < 12; ++i) { + rc = sqlite3_exec(*db, "PRAGMA journal_mode=WAL", 0, 0, 0); + if (rc == SQLITE_OK) break; + if (rc != SQLITE_BUSY) return rc; + usleep(1000L << i); + } + return sqlite3_exec(*db, "PRAGMA synchronous=NORMAL", 0, 0, 0); +} + +int DbStep(sqlite3_stmt *stmt) { + int i, rc; + for (i = 0; i < 12; ++i) { + rc = sqlite3_step(stmt); + if (rc == SQLITE_ROW) break; + if (rc == SQLITE_DONE) break; + if (rc != SQLITE_BUSY) return rc; + usleep(1000L << i); + } + return rc; +} + +int DbExec(sqlite3 *db, const char *sql) { + int i, rc; + for (i = 0; i < 12; ++i) { + rc = sqlite3_exec(db, sql, 0, 0, 0); + if (rc == SQLITE_OK) break; + if (rc != SQLITE_BUSY) return rc; + usleep(1000L << i); + } + return rc; +} + +int DbPrepare(sqlite3 *db, sqlite3_stmt **stmt, const char *sql) { + return sqlite3_prepare_v2(db, sql, -1, stmt, 0); +} + +void *Worker(void *arg) { + int rc; + sqlite3 *db; + sqlite3_stmt *stmt[2]; + static const char *const kNames[] = {"alice", "bob", "varu", "vader"}; + ASSERT_EQ(SQLITE_OK, DbOpen("foo.sqlite", &db)); + ASSERT_EQ(SQLITE_OK, DbPrepare(db, &stmt[0], + "INSERT INTO t (id, name) VALUES (?1, ?2)")); + ASSERT_EQ(SQLITE_OK, DbPrepare(db, &stmt[1], + "SELECT name, COUNT(*) FROM t GROUP BY name")); + for (int j = 0; j < 50; ++j) { + ASSERT_EQ(SQLITE_OK, DbExec(db, "BEGIN TRANSACTION")); + for (int i = 0; i < 4; ++i) { + ASSERT_EQ(SQLITE_OK, sqlite3_bind_int64(stmt[0], 1, rand64())); + ASSERT_EQ(SQLITE_OK, sqlite3_bind_text( + stmt[0], 2, kNames[rand64() % ARRAYLEN(kNames)], + -1, SQLITE_TRANSIENT)); + ASSERT_EQ(SQLITE_DONE, DbStep(stmt[0])); + ASSERT_EQ(SQLITE_OK, sqlite3_reset(stmt[0])); + } + ASSERT_EQ(SQLITE_OK, DbExec(db, "COMMIT TRANSACTION")); + ASSERT_EQ(SQLITE_OK, DbExec(db, "BEGIN TRANSACTION")); + for (;;) { + rc = DbStep(stmt[1]); + if (rc == SQLITE_DONE) break; + ASSERT_EQ(SQLITE_ROW, rc); + } + ASSERT_EQ(SQLITE_OK, sqlite3_reset(stmt[1])); + ASSERT_EQ(SQLITE_OK, DbExec(db, "END TRANSACTION")); + } + ASSERT_EQ(SQLITE_OK, sqlite3_finalize(stmt[1])); + ASSERT_EQ(SQLITE_OK, sqlite3_finalize(stmt[0])); + ASSERT_EQ(SQLITE_OK, sqlite3_close(db)); + return 0; +} + TEST(sqlite, test) { sqlite3 *db; - sqlite3_stmt *stmt; - ASSERT_EQ(SQLITE_OK, sqlite3_open("foo.sqlite", &db)); - ASSERT_EQ(SQLITE_OK, - sqlite3_prepare_v2(db, "PRAGMA synchronous=0", -1, &stmt, 0)); - ASSERT_EQ(SQLITE_DONE, sqlite3_step(stmt)); - ASSERT_EQ(SQLITE_OK, sqlite3_finalize(stmt)); - ASSERT_EQ(SQLITE_OK, - sqlite3_prepare_v2(db, "CREATE TABLE t (x INTEGER)", -1, &stmt, 0)); - ASSERT_EQ(SQLITE_DONE, sqlite3_step(stmt)); - ASSERT_EQ(SQLITE_OK, sqlite3_finalize(stmt)); + ASSERT_EQ(SQLITE_OK, DbOpen("foo.sqlite", &db)); + ASSERT_EQ(SQLITE_OK, DbExec(db, "PRAGMA synchronous=0")); + ASSERT_EQ(SQLITE_OK, DbExec(db, "CREATE TABLE t (\n" + " id INTEGER,\n" + " name TEXT\n" + ")")); ASSERT_EQ(SQLITE_OK, sqlite3_close(db)); + int i, n = 4; + pthread_t *t = _gc(malloc(sizeof(pthread_t) * n)); + for (i = 0; i < n; ++i) { + ASSERT_EQ(0, pthread_create(t + i, 0, Worker, 0)); + } + for (i = 0; i < n; ++i) { + ASSERT_EQ(0, pthread_join(t[i], 0)); + } } diff --git a/test/tool/net/test.mk b/test/tool/net/test.mk index 42af903e432..67c430b9a41 100644 --- a/test/tool/net/test.mk +++ b/test/tool/net/test.mk @@ -42,10 +42,11 @@ TEST_TOOL_NET_DIRECTDEPS = \ LIBC_STUBS \ LIBC_SYSV \ LIBC_TESTLIB \ + LIBC_THREAD \ LIBC_X \ LIBC_ZIPOS \ - THIRD_PARTY_REGEX \ THIRD_PARTY_MBEDTLS \ + THIRD_PARTY_REGEX \ THIRD_PARTY_SQLITE3 TEST_TOOL_NET_DEPS := \ diff --git a/third_party/sqlite3/fts3_write.c b/third_party/sqlite3/fts3_write.c index 1cb3101d2de..101365daa11 100644 --- a/third_party/sqlite3/fts3_write.c +++ b/third_party/sqlite3/fts3_write.c @@ -18,6 +18,7 @@ */ /* clang-format off */ +#include "libc/fmt/conv.h" #include "third_party/sqlite3/fts3Int.inc" #if !defined(SQLITE_CORE) || defined(SQLITE_ENABLE_FTS3) diff --git a/third_party/sqlite3/mutex.internal.h b/third_party/sqlite3/mutex.internal.h new file mode 100644 index 00000000000..08a731d5c46 --- /dev/null +++ b/third_party/sqlite3/mutex.internal.h @@ -0,0 +1,41 @@ +#ifndef COSMOPOLITAN_THIRD_PARTY_SQLITE3_MUTEX_INTERNAL_H_ +#define COSMOPOLITAN_THIRD_PARTY_SQLITE3_MUTEX_INTERNAL_H_ +#if !(__ASSEMBLER__ + __LINKER__ + 0) +COSMOPOLITAN_C_START_ + +#if !SQLITE_THREADSAFE +#define SQLITE_MUTEX_OMIT +#endif +#if SQLITE_THREADSAFE && !defined(SQLITE_MUTEX_NOOP) +#if SQLITE_OS_UNIX +#define SQLITE_MUTEX_PTHREADS +#elif SQLITE_OS_WIN +#define SQLITE_MUTEX_W32 +#else +#define SQLITE_MUTEX_NOOP +#endif +#endif + +#ifdef SQLITE_MUTEX_OMIT +/* +** If this is a no-op implementation, implement everything as macros. +*/ +#define sqlite3_mutex_alloc(X) ((sqlite3_mutex*)8) +#define sqlite3_mutex_free(X) +#define sqlite3_mutex_enter(X) +#define sqlite3_mutex_try(X) SQLITE_OK +#define sqlite3_mutex_leave(X) +#define sqlite3_mutex_held(X) ((void)(X), 1) +#define sqlite3_mutex_notheld(X) ((void)(X), 1) +#define sqlite3MutexAlloc(X) ((sqlite3_mutex*)8) +#define sqlite3MutexInit() SQLITE_OK +#define sqlite3MutexEnd() +#define MUTEX_LOGIC(X) +#else +#define MUTEX_LOGIC(X) X +int sqlite3_mutex_held(sqlite3_mutex*); +#endif /* defined(SQLITE_MUTEX_OMIT) */ + +COSMOPOLITAN_C_END_ +#endif /* !(__ASSEMBLER__ + __LINKER__ + 0) */ +#endif /* COSMOPOLITAN_THIRD_PARTY_SQLITE3_MUTEX_INTERNAL_H_ */ diff --git a/third_party/sqlite3/os_unix.c b/third_party/sqlite3/os_unix.c index 752a4526ea7..70a4394757e 100644 --- a/third_party/sqlite3/os_unix.c +++ b/third_party/sqlite3/os_unix.c @@ -50,6 +50,9 @@ #include "libc/sysv/consts/s.h" #include "libc/runtime/runtime.h" #include "libc/calls/struct/timeval.h" +#include "third_party/sqlite3/sqlite3.h" +#include "third_party/sqlite3/mutex.internal.h" +#include "third_party/sqlite3/mutex.internal.h" #include "third_party/sqlite3/sqliteInt.inc" #if SQLITE_OS_UNIX /* This file is used on unix only */ diff --git a/third_party/sqlite3/pcache1.c b/third_party/sqlite3/pcache1.c index 11109b17089..bd1d2044499 100644 --- a/third_party/sqlite3/pcache1.c +++ b/third_party/sqlite3/pcache1.c @@ -80,6 +80,8 @@ ** show that method (3) with N==100 provides about a 5% performance boost for ** common workloads. */ +#include "libc/assert.h" +#include "third_party/sqlite3/sqlite3.h" #include "third_party/sqlite3/sqliteInt.inc" /* clang-format off */ diff --git a/third_party/sqlite3/sqlite3.h b/third_party/sqlite3/sqlite3.h index b0701b550b2..fcf246adcfd 100644 --- a/third_party/sqlite3/sqlite3.h +++ b/third_party/sqlite3/sqlite3.h @@ -7594,10 +7594,8 @@ struct sqlite3_mutex_methods { ** the appropriate thing to do. The sqlite3_mutex_notheld() ** interface should also return 1 when given a NULL pointer. */ -#ifndef NDEBUG SQLITE_API int sqlite3_mutex_held(sqlite3_mutex*); SQLITE_API int sqlite3_mutex_notheld(sqlite3_mutex*); -#endif /* ** CAPI3REF: Mutex Types diff --git a/third_party/sqlite3/sqlite3.mk b/third_party/sqlite3/sqlite3.mk index f9a8a668147..4d596ca2a3f 100644 --- a/third_party/sqlite3/sqlite3.mk +++ b/third_party/sqlite3/sqlite3.mk @@ -127,14 +127,20 @@ THIRD_PARTY_SQLITE3_FLAGS = \ -DSQLITE_ENABLE_JSON1 \ -DSQLITE_ENABLE_DESERIALIZE \ +ifeq ($(MODE),dbg) +THIRD_PARTY_SQLITE3_CPPFLAGS_DEBUG = -DSQLITE_DEBUG +endif + $(THIRD_PARTY_SQLITE3_A_OBJS): private \ OVERRIDE_CFLAGS += \ $(THIRD_PARTY_SQLITE3_FLAGS) \ + $(THIRD_PARTY_SQLITE3_CPPFLAGS_DEBUG) \ -DSQLITE_OMIT_UPDATE_HOOK $(THIRD_PARTY_SQLITE3_SHELL_OBJS): private \ OVERRIDE_CFLAGS += \ $(THIRD_PARTY_SQLITE3_FLAGS) \ + $(THIRD_PARTY_SQLITE3_CPPFLAGS_DEBUG) \ -DHAVE_READLINE=0 \ -DHAVE_EDITLINE=0 \ -DSQLITE_HAVE_ZLIB \ diff --git a/tool/net/help.txt b/tool/net/help.txt index 85a12792c83..64ccc52e15b 100644 --- a/tool/net/help.txt +++ b/tool/net/help.txt @@ -4626,7 +4626,7 @@ UNIX MODULE Fetches then adds value. This method modifies the word at `word_index` to contain the sum of - its value and the `value` paremeter. This method then returns the + its value and the `value` parameter. This method then returns the value as it existed before the addition was performed. This operation is atomic and provides the same memory barrier