From 689db7cb874c6fd2e1b82e5b284c2a3c7ed0f4be Mon Sep 17 00:00:00 2001 From: Jeremy Barlow Date: Wed, 18 Jan 2017 17:55:10 -0800 Subject: [PATCH 1/2] (PDB-2640) Add option for gzip compressing a request's body This commit adds a new Clojure HTTP request option, `:compress-request-body`, and corresponding Java option which can be used to have gzip compression applied to the request's body before it is sent along to the server. --- doc/clojure-client.md | 5 + project.clj | 3 +- src/clj/puppetlabs/http/client/async.clj | 10 +- src/clj/puppetlabs/http/client/common.clj | 6 + src/clj/puppetlabs/http/client/sync.clj | 4 +- .../puppetlabs/http/client/CompressType.java | 6 + .../http/client/RequestOptions.java | 10 ++ .../http/client/SimpleRequestOptions.java | 12 +- src/java/com/puppetlabs/http/client/Sync.java | 7 +- .../client/impl/CoercedRequestOptions.java | 15 +- .../http/client/impl/JavaClient.java | 165 ++++++++++++++++-- .../http/client/gzip_request_test.clj | 118 +++++++++++++ 12 files changed, 340 insertions(+), 21 deletions(-) create mode 100644 src/java/com/puppetlabs/http/client/CompressType.java create mode 100644 test/puppetlabs/http/client/gzip_request_test.clj diff --git a/doc/clojure-client.md b/doc/clojure-client.md index d700c2e..d95e9c2 100644 --- a/doc/clojure-client.md +++ b/doc/clojure-client.md @@ -92,6 +92,11 @@ which is a map containing options for the HTTP request. These options are as fol with a value of `puppetlabs.core.i18n/user-locale` will be added to the request. * `:body`: optional; may be a String or any type supported by clojure's reader +* `:compress-request-body`: optional; used to control any additional compression + which the client can apply to the request body before it is sent to the target + server. Defaults to `:none`. Supported values are: + * `:gzip` which will compress the request body as gzip + * `:none` which will not apply any additional compression to the request body * `:decompress-body`: optional; if `true`, an 'accept-encoding' header with a value of 'gzip, deflate' will be added to the request, and the response will be automatically decompressed if it contains a recognized 'content-encoding' diff --git a/project.clj b/project.clj index c261579..f60f1a9 100644 --- a/project.clj +++ b/project.clj @@ -32,7 +32,8 @@ ;; depend on this source jar using a :classifier in their :dependencies. :classifiers [["sources" :sources-jar]] - :profiles {:dev {:dependencies [[puppetlabs/kitchensink nil :classifier "test"] + :profiles {:dev {:dependencies [[cheshire] + [puppetlabs/kitchensink nil :classifier "test"] [puppetlabs/trapperkeeper] [puppetlabs/trapperkeeper nil :classifier "test"] [puppetlabs/trapperkeeper-webserver-jetty9] diff --git a/src/clj/puppetlabs/http/client/async.clj b/src/clj/puppetlabs/http/client/async.clj index c685ab1..379e100 100644 --- a/src/clj/puppetlabs/http/client/async.clj +++ b/src/clj/puppetlabs/http/client/async.clj @@ -10,7 +10,7 @@ ;; these methods. (ns puppetlabs.http.client.async - (:import (com.puppetlabs.http.client ClientOptions RequestOptions ResponseBodyType HttpMethod) + (:import (com.puppetlabs.http.client ClientOptions RequestOptions ResponseBodyType HttpMethod CompressType) (com.puppetlabs.http.client.impl JavaClient ResponseDeliveryDelegate) (org.apache.http.client.utils URIBuilder) (org.apache.http.nio.client HttpAsyncClient) @@ -119,6 +119,12 @@ :text ResponseBodyType/TEXT ResponseBodyType/STREAM)) +(schema/defn clojure-compress-request-body-type->java :- CompressType + [opts :- common/RequestOptions] + (case (:compress-request-body opts) + :gzip CompressType/GZIP + CompressType/NONE)) + (defn parse-metric-id [opts] (when-let [metric-id (:metric-id opts)] @@ -131,6 +137,7 @@ (.setAs (clojure-response-body-type->java opts)) (.setBody (:body opts)) (.setDecompressBody (clojure.core/get opts :decompress-body true)) + (.setCompressRequestBody (clojure-compress-request-body-type->java opts)) (.setHeaders (:headers opts)) (.setMetricId (parse-metric-id opts)))) @@ -177,6 +184,7 @@ (let [result (promise) defaults {:body nil :decompress-body true + :compress-request-body :none :as :stream} ^Locale locale (i18n/user-locale) ;; lower-case the header names so that we don't end up with diff --git a/src/clj/puppetlabs/http/client/common.clj b/src/clj/puppetlabs/http/client/common.clj index 8e66837..258e12c 100644 --- a/src/clj/puppetlabs/http/client/common.clj +++ b/src/clj/puppetlabs/http/client/common.clj @@ -42,6 +42,9 @@ (def BodyType (schema/enum :text :stream :unbuffered-stream)) +(def CompressType + (schema/enum :gzip :none)) + (def MetricId [(schema/either schema/Str schema/Keyword)]) (def RawUserRequestClientOptions @@ -53,6 +56,7 @@ (ok :headers) Headers (ok :body) Body (ok :decompress-body) schema/Bool + (ok :compress-request-body) CompressType (ok :as) BodyType (ok :query-params) {schema/Str schema/Str} (ok :metric-id) [schema/Str] @@ -76,6 +80,7 @@ (ok :headers) Headers (ok :body) Body (ok :decompress-body) schema/Bool + (ok :compress-request-body) CompressType (ok :as) BodyType (ok :query-params) {schema/Str schema/Str} (ok :metric-id) MetricId}) @@ -90,6 +95,7 @@ :headers Headers :body Body :decompress-body schema/Bool + :compress-request-body CompressType :as BodyType (ok :query-params) {schema/Str schema/Str} (ok :metric-id) MetricId}) diff --git a/src/clj/puppetlabs/http/client/sync.clj b/src/clj/puppetlabs/http/client/sync.clj index 3c88c1e..d127915 100644 --- a/src/clj/puppetlabs/http/client/sync.clj +++ b/src/clj/puppetlabs/http/client/sync.clj @@ -21,7 +21,9 @@ (schema/defn extract-request-opts :- common/RawUserRequestOptions [opts :- common/RawUserRequestClientOptions] - (select-keys opts [:url :method :headers :body :decompress-body :as :query-params])) + (select-keys opts [:url :method :headers :body + :decompress-body :compress-request-body + :as :query-params])) (defn request-with-client ([req client] diff --git a/src/java/com/puppetlabs/http/client/CompressType.java b/src/java/com/puppetlabs/http/client/CompressType.java new file mode 100644 index 0000000..dea1174 --- /dev/null +++ b/src/java/com/puppetlabs/http/client/CompressType.java @@ -0,0 +1,6 @@ +package com.puppetlabs.http.client; + +public enum CompressType { + GZIP, + NONE +} diff --git a/src/java/com/puppetlabs/http/client/RequestOptions.java b/src/java/com/puppetlabs/http/client/RequestOptions.java index 18adc7e..48150f7 100644 --- a/src/java/com/puppetlabs/http/client/RequestOptions.java +++ b/src/java/com/puppetlabs/http/client/RequestOptions.java @@ -12,6 +12,7 @@ public class RequestOptions { private URI uri; private Map headers; private Object body; + private CompressType requestBodyCompression = CompressType.NONE; private boolean decompressBody = true; private ResponseBodyType as = ResponseBodyType.STREAM; private String[] metricId; @@ -84,6 +85,15 @@ public class RequestOptions { return this; } + public CompressType getCompressRequestBody() { + return requestBodyCompression; + } + public RequestOptions setCompressRequestBody( + CompressType requestBodyCompression) { + this.requestBodyCompression = requestBodyCompression; + return this; + } + public ResponseBodyType getAs() { return as; } diff --git a/src/java/com/puppetlabs/http/client/SimpleRequestOptions.java b/src/java/com/puppetlabs/http/client/SimpleRequestOptions.java index d81c792..f16b41a 100644 --- a/src/java/com/puppetlabs/http/client/SimpleRequestOptions.java +++ b/src/java/com/puppetlabs/http/client/SimpleRequestOptions.java @@ -24,6 +24,7 @@ public class SimpleRequestOptions { private String[] sslCipherSuites; private boolean insecure = false; private Object body; + private CompressType requestBodyCompression = CompressType.NONE; private boolean decompressBody = true; private ResponseBodyType as = ResponseBodyType.STREAM; private boolean forceRedirects = false; @@ -137,6 +138,15 @@ public class SimpleRequestOptions { return this; } + public CompressType getCompressRequestBody() { + return requestBodyCompression; + } + public SimpleRequestOptions setRequestBodyCompression( + CompressType requestBodyCompression) { + this.requestBodyCompression = requestBodyCompression; + return this; + } + public ResponseBodyType getAs() { return as; } @@ -176,4 +186,4 @@ public class SimpleRequestOptions { this.socketTimeoutMilliseconds = socketTimeoutMilliseconds; return this; } -} \ No newline at end of file +} diff --git a/src/java/com/puppetlabs/http/client/Sync.java b/src/java/com/puppetlabs/http/client/Sync.java index 9c4cc3e..7d35f64 100644 --- a/src/java/com/puppetlabs/http/client/Sync.java +++ b/src/java/com/puppetlabs/http/client/Sync.java @@ -31,7 +31,12 @@ public class Sync { Object body = simpleOptions.getBody(); boolean decompressBody = simpleOptions.getDecompressBody(); ResponseBodyType as = simpleOptions.getAs(); - return new RequestOptions(uri, headers, body, decompressBody, as); + CompressType requestBodyDecompression = + simpleOptions.getCompressRequestBody(); + RequestOptions requestOptions = new RequestOptions( + uri, headers, body, decompressBody, as); + requestOptions.setCompressRequestBody(requestBodyDecompression); + return requestOptions; } private static ClientOptions extractClientOptions(SimpleRequestOptions simpleOptions) { diff --git a/src/java/com/puppetlabs/http/client/impl/CoercedRequestOptions.java b/src/java/com/puppetlabs/http/client/impl/CoercedRequestOptions.java index 8573d88..ca22d1e 100644 --- a/src/java/com/puppetlabs/http/client/impl/CoercedRequestOptions.java +++ b/src/java/com/puppetlabs/http/client/impl/CoercedRequestOptions.java @@ -5,21 +5,28 @@ import org.apache.http.Header; import org.apache.http.HttpEntity; import java.net.URI; +import java.util.zip.GZIPOutputStream; -public class CoercedRequestOptions { +class CoercedRequestOptions { private final URI uri; private final HttpMethod method; private final Header[] headers; private final HttpEntity body; + private final GZIPOutputStream gzipOutputStream; + private final byte[] bytesToGzip; public CoercedRequestOptions(URI uri, HttpMethod method, Header[] headers, - HttpEntity body) { + HttpEntity body, + GZIPOutputStream gzipOutputStream, + byte[] bytesToGzip) { this.uri = uri; this.method = method; this.headers = headers; this.body = body; + this.gzipOutputStream = gzipOutputStream; + this.bytesToGzip = bytesToGzip; } public URI getUri() { @@ -37,4 +44,8 @@ public class CoercedRequestOptions { public HttpEntity getBody() { return body; } + + public GZIPOutputStream getGzipOutputStream() { return gzipOutputStream; }; + + public byte[] getBytesToGzip() { return bytesToGzip; } } diff --git a/src/java/com/puppetlabs/http/client/impl/JavaClient.java b/src/java/com/puppetlabs/http/client/impl/JavaClient.java index 7bae5b5..1f8ff13 100644 --- a/src/java/com/puppetlabs/http/client/impl/JavaClient.java +++ b/src/java/com/puppetlabs/http/client/impl/JavaClient.java @@ -2,6 +2,7 @@ package com.puppetlabs.http.client.impl; import com.codahale.metrics.MetricRegistry; import com.puppetlabs.http.client.ClientOptions; +import com.puppetlabs.http.client.CompressType; import com.puppetlabs.http.client.HttpClientException; import com.puppetlabs.http.client.HttpMethod; import com.puppetlabs.http.client.RequestOptions; @@ -45,6 +46,8 @@ import org.apache.http.nio.client.methods.HttpAsyncMethods; import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy; import org.apache.http.nio.entity.NStringEntity; import org.apache.http.protocol.HttpContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManager; @@ -52,6 +55,8 @@ import javax.net.ssl.X509TrustManager; import java.io.IOException; import java.io.InputStream; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; import java.io.UnsupportedEncodingException; import java.net.URI; import java.nio.charset.UnsupportedCharsetException; @@ -61,11 +66,20 @@ import java.security.cert.CertificateException; import java.security.cert.X509Certificate; import java.util.HashMap; import java.util.Map; +import java.util.zip.GZIPOutputStream; public class JavaClient { private static final String PROTOCOL = "TLS"; + private static final Logger LOGGER = LoggerFactory.getLogger(JavaClient.class); + + // Buffer size to use in streams for request gzip compression. This is + // somewhat arbitrary but went with the same value as the Apache HTTP + // async client uses for chunking input streams for requests: + // https://github.com/apache/httpcore/blob/4.4.5/httpcore-nio/src/main/java/org/apache/http/nio/entity/EntityAsyncContentProducer.java#L58 + private static int GZIP_BUFFER_SIZE = 4096; + private static Header[] prepareHeaders(RequestOptions options, ContentType contentType) { Map result = new HashMap(); @@ -80,6 +94,10 @@ public class JavaClient { (! result.containsKey("accept-encoding"))) { result.put("accept-encoding", new BasicHeader("Accept-Encoding", "gzip, deflate")); } + if (options.getCompressRequestBody() == CompressType.GZIP && + (! result.containsKey("content-encoding"))) { + result.put("content-encoding", new BasicHeader("Content-Encoding", "gzip")); + } if (contentType != null) { result.put("content-type", new BasicHeader("Content-Type", @@ -123,6 +141,11 @@ public class JavaClient { return contentType; } + private static void throwUnsupportedBodyException(Object body) { + throw new HttpClientException("Unsupported body type for request: " + + body.getClass() + ". Only InputStream and String are supported."); + } + private static CoercedRequestOptions coerceRequestOptions(RequestOptions options, HttpMethod method) { URI uri = options.getUri(); @@ -135,27 +158,59 @@ public class JavaClient { Header[] headers = prepareHeaders(options, contentType); HttpEntity body = null; + GZIPOutputStream gzipOutputStream = null; + Object bodyFromOptions = options.getBody(); + byte[] bytesToGzip = null; - if (options.getBody() instanceof String) { - String originalBody = (String) options.getBody(); - if (contentType != null) { - body = new NStringEntity(originalBody, contentType); - } - else { + if ((bodyFromOptions instanceof String) || + (bodyFromOptions instanceof InputStream)) { + // See comments in the requestWithClient() method about why the + // request body is routed through a GZIPOutputStream, + // PipedOutputStream, and PipedInputStream in order to achieve + // gzip compression. + if (options.getCompressRequestBody() == CompressType.GZIP) { + PipedInputStream pis = new PipedInputStream(GZIP_BUFFER_SIZE); + PipedOutputStream pos = new PipedOutputStream(); try { - body = new NStringEntity(originalBody); - } - catch (UnsupportedEncodingException e) { + pos.connect(pis); + gzipOutputStream = new GZIPOutputStream(pos, + GZIP_BUFFER_SIZE); + body = new InputStreamEntity(pis); + } catch (IOException ioe) { throw new HttpClientException( - "Unable to create request body", e); + "Error setting up gzip stream for request", ioe); } + if (bodyFromOptions instanceof String) { + String bodyAsString = (String) bodyFromOptions; + if (contentType != null) { + bytesToGzip = bodyAsString.getBytes(contentType.getCharset()); + } else { + bytesToGzip = bodyAsString.getBytes(); + } + } + } else if (bodyFromOptions instanceof String) { + String originalBody = (String) bodyFromOptions; + if (contentType != null) { + body = new NStringEntity(originalBody, contentType); + } + else { + try { + body = new NStringEntity(originalBody); + } + catch (UnsupportedEncodingException e) { + throw new HttpClientException( + "Unable to create request body", e); + } + } + } else { + body = new InputStreamEntity((InputStream) bodyFromOptions); } - - } else if (options.getBody() instanceof InputStream) { - body = new InputStreamEntity((InputStream)options.getBody()); + } else if (bodyFromOptions != null) { + throwUnsupportedBodyException(bodyFromOptions); } - return new CoercedRequestOptions(uri, method, headers, body); + return new CoercedRequestOptions(uri, + method, headers, body, gzipOutputStream, bytesToGzip); } public static CoercedClientOptions coerceClientOptions(ClientOptions options) { @@ -320,6 +375,46 @@ public class JavaClient { client.execute(HttpAsyncMethods.create(request), consumer, timedStreamingCompleteCallback); } + private static void gzipRequestPayload( + GZIPOutputStream gzipOutputStream, + byte[] bytesToGzip, + Object requestBody) { + try { + if (bytesToGzip != null) { + gzipOutputStream.write(bytesToGzip); + } else { + if (requestBody instanceof InputStream) { + InputStream requestInputStream = (InputStream) requestBody; + byte[] byteBuffer = new byte[GZIP_BUFFER_SIZE]; + IOUtils.copyLarge(requestInputStream, + gzipOutputStream, byteBuffer); + } else { + throwUnsupportedBodyException(requestBody); + } + } + // IOExceptions may be thrown either during the IOUtils.copyLarge() + // call above or during the close() call to the GZIPOutputStream below. + // The GZIPOutputStream object is backed by a PipedOutputStream object + // which is connected to a PipedInputStream object. Most likely, any + // IOExceptions thrown up to this level would be due to the underlying + // PipedInputStream being closed prematurely. In those cases, the + // Apache HTTP Async library should detect the failure while processing + // the request and deliver an appropriate "failure" callback as the + // result for the request. The IOExceptions are caught and not rethrown + // here so that the client can still receive the "failure" callback from + // the Apache HTTP Async library later on. The exceptions are still + // logged here at a debug level for troubleshooting purposes. + } catch (IOException ioe) { + LOGGER.debug("Error writing gzip request body", ioe); + } finally { + try { + gzipOutputStream.close(); + } catch (IOException ioe) { + LOGGER.debug("Error closing gzip request stream", ioe); + } + } + } + public static void requestWithClient(final RequestOptions requestOptions, final HttpMethod method, final IResponseCallback callback, @@ -364,6 +459,48 @@ public class JavaClient { TimerUtils.startFullResponseTimers(registry, request, metricId, metricNamespace)); client.execute(request, timedFutureCallback); } + + // The approach used for gzip-compressing the request payload is far from + // ideal here. The approach involves reading the bytes from the supplied + // request body, redirecting those through a JDK GZIPOutputStream object + // to compress them, and then piping those back into in InputStream that + // the Apache Async HTTP layer reads from in order to get the bytes + // to transmit in the HTTP request. The JDK apparently has no built-in + // functionality for gzip-compressing a source byte array or InputStream + // back into a separate InputStream that the Apache Async HTTP layer + // could use. + // + // A better approach would probably be to do something like one of the + // approaches discussed in http://stackoverflow.com/questions/11036280/compress-an-inputstream-with-gzip. + // For example, the InputStream given to the Apache Async HTTP layer + // could be wrapped with a FilterInputStream which gzip-compresses + // bytes on the fly as the Apache Async HTTP layer asks for them. The + // approaches on that thread are pretty involved, though, and appear to + // have liberally copied content from the JDK source. A clean-room + // implementation would probably be better but would also likely + // require a fair bit of testing to ensure that it produces good gzip + // content under varying read scenarios. + // + // The approach being used for now requires writing through the + // GZIPOutputStream and underlying PipedOutputStream from the thread on + // which the HTTP request is made. The connected PipedInputStream is + // then read from a separate thread, one of the Apache HTTP Async IO + // worker threads -- hopefully avoiding the possibility of a deadlock + // in the process. + // + // For requests that provide an InputStream as a source argument, it + // would also probably be more performant to do the GZIPOutputStream + // writing from a separate thread and would give an AsyncHttpClient + // requestor the ability to do other work while the source InputStream + // is being read and compressed. As a simplification for now, this + // implementation doesn't spin up a separate thread (or thread pool) + // for performing gzip compression. + GZIPOutputStream gzipOutputStream = coercedRequestOptions.getGzipOutputStream(); + if (gzipOutputStream != null) { + gzipRequestPayload(gzipOutputStream, + coercedRequestOptions.getBytesToGzip(), + requestOptions.getBody()); + } } public static CloseableHttpAsyncClient createClient(ClientOptions clientOptions) { diff --git a/test/puppetlabs/http/client/gzip_request_test.clj b/test/puppetlabs/http/client/gzip_request_test.clj new file mode 100644 index 0000000..e18bf41 --- /dev/null +++ b/test/puppetlabs/http/client/gzip_request_test.clj @@ -0,0 +1,118 @@ +(ns puppetlabs.http.client.gzip-request-test + (:import (com.puppetlabs.http.client Sync + SimpleRequestOptions + ResponseBodyType + CompressType) + (java.io ByteArrayInputStream) + (java.net URI) + (java.util.zip GZIPInputStream)) + (:require [clojure.test :refer :all] + [cheshire.core :as cheshire] + [schema.test :as schema-test] + [puppetlabs.http.client.sync :as http-client] + [puppetlabs.http.client.test-common :refer :all] + [puppetlabs.trapperkeeper.testutils.webserver :as testwebserver])) + +(use-fixtures :once schema-test/validate-schemas) + +(defn req-body-app + [req] + (let [response {:request-content-encoding (get-in req [:headers "content-encoding"]) + :request-body-decompressed (slurp + (GZIPInputStream. (:body req)) + :encoding "utf-8")}] + {:status 200 + :headers {"Content-Type" "application/json; charset=utf-8"} + :body (cheshire/generate-string response)})) + +(def short-request-body "gzip me�") + +(def big-request-body + (apply str (repeat 4000 "and�i�said�hey�yeah�yeah�whats�going�on"))) + +(defn string->byte-array-input-stream + [source] + (-> source + (.getBytes) + (ByteArrayInputStream.))) + +(defn post-gzip-clj-request + [port body] + (-> (http-client/post (format "http://localhost:%d" port) + {:body body + :headers {"Content-Type" "text/plain; charset=utf-8"} + :compress-request-body :gzip + :as :text}) + :body + (cheshire/parse-string true))) + +(defn post-gzip-java-request + [port body] + (-> (SimpleRequestOptions. (URI. (format "http://localhost:%d/hello/" port))) + (.setBody body) + (.setHeaders {"Content-Type" "text/plain; charset=utf-8"}) + (.setRequestBodyCompression CompressType/GZIP) + (.setAs ResponseBodyType/TEXT) + (Sync/post) + (.getBody) + (cheshire/parse-string true))) + +(deftest clj-sync-client-gzip-requests + (testing "for clojure sync client" + (testwebserver/with-test-webserver + req-body-app + port + (testing "short string body is gzipped in request" + (let [response (post-gzip-clj-request port short-request-body)] + (is (= "gzip" (:request-content-encoding response))) + (is (= short-request-body (:request-body-decompressed response))))) + (testing "big string body is gzipped in request" + (let [response (post-gzip-clj-request port big-request-body)] + (is (= "gzip" (:request-content-encoding response))) + (is (= big-request-body (:request-body-decompressed response))))) + (testing "short inputstream body is gzipped in request" + (let [response (post-gzip-clj-request + port + (string->byte-array-input-stream short-request-body))] + (is (= "gzip" (:request-content-encoding response))) + (is (= short-request-body (:request-body-decompressed response))))) + (testing "big inputstream body is gzipped in request" + (let [response (post-gzip-clj-request + port + (string->byte-array-input-stream big-request-body))] + (is (= "gzip" (:request-content-encoding response))) + (is (= big-request-body (:request-body-decompressed response)))))))) + +(deftest java-sync-client-gzip-requests + (testing "for java sync client" + (testwebserver/with-test-webserver + req-body-app + port + (testing "short string body is gzipped in request" + (let [response (post-gzip-java-request port short-request-body)] + (is (= "gzip" (:request-content-encoding response))) + (is (= short-request-body (:request-body-decompressed response))))) + (testing "big string body is gzipped in request" + (let [response (post-gzip-java-request port big-request-body)] + (is (= "gzip" (:request-content-encoding response))) + (is (= big-request-body (:request-body-decompressed response))))) + (testing "short inputstream body is gzipped in request" + (let [response (post-gzip-java-request + port + (string->byte-array-input-stream short-request-body))] + (is (= "gzip" (:request-content-encoding response))) + (is (= short-request-body (:request-body-decompressed response))))) + (testing "big inputstream body is gzipped in request" + (let [response (post-gzip-java-request + port + (string->byte-array-input-stream big-request-body))] + (is (= "gzip" (:request-content-encoding response))) + (is (= big-request-body (:request-body-decompressed response)))))))) + +(deftest connect-exception-during-gzip-request-returns-failure + (testing "connection exception during gzip request returns failure" + (is (connect-exception-thrown? + (http-client/post "http://localhost:65535" + {:body short-request-body + :compress-request-body :gzip + :as :text}))))) From cb5de4555d461a2c7c806e95c96f57b05e256dc2 Mon Sep 17 00:00:00 2001 From: Jeremy Barlow Date: Wed, 8 Feb 2017 12:29:42 -0800 Subject: [PATCH 2/2] (PDB-2640) Close gzip-wrapped body input stream on request completion In the previous commit, if gzip compression was applied to a request input stream, the stream was not automatically closed when the request was complete. The Apache HTTP async library, however, closes an input stream in cases where gzip compression is not being added. In this commit, the original request input stream is closed immediately after attempts to copy the data from it to the downstream gzipped input stream have been completed, for compatibility with the pre-existing Apache HTTP async library behavior. --- .../http/client/impl/JavaClient.java | 8 ++- .../http/client/gzip_request_test.clj | 64 ++++++++++++------- 2 files changed, 48 insertions(+), 24 deletions(-) diff --git a/src/java/com/puppetlabs/http/client/impl/JavaClient.java b/src/java/com/puppetlabs/http/client/impl/JavaClient.java index 1f8ff13..ebec76c 100644 --- a/src/java/com/puppetlabs/http/client/impl/JavaClient.java +++ b/src/java/com/puppetlabs/http/client/impl/JavaClient.java @@ -386,8 +386,12 @@ public class JavaClient { if (requestBody instanceof InputStream) { InputStream requestInputStream = (InputStream) requestBody; byte[] byteBuffer = new byte[GZIP_BUFFER_SIZE]; - IOUtils.copyLarge(requestInputStream, - gzipOutputStream, byteBuffer); + try { + IOUtils.copyLarge(requestInputStream, + gzipOutputStream, byteBuffer); + } finally { + requestInputStream.close(); + } } else { throwUnsupportedBodyException(requestBody); } diff --git a/test/puppetlabs/http/client/gzip_request_test.clj b/test/puppetlabs/http/client/gzip_request_test.clj index e18bf41..4b7e7e5 100644 --- a/test/puppetlabs/http/client/gzip_request_test.clj +++ b/test/puppetlabs/http/client/gzip_request_test.clj @@ -3,7 +3,7 @@ SimpleRequestOptions ResponseBodyType CompressType) - (java.io ByteArrayInputStream) + (java.io ByteArrayInputStream FilterInputStream) (java.net URI) (java.util.zip GZIPInputStream)) (:require [clojure.test :refer :all] @@ -31,10 +31,14 @@ (apply str (repeat 4000 "and�i�said�hey�yeah�yeah�whats�going�on"))) (defn string->byte-array-input-stream - [source] - (-> source - (.getBytes) - (ByteArrayInputStream.))) + [source is-closed-atom] + (let [bis (-> source + (.getBytes) + (ByteArrayInputStream.))] + (proxy [FilterInputStream] [bis] + (close [] + (reset! is-closed-atom true) + (proxy-super close))))) (defn post-gzip-clj-request [port body] @@ -71,17 +75,23 @@ (is (= "gzip" (:request-content-encoding response))) (is (= big-request-body (:request-body-decompressed response))))) (testing "short inputstream body is gzipped in request" - (let [response (post-gzip-clj-request + (let [is-closed (atom false) + response (post-gzip-clj-request port - (string->byte-array-input-stream short-request-body))] + (string->byte-array-input-stream short-request-body + is-closed))] (is (= "gzip" (:request-content-encoding response))) - (is (= short-request-body (:request-body-decompressed response))))) + (is (= short-request-body (:request-body-decompressed response))) + (is @is-closed "input stream was not closed after request"))) (testing "big inputstream body is gzipped in request" - (let [response (post-gzip-clj-request + (let [is-closed (atom false) + response (post-gzip-clj-request port - (string->byte-array-input-stream big-request-body))] + (string->byte-array-input-stream big-request-body + is-closed))] (is (= "gzip" (:request-content-encoding response))) - (is (= big-request-body (:request-body-decompressed response)))))))) + (is (= big-request-body (:request-body-decompressed response))) + (is @is-closed "input stream was not closed after request")))))) (deftest java-sync-client-gzip-requests (testing "for java sync client" @@ -97,22 +107,32 @@ (is (= "gzip" (:request-content-encoding response))) (is (= big-request-body (:request-body-decompressed response))))) (testing "short inputstream body is gzipped in request" - (let [response (post-gzip-java-request + (let [is-closed (atom false) + response (post-gzip-java-request port - (string->byte-array-input-stream short-request-body))] + (string->byte-array-input-stream short-request-body + is-closed))] (is (= "gzip" (:request-content-encoding response))) - (is (= short-request-body (:request-body-decompressed response))))) + (is (= short-request-body (:request-body-decompressed response))) + (is @is-closed "input stream was not closed after request"))) (testing "big inputstream body is gzipped in request" - (let [response (post-gzip-java-request + (let [is-closed (atom false) + response (post-gzip-java-request port - (string->byte-array-input-stream big-request-body))] + (string->byte-array-input-stream big-request-body + is-closed))] (is (= "gzip" (:request-content-encoding response))) - (is (= big-request-body (:request-body-decompressed response)))))))) + (is (= big-request-body (:request-body-decompressed response))) + (is @is-closed "input stream was not closed after request")))))) (deftest connect-exception-during-gzip-request-returns-failure (testing "connection exception during gzip request returns failure" - (is (connect-exception-thrown? - (http-client/post "http://localhost:65535" - {:body short-request-body - :compress-request-body :gzip - :as :text}))))) + (let [is-closed (atom false)] + (is (connect-exception-thrown? + (http-client/post "http://localhost:65535" + {:body (string->byte-array-input-stream + short-request-body + is-closed) + :compress-request-body :gzip + :as :text}))) + (is @is-closed "input stream was not closed after request"))))