Incorporate comments from camlow325

- Fixes to ExceptionInsertingPipedInputStream
- Fix to async consumer and removal of decompression code
- Use ResponseContentEncoding to do all decompression including removing previous approach
- Clarify tests and add ConnectionException
This commit is contained in:
Scott Walker 2015-10-06 13:33:55 +01:00
parent ca5ad63179
commit 862090251c
5 changed files with 58 additions and 86 deletions

View file

@ -28,7 +28,8 @@
(org.apache.http.nio.conn.ssl SSLIOSessionStrategy)
(org.apache.http.client.config RequestConfig)
(org.apache.http.nio.client.methods HttpAsyncMethods)
(org.apache.http.nio.client HttpAsyncClient))
(org.apache.http.nio.client HttpAsyncClient)
(org.apache.http.client.protocol ResponseContentEncoding))
(:require [puppetlabs.ssl-utils.core :as ssl]
[clojure.string :as str]
[puppetlabs.kitchensink.core :as ks]
@ -170,24 +171,6 @@
{}
(.getAllHeaders http-response)))
(defmulti decompress (fn [resp] (get-in resp [:headers "content-encoding"])))
(defmethod decompress "gzip"
[resp]
(-> resp
(ks/dissoc-in [:headers "content-encoding"])
(update-in [:body] #(Compression/gunzip %))))
(defmethod decompress "deflate"
[resp]
(-> resp
(ks/dissoc-in [:headers "content-encoding"])
(update-in [:body] #(Compression/inflate %))))
(defmethod decompress nil
[resp]
resp)
(defn- parse-content-type
[content-type-header]
(if (empty? content-type-header)
@ -214,8 +197,11 @@
:status (.. http-response getStatusLine getStatusCode)
:headers headers
:content-type (parse-content-type (headers "content-type"))
:body (when-let [entity (.getEntity http-response)]
(.getContent entity))}))
:body (do
(if (:decompress-body opts)
(.process (ResponseContentEncoding.) http-response nil))
(when-let [entity (.getEntity http-response)]
(.getContent entity)))}))
(schema/defn error-response :- common/ErrorResponse
[opts :- common/UserRequestOptions
@ -248,8 +234,6 @@
http-response :- HttpResponse]
(try
(let [response (cond-> (response-map opts http-response)
(and (:decompress-body opts)
(not= :unbuffered-stream (:as opts))) (decompress)
(and (not= :stream (:as opts))
(not= :unbuffered-stream (:as opts))) (coerce-body-type))]
(deliver-result result opts callback response))

View file

@ -25,7 +25,7 @@ public class ExceptionInsertingPipedInputStream extends PipedInputStream {
@Override
public synchronized int read() throws IOException {
int read = super.read();
if (read == 1) {
if (read == -1) {
checkFinalResult();
}
return read;
@ -40,4 +40,10 @@ public class ExceptionInsertingPipedInputStream extends PipedInputStream {
return read;
}
@Override
public void close() throws IOException {
super.close();
checkFinalResult();
}
}

View file

@ -1,11 +1,6 @@
package com.puppetlabs.http.client.impl;
import org.apache.http.Header;
import org.apache.http.HeaderElement;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.entity.DeflateDecompressingEntity;
import org.apache.http.client.entity.GzipDecompressingEntity;
import org.apache.http.entity.BasicHttpEntity;
import org.apache.http.nio.IOControl;
import org.apache.http.nio.client.methods.AsyncByteConsumer;
@ -36,25 +31,7 @@ public class StreamingAsyncResponseConsumer extends AsyncByteConsumer<HttpRespon
PipedInputStream pis = new ExceptionInsertingPipedInputStream(ioExceptionPromise);
pos = new PipedOutputStream();
pos.connect(pis);
HttpEntity modifiedEntity = new BasicHttpEntity();
((BasicHttpEntity) modifiedEntity).setContent(pis);
HttpEntity entity = response.getEntity();
if (entity != null) {
Header header = entity.getContentEncoding();
if (header != null) {
HeaderElement[] codecs = header.getElements();
for (HeaderElement codec : codecs) {
if (codec.getName().equalsIgnoreCase("gzip")) {
modifiedEntity = new GzipDecompressingEntity(modifiedEntity);
break;
} else if (codec.getName().equalsIgnoreCase("deflate")) {
modifiedEntity = new DeflateDecompressingEntity(modifiedEntity);
break;
}
}
}
}
response.setEntity(modifiedEntity);
((BasicHttpEntity) response.getEntity()).setContent(pis);
this.response = response;
promise.deliver(response);
}
@ -73,8 +50,10 @@ public class StreamingAsyncResponseConsumer extends AsyncByteConsumer<HttpRespon
this.response = null;
this.promise = null;
try {
this.pos.close();
this.pos = null;
if (pos != null) {
this.pos.close();
this.pos = null;
}
} catch (IOException e) {
throw new IllegalStateException(e);
}

View file

@ -1,7 +1,7 @@
(ns puppetlabs.http.client.async-plaintext-test
(:import (com.puppetlabs.http.client Async RequestOptions ClientOptions ResponseBodyType)
(org.apache.http.impl.nio.client HttpAsyncClients)
(java.net URI SocketTimeoutException ServerSocket)
(java.net URI SocketTimeoutException ServerSocket ConnectException)
(java.io PipedInputStream PipedOutputStream)
(java.util.concurrent TimeoutException))
(:require [clojure.test :refer :all]
@ -359,25 +359,27 @@
(is (= "Hello, World!" (:body response)))))))))))
(defn- build-content-handler
[data initial-bytes-read? server-side-failure?]
[data initial-bytes-read? wait-forever?]
(fn [_]
(let [outstream (PipedOutputStream.)
instream (PipedInputStream.)
_ (.connect instream outstream)
outwriter (io/make-writer outstream {})]
instream (PipedInputStream. 4)]
(.connect instream outstream)
;; Return the response immediately and asynchronously stream some data into it
(future
(.write outwriter data)
(.flush outwriter)
(if server-side-failure?
(throw (Exception. "Server side failure!")))
(.write outstream (.getBytes data))
(.flush outstream)
(if wait-forever?
; The :socket-timeout-milliseconds setting on the client means we don't actually block forever and forces
; a SocketTimeoutException on the underlying socket
(deref (promise)))
; Block until the client confirms it has read the first few bytes
; Socket time out ensures we don't block here forever in practice if it does wrong
; Again :socket-timeout-milliseconds on the client ensures we can't really get stuck here, even if the
; test fails
(if initial-bytes-read?
(deref initial-bytes-read?))
; Write the last of the data
(.write outwriter "xxxx")
(.close outwriter))
(.write outstream (.getBytes "xxxx"))
(.close outstream))
{:status 200
:body instream})))
@ -385,17 +387,17 @@
[data-size opts]
(testing (str "clojure streaming success: " data-size " bytes with opts " opts)
(let [data (apply str (repeat (- data-size 4) "x"))
;; We check that we can read some bytes before all are transmitted only if:
;; - :unbuffered-stream is enabled
;; - the total traffic returned is more than about 40K to prevent buffering issues
;; i.e. data-size is large enough and not compressed
;; If :unbuffered-stream is enabled then we check that we can read some bytes from the response before all
;; bytes have actually been transmitted
;; We need to make sure the amount of data sent is enough to ensure it doesn't get buffered by the OS or Jetty
;; About 64K seems to be the threshold
initial-bytes-read? (if (and (= :unbuffered-stream (:as opts))
(not (:decompress-body opts))
(> data-size (* 42 1024)))
(>= data-size (* 64 1024)))
(promise))]
(testwebserver/with-test-webserver-and-config
(build-content-handler data initial-bytes-read? nil) port {:shutdown-timeout-seconds 1}
(with-open [client (async/create-client {:connect-timeout-milliseconds 1000 :socket-timeout-milliseconds 2000})]
(build-content-handler data initial-bytes-read? false) port {:shutdown-timeout-seconds 1}
(with-open [client (async/create-client {:connect-timeout-milliseconds 100 :socket-timeout-milliseconds 20000})]
(let [response @(common/get
client
(str "http://localhost:" port "/hello")
@ -413,14 +415,14 @@
(let [final-string (str "xxxx" (slurp instream))]
(is (= (str data "xxxx") final-string))))))))))
(defn- clojure-streaming-failure
(defn- clojure-streaming-socket-timeout
[data-size opts]
(testing (str "clojure streaming failure: " data-size " bytes with opts " opts)
(testing (str "clojure streaming socket timeout: " data-size " bytes with opts " opts)
(let [data (apply str (repeat (- data-size 4) "x"))]
(try
(testwebserver/with-test-webserver-and-config
(build-content-handler data nil true) port {:shutdown-timeout-seconds 1}
(with-open [client (async/create-client {:connect-timeout-milliseconds 1000 :socket-timeout-milliseconds 2000})]
(with-open [client (async/create-client {:connect-timeout-milliseconds 100 :socket-timeout-milliseconds 200})]
(let [response @(common/get
client
(str "http://localhost:" port "/hello")
@ -440,11 +442,24 @@
;; Expected whenever a server-side failure is generated
)))))
(defn- clojure-streaming-connection-error
[opts]
(testing (str "clojure streaming connect error")
(with-open [client (async/create-client {:connect-timeout-milliseconds 100})]
(let [response @(common/get
client
(str "http://localhost:" 12345 "/hello")
opts)
{:keys [error]} response]
(is error)
(is (instance? ConnectException error))))))
(deftest clojure-streaming
(testing "clojure streaming is consistent with different payload sizes and opts"
(testlogging/with-test-logging
(doseq [data-size [32 (* 64 1024) (* 1024 1024)]
(doseq [data-size [32 (* 1024) (* 1024 1024)]
decompress-body? [false true]
as [:unbuffered-stream :stream]]
(clojure-streaming-success data-size {:as as :decompress-body decompress-body?})
(clojure-streaming-failure data-size {:as as :decompress-body decompress-body?})))))
(clojure-streaming-socket-timeout data-size {:as as :decompress-body decompress-body?})
(clojure-streaming-connection-error {:as as :decompress-body decompress-body?})))))

View file

@ -31,12 +31,6 @@
(DeflaterInputStream.)))
(deftest gzip-compress-test
(testing "clojure gzip decompression"
(let [test-response {:headers {"content-encoding" "gzip"}
:body (gzip compressible-body)}
response (async/decompress test-response)]
(is (not (contains? (:headers response) "content-encoding")))
(is (= compressible-body (slurp (:body response))))))
(testing "java gzip decompression"
(let [headers (HashMap. {"content-encoding" "gzip"})
response (JavaClient/decompress (gzip compressible-body) headers)]
@ -44,12 +38,6 @@
(is (= compressible-body (slurp response))))))
(deftest deflate-compress-test
(testing "clojure deflate decompression"
(let [test-response {:headers {"content-encoding" "deflate"}
:body (deflate compressible-body)}
response (async/decompress test-response)]
(is (not (contains? (:headers response) "content-encoding")))
(is (= compressible-body (slurp (:body response))))))
(testing "java gzip decompression"
(let [headers (HashMap. {"content-encoding" "deflate"})
response (JavaClient/decompress (deflate compressible-body) headers)]