Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RFC: Async client disconnect #2372

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 31 additions & 4 deletions core/async.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ void uwsgi_async_init() {
// optimization, this array maps file descriptor to requests
uwsgi.async_waiting_fd_table = uwsgi_calloc(sizeof(struct wsgi_request *) * uwsgi.max_fd);
uwsgi.async_proto_fd_table = uwsgi_calloc(sizeof(struct wsgi_request *) * uwsgi.max_fd);
uwsgi.async_idle_fd_table = uwsgi_calloc(sizeof(struct wsgi_request *) * uwsgi.max_fd);

}

Expand All @@ -53,6 +54,10 @@ struct wsgi_request *find_wsgi_req_by_fd(int fd) {
return uwsgi.async_waiting_fd_table[fd];
}

struct wsgi_request *find_wsgi_req_idle_by_fd(int fd) {
return uwsgi.async_idle_fd_table[fd];
}

static void runqueue_remove(struct uwsgi_async_request *u_request) {

struct uwsgi_async_request *parent = u_request->prev;
Expand Down Expand Up @@ -125,7 +130,11 @@ void async_reset_request(struct wsgi_request *wsgi_req) {

struct uwsgi_async_fd *uaf = wsgi_req->waiting_fds;
while (uaf) {
event_queue_del_fd(uwsgi.async_queue, uaf->fd, uaf->event);
if (uaf->fd == wsgi_req->fd) {
event_queue_idle_fd(uwsgi.async_queue, uaf->fd);
} else {
event_queue_del_fd(uwsgi.async_queue, uaf->fd, uaf->event);
}
uwsgi.async_waiting_fd_table[uaf->fd] = NULL;
struct uwsgi_async_fd *current_uaf = uaf;
uaf = current_uaf->next;
Expand Down Expand Up @@ -199,7 +208,11 @@ int async_add_fd_read(struct wsgi_request *wsgi_req, int fd, int timeout) {
}
uwsgi.async_waiting_fd_table[fd] = wsgi_req;
wsgi_req->async_force_again = 1;
return event_queue_add_fd_read(uwsgi.async_queue, fd);
if (fd == wsgi_req->fd) {
return event_queue_fd_write_to_read(uwsgi.async_queue, fd);
} else {
return event_queue_add_fd_read(uwsgi.async_queue, fd);
}
}

static int async_wait_fd_read(int fd, int timeout) {
Expand Down Expand Up @@ -307,7 +320,11 @@ int async_add_fd_write(struct wsgi_request *wsgi_req, int fd, int timeout) {

uwsgi.async_waiting_fd_table[fd] = wsgi_req;
wsgi_req->async_force_again = 1;
return event_queue_add_fd_write(uwsgi.async_queue, fd);
if (fd == wsgi_req->fd) {
return event_queue_fd_read_to_write(uwsgi.async_queue, fd);
} else {
return event_queue_add_fd_write(uwsgi.async_queue, fd);
}
}

static int async_wait_fd_write(int fd, int timeout) {
Expand Down Expand Up @@ -533,6 +550,15 @@ void async_loop() {
uwsgi_sock = uwsgi_sock->next;
}

if (event_queue_interesting_fd_is_closed(events, i)) {
uwsgi.wsgi_req = find_wsgi_req_idle_by_fd(interesting_fd);
if (uwsgi.wsgi_req) {
uwsgi.wsgi_req->async_closed = 1;
runqueue_push(uwsgi.wsgi_req);
continue;
}
}

if (!is_a_new_connection) {
// proto event
uwsgi.wsgi_req = find_wsgi_req_proto_by_fd(interesting_fd);
Expand All @@ -544,7 +570,8 @@ void async_loop() {
if (!proto_parser_status) {
// remove fd from event poll and fd proto table
uwsgi.async_proto_fd_table[interesting_fd] = NULL;
event_queue_del_fd(uwsgi.async_queue, interesting_fd, event_queue_read());
event_queue_idle_fd(uwsgi.async_queue, interesting_fd);
uwsgi.async_idle_fd_table[interesting_fd] = uwsgi.wsgi_req;
// put request in the runqueue (set it as UWSGI_OK to signal the first run)
uwsgi.wsgi_req->async_status = UWSGI_OK;
runqueue_push(uwsgi.wsgi_req);
Expand Down
39 changes: 31 additions & 8 deletions core/event.c
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ int event_queue_add_fd_read(int eq, int fd) {
struct epoll_event ee;

memset(&ee, 0, sizeof(struct epoll_event));
ee.events = EPOLLIN;
ee.events = EPOLLIN | EPOLLRDHUP;
ee.data.fd = fd;

if (epoll_ctl(eq, EPOLL_CTL_ADD, fd, &ee)) {
Expand All @@ -532,7 +532,7 @@ int event_queue_fd_write_to_read(int eq, int fd) {
struct epoll_event ee;

memset(&ee, 0, sizeof(struct epoll_event));
ee.events = EPOLLIN;
ee.events = EPOLLIN | EPOLLRDHUP;
ee.data.fd = fd;

if (epoll_ctl(eq, EPOLL_CTL_MOD, fd, &ee)) {
Expand All @@ -548,7 +548,7 @@ int event_queue_fd_read_to_write(int eq, int fd) {
struct epoll_event ee;

memset(&ee, 0, sizeof(struct epoll_event));
ee.events = EPOLLOUT;
ee.events = EPOLLOUT | EPOLLRDHUP;
ee.data.fd = fd;

if (epoll_ctl(eq, EPOLL_CTL_MOD, fd, &ee)) {
Expand All @@ -564,7 +564,7 @@ int event_queue_fd_readwrite_to_read(int eq, int fd) {
struct epoll_event ee;

memset(&ee, 0, sizeof(struct epoll_event));
ee.events = EPOLLIN;
ee.events = EPOLLIN | EPOLLRDHUP;
ee.data.fd = fd;

if (epoll_ctl(eq, EPOLL_CTL_MOD, fd, &ee)) {
Expand All @@ -580,7 +580,7 @@ int event_queue_fd_readwrite_to_write(int eq, int fd) {
struct epoll_event ee;

memset(&ee, 0, sizeof(struct epoll_event));
ee.events = EPOLLOUT;
ee.events = EPOLLOUT | EPOLLRDHUP;
ee.data.fd = fd;

if (epoll_ctl(eq, EPOLL_CTL_MOD, fd, &ee)) {
Expand All @@ -597,7 +597,7 @@ int event_queue_fd_read_to_readwrite(int eq, int fd) {
struct epoll_event ee;

memset(&ee, 0, sizeof(struct epoll_event));
ee.events = EPOLLIN | EPOLLOUT;
ee.events = EPOLLIN | EPOLLOUT | EPOLLRDHUP;
ee.data.fd = fd;

if (epoll_ctl(eq, EPOLL_CTL_MOD, fd, &ee)) {
Expand All @@ -613,7 +613,7 @@ int event_queue_fd_write_to_readwrite(int eq, int fd) {
struct epoll_event ee;

memset(&ee, 0, sizeof(struct epoll_event));
ee.events = EPOLLIN | EPOLLOUT;
ee.events = EPOLLIN | EPOLLOUT | EPOLLRDHUP;
ee.data.fd = fd;

if (epoll_ctl(eq, EPOLL_CTL_MOD, fd, &ee)) {
Expand Down Expand Up @@ -642,12 +642,28 @@ int event_queue_del_fd(int eq, int fd, int event) {
return 0;
}

int event_queue_idle_fd(int eq, int fd) {

struct epoll_event ee;

memset(&ee, 0, sizeof(struct epoll_event));
ee.data.fd = fd;
ee.events = EPOLLRDHUP;

if (epoll_ctl(eq, EPOLL_CTL_MOD, fd, &ee)) {
uwsgi_error("epoll_ctl()");
return -1;
}

return 0;
}

int event_queue_add_fd_write(int eq, int fd) {

struct epoll_event ee;

memset(&ee, 0, sizeof(struct epoll_event));
ee.events = EPOLLOUT;
ee.events = EPOLLOUT | EPOLLRDHUP;
ee.data.fd = fd;

if (epoll_ctl(eq, EPOLL_CTL_ADD, fd, &ee)) {
Expand Down Expand Up @@ -692,6 +708,13 @@ int event_queue_interesting_fd_is_write(void *events, int id) {
return 0;
}

int event_queue_interesting_fd_is_closed(void *events, int id) {
struct epoll_event *ee = (struct epoll_event *) events;
if (ee[id].events & EPOLLRDHUP) {
return 1;
}
return 0;
}

int event_queue_wait_multi(int eq, int timeout, void *events, int nevents) {

Expand Down
2 changes: 2 additions & 0 deletions plugins/python/uwsgi_python.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@
#define uwsgi_py_write_set_exception(x) if (!uwsgi.disable_write_exception) { PyErr_SetString(PyExc_IOError, "write error"); };
#define uwsgi_py_write_exception(x) uwsgi_py_write_set_exception(x); uwsgi_manage_exception(x, 0);

#define uwsgi_py_closed_set_exception(x) };
#define uwsgi_py_closed_exception(x) PyErr_SetString(PyExc_IOError, "Connection reset by peer"); uwsgi_manage_exception(x, 0);

#define uwsgi_py_check_write_errors if (wsgi_req->write_errors > 0 && uwsgi.write_errors_exception_only) {\
uwsgi_py_write_set_exception(wsgi_req);\
Expand Down
5 changes: 5 additions & 0 deletions plugins/python/wsgi_subhandler.c
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,11 @@ int uwsgi_response_subhandler_wsgi(struct wsgi_request *wsgi_req) {

PyObject *pychunk;

if (wsgi_req->async_closed) {
uwsgi_py_closed_exception(wsgi_req);
goto clear;
}

// return or yield ?
// in strict mode we do not optimize apps directly returning strings (or bytes)
if (!up.wsgi_strict) {
Expand Down
4 changes: 4 additions & 0 deletions uwsgi.h
Original file line number Diff line number Diff line change
Expand Up @@ -1498,6 +1498,7 @@ struct wsgi_request {

int async_id;
int async_status;
int async_closed;

int switches;
size_t write_pos;
Expand Down Expand Up @@ -2366,6 +2367,7 @@ struct uwsgi_server {
// async commodity
struct wsgi_request **async_waiting_fd_table;
struct wsgi_request **async_proto_fd_table;
struct wsgi_request **async_idle_fd_table;
struct uwsgi_async_request *async_runqueue;
struct uwsgi_async_request *async_runqueue_last;

Expand Down Expand Up @@ -3407,6 +3409,7 @@ int event_queue_init(void);
void *event_queue_alloc(int);
int event_queue_add_fd_read(int, int);
int event_queue_add_fd_write(int, int);
int event_queue_idle_fd(int, int);
int event_queue_del_fd(int, int, int);
int event_queue_wait(int, int, int *);
int event_queue_wait_multi(int, int, void *, int);
Expand All @@ -3420,6 +3423,7 @@ int event_queue_fd_read_to_readwrite(int, int);
int event_queue_fd_write_to_readwrite(int, int);
int event_queue_interesting_fd_is_read(void *, int);
int event_queue_interesting_fd_is_write(void *, int);
int event_queue_interesting_fd_is_closed(void *, int);

int event_queue_add_timer(int, int *, int);
int event_queue_add_timer_hr(int, int *, int, long);
Expand Down