Skip to content

Commit

Permalink
perf: block busyloop with epoll, do not reuse pipe
Browse files Browse the repository at this point in the history
  • Loading branch information
horror-proton committed Dec 22, 2023
1 parent 607d0f9 commit b34469a
Showing 1 changed file with 87 additions and 73 deletions.
160 changes: 87 additions & 73 deletions src/MaaCore/Controller/Platform/PosixIO.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@

#include <fcntl.h>
#include <signal.h>
#include <sys/epoll.h>
#include <sys/errno.h>
#include <sys/socket.h>
#ifndef __APPLE__
#include <sys/prctl.h>
#endif
#include <cstring>
#include <sys/wait.h>
#include <unistd.h>

Expand All @@ -21,14 +23,6 @@ asst::PosixIO::PosixIO(Assistant* inst) : InstHelper(inst)
{
LogTraceFunction;

int pipe_in_ret = ::pipe(m_pipe_in);
int pipe_out_ret = ::pipe(m_pipe_out);
::fcntl(m_pipe_out[PIPE_READ], F_SETFL, O_NONBLOCK);

if (pipe_in_ret < 0 || pipe_out_ret < 0) {
Log.error(__FUNCTION__, "controller pipe created failed", pipe_in_ret, pipe_out_ret);
}

m_support_socket = true;
}

Expand All @@ -40,11 +34,6 @@ asst::PosixIO::~PosixIO()
::close(m_server_sock);
m_server_sock = -1;
}

::close(m_pipe_in[PIPE_READ]);
::close(m_pipe_in[PIPE_WRITE]);
::close(m_pipe_out[PIPE_READ]);
::close(m_pipe_out[PIPE_WRITE]);
}

std::optional<int> asst::PosixIO::call_command(const std::string& cmd, const bool recv_by_socket,
Expand All @@ -60,80 +49,105 @@ std::optional<int> asst::PosixIO::call_command(const std::string& cmd, const boo
return timeout && timeout < duration_cast<milliseconds>(steady_clock::now() - start_time).count();
};

int pipe_in[2] {};
int pipe_out[2] {};
::pipe(pipe_in);
::pipe(pipe_out);
::fcntl(pipe_out[PIPE_READ], F_SETFL, O_NONBLOCK);

int exit_ret = 0;
m_child = ::fork();
if (m_child == 0) {
// child process

::dup2(m_pipe_in[PIPE_READ], STDIN_FILENO);
::dup2(m_pipe_out[PIPE_WRITE], STDOUT_FILENO);
::dup2(m_pipe_out[PIPE_WRITE], STDERR_FILENO);
::dup2(pipe_in[PIPE_READ], STDIN_FILENO);
::dup2(pipe_out[PIPE_WRITE], STDOUT_FILENO);
::dup2(pipe_out[PIPE_WRITE], STDERR_FILENO);

// all these are for use by parent only
// close(m_pipe_in[PIPE_READ]);
// close(m_pipe_in[PIPE_WRITE]);
// close(m_pipe_out[PIPE_READ]);
// close(m_pipe_out[PIPE_WRITE]);
::close(pipe_in[PIPE_WRITE]);
::close(pipe_out[PIPE_READ]);
// TODO: close all other fds

exit_ret = execlp("sh", "sh", "-c", cmd.c_str(), nullptr);
::exit(exit_ret);
Log.error("exec failed:", std::strerror(errno));
return std::nullopt;
}
::close(pipe_in[PIPE_READ]);
::close(pipe_out[PIPE_WRITE]);
if (m_child < 0) {
// failed to create child process
::close(pipe_in[PIPE_WRITE]);
::close(pipe_out[PIPE_READ]);
Log.error("Call `", cmd, "` create process failed:", std::strerror(errno));
return std::nullopt;
}
else if (m_child > 0) {
// parent process
if (recv_by_socket) {
sockaddr addr {};
socklen_t len = sizeof(addr);
sock_buffer = asst::platform::single_page_buffer<char>();

int client_socket = ::accept(m_server_sock, &addr, &len);
if (client_socket < 0) {
Log.error("accept failed:", strerror(errno));
::kill(m_child, SIGKILL);
::waitpid(m_child, &exit_ret, 0);
return std::nullopt;
}

ssize_t read_num = ::read(client_socket, sock_buffer.get(), sock_buffer.size());
while (read_num > 0) {
sock_data.insert(sock_data.end(), sock_buffer.get(), sock_buffer.get() + read_num);
read_num = ::read(client_socket, sock_buffer.get(), sock_buffer.size());
}
::shutdown(client_socket, SHUT_RDWR);
::close(client_socket);
// parent process
if (recv_by_socket) {
sockaddr addr {};
socklen_t len = sizeof(addr);
sock_buffer = asst::platform::single_page_buffer<char>();

int client_socket = ::accept(m_server_sock, &addr, &len);
if (client_socket < 0) {
Log.error("accept failed:", strerror(errno));
::kill(m_child, SIGTERM);
Log.error("Killing child `", cmd, "`, pid:", m_child);
::kill(m_child, SIGKILL);
::waitpid(m_child, &exit_ret, 0);
return std::nullopt;
}

// has the child exited in the loop?
bool child_exited = false;

// whether we recv_by_socket or not, we have to
// drain the output pipe so that it doesn't block I/O.
do {
ssize_t read_num = ::read(m_pipe_out[PIPE_READ], pipe_buffer.get(), pipe_buffer.size());
while (read_num > 0) {
if (!recv_by_socket) {
pipe_data.insert(pipe_data.end(), pipe_buffer.get(), pipe_buffer.get() + read_num);
}
read_num = ::read(m_pipe_out[PIPE_READ], pipe_buffer.get(), pipe_buffer.size());
}
if (::waitpid(m_child, &exit_ret, WNOHANG) != 0) {
child_exited = true;
break;
}
if (check_timeout()) {
Log.warn("timeout when reading the output, will kill the child: ", m_child);
break;
}
} while (true);
ssize_t read_num = ::read(client_socket, sock_buffer.get(), sock_buffer.size());
while (read_num > 0) {
sock_data.insert(sock_data.end(), sock_buffer.get(), sock_buffer.get() + read_num);
read_num = ::read(client_socket, sock_buffer.get(), sock_buffer.size());
}
::shutdown(client_socket, SHUT_RDWR);
::close(client_socket);
}

if (!child_exited) {
::kill(m_child, SIGKILL);
::waitpid(m_child, &exit_ret, 0);
// has the child exited in the loop?
bool child_exited = false;

static const auto epfd = ::epoll_create1(EPOLL_CLOEXEC);
{
::epoll_event ev {};
ev.events = POLL_IN;
ev.data.fd = pipe_out[PIPE_READ];
::epoll_ctl(epfd, EPOLL_CTL_ADD, pipe_out[PIPE_READ], &ev);
}

// whether we recv_by_socket or not, we have to
// drain the output pipe so that it doesn't block I/O.
while (true) {
ssize_t read_num = ::read(pipe_out[PIPE_READ], pipe_buffer.get(), pipe_buffer.size());
while (read_num > 0) {
if (!recv_by_socket) {
pipe_data.insert(pipe_data.end(), pipe_buffer.get(), pipe_buffer.get() + read_num);
}
read_num = ::read(pipe_out[PIPE_READ], pipe_buffer.get(), pipe_buffer.size());
}
if (::waitpid(m_child, &exit_ret, WNOHANG) != 0) {
child_exited = true;
break;
}
if (check_timeout()) {
Log.warn("timeout when reading the output, killing child:", m_child);
break;
}
std::array<::epoll_event, 1> events {};
::epoll_wait(epfd, events.data(), 1, 1000);
}
else {
// failed to create child process
Log.error("Call `", cmd, "` create process failed, child:", m_child);
return std::nullopt;

::close(pipe_in[PIPE_WRITE]);
::close(pipe_out[PIPE_READ]);

if (!child_exited) {
::kill(m_child, SIGTERM);
Log.error("Killing child `", cmd, "`, pid:", m_child);
::kill(m_child, SIGKILL);
::waitpid(m_child, &exit_ret, 0);
}

return exit_ret;
Expand Down Expand Up @@ -223,7 +237,7 @@ std::shared_ptr<asst::IOHandler> asst::PosixIO::interactive_shell(const std::str
::prctl(PR_SET_PDEATHSIG, SIGTERM);
#endif

::execlp("/bin/sh", "sh", "-c", cmd.c_str(), nullptr);
::execlp("sh", "sh", "-c", cmd.c_str(), nullptr);
exit(-1);
}

Expand Down

0 comments on commit b34469a

Please sign in to comment.