From ee6fe35b7d3279502eac3879c06bb99f2a9729da Mon Sep 17 00:00:00 2001 From: Alexander Galanin Date: Thu, 23 Dec 2021 18:05:37 +0300 Subject: [PATCH] issue: 2681317 integrate InputHandlers into client/server/callback code --- configure.ac | 2 +- contrib/jenkins_tests/globals.sh | 5 +- src/client.h | 227 +++++++++++--------------- src/common.h | 197 ---------------------- src/defs.cpp | 2 +- src/defs.h | 2 - src/server.h | 270 +++++++++++++------------------ src/sockperf.cpp | 234 +++++++++------------------ 8 files changed, 287 insertions(+), 652 deletions(-) diff --git a/configure.ac b/configure.ac index 3ab6bc69..700c0eda 100644 --- a/configure.ac +++ b/configure.ac @@ -70,7 +70,7 @@ AC_LANG(C++) SP_CHECK_CXXFLAGS_APPEND([OUR_CXXFLAGS], [\ -Wall \ - "--param inline-unit-growth=200"]) + "--param inline-unit-growth=300"]) ########################################################################## # check VMA extra API diff --git a/contrib/jenkins_tests/globals.sh b/contrib/jenkins_tests/globals.sh index b67d63f4..c5bc3770 100755 --- a/contrib/jenkins_tests/globals.sh +++ b/contrib/jenkins_tests/globals.sh @@ -41,6 +41,7 @@ nproc=$(grep processor /proc/cpuinfo|wc -l) make_opt="-j$(($nproc / 2 + 1))" if [ $(command -v timeout >/dev/null 2>&1 && echo $?) ]; then timeout_exe="timeout -s SIGKILL 20m" + long_timeout_exe="timeout -s SIGKILL 40m" fi trap "on_exit" INT TERM ILL KILL FPE SEGV ALRM @@ -206,10 +207,10 @@ function do_check_result() { set +e if [ -z "$5" ]; then - eval $timeout_exe $1 + eval $long_timeout_exe $1 ret=$? else - eval $timeout_exe $1 2>> "${5}.err" 1>> "${5}.log" + eval $long_timeout_exe $1 2>> "${5}.err" 1>> "${5}.log" ret=$? do_archive "${5}.err" "${5}.log" fi diff --git a/src/client.h b/src/client.h index e50acb41..b44abd6a 100644 --- a/src/client.h +++ b/src/client.h @@ -30,6 +30,7 @@ #define CLIENT_H_ #include "common.h" +#include "input_handlers.h" #include "packet.h" //============================================================================== @@ -62,6 +63,35 @@ class Client : public ClientBase { PongModeCare m_pongModeCare; // has msg_sendto() method and can be one of: PongModeNormal, // PongModeAlways, PongModeNever + class ClientMessageHandlerCallback { + Client &m_client; + int m_ifd; + struct sockaddr_in &m_recvfrom_addr; + int m_receiveCount; + + public: + inline ClientMessageHandlerCallback(Client &client, + int ifd, struct sockaddr_in &recvfrom_addr) : + m_client(client), + m_ifd(ifd), + m_recvfrom_addr(recvfrom_addr), + m_receiveCount(0) + { + } + + inline bool handle_message() + { + return m_client.handle_message(m_ifd, m_recvfrom_addr, m_receiveCount); + } + + inline int getReceiveCount() const + { + return m_receiveCount; + } + }; + public: Client(int _fd_min, int _fd_max, int _fd_num); virtual ~Client(); @@ -136,174 +166,103 @@ class Client : public ClientBase { } //------------------------------------------------------------------------------ + template inline unsigned int client_receive_from_selected(int ifd) { int ret = 0; struct sockaddr_in recvfrom_addr; - int receiveCount = 0; - int serverNo = 0; - int remain_buffer = 0; fds_data *l_fds_ifd = g_fds_array[ifd]; - TicksTime rxTime; -#ifdef USING_VMA_EXTRA_API - vma_buff_t *tmp_vma_buff = g_vma_buff; - if (SOCKETXTREME == g_pApp->m_const_params.fd_handler_type && tmp_vma_buff) { - ret = msg_recv_socketxtreme(l_fds_ifd, tmp_vma_buff, &recvfrom_addr); - } else if (g_pApp->m_const_params.is_vmazcopyread && - !(remain_buffer = free_vma_packets(ifd, l_fds_ifd->recv.cur_size))) { - // Handled buffer is filled, free_vma_packets returns 0 - ret = l_fds_ifd->recv.cur_size; - } else -#endif - { - ret = msg_recvfrom(ifd, l_fds_ifd->recv.cur_addr + l_fds_ifd->recv.cur_offset, - l_fds_ifd->recv.cur_size, &recvfrom_addr, &l_fds_ifd->recv.cur_addr, - remain_buffer); - } + InputHandler input_handler(m_pMsgReply, l_fds_ifd->recv); + ret = input_handler.receive_pending_data(ifd, &recvfrom_addr); if (unlikely(ret <= 0)) { + input_handler.cleanup(); if (ret == RET_SOCKET_SHUTDOWN) { if (l_fds_ifd->sock_type == SOCK_STREAM) { exit_with_log("A connection was forcibly closed by a peer", SOCKPERF_ERR_SOCKET, l_fds_ifd); } } - if (ret < 0) return 0; + else /* (ret < 0) */ return 0; } - int nbytes = ret; - while (nbytes) { - /* 1: message header is not received yet */ - if ((l_fds_ifd->recv.cur_offset + nbytes) < MsgHeader::EFFECTIVE_SIZE) { - l_fds_ifd->recv.cur_size -= nbytes; - l_fds_ifd->recv.cur_offset += nbytes; - - /* 4: set current buffer size to size of remained part of message header to - * guarantee getting full message header on next iteration - */ - if (l_fds_ifd->recv.cur_size < MsgHeader::EFFECTIVE_SIZE) { - l_fds_ifd->recv.cur_size = - MsgHeader::EFFECTIVE_SIZE - l_fds_ifd->recv.cur_offset; - } -#ifdef USING_VMA_EXTRA_API - if (tmp_vma_buff) goto next; -#endif - return (receiveCount); - } else if (l_fds_ifd->recv.cur_offset < MsgHeader::EFFECTIVE_SIZE) { - /* 2: message header is got, match message to cycle buffer */ - m_pMsgReply->setBuf(l_fds_ifd->recv.cur_addr); - m_pMsgReply->setHeaderToHost(); - } else { - /* 2: message header is got, match message to cycle buffer */ - m_pMsgReply->setBuf(l_fds_ifd->recv.cur_addr); - } + ClientMessageHandlerCallback callback(*this, ifd, recvfrom_addr); + input_handler.iterate_over_buffers(callback); + input_handler.cleanup(); - if (unlikely(m_pMsgReply->getSequenceCounter() > m_pMsgRequest->getSequenceCounter())) { - exit_with_err("Sequence Number received was higher then expected", - SOCKPERF_ERR_FATAL); - } - if (unlikely(m_pMsgReply->getLength() > MAX_PAYLOAD_SIZE)) { - exit_with_err("Message received was larger than expected.", SOCKPERF_ERR_FATAL); - } + return callback.getReceiveCount(); + } - /* 3: message is not complete */ - if ((l_fds_ifd->recv.cur_offset + nbytes) < m_pMsgReply->getLength()) { - l_fds_ifd->recv.cur_size -= nbytes; - l_fds_ifd->recv.cur_offset += nbytes; - - /* 4: set current buffer size to size of remained part of message to - * guarantee getting full message on next iteration (using extended reserved - * memory) - * and shift to start of cycle buffer - */ - if (l_fds_ifd->recv.cur_size < (int)m_pMsgReply->getMaxSize()) { - l_fds_ifd->recv.cur_size = - m_pMsgReply->getLength() - l_fds_ifd->recv.cur_offset; - } + inline unsigned int client_receive_from_selected(int ifd) { #ifdef USING_VMA_EXTRA_API - if (tmp_vma_buff) goto next; + if (SOCKETXTREME == g_pApp->m_const_params.fd_handler_type) { + return client_receive_from_selected(ifd); + } else if (g_pApp->m_const_params.is_vmazcopyread) { + return client_receive_from_selected(ifd); + } else #endif - return (receiveCount); - } + { + return client_receive_from_selected(ifd); + } + } - /* 5: message is complete shift to process next one */ - nbytes -= m_pMsgReply->getLength() - l_fds_ifd->recv.cur_offset; - l_fds_ifd->recv.cur_addr += m_pMsgReply->getLength(); - l_fds_ifd->recv.cur_size -= m_pMsgReply->getLength() - l_fds_ifd->recv.cur_offset; - l_fds_ifd->recv.cur_offset = 0; + inline bool handle_message(int ifd, struct sockaddr_in &recvfrom_addr, int &receiveCount) + { + int serverNo = 0; #if defined(LOG_TRACE_MSG_IN) && (LOG_TRACE_MSG_IN == TRUE) - printf(">>> "); - hexdump(m_pMsgReply->getBuf(), MsgHeader::EFFECTIVE_SIZE); + printf(">>> "); + hexdump(m_pMsgReply->getBuf(), MsgHeader::EFFECTIVE_SIZE); #endif /* LOG_TRACE_MSG_IN */ + if (unlikely(!m_pMsgReply->isValidHeader())) { + exit_with_err("Message received was larger than expected.", SOCKPERF_ERR_FATAL); + } + if (unlikely(m_pMsgReply->getSequenceCounter() > m_pMsgRequest->getSequenceCounter())) { + exit_with_err("Sequence Number received was higher than expected", + SOCKPERF_ERR_FATAL); + } - if (g_b_exit) return 0; - if (m_pMsgReply->isClient()) { - assert(!(l_fds_ifd->is_multicast && g_pApp->m_const_params.mc_loop_disable)); -#ifdef USING_VMA_EXTRA_API - if (tmp_vma_buff) goto next; -#endif - continue; - } + if (unlikely(g_b_exit)) { + return false; + } + if (m_pMsgReply->isClient()) { + assert(!(g_fds_array[ifd]->is_multicast && g_pApp->m_const_params.mc_loop_disable)); + return true; + } - // should not count the warmup messages - if (unlikely(m_pMsgReply->isWarmupMessage())) { -#ifdef USING_VMA_EXTRA_API - if (tmp_vma_buff) goto next; -#endif - continue; - } + // should not count the warmup messages + if (unlikely(m_pMsgReply->isWarmupMessage())) { + return true; + } - receiveCount++; - rxTime.setNow(); + receiveCount++; + TicksTime rxTime; + rxTime.setNow(); #if 0 // should be part of check-data-integrity - if (g_pApp->m_const_params.msg_size_range == 0) { //ABH: added 'if', otherwise, size check will not suit latency-under-load - if (nbytes != g_msg_size && errno != EINTR) { - exit_with_log("received message size test failed (sent:%d received:%d)", g_msg_size, nbytes,SOCKPERF_ERR_FATAL); - } - } -#endif - -#ifdef DEBUG // should not occur in real test - if (m_pMsgReply->getSequenceCounter() % g_pApp->m_const_params.reply_every) { - log_err("skipping unexpected received message: seqNo=%" PRIu64 " mask=0x%x", - m_pMsgReply->getSequenceCounter(), m_pMsgReply->getFlags()); -#ifdef USING_VMA_EXTRA_API - if (tmp_vma_buff) goto next; -#endif - continue; + if (g_pApp->m_const_params.msg_size_range == 0) { //ABH: added 'if', otherwise, size check will not suit latency-under-load + if (nbytes != g_msg_size && errno != EINTR) { + exit_with_log("received message size test failed (sent:%d received:%d)", g_msg_size, nbytes,SOCKPERF_ERR_FATAL); } + } #endif - serverNo = client_get_server_id(ifd, &recvfrom_addr); - if (unlikely(serverNo < 0)) { - exit_with_log("Number of servers more than expected", SOCKPERF_ERR_FATAL); - } else { - g_pPacketTimes->setRxTime(m_pMsgReply->getSequenceCounter(), rxTime, serverNo); - m_switchDataIntegrity.execute(m_pMsgRequest, m_pMsgReply); - } - -#ifdef USING_VMA_EXTRA_API - next: - if (tmp_vma_buff) { - ret = msg_process_next(l_fds_ifd, &tmp_vma_buff, &nbytes); - if (ret) { - return (receiveCount); - } - } -#endif +#ifdef DEBUG // should not occur in real test + if (m_pMsgReply->getSequenceCounter() % g_pApp->m_const_params.reply_every) { + log_err("skipping unexpected received message: seqNo=%" PRIu64 " mask=0x%x", + m_pMsgReply->getSequenceCounter(), m_pMsgReply->getFlags()); + return true; } +#endif - /* 6: shift to start of cycle buffer in case receiving buffer is empty and - * there is no uncompleted message - */ - if (!nbytes) { - l_fds_ifd->recv.cur_addr = l_fds_ifd->recv.buf; - l_fds_ifd->recv.cur_size = l_fds_ifd->recv.max_size; - l_fds_ifd->recv.cur_offset = 0; + serverNo = client_get_server_id(ifd, &recvfrom_addr); + if (unlikely(serverNo < 0)) { + exit_with_log("Number of servers more than expected", SOCKPERF_ERR_FATAL); + } else { + g_pPacketTimes->setRxTime(m_pMsgReply->getSequenceCounter(), rxTime, serverNo); + m_switchDataIntegrity.execute(m_pMsgRequest, m_pMsgReply); } - return (receiveCount); + return true; } //------------------------------------------------------------------------------ diff --git a/src/common.h b/src/common.h index 6bdb17de..b25e25fe 100644 --- a/src/common.h +++ b/src/common.h @@ -51,203 +51,6 @@ const char *handler2str(fd_block_handler_t type); int read_int_from_sys_file(const char *path); // inline functions -#ifdef USING_VMA_EXTRA_API -//------------------------------------------------------------------------------ -static inline int msg_recv_socketxtreme(fds_data *l_fds_ifd, vma_buff_t *tmp_vma_buff, - struct sockaddr_in *recvfrom_addr) { - *recvfrom_addr = g_vma_comps->src; - if (l_fds_ifd->recv.cur_offset) { - l_fds_ifd->recv.cur_addr = l_fds_ifd->recv.buf; - l_fds_ifd->recv.cur_size = l_fds_ifd->recv.max_size - l_fds_ifd->recv.cur_offset; - memmove(l_fds_ifd->recv.cur_addr + l_fds_ifd->recv.cur_offset, - (uint8_t *)tmp_vma_buff->payload, tmp_vma_buff->len); - } else { - l_fds_ifd->recv.cur_addr = (uint8_t *)tmp_vma_buff->payload; - l_fds_ifd->recv.cur_size = l_fds_ifd->recv.max_size; - l_fds_ifd->recv.cur_offset = 0; - } - return tmp_vma_buff->len; -} - -//------------------------------------------------------------------------------ -static inline int msg_process_next(fds_data *l_fds_ifd, vma_buff_t **tmp_vma_buff, int *nbytes) { - if (l_fds_ifd->recv.cur_offset) { - memmove(l_fds_ifd->recv.buf, l_fds_ifd->recv.cur_addr, l_fds_ifd->recv.cur_offset); - l_fds_ifd->recv.cur_addr = l_fds_ifd->recv.buf; - l_fds_ifd->recv.cur_size = l_fds_ifd->recv.max_size - l_fds_ifd->recv.cur_offset; - if ((*tmp_vma_buff)->next) { - *tmp_vma_buff = (*tmp_vma_buff)->next; - memmove(l_fds_ifd->recv.cur_addr + l_fds_ifd->recv.cur_offset, - (uint8_t *)(*tmp_vma_buff)->payload, (*tmp_vma_buff)->len); - *nbytes = (*tmp_vma_buff)->len; - } else { - return 1; - } - } else if (0 == *nbytes && (*tmp_vma_buff)->next) { - *tmp_vma_buff = (*tmp_vma_buff)->next; - l_fds_ifd->recv.cur_addr = (uint8_t *)(*tmp_vma_buff)->payload; - l_fds_ifd->recv.cur_size = l_fds_ifd->recv.max_size; - l_fds_ifd->recv.cur_offset = 0; - *nbytes = (*tmp_vma_buff)->len; - } - return 0; -} - -//------------------------------------------------------------------------------ -static inline int free_vma_packets(int fd, int nbytes) { - int data_to_copy; - int remain_buffer = 0; - struct vma_packet_t *pkt; - ZeroCopyData *z_ptr = g_zeroCopyData[fd]; - - if (z_ptr) { - remain_buffer = nbytes; - // Receive held data, and free VMA's previously received zero copied packets - if (z_ptr->m_pkts && z_ptr->m_pkts->n_packet_num > 0) { - - pkt = &z_ptr->m_pkts->pkts[0]; - - while (z_ptr->m_pkt_index < pkt->sz_iov) { - data_to_copy = _min(remain_buffer, (int)(pkt->iov[z_ptr->m_pkt_index].iov_len - - z_ptr->m_pkt_offset)); - remain_buffer -= data_to_copy; - z_ptr->m_pkt_offset += data_to_copy; - - // Handled buffer is filled - if (z_ptr->m_pkt_offset < pkt->iov[z_ptr->m_pkt_index].iov_len) return 0; - - z_ptr->m_pkt_offset = 0; - z_ptr->m_pkt_index++; - } - - g_vma_api->free_packets(fd, z_ptr->m_pkts->pkts, z_ptr->m_pkts->n_packet_num); - z_ptr->m_pkts = NULL; - z_ptr->m_pkt_index = 0; - z_ptr->m_pkt_offset = 0; - - // Handled buffer is filled - if (remain_buffer == 0) return 0; - } - - return remain_buffer; - } - - return nbytes; -} -#endif - -//------------------------------------------------------------------------------ -static inline int msg_recvfrom(int fd, uint8_t *buf, int nbytes, struct sockaddr_in *recvfrom_addr, - uint8_t **zcopy_pkt_addr, int remain_buffer) { - int ret = 0; - socklen_t size = sizeof(struct sockaddr_in); - int flags = 0; - -#ifdef USING_VMA_EXTRA_API - if (g_pApp->m_const_params.is_vmazcopyread) { - int data_to_copy; - struct vma_packet_t *pkt; - ZeroCopyData *z_ptr = g_zeroCopyData[fd]; - - if (z_ptr) { - // Receive the next packet with zero copy API - ret = g_vma_api->recvfrom_zcopy(fd, z_ptr->m_pkt_buf, Message::getMaxSize(), &flags, - (struct sockaddr *)recvfrom_addr, &size); - - if (ret > 0) { - // Zcopy receive is performed - if (flags & MSG_VMA_ZCOPY) { - z_ptr->m_pkts = (struct vma_packets_t *)z_ptr->m_pkt_buf; - if (z_ptr->m_pkts->n_packet_num > 0) { - - pkt = &z_ptr->m_pkts->pkts[0]; - - // Make receive address point to the beginning of returned recvfrom_zcopy - // buffer - *zcopy_pkt_addr = (uint8_t *)pkt->iov[z_ptr->m_pkt_index].iov_base; - - while (z_ptr->m_pkt_index < pkt->sz_iov) { - data_to_copy = - _min(remain_buffer, (int)pkt->iov[z_ptr->m_pkt_index].iov_len); - remain_buffer -= data_to_copy; - z_ptr->m_pkt_offset += data_to_copy; - - // Handled buffer is filled - if (z_ptr->m_pkt_offset < pkt->iov[z_ptr->m_pkt_index].iov_len) - return nbytes; - - z_ptr->m_pkt_offset = 0; - z_ptr->m_pkt_index++; - } - ret = nbytes - remain_buffer; - } else { - ret = (remain_buffer == nbytes) ? -1 : (nbytes - remain_buffer); - } - } else { - data_to_copy = _min(remain_buffer, ret); - memcpy(buf + (nbytes - remain_buffer), &z_ptr->m_pkt_buf[fd], data_to_copy); - ret = nbytes - (remain_buffer - data_to_copy); - } - } - // Non_blocked with held packet. - else if (ret < 0 && os_err_eagain() && (remain_buffer < nbytes)) { - return nbytes - remain_buffer; - } - } - } else -#endif - { -/* - When writing onto a connection-oriented socket that has been shut down - (by the local or the remote end) SIGPIPE is sent to the writing process - and EPIPE is returned. The signal is not sent when the write call specified - the MSG_NOSIGNAL flag. - Note: another way is call signal (SIGPIPE,SIG_IGN); - */ -#ifndef WIN32 - flags = MSG_NOSIGNAL; -#endif - -#if defined(DEFINED_TLS) - if (g_fds_array[fd]->tls_handle) { - ret = tls_read(g_fds_array[fd]->tls_handle, buf, nbytes); - } else -#endif /* DEFINED_TLS */ - { - ret = recvfrom(fd, buf, nbytes, flags, (struct sockaddr *)recvfrom_addr, &size); - } - -#if defined(LOG_TRACE_MSG_IN) && (LOG_TRACE_MSG_IN == TRUE) - printf("> "); - hexdump(buf, MsgHeader::EFFECTIVE_SIZE); -#endif /* LOG_TRACE_MSG_IN */ - -#if defined(LOG_TRACE_RECV) && (LOG_TRACE_RECV == TRUE) - LOG_TRACE("raw", "%s IP: %s:%d [fd=%d ret=%d] %s", __FUNCTION__, - inet_ntoa(recvfrom_addr->sin_addr), ntohs(recvfrom_addr->sin_port), fd, ret, - strerror(errno)); -#endif /* LOG_TRACE_RECV */ - } - - if (ret == 0 || errno == EPIPE || os_err_conn_reset()) { - /* If no messages are available to be received and the peer has performed an orderly - * shutdown, - * recv()/recvfrom() shall return 0 - * */ - ret = RET_SOCKET_SHUTDOWN; - errno = 0; - } - /* ret < MsgHeader::EFFECTIVE_SIZE - * ret value less than MsgHeader::EFFECTIVE_SIZE - * is bad case for UDP so error could be actual but it is possible value for TCP - */ - else if (ret < 0 && !os_err_eagain() && errno != EINTR) { - recvfromError(fd); - } - - return ret; -} - //------------------------------------------------------------------------------ static inline int msg_sendto(int fd, uint8_t *buf, int nbytes, const struct sockaddr_in *sendto_addr) { diff --git a/src/defs.cpp b/src/defs.cpp index 68fe2dde..af75df8f 100644 --- a/src/defs.cpp +++ b/src/defs.cpp @@ -45,7 +45,7 @@ debug_level_t g_debug_level = LOG_LVL_INFO; struct vma_buff_t *g_vma_buff = NULL; struct vma_completion_t *g_vma_comps; -ZeroCopyData::ZeroCopyData() : m_pkt_buf(NULL), m_pkts(NULL), m_pkt_index(0), m_pkt_offset(0) {}; +ZeroCopyData::ZeroCopyData() : m_pkt_buf(NULL), m_pkts(NULL) {} void ZeroCopyData::allocate() { m_pkt_buf = (unsigned char *)MALLOC(Message::getMaxSize()); } diff --git a/src/defs.h b/src/defs.h index bc9fd6c6..32a07a6f 100644 --- a/src/defs.h +++ b/src/defs.h @@ -453,8 +453,6 @@ class ZeroCopyData { ~ZeroCopyData(); unsigned char *m_pkt_buf; struct vma_packets_t *m_pkts; - unsigned int m_pkt_index; - unsigned int m_pkt_offset; }; // map from fd to zeroCopyData typedef std::map zeroCopyMap; diff --git a/src/server.h b/src/server.h index bd6e204a..ecbe7842 100644 --- a/src/server.h +++ b/src/server.h @@ -30,6 +30,7 @@ #define SERVER_H_ #include "common.h" +#include "input_handlers.h" #ifdef ST_TEST extern int prepare_socket(int fd, struct fds_data *p_data, bool stTest = false); @@ -81,6 +82,28 @@ class Server : public ServerBase { private: IoType m_ioHandler; + class ServerMessageHandlerCallback { + Server &m_server; + int m_ifd; + struct sockaddr_in &m_recvfrom_addr; + fds_data *m_fds_ifd; + + public: + inline ServerMessageHandlerCallback(Server &server, + int ifd, struct sockaddr_in &recvfrom_addr, fds_data *l_fds_ifd) : + m_server(server), + m_ifd(ifd), + m_recvfrom_addr(recvfrom_addr), + m_fds_ifd(l_fds_ifd) + { + } + + inline bool handle_message() + { + return m_server.handle_message(m_ifd, m_recvfrom_addr, m_fds_ifd); + } + }; + // protected: public: //------------------------------------------------------------------------------ @@ -92,8 +115,25 @@ class Server : public ServerBase { /* ** receive from and send to selected socket */ + template /*inline*/ bool server_receive_then_send(int ifd); + inline bool server_receive_then_send(int ifd) + { +#ifdef USING_VMA_EXTRA_API + if (SOCKETXTREME == g_pApp->m_const_params.fd_handler_type) { + return server_receive_then_send(ifd); + } else if (g_pApp->m_const_params.is_vmazcopyread) { + return server_receive_then_send(ifd); + } else +#endif + { + return server_receive_then_send(ifd); + } + } + + /*inline*/ bool handle_message(int ifd, struct sockaddr_in &recvfrom_addr, fds_data *l_fds_ifd); + //------------------------------------------------------------------------------ int server_accept(int ifd); @@ -101,6 +141,7 @@ class Server : public ServerBase { SwitchActivityInfo m_switchActivityInfo; SwitchCalcGaps m_switchCalcGaps; }; + void print_log(const char *error, fds_data *fds) { printf("IP = %-15s PORT = %5d # %s ", inet_ntoa(fds->server_addr.sin_addr), ntohs(fds->server_addr.sin_port), PRINT_PROTOCOL(fds->sock_type)); @@ -124,8 +165,6 @@ void close_ifd(int fd, int ifd, fds_data *l_fds_ifd) { if (z_ptr && z_ptr->m_pkts) { g_vma_api->free_packets(fd, z_ptr->m_pkts->pkts, z_ptr->m_pkts->n_packet_num); z_ptr->m_pkts = NULL; - z_ptr->m_pkt_index = 0; - z_ptr->m_pkt_offset = 0; } g_vma_api->register_recv_callback(fd, NULL, NULL); @@ -156,201 +195,110 @@ void close_ifd(int fd, int ifd, fds_data *l_fds_ifd) { ** receive from and send to selected socket */ template +template inline bool Server::server_receive_then_send(int ifd) { struct sockaddr_in recvfrom_addr; - struct sockaddr_in sendto_addr; - bool do_update = true; + static const bool do_update = true; int ret = 0; - int remain_buffer = 0; fds_data *l_fds_ifd = g_fds_array[ifd]; if (unlikely(!l_fds_ifd)) { return (do_update); } -#ifdef USING_VMA_EXTRA_API - vma_buff_t *tmp_vma_buff = g_vma_buff; - if (SOCKETXTREME == g_pApp->m_const_params.fd_handler_type && tmp_vma_buff) { - ret = msg_recv_socketxtreme(l_fds_ifd, tmp_vma_buff, &recvfrom_addr); - } else if (g_pApp->m_const_params.is_vmazcopyread && - !(remain_buffer = free_vma_packets(ifd, l_fds_ifd->recv.cur_size))) { - // Handled buffer is filled, free_vma_packets returns 0 - ret = l_fds_ifd->recv.cur_size; - } else -#endif - { - ret = msg_recvfrom(ifd, l_fds_ifd->recv.cur_addr + l_fds_ifd->recv.cur_offset, - l_fds_ifd->recv.cur_size, &recvfrom_addr, &l_fds_ifd->recv.cur_addr, - remain_buffer); - } + + InputHandler input_handler(m_pMsgReply, l_fds_ifd->recv); + ret = input_handler.receive_pending_data(ifd, &recvfrom_addr); if (unlikely(ret <= 0)) { + input_handler.cleanup(); if (ret == RET_SOCKET_SHUTDOWN) { if (l_fds_ifd->sock_type == SOCK_STREAM) { close_ifd(l_fds_ifd->next_fd, ifd, l_fds_ifd); } return (do_update); - } - if (ret < 0) return (!do_update); - } - - int nbytes = ret; - while (nbytes) { - /* 1: message header is not received yet */ - if ((l_fds_ifd->recv.cur_offset + nbytes) < MsgHeader::EFFECTIVE_SIZE) { - l_fds_ifd->recv.cur_size -= nbytes; - l_fds_ifd->recv.cur_offset += nbytes; - - /* 4: set current buffer size to size of remained part of message header to - * guarantee getting full message header on next iteration - */ - if (l_fds_ifd->recv.cur_size < MsgHeader::EFFECTIVE_SIZE) { - l_fds_ifd->recv.cur_size = MsgHeader::EFFECTIVE_SIZE - l_fds_ifd->recv.cur_offset; - } -#ifdef USING_VMA_EXTRA_API - if (tmp_vma_buff) goto next; -#endif + } else /* (ret < 0) */ { return (!do_update); - } else if (l_fds_ifd->recv.cur_offset < MsgHeader::EFFECTIVE_SIZE) { - /* 2: message header is got, match message to cycle buffer */ - m_pMsgReply->setBuf(l_fds_ifd->recv.cur_addr); - m_pMsgReply->setHeaderToHost(); - } else { - /* 2: message header is got, match message to cycle buffer */ - m_pMsgReply->setBuf(l_fds_ifd->recv.cur_addr); } + } - if ((unsigned)m_pMsgReply->getLength() > (unsigned)MAX_PAYLOAD_SIZE) { - // Message received was larger than expected, message ignored. - only on stream mode. - print_log("Message received was larger than expected, message ignored.", l_fds_ifd); + ServerMessageHandlerCallback callback(*this, ifd, recvfrom_addr, l_fds_ifd); + bool ok = input_handler.iterate_over_buffers(callback); + input_handler.cleanup(); + if (likely(ok)) { + return (!do_update); + } else { + if (l_fds_ifd->sock_type == SOCK_STREAM) { close_ifd(l_fds_ifd->next_fd, ifd, l_fds_ifd); - return (do_update); - } - - /* 3: message is not complete */ - if ((l_fds_ifd->recv.cur_offset + nbytes) < m_pMsgReply->getLength()) { - l_fds_ifd->recv.cur_size -= nbytes; - l_fds_ifd->recv.cur_offset += nbytes; - - /* 4: set current buffer size to size of remained part of message to - * guarantee getting full message on next iteration (using extended reserved memory) - * and shift to start of cycle buffer - */ - if (l_fds_ifd->recv.cur_size < (int)m_pMsgReply->getMaxSize()) { - l_fds_ifd->recv.cur_size = m_pMsgReply->getLength() - l_fds_ifd->recv.cur_offset; - } -#ifdef USING_VMA_EXTRA_API - if (tmp_vma_buff) goto next; -#endif - return (!do_update); } + return (do_update); + } +} - /* 5: message is complete shift to process next one */ - nbytes -= m_pMsgReply->getLength() - l_fds_ifd->recv.cur_offset; - l_fds_ifd->recv.cur_addr += m_pMsgReply->getLength(); - l_fds_ifd->recv.cur_size -= m_pMsgReply->getLength() - l_fds_ifd->recv.cur_offset; - l_fds_ifd->recv.cur_offset = 0; +template +inline bool Server::handle_message(int ifd, + struct sockaddr_in &recvfrom_addr, fds_data *l_fds_ifd) +{ + struct sockaddr_in sendto_addr; #if defined(LOG_TRACE_MSG_IN) && (LOG_TRACE_MSG_IN == TRUE) - printf(">>> "); - hexdump(m_pMsgReply->getBuf(), MsgHeader::EFFECTIVE_SIZE); + printf(">>> "); + hexdump(m_pMsgReply->getBuf(), MsgHeader::EFFECTIVE_SIZE); #endif /* LOG_TRACE_MSG_IN */ - if (g_b_exit) return (!do_update); - if (!m_pMsgReply->isClient()) { - /* 6: shift to start of cycle buffer in case receiving buffer is empty and - * there is no uncompleted message - */ - if (!nbytes) { - l_fds_ifd->recv.cur_addr = l_fds_ifd->recv.buf; - l_fds_ifd->recv.cur_size = l_fds_ifd->recv.max_size; - l_fds_ifd->recv.cur_offset = 0; - } -#ifdef USING_VMA_EXTRA_API - if (tmp_vma_buff) goto next; -#endif - return (!do_update); - } + if (unlikely(!m_pMsgReply->isValidHeader())) { + print_log("Message received was larger than expected, message ignored.", l_fds_ifd); + return false; + } + if (g_b_exit) { + return false; + } + if (unlikely(!m_pMsgReply->isClient())) { + return true; + } + if (unlikely(m_pMsgReply->isWarmupMessage())) { + m_switchCalcGaps.execute(&recvfrom_addr, 0, true); + return true; + } - if (unlikely(m_pMsgReply->isWarmupMessage())) { - m_switchCalcGaps.execute(&recvfrom_addr, 0, true); - /* 6: shift to start of cycle buffer in case receiving buffer is empty and - * there is no uncompleted message - */ - if (!nbytes) { - l_fds_ifd->recv.cur_addr = l_fds_ifd->recv.buf; - l_fds_ifd->recv.cur_size = l_fds_ifd->recv.max_size; - l_fds_ifd->recv.cur_offset = 0; - } -#ifdef USING_VMA_EXTRA_API - if (tmp_vma_buff) goto next; -#endif - return (!do_update); - } + g_receiveCount++; //// should move to setRxTime (once we use it in server side) - g_receiveCount++; //// should move to setRxTime (once we use it in server side) + if (m_pMsgReply->getHeader()->isPongRequest()) { + /* if server in a no reply mode - shift to start of cycle buffer*/ + if (g_pApp->m_const_params.b_server_dont_reply) { + return true; + } + /* prepare message header */ + if (g_pApp->m_const_params.mode != MODE_BRIDGE) { + m_pMsgReply->setServer(); + } + /* get source addr to reply. memcpy is not used to improve performance */ + sendto_addr = l_fds_ifd->server_addr; - if (m_pMsgReply->getHeader()->isPongRequest()) { - /* if server in a no reply mode - shift to start of cycle buffer*/ - if (g_pApp->m_const_params.b_server_dont_reply) { -#ifdef USING_VMA_EXTRA_API - if (tmp_vma_buff) goto next; -#endif - l_fds_ifd->recv.cur_addr = l_fds_ifd->recv.buf; - l_fds_ifd->recv.cur_size = l_fds_ifd->recv.max_size; - l_fds_ifd->recv.cur_offset = 0; - return (do_update); - } - /* prepare message header */ - if (g_pApp->m_const_params.mode != MODE_BRIDGE) { - m_pMsgReply->setServer(); - } + if (l_fds_ifd->memberships_size || !l_fds_ifd->is_multicast || + g_pApp->m_const_params.b_server_reply_via_uc) { // In unicast case reply to sender /* get source addr to reply. memcpy is not used to improve performance */ - sendto_addr = l_fds_ifd->server_addr; - - if (l_fds_ifd->memberships_size || !l_fds_ifd->is_multicast || - g_pApp->m_const_params.b_server_reply_via_uc) { // In unicast case reply to sender - /* get source addr to reply. memcpy is not used to improve performance */ - sendto_addr = recvfrom_addr; - } else if (l_fds_ifd->is_multicast) { - /* always send to the same port recved from */ - sendto_addr.sin_port = recvfrom_addr.sin_port; - } - int length = m_pMsgReply->getLength(); - m_pMsgReply->setHeaderToNetwork(); - - ret = msg_sendto(ifd, m_pMsgReply->getBuf(), length, &sendto_addr); - if (unlikely(ret == RET_SOCKET_SHUTDOWN)) { - if (l_fds_ifd->sock_type == SOCK_STREAM) { - close_ifd(l_fds_ifd->next_fd, ifd, l_fds_ifd); - } - return (do_update); - } - m_pMsgReply->setHeaderToHost(); + sendto_addr = recvfrom_addr; + } else if (l_fds_ifd->is_multicast) { + /* always send to the same port recved from */ + sendto_addr.sin_port = recvfrom_addr.sin_port; } + int length = m_pMsgReply->getLength(); + m_pMsgReply->setHeaderToNetwork(); - m_switchCalcGaps.execute(&recvfrom_addr, m_pMsgReply->getSequenceCounter(), false); - m_switchActivityInfo.execute(g_receiveCount); - -#ifdef USING_VMA_EXTRA_API - next: - if (tmp_vma_buff) { - ret = msg_process_next(l_fds_ifd, &tmp_vma_buff, &nbytes); - if (ret) { - return (!do_update); + int ret = msg_sendto(ifd, m_pMsgReply->getBuf(), length, &sendto_addr); + if (unlikely(ret == RET_SOCKET_SHUTDOWN)) { + if (l_fds_ifd->sock_type == SOCK_STREAM) { + close_ifd(l_fds_ifd->next_fd, ifd, l_fds_ifd); } + return false; } -#endif + m_pMsgReply->setHeaderToHost(); } - /* 6: shift to start of cycle buffer in case receiving buffer is empty and - * there is no uncompleted message - */ - // nbytes == 0 - l_fds_ifd->recv.cur_addr = l_fds_ifd->recv.buf; - l_fds_ifd->recv.cur_size = l_fds_ifd->recv.max_size; - l_fds_ifd->recv.cur_offset = 0; + m_switchCalcGaps.execute(&recvfrom_addr, m_pMsgReply->getSequenceCounter(), false); + m_switchActivityInfo.execute(g_receiveCount); - return (!do_update); + return true; } #endif /* SERVER_H_ */ diff --git a/src/sockperf.cpp b/src/sockperf.cpp index 54640a12..1085636c 100644 --- a/src/sockperf.cpp +++ b/src/sockperf.cpp @@ -81,6 +81,7 @@ #include #include "common.h" #include "message.h" +#include "message_parser.h" #include "packet.h" #include "switches.h" #include "aopt.h" @@ -2461,6 +2462,21 @@ void set_defaults() { //------------------------------------------------------------------------------ #ifdef USING_VMA_EXTRA_API +class CallbackMessageHandler { + int m_fd; + fds_data *m_fds_ifd; + struct vma_info_t *m_vma_info; + +public: + inline CallbackMessageHandler(int fd, fds_data *l_fds_ifd, struct vma_info_t *vma_info) : + m_fd(fd), + m_fds_ifd(l_fds_ifd), + m_vma_info(vma_info) + {} + + inline bool handle_message(); +}; + vma_recv_callback_retval_t myapp_vma_recv_pkt_filter_callback(int fd, size_t iov_sz, struct iovec iov[], struct vma_info_t *vma_info, @@ -2474,11 +2490,6 @@ vma_recv_callback_retval_t myapp_vma_recv_pkt_filter_callback(int fd, size_t iov } #endif - if (iov_sz) { - }; - if (context) { - }; - // Check info structure version if (vma_info->struct_sz < sizeof(struct vma_info_t)) { log_msg("VMA's info struct is not something we can handle so un-register the application's " @@ -2494,175 +2505,90 @@ vma_recv_callback_retval_t myapp_vma_recv_pkt_filter_callback(int fd, size_t iov return VMA_PACKET_RECV; } - size_t index; - int nbytes; struct fds_data *l_fds_ifd; - Message *msgReply; - uint8_t *start_addrs; - struct sockaddr_in sendto_addr; l_fds_ifd = g_fds_array[fd]; - - if (!l_fds_ifd) { + if (unlikely(!l_fds_ifd)) { return VMA_PACKET_RECV; } + Message *msgReply = l_fds_ifd->p_msg; + SocketRecvData &recv_data = l_fds_ifd->recv; - msgReply = l_fds_ifd->p_msg; - - // Copy and concatenate received data in local reserved buffer - nbytes = 0; - for (index = 0; index < iov_sz; index++) { - nbytes += iov[index].iov_len; - } - if (nbytes > l_fds_ifd->recv.cur_size) { - memmove(l_fds_ifd->recv.buf, l_fds_ifd->recv.cur_addr, l_fds_ifd->recv.cur_offset); - l_fds_ifd->recv.cur_addr = l_fds_ifd->recv.buf; - l_fds_ifd->recv.cur_size = l_fds_ifd->recv.max_size - l_fds_ifd->recv.cur_offset; - if (nbytes > l_fds_ifd->recv.cur_size) { - log_msg("Can't handle data in callback : Received data bigger than available buffer"); - /* - * TODO going to recvfrom will not work if this callback call is coming from recvfrom - * context. - * this is because the callback is working on the same buffer that was given to - * recvfrom, so - * recvfrom will return and override the data that the callback wrote. - * if working with recvfrom, and not with iomux, need to give recvfrom and callback - * different buffers. - * This should be fixed in sockperf, so we also won't need to call the above memmove. - */ + CallbackMessageHandler handler(fd, l_fds_ifd, vma_info); + MessageParser parser(msgReply); + for (size_t i = 0; i < iov_sz; ++i) { + bool ok = parser.process_buffer(handler, recv_data, (uint8_t *)iov[i].iov_base, + (int)iov[i].iov_len); + if (unlikely(!ok)) { return VMA_PACKET_RECV; } } - nbytes = 0; - for (index = 0; index < iov_sz; index++) { - start_addrs = l_fds_ifd->recv.cur_addr + l_fds_ifd->recv.cur_offset + nbytes; - memcpy(start_addrs, iov[index].iov_base, iov[index].iov_len); - nbytes += iov[index].iov_len; - } - while (nbytes) { - /* 1: message header is not received yet */ - if ((l_fds_ifd->recv.cur_offset + nbytes) < MsgHeader::EFFECTIVE_SIZE) { - l_fds_ifd->recv.cur_size -= nbytes; - l_fds_ifd->recv.cur_offset += nbytes; + return VMA_PACKET_DROP; +} - /* 4: set current buffer size to size of remained part of message header to - * guarantee getting full message header on next iteration - */ - if (l_fds_ifd->recv.cur_size < MsgHeader::EFFECTIVE_SIZE) { - l_fds_ifd->recv.cur_size = MsgHeader::EFFECTIVE_SIZE - l_fds_ifd->recv.cur_offset; - } - return VMA_PACKET_DROP; - } else if (l_fds_ifd->recv.cur_offset < MsgHeader::EFFECTIVE_SIZE) { - /* 2: message header is got, match message to cycle buffer */ - msgReply->setBuf(l_fds_ifd->recv.cur_addr); - msgReply->setHeaderToHost(); - } else { - /* 2: message header is got, match message to cycle buffer */ - msgReply->setBuf(l_fds_ifd->recv.cur_addr); - } +inline bool CallbackMessageHandler::handle_message() +{ + struct sockaddr_in sendto_addr; - if ((unsigned)msgReply->getLength() > (unsigned)MAX_PAYLOAD_SIZE) { - log_msg("Message received was larger than expected, handle from recv_from"); - return VMA_PACKET_RECV; - } - /* 3: message is not complete */ - if ((l_fds_ifd->recv.cur_offset + nbytes) < msgReply->getLength()) { - l_fds_ifd->recv.cur_size -= nbytes; - l_fds_ifd->recv.cur_offset += nbytes; - /* 4: set current buffer size to size of remained part of message to - * guarantee getting full message on next iteration (using extended reserved memory) - * and shift to start of cycle buffer - */ - if (l_fds_ifd->recv.cur_size < (int)msgReply->getMaxSize()) { - l_fds_ifd->recv.cur_size = msgReply->getLength() - l_fds_ifd->recv.cur_offset; - } - return VMA_PACKET_DROP; + Message *msgReply = m_fds_ifd->p_msg; + + if (unlikely(g_b_exit)) { + return false; + } + if (unlikely(!msgReply->isValidHeader())) { + log_msg("Message received was larger than expected, handle from recv_from"); + return false; + } + if (unlikely(!msgReply->isClient())) { + return true; + } + if (unlikely(msgReply->isWarmupMessage())) { + return true; + } + + g_receiveCount++; + + if (msgReply->getHeader()->isPongRequest()) { + /* if server in a no reply mode - shift to start of cycle buffer*/ + if (g_pApp->m_const_params.b_server_dont_reply) { + return true; } - /* 5: message is complete shift to process next one */ - nbytes -= msgReply->getLength() - l_fds_ifd->recv.cur_offset; - l_fds_ifd->recv.cur_addr += msgReply->getLength(); - l_fds_ifd->recv.cur_size -= msgReply->getLength() - l_fds_ifd->recv.cur_offset; - l_fds_ifd->recv.cur_offset = 0; - - if (g_b_exit) return VMA_PACKET_DROP; - if (!msgReply->isClient()) { - /* 6: shift to start of cycle buffer in case receiving buffer is empty and - * there is no uncompleted message - */ - if (!nbytes) { - l_fds_ifd->recv.cur_addr = l_fds_ifd->recv.buf; - l_fds_ifd->recv.cur_size = l_fds_ifd->recv.max_size; - l_fds_ifd->recv.cur_offset = 0; - } - return VMA_PACKET_DROP; + /* prepare message header */ + if (g_pApp->m_const_params.mode != MODE_BRIDGE) { + msgReply->setServer(); } + /* get source addr to reply. memcpy is not used to improve performance */ + sendto_addr = m_fds_ifd->server_addr; - if (msgReply->isWarmupMessage()) { - // m_switchCalcGaps.execute(vma_info->src, 0, true); - /* 6: shift to start of cycle buffer in case receiving buffer is empty and - * there is no uncompleted message - */ - if (!nbytes) { - l_fds_ifd->recv.cur_addr = l_fds_ifd->recv.buf; - l_fds_ifd->recv.cur_size = l_fds_ifd->recv.max_size; - l_fds_ifd->recv.cur_offset = 0; + if (m_fds_ifd->memberships_size || !m_fds_ifd->is_multicast || + g_pApp->m_const_params.b_server_reply_via_uc) { // In unicast case reply to sender + /* get source addr to reply. memcpy is not used to improve performance */ + sendto_addr = *m_vma_info->src; + } else if (m_fds_ifd->is_multicast) { + /* always send to the same port recved from */ + sendto_addr.sin_port = m_vma_info->src->sin_port; + } + int length = msgReply->getLength(); + msgReply->setHeaderToNetwork(); + msg_sendto(m_fd, msgReply->getBuf(), length, &sendto_addr); + /*if (ret == RET_SOCKET_SHUTDOWN) { + if (m_fds_ifd->sock_type == SOCK_STREAM) { + close_ifd( m_fds_ifd->next_fd,ifd,m_fds_ifd); } return VMA_PACKET_DROP; - } - - g_receiveCount++; - - if (msgReply->getHeader()->isPongRequest()) { - /* if server in a no reply mode - shift to start of cycle buffer*/ - if (g_pApp->m_const_params.b_server_dont_reply) { - l_fds_ifd->recv.cur_addr = l_fds_ifd->recv.buf; - l_fds_ifd->recv.cur_size = l_fds_ifd->recv.max_size; - l_fds_ifd->recv.cur_offset = 0; - return VMA_PACKET_DROP; - } - /* prepare message header */ - if (g_pApp->m_const_params.mode != MODE_BRIDGE) { - msgReply->setServer(); - } - /* get source addr to reply. memcpy is not used to improve performance */ - sendto_addr = l_fds_ifd->server_addr; - - if (l_fds_ifd->memberships_size || !l_fds_ifd->is_multicast || - g_pApp->m_const_params.b_server_reply_via_uc) { // In unicast case reply to sender - /* get source addr to reply. memcpy is not used to improve performance */ - sendto_addr = *vma_info->src; - } else if (l_fds_ifd->is_multicast) { - /* always send to the same port recved from */ - sendto_addr.sin_port = vma_info->src->sin_port; - } - int length = msgReply->getLength(); - msgReply->setHeaderToNetwork(); - msg_sendto(fd, msgReply->getBuf(), length, &sendto_addr); - /*if (ret == RET_SOCKET_SHUTDOWN) { - if (l_fds_ifd->sock_type == SOCK_STREAM) { - close_ifd( l_fds_ifd->next_fd,ifd,l_fds_ifd); - } - return VMA_PACKET_DROP; - }*/ - msgReply->setHeaderToHost(); - } - /* - * TODO - * To support other server functionality when using zero callback, - * pass the server as user_context or as we pass the replyMsg, and call the server functions - */ - // m_switchCalcGaps.execute(vma_info->src, msgReply->getSequenceCounter(), false); - // m_switchActivityInfo.execute(g_receiveCount); + }*/ + msgReply->setHeaderToHost(); } - /* 6: shift to start of cycle buffer in case receiving buffer is empty and - * there is no uncompleted message + /* + * TODO + * To support other server functionality when using zero callback, + * pass the server as user_context or as we pass the replyMsg, and call the server functions */ - l_fds_ifd->recv.cur_addr = l_fds_ifd->recv.buf; - l_fds_ifd->recv.cur_size = l_fds_ifd->recv.max_size; - l_fds_ifd->recv.cur_offset = 0; + // m_switchCalcGaps.execute(vma_info->src, msgReply->getSequenceCounter(), false); + // m_switchActivityInfo.execute(g_receiveCount); - return VMA_PACKET_DROP; + return true; } #endif