Skip to content

Commit

Permalink
fix multithread zero copy crash
Browse files Browse the repository at this point in the history
sockperf uses a global variable to call free_packet API.
This casued double free in VMA buffers and caused VMA to crash.
This fix adds a map from fd to it zero zopy data to prevent that.

Signed-off-by: Rafi Wiener <[email protected]>
  • Loading branch information
Rafi Wiener authored and igor-ivanov committed Jun 14, 2017
1 parent 4e34c0b commit 010af89
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 54 deletions.
23 changes: 19 additions & 4 deletions src/Defs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,27 @@ TicksTime g_cycleStartTime;
debug_level_t g_debug_level = LOG_LVL_INFO;

#ifdef USING_VMA_EXTRA_API
unsigned char* g_pkt_buf = NULL;
struct vma_packets_t* g_pkts = NULL;
unsigned int g_pkt_index = 0;
unsigned int g_pkt_offset = 0;
struct vma_buff_t* g_vma_poll_buff = NULL;
struct vma_completion_t* g_vma_comps;

ZeroCopyData::ZeroCopyData():
m_pkt_buf(NULL),
m_pkts(NULL),
m_pkt_index(0),
m_pkt_offset(0) {

};

void ZeroCopyData::allocate() {
m_pkt_buf = (unsigned char *)MALLOC(Message::getMaxSize());
}

ZeroCopyData::~ZeroCopyData() {
if (m_pkt_buf)
FREE(m_pkt_buf);
}

zeroCopyMap g_zeroCopyData;
#endif


Expand Down
22 changes: 17 additions & 5 deletions src/Defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ typedef uint16_t in_port_t;
#include <fcntl.h>
#include <sys/types.h> /* sockets*/
#include <queue>
#include <map>

#include "Ticks.h"
#include "Message.h"
Expand Down Expand Up @@ -392,13 +393,24 @@ extern TicksTime g_cycleStartTime;

extern debug_level_t g_debug_level;

#ifdef USING_VMA_EXTRA_API
extern unsigned char* g_pkt_buf;
extern struct vma_packets_t* g_pkts;
extern unsigned int g_pkt_index;
extern unsigned int g_pkt_offset;
#ifdef USING_VMA_EXTRA_API

extern struct vma_buff_t* g_vma_poll_buff;
extern struct vma_completion_t* g_vma_comps;

class ZeroCopyData {
public:
ZeroCopyData();
void allocate();
~ZeroCopyData();
unsigned char* m_pkt_buf;
struct vma_packets_t* m_pkts;
unsigned int m_pkt_index;
unsigned int m_pkt_offset;
};
// map from fd to zeroCopyData
typedef std::map<int, ZeroCopyData *> zeroCopyMap;
extern zeroCopyMap g_zeroCopyData;
#endif

class Message;
Expand Down
12 changes: 7 additions & 5 deletions src/Server.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,13 @@ void close_ifd(int fd,int ifd,fds_data* l_fds_ifd){
fds_data* l_next_fd = g_fds_array[fd];

#ifdef USING_VMA_EXTRA_API
if (g_pkts) {
g_vma_api->free_packets(fd, g_pkts->pkts, g_pkts->n_packet_num);
g_pkts = NULL;
g_pkt_index = 0;
g_pkt_offset = 0;
ZeroCopyData *z_ptr = g_zeroCopyData[fd];
if (z_ptr && z_ptr->m_pkts) {
g_vma_api->free_packets(fd, z_ptr->m_pkts->pkts,
z_ptr->m_pkts->n_packet_num);
z_ptr->m_pkts = NULL;
z_ptr->m_pkt_index = 0;
z_ptr->m_pkt_offset = 0;
}

if (g_vma_api) {
Expand Down
26 changes: 15 additions & 11 deletions src/SockPerf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2220,9 +2220,11 @@ void cleanup()
if(s_user_params.select_timeout) {
FREE(s_user_params.select_timeout);
}
#ifdef USING_VMA_EXTRA_API
if (g_pkt_buf) {
FREE(g_pkt_buf);
#ifdef USING_VMA_EXTRA_API
if (g_vma_api && s_user_params.is_vmazcopyread) {
for (int i = s_fd_min; i < s_fd_max; i++) {
delete g_zeroCopyData[i];
}
}
#endif

Expand Down Expand Up @@ -2377,7 +2379,8 @@ vma_recv_callback_retval_t myapp_vma_recv_pkt_filter_callback(
}

//If there is data in local buffer, then push new packet in TCP queue.Otherwise handle received packet inside callback.
if (g_pkts && g_pkts->n_packet_num > 0) {
if (g_zeroCopyData[fd] && g_zeroCopyData[fd]->m_pkts &&
g_zeroCopyData[fd]->m_pkts->n_packet_num > 0) {
return VMA_PACKET_RECV;
}

Expand Down Expand Up @@ -3289,12 +3292,6 @@ int bringup(const int *p_daemonize)
int _max_buff_size = _max(s_user_params.msg_size + 1, _vma_pkts_desc_size);
_max_buff_size = _max(_max_buff_size, MAX_PAYLOAD_SIZE);

#ifdef USING_VMA_EXTRA_API
if (s_user_params.is_vmazcopyread && g_vma_api){
g_pkt_buf = (unsigned char*)MALLOC(_max_buff_size);
}
#endif

int64_t cycleDurationNsec = NSEC_IN_SEC * s_user_params.burst_size / s_user_params.mps;

if (s_user_params.mps == UINT32_MAX) { // MAX MPS mode
Expand Down Expand Up @@ -3346,7 +3343,14 @@ void do_test()
info.fd_min = s_fd_min;
info.fd_max = s_fd_max;
info.fd_num = s_fd_num;

#ifdef USING_VMA_EXTRA_API
if (g_vma_api && s_user_params.is_vmazcopyread) {
for (int i = s_fd_min; i < s_fd_max; i++) {
g_zeroCopyData[i] = new ZeroCopyData();
g_zeroCopyData[i]->allocate();
}
}
#endif
switch (s_user_params.mode) {
case MODE_CLIENT:
client_handler(&info);
Expand Down
73 changes: 44 additions & 29 deletions src/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,66 +54,81 @@ static inline int msg_recvfrom(int fd, uint8_t* buf, int nbytes, struct sockaddr
{
int ret = 0;
socklen_t size = sizeof(struct sockaddr_in);
int flags = 0;
int flags = 0;

#ifdef USING_VMA_EXTRA_API
#ifdef USING_VMA_EXTRA_API
int remain_buffer, data_to_copy;
uint8_t* start_addrs;
struct vma_packet_t *pkt;
ZeroCopyData *z_ptr = g_zeroCopyData[fd];

if (g_pApp->m_const_params.is_vmazcopyread && g_vma_api) {
if (z_ptr && g_pApp->m_const_params.is_vmazcopyread) {
remain_buffer = nbytes;
// Receive held data, and free VMA's previously received zero copied packets
if (g_pkts && g_pkts->n_packet_num > 0) {
if (z_ptr->m_pkts && z_ptr->m_pkts->n_packet_num > 0) {

pkt = &g_pkts->pkts[0];
pkt = &z_ptr->m_pkts->pkts[0];

while(g_pkt_index < pkt->sz_iov) {
while(z_ptr->m_pkt_index < pkt->sz_iov) {
start_addrs = buf + (nbytes - remain_buffer);
data_to_copy = _min(remain_buffer, (int)(pkt->iov[g_pkt_index].iov_len - g_pkt_offset));
memcpy(start_addrs, (uint8_t*)pkt->iov[g_pkt_index].iov_base + g_pkt_offset, data_to_copy);
data_to_copy = _min(remain_buffer,
(int)(pkt->iov[z_ptr->m_pkt_index].iov_len -
z_ptr->m_pkt_offset));
memcpy(start_addrs,
(uint8_t*)pkt->iov[z_ptr->m_pkt_index].iov_base +
z_ptr->m_pkt_offset, data_to_copy);
remain_buffer -= data_to_copy;
g_pkt_offset += data_to_copy;
z_ptr->m_pkt_offset += data_to_copy;

//Handled buffer is filled
if (g_pkt_offset < pkt->iov[g_pkt_index].iov_len) return nbytes;
if (z_ptr->m_pkt_offset < pkt->iov[z_ptr->m_pkt_index].iov_len)
return nbytes;

g_pkt_offset = 0;
g_pkt_index++;
z_ptr->m_pkt_offset = 0;
z_ptr->m_pkt_index++;
}

g_vma_api->free_packets(fd, g_pkts->pkts, g_pkts->n_packet_num);
g_pkts = NULL;
g_pkt_index = 0;
g_pkt_offset = 0;
g_vma_api->free_packets(fd, z_ptr->m_pkts->pkts,
z_ptr->m_pkts->n_packet_num);
z_ptr->m_pkts = NULL;
z_ptr->m_pkt_index = 0;
z_ptr->m_pkt_offset = 0;

//Handled buffer is filled
if (remain_buffer == 0) return nbytes;
}

// Receive the next packet with zero copy API
ret = g_vma_api->recvfrom_zcopy(fd, g_pkt_buf, Message::getMaxSize(), &flags, (struct sockaddr*)recvfrom_addr, &size);
ret = g_vma_api->recvfrom_zcopy(fd, &z_ptr->m_pkt_buf,
Message::getMaxSize(),
&flags, (struct sockaddr*)recvfrom_addr,
&size);

if (ret > 0) {
// Zcopy receive is perfomed
// Zcopy receive is performed
if (flags & MSG_VMA_ZCOPY) {
g_pkts = (struct vma_packets_t*)g_pkt_buf;
if (g_pkts->n_packet_num > 0) {
z_ptr->m_pkts = (struct vma_packets_t*)&z_ptr->m_pkt_buf;
if (z_ptr->m_pkts->n_packet_num > 0) {

pkt = &g_pkts->pkts[0];
pkt = &z_ptr->m_pkts->pkts[0];

while(g_pkt_index < pkt->sz_iov) {
while(z_ptr->m_pkt_index < pkt->sz_iov) {
start_addrs = buf + (nbytes - remain_buffer);
data_to_copy = _min(remain_buffer, (int)pkt->iov[g_pkt_index].iov_len);
memcpy(start_addrs, pkt->iov[g_pkt_index].iov_base, data_to_copy);
data_to_copy =
_min(remain_buffer,
(int)pkt->iov[z_ptr->m_pkt_index].iov_len);
memcpy(start_addrs,
pkt->iov[z_ptr->m_pkt_index].iov_base,
data_to_copy);
remain_buffer -= data_to_copy;
g_pkt_offset += data_to_copy;
z_ptr->m_pkt_offset += data_to_copy;

//Handled buffer is filled
if (g_pkt_offset < pkt->iov[g_pkt_index].iov_len) return nbytes;
if (z_ptr->m_pkt_offset < pkt->iov[z_ptr->m_pkt_index].iov_len)
return nbytes;

g_pkt_offset = 0;
g_pkt_index++;
z_ptr->m_pkt_offset = 0;
z_ptr->m_pkt_index++;
}
ret = nbytes-remain_buffer;
}
Expand All @@ -123,7 +138,7 @@ static inline int msg_recvfrom(int fd, uint8_t* buf, int nbytes, struct sockaddr
}
else {
data_to_copy = _min(remain_buffer, ret);
memcpy(buf + (nbytes - remain_buffer), g_pkt_buf, data_to_copy);
memcpy(buf + (nbytes - remain_buffer), &z_ptr->m_pkt_buf[fd], data_to_copy);
ret = nbytes - (remain_buffer - data_to_copy);
}
}
Expand Down

0 comments on commit 010af89

Please sign in to comment.