diff --git a/examples_sw/apps/rdma_service_no_daemon/client/main.cpp b/examples_sw/apps/rdma_service_no_daemon/client/main.cpp new file mode 100644 index 00000000..0c9d7a9b --- /dev/null +++ b/examples_sw/apps/rdma_service_no_daemon/client/main.cpp @@ -0,0 +1,357 @@ +/** + * Copyright (c) 2021, Systems Group, ETH Zurich + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without modification, + * are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. Neither the name of the copyright holder nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, + * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, + * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, + * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "cLib.hpp" +#include "types.hpp" +#include "cThread.hpp" +#include "cBench.hpp" + +using namespace std; +using namespace fpga; + +/* Signal handler */ +std::atomic stalled(false); +void gotInt(int) { + stalled.store(true); +} + +// Runtime +constexpr auto const defDevice = 0; +constexpr auto const defTargetVfid = 0; + +constexpr auto const defOper = false; // read +constexpr auto const defMinSize = 1024; +constexpr auto const defMaxSize = 64 * 1024; +constexpr auto const defNRepsThr = 1000; +constexpr auto const defNRepsLat = 100; +constexpr auto const defVerbose = false; +constexpr auto const def_compression_required = false; +constexpr auto const def_encryption_required = false; +constexpr auto const def_dpi_required = false; + +int main(int argc, char *argv[]) +{ + + // ----------------------------------------------------------------------------------------------------------------------- + // Sig handler + // ----------------------------------------------------------------------------------------------------------------------- + + struct sigaction sa; + memset( &sa, 0, sizeof(sa) ); + sa.sa_handler = gotInt; + sigfillset(&sa.sa_mask); + sigaction(SIGINT,&sa,NULL); + + // ----------------------------------------------------------------------------------------------------------------------- + // ARGS + // ----------------------------------------------------------------------------------------------------------------------- + + // Generates the command-line printout and deals with reading in the user-defined arguments for running the experiments + boost::program_options::options_description programDescription("Options:"); + programDescription.add_options() + ("bitstream,b", boost::program_options::value(), "Shell bitstream") + ("device,d", boost::program_options::value(), "Target device") + ("vfid,i", boost::program_options::value(), "Target vFPGA") + ("tcpaddr,t", boost::program_options::value(), "TCP conn IP") + ("write,w", boost::program_options::value(), "Read(0)/Write(1)") + ("min_size,n", boost::program_options::value(), "Minimal transfer size") + ("max_size,x", boost::program_options::value(), "Maximum transfer size") + ("reps_thr,r", boost::program_options::value(), "Number of reps, throughput") + ("reps_lat,l", boost::program_options::value(), "Number of reps, latency") + ("verbose,v", boost::program_options::value(), "Printout of single messages") + ("encryption,e", boost::program_options::value(), "Encryption required") + ("compression,c", boost::program_options::value(), "Compression required") + ("dpi,d", boost::program_options::value(), "DPI required"); + + boost::program_options::variables_map commandLineArgs; + boost::program_options::store(boost::program_options::parse_command_line(argc, argv, programDescription), commandLineArgs); + boost::program_options::notify(commandLineArgs); + + // Set the default values to variables for further usage + string bstream_path = ""; + uint32_t cs_dev = defDevice; + uint32_t vfid = defTargetVfid; + string tcp_ip; + bool oper = defOper; + uint32_t min_size = defMinSize; + uint32_t max_size = defMaxSize; + uint32_t n_reps_thr = defNRepsThr; + uint32_t n_reps_lat = defNRepsLat; + bool verbose = defVerbose; + bool encryption_required = def_encryption_required; + bool compression_required = def_compression_required; + bool dpi_required = def_dpi_required; + + // Read the actual arguments from the command line and parse them to variables for further usage, for setting the experiment correctly + if(commandLineArgs.count("bitstream") > 0) { + bstream_path = commandLineArgs["bitstream"].as(); + + std::cout << std::endl << "Shell loading (path: " << bstream_path << ") ..." << std::endl; + cRnfg crnfg(cs_dev); + crnfg.shellReconfigure(bstream_path); + } + if(commandLineArgs.count("device") > 0) cs_dev = commandLineArgs["device"].as(); + if(commandLineArgs.count("vfid") > 0) vfid = commandLineArgs["vfid"].as(); + if(commandLineArgs.count("tcpaddr") > 0) { + tcp_ip = commandLineArgs["tcpaddr"].as(); + } else { + std::cout << "Provide the TCP/IP address of the server" << std::endl; + return (EXIT_FAILURE); + } + if(commandLineArgs.count("write") > 0) oper = commandLineArgs["write"].as(); + if(commandLineArgs.count("min_size") > 0) min_size = commandLineArgs["min_size"].as(); + if(commandLineArgs.count("max_size") > 0) max_size = commandLineArgs["max_size"].as(); + if(commandLineArgs.count("reps_thr") > 0) n_reps_thr = commandLineArgs["reps_thr"].as(); + if(commandLineArgs.count("reps_lat") > 0) n_reps_lat = commandLineArgs["reps_lat"].as(); + if(commandLineArgs.count("verbose") > 0) verbose = commandLineArgs["verbose"].as(); + if(commandLineArgs.count("encryption") > 0) encryption_required = commandLineArgs["encryption"].as(); + if(commandLineArgs.count("compression") > 0) compression_required = commandLineArgs["compression"].as(); + if(commandLineArgs.count("dpi") > 0) dpi_required = commandLineArgs["dpi"].as(); + + // ----------------------------------------------------------------------------------------------------------------------- + // RDMA client side + // ----------------------------------------------------------------------------------------------------------------------- + + // Get a thread for execution: Has the vFPGA-ID, host-process-ID of this calling process, and device number + # ifdef VERBOSE + std::cout << "rdma_client: Create the cThread-object for the RDMA-server-main-code" << std::endl; + std::cout << "rdma_client: Target-vfid: " << defTargetVfid << std::endl; + std::cout << "rdma_client: Current process ID: " << getpid() << std::endl; + # endif + cThread cthread(defTargetVfid, getpid(), cs_dev, nullptr, nullptr, encryption_required, compression_required, dpi_required); + + // Get memory in the max size of the experiment. Argument is a cs_alloc-struct: Huge Page, max size, is remote + // This operation attaches the buffer to the Thread, which is required for the cLib constructor for RDMA-capabilities + cthread.getMem({CoyoteAlloc::HPF, max_size, true}); + + // Connect to the RDMA server and run the task + + # ifdef VERBOSE + std::cout << "rdma_client: Create an instance of the cLib-class for exchange of QPs etc." << std::endl; + # endif + + // This instantiates the communication library cLib with the name of the socket, function-ID (?), the executing cthread, the target IP-address and the target port + // The constructor of the communication library also automatically does the meta-exchange of information in the beginning to connect the queue pairs from local and remote + cLib clib_rdma("/tmp/coyote-daemon-vfid-0-rdma", + fidRDMA, &cthread, tcp_ip.c_str(), defPort); + + // Issue the iTaks for exchange of experimental parameters + # ifdef VERBOSE + std::cout << "rdma_client: Issue the iTask for exchange of experimental parameters" << std::endl; + # endif + + // No iTask required anymore - the server has its own input of arguments + + // Benchmark the RDMA + + // SG entries + + // Create a Scatter-Gather-Entry, save it in memory - size of the rdmaSg + // How is this sg-element connected to the thread-attached buffer? Should be the vaddr, shouldn't it? + // There has to be a connection, since sg is handed over to the invoke-function, where the local_dest and offset is accessed + # ifdef VERBOSE + std::cout << "rdma_client: Create a sg-Entry for the RDMA-operation." << std::endl; + # endif + + sgEntry sg; + memset(&sg, 0, sizeof(rdmaSg)); + + // Set properties of the Scatter-Gather-Entry: Min-Size (size to start the experiment with), Stream Host as origin of data to be used for the RDMA-experiment + sg.rdma.len = min_size; + sg.rdma.local_stream = strmHost; + + // Get a hMem to write values into the payload of the RDMA-packets + uint64_t *hMem = (uint64_t*)(cthread.getQpair()->local.vaddr); + + // Set the Coyote Operation, which can either be a REMOTE_WRITE or a REMOTE_READ, depending on the settings for the experiment + CoyoteOper coper = oper ? CoyoteOper::REMOTE_RDMA_WRITE : CoyoteOper::REMOTE_RDMA_READ;; + + PR_HEADER("RDMA BENCHMARK"); + + // Iterate over the experiment size (for incrementing size up to defined maximum) + while(sg.rdma.len <= max_size) { + + // Sync + // Clear the registers that hold information about completed functions + # ifdef VERBOSE + std::cout << "rdma_client: Perform a clear Completed in cThread." << std::endl; + # endif + cthread.clearCompleted(); + # ifdef VERBOSE + std::cout << "rdma_client: Perform a connection sync in cThread." << std::endl; + # endif + // Initiate a sync between the remote nodes with handshaking via exchanged ACKs + cthread.connSync(true); + // Initialize a benchmark-object to precisely benchmark the RDMA-execution. Number of executions is set to 1 (no further repetitions on this level), no calibration required, no distribution required. + cBench bench(1); + + // Lambda-function for throughput-benchmarking + auto benchmark_thr = [&]() { + + // Reset of the hMem to achieve monotonically increasing payload numbers across a size-sweep + hMem[sg.rdma.len/8-1] = hMem[sg.rdma.len/16-1]; + + // Reset the old hMem-values from previous payloads to make it easier debuggable + hMem[sg.rdma.len/16-1] = 0; + + // For the desired number of repetitions per size, invoke the cThread-Function with the coyote-Operation + for(int i = 0; i < n_reps_thr; i++) { + # ifdef VERBOSE + std::cout << "rdma_client: invoke the operation " << std::endl; + # endif + + if(verbose) { + std::cout << "CLIENT: Sent out message #" << i << " at message-size " << sg.rdma.len << " with content " << hMem[sg.rdma.len/8-1] << std::endl; + } + + // Put the execution to sleep at the critical point of failure + if(hMem[sg.rdma.len/8-1] == 1413) { + // sleep(3); + } + + cthread.invoke(coper, &sg); + + // Increment the hMem-value + hMem[sg.rdma.len/8-1] = hMem[sg.rdma.len/8-1] + 1; + } + + // Check the number of completed RDMA-transactions, wait until all operations have been completed. Check for stalling in-between. + uint32_t number_of_completed_local_writes = 0; + while(cthread.checkCompleted(CoyoteOper::LOCAL_WRITE) < n_reps_thr) { + // Only print if there's an update in the number of received LOCAL WRITEs + if(number_of_completed_local_writes != cthread.checkCompleted(CoyoteOper::LOCAL_WRITE)) { + if(verbose) { + std::cout << "CLIENT: Received " << number_of_completed_local_writes << " LOCAL WRITES so far." << std::endl; + } + number_of_completed_local_writes = cthread.checkCompleted(CoyoteOper::LOCAL_WRITE); + } + + // stalled is an atomic boolean used for event-handling (?) that would indicate a stalled operation + if( stalled.load() ) throw std::runtime_error("Stalled, SIGINT caught"); + } + }; + + // Execution of the throughput-lambda-function through the benchmarking-function to get timing + bench.runtime(benchmark_thr); + + // Generate the required output based on the statistical data from the benchmarking tool + std::cout << std::fixed << std::setprecision(2); + std::cout << std::setw(8) << sg.rdma.len << " [bytes], throughput: " + << std::setw(8) << ((1 + oper) * ((1000 * sg.rdma.len ))) / ((bench.getAvg()) / n_reps_thr) << " [MB/s], latency: "; + + // Sync - reset the completion counter from the thread, sync-up via ACK-handshakes + # ifdef VERBOSE + std::cout << "rdma_client: Perform a clear Completed in cThread." << std::endl; + # endif + cthread.clearCompleted(); + # ifdef VERBOSE + std::cout << "rdma_client: Perform a connection sync in cThread." << std::endl; + # endif + cthread.connSync(true); + + // Lambda-function for latency-benchmarking + auto benchmark_lat = [&]() { + // Different than before: Issue one single command via invoke, then wait for its completion (ping-pong-scheme) + // Repeated for the number of desired repetitions + for(int i = 0; i < n_reps_lat; i++) { + # ifdef VERBOSE + std::cout << "rdma_client: invoke the operation " << std::endl; + # endif + + // Increment the hMem-value + hMem[sg.rdma.len/8-1] = hMem[sg.rdma.len/8-1] + 1; + + // Issue the REMOTE_WRITE + if(verbose) { + std::cout << "CLIENT: Sent out message #" << i << " at message-size " << sg.rdma.len << " with content " << hMem[sg.rdma.len/8-1] << std::endl; + } + cthread.invoke(coper, &sg); + + bool message_written = false; + while(cthread.checkCompleted(CoyoteOper::LOCAL_WRITE) < i+1) { + # ifdef VERBOSE + std::cout << "rdma_client: Current number of completed operations: " << cthread.checkCompleted(CoyoteOper::LOCAL_WRITE) << std::endl; + # endif + + // As long as the completion is not yet received, check for a possible stall-event + if( stalled.load() ) throw std::runtime_error("Stalled, SIGINT caught"); + } + + if(verbose) { + std::cout << "CLIENT: Received an ACK for this message!" << std::endl; + std::cout << "CLIENT: Received the following memory content: " << hMem[sg.rdma.len/8-1] << std::endl; + } + } + }; + + // Execution of the latency-lambda-function through the benchmarking-function to get the timing right + bench.runtime(benchmark_lat); + + // Generate the average time for the latency-test execution + std::cout << (bench.getAvg()) / (n_reps_lat * (1 + oper)) << " [ns]" << std::endl; + + // Scale up the Scatter-Gather-length to get to the next step of the experiment + sg.rdma.len *= 2; + } + + // End the printout + std::cout << std::endl; + + // Final connection sync via the thread-provided function + # ifdef VERBOSE + std::cout << "rdma_client: Perform a connection sync in cThread." << std::endl; + # endif + cthread.connSync(true); + + // No iTask anymore, we end gracefully anyways here. + int ret_val = 0; + + # ifdef VERBOSE + std::cout << "rdma_client: Generated the return value from clib_rdma-completion function " << ret_val << std::endl; + # endif + + return (ret_val); +} diff --git a/examples_sw/apps/rdma_service_no_daemon/include/types.hpp b/examples_sw/apps/rdma_service_no_daemon/include/types.hpp new file mode 100644 index 00000000..f879e4e5 --- /dev/null +++ b/examples_sw/apps/rdma_service_no_daemon/include/types.hpp @@ -0,0 +1,15 @@ +#include + +#include "cDefs.hpp" + +#include +#include +#include + +using namespace fpga; + +// Runtime +constexpr auto const fidRDMA = 1; + +constexpr auto const operatorRDMA = 1; +constexpr auto const opPriority = 1; diff --git a/examples_sw/apps/rdma_service_no_daemon/server/main.cpp b/examples_sw/apps/rdma_service_no_daemon/server/main.cpp new file mode 100644 index 00000000..062c6cad --- /dev/null +++ b/examples_sw/apps/rdma_service_no_daemon/server/main.cpp @@ -0,0 +1,261 @@ +/** + * Copyright (c) 2021, Systems Group, ETH Zurich + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without modification, + * are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. Neither the name of the copyright holder nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, + * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, + * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, + * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "cLib.hpp" +#include "types.hpp" +#include "cThread.hpp" +#include "cBench.hpp" + +using namespace std; +using namespace fpga; + +/* Signal handler */ +std::atomic stalled(false); +void gotInt(int) { + stalled.store(true); +} + +/* Def params */ +constexpr auto const defDevice = 0; +constexpr auto const defTargetVfid = 0; + +constexpr auto const defOper = false; // read +constexpr auto const defMinSize = 1024; +constexpr auto const defMaxSize = 64 * 1024; +constexpr auto const defNRepsThr = 1000; +constexpr auto const defNRepsLat = 100; +constexpr auto const defVerbose = false; +constexpr auto const def_compression_required = false; +constexpr auto const def_encryption_required = false; +constexpr auto const def_dpi_required = false; + +/** + * @brief Main + * + */ +int main(int argc, char *argv[]) +{ + /* Args + * + * Reading of input arguments for experiment execution + */ + boost::program_options::options_description programDescription("Options:"); + programDescription.add_options() + ("bitstream,b", boost::program_options::value(), "Shell bitstream") + ("device,d", boost::program_options::value(), "Target device") + ("vfid,i", boost::program_options::value(), "Target vFPGA") + ("tcpaddr,t", boost::program_options::value(), "TCP conn IP") + ("write,w", boost::program_options::value(), "Read(0)/Write(1)") + ("min_size,n", boost::program_options::value(), "Minimal transfer size") + ("max_size,x", boost::program_options::value(), "Maximum transfer size") + ("reps_thr,r", boost::program_options::value(), "Number of reps, throughput") + ("reps_lat,l", boost::program_options::value(), "Number of reps, latency") + ("verbose,v", boost::program_options::value(), "Printout of single messages") + ("encryption,e", boost::program_options::value(), "Encryption required") + ("compression,c", boost::program_options::value(), "Compression required") + ("dpi,d", boost::program_options::value(), "DPI required"); + + boost::program_options::variables_map commandLineArgs; + boost::program_options::store(boost::program_options::parse_command_line(argc, argv, programDescription), commandLineArgs); + boost::program_options::notify(commandLineArgs); + + // Set the default values for the input-determined parameters to have a fall-back in case input-values didn't work properly + string bstream_path = ""; + uint32_t cs_dev = defDevice; + uint32_t vfid = defTargetVfid; + string tcp_ip; + bool oper = defOper; + uint32_t min_size = defMinSize; + uint32_t max_size = defMaxSize; + uint32_t n_reps_thr = defNRepsThr; + uint32_t n_reps_lat = defNRepsLat; + bool verbose = defVerbose; + bool encryption_required = def_encryption_required; + bool compression_required = def_compression_required; + bool dpi_required = def_dpi_required; + + // Read the actual arguments from the command line and parse them to variables for further usage + if(commandLineArgs.count("bitstream") > 0) { + bstream_path = commandLineArgs["bitstream"].as(); + + std::cout << std::endl << "Shell loading (path: " << bstream_path << ") ..." << std::endl; + cRnfg crnfg(cs_dev); + crnfg.shellReconfigure(bstream_path); + } + if(commandLineArgs.count("device") > 0) cs_dev = commandLineArgs["device"].as(); + if(commandLineArgs.count("vfid") > 0) vfid = commandLineArgs["vfid"].as(); + if(commandLineArgs.count("tcpaddr") > 0) { + tcp_ip = commandLineArgs["tcpaddr"].as(); + } else { + std::cout << "Provide the TCP/IP address of the server" << std::endl; + return (EXIT_FAILURE); + } + if(commandLineArgs.count("write") > 0) oper = commandLineArgs["write"].as(); + if(commandLineArgs.count("min_size") > 0) min_size = commandLineArgs["min_size"].as(); + if(commandLineArgs.count("max_size") > 0) max_size = commandLineArgs["max_size"].as(); + if(commandLineArgs.count("reps_thr") > 0) n_reps_thr = commandLineArgs["reps_thr"].as(); + if(commandLineArgs.count("reps_lat") > 0) n_reps_lat = commandLineArgs["reps_lat"].as(); + if(commandLineArgs.count("verbose") > 0) verbose = commandLineArgs["verbose"].as(); + if(commandLineArgs.count("encryption") > 0) encryption_required = commandLineArgs["encryption"].as(); + if(commandLineArgs.count("compression") > 0) compression_required = commandLineArgs["compression"].as(); + if(commandLineArgs.count("dpi") > 0) dpi_required = commandLineArgs["dpi"].as(); + + //---------------------------------------------------------------- + // RDMA server side + // --------------------------------------------------------------- + + // Get a cthread for execution: Has the vFPGA-ID, host-process-ID of this calling process and device number + cThread cthread(defTargetVfid, getpid(), cs_dev, nullptr, nullptr, encryption_required, compression_required, dpi_required); + + // Get Memory for this thread that can hold the maximum required buffer size for this experiment + cthread.getMem({CoyoteAlloc::HPF, max_size, true}); + + // Instantiate the cLib specifically as server to initiate the RDMA-QP exchange + cLib clib_rdma("/tmp/coyote-daemon-vfid-0-rdma", fidRDMA, &cthread, tcp_ip.c_str(), defPort, false); + + // Create a scatter-gather entry for the RDMA-operations + sgEntry sg; + memset(&sg, 0, sizeof(rdmaSg)); + + // Set properties of the Scatter-Gather Entry: Start with the minimum size as required by the params and select hostStream as data origin + sg.rdma.len = min_size; + sg.rdma.local_stream = strmHost; + + // Get a hmem to write values into the payload of the RDMA-packets. Uses the allocated RDMA-buffer starting at vaddr + uint64_t *hMem = (uint64_t*)(cthread.getQpair()->local.vaddr); + + PR_HEADER("RDMA BENCHMARK"); + + + //--------------------------------------------- + // Execution of the experiment + //--------------------------------------------- + + // Iterate over rdma-buffer lengths up to the maximum required buffer size + while(sg.rdma.len <= max_size) { + + // Clear all registers for a clean start + cthread.clearCompleted(); + + // Respond to sync-handshake from the client-side + cthread.connSync(false); + + // Active participation of the server is only required for WRITE-tests (otherwise just responding.) Thus if-case here: + if(oper) { + + //------------------------------------------------ + // THROUGHPUT-TEST + //------------------------------------------------ + + // Wait until all expected throughput-WRITEs have been received + uint32_t number_of_completed_local_writes = 0; + while(cthread.checkCompleted(CoyoteOper::LOCAL_WRITE) < n_reps_thr) { + if(number_of_completed_local_writes != cthread.checkCompleted(CoyoteOper::LOCAL_WRITE)) { + if(verbose) { + std::cout << "SERVER: Received " << number_of_completed_local_writes << " LOCAL WRITES so far." << std::endl; + } + } + number_of_completed_local_writes = cthread.checkCompleted(CoyoteOper::LOCAL_WRITE); + } + + // Issue the same amount of REMOTE WRITEs to the other side + for(int i = 0; i < n_reps_thr; i++) { + // Increment the hMem for increasing payload-numbers + hMem[sg.rdma.len/8-1] = hMem[sg.rdma.len/8-1] + 1; + + // Isse the REMOTE WRITE + if(verbose) { + std::cout << "SERVER: Sent out message #" << i << " at message size " << sg.rdma.len << " with content " << hMem[sg.rdma.len/8-1] + 1; + } + cthread.invoke(CoyoteOper::REMOTE_RDMA_WRITE, &sg); + } + + // Clear all registers for a clean start of latency + cthread.clearCompleted(); + + // Respond to sync-handshake from the client-side + cthread.connSync(false); + + + //----------------------------------------------- + // LATENCY-TEST + //----------------------------------------------- + + // Iterate over the number of ping-pong-exchanges according to the desired experiment setting + for(int i = 0; i < n_reps_lat; i++) { + // Wait for the next incoming WRITE + bool message_written = false; + while(cthread.checkCompleted(CoyoteOper::LOCAL_WRITE) < i+1) { + if(!message_written) { + // std::cout << "RDMA-Server: Waiting for an incoming RDMA-WRITE at currently " << i << "." << std::endl; + message_written = true; + } + } + + // Increment the number in the payload before writing back + hMem[sg.rdma.len/8-1] = hMem[sg.rdma.len/8-1] + 1; + + // Issuing a WRITE in the reverse direction to the client + // std::cout << "RDMA-Server: Invoking a RDMA-WRITE from the Server to the Client at currently " << (i+1) << "." << std::endl; + if(verbose) { + std::cout << "SERVER: Sent out message #" << i << " at message-size " << sg.rdma.len << " with content " << hMem[sg.rdma.len/8-1] << std::endl; + } + cthread.invoke(CoyoteOper::REMOTE_RDMA_WRITE, &sg); + } + } else { + // In read-case, just execute the sync-handshake between throughput and latency + cthread.connSync(false); + } + + // Increment the RDMA-length + sg.rdma.len *= 2; + } + + // Perform one last sync-handshake with the client + cthread.connSync(false); + + return 0; +} \ No newline at end of file diff --git a/sw/include/cLib.hpp b/sw/include/cLib.hpp index 17186d23..d2fb7f1a 100644 --- a/sw/include/cLib.hpp +++ b/sw/include/cLib.hpp @@ -96,139 +96,245 @@ class cLib { // cthread: Associated thread // trgt_addr: Target address for the connection // port: Target port for the connection - cLib(const char *sock_name, int32_t fid, cThread *cthread, const char *trgt_addr, uint16_t port) { - - # ifdef VERBOSE - std::cout << "cLib: Called the constructor for a local connection (AF_UNIX)." << std::endl; - # endif - - // Establish variables for connection establishment - struct addrinfo *res, *t; - char* service; - int n = 0; + cLib(const char *sock_name, int32_t fid, cThread *cthread, const char *trgt_addr, uint16_t port, bool isClient = true) { + if(isClient) { + // Establish variables for connection establishment + struct addrinfo *res, *t; + char* service; + int n = 0; + + // Establish hints for network connection + struct addrinfo hints = {}; + hints.ai_family = AF_INET; + hints.ai_socktype = SOCK_STREAM; + + // Prefill the receive buffer with 0s + memset(recv_buff, 0, recvBuffSize); + + // Check if the buffer is attached to the thread + if(!cthread->isBuffAttached()) + throw std::runtime_error("buffers not attached"); + + // Format the port number in a string + if (asprintf(&service, "%d", port) < 0) + throw std::runtime_error("asprintf() failed"); + + // Create list of possible network adresses that are stored in res + n = getaddrinfo(trgt_addr, service, &hints, &res); + if (n < 0) { + free(service); + throw std::runtime_error("getaddrinfo() failed"); + } - // Establish hints for network connection - struct addrinfo hints = {}; - hints.ai_family = AF_INET; - hints.ai_socktype = SOCK_STREAM; + // Iterate over the possible address structs and try to create a socket. If connection is successful, we're done with this loop. + for (t = res; t; t = t->ai_next) { + sockfd = ::socket(t->ai_family, t->ai_socktype, t->ai_protocol); + if (sockfd >= 0) { + if (!::connect(sockfd, t->ai_addr, t->ai_addrlen)) { + break; + } + ::close(sockfd); + sockfd = -1; + } + } - // Prefill the receive buffer with 0s - memset(recv_buff, 0, recvBuffSize); + // Throw error if no connection at all could be established + if (sockfd < 0) + throw std::runtime_error("Could not connect to master: " + std::string(trgt_addr) + ":" + to_string(port)); - // Check if the buffer is attached to the thread - if(!cthread->isBuffAttached()) - throw std::runtime_error("buffers not attached"); + // Fid - send the file descriptor to the connected socket + if(write(sockfd, &fid, sizeof(int32_t)) != sizeof(int32_t)) { + std::cout << "ERR: Failed to send a request" << std::endl; + exit(EXIT_FAILURE); + } - // Format the port number in a string - if (asprintf(&service, "%d", port) < 0) - throw std::runtime_error("asprintf() failed"); + // Send local queue to the remote side. The Qpair is obtained from the thread. + ibvQ l_qp = cthread->getQpair()->local; - // Create list of possible network adresses that are stored in res - n = getaddrinfo(trgt_addr, service, &hints, &res); - if (n < 0) { - free(service); - throw std::runtime_error("getaddrinfo() failed"); - } + // Send the QP to the other side + if(write(sockfd, &l_qp, sizeof(ibvQ)) != sizeof(ibvQ)) { + std::cout << "ERR: Failed to send a local queue " << std::endl; + exit(EXIT_FAILURE); + } - // Iterate over the possible address structs and try to create a socket. If connection is successful, we're done with this loop. - for (t = res; t; t = t->ai_next) { - sockfd = ::socket(t->ai_family, t->ai_socktype, t->ai_protocol); - if (sockfd >= 0) { - if (!::connect(sockfd, t->ai_addr, t->ai_addrlen)) { - break; - } - ::close(sockfd); - sockfd = -1; + // Read remote queue from the remote side, received via network + # ifdef VERBOSE + std::cout << "cLib: Read remote QP from the remote side" << std::endl; + # endif + if(read(sockfd, recv_buff, sizeof(ibvQ)) != sizeof(ibvQ)) { + std::cout << "ERR: Failed to read a remote queue" << std::endl; + exit(EXIT_FAILURE); } - } - # ifdef VERBOSE - std::cout << "cLib: Connected to remote side server via" << sockfd << std::endl; - # endif + // Received remote QP is located in the receive buffer and is getting copied over to the thread, which manages all QPs + memcpy(&cthread->getQpair()->remote, recv_buff, sizeof(ibvQ)); - // Throw error if no connection at all could be established - if (sockfd < 0) - throw std::runtime_error("Could not connect to master: " + std::string(trgt_addr) + ":" + to_string(port)); + // Negotiate the Balboa-capabilities by comparing local and remote queue - // Fid - send the file descriptor to the connected socket - # ifdef VERBOSE - std::cout << "cLib: Send fid to the remote side " << fid << std::endl; - # endif - if(write(sockfd, &fid, sizeof(int32_t)) != sizeof(int32_t)) { - std::cout << "ERR: Failed to send a request" << std::endl; - exit(EXIT_FAILURE); - } + // AES-encryption: The larger aes-key becomes the common one. If both AES-keys are set to 0, no encryption is used for this QP + if(cthread->getQpair()->local.aes_key > cthread->getQpair()->remote.aes_key) { + cthread->getQpair()->remote.aes_key = cthread->getQpair()->local.aes_key; + } else { + cthread->getQpair()->local.aes_key = cthread->getQpair()->remote.aes_key; + } - // Send local queue to the remote side. The Qpair is obtained from the thread. - ibvQ l_qp = cthread->getQpair()->local; + // Compression agreement: If at least one party wants compression, it is used for this communication flow + if(cthread->getQpair()->local.compression_enabled || cthread->getQpair()->remote.compression_enabled) { + cthread->getQpair()->remote.compression_enabled = true; + cthread->getQpair()->local.compression_enabled = true; + } - // Send the QP to the other side - # ifdef VERBOSE - std::cout << "cLib: Send local QP to the remote side" << std::endl; - # endif - if(write(sockfd, &l_qp, sizeof(ibvQ)) != sizeof(ibvQ)) { - std::cout << "ERR: Failed to send a local queue " << std::endl; - exit(EXIT_FAILURE); - } + // DPI agreement: If at least one party wants to use DPI, it is used for this communication flow + if(cthread->getQpair()->local.dpi_enabled || cthread->getQpair()->remote.dpi_enabled) { + cthread->getQpair()->remote.dpi_enabled = true; + cthread->getQpair()->local.dpi_enabled = true; + } - // Read remote queue from the remote side, received via network - # ifdef VERBOSE - std::cout << "cLib: Read remote QP from the remote side" << std::endl; - # endif - if(read(sockfd, recv_buff, sizeof(ibvQ)) != sizeof(ibvQ)) { - std::cout << "ERR: Failed to read a remote queue" << std::endl; - exit(EXIT_FAILURE); - } + // Output: Print local and remote QPs + std::cout << "Queue pair: " << std::endl; + cthread->getQpair()->local.print("Local "); + cthread->getQpair()->remote.print("Remote"); - // Received remote QP is located in the receive buffer and is getting copied over to the thread, which manages all QPs - memcpy(&cthread->getQpair()->remote, recv_buff, sizeof(ibvQ)); + // Write context and connection to the configuration registers + cthread->writeQpContext(port); - // Negotiate the Balboa-capabilities by comparing local and remote queue + // ARP lookup to get the MAC-address for the remote QP IP-address + # ifdef VERBOSE + std::cout << "cLib: Initiate an Arp-lookup for the IP-address " << cthread->getQpair()->remote.ip_addr << std::endl; + # endif + cthread->doArpLookup(cthread->getQpair()->remote.ip_addr); - // AES-encryption: The larger aes-key becomes the common one. If both AES-keys are set to 0, no encryption is used for this QP - if(cthread->getQpair()->local.aes_key > cthread->getQpair()->remote.aes_key) { - cthread->getQpair()->remote.aes_key = cthread->getQpair()->local.aes_key; + // Set connection - open the network connection via the thread + # ifdef VERBOSE + std::cout << "cLib: Safe the connection in the cThread " << sockfd << std::endl; + # endif + cthread->setConnection(sockfd); + + // Printout the success of established connection + std::cout << "Client registered" << std::endl; } else { - cthread->getQpair()->local.aes_key = cthread->getQpair()->remote.aes_key; - } + // If cLib is created for a RDMA-server, it needs to take the passive part in the QP-exchange + + ////////////////////////////////////////// + // Step 1: Init the socket for QP-exchange + ////////////////////////////////////////// + + int sockfd = -1; + struct sockaddr_in server; + + // Create the socket and check if it's successful + sockfd = ::socket(AF_INET, SOCK_STREAM, 0); + if (sockfd == -1) + throw std::runtime_error("Could not create a socket"); + + // Select network and address for connection + server.sin_family = AF_INET; + server.sin_addr.s_addr = INADDR_ANY; + server.sin_port = htons(port); + + // Try to connect the socket + if (::bind(sockfd, (struct sockaddr*)&server, sizeof(server)) < 0) + throw std::runtime_error("Could not bind a socket"); + + if (sockfd < 0 ) + throw std::runtime_error("Could not listen to a port: " + to_string(port)); + + // Try to listen to the network socket + if(listen(sockfd, maxNumClients) == -1) { + syslog(LOG_ERR, "Error listen()"); + exit(EXIT_FAILURE); + } - // Compression agreement: If at least one party wants compression, it is used for this communication flow - if(cthread->getQpair()->local.compression_enabled || cthread->getQpair()->remote.compression_enabled) { - cthread->getQpair()->remote.compression_enabled = true; - cthread->getQpair()->local.compression_enabled = true; - } - // DPI agreement: If at least one party wants to use DPI, it is used for this communication flow - if(cthread->getQpair()->local.dpi_enabled || cthread->getQpair()->remote.dpi_enabled) { - cthread->getQpair()->remote.dpi_enabled = true; - cthread->getQpair()->local.dpi_enabled = true; - } + ////////////////////////////////////////// + // Step 2: QP-Exchange + ///////////////////////////////////////// + + // Create all required local variables + uint32_t recv_qpid; + uint8_t ack; + uint32_t n; + int connfd; + int fid; + ibvQ r_qp; + + // Create a receive buffer and allocate memory space for it + char recv_buf[recvBuffSize]; + memset(recv_buf, 0, recvBuffSize); + + // Try to accept the incoming connection + if((connfd = ::accept(sockfd, NULL, 0)) != -1) { + syslog(LOG_NOTICE, "Connection accepted remote, connfd: %d", connfd); + + // Read fid + if((n = ::read(connfd, recv_buf, sizeof(int32_t))) == sizeof(int32_t)) { + memcpy(&fid, recv_buf, sizeof(int32_t)); + syslog(LOG_NOTICE, "Function id: %d", fid); + } else { + ::close(connfd); + syslog(LOG_ERR, "Registration failed, connfd: %d, received: %d", connfd, n); + exit(EXIT_FAILURE); + } - // Output: Print local and remote QPs - std::cout << "Queue pair: " << std::endl; - cthread->getQpair()->local.print("Local "); - cthread->getQpair()->remote.print("Remote"); + // Read remote queue pair + if ((n = ::read(connfd, recv_buf, sizeof(ibvQ))) == sizeof(ibvQ)) { + memcpy(&r_qp, recv_buf, sizeof(ibvQ)); + syslog(LOG_NOTICE, "Read remote queue"); + } else { + ::close(connfd); + syslog(LOG_ERR, "Could not read a remote queue %d", n); + exit(EXIT_FAILURE); + } - // Write context and connection to the configuration registers - # ifdef VERBOSE - std::cout << "cLib: Write QP-context to the configuration registers" << std::endl; - # endif - cthread->writeQpContext(port); + // Store the received remote QP as part of the cThread + cthread->getQpair()->remote = r_qp; + cthread->getMem({CoyoteAlloc::HPF, r_qp.size, true}); - // ARP lookup to get the MAC-address for the remote QP IP-address - # ifdef VERBOSE - std::cout << "cLib: Initiate an Arp-lookup for the IP-address " << cthread->getQpair()->remote.ip_addr << std::endl; - # endif - cthread->doArpLookup(cthread->getQpair()->remote.ip_addr); + // Negotiation of the Balboa-features - // Set connection - open the network connection via the thread - # ifdef VERBOSE - std::cout << "cLib: Safe the connection in the cThread " << sockfd << std::endl; - # endif - cthread->setConnection(sockfd); + // AES-encryption: The larger AES-key becomes the common one. If both AES-keys are set to 0, no encryption is used + if(cthread->getQpair()->local.aes_key > cthread->getQpair()->remote.aes_key) { + cthread->getQpair()->remote.aes_key = cthread->getQpair()->local.aes_key; + } else { + cthread->getQpair()->local.aes_key = cthread->getQpair()->remote.aes_key; + } - // Printout the success of established connection - std::cout << "Client registered" << std::endl; + // Compression agreement: If at least one party wants compression, it is used for this communication flow + if(cthread->getQpair()->local.compression_enabled || cthread->getQpair()->remote.compression_enabled) { + cthread->getQpair()->remote.compression_enabled = true; + cthread->getQpair()->local.compression_enabled = true; + } + + // DPI agreement: If at least one party wants to use DPI, it is used for this communication flow + if(cthread->getQpair()->local.dpi_enabled || cthread->getQpair()->remote.dpi_enabled) { + cthread->getQpair()->remote.dpi_enabled = true; + cthread->getQpair()->local.dpi_enabled = true; + } + + // Send the local queue pair to the remote side + if (::write(connfd, &cthread->getQpair()->local, sizeof(ibvQ)) != sizeof(ibvQ)) { + ::close(connfd); + syslog(LOG_ERR, "Could not write a local queue"); + exit(EXIT_FAILURE); + } + + // Write context and connection to the config-space of Coyote + cthread->writeQpContext(port); + + // Perform an ARP lookup + cthread->doArpLookup(cthread->getQpair()->remote.ip_addr); + + // Set Connection for sync-handshaking etc. + cthread->setConnection(connfd); + + // Printout the success of established connection + std::cout << "Server registered" << std::endl; + + sockfd = connfd; + } else { + syslog(LOG_ERR, "Accept failed"); + } + } } ~cLib() {