clj-http-client/test/puppetlabs/http/client/async_unbuffered_test.clj
Ruth Linehan df4e36a1aa (TK-316) Add metrics support
This commit adds metrics support to the http client (clojure and java, sync
and async). A metric registry can optionally be passed into the client as a
client option on creation. If a metric registry is present, timers will be
added to time each request.

By default, a timer is added for the URL (stripped of username, password,
query string, and path fragments) and the URL plus the method used for the
request. In addition, a request can include a `metric-id` option, which takes
a tuple of metric ids. If this request option is specified, a timer will be
created for each element of the metric id tuple - thus if the tuple is [:foo
:bar :baz] there will be a foo timer, a foo.bar timer, and a foo.bar.baz
timer.

In addition, each timer has a "MetricType" - currently there is only one
metric type, bytes-read, which is stopped when the full response has been
read. In the future, we may add "response-init" timers that get stopped when
the first byte of the response has been read.

This commit also adds a `get-client-metrics`/`.getClientMetrics` function that
takes a client instance and returns the http client-specific metrics from the
metric registry, and a `get-client-metrics-data`/`.getClientMetricsData`
function, available for the clojure and java sync and async clients, for
retrieving metrics data from the client. The latter takes a client instance
and returns a map of metric name to a map of metric data (for clojure) or a
ClientMetricData object (for java), both of which include the mean, count, and
aggregate for the timer.

These `get-client-metrics*`/`.getClientMetrics*` functions also have versions
that take a url, url and method, or metric id to allow for filtering of the
timers/metrics data returned by these functions.

The clojure versions of these functions take a metric filter map. There are
also metric filter builder functions to build up the type of metric filter
desired from a url, a url and method, or a metric id. These will prevent users
from having to know the specifics of how to build a metric themselves; instead
they can use a convenience function.

An empty metric id can be passed in to the filter to return all metric-id
timers.
2016-04-19 13:13:10 -07:00

329 lines
17 KiB
Clojure

(ns puppetlabs.http.client.async-unbuffered-test
(:import (com.puppetlabs.http.client Async RequestOptions ClientOptions ResponseBodyType)
(java.net SocketTimeoutException ConnectException)
(java.io PipedInputStream PipedOutputStream)
(java.util.concurrent TimeoutException)
(java.util UUID))
(:require [clojure.test :refer :all]
[puppetlabs.http.client.test-common :refer :all]
[puppetlabs.trapperkeeper.testutils.logging :as testlogging]
[puppetlabs.trapperkeeper.testutils.webserver :as testwebserver]
[puppetlabs.http.client.common :as common]
[puppetlabs.http.client.async :as async]
[schema.test :as schema-test]))
;; Register schema-test/validate-schemas as a :once fixture, i.e. it wraps the
;; whole namespace's test run a single time rather than each deftest.
;; NOTE(review): per its name this presumably enables Schema validation while
;; the tests run - confirm against the schema.test documentation.
(use-fixtures :once schema-test/validate-schemas)
(defn generate-data
  "Build a moderately compressible test payload of approximately `data-size`
  characters.

  The result is the literal prefix `xxxx` followed by the string forms of
  freshly generated random UUIDs. The number of UUIDs is `data-size / 35`, so
  the final length only approximates the requested size."
  [data-size]
  (let [uuid-count (/ data-size 35)
        uuids      (repeatedly uuid-count #(UUID/randomUUID))]
    ;; `str` is applied to the prefix plus every UUID, concatenating them all.
    (apply str "xxxx" uuids)))
(defn successful-handler
  "Return a Ring handler that streams its response body asynchronously.

  The handler answers immediately with status 200 and a piped input stream as
  the body. A background future writes `data` into the pipe, then - when
  `send-more-data` is non-nil - blocks on dereferencing it until the caller
  signals that the first bytes were received, and finally writes the trailer
  `yyyy` and closes the write end of the pipe.

  Pass nil for `send-more-data` to have everything written without waiting."
  [data send-more-data]
  (fn [_request]
    (let [pipe-out (PipedOutputStream.)
          pipe-in  (doto (PipedInputStream.)
                     (.connect pipe-out))]
      ;; The response map below is returned right away; the body is filled in
      ;; from this background thread.
      (future
        (.write pipe-out (.getBytes data))
        ;; Wait for the client to confirm it has read the first few bytes.
        ;; The client's :socket-timeout-milliseconds keeps the test from
        ;; hanging here if that confirmation never arrives.
        (when send-more-data
          @send-more-data)
        ;; Send the final chunk and signal end-of-stream.
        (.write pipe-out (.getBytes "yyyy"))
        (.close pipe-out))
      {:status 200
       :body   pipe-in})))
(defn blocking-handler
  "Return a Ring handler whose response body never finishes.

  The handler answers immediately with status 200 and a piped input stream as
  the body. A background future writes `data` into the pipe but deliberately
  never closes the write end, so a reader that consumes `data` then waits for
  more bytes that never come."
  [data]
  (fn [_request]
    (let [pipe-out (PipedOutputStream.)
          pipe-in  (doto (PipedInputStream.)
                     (.connect pipe-out))]
      ;; Fill the body from a background thread; note there is no .close here.
      (future
        (.write pipe-out (.getBytes data)))
      {:status 200
       :body   pipe-in})))
(defn- clojure-non-blocking-streaming
  "Shared scenario for the Clojure client using `:as :unbuffered-stream`.

  Streams roughly 32M of data, which is meant to be large enough that the
  client cannot buffer all of it, and verifies the transfer is non-blocking:
  the client must receive some data before the server has finished sending.
  Also checks the socket-timeout and connection-failure paths.

  `decompress-body?` is passed through as the `:decompress-body` request
  option."
  [decompress-body?]
  (testlogging/with-test-logging
    (let [payload      (generate-data (* 32 1024 1024))
          request-opts {:as :unbuffered-stream :decompress-body decompress-body?}]
      (testing " - check data can be streamed successfully"
        (let [client-has-read (promise)]
          (testwebserver/with-test-webserver-and-config
            (successful-handler payload client-has-read) port {:shutdown-timeout-seconds 1}
            (with-open [client (async/create-client {:connect-timeout-milliseconds 100
                                                     :socket-timeout-milliseconds 20000})]
              (let [url                   (str "http://localhost:" port "/hello")
                    {:keys [status body]} @(common/get client url request-opts)
                    first-bytes           (byte-array 4)]
                (is (= 200 status))
                ;; The server is still blocked on the promise at this point, so
                ;; being able to read the prefix proves data arrives early.
                (.read body first-bytes)
                (is (= "xxxx" (String. first-bytes "UTF-8")))
                ;; Tell the handler we have read some bytes so it sends the rest.
                (deliver client-has-read true)
                ;; Drain the remainder and compare against the full payload.
                (is (= (str payload "yyyy") (str "xxxx" (slurp body))))))))))
      (testing " - check socket timeout is handled"
        (try
          (testwebserver/with-test-webserver-and-config
            (blocking-handler payload) port {:shutdown-timeout-seconds 1}
            (with-open [client (async/create-client {:connect-timeout-milliseconds 100
                                                     :socket-timeout-milliseconds 200})]
              (let [url                  (str "http://localhost:" port "/hello")
                    {:keys [body error]} @(common/get client url request-opts)]
                ;; The response itself carries no error ...
                (is (nil? error))
                ;; ... the timeout only surfaces once the body is consumed.
                (is (thrown? SocketTimeoutException (slurp body))))))
          (catch TimeoutException _
            ;; Expected whenever a server-side failure is generated.
            )))
      (testing " - check connection timeout is handled"
        (with-open [client (async/create-client {:connect-timeout-milliseconds 100})]
          ;; NOTE(review): assumes nothing is listening on port 12345.
          (let [{:keys [error]} @(common/get client (str "http://localhost:" 12345 "/bad") request-opts)]
            (is error)
            (is (instance? ConnectException error))))))))
;; Clojure client, :unbuffered-stream, large payload, decompression disabled.
(deftest clojure-non-blocking-streaming-without-decompression
  (testing "clojure :unbuffered-stream with 32MB payload and no decompression"
    (let [decompress? false]
      (clojure-non-blocking-streaming decompress?))))
;; Clojure client, :unbuffered-stream, large payload, decompression enabled.
(deftest clojure-non-blocking-streaming-with-decompression
  (testing "clojure :unbuffered-stream with 32MB payload and decompression"
    (let [decompress? true]
      (clojure-non-blocking-streaming decompress?))))
(defn- clojure-blocking-streaming
  "Shared scenario for the Clojure client when the response is buffered
  client-side, i.e. streamed in a blocking manner.

  `data` is the payload the test server sends (followed by the `yyyy`
  trailer) and `opts` is the request option map handed to `common/get`.
  Covers a successful transfer, a socket timeout and a connection failure."
  [data opts]
  (testlogging/with-test-logging
    (testing " - check data can be streamed successfully"
      (testwebserver/with-test-webserver-and-config
        ;; nil promise: the handler writes everything without waiting.
        (successful-handler data nil) port {:shutdown-timeout-seconds 1}
        (with-open [client (async/create-client {:connect-timeout-milliseconds 100
                                                 :socket-timeout-milliseconds 20000})]
          (let [url                   (str "http://localhost:" port "/hello")
                {:keys [status body]} @(common/get client url opts)
                first-bytes           (byte-array 4)]
            (is (= 200 status))
            ;; Check that a few characters can be read off the stream.
            (.read body first-bytes)
            (is (= "xxxx" (String. first-bytes "UTF-8")))
            ;; Drain the remainder and compare against the full payload.
            (is (= (str data "yyyy") (str "xxxx" (slurp body))))))))
    (testing " - check socket timeout is handled"
      (try
        (testwebserver/with-test-webserver-and-config
          (blocking-handler data) port {:shutdown-timeout-seconds 1}
          (with-open [client (async/create-client {:connect-timeout-milliseconds 100
                                                   :socket-timeout-milliseconds 200})]
            (let [url             (str "http://localhost:" port "/hello")
                  {:keys [error]} @(common/get client url opts)]
              ;; In the buffered case the timeout is reported on the response.
              (is (instance? SocketTimeoutException error)))))
        (catch TimeoutException _
          ;; Expected whenever a server-side failure is generated.
          )))
    (testing " - check connection timeout is handled"
      (with-open [client (async/create-client {:connect-timeout-milliseconds 100})]
        ;; NOTE(review): assumes nothing is listening on port 12345.
        (let [{:keys [error]} @(common/get client (str "http://localhost:" 12345 "/bad") opts)]
          (is error)
          (is (instance? ConnectException error)))))))
(deftest clojure-blocking-streaming-without-decompression
  (testing "clojure :unbuffered-stream with 1K payload and no decompression"
    ;; The payload is small enough that :unbuffered-stream still holds all of
    ;; it in memory, so it behaves identically to :stream.
    (let [payload (generate-data 1024)
          opts    {:as :unbuffered-stream :decompress-body false}]
      (clojure-blocking-streaming payload opts))))
(deftest clojure-blocking-streaming-with-decompression
  (testing "clojure :unbuffered-stream with 1K payload and decompression"
    ;; The payload is small enough that :unbuffered-stream still holds all of
    ;; it in memory, so it behaves identically to :stream.
    (let [payload (generate-data 1024)
          opts    {:as :unbuffered-stream :decompress-body true}]
      (clojure-blocking-streaming payload opts))))
;; Clojure client, pre-existing :stream mode, small payload, no decompression.
(deftest clojure-existing-streaming-with-small-payload-without-decompression
  (testing "clojure :stream with 1K payload and no decompression"
    (let [payload (generate-data 1024)
          opts    {:as :stream :decompress-body false}]
      (clojure-blocking-streaming payload opts))))
;; Clojure client, pre-existing :stream mode, small payload, decompression on.
(deftest clojure-existing-streaming-with-small-payload-with-decompression
  (testing "clojure :stream with 1K payload and decompression"
    (let [payload (generate-data 1024)
          opts    {:as :stream :decompress-body true}]
      (clojure-blocking-streaming payload opts))))
;; Clojure client, pre-existing :stream mode, large payload, no decompression.
(deftest clojure-existing-streaming-with-large-payload-without-decompression
  (testing "clojure :stream with 32M payload and no decompression"
    (let [payload (generate-data (* 32 1024 1024))
          opts    {:as :stream :decompress-body false}]
      (clojure-blocking-streaming payload opts))))
;; Clojure client, pre-existing :stream mode, large payload, decompression on.
(deftest clojure-existing-streaming-with-large-payload-with-decompression
  (testing "clojure :stream with 32M payload and decompression"
    (let [payload (generate-data (* 32 1024 1024))
          opts    {:as :stream :decompress-body true}]
      (clojure-blocking-streaming payload opts))))
(defn- java-non-blocking-streaming
  "Shared scenario for the Java client using
  `ResponseBodyType/UNBUFFERED_STREAM`.

  Streams roughly 32M of data, which is meant to be large enough that the
  client cannot buffer all of it, and verifies the transfer is non-blocking:
  the client must receive some data before the server has finished sending.
  Also checks the socket-timeout and connection-failure paths.

  `decompress-body?` is forwarded to `RequestOptions.setDecompressBody`."
  [decompress-body?]
  (testlogging/with-test-logging
    (let [payload (generate-data (* 32 1024 1024))]
      (testing " - check data can be streamed successfully"
        (let [client-has-read (promise)]
          (testwebserver/with-test-webserver-and-config
            (successful-handler payload client-has-read) port {:shutdown-timeout-seconds 1}
            (with-open [client (Async/createClient
                                 (-> (ClientOptions.)
                                     (.setSocketTimeoutMilliseconds 20000)
                                     (.setConnectTimeoutMilliseconds 100)))]
              (let [request-options (doto (RequestOptions. (str "http://localhost:" port "/hello"))
                                      (.setAs ResponseBodyType/UNBUFFERED_STREAM)
                                      (.setDecompressBody decompress-body?))
                    response        (.deref (.get client request-options))
                    status          (.getStatus response)
                    body            (.getBody response)
                    first-bytes     (byte-array 4)]
                (is (= 200 status))
                ;; The server is still blocked on the promise at this point, so
                ;; being able to read the prefix proves data arrives early.
                (.read body first-bytes)
                (is (= "xxxx" (String. first-bytes "UTF-8")))
                ;; Tell the handler we have read some bytes so it sends the rest.
                (deliver client-has-read true)
                ;; Drain the remainder and compare against the full payload.
                (is (= (str payload "yyyy") (str "xxxx" (slurp body))))))))))
      (testing " - check socket timeout is handled"
        (try
          (testwebserver/with-test-webserver-and-config
            (blocking-handler payload) port {:shutdown-timeout-seconds 1}
            (with-open [client (Async/createClient
                                 (-> (ClientOptions.)
                                     (.setSocketTimeoutMilliseconds 200)
                                     (.setConnectTimeoutMilliseconds 100)))]
              (let [request-options (doto (RequestOptions. (str "http://localhost:" port "/hello"))
                                      (.setAs ResponseBodyType/UNBUFFERED_STREAM)
                                      (.setDecompressBody decompress-body?))
                    response        (.deref (.get client request-options))
                    body            (.getBody response)
                    error           (.getError response)]
                ;; The response itself carries no error ...
                (is (nil? error))
                ;; ... the timeout only surfaces once the body is consumed.
                (is (thrown? SocketTimeoutException (slurp body))))))
          (catch TimeoutException _
            ;; Expected whenever a server-side failure is generated.
            )))
      (testing " - check connection timeout is handled"
        (with-open [client (Async/createClient
                             (-> (ClientOptions.)
                                 (.setConnectTimeoutMilliseconds 100)))]
          ;; NOTE(review): assumes nothing is listening on port 12345.
          (let [request-options (doto (RequestOptions. (str "http://localhost:" 12345 "/bad"))
                                  (.setAs ResponseBodyType/UNBUFFERED_STREAM)
                                  (.setDecompressBody decompress-body?))
                response        (.deref (.get client request-options))
                error           (.getError response)]
            (is error)
            (is (instance? ConnectException error))))))))
;; Java client, UNBUFFERED_STREAM, large payload, decompression disabled.
(deftest java-non-blocking-streaming-without-decompression
  (testing "java :unbuffered-stream with 32MB payload and no decompression"
    (let [decompress? false]
      (java-non-blocking-streaming decompress?))))
;; Java client, UNBUFFERED_STREAM, large payload, decompression enabled.
(deftest java-non-blocking-streaming-with-decompression
  (testing "java :unbuffered-stream with 32MB payload and decompression"
    (let [decompress? true]
      (java-non-blocking-streaming decompress?))))
(defn- java-blocking-streaming
  "Shared scenario for the Java client when the response is buffered
  client-side, i.e. streamed in a blocking manner.

  `data` is the payload the test server sends (followed by the `yyyy`
  trailer), `response-body-type` is passed to `RequestOptions.setAs` and
  `decompress-body?` to `RequestOptions.setDecompressBody`.
  Covers a successful transfer, a socket timeout and a connection failure."
  [data response-body-type decompress-body?]
  (testlogging/with-test-logging
    (testing " - check data can be streamed successfully"
      (testwebserver/with-test-webserver-and-config
        ;; nil promise: the handler writes everything without waiting.
        (successful-handler data nil) port {:shutdown-timeout-seconds 1}
        (with-open [client (Async/createClient
                             (-> (ClientOptions.)
                                 (.setSocketTimeoutMilliseconds 20000)
                                 (.setConnectTimeoutMilliseconds 100)))]
          (let [request-options (doto (RequestOptions. (str "http://localhost:" port "/hello"))
                                  (.setAs response-body-type)
                                  (.setDecompressBody decompress-body?))
                response        (.deref (.get client request-options))
                status          (.getStatus response)
                body            (.getBody response)
                first-bytes     (byte-array 4)]
            (is (= 200 status))
            ;; Check that a few characters can be read off the stream.
            (.read body first-bytes)
            (is (= "xxxx" (String. first-bytes "UTF-8")))
            ;; Drain the remainder and compare against the full payload.
            (is (= (str data "yyyy") (str "xxxx" (slurp body))))))))
    (testing " - check socket timeout is handled"
      (try
        (testwebserver/with-test-webserver-and-config
          (blocking-handler data) port {:shutdown-timeout-seconds 1}
          (with-open [client (Async/createClient
                               (-> (ClientOptions.)
                                   (.setSocketTimeoutMilliseconds 200)
                                   (.setConnectTimeoutMilliseconds 100)))]
            (let [request-options (doto (RequestOptions. (str "http://localhost:" port "/hello"))
                                    (.setAs response-body-type)
                                    (.setDecompressBody decompress-body?))
                  response        (.deref (.get client request-options))
                  error           (.getError response)]
              ;; In the buffered case the timeout is reported on the response.
              (is (instance? SocketTimeoutException error)))))
        (catch TimeoutException _
          ;; Expected whenever a server-side failure is generated.
          )))
    (testing " - check connection timeout is handled"
      (with-open [client (Async/createClient
                           (-> (ClientOptions.)
                               (.setConnectTimeoutMilliseconds 100)))]
        ;; NOTE(review): assumes nothing is listening on port 12345.
        (let [request-options (doto (RequestOptions. (str "http://localhost:" 12345 "/bad"))
                                (.setAs response-body-type)
                                (.setDecompressBody decompress-body?))
              response        (.deref (.get client request-options))
              error           (.getError response)]
          (is error)
          (is (instance? ConnectException error)))))))
(deftest java-blocking-streaming-without-decompression
  (testing "java :unbuffered-stream with 1K payload and no decompression"
    ;; The payload is small enough that :unbuffered-stream still holds all of
    ;; it in memory, so it behaves identically to :stream.
    (let [payload (generate-data 1024)]
      (java-blocking-streaming payload ResponseBodyType/UNBUFFERED_STREAM false))))
(deftest java-blocking-streaming-with-decompression
  (testing "java :unbuffered-stream with 1K payload and decompression"
    ;; The payload is small enough that :unbuffered-stream still holds all of
    ;; it in memory, so it behaves identically to :stream.
    (let [payload (generate-data 1024)]
      (java-blocking-streaming payload ResponseBodyType/UNBUFFERED_STREAM true))))
;; Java client, pre-existing STREAM mode, small payload, no decompression.
(deftest java-existing-streaming-with-small-payload-without-decompression
  (testing "java :stream with 1K payload and no decompression"
    (let [payload (generate-data 1024)]
      (java-blocking-streaming payload ResponseBodyType/STREAM false))))
;; Java client, pre-existing STREAM mode, small payload, decompression on.
(deftest java-existing-streaming-with-small-payload-with-decompression
  (testing "java :stream with 1K payload and decompression"
    (let [payload (generate-data 1024)]
      (java-blocking-streaming payload ResponseBodyType/STREAM true))))
;; Java client, pre-existing STREAM mode, large payload, no decompression.
(deftest java-existing-streaming-with-large-payload-without-decompression
  (testing "java :stream with 32M payload and no decompression"
    (let [payload (generate-data (* 32 1024 1024))]
      (java-blocking-streaming payload ResponseBodyType/STREAM false))))
;; Java client, pre-existing STREAM mode, large payload, decompression on.
(deftest java-existing-streaming-with-large-payload-with-decompression
  (testing "java :stream with 32M payload and decompression"
    (let [payload (generate-data (* 32 1024 1024))]
      (java-blocking-streaming payload ResponseBodyType/STREAM true))))