diff --git a/src/Defs.cpp b/src/Defs.cpp index 5f602ec5..d5d205a0 100644 --- a/src/Defs.cpp +++ b/src/Defs.cpp @@ -42,12 +42,27 @@ TicksTime g_cycleStartTime; debug_level_t g_debug_level = LOG_LVL_INFO; #ifdef USING_VMA_EXTRA_API -unsigned char* g_pkt_buf = NULL; -struct vma_packets_t* g_pkts = NULL; -unsigned int g_pkt_index = 0; -unsigned int g_pkt_offset = 0; struct vma_buff_t* g_vma_poll_buff = NULL; struct vma_completion_t* g_vma_comps; + +ZeroCopyData::ZeroCopyData(): + m_pkt_buf(NULL), + m_pkts(NULL), + m_pkt_index(0), + m_pkt_offset(0) { + +}; + +void ZeroCopyData::allocate() { + m_pkt_buf = (unsigned char *)MALLOC(Message::getMaxSize()); +} + +ZeroCopyData::~ZeroCopyData() { + if (m_pkt_buf) + FREE(m_pkt_buf); +} + +zeroCopyMap g_zeroCopyData; #endif diff --git a/src/Defs.h b/src/Defs.h index d189069e..80955375 100644 --- a/src/Defs.h +++ b/src/Defs.h @@ -90,6 +90,7 @@ typedef uint16_t in_port_t; #include #include /* sockets*/ #include +#include #include "Ticks.h" #include "Message.h" @@ -392,13 +393,24 @@ extern TicksTime g_cycleStartTime; extern debug_level_t g_debug_level; -#ifdef USING_VMA_EXTRA_API -extern unsigned char* g_pkt_buf; -extern struct vma_packets_t* g_pkts; -extern unsigned int g_pkt_index; -extern unsigned int g_pkt_offset; +#ifdef USING_VMA_EXTRA_API + extern struct vma_buff_t* g_vma_poll_buff; extern struct vma_completion_t* g_vma_comps; + +class ZeroCopyData { +public: + ZeroCopyData(); + void allocate(); + ~ZeroCopyData(); + unsigned char* m_pkt_buf; + struct vma_packets_t* m_pkts; + unsigned int m_pkt_index; + unsigned int m_pkt_offset; +}; +// map from fd to zeroCopyData +typedef std::map zeroCopyMap; +extern zeroCopyMap g_zeroCopyData; #endif class Message; diff --git a/src/Server.h b/src/Server.h index efc72d7d..a95876ed 100644 --- a/src/Server.h +++ b/src/Server.h @@ -124,11 +124,13 @@ void close_ifd(int fd,int ifd,fds_data* l_fds_ifd){ fds_data* l_next_fd = g_fds_array[fd]; #ifdef USING_VMA_EXTRA_API - if (g_pkts) { - g_vma_api->free_packets(fd, g_pkts->pkts, g_pkts->n_packet_num); - g_pkts = NULL; - g_pkt_index = 0; - g_pkt_offset = 0; + ZeroCopyData *z_ptr = g_zeroCopyData[fd]; + if (z_ptr && z_ptr->m_pkts) { + g_vma_api->free_packets(fd, z_ptr->m_pkts->pkts, + z_ptr->m_pkts->n_packet_num); + z_ptr->m_pkts = NULL; + z_ptr->m_pkt_index = 0; + z_ptr->m_pkt_offset = 0; } if (g_vma_api) { diff --git a/src/SockPerf.cpp b/src/SockPerf.cpp index 07364292..42146b5f 100644 --- a/src/SockPerf.cpp +++ b/src/SockPerf.cpp @@ -2220,9 +2220,11 @@ void cleanup() if(s_user_params.select_timeout) { FREE(s_user_params.select_timeout); } -#ifdef USING_VMA_EXTRA_API - if (g_pkt_buf) { - FREE(g_pkt_buf); +#ifdef USING_VMA_EXTRA_API + if (g_vma_api && s_user_params.is_vmazcopyread) { + for (int i = s_fd_min; i < s_fd_max; i++) { + delete g_zeroCopyData[i]; + } } #endif @@ -2377,7 +2379,8 @@ vma_recv_callback_retval_t myapp_vma_recv_pkt_filter_callback( } //If there is data in local buffer, then push new packet in TCP queue.Otherwise handle received packet inside callback. - if (g_pkts && g_pkts->n_packet_num > 0) { + if (g_zeroCopyData[fd] && g_zeroCopyData[fd]->m_pkts && + g_zeroCopyData[fd]->m_pkts->n_packet_num > 0) { return VMA_PACKET_RECV; } @@ -3289,12 +3292,6 @@ int bringup(const int *p_daemonize) int _max_buff_size = _max(s_user_params.msg_size + 1, _vma_pkts_desc_size); _max_buff_size = _max(_max_buff_size, MAX_PAYLOAD_SIZE); -#ifdef USING_VMA_EXTRA_API - if (s_user_params.is_vmazcopyread && g_vma_api){ - g_pkt_buf = (unsigned char*)MALLOC(_max_buff_size); - } -#endif - int64_t cycleDurationNsec = NSEC_IN_SEC * s_user_params.burst_size / s_user_params.mps; if (s_user_params.mps == UINT32_MAX) { // MAX MPS mode @@ -3346,7 +3343,14 @@ void do_test() info.fd_min = s_fd_min; info.fd_max = s_fd_max; info.fd_num = s_fd_num; - +#ifdef USING_VMA_EXTRA_API + if (g_vma_api && s_user_params.is_vmazcopyread) { + for (int i = s_fd_min; i < s_fd_max; i++) { + g_zeroCopyData[i] = new ZeroCopyData(); + g_zeroCopyData[i]->allocate(); + } + } +#endif switch (s_user_params.mode) { case MODE_CLIENT: client_handler(&info); diff --git a/src/common.h b/src/common.h index f3b4782c..c3ffad63 100644 --- a/src/common.h +++ b/src/common.h @@ -54,66 +54,81 @@ static inline int msg_recvfrom(int fd, uint8_t* buf, int nbytes, struct sockaddr { int ret = 0; socklen_t size = sizeof(struct sockaddr_in); - int flags = 0; + int flags = 0; -#ifdef USING_VMA_EXTRA_API +#ifdef USING_VMA_EXTRA_API int remain_buffer, data_to_copy; uint8_t* start_addrs; struct vma_packet_t *pkt; + ZeroCopyData *z_ptr = g_zeroCopyData[fd]; - if (g_pApp->m_const_params.is_vmazcopyread && g_vma_api) { + if (z_ptr && g_pApp->m_const_params.is_vmazcopyread) { remain_buffer = nbytes; // Receive held data, and free VMA's previously received zero copied packets - if (g_pkts && g_pkts->n_packet_num > 0) { + if (z_ptr->m_pkts && z_ptr->m_pkts->n_packet_num > 0) { - pkt = &g_pkts->pkts[0]; + pkt = &z_ptr->m_pkts->pkts[0]; - while(g_pkt_index < pkt->sz_iov) { + while(z_ptr->m_pkt_index < pkt->sz_iov) { start_addrs = buf + (nbytes - remain_buffer); - data_to_copy = _min(remain_buffer, (int)(pkt->iov[g_pkt_index].iov_len - g_pkt_offset)); - memcpy(start_addrs, (uint8_t*)pkt->iov[g_pkt_index].iov_base + g_pkt_offset, data_to_copy); + data_to_copy = _min(remain_buffer, + (int)(pkt->iov[z_ptr->m_pkt_index].iov_len - + z_ptr->m_pkt_offset)); + memcpy(start_addrs, + (uint8_t*)pkt->iov[z_ptr->m_pkt_index].iov_base + + z_ptr->m_pkt_offset, data_to_copy); remain_buffer -= data_to_copy; - g_pkt_offset += data_to_copy; + z_ptr->m_pkt_offset += data_to_copy; //Handled buffer is filled - if (g_pkt_offset < pkt->iov[g_pkt_index].iov_len) return nbytes; + if (z_ptr->m_pkt_offset < pkt->iov[z_ptr->m_pkt_index].iov_len) + return nbytes; - g_pkt_offset = 0; - g_pkt_index++; + z_ptr->m_pkt_offset = 0; + z_ptr->m_pkt_index++; } - g_vma_api->free_packets(fd, g_pkts->pkts, g_pkts->n_packet_num); - g_pkts = NULL; - g_pkt_index = 0; - g_pkt_offset = 0; + g_vma_api->free_packets(fd, z_ptr->m_pkts->pkts, + z_ptr->m_pkts->n_packet_num); + z_ptr->m_pkts = NULL; + z_ptr->m_pkt_index = 0; + z_ptr->m_pkt_offset = 0; //Handled buffer is filled if (remain_buffer == 0) return nbytes; } // Receive the next packet with zero copy API - ret = g_vma_api->recvfrom_zcopy(fd, g_pkt_buf, Message::getMaxSize(), &flags, (struct sockaddr*)recvfrom_addr, &size); + ret = g_vma_api->recvfrom_zcopy(fd, &z_ptr->m_pkt_buf, + Message::getMaxSize(), + &flags, (struct sockaddr*)recvfrom_addr, + &size); if (ret > 0) { - // Zcopy receive is perfomed + // Zcopy receive is performed if (flags & MSG_VMA_ZCOPY) { - g_pkts = (struct vma_packets_t*)g_pkt_buf; - if (g_pkts->n_packet_num > 0) { + z_ptr->m_pkts = (struct vma_packets_t*)&z_ptr->m_pkt_buf; + if (z_ptr->m_pkts->n_packet_num > 0) { - pkt = &g_pkts->pkts[0]; + pkt = &z_ptr->m_pkts->pkts[0]; - while(g_pkt_index < pkt->sz_iov) { + while(z_ptr->m_pkt_index < pkt->sz_iov) { start_addrs = buf + (nbytes - remain_buffer); - data_to_copy = _min(remain_buffer, (int)pkt->iov[g_pkt_index].iov_len); - memcpy(start_addrs, pkt->iov[g_pkt_index].iov_base, data_to_copy); + data_to_copy = + _min(remain_buffer, + (int)pkt->iov[z_ptr->m_pkt_index].iov_len); + memcpy(start_addrs, + pkt->iov[z_ptr->m_pkt_index].iov_base, + data_to_copy); remain_buffer -= data_to_copy; - g_pkt_offset += data_to_copy; + z_ptr->m_pkt_offset += data_to_copy; //Handled buffer is filled - if (g_pkt_offset < pkt->iov[g_pkt_index].iov_len) return nbytes; + if (z_ptr->m_pkt_offset < pkt->iov[z_ptr->m_pkt_index].iov_len) + return nbytes; - g_pkt_offset = 0; - g_pkt_index++; + z_ptr->m_pkt_offset = 0; + z_ptr->m_pkt_index++; } ret = nbytes-remain_buffer; } @@ -123,7 +138,7 @@ static inline int msg_recvfrom(int fd, uint8_t* buf, int nbytes, struct sockaddr } else { data_to_copy = _min(remain_buffer, ret); - memcpy(buf + (nbytes - remain_buffer), g_pkt_buf, data_to_copy); + memcpy(buf + (nbytes - remain_buffer), &z_ptr->m_pkt_buf[fd], data_to_copy); ret = nbytes - (remain_buffer - data_to_copy); } }