diff --git a/README b/README index 481969d6d..568f35fa1 100644 --- a/README +++ b/README @@ -1,4 +1,4 @@ -libtrace 4.0.22 +libtrace 4.0.23 Code and documentation added since version 4.0.20 is Copyright (c) 2023 Shane Alcock and has been contributed as per diff --git a/configure.in b/configure.in index e745661ad..06ba372cb 100644 --- a/configure.in +++ b/configure.in @@ -3,11 +3,11 @@ # Now you only need to update the version number in two places - below, # and in the README -AC_INIT([libtrace],[4.0.22],[shane@alcock.co.nz],[libtrace]) +AC_INIT([libtrace],[4.0.23],[shane@alcock.co.nz],[libtrace]) LIBTRACE_MAJOR=4 LIBTRACE_MID=0 -LIBTRACE_MINOR=22 +LIBTRACE_MINOR=23 # OpenSolaris hides libraries like libncurses in /usr/gnu/lib, which is not # searched by default - add it to LDFLAGS so we at least have a chance of @@ -102,8 +102,10 @@ ADD_LDFLAGS="$ADD_LDFLAGS -L\$(abs_top_srcdir)/lib" LIBTRACE_LIBS="" TOOLS_LIBS="" -CFLAGS="$CFLAGS -Wall -Wmissing-prototypes -Wextra -DLT_BUILDING_DLL=1" -CXXFLAGS="$CXXFLAGS -Wall -DLT_BUILDING_DLL=1" +CFLAGS="$CFLAGS -Wall -Wmissing-prototypes -Wextra -DLT_BUILDING_DLL=1" +CXXFLAGS="$CXXFLAGS -Wall -DLT_BUILDING_DLL=1" +#CFLAGS="$CFLAGS -fsanitize=address -Wall -Wmissing-prototypes -Wextra -DLT_BUILDING_DLL=1" +#CXXFLAGS="$CXXFLAGS -fsanitize=address -Wall -DLT_BUILDING_DLL=1" AC_ARG_ENABLE(address-san, AS_HELP_STRING(--enable-address-san, Enable address and memory sanitisation), [ diff --git a/debian/changelog b/debian/changelog index 159002f2c..fdf18b27d 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,19 @@ +libtrace4 (4.0.23-1) unstable; urgency=medium + + * Add new tool: traceucast, a TCP unicast variant of tracemcast. + * Add new input format: ndagtcp, for receiving packets sent by + traceucast. + * libpacketdump: fix premature free when decoding IPMM IRIs + received via an etsilive input. + * tracemcast: fix bug where the sequence number was not being + incremented for each sent datagram. 
+ * object cache data structure: fix potential segfault after + resizing the cache. + * pcapfile: fix issue where packets owned by "dead" pcapfile trace + would have an invalid pcap version. + + -- Shane Alcock Fri, 10 Nov 2023 09:40:41 +1300 + libtrace4 (4.0.22-1) unstable; urgency=medium * Fix segmentation fault when closing an ndag input that had diff --git a/lib/Makefile.am b/lib/Makefile.am index d14b0126e..fafa62b52 100644 --- a/lib/Makefile.am +++ b/lib/Makefile.am @@ -146,7 +146,7 @@ endif AM_CPPFLAGS= @ADD_INCLS@ libtrace_la_LIBADD = @LIBTRACE_LIBS@ @LTLIBOBJS@ $(DPDKLIBS) -libtrace_la_LDFLAGS=-version-info 7:6:0 @ADD_LDFLAGS@ +libtrace_la_LDFLAGS=-version-info 7:7:0 @ADD_LDFLAGS@ dagapi.c: cp @DAG_TOOLS_DIR@/dagapi.c . diff --git a/lib/data-struct/object_cache.c b/lib/data-struct/object_cache.c index 4babd88fe..e864cb676 100644 --- a/lib/data-struct/object_cache.c +++ b/lib/data-struct/object_cache.c @@ -146,7 +146,7 @@ static void resize_memory_caches(struct local_caches *lcs) { fprintf(stderr, "Expected lcs->t_mem_caches_total to be greater or equal to 0 in resize_memory_caches()\n"); return; } - lcs->t_mem_caches += 0x10; + lcs->t_mem_caches_total += 0x10; lcs->t_mem_caches = realloc(lcs->t_mem_caches, lcs->t_mem_caches_total * sizeof(struct local_cache)); } diff --git a/lib/format_ndag.c b/lib/format_ndag.c index 36c76ecef..c6eb35c10 100644 --- a/lib/format_ndag.c +++ b/lib/format_ndag.c @@ -35,6 +35,7 @@ #include "format_helper.h" #include "format_erf.h" +#include #include #include #include @@ -61,6 +62,11 @@ static struct libtrace_format_t ndag; volatile int ndag_paused = 0; +enum { + NDAG_SOCKET_TYPE_MULTICAST, + NDAG_SOCKET_TYPE_TCP, +}; + typedef struct monitor { uint16_t monitorid; uint64_t laststart; @@ -75,6 +81,7 @@ typedef struct streamsource { } streamsource_t; typedef struct streamsock { + uint8_t socktype; char *groupaddr; int sock; struct addrinfo *srcaddr; @@ -85,20 +92,22 @@ typedef struct streamsock { char *nextread; int nextreadind; 
int nextwriteind; + uint16_t nextrlen; int savedsize[ENCAP_BUFFERS]; - uint8_t rectype[ENCAP_BUFFERS]; + int expectedreccount; + uint8_t rectype; uint64_t nextts; uint32_t startidle; - uint64_t recordcount; + uint64_t total_recordcount; + int reccount; int bufavail; int bufwaiting; #if HAVE_DECL_RECVMMSG struct mmsghdr mmsgbufs[RECV_BATCH_SIZE]; -#else - struct msghdr singlemsg; #endif + struct msghdr singlemsg; } streamsock_t; @@ -115,6 +124,7 @@ typedef struct recvstream { uint64_t received_packets; int maxfd; + uint8_t halted; } recvstream_t; typedef struct ndag_format_data { @@ -124,6 +134,7 @@ typedef struct ndag_format_data { uint16_t nextthreadid; recvstream_t *receivers; int receiver_cnt; + uint8_t socktype; pthread_t controlthread; libtrace_message_queue_t controlqueue; @@ -142,6 +153,16 @@ typedef struct ndagreadermessage { } ndag_internal_message_t; +#define NEXT_BUFFER(ssock, nr) \ + ssock->savedsize[nr] = 0; \ + ssock->bufwaiting ++; \ + nr ++; \ + if (nr == ENCAP_BUFFERS) { \ + nr = 0; \ + } \ + ssock->nextread = ssock->saved[nr]; \ + ssock->nextreadind = nr; + static inline int seq_cmp(uint32_t seq_a, uint32_t seq_b) { /* Calculate seq_a - seq_b, taking wraparound into account */ @@ -180,6 +201,17 @@ static uint8_t check_ndag_header(char *msgbuf, uint32_t msgsize) { return header->type; } +static inline void reset_expected_seqs(recvstream_t *rt, ndag_monitor_t *mon) { + + int i; + for (i = 0; i < rt->sourcecount; i++) { + if (rt->sources[i].monitorptr == mon) { + rt->sources[i].expectedseq = 0; + } + } + +} + static int join_multicast_group(char *groupaddr, char *localiface, char *portstr, uint16_t portnum, struct addrinfo **srcinfo) { @@ -224,7 +256,10 @@ static int join_multicast_group(char *groupaddr, char *localiface, return -1; } - *srcinfo = gotten; + if (srcinfo) { + *srcinfo = gotten; + } + sock = socket(gotten->ai_family, gotten->ai_socktype, 0); if (sock < 0) { fprintf(stderr, @@ -365,6 +400,7 @@ static void new_group_alert(libtrace_t 
*libtrace, uint16_t threadid, ndag_internal_message_t alert; + memset(&alert, 0, sizeof(alert)); alert.type = NDAG_CLIENT_NEWGROUP; alert.contents.groupaddr = FORMAT_DATA->multicastgroup; alert.contents.localiface = FORMAT_DATA->localiface; @@ -442,54 +478,138 @@ static int ndag_parse_control_message(libtrace_t *libtrace, char *msgbuf, } -static void *ndag_controller_run(void *tdata) { +static inline int select_on_sock(int sock) { + int r; + fd_set read_fds; + struct timeval timeout; - libtrace_t *libtrace = (libtrace_t *)tdata; + FD_ZERO(&read_fds); + FD_SET(sock, &read_fds); + + timeout.tv_sec = 0; + timeout.tv_usec = 500000; + + r = select(sock + 1, &read_fds, NULL, NULL, &timeout); + if (r == -1) { + return -1; + } + if (!FD_ISSET(sock, &read_fds)) { + return 0; + } + + return 1; +} + +static int accept_ndagtcp_connection(libtrace_t *libtrace, + char *ipstr, char *portstr) { + + struct addrinfo hints, *listenai; + int sock, consock=-1, reuse=1, r; + struct sockaddr_storage sa; + socklen_t addrsize; + + hints.ai_family = PF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_PASSIVE; + hints.ai_protocol = 0; + + sock = -1; + listenai = NULL; + if (getaddrinfo(ipstr, portstr, &hints, &listenai) != 0) { + fprintf(stderr, + "Call to getaddrinfo failed for %s:%s -- %s\n", + ipstr, portstr, strerror(errno)); + goto failed; + } + + sock = socket(listenai->ai_family, listenai->ai_socktype, 0); + if (sock < 0) { + fprintf(stderr, "Failed to create socket for %s:%s -- %s\n", + ipstr, portstr, strerror(errno)); + goto failed; + } + if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) + < 0) { + + fprintf(stderr, "Failed to configure socket for %s:%s -- %s\n", + ipstr, portstr, strerror(errno)); + + goto failed; + } + + if (bind(sock, (struct sockaddr *)listenai->ai_addr, + listenai->ai_addrlen) < 0) { + + fprintf(stderr, "Failed to bind socket for %s:%s -- %s\n", + ipstr, portstr, strerror(errno)); + goto failed; + } + + if (listen(sock, 10) 
< 0) { + fprintf(stderr, "Failed to listen on socket for %s:%s -- %s\n", + ipstr, portstr, strerror(errno)); + goto failed; + } + + freeaddrinfo(listenai); + listenai = NULL; + + fcntl(sock, F_SETFL, O_NONBLOCK); + + while (is_halted(libtrace) == -1) { + r = select_on_sock(sock); + if (r < 0) { + fprintf(stderr, "Error in select while accepting connection on socket for %s:%s -- %s\n", + ipstr, portstr, strerror(errno)); + consock = -1; + break; + } else if (r == 0) { + consock = 0; + continue; + } + + consock = accept(sock, (struct sockaddr *)&sa, &addrsize); + if (consock < 0) { + fprintf(stderr, + "Failed to accept connection on socket for %s:%s -- %s\n", + ipstr, portstr, strerror(errno)); + goto failed; + } + break; + } + +failed: + if (sock >= 0) { + close(sock); + } + if (listenai) { + freeaddrinfo(listenai); + } + return consock; +} + +static void _ndag_controller_run(libtrace_t *libtrace, int sock) { uint16_t ptmap[65536]; - int sock = -1; - struct addrinfo *receiveaddr = NULL; - fd_set listening; - struct timeval timeout; + int ret; /* ptmap is a dirty hack to allow us to quickly check if we've already * assigned a stream to a thread. 
*/ memset(ptmap, 0xff, 65536 * sizeof(uint16_t)); - sock = join_multicast_group(FORMAT_DATA->multicastgroup, - FORMAT_DATA->localiface, FORMAT_DATA->portstr, 0, - &receiveaddr); - if (sock == -1) { - trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, - "Unable to join multicast group for nDAG control channel"); - trace_interrupt(); - pthread_exit(NULL); - } - ndag_paused = 0; while ((is_halted(libtrace) == -1) && !ndag_paused) { - int ret; char buf[CTRL_BUF_SIZE]; - FD_ZERO(&listening); - FD_SET(sock, &listening); - - timeout.tv_sec = 0; - timeout.tv_usec = 500000; - - ret = select(sock + 1, &listening, NULL, NULL, &timeout); + ret = select_on_sock(sock); if (ret < 0) { fprintf(stderr, "Error while waiting for nDAG control messages: %s\n", strerror(errno)); break; - } - - if (!FD_ISSET(sock, &listening)) { + } else if (ret == 0) { continue; } - ret = recvfrom(sock, buf, CTRL_BUF_SIZE, 0, - receiveaddr->ai_addr, - &(receiveaddr->ai_addrlen)); + ret = recvfrom(sock, buf, CTRL_BUF_SIZE, 0, NULL, NULL); if (ret < 0) { fprintf(stderr, "Error while receiving nDAG control message: %s\n", strerror(errno)); break; @@ -509,19 +629,57 @@ static void *ndag_controller_run(void *tdata) { close(sock); } - freeaddrinfo(receiveaddr); - /* Control channel has fallen over, should probably encourage libtrace * to halt the receiver threads as well. 
*/ - if (!is_halted(libtrace)) { + if (ret < 0 && is_halted(libtrace) == -1) { trace_interrupt(); } +} + +static void *ndagtcp_controller_run(void *tdata) { + libtrace_t *libtrace = (libtrace_t *)tdata; + int sock = -1; + + while (is_halted(libtrace) == -1 && !ndag_paused) { + sock = accept_ndagtcp_connection(libtrace, + FORMAT_DATA->multicastgroup, + FORMAT_DATA->portstr); + if (sock == -1) { + trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, + "Unable to setup control channel for nDAG TCP"); + trace_interrupt(); + } else if (sock == 0) { + continue; + } else { + _ndag_controller_run(libtrace, sock); + } + } pthread_exit(NULL); } -static int ndag_start_threads(libtrace_t *libtrace, uint32_t maxthreads) +static void *ndag_controller_run(void *tdata) { + + libtrace_t *libtrace = (libtrace_t *)tdata; + int sock = -1; + + sock = join_multicast_group(FORMAT_DATA->multicastgroup, + FORMAT_DATA->localiface, FORMAT_DATA->portstr, 0, + NULL); + if (sock == -1) { + trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, + "Unable to setup control channel for nDAG TCP"); + trace_interrupt(); + } else { + _ndag_controller_run(libtrace, sock); + } + + pthread_exit(NULL); +} + +static int ndag_start_threads(libtrace_t *libtrace, uint32_t maxthreads, + uint8_t socktype) { int ret; uint32_t i; @@ -537,6 +695,7 @@ static int ndag_start_threads(libtrace_t *libtrace, uint32_t maxthreads) for (i = 0; i < maxthreads; i++) { FORMAT_DATA->receivers[i].sources = NULL; FORMAT_DATA->receivers[i].sourcecount = 0; + FORMAT_DATA->receivers[i].halted = 0; FORMAT_DATA->receivers[i].knownmonitors = NULL; FORMAT_DATA->receivers[i].monitorcount = 0; FORMAT_DATA->receivers[i].threadindex = i; @@ -553,8 +712,16 @@ static int ndag_start_threads(libtrace_t *libtrace, uint32_t maxthreads) /* Start the controller thread */ /* TODO consider affinity of this thread? 
*/ - ret = pthread_create(&(FORMAT_DATA->controlthread), NULL, - ndag_controller_run, libtrace); + if (socktype == NDAG_SOCKET_TYPE_MULTICAST) { + ret = pthread_create(&(FORMAT_DATA->controlthread), NULL, + ndag_controller_run, libtrace); + } else if (socktype == NDAG_SOCKET_TYPE_TCP) { + ret = pthread_create(&(FORMAT_DATA->controlthread), NULL, + ndagtcp_controller_run, libtrace); + } else { + ret = -1; + } + if (ret != 0) { return -1; } @@ -562,46 +729,76 @@ static int ndag_start_threads(libtrace_t *libtrace, uint32_t maxthreads) } static int ndag_start_input(libtrace_t *libtrace) { - return ndag_start_threads(libtrace, 1); + FORMAT_DATA->socktype = NDAG_SOCKET_TYPE_MULTICAST; + return ndag_start_threads(libtrace, 1, NDAG_SOCKET_TYPE_MULTICAST); +} + +static int ndagtcp_start_input(libtrace_t *libtrace) { + FORMAT_DATA->socktype = NDAG_SOCKET_TYPE_TCP; + return ndag_start_threads(libtrace, 1, NDAG_SOCKET_TYPE_TCP); } static int ndag_pstart_input(libtrace_t *libtrace) { - if (ndag_start_threads(libtrace, libtrace->perpkt_thread_count) == - libtrace->perpkt_thread_count) + FORMAT_DATA->socktype = NDAG_SOCKET_TYPE_MULTICAST; + if (ndag_start_threads(libtrace, libtrace->perpkt_thread_count, + NDAG_SOCKET_TYPE_MULTICAST) == + libtrace->perpkt_thread_count) { return 0; + } return -1; } -static void halt_ndag_receiver(recvstream_t *receiver) { - int j, i; - libtrace_message_queue_destroy(&(receiver->mqueue)); +static int ndagtcp_pstart_input(libtrace_t *libtrace) { + FORMAT_DATA->socktype = NDAG_SOCKET_TYPE_TCP; + if (ndag_start_threads(libtrace, libtrace->perpkt_thread_count, + NDAG_SOCKET_TYPE_TCP) == + libtrace->perpkt_thread_count) { + return 0; + } + return -1; +} - if (receiver->sources == NULL) - return; - for (i = 0; i < receiver->sourcecount; i++) { - streamsock_t src = receiver->sources[i]; - if (src.saved) { - for (j = 0; j < ENCAP_BUFFERS; j++) { - if (src.saved[j]) { - free(src.saved[j]); - } +static void free_streamsock_data(streamsock_t *src) { + int j; + 
if (src->saved) { + for (j = 0; j < ENCAP_BUFFERS; j++) { + if (src->saved[j]) { + free(src->saved[j]); } - free(src.saved); } + free(src->saved); + } #if HAVE_DECL_RECVMMSG - for (j = 0; j < RECV_BATCH_SIZE; j++) { - if (src.mmsgbufs[j].msg_hdr.msg_iov) { - free(src.mmsgbufs[j].msg_hdr.msg_iov); - } + for (j = 0; j < RECV_BATCH_SIZE; j++) { + if (src->mmsgbufs[j].msg_hdr.msg_iov) { + free(src->mmsgbufs[j].msg_hdr.msg_iov); } -#else - free(src.singlemsg.msg_iov); + } #endif + if (src->singlemsg.msg_iov) { + free(src->singlemsg.msg_iov); + } - if (src.sock != -1) { - close(src.sock); - } + if (src->sock != -1) { + close(src->sock); + } +} + + +static void halt_ndag_receiver(recvstream_t *receiver) { + int i; + + receiver->halted = 1; + usleep(200000); + + libtrace_message_queue_destroy(&(receiver->mqueue)); + + if (receiver->sources == NULL) + return; + for (i = 0; i < receiver->sourcecount; i++) { + streamsock_t *src = &(receiver->sources[i]); + free_streamsock_data(src); } if (receiver->knownmonitors) { free(receiver->knownmonitors); @@ -615,12 +812,12 @@ static void halt_ndag_receiver(recvstream_t *receiver) { static int ndag_pause_input(libtrace_t *libtrace) { int i; + ndag_paused = 1; + pthread_join(FORMAT_DATA->controlthread, NULL); /* Close the existing receiver sockets */ for (i = 0; i < FORMAT_DATA->receiver_cnt; i++) { halt_ndag_receiver(&(FORMAT_DATA->receivers[i])); } - ndag_paused = 1; - pthread_join(FORMAT_DATA->controlthread, NULL); return 0; } @@ -662,61 +859,226 @@ static int ndag_get_framing_length(const libtrace_packet_t *packet) { return 0; } -static int ndag_prepare_packet_stream_corsarotag(libtrace_t *restrict libtrace, - recvstream_t *restrict rt, - streamsock_t *restrict ssock, - libtrace_packet_t *restrict packet) { +static int got_complete_packet(streamsock_t *ssock) { + unsigned int required, available; + int nr = ssock->nextreadind; + int next; + + if (ssock->rectype == NDAG_PKT_ENCAPERF || + ssock->rectype == NDAG_PKT_CORSAROTAG) { + 
required = ssock->nextrlen; + } else { + required = 0; + } + if (required == 0) { + return 0; + } + + next = ssock->nextreadind; + available = ssock->savedsize[nr] - (ssock->nextread - ssock->saved[nr]); + while (required > available) { + next = ((next + 1) % ENCAP_BUFFERS); + if (ssock->savedsize[next] == 0) { + /* no more data available */ + return 0; + } + available += ssock->savedsize[next]; + } + return 1; +} + +static unsigned int copy_tmp_buffer(streamsock_t *ssock, char *tmpbuf, + unsigned int required) { + + char *ptr = tmpbuf; + int next, first = 1; + unsigned int available; + int nr = ssock->nextreadind; + next = ssock->nextreadind; + available = ssock->savedsize[nr] - (ssock->nextread - ssock->saved[nr]); + + while (required > available) { + + if (first && available > 0) { + memcpy(ptr, ssock->nextread, available); + ptr += available; + first = 0; + } + next = ((next + 1) % ENCAP_BUFFERS); + if (ssock->savedsize[next] == 0) { + /* no more data available */ + return 0; + } + memcpy(ptr, ssock->saved[next], ssock->savedsize[next]); + available += ssock->savedsize[next]; + } + return available; +} + +static inline void consume_streamsock_data(streamsock_t *ssock, + unsigned int required) { + + int nr = ssock->nextreadind; + unsigned int available = (ssock->savedsize[nr] - + (ssock->nextread - ssock->saved[nr])); + + while (available < required) { + required -= available; + NEXT_BUFFER(ssock, nr) + nr = ssock->nextreadind; + available = ssock->savedsize[nr]; + assert(available > 0); + } + ssock->nextread += required; + /* handle the case where we use up the current buffer exactly */ + if (ssock->nextread - ssock->saved[nr] >= ssock->savedsize[nr]) { + NEXT_BUFFER(ssock, ssock->nextreadind); + } +} + +static int process_ndag_encap_headers(streamsock_t *ssock, recvstream_t *rt) { + + ndag_encap_t *encaphdr; + ndag_monitor_t *mon; + uint8_t rectype; + + int nr; + unsigned int available, required; + char tmpbuf[ENCAP_BUFSIZE * 2]; + char *usebuf; + + nr = 
ssock->nextreadind; + available = ssock->savedsize[nr] - (ssock->nextread - ssock->saved[nr]); + required = sizeof(ndag_common_t) + sizeof(ndag_encap_t); + + usebuf = ssock->nextread; + if (available < required) { + available = copy_tmp_buffer(ssock, tmpbuf, required); + if (available == 0) { + return 0; + } + usebuf = tmpbuf; + } + + rectype = check_ndag_header(usebuf, available); + if (rectype == NDAG_PKT_KEEPALIVE) { + consume_streamsock_data(ssock, required); + return process_ndag_encap_headers(ssock, rt); + } else if (rectype != NDAG_PKT_ENCAPERF && + rectype != NDAG_PKT_CORSAROTAG) { + fprintf(stderr, "Received invalid record on the channel for %s:%u.\n", + ssock->groupaddr, ssock->port); + return -1; + } + + ssock->rectype = rectype; + encaphdr=(ndag_encap_t *)(usebuf + sizeof(ndag_common_t)); + ssock->expectedreccount = ntohs(encaphdr->recordcount); + mon = ssock->monitorptr; + + if (mon->laststart == 0) { + mon->laststart = bswap_be_to_host64(encaphdr->started); + } else if (mon->laststart != bswap_be_to_host64( + encaphdr->started)) { + mon->laststart = bswap_be_to_host64(encaphdr->started); + reset_expected_seqs(rt, mon); + } + if (ssock->expectedseq != 0) { + rt->missing_records += seq_cmp( + ntohl(encaphdr->seqno), + ssock->expectedseq); + } + ssock->expectedseq = ntohl(encaphdr->seqno) + 1; + if (ssock->expectedseq == 0) { + ssock->expectedseq ++; + } + consume_streamsock_data(ssock, required); + return 1; +} + +static int process_ndag_corsaro_header(streamsock_t *ssock, + recvstream_t *rt UNUSED) { corsaro_tagged_packet_header_t *taghdr; + int nr; - uint16_t rlen; + unsigned int available, required; + char tmpbuf[ENCAP_BUFSIZE * 2]; + char *usebuf; - packet->buf_control = TRACE_CTRL_EXTERNAL; + nr = ssock->nextreadind; - packet->trace = libtrace; - packet->buffer = ssock->nextread; - packet->header = ssock->nextread; - packet->type = TRACE_RT_DATA_CORSARO_TAGGED; + if (ssock->savedsize[nr] == 0) { + /* no data available? 
I don't think we should be able to + * get here in that case... */ + return 0; + } - taghdr = (corsaro_tagged_packet_header_t *)packet->header; + assert(ssock->expectedreccount != 0); + available = ssock->savedsize[nr] - (ssock->nextread - ssock->saved[nr]); + required = sizeof(corsaro_tagged_packet_header_t); + usebuf = ssock->nextread; - packet->payload = &(taghdr->tags); + if (available < required) { + available = copy_tmp_buffer(ssock, tmpbuf, required); + if (available == 0) { + return 0; + } + usebuf = tmpbuf; + } - rlen = ntohs(taghdr->pktlen) + sizeof(corsaro_tagged_packet_header_t); - rt->received_packets ++; - ssock->recordcount += 1; + taghdr = (corsaro_tagged_packet_header_t *)usebuf; + + ssock->nextrlen = ntohs(taghdr->pktlen) + + sizeof(corsaro_tagged_packet_header_t); + ssock->nextts = ((uint64_t) ntohl(taghdr->ts_sec)) << 32; + ssock->nextts += (((uint64_t) ntohl(taghdr->ts_usec)) << 32) / 1000000; + return 1; +} + +static int process_ndag_erf_headers(streamsock_t *ssock, recvstream_t *rt) { + + dag_record_t *erfptr; + + int nr; + unsigned int available, required; + char tmpbuf[ENCAP_BUFSIZE * 2]; + char *usebuf; nr = ssock->nextreadind; - ssock->nextread += rlen; - ssock->nextts = 0; - if (ssock->nextread - ssock->saved[nr] > ssock->savedsize[nr]) { - trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "Walked past the end of the " - "nDAG receive buffer, probably due to a invalid taghdr->pktlen, in ndag_prepare_packet_stream_corsarotag()"); - return -1; - } + if (ssock->savedsize[nr] == 0) { + /* no data available? I don't think we should be able to + * get here in that case... */ + return 0; + } - if (ssock->nextread - ssock->saved[nr] >= ssock->savedsize[nr]) { - /* Read everything from this buffer, mark as empty and - * move on. 
*/ - ssock->savedsize[nr] = 0; - ssock->bufwaiting ++; - - nr ++; - if (nr == ENCAP_BUFFERS) { - nr = 0; + assert(ssock->expectedreccount != 0); + available = ssock->savedsize[nr] - (ssock->nextread - ssock->saved[nr]); + required = dag_record_size; + usebuf = ssock->nextread; + + if (available < required) { + available = copy_tmp_buffer(ssock, tmpbuf, required); + if (available == 0) { + return 0; } - ssock->nextread = ssock->saved[nr] + sizeof(ndag_common_t) + - sizeof(ndag_encap_t); - ssock->nextreadind = nr; + usebuf = tmpbuf; } - packet->order = ((uint64_t) ntohl(taghdr->ts_sec)) << 32; - packet->order += (((uint64_t) ntohl(taghdr->ts_usec)) << 32) / 1000000; - packet->error = rlen; - packet->cached.link_type = TRACE_TYPE_CORSAROTAG; - return rlen; + erfptr = (dag_record_t *)usebuf; + + ssock->nextts = bswap_le_to_host64(erfptr->ts); + ssock->nextrlen = ntohs(erfptr->rlen); + assert(ssock->nextrlen != 0); + if (rt->received_packets > 0) { + rt->dropped_upstream += ntohs(erfptr->lctr); + } + + /* don't consume the erf header, we'll need it later on */ + + return 1; } static int ndag_prepare_packet_stream_encaperf(libtrace_t *restrict libtrace, @@ -727,26 +1089,35 @@ static int ndag_prepare_packet_stream_encaperf(libtrace_t *restrict libtrace, /* XXX flags is constant, so we can tell the compiler to not * bother copying over the parameter */ - + char tmpbuf[ENCAP_BUFSIZE * 2]; + int nr, rlen; + unsigned int available; + char *usebuf = ssock->nextread; dag_record_t *erfptr; - ndag_encap_t *encaphdr; - uint16_t ndag_reccount = 0; - int nr; - uint16_t rlen; - /* - if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) { - packet->buf_control = TRACE_CTRL_PACKET; - } else { - packet->buf_control = TRACE_CTRL_EXTERNAL; + nr = ssock->nextreadind; + available = ssock->savedsize[nr] - (ssock->nextread - ssock->saved[nr]); + + if (ssock->nextrlen == 0 || ssock->nextrlen > ENCAP_BUFSIZE) { + return -1; } - */ - packet->buf_control = TRACE_CTRL_EXTERNAL; + if 
(available < ssock->nextrlen) { + if (copy_tmp_buffer(ssock, tmpbuf, ssock->nextrlen) == 0) { + return 0; + } + usebuf = tmpbuf; + } + + if (packet->buf_control != TRACE_CTRL_PACKET || !packet->buffer) { + packet->buf_control = TRACE_CTRL_PACKET; + packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE); + } packet->trace = libtrace; - packet->buffer = ssock->nextread; - packet->header = ssock->nextread; + memcpy(packet->buffer, usebuf, ssock->nextrlen); + packet->header = packet->buffer; packet->type = TRACE_RT_DATA_ERF; + packet->error = ssock->nextrlen; erfptr = (dag_record_t *)packet->header; @@ -767,59 +1138,88 @@ static int ndag_prepare_packet_stream_encaperf(libtrace_t *restrict libtrace, } } - /* Update upstream drops using lctr */ + rt->received_packets ++; + ssock->total_recordcount += 1; - if (erfptr->type == TYPE_DSM_COLOR_ETH) { - /* TODO */ - } else { - if (rt->received_packets > 0) { - rt->dropped_upstream += ntohs(erfptr->lctr); - } + consume_streamsock_data(ssock, ssock->nextrlen); + + rlen = ssock->nextrlen; + + ssock->nextts = 0; + ssock->reccount ++; + ssock->nextrlen = 0; + + if (ssock->reccount >= ssock->expectedreccount) { + ssock->expectedreccount = 0; + ssock->reccount = 0; + ssock->rectype = 0; } - rt->received_packets ++; - ssock->recordcount += 1; + packet->order = erf_get_erf_timestamp(packet); + packet->cached.link_type = erf_get_link_type(packet); + return rlen; +} + +static int ndag_prepare_packet_stream_corsarotag(libtrace_t *restrict libtrace, + recvstream_t *restrict rt, + streamsock_t *restrict ssock, + libtrace_packet_t *restrict packet) { + + + corsaro_tagged_packet_header_t *taghdr; + char tmpbuf[ENCAP_BUFSIZE * 2]; + int nr, rlen; + unsigned int available; + char *usebuf = ssock->nextread; nr = ssock->nextreadind; - encaphdr = (ndag_encap_t *)(ssock->saved[nr] + - sizeof(ndag_common_t)); - - ndag_reccount = ntohs(encaphdr->recordcount); - if ((ndag_reccount & 0x8000) != 0) { - /* Record was truncated -- update rlen appropriately */ 
- rlen = ssock->savedsize[nr] - - (ssock->nextread - ssock->saved[nr]); - erfptr->rlen = htons(rlen); - } else { - rlen = ntohs(erfptr->rlen); - } - ssock->nextread += rlen; - ssock->nextts = 0; + available = ssock->savedsize[nr] - (ssock->nextread - ssock->saved[nr]); - if (ssock->nextread - ssock->saved[nr] > ssock->savedsize[nr]) { - trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "Walked past the end of the " - "nDAG receive buffer, probably due to a invalid rlen, in ndag_prepare_packet_stream()"); - return -1; - } + if (ssock->nextrlen == 0 || ssock->nextrlen > ENCAP_BUFSIZE) { + return -1; + } - if (ssock->nextread - ssock->saved[nr] >= ssock->savedsize[nr]) { - /* Read everything from this buffer, mark as empty and - * move on. */ - ssock->savedsize[nr] = 0; - ssock->bufwaiting ++; - - nr ++; - if (nr == ENCAP_BUFFERS) { - nr = 0; + if (available < ssock->nextrlen) { + if (copy_tmp_buffer(ssock, tmpbuf, ssock->nextrlen) == 0) { + return 0; } - ssock->nextread = ssock->saved[nr] + sizeof(ndag_common_t) + - sizeof(ndag_encap_t); - ssock->nextreadind = nr; + usebuf = tmpbuf; + } + + if (packet->buf_control != TRACE_CTRL_PACKET || !packet->buffer) { + packet->buf_control = TRACE_CTRL_PACKET; + packet->buffer = malloc(LIBTRACE_PACKET_BUFSIZE); + } + packet->trace = libtrace; + memcpy(packet->buffer, usebuf, ssock->nextrlen); + packet->header = packet->buffer; + packet->type = TRACE_RT_DATA_CORSARO_TAGGED; + packet->error = ssock->nextrlen; + + taghdr = (corsaro_tagged_packet_header_t *)packet->header; + + packet->payload = &(taghdr->tags); + packet->order = ((uint64_t) ntohl(taghdr->ts_sec)) << 32; + packet->order += (((uint64_t) ntohl(taghdr->ts_usec)) << 32) / 1000000; + packet->cached.link_type = TRACE_TYPE_CORSAROTAG; + + rt->received_packets ++; + ssock->total_recordcount += 1; + + consume_streamsock_data(ssock, ssock->nextrlen); + + rlen = ssock->nextrlen; + + ssock->nextts = 0; + ssock->reccount ++; + ssock->nextrlen = 0; + + if (ssock->reccount >= 
ssock->expectedreccount) { + ssock->expectedreccount = 0; + ssock->reccount = 0; + ssock->rectype = 0; } - packet->order = erf_get_erf_timestamp(packet); - packet->error = rlen; - packet->cached.link_type = erf_get_link_type(packet); return rlen; } @@ -829,12 +1229,12 @@ static int ndag_prepare_packet_stream(libtrace_t *restrict libtrace, libtrace_packet_t *restrict packet, uint32_t flags UNUSED) { - if (ssock->rectype[ssock->nextreadind] == NDAG_PKT_ENCAPERF) { + if (ssock->rectype == NDAG_PKT_ENCAPERF) { return ndag_prepare_packet_stream_encaperf(libtrace, rt, ssock, packet); } - if (ssock->rectype[ssock->nextreadind] == NDAG_PKT_CORSAROTAG) { + if (ssock->rectype == NDAG_PKT_CORSAROTAG) { return ndag_prepare_packet_stream_corsarotag(libtrace, rt, ssock, packet); } @@ -870,24 +1270,58 @@ static ndag_monitor_t *add_new_knownmonitor(recvstream_t *rt, uint16_t monid) { mon->monitorid = monid; mon->laststart = 0; - rt->monitorcount ++; - return mon; + rt->monitorcount ++; + return mon; +} + +static void realign_sources(recvstream_t *rt) { + + /* remove any "dead" socket entries in a source list, so + * we can reclaim the memory and not just continuously grow + * when sources come and go. + */ + + streamsock_t *newsrcs; + int i, newcnt = 0; + + newsrcs = calloc(rt->sourcecount, sizeof(streamsock_t)); + + for (i = 0; i < rt->sourcecount; i++) { + streamsock_t *src = &(rt->sources[i]); + if (src->sock == -1) { + free_streamsock_data(src); + continue; + } + memcpy(&(newsrcs[newcnt]), &(rt->sources[i]), + sizeof(streamsock_t)); + newcnt ++; + } + + /* If we haven't removed any, then we'll need to extend the + * source array to fit in the new streamsock. 
+ */ + if (newcnt == rt->sourcecount) { + newsrcs = (streamsock_t *)realloc(rt->sources, + sizeof(streamsock_t) * (rt->sourcecount + 10)); + } + + free(rt->sources); + rt->sources = newsrcs; + rt->sourcecount = newcnt; } -static int add_new_streamsock(recvstream_t *rt, streamsource_t src) { +static int add_new_streamsock(libtrace_t *libtrace, + recvstream_t *rt, streamsource_t src, uint8_t socktype) { streamsock_t *ssock = NULL; ndag_monitor_t *mon = NULL; int i; + char portstr[16]; - /* TODO consider replacing this with a list or vector so we can - * easily remove sources that are no longer in use, rather than - * just setting the sock to -1 and having to check them every - * time we want to read a packet. - */ if (rt->sourcecount == 0) { rt->sources = (streamsock_t *)malloc(sizeof(streamsock_t) * 10); } else if ((rt->sourcecount % 10) == 0) { + realign_sources(rt); rt->sources = (streamsock_t *)realloc(rt->sources, sizeof(streamsock_t) * (rt->sourcecount + 10)); } @@ -905,6 +1339,7 @@ static int add_new_streamsock(recvstream_t *rt, streamsource_t src) { mon = add_new_knownmonitor(rt, src.monitor); } + ssock->socktype = socktype; ssock->port = src.port; ssock->groupaddr = src.groupaddr; ssock->expectedseq = 0; @@ -914,14 +1349,29 @@ static int add_new_streamsock(recvstream_t *rt, streamsource_t src) { ssock->bufwaiting = 0; ssock->startidle = 0; ssock->nextts = 0; + ssock->nextrlen = 0; + ssock->reccount = 0; + ssock->expectedreccount = 0; + ssock->rectype = 0; for (i = 0; i < ENCAP_BUFFERS; i++) { ssock->saved[i] = (char *)malloc(ENCAP_BUFSIZE); ssock->savedsize[i] = 0; } + ssock->nextread = ssock->saved[0]; + ssock->nextreadind = 0; + ssock->nextwriteind = 0; + ssock->total_recordcount = 0; - ssock->sock = join_multicast_group(src.groupaddr, src.localiface, - NULL, src.port, &(ssock->srcaddr)); + if (socktype == NDAG_SOCKET_TYPE_MULTICAST) { + ssock->sock = join_multicast_group(src.groupaddr, + src.localiface, NULL, src.port, + &(ssock->srcaddr)); + } else { + 
snprintf(portstr, 16, "%u", src.port); + ssock->sock = accept_ndagtcp_connection(libtrace, + src.groupaddr, portstr); + } if (ssock->sock < 0) { return -1; @@ -932,33 +1382,35 @@ static int add_new_streamsock(recvstream_t *rt, streamsource_t src) { } #if HAVE_DECL_RECVMMSG - for (i = 0; i < RECV_BATCH_SIZE; i++) { - ssock->mmsgbufs[i].msg_hdr.msg_iov = (struct iovec *) - malloc(sizeof(struct iovec)); - ssock->mmsgbufs[i].msg_hdr.msg_name = ssock->srcaddr->ai_addr; - ssock->mmsgbufs[i].msg_hdr.msg_namelen = ssock->srcaddr->ai_addrlen; - ssock->mmsgbufs[i].msg_hdr.msg_control = NULL; - ssock->mmsgbufs[i].msg_hdr.msg_controllen = 0; - ssock->mmsgbufs[i].msg_hdr.msg_flags = 0; - ssock->mmsgbufs[i].msg_len = 0; + if (ssock->socktype == NDAG_SOCKET_TYPE_MULTICAST) { + assert(ssock->srcaddr != NULL); + for (i = 0; i < RECV_BATCH_SIZE; i++) { + ssock->mmsgbufs[i].msg_hdr.msg_iov = (struct iovec *) + malloc(sizeof(struct iovec)); + ssock->mmsgbufs[i].msg_hdr.msg_name = + ssock->srcaddr->ai_addr; + ssock->mmsgbufs[i].msg_hdr.msg_namelen = + ssock->srcaddr->ai_addrlen; + ssock->mmsgbufs[i].msg_hdr.msg_control = NULL; + ssock->mmsgbufs[i].msg_hdr.msg_controllen = 0; + ssock->mmsgbufs[i].msg_hdr.msg_flags = 0; + ssock->mmsgbufs[i].msg_len = 0; + } + } else { + memset(ssock->mmsgbufs, 0, + sizeof(struct mmsghdr) * RECV_BATCH_SIZE); } -#else - ssock->singlemsg.msg_iov = (struct iovec *) calloc(1, sizeof(struct iovec)); #endif + memset(&(ssock->singlemsg), 0, sizeof(ssock->singlemsg)); + ssock->singlemsg.msg_iov = (struct iovec *) calloc(1, sizeof(struct iovec)); - ssock->nextread = NULL;; - ssock->nextreadind = 0; - ssock->nextwriteind = 0; - ssock->recordcount = 0; rt->sourcecount += 1; - fprintf(stderr, "Added new stream %s:%u to thread %d\n", - ssock->groupaddr, ssock->port, rt->threadindex); - return ssock->port; } -static int receiver_read_messages(recvstream_t *rt) { +static int receiver_read_messages(libtrace_t *libtrace, + recvstream_t *rt, uint8_t socktype) { 
ndag_internal_message_t msg; @@ -966,12 +1418,13 @@ static int receiver_read_messages(recvstream_t *rt) { (void *)&msg) != LIBTRACE_MQ_FAILED) { switch(msg.type) { case NDAG_CLIENT_NEWGROUP: - if (add_new_streamsock(rt, msg.contents) < 0) { - return -1; + if (add_new_streamsock(libtrace, rt, + msg.contents, socktype) < 0) { + return READ_ERROR; } break; case NDAG_CLIENT_HALT: - return 0; + return READ_EOF; } } return 1; @@ -980,32 +1433,15 @@ static int receiver_read_messages(recvstream_t *rt) { static inline int readable_data(streamsock_t *ssock) { - if (ssock->sock == -1) { + if (ssock->bufavail == ENCAP_BUFFERS) { return 0; } + if (ssock->savedsize[ssock->nextreadind] == 0) { return 0; } - /* - if (ssock->nextread - ssock->saved[ssock->nextreadind] >= - ssock->savedsize[ssock->nextreadind]) { - return 0; - } - */ - return 1; - - -} - -static inline void reset_expected_seqs(recvstream_t *rt, ndag_monitor_t *mon) { - - int i; - for (i = 0; i < rt->sourcecount; i++) { - if (rt->sources[i].monitorptr == mon) { - rt->sources[i].expectedseq = 0; - } - } + return 1; } static int init_receivers(streamsock_t *ssock, int required) { @@ -1013,8 +1449,16 @@ static int init_receivers(streamsock_t *ssock, int required) { int wind = ssock->nextwriteind; int i = 1; + if (required <= 0) { + fprintf(stderr, "You are required to have atleast 1 receiver in init_receivers\n"); + return TRACE_ERR_INIT_FAILED; + } #if HAVE_DECL_RECVMMSG for (i = 0; i < required; i++) { + if (ssock->socktype != NDAG_SOCKET_TYPE_MULTICAST) { + i = 1; + break; + } if (i >= RECV_BATCH_SIZE) { break; } @@ -1024,183 +1468,122 @@ static int init_receivers(streamsock_t *ssock, int required) { } ssock->mmsgbufs[i].msg_len = 0; - ssock->mmsgbufs[i].msg_hdr.msg_iov->iov_base = ssock->saved[wind]; + ssock->mmsgbufs[i].msg_hdr.msg_iov->iov_base = + ssock->saved[wind]; ssock->mmsgbufs[i].msg_hdr.msg_iov->iov_len = ENCAP_BUFSIZE; ssock->mmsgbufs[i].msg_hdr.msg_iovlen = 1; wind ++; } -#else - if (required <= 0) { - 
fprintf(stderr, "You are required to have atleast 1 receiver in init_receivers\n"); - return TRACE_ERR_INIT_FAILED; - } +#endif ssock->singlemsg.msg_iov->iov_base = ssock->saved[wind]; ssock->singlemsg.msg_iov->iov_len = ENCAP_BUFSIZE; ssock->singlemsg.msg_iovlen = 1; -#endif return i; } -static int check_ndag_received(streamsock_t *ssock, int index, - unsigned int msglen, recvstream_t *rt) { - - ndag_encap_t *encaphdr; - ndag_monitor_t *mon; - uint8_t rectype; +static int is_buffered_data_available(streamsock_t *ssock, struct timeval *tv, + int *gottime) { - /* Check that we have a valid nDAG encap record */ - rectype = check_ndag_header(ssock->saved[index], (uint32_t)msglen); + int toret = 0; + if (readable_data(ssock)) { + toret = 1; + } + if (!(*gottime)) { + gettimeofday(tv, NULL); + *gottime = 1; + } + if (ssock->startidle == 0) { + ssock->startidle = tv->tv_sec; + } else if (tv->tv_sec - ssock->startidle > NDAG_IDLE_TIMEOUT) { + fprintf(stderr, + "Closing channel %s:%u due to inactivity.\n", + ssock->groupaddr, + ssock->port); - if (rectype == NDAG_PKT_KEEPALIVE) { - /* Keep-alive, reset startidle and carry on. Don't - * change nextwrite -- we want to overwrite the - * keep-alive with usable content. 
*/ - return 0; - } else if (rectype != NDAG_PKT_ENCAPERF && - rectype != NDAG_PKT_CORSAROTAG) { - fprintf(stderr, "Received invalid record on the channel for %s:%u.\n", - ssock->groupaddr, ssock->port); close(ssock->sock); ssock->sock = -1; - return -1; - } - - ssock->rectype[index] = rectype; - ssock->savedsize[index] = msglen; - ssock->nextwriteind ++; - ssock->bufavail --; - - if (ssock->bufavail < 0) { - fprintf(stderr, "No space in buffer in check_ndag_received()\n"); - return -1; - } - if (ssock->nextwriteind >= ENCAP_BUFFERS) { - ssock->nextwriteind = 0; } + return toret; +} - /* Get the useful info from the encap header */ - encaphdr=(ndag_encap_t *)(ssock->saved[index] + sizeof(ndag_common_t)); - - mon = ssock->monitorptr; - - if (mon->laststart == 0) { - mon->laststart = bswap_be_to_host64(encaphdr->started); - } else if (mon->laststart != bswap_be_to_host64(encaphdr->started)) { - mon->laststart = bswap_be_to_host64(encaphdr->started); - reset_expected_seqs(rt, mon); - - /* TODO what is a good way to indicate this to clients? - * set the loss counter in the ERF header? a bit rude? - * use another bit in the ERF header? - * add a queryable flag to libtrace_packet_t? 
- */ +#if HAVE_DECL_RECVMMSG +static int receive_from_single_socket_recvmmsg(streamsock_t *ssock, + struct timeval *tv, int *gottime) { - } + int ret, avail; + int i; - if (ssock->expectedseq != 0) { - rt->missing_records += seq_cmp( - ntohl(encaphdr->seqno), ssock->expectedseq); + avail = init_receivers(ssock, ssock->bufavail); - } - ssock->expectedseq = ntohl(encaphdr->seqno) + 1; - if (ssock->expectedseq == 0) { - ssock->expectedseq ++; + ret = recvmmsg(ssock->sock, ssock->mmsgbufs, avail, + MSG_DONTWAIT, NULL); + if (ret < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + return is_buffered_data_available(ssock, tv, gottime); + } + fprintf(stderr, + "Error receiving encapsulated records from %s:%u -- %s \n", + ssock->groupaddr, ssock->port, + strerror(errno)); + close(ssock->sock); + ssock->sock = -1; + return 0; } - if (ssock->nextread == NULL) { - /* If this is our first read, set up 'nextread' - * by skipping past the nDAG headers */ - ssock->nextread = ssock->saved[0] + - sizeof(ndag_common_t) + sizeof(ndag_encap_t); + ssock->startidle = 0; + for (i = 0; i < ret; i++) { + ssock->savedsize[ssock->nextwriteind] = + ssock->mmsgbufs[i].msg_len; + ssock->nextwriteind = (ssock->nextwriteind + 1) % ENCAP_BUFFERS; + ssock->bufavail --; } return 1; - } +#endif static int receive_from_single_socket(streamsock_t *ssock, struct timeval *tv, - int *gottime, recvstream_t *rt) { - - int ret, ndagstat, avail; - int toret = 0; + int *gottime) { -#if HAVE_DECL_RECVMMSG - int i; -#endif + int ret, avail; avail = init_receivers(ssock, ssock->bufavail); -#if HAVE_DECL_RECVMMSG - ret = recvmmsg(ssock->sock, ssock->mmsgbufs, avail, - MSG_DONTWAIT, NULL); -#else if (avail != 1) { return 0; } ret = recvmsg(ssock->sock, &(ssock->singlemsg), MSG_DONTWAIT); -#endif - if (ret < 0) { - /* Nothing to receive right now, but we should still - * count as 'ready' if at least one buffer is full */ - if (errno == EAGAIN || errno == EWOULDBLOCK) { - if (readable_data(ssock)) { - toret = 
1; - } - if (!(*gottime)) { - gettimeofday(tv, NULL); - *gottime = 1; - } - if (ssock->startidle == 0) { - ssock->startidle = tv->tv_sec; - } else if (tv->tv_sec - ssock->startidle > NDAG_IDLE_TIMEOUT) { - fprintf(stderr, - "Closing channel %s:%u due to inactivity.\n", - ssock->groupaddr, - ssock->port); - - close(ssock->sock); - ssock->sock = -1; - } + if (ret == 0 || (ret == -1 && (errno == EAGAIN || + errno == EWOULDBLOCK))) { + if (ret == 0 && ssock->sock != -1) { + close(ssock->sock); + ssock->sock = -1; + } + if (is_buffered_data_available(ssock, tv, gottime)) { + return 1; } else { - - fprintf(stderr, + return 0; + } + } else if (ret == -1) { + fprintf(stderr, "Error receiving encapsulated records from %s:%u -- %s \n", ssock->groupaddr, ssock->port, strerror(errno)); - close(ssock->sock); - ssock->sock = -1; - } - return toret; + close(ssock->sock); + ssock->sock = -1; + return 0; } ssock->startidle = 0; -#if HAVE_DECL_RECVMMSG - for (i = 0; i < ret; i++) { - ndagstat = check_ndag_received(ssock, ssock->nextwriteind, - ssock->mmsgbufs[i].msg_len, rt); - if (ndagstat == -1) { - break; - } - - if (ndagstat == 1) { - toret = 1; - } - } -#else - ndagstat = check_ndag_received(ssock, ssock->nextwriteind, ret, rt); - if (ndagstat <= 0) { - toret = 0; - } else { - toret = 1; - } -#endif - - return toret; + ssock->savedsize[ssock->nextwriteind] = ret; + ssock->nextwriteind = (ssock->nextwriteind + 1) % ENCAP_BUFFERS; + ssock->bufavail --; + return 1; } -static int receive_from_sockets(recvstream_t *rt) { +static int receive_from_sockets(recvstream_t *rt, uint8_t socktype) { int i, readybufs, gottime; struct timeval tv; @@ -1212,9 +1595,11 @@ static int receive_from_sockets(recvstream_t *rt) { gottime = 0; if (rt->maxfd == -1) { - return 0; + return READ_MESSAGE; } + FD_ZERO(&fds); + for (i = 0; i < rt->sourcecount; i++) { if (rt->sources[i].sock == -1) { continue; @@ -1222,19 +1607,18 @@ static int receive_from_sockets(recvstream_t *rt) { #if HAVE_DECL_RECVMMSG /* 
Plenty of full buffers, just use the packets in those */ - if (rt->sources[i].bufavail < RECV_BATCH_SIZE / 2) { - readybufs ++; - continue; + if (socktype == NDAG_SOCKET_TYPE_MULTICAST) { + if (rt->sources[i].bufavail < RECV_BATCH_SIZE / 2) { + readybufs ++; + continue; + } } -#else +#endif if (rt->sources[i].bufavail == 0) { readybufs ++; continue; } -#endif - if (maxfd == 0) { - FD_ZERO(&fds); - } + FD_SET(rt->sources[i].sock, &fds); if (maxfd < rt->sources[i].sock) { maxfd = rt->sources[i].sock; @@ -1242,26 +1626,36 @@ static int receive_from_sockets(recvstream_t *rt) { } - if (maxfd <= 0) { - return readybufs; - } + if (maxfd > 0) { - zerotv.tv_sec = 0; - zerotv.tv_usec = 0; - if (select(maxfd + 1, &fds, NULL, NULL, &zerotv) == -1) { - /* log the error? XXX */ - return -1; + zerotv.tv_sec = 0; + zerotv.tv_usec = 0; + if (select(maxfd + 1, &fds, NULL, NULL, &zerotv) == -1) { + /* log the error? XXX */ + fprintf(stderr, + "select() failed for recvstream %d: %s\n", + rt->threadindex, strerror(errno)); + return READ_ERROR; + } } for (i = 0; i < rt->sourcecount; i++) { - if (!FD_ISSET(rt->sources[i].sock, &fds)) { + if (rt->sources[i].sock == -1 || + !FD_ISSET(rt->sources[i].sock, &fds)) { if (rt->sources[i].bufavail < ENCAP_BUFFERS) { readybufs ++; } continue; } - readybufs += receive_from_single_socket(&(rt->sources[i]), - &tv, &gottime, rt); +#if HAVE_DECL_RECVMMSG + if (socktype == NDAG_SOCKET_TYPE_MULTICAST) { + readybufs += receive_from_single_socket_recvmmsg( + &(rt->sources[i]), &tv, &gottime); + continue; + } +#endif + readybufs += receive_from_single_socket( + &(rt->sources[i]), &tv, &gottime); } return readybufs; @@ -1286,7 +1680,8 @@ static int receive_encap_records_block(libtrace_t *libtrace, recvstream_t *rt, } /* Check for any messages from the control thread */ - iserr = receiver_read_messages(rt); + iserr = receiver_read_messages(libtrace, rt, + FORMAT_DATA->socktype); if (iserr <= 0) { return iserr; @@ -1300,7 +1695,8 @@ static int 
receive_encap_records_block(libtrace_t *libtrace, recvstream_t *rt, continue; } - if ((iserr = receive_from_sockets(rt)) < 0) { + if ((iserr = receive_from_sockets(rt, + FORMAT_DATA->socktype)) == READ_ERROR) { return iserr; } else if (iserr > 0) { /* At least one of our input sockets has available @@ -1309,9 +1705,8 @@ static int receive_encap_records_block(libtrace_t *libtrace, recvstream_t *rt, } /* if we have access to the message queue check for a message - * otherwise we need to return and let libtrace check for a message */ - if ((msg && libtrace_message_queue_count(msg) > 0) || !msg) { + if ((msg && libtrace_message_queue_count(msg) > 0)) { return READ_MESSAGE; } @@ -1322,8 +1717,11 @@ static int receive_encap_records_block(libtrace_t *libtrace, recvstream_t *rt, usleep(100); } - } while (1); + } while (!rt->halted); + if (rt->halted) { + return 0; + } return iserr; } @@ -1347,62 +1745,82 @@ static int receive_encap_records_nonblock(libtrace_t *libtrace, recvstream_t *rt return 0; } - return receive_from_sockets(rt); + return receive_from_sockets(rt, FORMAT_DATA->socktype); } static streamsock_t *select_next_packet(recvstream_t *rt) { - int i; + int i, r; streamsock_t *ssock = NULL; uint64_t earliest = 0; uint64_t currentts = 0; - dag_record_t *daghdr; - - /* If we only have one source, then no need to do any - * timestamp parsing or byteswapping. 
- */ - if (rt->sourcecount == 1) { - if (readable_data(&(rt->sources[0]))) { - return &(rt->sources[0]); - } - return NULL; - } - for (i = 0; i < rt->sourcecount; i ++) { if (!readable_data(&(rt->sources[i]))) { continue; } + if (rt->sources[i].rectype == 0) { + r = process_ndag_encap_headers(&(rt->sources[i]), rt); + if (r < 0) { + return NULL; + } + if (r == 0) { + continue; + } + } + if (rt->sources[i].nextts == 0) { - daghdr = (dag_record_t *)(rt->sources[i].nextread); - currentts = bswap_le_to_host64(daghdr->ts); - rt->sources[i].nextts = currentts; - } else { - currentts = rt->sources[i].nextts; + if (rt->sources[i].rectype == NDAG_PKT_ENCAPERF) { + r = process_ndag_erf_headers( + &(rt->sources[i]), rt); + } else if (rt->sources[i].rectype == + NDAG_PKT_CORSAROTAG) { + r = process_ndag_corsaro_header( + &(rt->sources[i]), rt); + } else { + r = 0; + } + + if (r < 0) { + return NULL; + } + if (r == 0) { + continue; + } } + assert(rt->sources[i].nextts != 0); + currentts = rt->sources[i].nextts; if (earliest == 0 || earliest > currentts) { earliest = currentts; ssock = &(rt->sources[i]); } } - return ssock; + if (ssock && got_complete_packet(ssock)) { + return ssock; + } + return NULL; } static int ndag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) { int rem, ret; streamsock_t *nextavail = NULL; + + if (FORMAT_DATA->receivers[0].halted) { + return 0; + } + rem = receive_encap_records_block(libtrace, &(FORMAT_DATA->receivers[0]), packet, NULL); - if (rem <= 0) { + if (rem == READ_ERROR || rem == READ_EOF) { return rem; } nextavail = select_next_packet(&(FORMAT_DATA->receivers[0])); if (nextavail == NULL) { - return 0; + return READ_MESSAGE; } /* nextread should point at an ERF header, so prepare 'packet' to be @@ -1420,15 +1838,19 @@ static int ndag_pread_packets(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t **packets, size_t nb_packets) { recvstream_t *rt; - int rem, i; + int rem, i, r; size_t read_packets = 0; streamsock_t 
*nextavail = NULL; rt = (recvstream_t *)t->format_data; + if (rt->halted) { + return 0; + } + do { /* Only check for messages once per batch */ - if (read_packets == 0) { + if (nextavail == NULL) { rem = receive_encap_records_block(libtrace, rt, packets[read_packets], &t->messages); if (rem < 0) { @@ -1439,21 +1861,31 @@ static int ndag_pread_packets(libtrace_t *libtrace, libtrace_thread_t *t, break; } } - nextavail = select_next_packet(rt); if (nextavail == NULL) { - break; + continue; } - ndag_prepare_packet_stream(libtrace, rt, nextavail, + if ((r = ndag_prepare_packet_stream(libtrace, rt, nextavail, packets[read_packets], - TRACE_PREP_DO_NOT_OWN_BUFFER); + TRACE_PREP_DO_NOT_OWN_BUFFER)) < 0) { + return -1; + } + + if (r == 0) { + nextavail = NULL; + continue; + } read_packets ++; if (read_packets >= nb_packets) { break; } - } while (1); + } while (!rt->halted); + + if (rt->halted) { + return 0; + } for (i = 0; i < rt->sourcecount; i++) { streamsock_t *src = &(rt->sources[i]); @@ -1477,8 +1909,13 @@ static libtrace_eventobj_t trace_event_ndag(libtrace_t *libtrace, int rem, i; streamsock_t *nextavail = NULL; + if (FORMAT_DATA->receivers[0].halted) { + event.type = TRACE_EVENT_TERMINATE; + return event; + } /* Only check for messages once per call */ - rem = receiver_read_messages(&(FORMAT_DATA->receivers[0])); + rem = receiver_read_messages(libtrace, &(FORMAT_DATA->receivers[0]), + FORMAT_DATA->socktype); if (rem <= 0) { event.type = TRACE_EVENT_TERMINATE; return event; @@ -1774,6 +2211,62 @@ static struct libtrace_format_t ndag = { ndag_get_thread_stats /* per-thread stats */ }; +static struct libtrace_format_t ndagtcp = { + + "ndagtcp", + "", + TRACE_FORMAT_NDAG_TCP, + NULL, /* probe filename */ + NULL, /* probe magic */ + ndag_init_input, /* init_input */ + ndag_config_input, /* config_input */ + ndagtcp_start_input, /* start_input */ + ndag_pause_input, /* pause_input */ + NULL, /* init_output */ + NULL, /* config_output */ + NULL, /* start_output */ + 
ndag_fin_input, /* fin_input */ + NULL, /* fin_output */ + ndag_read_packet, /* read_packet */ + ndag_prepare_packet, /* prepare_packet */ + NULL, /* fin_packet */ + NULL, /* can_hold_packet */ + NULL, /* write_packet */ + NULL, /* flush_output */ + ndag_get_link_type, /* get_link_type */ + ndag_get_direction, /* get_direction */ + ndag_set_direction, /* set_direction */ + ndag_get_erf_timestamp, /* get_erf_timestamp */ + NULL, /* get_timeval */ + NULL, /* get_seconds */ + NULL, /* get_timespec */ + NULL, /* get_meta_section */ + NULL, /* seek_erf */ + NULL, /* seek_timeval */ + NULL, /* seek_seconds */ + ndag_get_capture_length,/* get_capture_length */ + ndag_get_wire_length, /* get_wire_length */ + ndag_get_framing_length,/* get_framing_length */ + ndag_set_capture_length,/* set_capture_length */ + NULL, /* get_received_packets */ + NULL, /* get_filtered_packets */ + NULL, /* get_dropped_packets */ + ndag_get_statistics, /* get_statistics */ + NULL, /* get_fd */ + trace_event_ndag, /* trace_event */ + NULL, /* help */ + NULL, /* next pointer */ + {true, 0}, /* live packet capture */ + ndagtcp_pstart_input, /* parallel start */ + ndag_pread_packets, /* parallel read */ + ndag_pause_input, /* parallel pause */ + NULL, + ndag_pregister_thread, /* register thread */ + NULL, + ndag_get_thread_stats /* per-thread stats */ +}; + void ndag_constructor(void) { register_format(&ndag); + register_format(&ndagtcp); } diff --git a/lib/format_pcapfile.c b/lib/format_pcapfile.c index 518b43147..67bb9d867 100644 --- a/lib/format_pcapfile.c +++ b/lib/format_pcapfile.c @@ -138,6 +138,8 @@ static int pcapfile_probe_magic(io_t *io) static int pcapfile_init_input(libtrace_t *libtrace) { + pcapfile_header_t *pcaphdr; + libtrace->format_data = malloc(sizeof(struct pcapfile_format_data_t)); if (!libtrace->format_data) { trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "Unable to allocate memory for " @@ -147,6 +149,16 @@ static int pcapfile_init_input(libtrace_t *libtrace) { 
IN_OPTIONS.real_time = 0; DATA(libtrace)->started = false; + + /* set defaults to support "dummy" trace instances */ + pcaphdr = &(DATA(libtrace)->header); + pcaphdr->magic_number = 0xa1b2c3d4; + pcaphdr->version_major = 2; + pcaphdr->version_minor = 4; + pcaphdr->thiszone = 0; + pcaphdr->sigfigs = 0; + pcaphdr->snaplen = 65536; + pcaphdr->network = TRACE_DLT_EN10MB; return 0; } diff --git a/lib/libtrace.h.in b/lib/libtrace.h.in index 4873bba39..407e3dde6 100644 --- a/lib/libtrace.h.in +++ b/lib/libtrace.h.in @@ -464,6 +464,7 @@ enum base_format_t { TRACE_FORMAT_PFRINGOLD =26, TRACE_FORMAT_PFRING =27, TRACE_FORMAT_ETSIFILE =28, /**< ETSI LI in a binary file */ + TRACE_FORMAT_NDAG_TCP =29, /**< DAG unicast over a network */ }; /** RT protocol packet types */ @@ -554,6 +555,8 @@ typedef enum { TRACE_RT_DLT_PFLOG =TRACE_RT_DATA_DLT+TRACE_DLT_PFLOG, /** RT is encapsulating a PCAP capture record with an AAL5 linktype */ TRACE_RT_DLT_ATM_RFC1483 =TRACE_RT_DATA_DLT+TRACE_DLT_ATM_RFC1483, + /** RT is encapsulating a PCAP capture record with a Raw IP linktype */ + TRACE_RT_DLT_RAW =TRACE_RT_DATA_DLT+TRACE_DLT_RAW, /** Unused value marking the end of the valid range for PCAP RT * encapsulation */ TRACE_RT_DATA_DLT_END = 2999, diff --git a/libpacketdump/Makefile.am b/libpacketdump/Makefile.am index cb3dd3f10..41c667a2b 100644 --- a/libpacketdump/Makefile.am +++ b/libpacketdump/Makefile.am @@ -198,7 +198,7 @@ AM_CPPFLAGS= @ADD_INCLS@ -I../lib # a shared library. 
libpacketdump_la_LIBADD = @LIBPKTDUMP_LIBS@ libpacketdump_la_LDFLAGS=\ - -version-info 5:7:0 \ + -version-info 5:8:0 \ @ADD_LDFLAGS@ AM_CXXFLAGS=-g -Wall -DDIRNAME=\"$(plugindir)\" $(AM_CPPFLAGS) diff --git a/libpacketdump/link_22.c b/libpacketdump/link_22.c index 394438f4d..6ec0dea44 100644 --- a/libpacketdump/link_22.c +++ b/libpacketdump/link_22.c @@ -65,7 +65,6 @@ DLLEXPORT void decode(int link_type UNUSED, const char *packet, unsigned len) { printf(" "); } printf("%s: ...\n", namesp); - wandder_free_etsili_decoder(dec); if (ident == WANDDER_IRI_CONTENT_IP) { decode_next((const char *)iricontents, rem, "eth", ((*iricontents) & 0xf0) == 0x40 ? @@ -75,6 +74,7 @@ DLLEXPORT void decode(int link_type UNUSED, const char *packet, unsigned len) { decode_next((const char *)iricontents, rem, "udp", 5060); } + wandder_free_etsili_decoder(dec); } return; diff --git a/rpm/libtrace4.spec b/rpm/libtrace4.spec index 8b30d0d7a..11e8935f7 100644 --- a/rpm/libtrace4.spec +++ b/rpm/libtrace4.spec @@ -1,5 +1,5 @@ Name: libtrace4 -Version: 4.0.22 +Version: 4.0.23 Release: 1%{?dist} Summary: C Library for capturing and analysing network packets @@ -127,6 +127,9 @@ find $RPM_BUILD_ROOT -name '*.la' -exec rm -f {} ';' %changelog +* Fri Nov 10 2023 Shane Alcock - 4.0.23-1 +- Updated for 4.0.23 release + * Wed Jun 14 2023 Shane Alcock - 4.0.22-1 - Updated for 4.0.22 release diff --git a/tools/tracemcast/Makefile.am b/tools/tracemcast/Makefile.am index 02cfd3c39..3f72261c5 100644 --- a/tools/tracemcast/Makefile.am +++ b/tools/tracemcast/Makefile.am @@ -1,6 +1,8 @@ -bin_PROGRAMS = tracemcast -man_MANS = tracemcast.1 +bin_PROGRAMS = tracemcast traceucast +man_MANS = tracemcast.1 traceucast.1 EXTRA_DIST = $(man_MANS) include ../Makefile.tools tracemcast_SOURCES = tracemcast.c + +traceucast_SOURCES = traceucast.c diff --git a/tools/tracemcast/tracemcast.1 b/tools/tracemcast/tracemcast.1 index 1cdb70d33..215fcf7c2 100644 --- a/tools/tracemcast/tracemcast.1 +++ b/tools/tracemcast/tracemcast.1 
@@ -64,7 +64,7 @@ https://github.com/LibtraceTeam/libtrace/wiki libtrace(3), tracesplit(1), tracesplit_dir(1), tracefilter(1), traceconvert(1), tracereport(1), tracertstats(1), tracestats(1), tracepktdump(1), traceanon(1), tracesummary(1), tracereplay(1), -tracediff(1), traceends(1), tracetopends(1), tracemerge(1) +tracediff(1), traceends(1), tracetopends(1), tracemerge(1), traceucast(1) .SH AUTHORS Shane Alcock diff --git a/tools/tracemcast/tracemcast.c b/tools/tracemcast/tracemcast.c index b5ff85e31..1a64950e2 100644 --- a/tools/tracemcast/tracemcast.c +++ b/tools/tracemcast/tracemcast.c @@ -382,6 +382,7 @@ static libtrace_packet_t *packet_reader_thread(libtrace_t *trace UNUSED, rdata->encaphdr->recordcount = 0; rdata->reccount = 0; + rdata->seqno ++; } if (rem > gparams->mtu - (rdata->writeptr - rdata->pbuffer) diff --git a/tools/tracemcast/traceucast.1 b/tools/tracemcast/traceucast.1 new file mode 100644 index 000000000..7e244472f --- /dev/null +++ b/tools/tracemcast/traceucast.1 @@ -0,0 +1,63 @@ +.TH TRACEUCAST "1" "Oct 2023" "traceucast (libtrace)" "User Commands" +.SH NAME +traceucast \- stream captured packets to a libtrace client +.SH SYNOPSIS +.B traceucast +[ \-m ] +[ \-f ] +[ \-c ] +[ \-p ] +[ \-t ] +inputuri +.SH DESCRIPTION +traceucast reads packets from a single live packet source (e.g. an interface +or hardware capture card) and then emits those packets to a specified +libtrace client program that can process the packets off-site. + +This allows the redirection of packets captured on one host to another. +Unlike tracemcast, traceucast uses TCP to transfer packets, so the packet +stream is reliable and ordered. The downside is that traceucast can only +maintain a connection to a single receiving client at a time. + +The streaming protocol is the nDAG protocol, which libtrace supports as +an input format, so libtrace programs can natively receive packets from a +traceucast instance without any additional modifications. 
+ +.TP +\fB\-m\fR +set a unique identifier that will be included in the nDAG header. This is used +by the recipient to tell which traceucast instance emitted the packet. + +.TP +\fB\-f\fR +only emit packets that match this BPF filter expression. + +.TP +\fB\-c\fR
+transmit captured packets to this client address. The client should be +a libtrace program that is using an ndagtcp: input URI. + +.TP +\fB\-p\fR +send nDAG beacon messages to this port number. + +.TP +\fB\-t\fR +sets the number of threads to use for streaming packets. +Each thread will produce a separate stream on a unique port. + +.SH LINKS +More details about traceucast (and libtrace) can be found at +https://github.com/LibtraceTeam/libtrace/wiki + +.SH SEE ALSO +libtrace(3), tracesplit(1), tracesplit_dir(1), tracefilter(1), +traceconvert(1), tracereport(1), tracertstats(1), tracestats(1), +tracepktdump(1), traceanon(1), tracesummary(1), tracereplay(1), +tracediff(1), traceends(1), tracetopends(1), tracemerge(1), +tracemcast(1) + +.SH AUTHORS +Shane Alcock + + diff --git a/tools/tracemcast/traceucast.c b/tools/tracemcast/traceucast.c new file mode 100644 index 000000000..0b115a9fc --- /dev/null +++ b/tools/tracemcast/traceucast.c @@ -0,0 +1,755 @@ +/* + * + * Copyright (c) 2023 Shane Alcock. + * All rights reserved. + * + * This file is part of libtrace. + * + * Libtrace was originally developed by the University of Waikato WAND + * research group. For further information please see http://www.wand.net.nz/ + * + * libtrace is free software; you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * libtrace is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program. If not, see . + * + * + */ + +/* Author: Shane Alcock, SearchLight */ + +/* Given a single live capture input, e.g. 
'ring:' or 'dpdk:', this tool + * will re-transmit the packets received across a network to a listening + * host. The resulting traffic stream matches the expected format + * for an 'ndagtcp:' client, so you can use libtrace to receive the + * packets and process them as if you had captured from the source + * directly. + * + * Effectively, this tool is intended to provide a means of pushing packets + * from a capture source to a secondary client so that you can run libtrace + * tools and programs on a remote host. Unlike tracemcast, traceucast uses + * TCP to ensure the packets reach their destination but, as a result, can + * only support a single recipient. + * + * Inspired by (and borrowing somewhat from) the DAG multicaster tool that + * I developed for the STARDUST project. The DAG multicaster is optimised + * for use with a DAG card only. It is highly recommended if you are using a + * DAG card for your initial capture *and* your use case is academic and + * non-commercial. + * + * traceucast is generalised for use with other live capture formats and + * therefore loses some of the optimizations that come from being DAG-specific. + * It is also licensed under the LGPL, so can be used for commercial purposes + * (provided the terms of the LGPL are met). 
+ */ + +#include "config.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include "lib/format_erf.h" +#include "lib/format_ndag.h" +#include "lib/lt_bswap.h" + +#ifndef HAVE_DAG_API +#include "lib/dagformat.h" +#endif + +#include "lib/libtrace_int.h" + +struct libtrace_t *currenttrace = NULL; + +struct global_params { + + uint16_t monitorid ; + char *clientaddr ; + uint64_t starttime; + uint16_t firstport; + int readercount; +}; + +struct beacon_params { + uint16_t beaconport; + struct global_params *gparams; + uint32_t frequency; +}; + +typedef struct read_thread_data { + int threadid; + uint16_t streamport; + int streamfd; + + uint8_t *pbuffer; + ndag_encap_t *encaphdr; + uint8_t *writeptr; + uint32_t seqno; + uint16_t reccount; + struct addrinfo *target; + uint32_t lastsend; + + bool livesource; + uint8_t failed; + +} read_thread_data_t; + +#define MAX_PACKET_SIZE 10000 + +volatile int halted = 0; + +static void cleanup_signal(int signal UNUSED) { + if (currenttrace) { + trace_pstop(currenttrace); + } + halted = 1; +} + +static int create_stream_socket(uint16_t port, char *clientaddr, + struct addrinfo **targetinfo, uint8_t block) { + + struct addrinfo hints; + struct addrinfo *gotten; + char portstr[16]; + int sock; + int bufsize, reuse=1; + + hints.ai_family = PF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_PASSIVE; + hints.ai_protocol = 0; + + snprintf(portstr, 15, "%u", port); + + if (getaddrinfo(clientaddr, portstr, &hints, &gotten) != 0) { + fprintf(stderr, + "traceucast: Call to getaddrinfo failed for %s:%s -- %s\n", + clientaddr, portstr, strerror(errno)); + return -1; + } + if (targetinfo) { + *targetinfo = gotten; + } + + sock = socket(gotten->ai_family, gotten->ai_socktype, 0); + if (sock < 0) { + fprintf(stderr, + "traceucast: Failed to create TCP socket for %s:%s -- %s\n", + clientaddr, portstr, strerror(errno)); + goto sockcreateover; + } + 
+    if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse))
+            < 0) {
+
+        fprintf(stderr, "traceucast: Failed to configure socket for %s:%s -- %s\n",
+                clientaddr, portstr, strerror(errno));
+
+        close(sock);
+        sock = -1;
+        goto sockcreateover;
+    }
+
+    bufsize = 32 * 1024 * 1024;
+    if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, &bufsize,
+                (socklen_t)sizeof(int)) != 0) {
+        fprintf(stderr,
+                "traceucast: Failed to increase buffer size on streaming interface %s:%s -- %s\n",
+                clientaddr, portstr, strerror(errno));
+        close(sock);
+        sock = -1;
+        goto sockcreateover;
+    }
+
+
+    while (!halted) {
+        if (connect(sock, gotten->ai_addr, gotten->ai_addrlen) == -1) {
+            if (errno == ECONNREFUSED) {
+                if (block) {
+                    sleep(1);
+                    continue;
+                } else {
+                    close(sock);
+                    sock = 0;
+                    break;
+                }
+            }
+            fprintf(stderr,
+                    "traceucast: Failed to connect to %s:%s -- %s\n",
+                    clientaddr, portstr, strerror(errno));
+            close(sock);
+            sock = -1;
+            break;
+        } else {
+            fprintf(stderr, "traceucast connected to %s:%s\n", clientaddr,
+                    portstr);
+            break;
+        }
+    }
+
+sockcreateover:
+    if (targetinfo == NULL) {
+        freeaddrinfo(gotten);
+    }
+    return sock;
+}
+
+/* Writes an nDAG common header (magic, version, packet type and monitor ID)
+ * at bufstart.
+ *
+ * Returns a pointer to the first byte following the common header, i.e.
+ * where the type-specific payload should be written.
+ */
+static inline char *fill_common_header(char *bufstart, uint16_t monitorid,
+        uint8_t pkttype) {
+
+    ndag_common_t *hdr = (ndag_common_t *)bufstart;
+
+    hdr->magic = htonl(NDAG_MAGIC_NUMBER);
+    hdr->version = NDAG_EXPORT_VERSION;
+    hdr->type = pkttype;
+    hdr->monitorid = htons(monitorid);
+
+    return bufstart + sizeof(ndag_common_t);
+}
+
+/* Starting callback for each packet processing thread.
+ *
+ * Allocates the thread-local state (including the MAX_PACKET_SIZE byte
+ * send buffer) and derives the port for this thread's stream from the
+ * thread ID. No socket is opened here; streamfd is left as -1 so that the
+ * packet callback creates the connection when the first packet arrives.
+ *
+ * Returns the new thread-local state, or NULL if it could not be
+ * allocated (in which case the trace is interrupted).
+ */
+static void *init_reader_thread(libtrace_t *trace,
+        libtrace_thread_t *t, void *global) {
+
+    read_thread_data_t *rdata = NULL;
+    struct global_params *gparams = (struct global_params *)global;
+    libtrace_info_t *info = trace_get_information(trace);
+
+    rdata = calloc(1, sizeof(read_thread_data_t));
+    if (rdata == NULL) {
+        fprintf(stderr,
+                "traceucast: unable to allocate memory for reader thread state\n");
+        trace_interrupt();
+        return NULL;
+    }
+
+    if (info) {
+        rdata->livesource = info->live;
+    } else {
+        rdata->livesource = false;
+    }
+    rdata->threadid = trace_get_perpkt_thread_id(t);
+    rdata->streamport = gparams->firstport + rdata->threadid;
+    rdata->streamfd = -1;
+    rdata->pbuffer = calloc(MAX_PACKET_SIZE, sizeof(uint8_t));
+    rdata->writeptr = rdata->pbuffer;
+    rdata->seqno = 1;
+    rdata->target = NULL;
+    rdata->lastsend = 0;
+    rdata->encaphdr = NULL;
+    rdata->reccount = 0;
+    rdata->failed = 0;
+
+    if (rdata->pbuffer == NULL) {
+        /* Without a send buffer this thread cannot do anything useful;
+         * flag the failure so the packet callback interrupts the trace
+         * rather than writing through a NULL pointer.
+         */
+        fprintf(stderr,
+                "traceucast: unable to allocate send buffer for reader thread %d\n",
+                rdata->threadid);
+        rdata->failed = 1;
+    }
+
+    return rdata;
+}
+
+/* Sends everything currently buffered between pbuffer and writeptr over
+ * the thread's stream socket, after filling in the record count in the
+ * encapsulation header.
+ *
+ * Sends are non-blocking; EAGAIN / EWOULDBLOCK is retried up to 20 times
+ * with a doubling back-off (starting at 5ms, capped at 1 second).
+ *
+ * The buffer is reset to empty regardless of the outcome.
+ *
+ * Returns 0 on success, -1 if the send failed.
+ */
+static int send_ndag_packet(read_thread_data_t *rdata) {
+
+    int s;
+    int rem = (rdata->writeptr - rdata->pbuffer);
+    int sentsofar = 0;
+    int ret = 0;
+    int attempts = 0;
+    int backoff = 5000;
+
+    /* host -> network conversion, so htons() rather than ntohs() */
+    rdata->encaphdr->recordcount = htons(rdata->reccount);
+
+    while (rem > 0) {
+        s = send(rdata->streamfd, rdata->pbuffer + sentsofar, rem, MSG_DONTWAIT);
+
+        if (s < 0) {
+            if ((errno == EAGAIN || errno == EWOULDBLOCK) && attempts < 20) {
+                attempts ++;
+                usleep(backoff);
+                backoff = backoff * 2;
+                if (backoff > 1000000) {
+                    backoff = 1000000;
+                }
+                continue;
+            }
+            fprintf(stderr, "traceucast: thread %d failed to send streamed ERF packet: %s\n",
+                    rdata->threadid, strerror(errno));
+            fprintf(stderr, "%u\n", rdata->seqno);
+            ret = -1;
+            break;
+        }
+
+        sentsofar += s;
+        rem -= s;
+    }
+
+    rdata->writeptr = rdata->pbuffer;
+    rdata->encaphdr = NULL;
+    rdata->reccount = 0;
+    return ret;
+}
+
+/* Stopping callback for each packet processing thread.
+ *
+ * Flushes any records still sitting in the send buffer, then releases the
+ * buffer, the cached address info and the stream socket before freeing
+ * the thread-local state itself.
+ */
+static void halt_reader_thread(libtrace_t *trace UNUSED,
+        libtrace_thread_t *t UNUSED, void *global UNUSED, void *tls) {
+
+    read_thread_data_t *rdata = (read_thread_data_t *)tls;
+
+    if (rdata == NULL) {
+        /* init_reader_thread failed to allocate our state */
+        return;
+    }
+
+    if (rdata->writeptr > rdata->pbuffer) {
+        send_ndag_packet(rdata);
+    }
+
+    if (rdata->pbuffer) {
+        free(rdata->pbuffer);
+    }
+    if (rdata->target) {
+        freeaddrinfo(rdata->target);
+    }
+    if (rdata->streamfd != -1) {
+        close(rdata->streamfd);
+    }
+    free(rdata);
+}
+
+/* Writes an ERF record header for the given packet at rdata->writeptr.
+ *
+ * ltype is the link type of the captured payload, rem is the number of
+ * payload bytes that will follow the header and erfts is the packet
+ * timestamp in ERF format.
+ *
+ * Returns the number of bytes of framing written (the ERF header, plus
+ * two bytes of padding for Ethernet records); the caller must advance
+ * writeptr by this amount before copying in the payload.
+ */
+static uint16_t construct_erf_header(read_thread_data_t *rdata,
+        libtrace_packet_t *packet, libtrace_linktype_t ltype, uint32_t rem,
+        uint64_t erfts) {
+
+    uint16_t framing = 0;
+    dag_record_t *drec = (dag_record_t *)(rdata->writeptr);
+    libtrace_direction_t dir;
+
+    drec->ts = bswap_host_to_le64(erfts);
+
+    if (ltype == TRACE_TYPE_ETH) {
+        drec->type = TYPE_ETH;
+    } else if (ltype == TRACE_TYPE_NONE) {
+        drec->type = TYPE_IPV4;     /* sorry if you're using IPv6 raw */
+    } else {
+        drec->type = 255;
+    }
+
+    if (drec->type == TYPE_ETH) {
+        framing = dag_record_size + 2;
+        /* The two bytes between the ERF header and the Ethernet frame
+         * are not otherwise written, so clear them to avoid streaming
+         * stale buffer contents.
+         */
+        memset(((uint8_t *)drec) + dag_record_size, 0, 2);
+    } else {
+        framing = dag_record_size;
+    }
+    drec->rlen = htons(rem + framing);
+    drec->wlen = htons(trace_get_wire_length(packet));
+    drec->lctr = htons(0);
+    memset(&(drec->flags), 0, sizeof(drec->flags));
+
+    dir = trace_get_direction(packet);
+    if (dir != TRACE_DIR_UNKNOWN) {
+        drec->flags.iface = dir;
+    } else {
+        drec->flags.iface = 0;
+    }
+
+    return framing;
+}
+
+/* Tick callback for each packet processing thread.
+ *
+ * If there are buffered records and at least 3 seconds (according to the
+ * upper 32 bits of the tick's order value) have passed since the last
+ * send, push the buffer out so that records do not sit unsent on a quiet
+ * stream.
+ */
+static void tick_reader_thread(libtrace_t *trace UNUSED,
+        libtrace_thread_t *t UNUSED, void *global UNUSED, void *tls,
+        uint64_t order) {
+
+    read_thread_data_t *rdata = (read_thread_data_t *)tls;
+
+    if (rdata == NULL) {
+        /* init_reader_thread failed to allocate our state */
+        return;
+    }
+
+    if (rdata->writeptr > rdata->pbuffer &&
+            (order >> 32) >= rdata->lastsend + 3) {
+
+        if (send_ndag_packet(rdata) < 0) {
+            rdata->failed = 1;
+        }
+        rdata->lastsend = (order >> 32);
+    }
+}
+
+/* Packet callback for each packet processing thread.
+ *
+ * Lazily connects the thread's stream socket, then appends the packet
+ * (from layer 2 onwards, wrapped in an ERF header) to the thread's send
+ * buffer. The buffer is sent whenever the next packet would not fit, or
+ * once it is nearly full.
+ *
+ * Any send or socket failure marks the thread as failed and interrupts
+ * the trace. The packet is always returned to libtrace.
+ */
+static libtrace_packet_t *packet_reader_thread(libtrace_t *trace UNUSED,
+        libtrace_thread_t *t UNUSED, void *global, void *tls,
+        libtrace_packet_t *packet) {
+
+    read_thread_data_t *rdata = (read_thread_data_t *)tls;
+    struct global_params *gparams = (struct global_params *)global;
+    libtrace_linktype_t ltype;
+    uint32_t rem = 0;
+    void *l2;
+    uint64_t erfts;
+    int64_t space;
+
+    if (rdata == NULL || rdata->failed) {
+        trace_interrupt();
+        return packet;
+    }
+
+    if (IS_LIBTRACE_META_PACKET(packet)) {
+        return packet;
+    }
+
+    if (rdata->streamfd == -1) {
+        int fd;
+        uint8_t block;
+
+        /* Only wait around for the receiver if the input is not live;
+         * a live source should keep reading while the receiver is away.
+         */
+        if (rdata->livesource) {
+            block = 0;
+        } else {
+            block = 1;
+        }
+        fd = create_stream_socket(rdata->streamport,
+                gparams->clientaddr, &(rdata->target), block);
+
+        if (fd == 0) {
+            /* receiver is not accepting connections yet; skip this
+             * packet and try again on the next one */
+            return packet;
+        }
+        if (fd == -1) {
+            fprintf(stderr, "traceucast: failed to create TCP socket for reader thread %d\n", rdata->threadid);
+            trace_interrupt();
+            return packet;
+
+        } else if (rdata->target == NULL) {
+            fprintf(stderr, "traceucast: failed to get addrinfo for reader socket %d\n", rdata->threadid);
+            /* rdata->streamfd is still -1 at this point; the socket that
+             * needs closing is the one we were just given. */
+            close(fd);
+            rdata->streamfd = -1;
+            trace_interrupt();
+            return packet;
+        }
+        rdata->streamfd = fd;
+    }
+
+    /* first, check if there is going to be space in the buffer for this
+     * packet + an ERF header */
+    l2 = trace_get_layer2(packet, &ltype, &rem);
+    if (l2 == NULL || rem == 0) {
+        /* nothing that we can copy into an ERF record */
+        return packet;
+    }
+    erfts = trace_get_erf_timestamp(packet);
+
+    /* allow for the 2 bytes of padding that Ethernet records require */
+    if (MAX_PACKET_SIZE - (rdata->writeptr - rdata->pbuffer) <
+            rem + dag_record_size + 2) {
+
+        /* if not and if there is already something in the buffer, send it then
+         * create a new one.
+         */
+        if (rdata->writeptr > rdata->pbuffer + sizeof(ndag_common_t) +
+                sizeof(ndag_encap_t)) {
+
+            if (send_ndag_packet(rdata) < 0) {
+                rdata->failed = 1;
+                return packet;
+            }
+            rdata->lastsend = (erfts >> 32);
+        }
+    }
+
+    /* append this packet to the buffer (truncate if necessary) */
+
+    /* if the buffer is empty, put on a common and encap header on the
+     * front, before adding any packets */
+    if (rdata->writeptr == rdata->pbuffer) {
+        rdata->encaphdr = (ndag_encap_t *)(fill_common_header(
+                (char *)rdata->writeptr,
+                gparams->monitorid, NDAG_PKT_ENCAPERF));
+        rdata->writeptr = ((uint8_t *)rdata->encaphdr) + sizeof(ndag_encap_t);
+
+        rdata->encaphdr->started = gparams->starttime;
+        rdata->encaphdr->seqno = htonl(rdata->seqno);
+        rdata->encaphdr->streamid = htons(rdata->threadid);
+        rdata->encaphdr->recordcount = 0;
+
+        rdata->reccount = 0;
+        rdata->seqno ++;
+    }
+
+    /* Truncate the payload so that the record can never run past the end
+     * of the send buffer, and so that the record length still fits in
+     * the 16 bit rlen field of the ERF header.
+     */
+    space = (int64_t)MAX_PACKET_SIZE -
+            (int64_t)(rdata->writeptr - rdata->pbuffer) -
+            (int64_t)(dag_record_size + 2);
+    if (space > 0xFFFF - (int64_t)(dag_record_size + 2)) {
+        space = 0xFFFF - (int64_t)(dag_record_size + 2);
+    }
+    if (space <= 0) {
+        /* only possible if MAX_PACKET_SIZE cannot even hold our headers */
+        return packet;
+    }
+    if ((int64_t)rem > space) {
+        rem = (uint32_t)space;
+    }
+
+    /* put an ERF header in at writeptr */
+    rdata->writeptr += construct_erf_header(rdata, packet, ltype, rem, erfts);
+
+    /* copy packet contents into writeptr */
+    memcpy(rdata->writeptr, l2, rem);
+    rdata->writeptr += rem;
+    rdata->reccount ++;
+
+    /* if the buffer is close to full, just send the buffer anyway */
+    if (MAX_PACKET_SIZE - (rdata->writeptr - rdata->pbuffer) -
+            (dag_record_size + 2) < 64) {
+        if (send_ndag_packet(rdata) < 0) {
+            rdata->failed = 1;
+        }
+        rdata->lastsend = (erfts >> 32);
+    }
+
+    return packet;
+}
+
+/* Creates the input trace for the given URI, attaches the reader thread
+ * callbacks and the optional BPF filter, then runs the trace until it
+ * finishes or is interrupted.
+ *
+ * Live inputs get a 1000ms tick so that quiet streams are still flushed;
+ * other inputs are replayed in trace time instead.
+ *
+ * All libtrace resources created here are destroyed before returning.
+ */
+static void start_libtrace_reader(struct global_params *gparams, char *uri,
+        char *filterstring) {
+
+
+    libtrace_filter_t *filt = NULL;
+    libtrace_callback_set_t *pktcbs = NULL;
+    libtrace_info_t *info = NULL;
+
+    currenttrace = trace_create(uri);
+    if (trace_is_err(currenttrace)) {
+        trace_perror(currenttrace, "trace_create");
+        goto failmode;
+    }
+
+    trace_set_perpkt_threads(currenttrace, gparams->readercount);
+
+    pktcbs = trace_create_callback_set();
+    trace_set_starting_cb(pktcbs, init_reader_thread);
+    trace_set_stopping_cb(pktcbs, halt_reader_thread);
+    trace_set_packet_cb(pktcbs, packet_reader_thread);
+    trace_set_tick_interval_cb(pktcbs, tick_reader_thread);
+
+    /* trace_get_information() may not return anything (the reader threads
+     * also allow for this), so treat a missing result as "not live".
+     */
+    info = trace_get_information(currenttrace);
+    if (info && info->live) {
+        trace_set_tick_interval(currenttrace, 1000);
+    } else {
+        trace_set_tracetime(currenttrace, true);
+    }
+
+    if (filterstring) {
+        filt = trace_create_filter(filterstring);
+        if (filt == NULL) {
+            fprintf(stderr, "traceucast: unable to create filter from '%s'\n",
+                    filterstring);
+            goto failmode;
+        }
+
+        if (trace_config(currenttrace, TRACE_OPTION_FILTER, filt) < 0) {
+            trace_perror(currenttrace, "Failed to configure filter");
+            goto failmode;
+        }
+    }
+
+
+    if (trace_pstart(currenttrace, gparams, pktcbs, NULL) == -1) {
+        trace_perror(currenttrace, "Failed to start trace");
+        goto failmode;
+    }
+
+    trace_join(currenttrace);
+
+    if (trace_is_err(currenttrace)) {
+        trace_perror(currenttrace, "Reading packets");
+    }
+
+failmode:
+    if (filt) {
+        trace_destroy_filter(filt);
+    }
+    if (currenttrace) {
+        trace_destroy(currenttrace);
+    }
+    if (pktcbs) {
+        trace_destroy_callback_set(pktcbs);
+    }
+
+}
+
+
+/* Builds a beacon message: an nDAG common header, followed by the number
+ * of streams and then the port number of each stream (all 16 bit values
+ * in network byte order).
+ *
+ * On success, *buffer is set to a newly allocated message which the
+ * caller must free() and the size of the message in bytes is returned.
+ * On failure, 0 is returned and *buffer is left untouched.
+ */
+static uint32_t form_beacon(char **buffer, struct beacon_params *bparams) {
+
+    uint32_t bsize = sizeof(ndag_common_t) + (sizeof(uint16_t) *
+            (bparams->gparams->readercount + 1));
+    char *bptr;
+    uint16_t *next;
+    int i;
+
+    if (bsize > MAX_PACKET_SIZE) {
+        fprintf(stderr, "traceucast: beacon is too large to fit in a single datagram, either increase MTU or reduce number of threads\n");
+        return 0;
+    }
+
+    bptr = (char *)malloc(bsize);
+    if (bptr == NULL) {
+        fprintf(stderr, "traceucast: unable to allocate memory for beacon\n");
+        return 0;
+    }
+    next = (uint16_t *)(fill_common_header(bptr, bparams->gparams->monitorid,
+            NDAG_PKT_BEACON));
+
+    *next = htons(bparams->gparams->readercount);
+    next ++;
+
+    for (i = 0; i < bparams->gparams->readercount; i++) {
+        *next = htons(bparams->gparams->firstport + (i));
+        next ++;
+    }
+
+    *buffer = bptr;
+    return bsize;
+}
+
+/* Entry point for the beaconing thread.
+ *
+ * Connects to the receiver on the beacon port, then repeatedly sends the
+ * beacon message (sleeping for bparams->frequency milliseconds between
+ * sends) until the program is halted or a send fails.
+ *
+ * tdata must point to a struct beacon_params that outlives the thread.
+ */
+static void *beaconer_thread(void *tdata) {
+
+    struct beacon_params *bparams = (struct beacon_params *)tdata;
+    int sock;
+    char *beaconpacket = NULL;
+    uint32_t beaconsize;
+    struct addrinfo *targetinfo = NULL;
+
+    sock = create_stream_socket(bparams->beaconport,
+            bparams->gparams->clientaddr, &targetinfo, 1);
+
+    if (sock == -1) {
+        fprintf(stderr, "traceucast: failed to create TCP socket for beaconer thread\n");
+        halted = 1;
+    } else if (targetinfo == NULL) {
+        fprintf(stderr, "traceucast: failed to get addrinfo for beaconer socket\n");
+        halted = 1;
+    }
+
+    beaconsize = form_beacon(&beaconpacket, bparams);
+
+    if (beaconsize == 0 || beaconpacket == NULL) {
+        halted = 1;
+    }
+
+    /* create_stream_socket uses 0 and -1 to indicate that there is no
+     * usable socket, so never try to send on those. */
+    while (!halted && sock > 0) {
+        if (send(sock, beaconpacket, beaconsize, 0) != (ssize_t)beaconsize) {
+            fprintf(stderr, "traceucast: failed to send a beacon packet: %s\n",
+                    strerror(errno));
+            break;
+        }
+        usleep(1000 * bparams->frequency);
+    }
+
+    if (beaconpacket) {
+        free(beaconpacket);
+    }
+    if (targetinfo) {
+        /* released with freeaddrinfo(), matching how the reader threads
+         * release the address info they get from create_stream_socket */
+        freeaddrinfo(targetinfo);
+    }
+    if (sock > 0) {
+        close(sock);
+    }
+
+    pthread_exit(NULL);
+
+}
+
+/* Prints the command line usage statement to stderr. */
+static void usage(char *prog) {
+    fprintf(stderr, "Usage:\n"
+            "%s [ options ] libtraceURI\n\n", prog);
+    fprintf(stderr, "Options:\n"
+            "   -f --filter=bpffilter   Only emit packets that match this BPF filter\n"
+            "   -m --monitorid=idnum    Tag all streamed packets with the given identifier\n"
+            "   -c --clientaddr=address Connect to a ndagtcp receiver at this address/hostname\n"
+            "   -p --beaconport=port    Send beacons to the receiver on this port number\n"
+            "   -t --threads=count      Use this number of packet processing threads\n"
+            "   -h --help               Show this usage statement\n");
+}
+
+
+/* Parses the command line, installs the signal handlers, starts the
+ * beaconing thread and then streams the input trace to the receiver
+ * until the trace ends or the program is interrupted.
+ */
+int main(int argc, char *argv[]) {
+    struct sigaction sigact;
+    char *filterstring = NULL;
+    uint16_t beaconport = 9999;
+    struct global_params gparams;
+    struct beacon_params bparams;
+    int threads = 1;
+    struct timeval tv;
+    pthread_t beacontid;
+    int beaconstarted = 0;
+    int ret;
+    sigset_t sig_before, sig_block_all;
+
+    gparams.monitorid = 0;
+    gparams.clientaddr = NULL;
+
+    while (1) {
+        int optindex;
+        struct option long_options[] = {
+            { "filter",         1, 0, 'f' },
+            { "monitorid",      1, 0, 'm' },
+            { "clientaddr",     1, 0, 'c' },
+            { "beaconport",     1, 0, 'p' },
+            { "threads",        1, 0, 't' },
+            { "help",           0, 0, 'h' },
+            { NULL,             0, 0, 0 },
+        };
+
+        int c = getopt_long(argc, argv, "t:f:m:c:p:h", long_options,
+                &optindex);
+        if (c == -1) {
+            break;
+        }
+
+        switch (c) {
+            case 'f':
+                filterstring = optarg;
+                break;
+            case 'm':
+                gparams.monitorid = (uint16_t)strtoul(optarg, NULL, 0);
+                break;
+            case 'c':
+                gparams.clientaddr = optarg;
+                break;
+            case 'p':
+                beaconport = (uint16_t)strtoul(optarg, NULL, 0);
+                break;
+            case 't':
+                threads = (int)strtoul(optarg, NULL, 0);
+                break;
+            case 'h':
+            default:
+                usage(argv[0]);
+                return 1;
+        }
+    }
+
+    if (optind >= argc) {
+        usage(argv[0]);
+        fprintf(stderr,
+                "traceucast: No URI specified as an input source. Exiting\n");
+        return 1;
+    }
+
+    sigact.sa_handler = cleanup_signal;
+    sigemptyset(&sigact.sa_mask);
+    sigact.sa_flags = SA_RESTART;
+
+    sigaction(SIGINT, &sigact, NULL);
+    sigaction(SIGTERM, &sigact, NULL);
+
+    if (gparams.clientaddr == NULL) {
+        fprintf(stderr,
+                "traceucast: no client address specified to receive our streams. Exiting\n");
+        return 1;
+    }
+
+    gettimeofday(&tv, NULL);
+    gparams.starttime = bswap_host_to_le64(((tv.tv_sec - 1509494400) * 1000) +
+            (tv.tv_usec / 1000.0));
+    gparams.readercount = threads;
+
+    /* NOTE(review): rand() does not appear to be seeded anywhere in this
+     * function, so the "random" first port is likely to be the same on
+     * every run -- confirm whether that is intentional.
+     */
+    gparams.firstport = 10000 + (rand() % 52000);
+
+    fprintf(stderr, "Streaming %s to %s:%u \n",
+            argv[optind], gparams.clientaddr, beaconport);
+    fprintf(stderr, "Monitor ID is set to %u\n", gparams.monitorid);
+
+    /* Start up the beaconing */
+    bparams.beaconport = beaconport;
+    bparams.gparams = &(gparams);
+    bparams.frequency = 1000;
+
+    /* Block every signal while the beaconer is created so that it
+     * inherits a fully blocked mask and signals are only ever delivered
+     * to this thread. The mask must be filled, not emptied, for this to
+     * block anything.
+     */
+    sigfillset(&sig_block_all);
+    /* pthread_sigmask() returns an error number rather than -1 */
+    if (pthread_sigmask(SIG_SETMASK, &sig_block_all, &sig_before) != 0) {
+        fprintf(stderr, "Unable to disable signals before starting beaconer.\n");
+        goto endmcast;
+    }
+
+    /* pthread_create() reports failure via its return value, not errno */
+    ret = pthread_create(&beacontid, NULL, beaconer_thread, &bparams);
+    if (ret != 0) {
+        fprintf(stderr, "Error while creating beaconer thread: %s\n",
+                strerror(ret));
+        goto endmcast;
+    }
+    beaconstarted = 1;
+
+    if (pthread_sigmask(SIG_SETMASK, &sig_before, NULL)) {
+        fprintf(stderr, "Unable to re-enable signals after beaconer creation.\n");
+        goto endmcast;
+    }
+
+    start_libtrace_reader(&gparams, argv[optind], filterstring);
+
+endmcast:
+    halted = 1;
+
+    /* only join the beaconer if it was actually created; the contents of
+     * beacontid are not defined otherwise */
+    if (beaconstarted) {
+        pthread_join(beacontid, NULL);
+    }
+
+    return 0;
+}
+
+// vim: set sw=4 tabstop=4 softtabstop=4 expandtab :