Skip to content

Commit

Permalink
HTTP client request cancellation
Browse files Browse the repository at this point in the history
This patch changes `aleph.http/request` so that setting the response deferred to an error status
will terminate an in-flight request. This allows e.g. for `d/timeout!` to be used without
potentially leaking connections.

For convenient explicit cancellation, we provide `aleph.http/cancel-request!`. It sets the given
response deferred to error with an instance of the new `aleph.utils.RequestCancellationException`.

Closes #712.
  • Loading branch information
DerGuteMoritz committed Mar 12, 2024
1 parent f9d70e8 commit 7ecb588
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 93 deletions.
23 changes: 23 additions & 0 deletions src-java/aleph/utils/RequestCancellationException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package aleph.utils;

import java.util.concurrent.CancellationException;

public class RequestCancellationException extends CancellationException {

public RequestCancellationException() { }

public RequestCancellationException(String message) {
super(message);
}

public RequestCancellationException(Throwable cause) {
super(cause.getMessage());
initCause(cause);
}

public RequestCancellationException(String message, Throwable cause) {
super(message);
initCause(cause);
}

}
199 changes: 109 additions & 90 deletions src/aleph/http.clj
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
ConnectionTimeoutException
PoolTimeoutException
ReadTimeoutException
RequestCancellationException
RequestTimeoutException)
(io.aleph.dirigiste Pools)
(io.netty.handler.codec Headers)
Expand Down Expand Up @@ -336,6 +337,9 @@
by [clj-http](https://github.com/dakrone/clj-http), and returns a deferred representing
the HTTP response. Also allows for a custom `pool` or `middleware` to be defined.
Putting the returned deferred into an error state will cancel the underlying request if it is
still in flight.
Param key | Description
-------------------- | -----------------------------------------------------------------------------------------------------------------------------------------------------------------
`connection-timeout` | timeout in milliseconds for the connection to become established
Expand All @@ -358,96 +362,111 @@
middleware identity
connection-timeout 6e4} ;; 60 seconds
:as req}]

(executor/with-executor response-executor
((middleware
(fn [req]
(let [k (client/req->domain req)
start (System/currentTimeMillis)]

;; acquire a connection
(-> (flow/acquire pool k)
(maybe-timeout! pool-timeout)

;; pool timeout triggered
(d/catch' TimeoutException
(fn [^Throwable e]
(d/error-deferred (PoolTimeoutException. e))))

(d/chain'
(fn [conn]

;; get the wrapper for the connection, which may or may not be realized yet
(-> (first conn)
(maybe-timeout! connection-timeout)

;; connection timeout triggered, dispose of the connetion
(d/catch' TimeoutException
(fn [^Throwable e]
(log/error e "Timed out waiting for connection to be established")
(flow/dispose pool k conn)
(d/error-deferred (ConnectionTimeoutException. e))))

;; connection failed, bail out
(d/catch'
(fn [e]
(log/error e "Connection failure")
(flow/dispose pool k conn)
(d/error-deferred e)))

;; actually make the request now
(d/chain'
(fn [conn']
(when-not (nil? conn')
(let [end (System/currentTimeMillis)]
(-> (conn' req)
(maybe-timeout! request-timeout)

;; request timeout triggered, dispose of the connection
(d/catch' TimeoutException
(fn [^Throwable e]
(flow/dispose pool k conn)
(d/error-deferred (RequestTimeoutException. e))))

;; request failed, dispose of the connection
(d/catch'
(fn [e]
(log/trace "Request failed. Disposing of connection...")
(flow/dispose pool k conn)
(d/error-deferred e)))

;; clean up the connection
(d/chain'
(fn cleanup-conn [rsp]

;; either destroy/dispose of the conn, or release it back for reuse
(-> (:aleph/destroy-conn? rsp)
(maybe-timeout! read-timeout)

(d/catch' TimeoutException
(fn [^Throwable e]
(log/trace "Request timed out. Disposing of connection...")
(flow/dispose pool k conn)
(d/error-deferred (ReadTimeoutException. e))))

(d/chain'
(fn [early?]
(if (or early?
(not (:aleph/keep-alive? rsp))
(<= 400 (:status rsp)))
(do
(log/trace "Connection finished. Disposing...")
(flow/dispose pool k conn))
(flow/release pool k conn)))))
(-> rsp
(dissoc :aleph/destroy-conn?)
(assoc :connection-time (- end start)))))))))

(fn handle-response [rsp]
(->> rsp
(middleware/handle-cookies req)
(middleware/handle-redirects request req)))))))))))
req))))
(let [dispose-conn! (atom (fn []))
result (d/deferred response-executor)
response (executor/with-executor response-executor
((middleware
(fn [req]
(let [k (client/req->domain req)
start (System/currentTimeMillis)]

;; acquire a connection
(-> (flow/acquire pool k)
(maybe-timeout! pool-timeout)

;; pool timeout triggered
(d/catch' TimeoutException
(fn [^Throwable e]
(d/error-deferred (PoolTimeoutException. e))))

(d/chain'
(fn [conn]
;; NOTE: All error handlers below delegate disposal of the
;; connection to the error handler on `result` which uses this
;; function.
(reset! dispose-conn! (fn [] (flow/dispose pool k conn)))

(if (realized? result)
;; to account for race condition between setting `dispose-conn!`
;; and putting `result` into error state for cancellation
(@dispose-conn!)
;; get the wrapper for the connection, which may or may not be realized yet
(-> (first conn)
(maybe-timeout! connection-timeout)

;; connection timeout triggered
(d/catch' TimeoutException
(fn [^Throwable e]
(log/error e "Timed out waiting for connection to be established")
(d/error-deferred (ConnectionTimeoutException. e))))

;; connection failed, bail out
(d/catch'
(fn [e]
(log/error e "Connection failure")
(d/error-deferred e)))

;; actually make the request now
(d/chain'
(fn [conn']
(when-not (nil? conn')
(let [end (System/currentTimeMillis)]
(-> (conn' req)
(maybe-timeout! request-timeout)

;; request timeout triggered
(d/catch' TimeoutException
(fn [^Throwable e]
(d/error-deferred (RequestTimeoutException. e))))

;; clean up the connection
(d/chain'
(fn cleanup-conn [rsp]

;; either destroy/dispose of the conn, or release it back for reuse
(-> (:aleph/destroy-conn? rsp)
(maybe-timeout! read-timeout)

;; read timeout triggered
(d/catch' TimeoutException
(fn [^Throwable e]
(log/trace "Request timed out.")
(d/error-deferred (ReadTimeoutException. e))))

(d/chain'
(fn [early?]
(if (or early?
(not (:aleph/keep-alive? rsp))
(<= 400 (:status rsp)))
(do
(log/trace "Connection finished. Disposing...")
(flow/dispose pool k conn))
(flow/release pool k conn)))))
(-> rsp
(dissoc :aleph/destroy-conn?)
(assoc :connection-time (- end start)))))))))

(fn handle-response [rsp]
(->> rsp
(middleware/handle-cookies req)
(middleware/handle-redirects request req))))))))))))
req))]
(d/connect response result)
(d/catch' result
(fn [e]
(log/trace e "Request failed. Disposing of connection...")
(@dispose-conn!)
(d/error-deferred e)))
result)))

(defn cancel-request!
"Accepts a response deferred as returned by `request` and cancels the underlying request if it is
still in flight.
This is done by putting the deferred into error state with an
`aleph.utils.RequestCancellationException` instance as its value."
[r]
(d/error! r (RequestCancellationException. "Request cancelled")))

(defn- req
([method url]
Expand Down
25 changes: 22 additions & 3 deletions test/aleph/http_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
(:import
(aleph.utils
ConnectionTimeoutException
RequestCancellationException
RequestTimeoutException)
(clojure.lang
ExceptionInfo)
Expand Down Expand Up @@ -1073,11 +1074,14 @@
(Thread/sleep 5)
(s/put! s (encode-http-object response))))

(defmacro with-tcp-response [response & body]
`(with-server (tcp/start-server (tcp-handler ~response) {:port port
:shutdown-timeout 0})
(defmacro with-tcp-server [handler & body]
`(with-server (tcp/start-server ~handler {:port port
:shutdown-timeout 0})
~@body))

(defmacro with-tcp-response [response & body]
`(with-tcp-server (tcp-handler ~response) ~@body))

(defmacro with-tcp-request-handler [handler options request & body]
`(with-server (http/start-server ~handler (merge http-server-options ~options))
(let [conn# @(tcp/client {:host "localhost" :port port})
Expand Down Expand Up @@ -1439,6 +1443,21 @@
(is (instance? IllegalArgumentException result))
(is (= "use-h2c? may only be true when HTTP/2 is enabled." (ex-message result))))))

(deftest test-in-flight-request-cancellation
(let [conn-established (promise)
conn-closed (promise)]
(with-tcp-server (fn [s _]
(deliver conn-established true)
;; Required for the client close to be detected
(s/consume identity s)
(s/on-closed s (fn []
(deliver conn-closed true))))
(let [rsp (http-get "/")]
(is (= true (deref conn-established 1000 :timeout)))
(http/cancel-request! rsp)
(is (= true (deref conn-closed 1000 :timeout)))
(is (thrown? RequestCancellationException (deref rsp 1000 :timeout)))))))

(deftest ^:leak test-leak-in-raw-stream-handler
;; NOTE: Expecting 2 leaks because `with-raw-handler` will run its body for both http1 and
;; http2. It would be nicer to put this assertion into the body but the http1 server seems to
Expand Down

0 comments on commit 7ecb588

Please sign in to comment.