From 7ecb58818be537eaed259de918ee3dd47ce8a3e5 Mon Sep 17 00:00:00 2001 From: Moritz Heidkamp Date: Sun, 11 Feb 2024 18:36:25 +0100 Subject: [PATCH] HTTP client request cancellation This patch changes `aleph.http/request` so that setting the response deferred to an error status will terminate an in-flight request. This allows e.g. for `d/timeout!` to be used without potentially leaking connections. For convenient explicit cancellation, we provide `aleph.http/cancel-request!`. It sets the given response deferred to error with an instance of the new `aleph.utils.RequestCancellationException`. Closes #712. --- .../utils/RequestCancellationException.java | 23 ++ src/aleph/http.clj | 199 ++++++++++-------- test/aleph/http_test.clj | 25 ++- 3 files changed, 154 insertions(+), 93 deletions(-) create mode 100644 src-java/aleph/utils/RequestCancellationException.java diff --git a/src-java/aleph/utils/RequestCancellationException.java b/src-java/aleph/utils/RequestCancellationException.java new file mode 100644 index 00000000..948c8788 --- /dev/null +++ b/src-java/aleph/utils/RequestCancellationException.java @@ -0,0 +1,23 @@ +package aleph.utils; + +import java.util.concurrent.CancellationException; + +public class RequestCancellationException extends CancellationException { + + public RequestCancellationException() { } + + public RequestCancellationException(String message) { + super(message); + } + + public RequestCancellationException(Throwable cause) { + super(cause.getMessage()); + initCause(cause); + } + + public RequestCancellationException(String message, Throwable cause) { + super(message); + initCause(cause); + } + +} diff --git a/src/aleph/http.clj b/src/aleph/http.clj index 630647cb..dbdbe7dd 100644 --- a/src/aleph/http.clj +++ b/src/aleph/http.clj @@ -20,6 +20,7 @@ ConnectionTimeoutException PoolTimeoutException ReadTimeoutException + RequestCancellationException RequestTimeoutException) (io.aleph.dirigiste Pools) (io.netty.handler.codec Headers) @@ -336,6 +337,9 @@ by [clj-http](https://github.com/dakrone/clj-http), and returns a deferred representing the HTTP response. Also allows for a custom `pool` or `middleware` to be defined. + Putting the returned deferred into an error state will cancel the underlying request if it is + still in flight. + Param key | Description -------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------- `connection-timeout` | timeout in milliseconds for the connection to become established @@ -358,96 +362,111 @@ middleware identity connection-timeout 6e4} ;; 60 seconds :as req}] - - (executor/with-executor response-executor - ((middleware - (fn [req] - (let [k (client/req->domain req) - start (System/currentTimeMillis)] - - ;; acquire a connection - (-> (flow/acquire pool k) - (maybe-timeout! pool-timeout) - - ;; pool timeout triggered - (d/catch' TimeoutException - (fn [^Throwable e] - (d/error-deferred (PoolTimeoutException. e)))) - - (d/chain' - (fn [conn] - - ;; get the wrapper for the connection, which may or may not be realized yet - (-> (first conn) - (maybe-timeout! connection-timeout) - - ;; connection timeout triggered, dispose of the connetion - (d/catch' TimeoutException - (fn [^Throwable e] - (log/error e "Timed out waiting for connection to be established") - (flow/dispose pool k conn) - (d/error-deferred (ConnectionTimeoutException. e)))) - - ;; connection failed, bail out - (d/catch' - (fn [e] - (log/error e "Connection failure") - (flow/dispose pool k conn) - (d/error-deferred e))) - - ;; actually make the request now - (d/chain' - (fn [conn'] - (when-not (nil? conn') - (let [end (System/currentTimeMillis)] - (-> (conn' req) - (maybe-timeout! request-timeout) - - ;; request timeout triggered, dispose of the connection - (d/catch' TimeoutException - (fn [^Throwable e] - (flow/dispose pool k conn) - (d/error-deferred (RequestTimeoutException. e)))) - - ;; request failed, dispose of the connection - (d/catch' - (fn [e] - (log/trace "Request failed. Disposing of connection...") - (flow/dispose pool k conn) - (d/error-deferred e))) - - ;; clean up the connection - (d/chain' - (fn cleanup-conn [rsp] - - ;; either destroy/dispose of the conn, or release it back for reuse - (-> (:aleph/destroy-conn? rsp) - (maybe-timeout! read-timeout) - - (d/catch' TimeoutException - (fn [^Throwable e] - (log/trace "Request timed out. Disposing of connection...") - (flow/dispose pool k conn) - (d/error-deferred (ReadTimeoutException. e)))) - - (d/chain' - (fn [early?] - (if (or early? - (not (:aleph/keep-alive? rsp)) - (<= 400 (:status rsp))) - (do - (log/trace "Connection finished. Disposing...") - (flow/dispose pool k conn)) - (flow/release pool k conn))))) - (-> rsp - (dissoc :aleph/destroy-conn?) - (assoc :connection-time (- end start))))))))) - - (fn handle-response [rsp] - (->> rsp - (middleware/handle-cookies req) - (middleware/handle-redirects request req))))))))))) - req)))) + (let [dispose-conn! (atom (fn [])) + result (d/deferred response-executor) + response (executor/with-executor response-executor + ((middleware + (fn [req] + (let [k (client/req->domain req) + start (System/currentTimeMillis)] + + ;; acquire a connection + (-> (flow/acquire pool k) + (maybe-timeout! pool-timeout) + + ;; pool timeout triggered + (d/catch' TimeoutException + (fn [^Throwable e] + (d/error-deferred (PoolTimeoutException. e)))) + + (d/chain' + (fn [conn] + ;; NOTE: All error handlers below delegate disposal of the + ;; connection to the error handler on `result` which uses this + ;; function. + (reset! dispose-conn! (fn [] (flow/dispose pool k conn))) + + (if (realized? result) + ;; to account for race condition between setting `dispose-conn!` + ;; and putting `result` into error state for cancellation + (@dispose-conn!) + ;; get the wrapper for the connection, which may or may not be realized yet + (-> (first conn) + (maybe-timeout! connection-timeout) + + ;; connection timeout triggered + (d/catch' TimeoutException + (fn [^Throwable e] + (log/error e "Timed out waiting for connection to be established") + (d/error-deferred (ConnectionTimeoutException. e)))) + + ;; connection failed, bail out + (d/catch' + (fn [e] + (log/error e "Connection failure") + (d/error-deferred e))) + + ;; actually make the request now + (d/chain' + (fn [conn'] + (when-not (nil? conn') + (let [end (System/currentTimeMillis)] + (-> (conn' req) + (maybe-timeout! request-timeout) + + ;; request timeout triggered + (d/catch' TimeoutException + (fn [^Throwable e] + (d/error-deferred (RequestTimeoutException. e)))) + + ;; clean up the connection + (d/chain' + (fn cleanup-conn [rsp] + + ;; either destroy/dispose of the conn, or release it back for reuse + (-> (:aleph/destroy-conn? rsp) + (maybe-timeout! read-timeout) + + ;; read timeout triggered + (d/catch' TimeoutException + (fn [^Throwable e] + (log/trace "Request timed out.") + (d/error-deferred (ReadTimeoutException. e)))) + + (d/chain' + (fn [early?] + (if (or early? + (not (:aleph/keep-alive? rsp)) + (<= 400 (:status rsp))) + (do + (log/trace "Connection finished. Disposing...") + (flow/dispose pool k conn)) + (flow/release pool k conn))))) + (-> rsp + (dissoc :aleph/destroy-conn?) + (assoc :connection-time (- end start))))))))) + + (fn handle-response [rsp] + (->> rsp + (middleware/handle-cookies req) + (middleware/handle-redirects request req)))))))))))) + req))] + (d/connect response result) + (d/catch' result + (fn [e] + (log/trace e "Request failed. Disposing of connection...") + (@dispose-conn!) + (d/error-deferred e))) + result))) + +(defn cancel-request! + "Accepts a response deferred as returned by `request` and cancels the underlying request if it is + still in flight. + + This is done by putting the deferred into error state with an + `aleph.utils.RequestCancellationException` instance as its value." + [r] + (d/error! r (RequestCancellationException. "Request cancelled"))) (defn- req ([method url] diff --git a/test/aleph/http_test.clj b/test/aleph/http_test.clj index 0e251112..4bf304ce 100644 --- a/test/aleph/http_test.clj +++ b/test/aleph/http_test.clj @@ -18,6 +18,7 @@ (:import (aleph.utils ConnectionTimeoutException + RequestCancellationException RequestTimeoutException) (clojure.lang ExceptionInfo) @@ -1073,11 +1074,14 @@ (Thread/sleep 5) (s/put! s (encode-http-object response)))) -(defmacro with-tcp-response [response & body] - `(with-server (tcp/start-server (tcp-handler ~response) {:port port - :shutdown-timeout 0}) +(defmacro with-tcp-server [handler & body] + `(with-server (tcp/start-server ~handler {:port port + :shutdown-timeout 0}) ~@body)) +(defmacro with-tcp-response [response & body] + `(with-tcp-server (tcp-handler ~response) ~@body)) + (defmacro with-tcp-request-handler [handler options request & body] `(with-server (http/start-server ~handler (merge http-server-options ~options)) (let [conn# @(tcp/client {:host "localhost" :port port}) @@ -1439,6 +1443,21 @@ (is (instance? IllegalArgumentException result)) (is (= "use-h2c? may only be true when HTTP/2 is enabled." (ex-message result)))))) +(deftest test-in-flight-request-cancellation + (let [conn-established (promise) + conn-closed (promise)] + (with-tcp-server (fn [s _] + (deliver conn-established true) + ;; Required for the client close to be detected + (s/consume identity s) + (s/on-closed s (fn [] + (deliver conn-closed true)))) + (let [rsp (http-get "/")] + (is (= true (deref conn-established 1000 :timeout))) + (http/cancel-request! rsp) + (is (= true (deref conn-closed 1000 :timeout))) + (is (thrown? RequestCancellationException (deref rsp 1000 :timeout))))))) + (deftest ^:leak test-leak-in-raw-stream-handler ;; NOTE: Expecting 2 leaks because `with-raw-handler` will run its body for both http1 and ;; http2. It would be nicer to put this assertion into the body but the http1 server seems to