Skip to content

Commit

Permalink
Inline aio allocation for fdc, and remove timeout for negotiation.
Browse files Browse the repository at this point in the history
  • Loading branch information
gdamore committed Dec 26, 2023
1 parent b8a08ce commit 26328ac
Showing 1 changed file with 32 additions and 30 deletions.
62 changes: 32 additions & 30 deletions src/sp/transport/fdconn/fdconn.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ struct fdc_tran_pipe {
size_t wantrxhead;
nni_list recvq;
nni_list sendq;
nni_aio *txaio;
nni_aio *rxaio;
nni_aio *negoaio;
nni_aio txaio;
nni_aio rxaio;
nni_aio negoaio;
nni_msg *rxmsg;
nni_mtx mtx;
};
Expand Down Expand Up @@ -107,9 +107,9 @@ fdc_tran_pipe_close(void *arg)
p->closed = true;
nni_mtx_unlock(&p->mtx);

nni_aio_close(p->rxaio);
nni_aio_close(p->txaio);
nni_aio_close(p->negoaio);
nni_aio_close(&p->rxaio);
nni_aio_close(&p->txaio);
nni_aio_close(&p->negoaio);

nng_stream_close(p->conn);
}
Expand All @@ -119,9 +119,9 @@ fdc_tran_pipe_stop(void *arg)
{
fdc_tran_pipe *p = arg;

nni_aio_stop(p->rxaio);
nni_aio_stop(p->txaio);
nni_aio_stop(p->negoaio);
nni_aio_stop(&p->rxaio);
nni_aio_stop(&p->txaio);
nni_aio_stop(&p->negoaio);
}

static int
Expand Down Expand Up @@ -150,9 +150,9 @@ fdc_tran_pipe_fini(void *arg)
nni_mtx_unlock(&ep->mtx);
}

nni_aio_free(p->rxaio);
nni_aio_free(p->txaio);
nni_aio_free(p->negoaio);
nni_aio_fini(&p->rxaio);
nni_aio_fini(&p->txaio);
nni_aio_fini(&p->negoaio);
nng_stream_free(p->conn);
nni_msg_free(p->rxmsg);
nni_mtx_fini(&p->mtx);
Expand All @@ -174,19 +174,14 @@ static int
fdc_tran_pipe_alloc(fdc_tran_pipe **pipep)
{
fdc_tran_pipe *p;
int rv;

if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);

Check warning on line 179 in src/sp/transport/fdconn/fdconn.c

View check run for this annotation

Codecov / codecov/patch

src/sp/transport/fdconn/fdconn.c#L179

Added line #L179 was not covered by tests
}
nni_mtx_init(&p->mtx);
if (((rv = nni_aio_alloc(&p->txaio, fdc_tran_pipe_send_cb, p)) != 0) ||
((rv = nni_aio_alloc(&p->rxaio, fdc_tran_pipe_recv_cb, p)) != 0) ||
((rv = nni_aio_alloc(&p->negoaio, fdc_tran_pipe_nego_cb, p)) !=
0)) {
fdc_tran_pipe_fini(p);
return (rv);
}
nni_aio_init(&p->txaio, fdc_tran_pipe_send_cb, p);
nni_aio_init(&p->rxaio, fdc_tran_pipe_recv_cb, p);
nni_aio_init(&p->negoaio, fdc_tran_pipe_nego_cb, p);
nni_aio_list_init(&p->recvq);
nni_aio_list_init(&p->sendq);
nni_atomic_flag_reset(&p->reaped);
Expand Down Expand Up @@ -219,7 +214,7 @@ fdc_tran_pipe_nego_cb(void *arg)
{
fdc_tran_pipe *p = arg;
fdc_tran_ep *ep = p->ep;
nni_aio *aio = p->negoaio;
nni_aio *aio = &p->negoaio;
nni_aio *uaio;
int rv;

Expand Down Expand Up @@ -302,7 +297,7 @@ fdc_tran_pipe_send_cb(void *arg)
nni_aio *aio;
size_t n;
nni_msg *msg;
nni_aio *txaio = p->txaio;
nni_aio *txaio = &p->txaio;

nni_mtx_lock(&p->mtx);
aio = nni_list_first(&p->sendq);
Expand Down Expand Up @@ -349,7 +344,7 @@ fdc_tran_pipe_recv_cb(void *arg)
int rv;
size_t n;
nni_msg *msg;
nni_aio *rxaio = p->rxaio;
nni_aio *rxaio = &p->rxaio;

nni_mtx_lock(&p->mtx);
aio = nni_list_first(&p->recvq);
Expand Down Expand Up @@ -445,7 +440,7 @@ fdc_tran_pipe_send_cancel(nni_aio *aio, void *arg, int rv)
// The callback on the txaio will cause the user aio to
// be canceled too.
if (nni_list_first(&p->sendq) == aio) {
nni_aio_abort(p->txaio, rv);
nni_aio_abort(&p->txaio, rv);
nni_mtx_unlock(&p->mtx);
return;

Check warning on line 445 in src/sp/transport/fdconn/fdconn.c

View check run for this annotation

Codecov / codecov/patch

src/sp/transport/fdconn/fdconn.c#L442-L445

Added lines #L442 - L445 were not covered by tests
}
Expand Down Expand Up @@ -483,7 +478,7 @@ fdc_tran_pipe_send_start(fdc_tran_pipe *p)

NNI_PUT64(p->txlen, len);

txaio = p->txaio;
txaio = &p->txaio;
niov = 0;
iov[0].iov_buf = p->txlen;
iov[0].iov_len = sizeof(p->txlen);
Expand Down Expand Up @@ -542,7 +537,7 @@ fdc_tran_pipe_recv_cancel(nni_aio *aio, void *arg, int rv)
// The callback on the rxaio will cause the user aio to
// be canceled too.
if (nni_list_first(&p->recvq) == aio) {
nni_aio_abort(p->rxaio, rv);
nni_aio_abort(&p->rxaio, rv);
nni_mtx_unlock(&p->mtx);
return;
}
Expand Down Expand Up @@ -571,7 +566,7 @@ fdc_tran_pipe_recv_start(fdc_tran_pipe *p)
}

// Schedule a read of the header.
rxaio = p->rxaio;
rxaio = &p->rxaio;
iov.iov_buf = p->rxlen;
iov.iov_len = sizeof(p->rxlen);
nni_aio_set_iov(rxaio, 1, &iov);
Expand Down Expand Up @@ -642,11 +637,18 @@ fdc_tran_pipe_start(fdc_tran_pipe *p, nng_stream *conn, fdc_tran_ep *ep)
p->wanttxhead = 8;
iov.iov_len = 8;
iov.iov_buf = &p->txlen[0];
nni_aio_set_iov(p->negoaio, 1, &iov);
nni_aio_set_iov(&p->negoaio, 1, &iov);
nni_list_append(&ep->negopipes, p);

nni_aio_set_timeout(p->negoaio, 10000); // 10 sec timeout to negotiate
nng_stream_send(p->conn, p->negoaio);
// No timeouts here -- the purpose of timeouts was to guard against
// untrusted callers forcing us to burn files. In this case the
// application is providing us with a file, and should be reasonably
// trusted not to be doing a DoS against itself! :-) Part of the
// rationale is that it may take a while for a child process to reach
// the point where it is ready to negotiate the other side of a
// connection.
nni_aio_set_timeout(&p->negoaio, NNG_DURATION_INFINITE);
nng_stream_send(p->conn, &p->negoaio);
}

static void
Expand Down

0 comments on commit 26328ac

Please sign in to comment.