Skip to content

Commit

Permalink
Add support for socket:// in NUTS marry function.
Browse files Browse the repository at this point in the history
This also adds a HUGE test for REP using socket so that we can
discriminate failures that might exist using sockets instead of inproc.
  • Loading branch information
gdamore committed Feb 18, 2024
1 parent ee697a2 commit b4512a2
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 9 deletions.
59 changes: 59 additions & 0 deletions src/sp/protocol/reqrep0/rep_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
// found online at https://opensource.org/licenses/MIT.
//

#include "nng/nng.h"
#include <nuts.h>

static void
Expand Down Expand Up @@ -240,6 +241,63 @@ test_rep_huge_send(void)
NUTS_CLOSE(req);
}

void
test_rep_huge_send_socket(void)
{
nng_socket rep;
nng_socket req;
nng_msg *m;
nng_msg *d;
nng_aio *aio;

NUTS_PASS(nng_rep_open(&rep));
NUTS_PASS(nng_req_open(&req));
NUTS_PASS(nng_socket_set_ms(rep, NNG_OPT_RECVTIMEO, 1000));
NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_RECVTIMEO, 1000));
NUTS_PASS(nng_socket_set_ms(rep, NNG_OPT_SENDTIMEO, 1000));
NUTS_PASS(nng_socket_set_ms(req, NNG_OPT_SENDTIMEO, 1000));
NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL));
NUTS_PASS(nng_msg_alloc(&m, 10 << 20)); // 10 MB
NUTS_PASS(nng_socket_set_size(req, NNG_OPT_RECVMAXSZ, 1 << 30));
NUTS_PASS(nng_socket_set_size(rep, NNG_OPT_RECVMAXSZ, 1 << 30));
NUTS_MARRY_EX(req, rep, "socket://", NULL, NULL);
char *body = nng_msg_body(m);

NUTS_ASSERT(nng_msg_len(m) == 10 << 20);
for (size_t i = 0; i < nng_msg_len(m); i++) {
body[i] = i % 16 + 'A';
}
NUTS_PASS(nng_msg_dup(&d, m));
NUTS_SEND(req, "R");
NUTS_RECV(rep, "R");
nng_aio_set_msg(aio, m);
nng_send_aio(rep, aio);
nng_aio_wait(aio);
NUTS_PASS(nng_aio_result(aio));
nng_aio_set_msg(aio, NULL);
m = NULL;
nng_recv_aio(req, aio);
nng_aio_wait(aio);
NUTS_PASS(nng_aio_result(aio));
m = nng_aio_get_msg(aio);
NUTS_ASSERT(m != NULL);
NUTS_ASSERT(nng_msg_len(m) == nng_msg_len(d));
NUTS_ASSERT(
memcmp(nng_msg_body(m), nng_msg_body(d), nng_msg_len(m)) == 0);

// make sure other messages still flow afterwards
NUTS_SEND(req, "E");
NUTS_RECV(rep, "E");
NUTS_SEND(rep, "E");
NUTS_RECV(req, "E");

nng_aio_free(aio);
nng_msg_free(m);
nng_msg_free(d);
NUTS_CLOSE(rep);
NUTS_CLOSE(req);
}

void
test_rep_close_pipe_before_send(void)
{
Expand Down Expand Up @@ -708,6 +766,7 @@ NUTS_TESTS = {
{ "rep context does not poll", test_rep_context_no_poll },
{ "rep validate peer", test_rep_validate_peer },
{ "rep huge send", test_rep_huge_send },
{ "rep huge send socket", test_rep_huge_send_socket },
{ "rep double recv", test_rep_double_recv },
{ "rep send nonblock", test_rep_send_nonblock },
{ "rep close pipe before send", test_rep_close_pipe_before_send },
Expand Down
54 changes: 45 additions & 9 deletions src/testing/marry.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
// found online at https://opensource.org/licenses/MIT.
//

#include "nng/nng.h"
#ifdef _WIN32

#ifndef WIN32_LEAN_AND_MEAN
Expand Down Expand Up @@ -99,12 +100,12 @@ nuts_scratch_addr(const char *scheme, size_t sz, char *addr)
uint16_t
nuts_next_port(void)
{
char * name;
FILE * f;
char *name;
FILE *f;
uint16_t port;
uint16_t base;
uint16_t end;
char * str;
char *str;
#ifdef _WIN32
OVERLAPPED olp;
HANDLE h;
Expand Down Expand Up @@ -180,7 +181,7 @@ nuts_next_port(void)

struct marriage_notice {
nng_mtx *mx;
nng_cv * cv;
nng_cv *cv;
int s1;
int s2;
int cnt1;
Expand Down Expand Up @@ -249,6 +250,7 @@ nuts_marry_ex(
char addr[64];
nng_listener l;
int port;
int fd[2];

if (url == NULL) {
(void) snprintf(addr, sizeof(addr),
Expand All @@ -268,14 +270,47 @@ nuts_marry_ex(
((rv = nng_pipe_notify(
s1, NNG_PIPE_EV_ADD_POST, married, &note)) != 0) ||
((rv = nng_pipe_notify(
s2, NNG_PIPE_EV_ADD_POST, married, &note)) != 0) ||
((rv = nng_listen(s1, url, &l, 0)) != 0)) {
s2, NNG_PIPE_EV_ADD_POST, married, &note)) != 0)) {
goto done;
}

// If a TCP port of zero was selected, let's ask for the actual
// port bound.
// If socket:// is requested we will try to use that, otherwise we
// fake it with a TCP loopback socket.
if (strcmp(url, "socket://") == 0) {
rv = nng_socket_pair(fd);
if (rv == 0) {
nng_listener l2;
if (((rv = nng_listen(s1, url, &l, 0)) != 0) ||
((rv = nng_listen(s2, url, &l2, 0)) != 0) ||
((rv = nng_listener_set_int(
l, NNG_OPT_SOCKET_FD, fd[0])) != 0) ||
((rv = nng_listener_set_int(
l2, NNG_OPT_SOCKET_FD, fd[1])) != 0)) {
#ifdef _WIN32
CloseHandle((HANDLE) fd[0]);
CloseHandle((HANDLE) fd[1]);
#else
close(fd[0]);
close(fd[1]);
#endif
return (rv);
}
} else if (rv == NNG_ENOTSUP) {
url = "tcp://127.0.0.1:0";
rv = 0;
} else {
return (rv);
}
}

if (strcmp(url, "socket://") != 0) {
if ((rv = nng_listen(s1, url, &l, 0)) != 0) {
return (rv);
}
}
if ((strstr(url, ":0") != NULL) &&
// If a TCP port of zero was selected, let's ask for the actual
// port bound.
(nng_listener_get_int(l, NNG_OPT_TCP_BOUND_PORT, &port) == 0) &&
(port > 0)) {
replace_port_zero(url, addr, port);
Expand All @@ -285,7 +320,8 @@ nuts_marry_ex(
((rv = nng_socket_set_ms(s2, NNG_OPT_RECONNMAXT, 10)) != 0)) {
goto done;
}
if ((rv = nng_dial(s2, url, NULL, 0)) != 0) {
if ((strcmp(url, "socket://") != 0) &&
((rv = nng_dial(s2, url, NULL, 0)) != 0)) {
goto done;
}

Expand Down

0 comments on commit b4512a2

Please sign in to comment.