(ns puppetlabs.http.client.async-plaintext-test
  "Tests for the asynchronous HTTP client (Java `Async` API and Clojure
  `async` namespace) against plaintext (http://) Jetty test servers."
  (:import (com.puppetlabs.http.client Async RequestOptions ClientOptions ResponseBodyType)
           (org.apache.http.impl.nio.client HttpAsyncClients)
           ;; URI, SocketTimeoutException, ServerSocket and ConnectException all live in java.net
           (java.net URI SocketTimeoutException ServerSocket ConnectException)
           ;; the piped streams live in java.io
           (java.io PipedInputStream PipedOutputStream)
           (java.util.concurrent TimeoutException))
  (:require [clojure.test :refer :all]
            ;; NOTE(review): namespace name reconstructed from the `io` alias -- confirm
            [clojure.java.io :as io]
            [puppetlabs.http.client.test-common :refer :all]
            [puppetlabs.trapperkeeper.core :as tk]
            [puppetlabs.trapperkeeper.testutils.bootstrap :as testutils]
            [puppetlabs.trapperkeeper.testutils.logging :as testlogging]
            [puppetlabs.trapperkeeper.testutils.webserver :as testwebserver]
            ;; NOTE(review): namespace name reconstructed from the `jetty9/jetty9-service`
            ;; references below -- confirm
            [puppetlabs.trapperkeeper.services.webserver.jetty9-service :as jetty9]
            [puppetlabs.http.client.common :as common]
            [puppetlabs.http.client.async :as async]
            [schema.test :as schema-test]
            ;; NOTE(review): namespace name reconstructed from the `log` alias -- confirm
            [clojure.tools.logging :as log]))
;; Wrap this namespace's test run in the schema.test/validate-schemas fixture;
;; :once means the fixture is applied a single time around all tests rather
;; than around each individual test.
(use-fixtures :once schema-test/validate-schemas)
(defn app
  "Minimal Ring handler used by the test web servers: ignores the incoming
  request and always answers 200 with the body \"Hello, World!\"."
  [_request]
  {:status 200
   :body "Hello, World!"})
;; Trapperkeeper service that, on init, registers the `app` Ring handler at
;; "/hello" through the WebserverService's add-ring-handler function.
(tk/defservice test-web-service
  [[:WebserverService add-ring-handler]]
  (init [this context]
        (add-ring-handler app "/hello")
        ;; A lifecycle function has to hand the service context back to the
        ;; framework, so return it unchanged.
        context))
;; Exercises every HTTP verb through one long-lived ("persistent") client, for
;; both the Java API and the Clojure API, against `test-web-service` on port
;; 10000, and finally checks that a closed client rejects further requests
;; with IllegalStateException.
;;
;; HEAD is asserted separately because it is the only verb expected to come
;; back without a body; all other verbs share the same two assertions and are
;; therefore driven from a table.
(deftest persistent-async-client-test
  (testlogging/with-test-logging
    (testutils/with-app-with-config app
      [jetty9/jetty9-service test-web-service]
      {:webserver {:port 10000}}
      (let [url "http://localhost:10000/hello/"
            expected-body "Hello, World!"]
        (testing "java async client"
          (let [request-options (RequestOptions. (URI. url))
                client-options (ClientOptions.)
                client (Async/createClient client-options)]
            (testing "HEAD request with persistent async client"
              (let [response (.deref (.head client request-options))]
                (is (= 200 (.getStatus response)))
                (is (nil? (.getBody response)))))
            ;; Each entry pairs the verb's display name with the interop call
            ;; that issues it.
            (doseq [[verb send-request] [["GET" #(.get %1 %2)]
                                         ["POST" #(.post %1 %2)]
                                         ["PUT" #(.put %1 %2)]
                                         ["DELETE" #(.delete %1 %2)]
                                         ["TRACE" #(.trace %1 %2)]
                                         ["OPTIONS" #(.options %1 %2)]
                                         ["PATCH" #(.patch %1 %2)]]]
              (testing (str verb " request with persistent async client")
                (let [response (.deref (send-request client request-options))]
                  (is (= 200 (.getStatus response)))
                  (is (= expected-body (slurp (.getBody response)))))))
            (testing "client closes properly"
              (.close client)
              (is (thrown? IllegalStateException
                           (.get client request-options))))))
        (testing "clojure async client"
          (let [client (async/create-client {})]
            (testing "HEAD request with persistent async client"
              (let [response @(common/head client url)]
                (is (= 200 (:status response)))
                (is (nil? (:body response)))))
            (doseq [[verb send-request] [["GET" common/get]
                                         ["POST" common/post]
                                         ["PUT" common/put]
                                         ["DELETE" common/delete]
                                         ["TRACE" common/trace]
                                         ["OPTIONS" common/options]
                                         ["PATCH" common/patch]]]
              (testing (str verb " request with persistent async client")
                (let [response @(send-request client url)]
                  (is (= 200 (:status response)))
                  (is (= expected-body (slurp (:body response)))))))
            (testing "client closes properly"
              (common/close client)
              (is (thrown? IllegalStateException
                           (common/get client url))))))))))
;; Verifies that async/request-with-client can issue requests through an
;; externally created Apache HttpAsyncClient, and that the same client is
;; still usable for a second request (i.e. request-with-client does not close
;; it).
(deftest request-with-client-test
  (testlogging/with-test-logging
    (testutils/with-app-with-config app
      [jetty9/jetty9-service test-web-service]
      {:webserver {:port 10000}}
      ;; The client must be started before use; with-open guarantees it is
      ;; closed even when an assertion throws.
      (with-open [client (doto (HttpAsyncClients/createDefault) (.start))]
        (let [opts {:method :get :url "http://localhost:10000/hello/"}]
          (testing "GET request works with request-with-client"
            (let [response @(async/request-with-client opts nil client)]
              (is (= 200 (:status response)))
              (is (= "Hello, World!" (slurp (:body response))))))
          (testing "Client persists when passed to request-with-client"
            (let [response @(async/request-with-client opts nil client)]
              (is (= 200 (:status response)))
              (is (= "Hello, World!" (slurp (:body response)))))))))))
;; Checks query-parameter handling against `test-params-web-service` (from
;; test-common) on port 8080. The assertions compare the response body with
;; the parameters that were sent, so the service is presumably echoing the
;; parsed query parameters back -- confirm against test-common.
(deftest query-params-test-async
  (testlogging/with-test-logging
    (testutils/with-app-with-config app
      [jetty9/jetty9-service test-params-web-service]
      {:webserver {:port 8080}}
      (testing "URL Query Parameters work with the Java client"
        ;; with-open closes the client even if an assertion throws
        (with-open [client (Async/createClient (ClientOptions.))]
          (let [request-options (RequestOptions.
                                  (URI. "http://localhost:8080/params?foo=bar&baz=lux"))
                response (.deref (.get client request-options))]
            (is (= 200 (.getStatus response)))
            (is (= queryparams (read-string (slurp (.getBody response))))))))
      (testing "URL Query Parameters work with the clojure client"
        (with-open [client (async/create-client {})]
          (let [opts {:method :get
                      :url "http://localhost:8080/params/"
                      :query-params queryparams
                      :as :text}
                response @(common/get client "http://localhost:8080/params" opts)]
            (is (= 200 (:status response)))
            (is (= queryparams (read-string (:body response)))))))
      (testing "URL Query Parameters can be set directly in the URL"
        (with-open [client (async/create-client {})]
          ;; The query string matches the {"paramone" "one"} map asserted below.
          (let [response @(common/get client
                                      "http://localhost:8080/params?paramone=one"
                                      {:as :text})]
            (is (= 200 (:status response)))
            (is (= (str {"paramone" "one"}) (:body response))))))
      (testing (str "URL Query Parameters set in URL are overwritten if params "
                    "are also specified in options map")
        (with-open [client (async/create-client {})]
          ;; NOTE(review): the URL and options arguments were missing; any URL
          ;; carrying its own query string exercises the override -- confirm
          ;; the exact values that were intended.
          (let [response @(common/get client
                                      "http://localhost:8080/params?paramone=one"
                                      {:query-params queryparams
                                       :as :text})]
            (is (= 200 (:status response)))
            (is (= queryparams (read-string (:body response))))))))))
;; Checks redirect handling for both client flavours against
;; `redirect-web-service` (from test-common) on port 8080:
;;   * POST is only redirected when force-redirects is enabled,
;;   * follow-redirects=false disables redirects entirely, and
;;   * follow-redirects=false wins over force-redirects=true.
;; A 302 status means the redirect was NOT followed; 200 plus the
;; "Hello, World!" body means it was.
(deftest redirect-test-async
  (testlogging/with-test-logging
    (testutils/with-app-with-config app
      [jetty9/jetty9-service redirect-web-service]
      {:webserver {:port 8080}}
      (let [url "http://localhost:8080/hello"]
        (testing (str "redirects on POST not followed by persistent Java client "
                      "when forceRedirects option not set to true")
          ;; with-open closes each Java client even if an assertion throws
          (with-open [client (Async/createClient (ClientOptions.))]
            (let [response (.deref (.post client (RequestOptions. (URI. url))))]
              (is (= 302 (.getStatus response))))))
        (testing "redirects on POST followed by Java client when option is set"
          (with-open [client (Async/createClient (.. (ClientOptions.)
                                                     (setForceRedirects true)))]
            (let [response (.deref (.post client (RequestOptions. (URI. url))))]
              (is (= 200 (.getStatus response)))
              (is (= "Hello, World!" (slurp (.getBody response)))))))
        (testing "redirects not followed by Java client when :follow-redirects is false"
          (with-open [client (Async/createClient (.. (ClientOptions.)
                                                     (setFollowRedirects false)))]
            (let [response (.deref (.get client (RequestOptions. (URI. url))))]
              (is (= 302 (.getStatus response))))))
        (testing ":follow-redirects overrides :force-redirects for Java client"
          (with-open [client (Async/createClient (.. (ClientOptions.)
                                                     (setFollowRedirects false)
                                                     (setForceRedirects true)))]
            (let [response (.deref (.get client (RequestOptions. (URI. url))))]
              (is (= 302 (.getStatus response))))))
        (testing (str "redirects on POST not followed by clojure client "
                      "when :force-redirects is not set to true")
          (with-open [client (async/create-client {:force-redirects false})]
            (let [opts {:method :post
                        :url "http://localhost:8080/hello"
                        :as :text}
                  response @(common/post client url opts)]
              (is (= 302 (:status response))))))
        (testing (str "redirects on POST followed by persistent clojure client "
                      "when option is set")
          (with-open [client (async/create-client {:force-redirects true})]
            (let [response @(common/post client url {:as :text})]
              (is (= 200 (:status response)))
              (is (= "Hello, World!" (:body response))))))
        (testing (str "persistent clojure client does not follow redirects when "
                      ":follow-redirects is set to false")
          (with-open [client (async/create-client {:follow-redirects false})]
            (let [response @(common/get client url {:as :text})]
              (is (= 302 (:status response))))))
        (testing ":follow-redirects overrides :force-redirects with persistent clj client"
          (with-open [client (async/create-client {:follow-redirects false
                                                   :force-redirects true})]
            (let [response @(common/get client url {:as :text})]
              (is (= 302 (:status response))))))))))
;; Intent (from the test description and messages): a Java persistent async
;; client configured with a connect timeout of 250 should fail to connect, and
;; the attempt should finish within the bound of 2000 handed to
;; elapsed-within-range? (both presumably milliseconds -- confirm).
;;
;; NOTE(review): this form is incomplete as written and cannot be read by the
;; Clojure reader; the gaps are marked inline. connect-exception-thrown? and
;; elapsed-within-range? are not defined in this file (presumably they come
;; from test-common via :refer :all).
(deftest short-connect-timeout-persistent-java-test-async
(testing (str "connection times out properly for java persistent client "
"async request with short timeout")
(with-open [client (-> (ClientOptions.)
(.setConnectTimeoutMilliseconds 250)
;; NOTE(review): the `->` chain and the with-open binding vector are never
;; closed here; the step that turns the ClientOptions into a client (elsewhere
;; in this file: Async/createClient) appears to be missing.
;; NOTE(review): the URL below is an empty string; the test needs an address
;; that cannot be connected to -- restore the intended value.
(let [request-options (RequestOptions. "")
time-before-connect (System/currentTimeMillis)]
(is (connect-exception-thrown? (-> client
(.get request-options)
;; NOTE(review): the `->` chain and the connect-exception-thrown? call are not
;; closed before the assertion message below. Whether the helper expects the
;; response's error value or a thrown exception cannot be told from this file
;; -- check its definition before completing the chain.
"Unexpected result for connection attempt")
(is (elapsed-within-range? time-before-connect 2000)
"Connection attempt took significantly longer than timeout")))))
;; Intent (from the test description and messages): a Clojure persistent async
;; client created with :connect-timeout-milliseconds 250 should fail to
;; connect, and the attempt should finish within the bound of 2000 handed to
;; elapsed-within-range? (presumably milliseconds -- confirm).
;;
;; NOTE(review): this form is incomplete as written; the gap is marked inline.
;; connect-exception-thrown? and elapsed-within-range? are not defined in this
;; file (presumably they come from test-common via :refer :all).
(deftest short-connect-timeout-persistent-clojure-test-async
(testing (str "connection times out properly for clojure persistent client "
"async request with short timeout")
(with-open [client (async/create-client
{:connect-timeout-milliseconds 250})]
(let [time-before-connect (System/currentTimeMillis)]
;; NOTE(review): the common/get call below has lost its arguments (client and
;; an unconnectable URL) and the rest of the `->` chain, so the
;; connect-exception-thrown? form is never closed. Whether the helper expects
;; the response's :error value or a thrown exception cannot be told from this
;; file -- check its definition before completing the chain.
(is (connect-exception-thrown? (-> @(common/get
"Unexpected result for connection attempt")
(is (elapsed-within-range? time-before-connect 2000)
"Connection attempt took significantly longer than timeout")))))
;; Positive counterpart of the short-connect-timeout tests: with a connect
;; timeout of 2000 ms both client flavours can reach a local test web server
;; serving `app` and receive the expected 200 / "Hello, World!" response.
(deftest longer-connect-timeout-test-async
  (testing "connection succeeds for async request with longer connect timeout"
    (testlogging/with-test-logging
      ;; `port` is bound by the macro to the port the test server listens on
      (testwebserver/with-test-webserver app port
        (let [url (str "http://localhost:" port "/hello")]
          (testing "java persistent async client"
            (with-open [client (-> (ClientOptions.)
                                   (.setConnectTimeoutMilliseconds 2000)
                                   (Async/createClient))]
              ;; .deref blocks until the response is available, so the
              ;; getters below can be called on it directly
              (let [response (-> client
                                 (.get (RequestOptions. url))
                                 (.deref))]
                (is (= 200 (.getStatus response)))
                (is (= "Hello, World!" (slurp (.getBody response)))))))
          (testing "clojure persistent async client"
            (with-open [client (async/create-client
                                 {:connect-timeout-milliseconds 2000})]
              (let [response @(common/get client url {:as :text})]
                (is (= 200 (:status response)))
                (is (= "Hello, World!" (:body response)))))))))))
;; A bare ServerSocket accepts the TCP connection but never sends an HTTP
;; response, so a Java persistent async client with a short socket timeout is
;; expected to report a SocketTimeoutException as the response's error, and to
;; do so within the bound passed to elapsed-within-range? (a helper that is
;; not defined in this file).
(deftest short-socket-timeout-persistent-java-test-async
  (testing (str "socket read times out properly for persistent java async "
                "request with short timeout")
    ;; NOTE(review): the timeout here is 1 while the Clojure sibling test uses
    ;; 250 -- confirm the difference is intentional.
    (with-open [client (-> (ClientOptions.)
                           (.setSocketTimeoutMilliseconds 1)
                           (Async/createClient))
                ;; port 0 lets the OS pick any free port
                server (ServerSocket. 0)]
      (let [request-options (-> "http://localhost:"
                                (str (.getLocalPort server))
                                (RequestOptions.))
            time-before-connect (System/currentTimeMillis)]
        (is (instance? SocketTimeoutException (-> client
                                                  (.get request-options)
                                                  (.deref)
                                                  (.getError)))
            "Unexpected result for get attempt")
        (is (elapsed-within-range? time-before-connect 2000)
            "Get attempt took significantly longer than timeout")))))
;; Clojure-client version of the short socket timeout check: a bare
;; ServerSocket accepts the connection but never answers, so the response map
;; is expected to carry a SocketTimeoutException under :error, within the
;; bound passed to elapsed-within-range? (a helper that is not defined in this
;; file).
(deftest short-socket-timeout-persistent-clojure-test-async
  (testing (str "socket read times out properly for clojure persistent client "
                "async request with short timeout")
    (with-open [client (async/create-client
                         {:socket-timeout-milliseconds 250})
                ;; port 0 lets the OS pick any free port
                server (ServerSocket. 0)]
      (let [url (str "http://localhost:" (.getLocalPort server))
            time-before-connect (System/currentTimeMillis)]
        (is (instance? SocketTimeoutException
                       (-> @(common/get client url)
                           :error))
            "Unexpected result for get attempt")
        (is (elapsed-within-range? time-before-connect 2000)
            "Get attempt took significantly longer than timeout")))))
;; Positive counterpart of the short-socket-timeout tests: with a socket
;; timeout of 2000 ms both client flavours can GET from a local test web
;; server serving `app` and receive the expected 200 / "Hello, World!"
;; response.
(deftest longer-socket-timeout-test-async
  (testing "get succeeds for async request with longer socket timeout"
    (testlogging/with-test-logging
      ;; `port` is bound by the macro to the port the test server listens on
      (testwebserver/with-test-webserver app port
        (let [url (str "http://localhost:" port "/hello")]
          (testing "java persistent async client"
            (with-open [client (-> (ClientOptions.)
                                   (.setSocketTimeoutMilliseconds 2000)
                                   (Async/createClient))]
              ;; .deref blocks until the response is available, so the
              ;; getters below can be called on it directly
              (let [response (-> client
                                 (.get (RequestOptions. url))
                                 (.deref))]
                (is (= 200 (.getStatus response)))
                (is (= "Hello, World!" (slurp (.getBody response)))))))
          (testing "clojure persistent async client"
            (with-open [client (async/create-client
                                 {:socket-timeout-milliseconds 2000})]
              (let [response @(common/get client url {:as :text})]
                (is (= 200 (:status response)))
                (is (= "Hello, World!" (:body response)))))))))))
(defn- build-content-handler
  "Builds a Ring handler whose response body is a piped stream that is fed
  from a background thread.

  Arguments:
    data                - string written to the body first
    initial-bytes-read? - nil, or a promise; when non-nil the writer blocks on
                          it after `data` has been written and flushed
    wait-forever?       - when truthy the writer blocks on a promise that is
                          never delivered, so the body is never completed

  The returned handler ignores its request argument and immediately returns
  {:status 200 :body <input stream>}. Unless the writer is blocked, the body
  ends with a trailing \"xxxx\" after which the stream is closed."
  [data initial-bytes-read? wait-forever?]
  (fn [_]
    (let [outstream (PipedOutputStream.)
          ;; NOTE(review): pipe buffer is only 4 bytes, so the writer can run
          ;; at most 4 bytes ahead of the reader -- confirm this is intended.
          instream (PipedInputStream. 4)]
      (.connect instream outstream)
      ;; Return the response immediately and asynchronously stream some data into it.
      ;; The writes must happen off the handler thread: with a bounded pipe
      ;; they block until the reader drains the buffer.
      (future
        (.write outstream (.getBytes data))
        (.flush outstream)
        (when wait-forever?
          ; The :socket-timeout-milliseconds setting on the client means we don't actually block forever and forces
          ; a SocketTimeoutException on the underlying socket
          (deref (promise)))
        ; Block until the client confirms it has read the first few bytes
        ; Again :socket-timeout-milliseconds on the client ensures we can't really get stuck here, even if the
        ; test fails
        (when initial-bytes-read?
          (deref initial-bytes-read?))
        ; Write the last of the data
        (.write outstream (.getBytes "xxxx"))
        (.close outstream))
      {:status 200
       :body instream})))
(defn- clojure-streaming-success
  "Asserts that a streamed response of `data-size` bytes can be read in full
  by the Clojure async client using request options `opts`.

  The server sends (data-size - 4) \"x\" characters followed by a trailing
  \"xxxx\". The test reads the first 4 bytes, then the remainder, and checks
  the reassembled payload."
  [data-size opts]
  (testing (str "clojure streaming success: " data-size " bytes with opts " opts)
    (let [data (apply str (repeat (- data-size 4) "x"))
          ;; If :unbuffered-stream is enabled then we check that we can read some bytes from the response before all
          ;; bytes have actually been transmitted
          ;; We need to make sure the amount of data sent is enough to ensure it doesn't get buffered by the OS or Jetty
          ;; About 64K seems to be the threshold
          ;; When set, the handler holds back the trailing bytes until this
          ;; promise is delivered below.
          initial-bytes-read? (when (and (= :unbuffered-stream (:as opts))
                                         (not (:decompress-body opts))
                                         (>= data-size (* 64 1024)))
                                (promise))]
      ;; NOTE(review): macro name reconstructed from the
      ;; (handler, port binding, config map) argument shape -- confirm.
      (testwebserver/with-test-webserver-and-config
        (build-content-handler data initial-bytes-read? false) port {:shutdown-timeout-seconds 1}
        (with-open [client (async/create-client {:connect-timeout-milliseconds 100
                                                 :socket-timeout-milliseconds 20000})]
          (let [response @(common/get client (str "http://localhost:" port "/hello") opts)
                {:keys [status body]} response]
            (is (= 200 status))
            (let [instream body
                  buf (byte-array 4)
                  _ (.read instream buf)]
              ;; Make sure we can read a few chars off of the stream
              (is (= "xxxx" (String. buf "UTF-8")))
              ;; Indicate we read some chars
              (when initial-bytes-read? (deliver initial-bytes-read? true))
              ;; Read the rest and validate the content
              (let [final-string (str "xxxx" (slurp instream))]
                (is (= (str data "xxxx") final-string))))))))))
(defn- clojure-streaming-socket-timeout
  "Asserts that the Clojure async client surfaces a SocketTimeoutException
  when the server starts a streamed response of `data-size` bytes and then
  stalls forever.

  For {:as :unbuffered-stream} the timeout may show up either as the
  response's :error or only once the body is consumed; for any other :as
  value it must be reported as :error."
  [data-size opts]
  (testing (str "clojure streaming socket timeout: " data-size " bytes with opts " opts)
    (let [data (apply str (repeat (- data-size 4) "x"))]
      (try
        ;; NOTE(review): macro name reconstructed from the
        ;; (handler, port binding, config map) argument shape -- confirm.
        (testwebserver/with-test-webserver-and-config
          (build-content-handler data nil true) port {:shutdown-timeout-seconds 1}
          (with-open [client (async/create-client {:connect-timeout-milliseconds 100
                                                   :socket-timeout-milliseconds 200})]
            (let [response @(common/get client (str "http://localhost:" port "/hello") opts)
                  {:keys [body error]} response]
              (if (= :unbuffered-stream (:as opts))
                (if error
                  ;; If there's an error this should behave the same as :stream
                  (is (instance? SocketTimeoutException error))
                  ;; If there is no error, let's consume the body to get the exception
                  (is (thrown? SocketTimeoutException (slurp body))))
                (do
                  (is error)
                  (is (instance? SocketTimeoutException error)))))))
        (catch TimeoutException _
          ;; Expected whenever a server-side failure is generated
          nil)))))
(defn- clojure-streaming-connection-error
  "Asserts that a request made with options `opts` to a local port that no
  test server was started on (12345) yields a response whose :error is a
  ConnectException."
  [opts]
  (testing "clojure streaming connect error"
    (with-open [client (async/create-client {:connect-timeout-milliseconds 100})]
      (let [response @(common/get client "http://localhost:12345/hello" opts)
            {:keys [error]} response]
        (is error)
        (is (instance? ConnectException error))))))
;; Runs the three streaming scenarios (success, socket timeout, connection
;; error) for every combination of payload size (32 B, 1 KiB, 1 MiB),
;; :decompress-body setting and streaming :as mode.
(deftest clojure-streaming
  (testlogging/with-test-logging
    (testing "clojure streaming is consistent with different payload sizes and opts"
      (doseq [data-size [32 1024 (* 1024 1024)]
              decompress-body? [false true]
              as [:unbuffered-stream :stream]]
        (let [opts {:as as :decompress-body decompress-body?}]
          (clojure-streaming-success data-size opts)
          (clojure-streaming-socket-timeout data-size opts)
          (clojure-streaming-connection-error opts))))))