diff --git a/pjlib/src/pj/ioqueue_winnt.c b/pjlib/src/pj/ioqueue_winnt.c index 19e568cbc5..e80c8e233a 100644 --- a/pjlib/src/pj/ioqueue_winnt.c +++ b/pjlib/src/pj/ioqueue_winnt.c @@ -28,6 +28,10 @@ #include #include +#define THIS_FILE "ioq_winnt" + +typedef BOOL (WINAPI *FnCancelIoEx)(HANDLE hFile, LPOVERLAPPED lpOverlapped); +static FnCancelIoEx fnCancelIoEx = NULL; #if defined(PJ_HAS_WINSOCK2_H) && PJ_HAS_WINSOCK2_H != 0 # include @@ -92,6 +96,14 @@ union operation_key #endif }; +struct pending_op +{ + PJ_DECL_LIST_MEMBER(struct pending_op); + union operation_key pending_key; + pj_ioqueue_op_key_t *op_key; + pj_pool_t *pool; +}; + /* Type of handle in the key. */ enum handle_type { @@ -121,13 +133,10 @@ struct pj_ioqueue_key_t int connecting; #endif -#if PJ_IOQUEUE_HAS_SAFE_UNREG pj_atomic_t *ref_count; pj_bool_t closing; - pj_time_val free_time; pj_mutex_t *mutex; -#endif - + struct pending_op pending_list; }; /* @@ -135,17 +144,15 @@ struct pj_ioqueue_key_t */ struct pj_ioqueue_t { + pj_pool_t *pool; pj_ioqueue_cfg cfg; HANDLE iocp; pj_lock_t *lock; pj_bool_t auto_delete_lock; pj_bool_t default_concurrency; -#if PJ_IOQUEUE_HAS_SAFE_UNREG pj_ioqueue_key_t active_list; pj_ioqueue_key_t free_list; - pj_ioqueue_key_t closing_list; -#endif /* These are to keep track of connecting sockets */ #if PJ_HAS_TCP @@ -157,13 +164,6 @@ struct pj_ioqueue_t #endif }; - -#if PJ_IOQUEUE_HAS_SAFE_UNREG -/* Prototype */ -static void scan_closing_keys(pj_ioqueue_t *ioqueue); -#endif - - #if PJ_HAS_TCP /* * Process the socket when the overlapped accept() completed. @@ -185,6 +185,7 @@ static void ioqueue_on_accept_complete(pj_ioqueue_key_t *key, SO_UPDATE_ACCEPT_CONTEXT, (char*)&key->hnd, sizeof(SOCKET)); + (void)status; /* SO_UPDATE_ACCEPT_CONTEXT is for WinXP or later. * So ignore the error status. */ @@ -375,6 +376,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_create2(pj_pool_t *pool, /* Create IOCP */ ioqueue = pj_pool_zalloc(pool, sizeof(*ioqueue)); + ioqueue->pool = pool; if (cfg) pj_memcpy(&ioqueue->cfg, cfg, sizeof(*cfg)); else @@ -394,13 +396,11 @@ PJ_DEF(pj_status_t) pj_ioqueue_create2(pj_pool_t *pool, ioqueue->auto_delete_lock = PJ_TRUE; ioqueue->default_concurrency = PJ_IOQUEUE_DEFAULT_ALLOW_CONCURRENCY; -#if PJ_IOQUEUE_HAS_SAFE_UNREG /* * Create and initialize key pools. */ pj_list_init(&ioqueue->active_list); pj_list_init(&ioqueue->free_list); - pj_list_init(&ioqueue->closing_list); /* Preallocate keys according to max_fd setting, and put them * in free_list. @@ -409,7 +409,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_create2(pj_pool_t *pool, pj_ioqueue_key_t *key; key = pj_pool_alloc(pool, sizeof(pj_ioqueue_key_t)); - + pj_list_init(&key->pending_list); rc = pj_atomic_create(pool, 0, &key->ref_count); if (rc != PJ_SUCCESS) { key = ioqueue->free_list.next; @@ -437,11 +437,10 @@ PJ_DEF(pj_status_t) pj_ioqueue_create2(pj_pool_t *pool, pj_list_push_back(&ioqueue->free_list, key); } -#endif *p_ioqueue = ioqueue; - PJ_LOG(4, ("pjlib", "WinNT IOCP I/O Queue created (%p)", ioqueue)); + PJ_LOG(4, (THIS_FILE, "WinNT IOCP I/O Queue created (%p)", ioqueue)); return PJ_SUCCESS; } @@ -471,7 +470,6 @@ PJ_DEF(pj_status_t) pj_ioqueue_destroy( pj_ioqueue_t *ioqueue ) if (CloseHandle(ioqueue->iocp) != TRUE) return PJ_RETURN_OS_ERROR(GetLastError()); -#if PJ_IOQUEUE_HAS_SAFE_UNREG /* Destroy reference counters */ key = ioqueue->active_list.next; while (key != &ioqueue->active_list) { @@ -480,20 +478,12 @@ PJ_DEF(pj_status_t) pj_ioqueue_destroy( pj_ioqueue_t *ioqueue ) key = key->next; } - key = ioqueue->closing_list.next; - while (key != &ioqueue->closing_list) { - pj_atomic_destroy(key->ref_count); - pj_mutex_destroy(key->mutex); - key = key->next; - } - key = ioqueue->free_list.next; while (key != &ioqueue->free_list) { pj_atomic_destroy(key->ref_count); pj_mutex_destroy(key->mutex); key = key->next; } -#endif pj_lock_release(ioqueue->lock); if (ioqueue->auto_delete_lock) @@ -551,12 +541,6 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock2(pj_pool_t *pool, pj_lock_acquire(ioqueue->lock); -#if PJ_IOQUEUE_HAS_SAFE_UNREG - /* Scan closing list first to release unused keys. - * Must do this with lock acquired. - */ - scan_closing_keys(ioqueue); - /* If safe unregistration is used, then get the key record from * the free list. */ @@ -575,10 +559,6 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock2(pj_pool_t *pool, rec->closing = 0; -#else - rec = (pj_ioqueue_key_t *)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t)); -#endif - /* Build the key for this socket. */ rec->ioqueue = ioqueue; rec->hnd = (HANDLE)sock; @@ -624,10 +604,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock2(pj_pool_t *pool, *key = rec; -#if PJ_IOQUEUE_HAS_SAFE_UNREG pj_list_push_back(&ioqueue->active_list, rec); -#endif - pj_lock_release(ioqueue->lock); return PJ_SUCCESS; @@ -674,8 +651,6 @@ PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key, return PJ_SUCCESS; } - -#if PJ_IOQUEUE_HAS_SAFE_UNREG /* Decrement the key's reference counter, and when the counter reach zero, * destroy the key. */ @@ -686,17 +661,51 @@ static void decrement_counter(pj_ioqueue_key_t *key) pj_lock_acquire(key->ioqueue->lock); pj_assert(key->closing == 1); - pj_gettickcount(&key->free_time); - key->free_time.msec += PJ_IOQUEUE_KEY_FREE_DELAY; - pj_time_val_normalize(&key->free_time); - pj_list_erase(key); - pj_list_push_back(&key->ioqueue->closing_list, key); + pj_list_push_back(&key->ioqueue->free_list, key); pj_lock_release(key->ioqueue->lock); } } -#endif + +static struct pending_op *alloc_pending_op(pj_ioqueue_key_t *key, pj_ioqueue_op_key_t *op_key, void *buf, pj_ssize_t len) +{ + pj_pool_t *pool; + struct pending_op *op; + + pool = pj_pool_create(key->ioqueue->pool->factory, "pending_op%p", 512, 512, NULL); + PJ_ASSERT_RETURN(pool, NULL); + op = PJ_POOL_ZALLOC_T(pool, struct pending_op); + op->pool = pool; + op->op_key = op_key; + op->pending_key.overlapped.wsabuf.buf = (CHAR *)buf; + op->pending_key.overlapped.wsabuf.len = (ULONG)len; + + pj_atomic_inc(key->ref_count); + pj_mutex_lock(key->mutex); + pj_list_push_back(&key->pending_list, op); + pj_mutex_unlock(key->mutex); + return op; +} + +static void release_pending_op(pj_ioqueue_key_t *key, struct pending_op *op) +{ + if (!key || !op) + return; + pj_mutex_lock(key->mutex); + pj_list_erase(op); + pj_mutex_unlock(key->mutex); + pj_pool_release(op->pool); + decrement_counter(key); +} + +static void cancel_all_pending_op(pj_ioqueue_key_t *key) +{ + if (!fnCancelIoEx) + fnCancelIoEx = (FnCancelIoEx)GetProcAddress(GetModuleHandle(PJ_T("Kernel32.dll")), "CancelIoEx"); + if (fnCancelIoEx) + fnCancelIoEx(key->hnd, NULL); +} /* * Poll the I/O Completion Port, execute callback, @@ -711,11 +720,15 @@ static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout, pj_ioqueue_key_t *key; pj_ssize_t size_status = -1; BOOL rcGetQueued; + struct pending_op *xpending_op = NULL; + pj_ioqueue_op_key_t *op_key = NULL; /* Poll for completion status. */ rcGetQueued = GetQueuedCompletionStatus(hIocp, &dwBytesTransferred, &dwKey, (OVERLAPPED**)&pOv, dwTimeout); + if (!rcGetQueued && pOv) + PJ_PERROR(4, (THIS_FILE, PJ_STATUS_FROM_OS(GetLastError()), "GetQueuedCompletionStatus() error dwKey:%p, pOv:%p", (void *)dwKey, pOv)); /* The return value is: * - nonzero if event was dequeued. @@ -735,10 +748,25 @@ static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout, if (p_key) *p_key = key; -#if PJ_IOQUEUE_HAS_SAFE_UNREG + switch(pOv->operation) + { + case PJ_IOQUEUE_OP_RECV: + case PJ_IOQUEUE_OP_RECV_FROM: + case PJ_IOQUEUE_OP_SEND: + case PJ_IOQUEUE_OP_SEND_TO: + case PJ_IOQUEUE_OP_ACCEPT: + xpending_op = (struct pending_op *)((char *)pOv - offsetof(struct pending_op, pending_key)); + op_key = xpending_op->op_key; + break; + default: + break; + } + /* We shouldn't call callbacks if key is quitting. */ - if (key->closing) + if (key->closing) { + release_pending_op(key, xpending_op); return PJ_TRUE; + } /* If concurrency is disabled, lock the key * (and save the lock status to local var since app may change @@ -755,6 +783,7 @@ static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout, if (has_lock) { pj_mutex_unlock(key->mutex); } + release_pending_op(key, xpending_op); return PJ_TRUE; } @@ -762,9 +791,6 @@ static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout, * deleted */ pj_atomic_inc(key->ref_count); -#else - PJ_UNUSED_ARG(has_lock); -#endif /* Carry out the callback */ switch (pOv->operation) { @@ -773,16 +799,14 @@ static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout, case PJ_IOQUEUE_OP_RECV_FROM: pOv->operation = 0; if (key->cb.on_read_complete) - key->cb.on_read_complete(key, (pj_ioqueue_op_key_t*)pOv, - size_status); + key->cb.on_read_complete(key, op_key, size_status); break; case PJ_IOQUEUE_OP_WRITE: case PJ_IOQUEUE_OP_SEND: case PJ_IOQUEUE_OP_SEND_TO: pOv->operation = 0; if (key->cb.on_write_complete) - key->cb.on_write_complete(key, (pj_ioqueue_op_key_t*)pOv, - size_status); + key->cb.on_write_complete(key, op_key, size_status); break; #if PJ_HAS_TCP case PJ_IOQUEUE_OP_ACCEPT: @@ -802,9 +826,7 @@ static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout, status = PJ_RETURN_OS_ERROR(dwError); } - key->cb.on_accept_complete(key, (pj_ioqueue_op_key_t*)pOv, - newsock, status); - + key->cb.on_accept_complete(key, op_key, newsock, status); } break; case PJ_IOQUEUE_OP_CONNECT: @@ -814,12 +836,11 @@ static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout, break; } -#if PJ_IOQUEUE_HAS_SAFE_UNREG decrement_counter(key); if (has_lock) pj_mutex_unlock(key->mutex); -#endif + release_pending_op(key, xpending_op); return PJ_TRUE; } @@ -838,6 +859,19 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key ) PJ_ASSERT_RETURN(key, PJ_EINVAL); + /* Lock the key to make sure no callback is simultaneously modifying + * the key. We need to lock the key before ioqueue here to prevent + * deadlock. + */ + pj_ioqueue_lock_key(key); + + /* Best effort to avoid double key-unregistration */ + if (key->closing) { + pj_ioqueue_unlock_key(key); + return PJ_SUCCESS; + } + pj_ioqueue_unlock_key(key); + #if PJ_HAS_TCP if (key->connecting) { unsigned pos; @@ -858,7 +892,6 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key ) } #endif -#if PJ_IOQUEUE_HAS_SAFE_UNREG /* Mark key as closing before closing handle. */ key->closing = 1; @@ -871,10 +904,10 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key ) } else { has_lock = PJ_FALSE; } -#else - PJ_UNUSED_ARG(has_lock); -#endif - + + /* Cancel all penging I/O operations in order to free all pending memory */ + cancel_all_pending_op(key); + /* Close handle (the only way to disassociate handle from IOCP). * We also need to close handle to make sure that no further events * will come to the handle. @@ -901,7 +934,6 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key ) key->cb.on_read_complete = NULL; key->cb.on_write_complete = NULL; -#if PJ_IOQUEUE_HAS_SAFE_UNREG /* Even after handle is closed, I suspect that IOCP may still try to * do something with the handle, causing memory corruption when pool * debugging is enabled. @@ -927,42 +959,10 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key ) if (has_lock) pj_mutex_unlock(key->mutex); -#endif return PJ_SUCCESS; } -#if PJ_IOQUEUE_HAS_SAFE_UNREG -/* Scan the closing list, and put pending closing keys to free list. - * Must do this with ioqueue mutex held. - */ -static void scan_closing_keys(pj_ioqueue_t *ioqueue) -{ - if (!pj_list_empty(&ioqueue->closing_list)) { - pj_time_val now; - pj_ioqueue_key_t *key; - - pj_gettickcount(&now); - - /* Move closing keys to free list when they've finished the closing - * idle time. - */ - key = ioqueue->closing_list.next; - while (key != &ioqueue->closing_list) { - pj_ioqueue_key_t *next = key->next; - - pj_assert(key->closing != 0); - - if (PJ_TIME_VAL_GTE(now, key->free_time)) { - pj_list_erase(key); - pj_list_push_back(&ioqueue->free_list, key); - } - key = next; - } - } -} -#endif - /* * pj_ioqueue_poll() * @@ -993,17 +993,6 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) } #endif -#if PJ_IOQUEUE_HAS_SAFE_UNREG - /* Check the closing keys only when there's no activity and when there are - * pending closing keys. - */ - if (event_count == 0 && !pj_list_empty(&ioqueue->closing_list)) { - pj_lock_acquire(ioqueue->lock); - scan_closing_keys(ioqueue); - pj_lock_release(ioqueue->lock); - } -#endif - /* Return number of events. */ return event_count; } @@ -1028,15 +1017,14 @@ PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key, DWORD bytesRead; DWORD dwFlags = 0; union operation_key *op_key_rec; + struct pending_op *op; PJ_CHECK_STACK(); PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL); -#if PJ_IOQUEUE_HAS_SAFE_UNREG /* Check key is not closing */ if (key->closing) return PJ_ECANCELLED; -#endif op_key_rec = (union operation_key*)op_key->internal__; op_key_rec->overlapped.wsabuf.buf = buffer; @@ -1062,6 +1050,9 @@ PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key, } } + op = alloc_pending_op(key, op_key, buffer, *length); + op_key_rec = &op->pending_key; + dwFlags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC); /* @@ -1079,6 +1070,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key, DWORD dwStatus = WSAGetLastError(); if (dwStatus!=WSA_IO_PENDING) { *length = -1; + release_pending_op(key, op); return PJ_STATUS_FROM_OS(dwStatus); } } @@ -1104,15 +1096,14 @@ PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key, DWORD bytesRead; DWORD dwFlags = 0; union operation_key *op_key_rec; + struct pending_op *op; PJ_CHECK_STACK(); PJ_ASSERT_RETURN(key && op_key && buffer, PJ_EINVAL); -#if PJ_IOQUEUE_HAS_SAFE_UNREG /* Check key is not closing */ if (key->closing) return PJ_ECANCELLED; -#endif op_key_rec = (union operation_key*)op_key->internal__; op_key_rec->overlapped.wsabuf.buf = buffer; @@ -1138,6 +1129,9 @@ PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key, } } + op = alloc_pending_op(key, op_key, buffer, *length); + op_key_rec = &op->pending_key; + dwFlags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC); /* @@ -1155,10 +1149,11 @@ PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key, DWORD dwStatus = WSAGetLastError(); if (dwStatus!=WSA_IO_PENDING) { *length = -1; + release_pending_op(key, op); return PJ_STATUS_FROM_OS(dwStatus); } } - + /* Pending operation has been scheduled. */ return PJ_EPENDING; } @@ -1195,15 +1190,14 @@ PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key, DWORD bytesWritten; DWORD dwFlags; union operation_key *op_key_rec; + struct pending_op *op; PJ_CHECK_STACK(); PJ_ASSERT_RETURN(key && op_key && data, PJ_EINVAL); -#if PJ_IOQUEUE_HAS_SAFE_UNREG /* Check key is not closing */ if (key->closing) return PJ_ECANCELLED; -#endif op_key_rec = (union operation_key*)op_key->internal__; @@ -1231,6 +1225,9 @@ PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key, } } + op = alloc_pending_op(key, op_key, (void *)data, *length); + op_key_rec = &op->pending_key; + dwFlags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC); /* @@ -1246,8 +1243,10 @@ PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key, &op_key_rec->overlapped.overlapped, NULL); if (rc == SOCKET_ERROR) { DWORD dwStatus = WSAGetLastError(); - if (dwStatus!=WSA_IO_PENDING) + if (dwStatus!=WSA_IO_PENDING) { + release_pending_op(key, op); return PJ_STATUS_FROM_OS(dwStatus); + } } /* Asynchronous operation successfully submitted. */ @@ -1273,15 +1272,14 @@ PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key, pj_status_t status; union operation_key *op_key_rec; SOCKET sock; + struct pending_op *op; PJ_CHECK_STACK(); PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL); -#if PJ_IOQUEUE_HAS_SAFE_UNREG /* Check key is not closing */ if (key->closing) return PJ_ECANCELLED; -#endif /* * See if there is a new connection immediately available. @@ -1323,12 +1321,15 @@ PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key, * No connection is immediately available. * Must schedule an asynchronous operation. */ - op_key_rec = (union operation_key*)op_key->internal__; - + op = alloc_pending_op(key, op_key, NULL, 0); + op_key_rec = &op->pending_key; + status = pj_sock_socket(pj_AF_INET(), pj_SOCK_STREAM(), 0, &op_key_rec->accept.newsock); - if (status != PJ_SUCCESS) + if (status != PJ_SUCCESS) { + release_pending_op(key, op); return status; + } op_key_rec->accept.operation = PJ_IOQUEUE_OP_ACCEPT; op_key_rec->accept.addrlen = addrlen; @@ -1346,11 +1347,14 @@ PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key, if (rc == TRUE) { ioqueue_on_accept_complete(key, &op_key_rec->accept); + release_pending_op(key, op); return PJ_SUCCESS; } else { DWORD dwStatus = WSAGetLastError(); - if (dwStatus!=WSA_IO_PENDING) + if (dwStatus!=WSA_IO_PENDING) { + release_pending_op(key, op); return PJ_STATUS_FROM_OS(dwStatus); + } } /* Asynchronous Accept() has been submitted. */ @@ -1374,11 +1378,9 @@ PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key, PJ_CHECK_STACK(); PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL); -#if PJ_IOQUEUE_HAS_SAFE_UNREG /* Check key is not closing */ if (key->closing) return PJ_ECANCELLED; -#endif /* Initiate connect() */ if (connect((pj_sock_t)key->hnd, addr, addrlen) != 0) { @@ -1493,20 +1495,12 @@ PJ_DEF(pj_status_t) pj_ioqueue_set_concurrency(pj_ioqueue_key_t *key, PJ_DEF(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key) { -#if PJ_IOQUEUE_HAS_SAFE_UNREG return pj_mutex_lock(key->mutex); -#else - PJ_ASSERT_RETURN(!"PJ_IOQUEUE_HAS_SAFE_UNREG is disabled", PJ_EINVALIDOP); -#endif } PJ_DEF(pj_status_t) pj_ioqueue_unlock_key(pj_ioqueue_key_t *key) { -#if PJ_IOQUEUE_HAS_SAFE_UNREG return pj_mutex_unlock(key->mutex); -#else - PJ_ASSERT_RETURN(!"PJ_IOQUEUE_HAS_SAFE_UNREG is disabled", PJ_EINVALIDOP); -#endif } PJ_DEF(pj_oshandle_t) pj_ioqueue_get_os_handle( pj_ioqueue_t *ioqueue )