Skip to content

Commit

Permalink
issue: 2681317 integrate InputHandlers into client/server/callback code
Browse files Browse the repository at this point in the history
  • Loading branch information
agalanin-at-nvidia authored and igor-ivanov committed Dec 24, 2021
1 parent b852b49 commit ee6fe35
Show file tree
Hide file tree
Showing 8 changed files with 287 additions and 652 deletions.
2 changes: 1 addition & 1 deletion configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ AC_LANG(C++)

SP_CHECK_CXXFLAGS_APPEND([OUR_CXXFLAGS], [\
-Wall \
"--param inline-unit-growth=200"])
"--param inline-unit-growth=300"])

##########################################################################
# check VMA extra API
Expand Down
5 changes: 3 additions & 2 deletions contrib/jenkins_tests/globals.sh
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ nproc=$(grep processor /proc/cpuinfo|wc -l)
make_opt="-j$(($nproc / 2 + 1))"
if [ $(command -v timeout >/dev/null 2>&1 && echo $?) ]; then
timeout_exe="timeout -s SIGKILL 20m"
long_timeout_exe="timeout -s SIGKILL 40m"
fi

trap "on_exit" INT TERM ILL KILL FPE SEGV ALRM
Expand Down Expand Up @@ -206,10 +207,10 @@ function do_check_result()
{
set +e
if [ -z "$5" ]; then
eval $timeout_exe $1
eval $long_timeout_exe $1
ret=$?
else
eval $timeout_exe $1 2>> "${5}.err" 1>> "${5}.log"
eval $long_timeout_exe $1 2>> "${5}.err" 1>> "${5}.log"
ret=$?
do_archive "${5}.err" "${5}.log"
fi
Expand Down
227 changes: 93 additions & 134 deletions src/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#define CLIENT_H_

#include "common.h"
#include "input_handlers.h"
#include "packet.h"

//==============================================================================
Expand Down Expand Up @@ -62,6 +63,35 @@ class Client : public ClientBase {
PongModeCare m_pongModeCare; // has msg_sendto() method and can be one of: PongModeNormal,
// PongModeAlways, PongModeNever

class ClientMessageHandlerCallback {
Client<IoType, SwitchDataIntegrity, SwitchActivityInfo,
SwitchCycleDuration, SwitchMsgSize, PongModeCare> &m_client;
int m_ifd;
struct sockaddr_in &m_recvfrom_addr;
int m_receiveCount;

public:
inline ClientMessageHandlerCallback(Client<IoType, SwitchDataIntegrity, SwitchActivityInfo,
SwitchCycleDuration, SwitchMsgSize, PongModeCare> &client,
int ifd, struct sockaddr_in &recvfrom_addr) :
m_client(client),
m_ifd(ifd),
m_recvfrom_addr(recvfrom_addr),
m_receiveCount(0)
{
}

inline bool handle_message()
{
return m_client.handle_message(m_ifd, m_recvfrom_addr, m_receiveCount);
}

inline int getReceiveCount() const
{
return m_receiveCount;
}
};

public:
Client(int _fd_min, int _fd_max, int _fd_num);
virtual ~Client();
Expand Down Expand Up @@ -136,174 +166,103 @@ class Client : public ClientBase {
}

//------------------------------------------------------------------------------
template <class InputHandler>
inline unsigned int client_receive_from_selected(int ifd) {
int ret = 0;
struct sockaddr_in recvfrom_addr;
int receiveCount = 0;
int serverNo = 0;
int remain_buffer = 0;
fds_data *l_fds_ifd = g_fds_array[ifd];

TicksTime rxTime;
#ifdef USING_VMA_EXTRA_API
vma_buff_t *tmp_vma_buff = g_vma_buff;
if (SOCKETXTREME == g_pApp->m_const_params.fd_handler_type && tmp_vma_buff) {
ret = msg_recv_socketxtreme(l_fds_ifd, tmp_vma_buff, &recvfrom_addr);
} else if (g_pApp->m_const_params.is_vmazcopyread &&
!(remain_buffer = free_vma_packets(ifd, l_fds_ifd->recv.cur_size))) {
// Handled buffer is filled, free_vma_packets returns 0
ret = l_fds_ifd->recv.cur_size;
} else
#endif
{
ret = msg_recvfrom(ifd, l_fds_ifd->recv.cur_addr + l_fds_ifd->recv.cur_offset,
l_fds_ifd->recv.cur_size, &recvfrom_addr, &l_fds_ifd->recv.cur_addr,
remain_buffer);
}
InputHandler input_handler(m_pMsgReply, l_fds_ifd->recv);
ret = input_handler.receive_pending_data(ifd, &recvfrom_addr);
if (unlikely(ret <= 0)) {
input_handler.cleanup();
if (ret == RET_SOCKET_SHUTDOWN) {
if (l_fds_ifd->sock_type == SOCK_STREAM) {
exit_with_log("A connection was forcibly closed by a peer", SOCKPERF_ERR_SOCKET,
l_fds_ifd);
}
}
if (ret < 0) return 0;
else /* (ret < 0) */ return 0;
}

int nbytes = ret;
while (nbytes) {
/* 1: message header is not received yet */
if ((l_fds_ifd->recv.cur_offset + nbytes) < MsgHeader::EFFECTIVE_SIZE) {
l_fds_ifd->recv.cur_size -= nbytes;
l_fds_ifd->recv.cur_offset += nbytes;

/* 4: set current buffer size to size of remained part of message header to
* guarantee getting full message header on next iteration
*/
if (l_fds_ifd->recv.cur_size < MsgHeader::EFFECTIVE_SIZE) {
l_fds_ifd->recv.cur_size =
MsgHeader::EFFECTIVE_SIZE - l_fds_ifd->recv.cur_offset;
}
#ifdef USING_VMA_EXTRA_API
if (tmp_vma_buff) goto next;
#endif
return (receiveCount);
} else if (l_fds_ifd->recv.cur_offset < MsgHeader::EFFECTIVE_SIZE) {
/* 2: message header is got, match message to cycle buffer */
m_pMsgReply->setBuf(l_fds_ifd->recv.cur_addr);
m_pMsgReply->setHeaderToHost();
} else {
/* 2: message header is got, match message to cycle buffer */
m_pMsgReply->setBuf(l_fds_ifd->recv.cur_addr);
}
ClientMessageHandlerCallback callback(*this, ifd, recvfrom_addr);
input_handler.iterate_over_buffers(callback);
input_handler.cleanup();

if (unlikely(m_pMsgReply->getSequenceCounter() > m_pMsgRequest->getSequenceCounter())) {
exit_with_err("Sequence Number received was higher then expected",
SOCKPERF_ERR_FATAL);
}
if (unlikely(m_pMsgReply->getLength() > MAX_PAYLOAD_SIZE)) {
exit_with_err("Message received was larger than expected.", SOCKPERF_ERR_FATAL);
}
return callback.getReceiveCount();
}

/* 3: message is not complete */
if ((l_fds_ifd->recv.cur_offset + nbytes) < m_pMsgReply->getLength()) {
l_fds_ifd->recv.cur_size -= nbytes;
l_fds_ifd->recv.cur_offset += nbytes;

/* 4: set current buffer size to size of remained part of message to
* guarantee getting full message on next iteration (using extended reserved
* memory)
* and shift to start of cycle buffer
*/
if (l_fds_ifd->recv.cur_size < (int)m_pMsgReply->getMaxSize()) {
l_fds_ifd->recv.cur_size =
m_pMsgReply->getLength() - l_fds_ifd->recv.cur_offset;
}
inline unsigned int client_receive_from_selected(int ifd) {
#ifdef USING_VMA_EXTRA_API
if (tmp_vma_buff) goto next;
if (SOCKETXTREME == g_pApp->m_const_params.fd_handler_type) {
return client_receive_from_selected<SocketXtremeInputHandler>(ifd);
} else if (g_pApp->m_const_params.is_vmazcopyread) {
return client_receive_from_selected<VmaZCopyReadInputHandler>(ifd);
} else
#endif
return (receiveCount);
}
{
return client_receive_from_selected<RecvFromInputHandler>(ifd);
}
}

/* 5: message is complete shift to process next one */
nbytes -= m_pMsgReply->getLength() - l_fds_ifd->recv.cur_offset;
l_fds_ifd->recv.cur_addr += m_pMsgReply->getLength();
l_fds_ifd->recv.cur_size -= m_pMsgReply->getLength() - l_fds_ifd->recv.cur_offset;
l_fds_ifd->recv.cur_offset = 0;
inline bool handle_message(int ifd, struct sockaddr_in &recvfrom_addr, int &receiveCount)
{
int serverNo = 0;

#if defined(LOG_TRACE_MSG_IN) && (LOG_TRACE_MSG_IN == TRUE)
printf(">>> ");
hexdump(m_pMsgReply->getBuf(), MsgHeader::EFFECTIVE_SIZE);
printf(">>> ");
hexdump(m_pMsgReply->getBuf(), MsgHeader::EFFECTIVE_SIZE);
#endif /* LOG_TRACE_MSG_IN */
if (unlikely(!m_pMsgReply->isValidHeader())) {
exit_with_err("Message received was larger than expected.", SOCKPERF_ERR_FATAL);
}
if (unlikely(m_pMsgReply->getSequenceCounter() > m_pMsgRequest->getSequenceCounter())) {
exit_with_err("Sequence Number received was higher than expected",
SOCKPERF_ERR_FATAL);
}

if (g_b_exit) return 0;
if (m_pMsgReply->isClient()) {
assert(!(l_fds_ifd->is_multicast && g_pApp->m_const_params.mc_loop_disable));
#ifdef USING_VMA_EXTRA_API
if (tmp_vma_buff) goto next;
#endif
continue;
}
if (unlikely(g_b_exit)) {
return false;
}
if (m_pMsgReply->isClient()) {
assert(!(g_fds_array[ifd]->is_multicast && g_pApp->m_const_params.mc_loop_disable));
return true;
}

// should not count the warmup messages
if (unlikely(m_pMsgReply->isWarmupMessage())) {
#ifdef USING_VMA_EXTRA_API
if (tmp_vma_buff) goto next;
#endif
continue;
}
// should not count the warmup messages
if (unlikely(m_pMsgReply->isWarmupMessage())) {
return true;
}

receiveCount++;
rxTime.setNow();
receiveCount++;
TicksTime rxTime;
rxTime.setNow();

#if 0 // should be part of check-data-integrity
if (g_pApp->m_const_params.msg_size_range == 0) { //ABH: added 'if', otherwise, size check will not suit latency-under-load
if (nbytes != g_msg_size && errno != EINTR) {
exit_with_log("received message size test failed (sent:%d received:%d)", g_msg_size, nbytes,SOCKPERF_ERR_FATAL);
}
}
#endif

#ifdef DEBUG // should not occur in real test
if (m_pMsgReply->getSequenceCounter() % g_pApp->m_const_params.reply_every) {
log_err("skipping unexpected received message: seqNo=%" PRIu64 " mask=0x%x",
m_pMsgReply->getSequenceCounter(), m_pMsgReply->getFlags());
#ifdef USING_VMA_EXTRA_API
if (tmp_vma_buff) goto next;
#endif
continue;
if (g_pApp->m_const_params.msg_size_range == 0) { //ABH: added 'if', otherwise, size check will not suit latency-under-load
if (nbytes != g_msg_size && errno != EINTR) {
exit_with_log("received message size test failed (sent:%d received:%d)", g_msg_size, nbytes,SOCKPERF_ERR_FATAL);
}
}
#endif

serverNo = client_get_server_id(ifd, &recvfrom_addr);
if (unlikely(serverNo < 0)) {
exit_with_log("Number of servers more than expected", SOCKPERF_ERR_FATAL);
} else {
g_pPacketTimes->setRxTime(m_pMsgReply->getSequenceCounter(), rxTime, serverNo);
m_switchDataIntegrity.execute(m_pMsgRequest, m_pMsgReply);
}

#ifdef USING_VMA_EXTRA_API
next:
if (tmp_vma_buff) {
ret = msg_process_next(l_fds_ifd, &tmp_vma_buff, &nbytes);
if (ret) {
return (receiveCount);
}
}
#endif
#ifdef DEBUG // should not occur in real test
if (m_pMsgReply->getSequenceCounter() % g_pApp->m_const_params.reply_every) {
log_err("skipping unexpected received message: seqNo=%" PRIu64 " mask=0x%x",
m_pMsgReply->getSequenceCounter(), m_pMsgReply->getFlags());
return true;
}
#endif

/* 6: shift to start of cycle buffer in case receiving buffer is empty and
* there is no uncompleted message
*/
if (!nbytes) {
l_fds_ifd->recv.cur_addr = l_fds_ifd->recv.buf;
l_fds_ifd->recv.cur_size = l_fds_ifd->recv.max_size;
l_fds_ifd->recv.cur_offset = 0;
serverNo = client_get_server_id(ifd, &recvfrom_addr);
if (unlikely(serverNo < 0)) {
exit_with_log("Number of servers more than expected", SOCKPERF_ERR_FATAL);
} else {
g_pPacketTimes->setRxTime(m_pMsgReply->getSequenceCounter(), rxTime, serverNo);
m_switchDataIntegrity.execute(m_pMsgRequest, m_pMsgReply);
}

return (receiveCount);
return true;
}

//------------------------------------------------------------------------------
Expand Down
Loading

0 comments on commit ee6fe35

Please sign in to comment.