(PDB-2640) Add option for gzip compressing a request's body

This commit adds a new Clojure HTTP request option,
`:compress-request-body`, and a corresponding Java option, which can be used
to have gzip compression applied to the request's body before it is sent
along to the server.
This commit is contained in:
Jeremy Barlow 2017-01-18 17:55:10 -08:00
parent 4c60d6d371
commit 689db7cb87
12 changed files with 340 additions and 21 deletions

View file

@ -92,6 +92,11 @@ which is a map containing options for the HTTP request. These options are as fol
with a value of `puppetlabs.core.i18n/user-locale` will be added to the
request.
* `:body`: optional; may be a String or any type supported by clojure's reader
* `:compress-request-body`: optional; used to control any additional compression
which the client can apply to the request body before it is sent to the target
server. Defaults to `:none`. Supported values are:
* `:gzip` which will compress the request body as gzip
* `:none` which will not apply any additional compression to the request body
* `:decompress-body`: optional; if `true`, an 'accept-encoding' header with a value of
'gzip, deflate' will be added to the request, and the response will be
automatically decompressed if it contains a recognized 'content-encoding'

View file

@ -32,7 +32,8 @@
;; depend on this source jar using a :classifier in their :dependencies.
:classifiers [["sources" :sources-jar]]
:profiles {:dev {:dependencies [[puppetlabs/kitchensink nil :classifier "test"]
:profiles {:dev {:dependencies [[cheshire]
[puppetlabs/kitchensink nil :classifier "test"]
[puppetlabs/trapperkeeper]
[puppetlabs/trapperkeeper nil :classifier "test"]
[puppetlabs/trapperkeeper-webserver-jetty9]

View file

@ -10,7 +10,7 @@
;; these methods.
(ns puppetlabs.http.client.async
(:import (com.puppetlabs.http.client ClientOptions RequestOptions ResponseBodyType HttpMethod)
(:import (com.puppetlabs.http.client ClientOptions RequestOptions ResponseBodyType HttpMethod CompressType)
(com.puppetlabs.http.client.impl JavaClient ResponseDeliveryDelegate)
(org.apache.http.client.utils URIBuilder)
(org.apache.http.nio.client HttpAsyncClient)
@ -119,6 +119,12 @@
:text ResponseBodyType/TEXT
ResponseBodyType/STREAM))
(schema/defn clojure-compress-request-body-type->java :- CompressType
  "Translate the `:compress-request-body` value found in `opts` into the
  matching Java `CompressType` constant. `:gzip` maps to `CompressType/GZIP`;
  any other value, including an absent key, maps to `CompressType/NONE`."
  [opts :- common/RequestOptions]
  (if (= :gzip (:compress-request-body opts))
    CompressType/GZIP
    CompressType/NONE))
(defn parse-metric-id
[opts]
(when-let [metric-id (:metric-id opts)]
@ -131,6 +137,7 @@
(.setAs (clojure-response-body-type->java opts))
(.setBody (:body opts))
(.setDecompressBody (clojure.core/get opts :decompress-body true))
(.setCompressRequestBody (clojure-compress-request-body-type->java opts))
(.setHeaders (:headers opts))
(.setMetricId (parse-metric-id opts))))
@ -177,6 +184,7 @@
(let [result (promise)
defaults {:body nil
:decompress-body true
:compress-request-body :none
:as :stream}
^Locale locale (i18n/user-locale)
;; lower-case the header names so that we don't end up with

View file

@ -42,6 +42,9 @@
(def BodyType
(schema/enum :text :stream :unbuffered-stream))
;; Schema for the values accepted by the `:compress-request-body` request
;; option: either `:gzip` or `:none`.
(def CompressType
(schema/enum :gzip :none))
(def MetricId [(schema/either schema/Str schema/Keyword)])
(def RawUserRequestClientOptions
@ -53,6 +56,7 @@
(ok :headers) Headers
(ok :body) Body
(ok :decompress-body) schema/Bool
(ok :compress-request-body) CompressType
(ok :as) BodyType
(ok :query-params) {schema/Str schema/Str}
(ok :metric-id) [schema/Str]
@ -76,6 +80,7 @@
(ok :headers) Headers
(ok :body) Body
(ok :decompress-body) schema/Bool
(ok :compress-request-body) CompressType
(ok :as) BodyType
(ok :query-params) {schema/Str schema/Str}
(ok :metric-id) MetricId})
@ -90,6 +95,7 @@
:headers Headers
:body Body
:decompress-body schema/Bool
:compress-request-body CompressType
:as BodyType
(ok :query-params) {schema/Str schema/Str}
(ok :metric-id) MetricId})

View file

@ -21,7 +21,9 @@
(schema/defn extract-request-opts :- common/RawUserRequestOptions
[opts :- common/RawUserRequestClientOptions]
(select-keys opts [:url :method :headers :body :decompress-body :as :query-params]))
(select-keys opts [:url :method :headers :body
:decompress-body :compress-request-body
:as :query-params]))
(defn request-with-client
([req client]

View file

@ -0,0 +1,6 @@
package com.puppetlabs.http.client;
/**
 * Compression which the client can apply to a request body before the
 * request is sent.
 */
public enum CompressType {
/** Compress the request body as gzip. */
GZIP,
/** Do not apply any additional compression to the request body. */
NONE
}

View file

@ -12,6 +12,7 @@ public class RequestOptions {
private URI uri;
private Map<String, String> headers;
private Object body;
private CompressType requestBodyCompression = CompressType.NONE;
private boolean decompressBody = true;
private ResponseBodyType as = ResponseBodyType.STREAM;
private String[] metricId;
@ -84,6 +85,15 @@ public class RequestOptions {
return this;
}
/**
 * Returns the compression type configured for the request body. The field
 * is initialized to {@link CompressType#NONE}.
 *
 * @return the configured request body compression type
 */
public CompressType getCompressRequestBody() {
return requestBodyCompression;
}
/**
 * Sets the compression type to use for the request body.
 *
 * @param requestBodyCompression the compression type to store
 * @return this options object, so that setter calls can be chained
 */
public RequestOptions setCompressRequestBody(
CompressType requestBodyCompression) {
this.requestBodyCompression = requestBodyCompression;
return this;
}
public ResponseBodyType getAs() {
return as;
}

View file

@ -24,6 +24,7 @@ public class SimpleRequestOptions {
private String[] sslCipherSuites;
private boolean insecure = false;
private Object body;
private CompressType requestBodyCompression = CompressType.NONE;
private boolean decompressBody = true;
private ResponseBodyType as = ResponseBodyType.STREAM;
private boolean forceRedirects = false;
@ -137,6 +138,15 @@ public class SimpleRequestOptions {
return this;
}
/**
 * Returns the compression type configured for the request body. The field
 * is initialized to {@link CompressType#NONE}.
 *
 * @return the configured request body compression type
 */
public CompressType getCompressRequestBody() {
return requestBodyCompression;
}
/**
 * Sets the compression type to use for the request body.
 *
 * @param requestBodyCompression the compression type to store
 * @return this options object, so that setter calls can be chained
 */
public SimpleRequestOptions setRequestBodyCompression(
        CompressType requestBodyCompression) {
    this.requestBodyCompression = requestBodyCompression;
    return this;
}

/**
 * Sets the compression type to use for the request body. This is an alias
 * for {@link #setRequestBodyCompression(CompressType)} whose name mirrors
 * {@link #getCompressRequestBody()} and the equivalent setter on
 * {@code RequestOptions}.
 *
 * @param requestBodyCompression the compression type to store
 * @return this options object, so that setter calls can be chained
 */
public SimpleRequestOptions setCompressRequestBody(
        CompressType requestBodyCompression) {
    return setRequestBodyCompression(requestBodyCompression);
}
public ResponseBodyType getAs() {
return as;
}
@ -176,4 +186,4 @@ public class SimpleRequestOptions {
this.socketTimeoutMilliseconds = socketTimeoutMilliseconds;
return this;
}
}
}

View file

@ -31,7 +31,12 @@ public class Sync {
Object body = simpleOptions.getBody();
boolean decompressBody = simpleOptions.getDecompressBody();
ResponseBodyType as = simpleOptions.getAs();
return new RequestOptions(uri, headers, body, decompressBody, as);
CompressType requestBodyDecompression =
simpleOptions.getCompressRequestBody();
RequestOptions requestOptions = new RequestOptions(
uri, headers, body, decompressBody, as);
requestOptions.setCompressRequestBody(requestBodyDecompression);
return requestOptions;
}
private static ClientOptions extractClientOptions(SimpleRequestOptions simpleOptions) {

View file

@ -5,21 +5,28 @@ import org.apache.http.Header;
import org.apache.http.HttpEntity;
import java.net.URI;
import java.util.zip.GZIPOutputStream;
public class CoercedRequestOptions {
class CoercedRequestOptions {
private final URI uri;
private final HttpMethod method;
private final Header[] headers;
private final HttpEntity body;
private final GZIPOutputStream gzipOutputStream;
private final byte[] bytesToGzip;
public CoercedRequestOptions(URI uri,
HttpMethod method,
Header[] headers,
HttpEntity body) {
HttpEntity body,
GZIPOutputStream gzipOutputStream,
byte[] bytesToGzip) {
this.uri = uri;
this.method = method;
this.headers = headers;
this.body = body;
this.gzipOutputStream = gzipOutputStream;
this.bytesToGzip = bytesToGzip;
}
public URI getUri() {
@ -37,4 +44,8 @@ public class CoercedRequestOptions {
public HttpEntity getBody() {
return body;
}
/**
 * Returns the gzip output stream that was supplied to the constructor.
 *
 * @return the gzip output stream given at construction time
 */
public GZIPOutputStream getGzipOutputStream() { return gzipOutputStream; }
/**
 * Returns the byte array that was supplied to the constructor. The array
 * itself is returned, not a copy.
 *
 * @return the bytes given at construction time
 */
public byte[] getBytesToGzip() { return bytesToGzip; }
}

View file

@ -2,6 +2,7 @@ package com.puppetlabs.http.client.impl;
import com.codahale.metrics.MetricRegistry;
import com.puppetlabs.http.client.ClientOptions;
import com.puppetlabs.http.client.CompressType;
import com.puppetlabs.http.client.HttpClientException;
import com.puppetlabs.http.client.HttpMethod;
import com.puppetlabs.http.client.RequestOptions;
@ -45,6 +46,8 @@ import org.apache.http.nio.client.methods.HttpAsyncMethods;
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.http.protocol.HttpContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
@ -52,6 +55,8 @@ import javax.net.ssl.X509TrustManager;
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.nio.charset.UnsupportedCharsetException;
@ -61,11 +66,20 @@ import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.HashMap;
import java.util.Map;
import java.util.zip.GZIPOutputStream;
public class JavaClient {
private static final String PROTOCOL = "TLS";
private static final Logger LOGGER = LoggerFactory.getLogger(JavaClient.class);
// Buffer size to use in streams for request gzip compression. This is
// somewhat arbitrary but went with the same value as the Apache HTTP
// async client uses for chunking input streams for requests:
// https://github.com/apache/httpcore/blob/4.4.5/httpcore-nio/src/main/java/org/apache/http/nio/entity/EntityAsyncContentProducer.java#L58
// Declared final so that the shared constant cannot be reassigned.
private static final int GZIP_BUFFER_SIZE = 4096;
private static Header[] prepareHeaders(RequestOptions options,
ContentType contentType) {
Map<String, Header> result = new HashMap<String, Header>();
@ -80,6 +94,10 @@ public class JavaClient {
(! result.containsKey("accept-encoding"))) {
result.put("accept-encoding", new BasicHeader("Accept-Encoding", "gzip, deflate"));
}
if (options.getCompressRequestBody() == CompressType.GZIP &&
(! result.containsKey("content-encoding"))) {
result.put("content-encoding", new BasicHeader("Content-Encoding", "gzip"));
}
if (contentType != null) {
result.put("content-type", new BasicHeader("Content-Type",
@ -123,6 +141,11 @@ public class JavaClient {
return contentType;
}
/**
 * Always throws an {@link HttpClientException} reporting that the supplied
 * request body is of an unsupported type.
 *
 * @param body the rejected request body; a null value is reported as
 *             "null" rather than triggering a NullPointerException
 * @throws HttpClientException always
 */
private static void throwUnsupportedBodyException(Object body) {
    // Guard against null so that the caller always gets the intended
    // HttpClientException instead of an NPE from body.getClass().
    String bodyType = (body == null) ? "null" : body.getClass().toString();
    throw new HttpClientException("Unsupported body type for request: " +
            bodyType + ". Only InputStream and String are supported.");
}
private static CoercedRequestOptions coerceRequestOptions(RequestOptions options, HttpMethod method) {
URI uri = options.getUri();
@ -135,27 +158,59 @@ public class JavaClient {
Header[] headers = prepareHeaders(options, contentType);
HttpEntity body = null;
GZIPOutputStream gzipOutputStream = null;
Object bodyFromOptions = options.getBody();
byte[] bytesToGzip = null;
if (options.getBody() instanceof String) {
String originalBody = (String) options.getBody();
if (contentType != null) {
body = new NStringEntity(originalBody, contentType);
}
else {
if ((bodyFromOptions instanceof String) ||
(bodyFromOptions instanceof InputStream)) {
// See comments in the requestWithClient() method about why the
// request body is routed through a GZIPOutputStream,
// PipedOutputStream, and PipedInputStream in order to achieve
// gzip compression.
if (options.getCompressRequestBody() == CompressType.GZIP) {
PipedInputStream pis = new PipedInputStream(GZIP_BUFFER_SIZE);
PipedOutputStream pos = new PipedOutputStream();
try {
body = new NStringEntity(originalBody);
}
catch (UnsupportedEncodingException e) {
pos.connect(pis);
gzipOutputStream = new GZIPOutputStream(pos,
GZIP_BUFFER_SIZE);
body = new InputStreamEntity(pis);
} catch (IOException ioe) {
throw new HttpClientException(
"Unable to create request body", e);
"Error setting up gzip stream for request", ioe);
}
if (bodyFromOptions instanceof String) {
String bodyAsString = (String) bodyFromOptions;
if (contentType != null) {
bytesToGzip = bodyAsString.getBytes(contentType.getCharset());
} else {
bytesToGzip = bodyAsString.getBytes();
}
}
} else if (bodyFromOptions instanceof String) {
String originalBody = (String) bodyFromOptions;
if (contentType != null) {
body = new NStringEntity(originalBody, contentType);
}
else {
try {
body = new NStringEntity(originalBody);
}
catch (UnsupportedEncodingException e) {
throw new HttpClientException(
"Unable to create request body", e);
}
}
} else {
body = new InputStreamEntity((InputStream) bodyFromOptions);
}
} else if (options.getBody() instanceof InputStream) {
body = new InputStreamEntity((InputStream)options.getBody());
} else if (bodyFromOptions != null) {
throwUnsupportedBodyException(bodyFromOptions);
}
return new CoercedRequestOptions(uri, method, headers, body);
return new CoercedRequestOptions(uri,
method, headers, body, gzipOutputStream, bytesToGzip);
}
public static CoercedClientOptions coerceClientOptions(ClientOptions options) {
@ -320,6 +375,46 @@ public class JavaClient {
client.execute(HttpAsyncMethods.create(request), consumer, timedStreamingCompleteCallback);
}
/**
 * Writes the request payload through the supplied gzip stream and then
 * closes that stream.
 *
 * When {@code bytesToGzip} is non-null those bytes are written; otherwise
 * {@code requestBody} must be an {@link InputStream}, whose content is
 * copied through the gzip stream in GZIP_BUFFER_SIZE chunks. Any other
 * body type is rejected via throwUnsupportedBodyException().
 *
 * IOExceptions may be raised either while writing/copying or while closing
 * the GZIPOutputStream. That stream is backed by a PipedOutputStream
 * connected to a PipedInputStream, so the most likely cause is the
 * PipedInputStream having been closed prematurely. In those cases the
 * Apache HTTP Async library should detect the failure while processing the
 * request and deliver an appropriate "failure" callback. The exceptions are
 * therefore logged at debug level for troubleshooting and deliberately not
 * rethrown, so that the client can still receive that callback later on.
 *
 * @param gzipOutputStream stream to compress the payload through; always
 *                         closed before this method returns
 * @param bytesToGzip      payload bytes, or null when the payload should
 *                         be read from {@code requestBody}
 * @param requestBody      original request body, consulted only when
 *                         {@code bytesToGzip} is null
 */
private static void gzipRequestPayload(
        GZIPOutputStream gzipOutputStream,
        byte[] bytesToGzip,
        Object requestBody) {
    try {
        if (bytesToGzip != null) {
            gzipOutputStream.write(bytesToGzip);
        } else if (requestBody instanceof InputStream) {
            InputStream payloadSource = (InputStream) requestBody;
            byte[] copyBuffer = new byte[GZIP_BUFFER_SIZE];
            IOUtils.copyLarge(payloadSource, gzipOutputStream, copyBuffer);
        } else {
            throwUnsupportedBodyException(requestBody);
        }
    } catch (IOException writeFailure) {
        // Best effort: see the method documentation for why this is only logged.
        LOGGER.debug("Error writing gzip request body", writeFailure);
    } finally {
        try {
            gzipOutputStream.close();
        } catch (IOException closeFailure) {
            LOGGER.debug("Error closing gzip request stream", closeFailure);
        }
    }
}
public static void requestWithClient(final RequestOptions requestOptions,
final HttpMethod method,
final IResponseCallback callback,
@ -364,6 +459,48 @@ public class JavaClient {
TimerUtils.startFullResponseTimers(registry, request, metricId, metricNamespace));
client.execute(request, timedFutureCallback);
}
// The approach used for gzip-compressing the request payload is far from
// ideal here. The approach involves reading the bytes from the supplied
// request body, redirecting those through a JDK GZIPOutputStream object
// to compress them, and then piping those back into an InputStream that
// the Apache Async HTTP layer reads from in order to get the bytes
// to transmit in the HTTP request. The JDK apparently has no built-in
// functionality for gzip-compressing a source byte array or InputStream
// back into a separate InputStream that the Apache Async HTTP layer
// could use.
//
// A better approach would probably be to do something like one of the
// approaches discussed in http://stackoverflow.com/questions/11036280/compress-an-inputstream-with-gzip.
// For example, the InputStream given to the Apache Async HTTP layer
// could be wrapped with a FilterInputStream which gzip-compresses
// bytes on the fly as the Apache Async HTTP layer asks for them. The
// approaches on that thread are pretty involved, though, and appear to
// have liberally copied content from the JDK source. A clean-room
// implementation would probably be better but would also likely
// require a fair bit of testing to ensure that it produces good gzip
// content under varying read scenarios.
//
// The approach being used for now requires writing through the
// GZIPOutputStream and underlying PipedOutputStream from the thread on
// which the HTTP request is made. The connected PipedInputStream is
// then read from a separate thread, one of the Apache HTTP Async IO
// worker threads -- hopefully avoiding the possibility of a deadlock
// in the process.
//
// For requests that provide an InputStream as a source argument, it
// would also probably be more performant to do the GZIPOutputStream
// writing from a separate thread and would give an AsyncHttpClient
// requestor the ability to do other work while the source InputStream
// is being read and compressed. As a simplification for now, this
// implementation doesn't spin up a separate thread (or thread pool)
// for performing gzip compression.
GZIPOutputStream gzipOutputStream = coercedRequestOptions.getGzipOutputStream();
if (gzipOutputStream != null) {
gzipRequestPayload(gzipOutputStream,
coercedRequestOptions.getBytesToGzip(),
requestOptions.getBody());
}
}
public static CloseableHttpAsyncClient createClient(ClientOptions clientOptions) {

View file

@ -0,0 +1,118 @@
(ns puppetlabs.http.client.gzip-request-test
(:import (com.puppetlabs.http.client Sync
SimpleRequestOptions
ResponseBodyType
CompressType)
(java.io ByteArrayInputStream)
(java.net URI)
(java.util.zip GZIPInputStream))
(:require [clojure.test :refer :all]
[cheshire.core :as cheshire]
[schema.test :as schema-test]
[puppetlabs.http.client.sync :as http-client]
[puppetlabs.http.client.test-common :refer :all]
[puppetlabs.trapperkeeper.testutils.webserver :as testwebserver]))
(use-fixtures :once schema-test/validate-schemas)
(defn req-body-app
  "Ring handler used by the tests. Gunzips the request body, decodes it as
  utf-8, and replies with a JSON document containing the request's
  'content-encoding' header and the decompressed body text."
  [req]
  (let [content-encoding (get-in req [:headers "content-encoding"])
        decompressed-body (-> (:body req)
                              (GZIPInputStream.)
                              (slurp :encoding "utf-8"))
        payload {:request-content-encoding content-encoding
                 :request-body-decompressed decompressed-body}]
    {:status 200
     :headers {"Content-Type" "application/json; charset=utf-8"}
     :body (cheshire/generate-string payload)}))
;; Request bodies posted by the tests below: one short string and one built
;; by repeating a phrase 4000 times.
;; NOTE(review): the non-ASCII characters in these literals appear to be
;; mis-encoded in this view -- confirm them against the original file.
(def short-request-body "gzip me<6D>")
(def big-request-body
(apply str (repeat 4000 "and<6E>i<EFBFBD>said<69>hey<65>yeah<61>yeah<61>whats<74>going<6E>on")))
(defn string->byte-array-input-stream
  "Return a ByteArrayInputStream over the UTF-8 encoding of `source`.

  The charset is given explicitly because the test requests declare
  'charset=utf-8' and the test handler decodes the body as utf-8; relying on
  the platform default charset would corrupt non-ASCII characters on JVMs
  whose default is not UTF-8."
  [source]
  (-> source
      (.getBytes "UTF-8")
      (ByteArrayInputStream.)))
(defn post-gzip-clj-request
  "POST `body` to the local server on `port` via the Clojure sync client with
  `:compress-request-body :gzip`, then parse the text response body as JSON
  with keywordized keys."
  [port body]
  (let [url (format "http://localhost:%d" port)
        request-opts {:body body
                      :headers {"Content-Type" "text/plain; charset=utf-8"}
                      :compress-request-body :gzip
                      :as :text}
        response (http-client/post url request-opts)]
    (cheshire/parse-string (:body response) true)))
(defn post-gzip-java-request
  "POST `body` to the local server on `port` via the Java `Sync` client with
  gzip request body compression enabled, then parse the text response body
  as JSON with keywordized keys."
  [port body]
  (let [uri (URI. (format "http://localhost:%d/hello/" port))
        request-options (-> (SimpleRequestOptions. uri)
                            (.setBody body)
                            (.setHeaders {"Content-Type" "text/plain; charset=utf-8"})
                            (.setRequestBodyCompression CompressType/GZIP)
                            (.setAs ResponseBodyType/TEXT))
        response (Sync/post request-options)]
    (cheshire/parse-string (.getBody response) true)))
(deftest clj-sync-client-gzip-requests
  (testing "for clojure sync client"
    (testwebserver/with-test-webserver
     req-body-app
     port
     ;; Each case is [description expected-text make-body]; make-body turns
     ;; the expected text into the value actually posted (the string itself
     ;; or a fresh input stream over it).
     (doseq [[description expected-text make-body]
             [["short string body is gzipped in request"
               short-request-body identity]
              ["big string body is gzipped in request"
               big-request-body identity]
              ["short inputstream body is gzipped in request"
               short-request-body string->byte-array-input-stream]
              ["big inputstream body is gzipped in request"
               big-request-body string->byte-array-input-stream]]]
       (testing description
         (let [response (post-gzip-clj-request port (make-body expected-text))]
           (is (= "gzip" (:request-content-encoding response)))
           (is (= expected-text (:request-body-decompressed response)))))))))
(deftest java-sync-client-gzip-requests
  (testing "for java sync client"
    (testwebserver/with-test-webserver
     req-body-app
     port
     ;; Each case is [description expected-text make-body]; make-body turns
     ;; the expected text into the value actually posted (the string itself
     ;; or a fresh input stream over it).
     (doseq [[description expected-text make-body]
             [["short string body is gzipped in request"
               short-request-body identity]
              ["big string body is gzipped in request"
               big-request-body identity]
              ["short inputstream body is gzipped in request"
               short-request-body string->byte-array-input-stream]
              ["big inputstream body is gzipped in request"
               big-request-body string->byte-array-input-stream]]]
       (testing description
         (let [response (post-gzip-java-request port (make-body expected-text))]
           (is (= "gzip" (:request-content-encoding response)))
           (is (= expected-text (:request-body-decompressed response)))))))))
;; Posts a gzip-compressed request to localhost port 65535 and asserts that
;; `connect-exception-thrown?` holds for the call.
;; NOTE(review): presumably nothing is listening on that port, so the
;; connection attempt itself fails -- confirm.
(deftest connect-exception-during-gzip-request-returns-failure
(testing "connection exception during gzip request returns failure"
(is (connect-exception-thrown?
(http-client/post "http://localhost:65535"
{:body short-request-body
:compress-request-body :gzip
:as :text})))))