Update Java API to include ResponseBodyType.UNBUFFERED_STREAM and update Clojure implementation to give consistent HttpContext usage with both clj and Java

This commit is contained in:
Scott Walker 2016-01-08 13:10:58 +00:00
parent a5ed1d0bf8
commit 15b13fed4f
6 changed files with 283 additions and 141 deletions

View file

@ -22,15 +22,15 @@
(org.apache.http.nio.entity NStringEntity)
(org.apache.http.entity InputStreamEntity ContentType)
(java.io InputStream)
(com.puppetlabs.http.client.impl Compression StreamingAsyncResponseConsumer FnDeliverable)
(com.puppetlabs.http.client.impl StreamingAsyncResponseConsumer FnDeliverable)
(org.apache.http.client RedirectStrategy)
(org.apache.http.impl.client LaxRedirectStrategy DefaultRedirectStrategy)
(org.apache.http.nio.conn.ssl SSLIOSessionStrategy)
(org.apache.http.client.config RequestConfig)
(org.apache.http.nio.client.methods HttpAsyncMethods)
(org.apache.http.nio.client HttpAsyncClient)
(org.apache.http.client.protocol ResponseContentEncoding)
(org.apache.http.protocol BasicHttpContext))
(org.apache.http.client.protocol ResponseContentEncoding HttpClientContext)
(org.apache.http.protocol HttpContext))
(:require [puppetlabs.ssl-utils.core :as ssl]
[clojure.string :as str]
[puppetlabs.http.client.common :as common]
@ -188,7 +188,7 @@
""))))
(defn- response-map
[opts http-response]
[opts http-response http-context]
(let [headers (get-resp-headers http-response)
orig-encoding (headers "content-encoding")]
{:opts opts
@ -200,7 +200,7 @@
(if (:decompress-body opts)
(.process (ResponseContentEncoding.)
http-response
(BasicHttpContext.)))
http-context))
(when-let [entity (.getEntity http-response)]
(.getContent entity)))}))
@ -232,9 +232,10 @@
[result :- common/ResponsePromise
opts :- common/RequestOptions
callback :- common/ResponseCallbackFn
http-response :- HttpResponse]
http-response :- HttpResponse
http-context :- HttpContext]
(try
(let [response (cond-> (response-map opts http-response)
(let [response (cond-> (response-map opts http-response http-context)
(and (not= :stream (:as opts))
(not= :unbuffered-stream (:as opts))) (coerce-body-type))]
(deliver-result result opts callback response))
@ -245,10 +246,11 @@
(schema/defn future-callback :- FutureCallback
[result :- common/ResponsePromise
opts :- common/RequestOptions
callback :- common/ResponseCallbackFn]
callback :- common/ResponseCallbackFn
http-context :- HttpContext]
(reify FutureCallback
(completed [this http-response]
(complete-response result opts callback http-response))
(complete-response result opts callback http-response http-context))
(failed [this e]
(deliver-result result opts callback
(error-response opts e)))
@ -330,14 +332,15 @@
future-callback :- FutureCallback
result :- common/ResponsePromise
opts :- common/RequestOptions
callback :- common/ResponseCallbackFn]
callback :- common/ResponseCallbackFn
http-context :- HttpContext]
(let [;; Create an Apache AsyncResponseConsumer that will return the response to us as soon as it is
;; available then send the response body asynchronously
consumer (StreamingAsyncResponseConsumer.
(FnDeliverable.
(fn
[http-response]
(complete-response result opts callback http-response))))
(complete-response result opts callback http-response http-context))))
;; If an error occurs early in the request, the consumer may not get a chance to return the
;; response using the FnDeliverable. This wrapper around the future-callback guarantees that
;; happens. It also takes care of notifying the consumer of the final result.
@ -397,9 +400,10 @@
(.setHeaders request (:headers coerced-opts))
(when body
(.setEntity request body))
(let [future-callback (future-callback result opts callback)]
(let [http-context (HttpClientContext/create)
future-callback (future-callback result opts callback http-context)]
(if (= :unbuffered-stream (:as opts))
(execute-with-consumer client request future-callback result opts callback)
(execute-with-consumer client request future-callback result opts callback http-context)
(.execute client request future-callback)))
result))

View file

@ -4,6 +4,7 @@ package com.puppetlabs.http.client;
* This Enum represents the possible types of the body of a response.
*/
public enum ResponseBodyType {
UNBUFFERED_STREAM,
STREAM,
TEXT;
}

View file

@ -1,23 +0,0 @@
package com.puppetlabs.http.client.impl;
import com.puppetlabs.http.client.HttpClientException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.zip.GZIPInputStream;
import java.util.zip.InflaterInputStream;
public class Compression {
public static InputStream gunzip(InputStream gzipped) {
try {
return new GZIPInputStream(gzipped);
} catch (IOException e) {
throw new HttpClientException("Unable to gunzip stream", e);
}
}
public static InputStream inflate(InputStream deflated) {
return new InflaterInputStream(deflated);
}
}

View file

@ -29,6 +29,8 @@ import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.client.methods.HttpTrace;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.client.protocol.ResponseContentEncoding;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.InputStreamEntity;
@ -38,6 +40,7 @@ import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.message.BasicHeader;
import org.apache.http.nio.client.methods.HttpAsyncMethods;
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.http.protocol.HttpContext;
@ -216,50 +219,113 @@ public class JavaClient {
return context;
}
private static void completeResponse(Promise<Response> promise,
RequestOptions requestOptions,
IResponseCallback callback,
HttpResponse httpResponse,
HttpContext httpContext) {
try {
Map<String, String> headers = new HashMap<>();
for (Header h : httpResponse.getAllHeaders()) {
headers.put(h.getName().toLowerCase(), h.getValue());
}
String origContentEncoding = headers.get("content-encoding");
if (requestOptions.getDecompressBody()) {
new ResponseContentEncoding().process(httpResponse, httpContext);
}
Object body = null;
HttpEntity entity = httpResponse.getEntity();
if (entity != null) {
body = entity.getContent();
}
ContentType contentType = null;
if (headers.get("content-type") != null) {
contentType = ContentType.parse(headers.get("content-type"));
}
if (requestOptions.getAs() == ResponseBodyType.TEXT) {
body = coerceBodyType((InputStream) body, requestOptions.getAs(), contentType);
}
deliverResponse(requestOptions,
new Response(requestOptions, origContentEncoding, body,
headers, httpResponse.getStatusLine().getStatusCode(),
contentType),
callback, promise);
} catch (Exception e) {
deliverResponse(requestOptions, new Response(requestOptions, e), callback, promise);
}
}
private static void executeWithConsumer(final CloseableHttpAsyncClient client,
final FutureCallback<HttpResponse> futureCallback,
final HttpRequestBase request) {
/*
* Create an Apache AsyncResponseConsumer that will return the response to us as soon as it is available,
* then send the response body asynchronously
*/
final StreamingAsyncResponseConsumer consumer = new StreamingAsyncResponseConsumer(new Deliverable<HttpResponse>() {
@Override
public void deliver(HttpResponse httpResponse) {
futureCallback.completed(httpResponse);
}
});
/*
* Normally the consumer returns the response as soon as it is available using the deliver() callback (above)
* which delegates to the supplied futureCallback.
*
* If an error occurs early in the request, the consumer may not get a chance to delivery the response. This
* streamingCompleteCallback wraps the supplied futureCallback and ensures:
* - The supplied futureCallback is always eventually called even in error states
* - Any exception that occurs during stream processing (after the response has been returned) is propagated
* back to the client using the setFinalResult() method.
*/
FutureCallback<HttpResponse> streamingCompleteCallback = new
FutureCallback<HttpResponse>() {
@Override
public void completed(HttpResponse httpResponse) {
consumer.setFinalResult(null);
futureCallback.completed(httpResponse);
}
@Override
public void failed(Exception e) {
if (e instanceof IOException) {
consumer.setFinalResult((IOException) e);
} else {
consumer.setFinalResult(new IOException(e));
}
futureCallback.failed(e);
}
@Override
public void cancelled() {
consumer.setFinalResult(null);
futureCallback.cancelled();
}
};
client.execute(HttpAsyncMethods.create(request), consumer, streamingCompleteCallback);
}
public static Promise<Response> requestWithClient(final RequestOptions requestOptions,
final HttpMethod method,
final IResponseCallback callback,
final CloseableHttpAsyncClient client) {
CoercedRequestOptions coercedRequestOptions = coerceRequestOptions(requestOptions, method);
HttpRequestBase request = constructRequest(coercedRequestOptions.getMethod(),
coercedRequestOptions.getUri(), coercedRequestOptions.getBody());
request.setHeaders(coercedRequestOptions.getHeaders());
final Promise<Response> promise = new Promise<Response>();
final HttpContext httpContext = HttpClientContext.create();
client.execute(request, new FutureCallback<org.apache.http.HttpResponse>() {
final Promise<Response> promise = new Promise<>();
FutureCallback futureCallback = new FutureCallback<HttpResponse>() {
@Override
public void completed(org.apache.http.HttpResponse httpResponse) {
try {
Object body = null;
HttpEntity entity = httpResponse.getEntity();
if (entity != null) {
body = entity.getContent();
}
Map<String, String> headers = new HashMap<String, String>();
for (Header h : httpResponse.getAllHeaders()) {
headers.put(h.getName().toLowerCase(), h.getValue());
}
String origContentEncoding = headers.get("content-encoding");
if (requestOptions.getDecompressBody()) {
body = decompress((InputStream)body, headers);
}
ContentType contentType = null;
if (headers.get("content-type") != null) {
contentType = ContentType.parse(headers.get("content-type"));
}
if (requestOptions.getAs() != ResponseBodyType.STREAM) {
body = coerceBodyType((InputStream)body, requestOptions.getAs(), contentType);
}
deliverResponse(requestOptions,
new Response(requestOptions, origContentEncoding, body,
headers, httpResponse.getStatusLine().getStatusCode(),
contentType),
callback, promise);
} catch (Exception e) {
deliverResponse(requestOptions, new Response(requestOptions, e), callback, promise);
}
public void completed(HttpResponse httpResponse) {
completeResponse(promise, requestOptions, callback, httpResponse, httpContext);
}
@Override
@ -271,7 +337,13 @@ public class JavaClient {
public void cancelled() {
deliverResponse(requestOptions, new Response(requestOptions, new HttpClientException("Request cancelled", null)), callback, promise);
}
});
};
if (requestOptions.getAs() == ResponseBodyType.UNBUFFERED_STREAM) {
executeWithConsumer(client, futureCallback, request);
} else {
client.execute(request, futureCallback);
}
return promise;
}
@ -397,22 +469,6 @@ public class JavaClient {
return request;
}
public static InputStream decompress(InputStream compressed, Map<String, String> headers) {
String contentEncoding = headers.get("content-encoding");
if (contentEncoding == null) {
return compressed;
}
switch (contentEncoding) {
case "gzip":
headers.remove("content-encoding");
return Compression.gunzip(compressed);
case "deflate":
headers.remove("content-encoding");
return Compression.inflate(compressed);
default:
return compressed;
}
}
public static Object coerceBodyType(InputStream body, ResponseBodyType as,
ContentType contentType) {

View file

@ -1,22 +1,16 @@
(ns puppetlabs.http.client.async-unbuffered-test
(:import (com.puppetlabs.http.client Async RequestOptions ClientOptions ResponseBodyType)
(org.apache.http.impl.nio.client HttpAsyncClients)
(java.net URI SocketTimeoutException ServerSocket ConnectException)
(java.net SocketTimeoutException ConnectException)
(java.io PipedInputStream PipedOutputStream)
(java.util.concurrent TimeoutException)
(java.util UUID))
(:require [clojure.test :refer :all]
[clojure.java.io :as io]
[puppetlabs.http.client.test-common :refer :all]
[puppetlabs.trapperkeeper.core :as tk]
[puppetlabs.trapperkeeper.testutils.bootstrap :as testutils]
[puppetlabs.trapperkeeper.testutils.logging :as testlogging]
[puppetlabs.trapperkeeper.testutils.webserver :as testwebserver]
[puppetlabs.trapperkeeper.services.webserver.jetty9-service :as jetty9]
[puppetlabs.http.client.common :as common]
[puppetlabs.http.client.async :as async]
[schema.test :as schema-test]
[clojure.tools.logging :as log]))
[schema.test :as schema-test]))
(use-fixtures :once schema-test/validate-schemas)
@ -118,7 +112,7 @@
[data opts]
(testlogging/with-test-logging
(testing " - check data can be streamed successfully success"
(testing " - check data can be streamed successfully"
(testwebserver/with-test-webserver-and-config
(successful-handler data nil) port {:shutdown-timeout-seconds 1}
(with-open [client (async/create-client {:connect-timeout-milliseconds 100
@ -154,13 +148,13 @@
(deftest clojure-blocking-streaming-without-decompression
(testing "clojure :unbuffered-stream with 1K payload and no decompression"
;; This is a small enough payload that :unbuffered-stream still buffered it all in memory and so it behaves
;; This is a small enough payload that :unbuffered-stream still buffers it all in memory and so it behaves
;; identically to :stream
(clojure-blocking-streaming (generate-data 1024) {:as :unbuffered-stream :decompress-body false})))
(deftest clojure-blocking-streaming-with-decompression
(testing "clojure :unbuffered-stream with 1K payload and decompression"
;; This is a small enough payload that :unbuffered-stream still buffered it all in memory and so it behaves
;; This is a small enough payload that :unbuffered-stream still buffers it all in memory and so it behaves
;; identically to :stream
(clojure-blocking-streaming (generate-data 1024) {:as :unbuffered-stream :decompress-body true})))
@ -179,3 +173,158 @@
(deftest clojure-existing-streaming-with-large-payload-with-decompression
(testing "clojure :stream with 32M payload and decompression"
(clojure-blocking-streaming (generate-data (* 32 1024 1024)) {:as :stream :decompress-body true})))
(defn- java-non-blocking-streaming
"Stream 32M of data (roughly) which is large enough to ensure the client won't buffer it all. Checks the data is
streamed in a non-blocking manner i.e some data is received by the client before the server has finished
transmission"
[decompress-body?]
(testlogging/with-test-logging
(let [data (generate-data (* 32 1024 1024))
opts {:as :unbuffered-stream :decompress-body decompress-body?}]
(testing " - check data can be streamed successfully"
(let [send-more-data (promise)]
(testwebserver/with-test-webserver-and-config
(successful-handler data send-more-data) port {:shutdown-timeout-seconds 1}
(with-open [client (-> (ClientOptions.)
(.setSocketTimeoutMilliseconds 20000)
(.setConnectTimeoutMilliseconds 100)
(Async/createClient))]
(let [request-options (RequestOptions. (str "http://localhost:" port "/hello"))
_ (.setAs request-options ResponseBodyType/UNBUFFERED_STREAM)
_ (.setDecompressBody request-options decompress-body?)
response (-> client (.get request-options) .deref)
status (.getStatus response)
body (.getBody response)]
(is (= 200 status))
(let [instream body
buf (make-array Byte/TYPE 4)
_ (.read instream buf)]
(is (= "xxxx" (String. buf "UTF-8"))) ;; Make sure we can read a few chars off of the stream
(deliver send-more-data true) ;; Indicate we read some chars
(is (= (str data "yyyy") (str "xxxx" (slurp instream)))))))))) ;; Read the rest and validate
(testing " - check socket timeout is handled"
(try
(testwebserver/with-test-webserver-and-config
(blocking-handler data) port {:shutdown-timeout-seconds 1}
(with-open [client (-> (ClientOptions.)
(.setSocketTimeoutMilliseconds 200)
(.setConnectTimeoutMilliseconds 100)
(Async/createClient))]
(let [request-options (RequestOptions. (str "http://localhost:" port "/hello"))
_ (.setAs request-options ResponseBodyType/UNBUFFERED_STREAM)
_ (.setDecompressBody request-options decompress-body?)
response (-> client (.get request-options) .deref)
body (.getBody response)
error (.getError response)]
(is (nil? error))
;; Consume the body to get the exception
(is (thrown? SocketTimeoutException (slurp body))))))
(catch TimeoutException e
;; Expected whenever a server-side failure is generated
)))
(testing " - check connection timeout is handled"
(with-open [client (-> (ClientOptions.)
(.setConnectTimeoutMilliseconds 100)
(Async/createClient))]
(let [request-options (RequestOptions. (str "http://localhost:" 12345 "/bad"))
_ (.setAs request-options ResponseBodyType/UNBUFFERED_STREAM)
_ (.setDecompressBody request-options decompress-body?)
response (-> client (.get request-options) .deref)
error (.getError response)]
(is error)
(is (instance? ConnectException error))))))))
(deftest java-non-blocking-streaming-without-decompression
(testing "java :unbuffered-stream with 32MB payload and no decompression"
(java-non-blocking-streaming false)))
(deftest java-non-blocking-streaming-with-decompression
(testing "java :unbuffered-stream with 32MB payload and decompression"
(java-non-blocking-streaming true)))
(defn- java-blocking-streaming
"Stream data that is buffered client-side i.e. in a blocking manner"
[data response-body-type decompress-body?]
(testlogging/with-test-logging
(testing " - check data can be streamed successfully success"
(testwebserver/with-test-webserver-and-config
(successful-handler data nil) port {:shutdown-timeout-seconds 1}
(with-open [client (-> (ClientOptions.)
(.setSocketTimeoutMilliseconds 20000)
(.setConnectTimeoutMilliseconds 100)
(Async/createClient))]
(let [request-options (RequestOptions. (str "http://localhost:" port "/hello"))
_ (.setAs request-options response-body-type)
_ (.setDecompressBody request-options decompress-body?)
response (-> client (.get request-options) .deref)
status (.getStatus response)
body (.getBody response)]
(is (= 200 status))
(let [instream body
buf (make-array Byte/TYPE 4)
_ (.read instream buf)]
(is (= "xxxx" (String. buf "UTF-8"))) ;; Make sure we can read a few chars off of the stream
(is (= (str data "yyyy") (str "xxxx" (slurp instream))))))))) ;; Read the rest and validate
(testing " - check socket timeout is handled"
(try
(testwebserver/with-test-webserver-and-config
(blocking-handler data) port {:shutdown-timeout-seconds 1}
(with-open [client (-> (ClientOptions.)
(.setSocketTimeoutMilliseconds 200)
(.setConnectTimeoutMilliseconds 100)
(Async/createClient))]
(let [request-options (RequestOptions. (str "http://localhost:" port "/hello"))
_ (.setAs request-options response-body-type)
_ (.setDecompressBody request-options decompress-body?)
response (-> client (.get request-options) .deref)
error (.getError response)]
(is (instance? SocketTimeoutException error)))))
(catch TimeoutException e
;; Expected whenever a server-side failure is generated
)))
(testing " - check connection timeout is handled"
(with-open [client (-> (ClientOptions.)
(.setConnectTimeoutMilliseconds 100)
(Async/createClient))]
(let [request-options (RequestOptions. (str "http://localhost:" 12345 "/bad"))
_ (.setAs request-options response-body-type)
_ (.setDecompressBody request-options decompress-body?)
response (-> client (.get request-options) .deref)
error (.getError response)]
(is error)
(is (instance? ConnectException error)))))))
(deftest java-blocking-streaming-without-decompression
(testing "java :unbuffered-stream with 1K payload and no decompression"
;; This is a small enough payload that :unbuffered-stream still buffers it all in memory and so it behaves
;; identically to :stream
(java-blocking-streaming (generate-data 1024) ResponseBodyType/UNBUFFERED_STREAM false)))
(deftest java-blocking-streaming-with-decompression
(testing "java :unbuffered-stream with 1K payload and decompression"
;; This is a small enough payload that :unbuffered-stream still buffers it all in memory and so it behaves
;; identically to :stream
(java-blocking-streaming (generate-data 1024) ResponseBodyType/UNBUFFERED_STREAM true)))
(deftest java-existing-streaming-with-small-payload-without-decompression
(testing "java :stream with 1K payload and no decompression"
(java-blocking-streaming (generate-data 1024) ResponseBodyType/STREAM false)))
(deftest java-existing-streaming-with-small-payload-with-decompression
(testing "java :stream with 1K payload and decompression"
(java-blocking-streaming (generate-data 1024) ResponseBodyType/STREAM false)))
(deftest java-existing-streaming-with-large-payload-without-decompression
(testing "java :stream with 32M payload and no decompression"
(java-blocking-streaming (generate-data (* 32 1024 1024)) ResponseBodyType/STREAM false)))
(deftest java-existing-streaming-with-large-payload-with-decompression
(testing "java :stream with 32M payload and decompression"
(java-blocking-streaming (generate-data (* 32 1024 1024)) ResponseBodyType/STREAM true)))

View file

@ -1,45 +0,0 @@
(ns puppetlabs.http.client.decompress-test
(:import (java.io ByteArrayOutputStream ByteArrayInputStream)
(java.util.zip GZIPOutputStream DeflaterInputStream)
(org.apache.commons.io IOUtils)
(com.puppetlabs.http.client.impl JavaClient)
(java.util HashMap))
(:require [clojure.test :refer :all]
[puppetlabs.http.client.async :as async]
[schema.test :as schema-test]))
(use-fixtures :once schema-test/validate-schemas)
(def compressible-body (apply str (repeat 1000 "f")))
(defn gzip
[s]
(let [baos (ByteArrayOutputStream.)
gos (GZIPOutputStream. baos)]
(-> s
(.getBytes "UTF-8")
(ByteArrayInputStream.)
(IOUtils/copy gos))
(.close gos)
(ByteArrayInputStream. (.toByteArray baos))))
(defn deflate
[s]
(-> s
(.getBytes "UTF-8")
(ByteArrayInputStream.)
(DeflaterInputStream.)))
(deftest gzip-compress-test
(testing "java gzip decompression"
(let [headers (HashMap. {"content-encoding" "gzip"})
response (JavaClient/decompress (gzip compressible-body) headers)]
(is (not (.containsKey headers "content-encoding")))
(is (= compressible-body (slurp response))))))
(deftest deflate-compress-test
(testing "java gzip decompression"
(let [headers (HashMap. {"content-encoding" "deflate"})
response (JavaClient/decompress (deflate compressible-body) headers)]
(is (not (.containsKey headers "content-encoding")))
(is (= compressible-body (slurp response))))))