(PDB-2640) Close gzip-wrapped body input stream on request completion
In the previous commit, if gzip compression was applied to a request input stream, the stream was not automatically closed when the request was complete. The Apache HTTP async library, however, closes an input stream in cases where gzip compression is not being added. In this commit, the original request input stream is closed immediately after attempts to copy the data from it to the downstream gzipped input stream have been completed, for compatibility with the pre-existing Apache HTTP async library behavior.
This commit is contained in:
parent
689db7cb87
commit
cb5de4555d
2 changed files with 48 additions and 24 deletions
|
@ -386,8 +386,12 @@ public class JavaClient {
|
||||||
if (requestBody instanceof InputStream) {
|
if (requestBody instanceof InputStream) {
|
||||||
InputStream requestInputStream = (InputStream) requestBody;
|
InputStream requestInputStream = (InputStream) requestBody;
|
||||||
byte[] byteBuffer = new byte[GZIP_BUFFER_SIZE];
|
byte[] byteBuffer = new byte[GZIP_BUFFER_SIZE];
|
||||||
|
try {
|
||||||
IOUtils.copyLarge(requestInputStream,
|
IOUtils.copyLarge(requestInputStream,
|
||||||
gzipOutputStream, byteBuffer);
|
gzipOutputStream, byteBuffer);
|
||||||
|
} finally {
|
||||||
|
requestInputStream.close();
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
throwUnsupportedBodyException(requestBody);
|
throwUnsupportedBodyException(requestBody);
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,7 +3,7 @@
|
||||||
SimpleRequestOptions
|
SimpleRequestOptions
|
||||||
ResponseBodyType
|
ResponseBodyType
|
||||||
CompressType)
|
CompressType)
|
||||||
(java.io ByteArrayInputStream)
|
(java.io ByteArrayInputStream FilterInputStream)
|
||||||
(java.net URI)
|
(java.net URI)
|
||||||
(java.util.zip GZIPInputStream))
|
(java.util.zip GZIPInputStream))
|
||||||
(:require [clojure.test :refer :all]
|
(:require [clojure.test :refer :all]
|
||||||
|
@ -31,10 +31,14 @@
|
||||||
(apply str (repeat 4000 "and<6E>i<EFBFBD>said<69>hey<65>yeah<61>yeah<61>whats<74>going<6E>on")))
|
(apply str (repeat 4000 "and<6E>i<EFBFBD>said<69>hey<65>yeah<61>yeah<61>whats<74>going<6E>on")))
|
||||||
|
|
||||||
(defn string->byte-array-input-stream
|
(defn string->byte-array-input-stream
|
||||||
[source]
|
[source is-closed-atom]
|
||||||
(-> source
|
(let [bis (-> source
|
||||||
(.getBytes)
|
(.getBytes)
|
||||||
(ByteArrayInputStream.)))
|
(ByteArrayInputStream.))]
|
||||||
|
(proxy [FilterInputStream] [bis]
|
||||||
|
(close []
|
||||||
|
(reset! is-closed-atom true)
|
||||||
|
(proxy-super close)))))
|
||||||
|
|
||||||
(defn post-gzip-clj-request
|
(defn post-gzip-clj-request
|
||||||
[port body]
|
[port body]
|
||||||
|
@ -71,17 +75,23 @@
|
||||||
(is (= "gzip" (:request-content-encoding response)))
|
(is (= "gzip" (:request-content-encoding response)))
|
||||||
(is (= big-request-body (:request-body-decompressed response)))))
|
(is (= big-request-body (:request-body-decompressed response)))))
|
||||||
(testing "short inputstream body is gzipped in request"
|
(testing "short inputstream body is gzipped in request"
|
||||||
(let [response (post-gzip-clj-request
|
(let [is-closed (atom false)
|
||||||
|
response (post-gzip-clj-request
|
||||||
port
|
port
|
||||||
(string->byte-array-input-stream short-request-body))]
|
(string->byte-array-input-stream short-request-body
|
||||||
|
is-closed))]
|
||||||
(is (= "gzip" (:request-content-encoding response)))
|
(is (= "gzip" (:request-content-encoding response)))
|
||||||
(is (= short-request-body (:request-body-decompressed response)))))
|
(is (= short-request-body (:request-body-decompressed response)))
|
||||||
|
(is @is-closed "input stream was not closed after request")))
|
||||||
(testing "big inputstream body is gzipped in request"
|
(testing "big inputstream body is gzipped in request"
|
||||||
(let [response (post-gzip-clj-request
|
(let [is-closed (atom false)
|
||||||
|
response (post-gzip-clj-request
|
||||||
port
|
port
|
||||||
(string->byte-array-input-stream big-request-body))]
|
(string->byte-array-input-stream big-request-body
|
||||||
|
is-closed))]
|
||||||
(is (= "gzip" (:request-content-encoding response)))
|
(is (= "gzip" (:request-content-encoding response)))
|
||||||
(is (= big-request-body (:request-body-decompressed response))))))))
|
(is (= big-request-body (:request-body-decompressed response)))
|
||||||
|
(is @is-closed "input stream was not closed after request"))))))
|
||||||
|
|
||||||
(deftest java-sync-client-gzip-requests
|
(deftest java-sync-client-gzip-requests
|
||||||
(testing "for java sync client"
|
(testing "for java sync client"
|
||||||
|
@ -97,22 +107,32 @@
|
||||||
(is (= "gzip" (:request-content-encoding response)))
|
(is (= "gzip" (:request-content-encoding response)))
|
||||||
(is (= big-request-body (:request-body-decompressed response)))))
|
(is (= big-request-body (:request-body-decompressed response)))))
|
||||||
(testing "short inputstream body is gzipped in request"
|
(testing "short inputstream body is gzipped in request"
|
||||||
(let [response (post-gzip-java-request
|
(let [is-closed (atom false)
|
||||||
|
response (post-gzip-java-request
|
||||||
port
|
port
|
||||||
(string->byte-array-input-stream short-request-body))]
|
(string->byte-array-input-stream short-request-body
|
||||||
|
is-closed))]
|
||||||
(is (= "gzip" (:request-content-encoding response)))
|
(is (= "gzip" (:request-content-encoding response)))
|
||||||
(is (= short-request-body (:request-body-decompressed response)))))
|
(is (= short-request-body (:request-body-decompressed response)))
|
||||||
|
(is @is-closed "input stream was not closed after request")))
|
||||||
(testing "big inputstream body is gzipped in request"
|
(testing "big inputstream body is gzipped in request"
|
||||||
(let [response (post-gzip-java-request
|
(let [is-closed (atom false)
|
||||||
|
response (post-gzip-java-request
|
||||||
port
|
port
|
||||||
(string->byte-array-input-stream big-request-body))]
|
(string->byte-array-input-stream big-request-body
|
||||||
|
is-closed))]
|
||||||
(is (= "gzip" (:request-content-encoding response)))
|
(is (= "gzip" (:request-content-encoding response)))
|
||||||
(is (= big-request-body (:request-body-decompressed response))))))))
|
(is (= big-request-body (:request-body-decompressed response)))
|
||||||
|
(is @is-closed "input stream was not closed after request"))))))
|
||||||
|
|
||||||
(deftest connect-exception-during-gzip-request-returns-failure
|
(deftest connect-exception-during-gzip-request-returns-failure
|
||||||
(testing "connection exception during gzip request returns failure"
|
(testing "connection exception during gzip request returns failure"
|
||||||
|
(let [is-closed (atom false)]
|
||||||
(is (connect-exception-thrown?
|
(is (connect-exception-thrown?
|
||||||
(http-client/post "http://localhost:65535"
|
(http-client/post "http://localhost:65535"
|
||||||
{:body short-request-body
|
{:body (string->byte-array-input-stream
|
||||||
|
short-request-body
|
||||||
|
is-closed)
|
||||||
:compress-request-body :gzip
|
:compress-request-body :gzip
|
||||||
:as :text})))))
|
:as :text})))
|
||||||
|
(is @is-closed "input stream was not closed after request"))))
|
||||||
|
|
Loading…
Reference in a new issue