(TK-316) Add metrics support

This commit adds metrics support to the http client (clojure and java, sync
and async). A metric registry can optionally be passed into the client as a
client option on creation. If a metric registry is present, timers will be
added to time each request.

By default, a timer is added for the URL (stripped of username, password,
query string, and path fragments) and the URL plus the method used for the
request. In addition, a request can include a `metric-id` option, which takes
a tuple of metric ids. If this request option is specified, a timer will be
created for each element of the metric id tuple - thus if the tuple is [:foo
:bar :baz] there will be a foo timer, a foo.bar timer, and a foo.bar.baz
timer.

In addition, each timer has a "MetricType" - currently there is only one
metric type, bytes-read, which is stopped when the full response has been
read. In the future, we may add "response-init" timers that get stopped when
the first byte of the response has been read.

This commit also adds a `get-client-metrics`/`.getClientMetrics` function that
takes a client instance and returns the http client-specific metrics from the
metric registry and a `get-client-metrics-data`/`.getClientMetricsData`
function for clojure and java sync and async clients to get out metrics data
from the client. This function takes a client instance and returns a map of
metric name to a map of metric data (for clojure) or a ClientMetricData object
(for java), both of which include the mean, count, and aggregate for the timer

These `get-client-metrics*`/`.getClientMetrics*` functions also have versions
that take a url, url and method, or metric id to allow for filtering of the
timers/metrics data returned by these functions.

The clojure versions of these functions take a metric filter map. There are
also metric filter builder functions to build up the type of metric filter
desired from a url, a url and method, or a metric id. These will prevent users
from having to know the specifics of how to build a metric themselves; instead
they can use a convenience function.

An empty metric id can be passed in to the filter to return all metric-id
timers.
This commit is contained in:
Ruth Linehan 2016-02-25 21:39:21 -08:00
parent fb39b41260
commit df4e36a1aa
22 changed files with 1456 additions and 76 deletions

View file

@ -15,9 +15,11 @@
:dependencies [[org.clojure/clojure "1.7.0"] :dependencies [[org.clojure/clojure "1.7.0"]
[org.apache.httpcomponents/httpasyncclient "4.1.1"] [org.apache.httpcomponents/httpasyncclient "4.1.1"]
[org.apache.commons/commons-lang3 "3.4"]
[prismatic/schema "1.0.4"] [prismatic/schema "1.0.4"]
[org.slf4j/slf4j-api "1.7.13"] [org.slf4j/slf4j-api "1.7.13"]
[commons-io "2.4"] [commons-io "2.4"]
[io.dropwizard.metrics/metrics-core "3.1.2"]
[puppetlabs/ssl-utils "0.8.1"]] [puppetlabs/ssl-utils "0.8.1"]]

View file

@ -15,6 +15,7 @@
(org.apache.http.client.utils URIBuilder) (org.apache.http.client.utils URIBuilder)
(org.apache.http.nio.client HttpAsyncClient)) (org.apache.http.nio.client HttpAsyncClient))
(:require [puppetlabs.http.client.common :as common] (:require [puppetlabs.http.client.common :as common]
[puppetlabs.http.client.metrics :as metrics]
[schema.core :as schema]) [schema.core :as schema])
(:refer-clojure :exclude (get))) (:refer-clojure :exclude (get)))
@ -24,7 +25,7 @@
(schema/defn ^:always-validate create-default-client :- HttpAsyncClient (schema/defn ^:always-validate create-default-client :- HttpAsyncClient
[{:keys [ssl-context ssl-ca-cert ssl-cert ssl-key ssl-protocols cipher-suites [{:keys [ssl-context ssl-ca-cert ssl-cert ssl-key ssl-protocols cipher-suites
follow-redirects force-redirects connect-timeout-milliseconds follow-redirects force-redirects connect-timeout-milliseconds
socket-timeout-milliseconds]}:- common/ClientOptions] socket-timeout-milliseconds metric-registry]}:- common/ClientOptions]
(let [client-options (ClientOptions.)] (let [client-options (ClientOptions.)]
(cond-> client-options (cond-> client-options
(some? ssl-context) (.setSslContext ssl-context) (some? ssl-context) (.setSslContext ssl-context)
@ -38,7 +39,8 @@
(some? connect-timeout-milliseconds) (some? connect-timeout-milliseconds)
(.setConnectTimeoutMilliseconds connect-timeout-milliseconds) (.setConnectTimeoutMilliseconds connect-timeout-milliseconds)
(some? socket-timeout-milliseconds) (some? socket-timeout-milliseconds)
(.setSocketTimeoutMilliseconds socket-timeout-milliseconds)) (.setSocketTimeoutMilliseconds socket-timeout-milliseconds)
(some? metric-registry) (.setMetricRegistry metric-registry))
(JavaClient/createClient client-options))) (JavaClient/createClient client-options)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
@ -110,6 +112,11 @@
:text ResponseBodyType/TEXT :text ResponseBodyType/TEXT
ResponseBodyType/STREAM)) ResponseBodyType/STREAM))
(defn parse-metric-id
[opts]
(when-let [metric-id (:metric-id opts)]
(into-array (map name metric-id))))
(schema/defn clojure-options->java :- RequestOptions (schema/defn clojure-options->java :- RequestOptions
[opts :- common/RequestOptions] [opts :- common/RequestOptions]
(-> (parse-url opts) (-> (parse-url opts)
@ -117,7 +124,8 @@
(.setAs (clojure-response-body-type->java opts)) (.setAs (clojure-response-body-type->java opts))
(.setBody (:body opts)) (.setBody (:body opts))
(.setDecompressBody (clojure.core/get opts :decompress-body true)) (.setDecompressBody (clojure.core/get opts :decompress-body true))
(.setHeaders (:headers opts)))) (.setHeaders (:headers opts))
(.setMetricId (parse-metric-id opts))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;; Public ;;; Public
@ -144,10 +152,17 @@
* :as - used to control the data type of the response body. Supported values * :as - used to control the data type of the response body. Supported values
are `:text` and `:stream`, which will return a `String` or an are `:text` and `:stream`, which will return a `String` or an
`InputStream`, respectively. Defaults to `:stream`. `InputStream`, respectively. Defaults to `:stream`.
* :query-params - used to set the query parameters of an http request" * :query-params - used to set the query parameters of an http request
[opts :- common/RawUserRequestOptions * :metric-id - array of strings or keywords, used to set the metrics to be
timed for the request."
([opts :- common/RawUserRequestOptions
callback :- common/ResponseCallbackFn callback :- common/ResponseCallbackFn
client :- HttpAsyncClient] client :- HttpAsyncClient]
(request-with-client opts callback client nil))
([opts :- common/RawUserRequestOptions
callback :- common/ResponseCallbackFn
client :- HttpAsyncClient
metric-registry :- common/OptionalMetricRegistry]
(let [result (promise) (let [result (promise)
defaults {:headers {} defaults {:headers {}
:body nil :body nil
@ -157,8 +172,9 @@
java-request-options (clojure-options->java opts) java-request-options (clojure-options->java opts)
java-method (clojure-method->java opts) java-method (clojure-method->java opts)
response-delivery-delegate (get-response-delivery-delegate opts result)] response-delivery-delegate (get-response-delivery-delegate opts result)]
(JavaClient/requestWithClient java-request-options java-method callback client response-delivery-delegate) (JavaClient/requestWithClient java-request-options java-method callback
result)) client response-delivery-delegate metric-registry)
result)))
(schema/defn create-client :- (schema/protocol common/HTTPClient) (schema/defn create-client :- (schema/protocol common/HTTPClient)
"Creates a client to be used for making one or more HTTP requests. "Creates a client to be used for making one or more HTTP requests.
@ -186,6 +202,8 @@
* :cipher-suites - used to set the cipher suites that the client could * :cipher-suites - used to set the cipher suites that the client could
select from when talking to the server. Defaults to the complete select from when talking to the server. Defaults to the complete
set of suites supported by the underlying language runtime. set of suites supported by the underlying language runtime.
* :metric-registry - a MetricRegistry instance used to collect metrics
on client requests.
opts (ssl-specific where only one of the following combinations permitted): opts (ssl-specific where only one of the following combinations permitted):
@ -201,7 +219,8 @@
* :ssl-ca-cert - path to a PEM file containing the CA cert" * :ssl-ca-cert - path to a PEM file containing the CA cert"
[opts :- common/ClientOptions] [opts :- common/ClientOptions]
(let [client (create-default-client opts)] (let [client (create-default-client opts)
metric-registry (:metric-registry opts)]
(reify common/HTTPClient (reify common/HTTPClient
(get [this url] (common/get this url {})) (get [this url] (common/get this url {}))
(get [this url opts] (common/make-request this url :get opts)) (get [this url opts] (common/make-request this url :get opts))
@ -216,9 +235,15 @@
(trace [this url] (common/trace this url {})) (trace [this url] (common/trace this url {}))
(trace [this url opts] (common/make-request this url :trace opts)) (trace [this url opts] (common/make-request this url :trace opts))
(options [this url] (common/options this url {})) (options [this url] (common/options this url {}))
(options [this url opts] (common/make-request this url :post opts)) (options [this url opts] (common/make-request this url :options opts))
(patch [this url] (common/patch this url {})) (patch [this url] (common/patch this url {}))
(patch [this url opts] (common/make-request this url :patch opts)) (patch [this url opts] (common/make-request this url :patch opts))
(make-request [this url method] (common/make-request this url method {})) (make-request [this url method] (common/make-request this url method {}))
(make-request [_ url method opts] (request-with-client (assoc opts :method method :url url) nil client)) (make-request [_ url method opts] (request-with-client
(close [_] (.close client))))) (assoc opts :method method :url url)
nil client metric-registry))
(close [_] (.close client))
(get-client-metrics [_] (metrics/get-client-metrics metric-registry))
(get-client-metrics [_ metric-filter] (metrics/get-client-metrics metric-registry metric-filter))
(get-client-metrics-data [_] (metrics/get-client-metrics-data metric-registry))
(get-client-metrics-data [_ metric-filter] (metrics/get-client-metrics-data metric-registry metric-filter)))))

View file

@ -1,6 +1,7 @@
(ns puppetlabs.http.client.common (ns puppetlabs.http.client.common
(:import (java.net URL) (:import (java.net URL)
(javax.net.ssl SSLContext) (javax.net.ssl SSLContext)
(com.codahale.metrics MetricRegistry Timer)
(clojure.lang IBlockingDeref) (clojure.lang IBlockingDeref)
(java.io InputStream) (java.io InputStream)
(java.nio.charset Charset)) (java.nio.charset Charset))
@ -20,7 +21,9 @@
(options [this url] [this url opts]) (options [this url] [this url opts])
(patch [this url] [this url opts]) (patch [this url] [this url opts])
(make-request [this url method] [this url method opts]) (make-request [this url method] [this url method opts])
(close [this])) (close [this])
(get-client-metrics [this] [this metric-filter])
(get-client-metrics-data [this] [this metric-filter]))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;; Schemas ;;; Schemas
@ -38,8 +41,10 @@
(def BodyType (def BodyType
(schema/enum :text :stream :unbuffered-stream)) (schema/enum :text :stream :unbuffered-stream))
(def MetricId [(schema/either schema/Str schema/Keyword)])
(def RawUserRequestClientOptions (def RawUserRequestClientOptions
"The list of Request and client options passed by a user into "The list of request and client options passed by a user into
the request function. Allows the user to configure the request function. Allows the user to configure
both a client and a request." both a client and a request."
{:url UrlOrString {:url UrlOrString
@ -49,6 +54,7 @@
(ok :decompress-body) schema/Bool (ok :decompress-body) schema/Bool
(ok :as) BodyType (ok :as) BodyType
(ok :query-params) {schema/Str schema/Str} (ok :query-params) {schema/Str schema/Str}
(ok :metric-id) [schema/Str]
(ok :ssl-context) SSLContext (ok :ssl-context) SSLContext
(ok :ssl-cert) UrlOrString (ok :ssl-cert) UrlOrString
@ -70,7 +76,8 @@
(ok :body) Body (ok :body) Body
(ok :decompress-body) schema/Bool (ok :decompress-body) schema/Bool
(ok :as) BodyType (ok :as) BodyType
(ok :query-params) {schema/Str schema/Str}}) (ok :query-params) {schema/Str schema/Str}
(ok :metric-id) MetricId})
(def RequestOptions (def RequestOptions
"The options from UserRequestOptions that have to do with the "The options from UserRequestOptions that have to do with the
@ -83,7 +90,8 @@
:body Body :body Body
:decompress-body schema/Bool :decompress-body schema/Bool
:as BodyType :as BodyType
(ok :query-params) {schema/Str schema/Str}}) (ok :query-params) {schema/Str schema/Str}
(ok :metric-id) MetricId})
(def SslContextOptions (def SslContextOptions
{:ssl-context SSLContext}) {:ssl-context SSLContext})
@ -104,7 +112,8 @@
{(ok :force-redirects) schema/Bool {(ok :force-redirects) schema/Bool
(ok :follow-redirects) schema/Bool (ok :follow-redirects) schema/Bool
(ok :connect-timeout-milliseconds) schema/Int (ok :connect-timeout-milliseconds) schema/Int
(ok :socket-timeout-milliseconds) schema/Int}) (ok :socket-timeout-milliseconds) schema/Int
(ok :metric-registry) MetricRegistry})
(def UserRequestOptions (def UserRequestOptions
"A cleaned-up version of RawUserRequestClientOptions, which is formed after "A cleaned-up version of RawUserRequestClientOptions, which is formed after
@ -149,3 +158,34 @@
(def Response (def Response
(schema/either NormalResponse ErrorResponse)) (schema/either NormalResponse ErrorResponse))
(def HTTPMethod
(schema/enum :delete :get :head :option :patch :post :put :trace))
(def OptionalMetricRegistry
(schema/maybe MetricRegistry))
(def Metrics
{schema/Str Timer})
(def MetricData
{:metric-name schema/Str
:count schema/Int
:mean schema/Num
:aggregate schema/Num})
(def MetricsData
{schema/Str MetricData})
(def MetricTypes
(schema/enum :bytes-read))
(def MetricFilter
(schema/conditional
#(contains? % :url)
{:metric-type MetricTypes
:url schema/Str
(ok :method) HTTPMethod}
#(contains? % :metric-id)
{:metric-type MetricTypes
:metric-id MetricId}))

View file

@ -0,0 +1,94 @@
(ns puppetlabs.http.client.metrics
(:require [schema.core :as schema]
[puppetlabs.http.client.common :as common])
(:import (com.codahale.metrics Timer)
(java.util.concurrent TimeUnit)
(com.puppetlabs.http.client.impl Metrics$MetricType Metrics)))
(schema/defn get-mean :- schema/Num
[timer :- Timer]
(->> timer
.getSnapshot
.getMean
(.toMillis TimeUnit/NANOSECONDS)))
(defn get-metric-data
[timer metric-name]
(let [count (.getCount timer)
mean (get-mean timer)
aggregate (* count mean)]
{:count count
:mean mean
:aggregate aggregate
:metric-name metric-name}))
(defn get-metrics-data
[timers]
(reduce (fn [acc [metric-name timer]]
(assoc acc metric-name (get-metric-data timer metric-name)))
{} timers))
(defn get-java-metric-type
[metric-type]
(case metric-type
:bytes-read Metrics$MetricType/BYTES_READ))
(defn uppercase-method
[method]
(clojure.string/upper-case (name method)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;; Public
(schema/defn ^:always-validate filter-with-metric-id :- common/MetricFilter
[metric-id :- common/MetricId]
{:metric-id metric-id
:metric-type :bytes-read})
(schema/defn ^:always-validate filter-with-url :- common/MetricFilter
[url :- schema/Str]
{:url url
:metric-type :bytes-read})
(schema/defn ^:always-validate filter-with-url-and-method :- common/MetricFilter
[url :- schema/Str
method :- common/HTTPMethod]
{:url url
:method method
:metric-type :bytes-read})
(schema/defn ^:always-validate get-client-metrics :- (schema/maybe common/Metrics)
"Returns the http client-specific metrics from the metric registry."
([metric-registry :- common/OptionalMetricRegistry]
(when metric-registry
(into {} (Metrics/getClientMetrics metric-registry))))
([metric-registry :- common/OptionalMetricRegistry
metric-filter :- common/MetricFilter]
(when metric-registry
(cond
(:method metric-filter) (into {} (Metrics/getClientMetricsWithUrlAndMethod
metric-registry
(:url metric-filter)
(uppercase-method (:method metric-filter))
(get-java-metric-type (:metric-type metric-filter))))
(:url metric-filter) (into {} (Metrics/getClientMetricsWithUrl
metric-registry
(:url metric-filter)
(get-java-metric-type (:metric-type metric-filter))))
(:metric-id metric-filter) (into {} (Metrics/getClientMetricsWithMetricId
metric-registry
(into-array String (map name (:metric-id metric-filter)))
(get-java-metric-type (:metric-type metric-filter))))
:else (throw (IllegalArgumentException. "Not an allowed metric filter."))))))
(schema/defn ^:always-validate get-client-metrics-data :- (schema/maybe common/MetricsData)
"Returns a map of metric-id to metric data summary."
([metric-registry :- common/OptionalMetricRegistry]
(when metric-registry
(let [timers (get-client-metrics metric-registry)]
(get-metrics-data timers))))
([metric-registry :- common/OptionalMetricRegistry
metric-filter :- common/MetricFilter]
(when metric-registry
(let [timers (get-client-metrics metric-registry metric-filter)]
(get-metrics-data timers)))))

View file

@ -4,7 +4,8 @@
(ns puppetlabs.http.client.sync (ns puppetlabs.http.client.sync
(:require [puppetlabs.http.client.async :as async] (:require [puppetlabs.http.client.async :as async]
[schema.core :as schema] [schema.core :as schema]
[puppetlabs.http.client.common :as common]) [puppetlabs.http.client.common :as common]
[puppetlabs.http.client.metrics :as metrics])
(:refer-clojure :exclude (get))) (:refer-clojure :exclude (get)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
@ -23,23 +24,26 @@
(select-keys opts [:url :method :headers :body :decompress-body :as :query-params])) (select-keys opts [:url :method :headers :body :decompress-body :as :query-params]))
(defn request-with-client (defn request-with-client
[req client] ([req client]
(let [{:keys [error] :as resp} @(async/request-with-client req nil client)] (request-with-client req client nil))
([req client metric-registry]
(let [{:keys [error] :as resp} @(async/request-with-client
req nil client metric-registry)]
(if error (if error
(throw error) (throw error)
resp))) resp))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;; Public ;;; Public
(defn request (defn request
[req] [req]
(with-open [client (async/create-default-client (extract-client-opts req))] (with-open [client (async/create-default-client (extract-client-opts req))]
(request-with-client (extract-request-opts req) client))) (request-with-client (extract-request-opts req) client nil)))
(schema/defn create-client :- (schema/protocol common/HTTPClient) (schema/defn create-client :- (schema/protocol common/HTTPClient)
[opts :- common/ClientOptions] [opts :- common/ClientOptions]
(let [client (async/create-default-client opts)] (let [client (async/create-default-client opts)
metric-registry (:metric-registry opts)]
(reify common/HTTPClient (reify common/HTTPClient
(get [this url] (common/get this url {})) (get [this url] (common/get this url {}))
(get [this url opts] (common/make-request this url :get opts)) (get [this url opts] (common/make-request this url :get opts))
@ -58,8 +62,14 @@
(patch [this url] (common/patch this url {})) (patch [this url] (common/patch this url {}))
(patch [this url opts] (common/make-request this url :patch opts)) (patch [this url opts] (common/make-request this url :patch opts))
(make-request [this url method] (common/make-request this url method {})) (make-request [this url method] (common/make-request this url method {}))
(make-request [_ url method opts] (request-with-client (assoc opts :method method :url url) client)) (make-request [_ url method opts] (request-with-client
(close [_] (.close client))))) (assoc opts :method method :url url)
client metric-registry))
(close [_] (.close client))
(get-client-metrics [_] (metrics/get-client-metrics metric-registry))
(get-client-metrics [_ metric-filter] (metrics/get-client-metrics metric-registry metric-filter))
(get-client-metrics-data [_] (metrics/get-client-metrics-data metric-registry))
(get-client-metrics-data [_ metric-filter] (metrics/get-client-metrics-data metric-registry metric-filter)))))
(defn get (defn get
"Issue a synchronous HTTP GET request. This will raise an exception if an "Issue a synchronous HTTP GET request. This will raise an exception if an

View file

@ -4,6 +4,7 @@ import com.puppetlabs.http.client.impl.SslUtils;
import com.puppetlabs.http.client.impl.JavaClient; import com.puppetlabs.http.client.impl.JavaClient;
import com.puppetlabs.http.client.impl.PersistentAsyncHttpClient; import com.puppetlabs.http.client.impl.PersistentAsyncHttpClient;
import com.puppetlabs.http.client.impl.CoercedClientOptions; import com.puppetlabs.http.client.impl.CoercedClientOptions;
import com.codahale.metrics.MetricRegistry;
/** /**
* This class allows you to create an AsyncHttpClient for use in making * This class allows you to create an AsyncHttpClient for use in making
@ -20,6 +21,6 @@ public class Async {
* @return an AsyncHttpClient that can be used to make requests * @return an AsyncHttpClient that can be used to make requests
*/ */
public static AsyncHttpClient createClient(ClientOptions clientOptions) { public static AsyncHttpClient createClient(ClientOptions clientOptions) {
return new PersistentAsyncHttpClient(JavaClient.createClient(clientOptions)); return new PersistentAsyncHttpClient(JavaClient.createClient(clientOptions), clientOptions.getMetricRegistry());
} }
} }

View file

@ -1,10 +1,14 @@
package com.puppetlabs.http.client; package com.puppetlabs.http.client;
import com.codahale.metrics.Timer;
import com.puppetlabs.http.client.impl.ClientMetricData;
import com.puppetlabs.http.client.impl.Metrics;
import com.puppetlabs.http.client.impl.Promise; import com.puppetlabs.http.client.impl.Promise;
import java.io.Closeable; import java.io.Closeable;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.Map;
/** /**
* This interface represents an asynchronous HTTP client with which * This interface represents an asynchronous HTTP client with which
@ -13,6 +17,60 @@ import java.net.URISyntaxException;
*/ */
public interface AsyncHttpClient extends Closeable{ public interface AsyncHttpClient extends Closeable{
/**
* @return a Map of metric name to Timer instance
*/
public Map<String, Timer> getClientMetrics();
/**
* @param url a url to filter on
* @param metricType the type of metric to return timers for
* @return a Map of metric name to Timer instance
*/
public Map<String, Timer> getClientMetrics(String url, Metrics.MetricType metricType);
/**
* @param url a url to filter on
* @param method an HTTP method to filter on
* @param metricType the type of metric to return data for
* @return a Map of metric name to Timer instance
*/
public Map<String, Timer> getClientMetrics(String url, String method, Metrics.MetricType metricType);
/**
* @param metricId a metric id to filter on
* @param metricType the type of metric to return data for
* @return a Map of metric name to Timer instance
*/
public Map<String, Timer> getClientMetrics(String[] metricId, Metrics.MetricType metricType);
/**
* @return a Map of metric name to metric data
*/
public Map<String, ClientMetricData> getClientMetricsData();
/**
* @param url a url to filter on
* @param metricType the type of metric to return data for
* @return a Map of metric name to metric data
*/
public Map<String, ClientMetricData> getClientMetricsData(String url, Metrics.MetricType metricType);
/**
* @param url a url to filter on
* @param method an HTTP method to filter on
* @param metricType the type of metric to return data for
* @return a Map of metric name to metric data
*/
public Map<String, ClientMetricData> getClientMetricsData(String url, String method, Metrics.MetricType metricType);
/**
* @param metricId a metric id to filter on
* @param metricType the type of metric to return data for
* @return a Map of metric name to metric data
*/
public Map<String, ClientMetricData> getClientMetricsData(String[] metricId, Metrics.MetricType metricType);
/** /**
* Performs a GET request * Performs a GET request
* @param url the URL against which to make the GET request * @param url the URL against which to make the GET request

View file

@ -1,5 +1,7 @@
package com.puppetlabs.http.client; package com.puppetlabs.http.client;
import com.codahale.metrics.MetricRegistry;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
/** /**
@ -24,6 +26,7 @@ public class ClientOptions {
private boolean followRedirects = true; private boolean followRedirects = true;
private int connectTimeoutMilliseconds = -1; private int connectTimeoutMilliseconds = -1;
private int socketTimeoutMilliseconds = -1; private int socketTimeoutMilliseconds = -1;
private MetricRegistry metricRegistry;
/** /**
* Constructor for the ClientOptions class. When this constructor is called, * Constructor for the ClientOptions class. When this constructor is called,
@ -171,4 +174,13 @@ public class ClientOptions {
this.socketTimeoutMilliseconds = socketTimeoutMilliseconds; this.socketTimeoutMilliseconds = socketTimeoutMilliseconds;
return this; return this;
} }
public MetricRegistry getMetricRegistry() {
return metricRegistry;
}
public ClientOptions setMetricRegistry(MetricRegistry metricRegistry) {
this.metricRegistry = metricRegistry;
return this;
}
} }

View file

@ -2,6 +2,7 @@ package com.puppetlabs.http.client;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Map; import java.util.Map;
/** /**
@ -14,6 +15,7 @@ public class RequestOptions {
private Object body; private Object body;
private boolean decompressBody = true; private boolean decompressBody = true;
private ResponseBodyType as = ResponseBodyType.STREAM; private ResponseBodyType as = ResponseBodyType.STREAM;
private String[] metricId;
/** /**
* Constructor for the RequestOptions class. When this constructor is called, * Constructor for the RequestOptions class. When this constructor is called,
@ -90,4 +92,13 @@ public class RequestOptions {
this.as = as; this.as = as;
return this; return this;
} }
public String[] getMetricId() {
return metricId;
}
public RequestOptions setMetricId(String[] metricId) {
this.metricId = metricId;
return this;
}
} }

View file

@ -6,6 +6,7 @@ import com.puppetlabs.http.client.impl.PersistentSyncHttpClient;
import com.puppetlabs.http.client.impl.CoercedClientOptions; import com.puppetlabs.http.client.impl.CoercedClientOptions;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.codahale.metrics.MetricRegistry;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
import java.io.IOException; import java.io.IOException;
@ -86,8 +87,7 @@ public class Sync {
* @return A persistent synchronous HTTP client * @return A persistent synchronous HTTP client
*/ */
public static SyncHttpClient createClient(ClientOptions clientOptions) { public static SyncHttpClient createClient(ClientOptions clientOptions) {
return new PersistentSyncHttpClient( return new PersistentSyncHttpClient(JavaClient.createClient(clientOptions), clientOptions.getMetricRegistry());
JavaClient.createClient(clientOptions));
} }
/** /**

View file

@ -1,8 +1,13 @@
package com.puppetlabs.http.client; package com.puppetlabs.http.client;
import com.codahale.metrics.Timer;
import com.puppetlabs.http.client.impl.ClientMetricData;
import com.puppetlabs.http.client.impl.Metrics;
import java.io.Closeable; import java.io.Closeable;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.Map;
/** /**
* This interface represents a synchronous HTTP client with which * This interface represents a synchronous HTTP client with which
@ -11,6 +16,60 @@ import java.net.URISyntaxException;
*/ */
public interface SyncHttpClient extends Closeable { public interface SyncHttpClient extends Closeable {
/**
* @return a Map of metric name to Timer instance
*/
public Map<String, Timer> getClientMetrics();
/**
* @param url a url to filter on
* @param metricType the type of metric to return timers for
* @return a Map of metric name to Timer instance
*/
public Map<String, Timer> getClientMetrics(String url, Metrics.MetricType metricType);
/**
* @param url a url to filter on
* @param method an HTTP method to filter on
* @param metricType the type of metric to return data for
* @return a Map of metric name to Timer instance
*/
public Map<String, Timer> getClientMetrics(String url, String method, Metrics.MetricType metricType);
/**
* @param metricId a metric id to filter on
* @param metricType the type of metric to return data for
* @return a Map of metric name to Timer instance
*/
public Map<String, Timer> getClientMetrics(String[] metricId, Metrics.MetricType metricType);
/**
* @return a Map of metric name to metric data
*/
public Map<String, ClientMetricData> getClientMetricsData();
/**
* @param url a url to filter on
* @param metricType the type of metric to return data for
* @return a Map of metric name to metric data
*/
public Map<String, ClientMetricData> getClientMetricsData(String url, Metrics.MetricType metricType);
/**
* @param url a url to filter on
* @param method an HTTP method to filter on
* @param metricType the type of metric to return data for
* @return a Map of metric name to metric data
*/
public Map<String, ClientMetricData> getClientMetricsData(String url, String method, Metrics.MetricType metricType);
/**
* @param metricId a metric id to filter on
* @param metricType the type of metric to return data for
* @return a Map of metric name to metric data
*/
public Map<String, ClientMetricData> getClientMetricsData(String[] metricId, Metrics.MetricType metricType);
/** /**
* Makes a configurable HTTP request * Makes a configurable HTTP request
* @param requestOptions the options to configure the request * @param requestOptions the options to configure the request

View file

@ -0,0 +1,32 @@
package com.puppetlabs.http.client.impl;
public class ClientMetricData {
private String metricName;
private Long count;
private Long mean;
private Long aggregate;
ClientMetricData(String metricName, Long count, Long mean, Long aggregate) {
this.metricName = metricName;
this.count = count;
this.mean = mean;
this.aggregate = aggregate;
}
public String getMetricName() {
return metricName;
}
public Long getCount() {
return count;
}
public Long getMean() {
return mean;
}
public Long getAggregate() {
return aggregate;
}
}

View file

@ -0,0 +1,32 @@
package com.puppetlabs.http.client.impl;
import com.codahale.metrics.MetricFilter;
import com.codahale.metrics.Metric;
public class ClientMetricFilter implements MetricFilter{
private String name;
private String nameEnd;
public ClientMetricFilter() {
this.name = null;
}
public ClientMetricFilter(String name) {
this.name = name;
}
public ClientMetricFilter(String name, String nameEnd) {
this.name = name;
this.nameEnd = nameEnd;
}
public boolean matches(String s, Metric metric) {
if ( name == null ) {
return s.startsWith(Metrics.METRIC_NAMESPACE);
} else if ( nameEnd == null ){
return s.equals(name);
} else {
return s.startsWith(name) && s.endsWith(nameEnd);
}
}
}

View file

@ -5,6 +5,7 @@ import com.puppetlabs.http.client.HttpClientException;
import com.puppetlabs.http.client.HttpMethod; import com.puppetlabs.http.client.HttpMethod;
import com.puppetlabs.http.client.RequestOptions; import com.puppetlabs.http.client.RequestOptions;
import com.puppetlabs.http.client.ResponseBodyType; import com.puppetlabs.http.client.ResponseBodyType;
import com.codahale.metrics.MetricRegistry;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.http.Consts; import org.apache.http.Consts;
@ -259,7 +260,10 @@ public class JavaClient {
private static void executeWithConsumer(final CloseableHttpAsyncClient client, private static void executeWithConsumer(final CloseableHttpAsyncClient client,
final FutureCallback<HttpResponse> futureCallback, final FutureCallback<HttpResponse> futureCallback,
final HttpRequestBase request) { final HttpRequestBase request,
final MetricRegistry metricRegistry,
final String[] metricId) {
/* /*
* Create an Apache AsyncResponseConsumer that will return the response to us as soon as it is available, * Create an Apache AsyncResponseConsumer that will return the response to us as soon as it is available,
* then send the response body asynchronously * then send the response body asynchronously
@ -287,6 +291,7 @@ public class JavaClient {
@Override @Override
public void completed(HttpResponse httpResponse) { public void completed(HttpResponse httpResponse) {
consumer.setFinalResult(null); consumer.setFinalResult(null);
futureCallback.completed(httpResponse); futureCallback.completed(httpResponse);
} }
@ -307,14 +312,18 @@ public class JavaClient {
} }
}; };
client.execute(HttpAsyncMethods.create(request), consumer, streamingCompleteCallback); TimedFutureCallback<HttpResponse> timedStreamingCompleteCallback =
new TimedFutureCallback<>(streamingCompleteCallback,
Metrics.startBytesReadTimers(metricRegistry, request, metricId));
client.execute(HttpAsyncMethods.create(request), consumer, timedStreamingCompleteCallback);
} }
public static void requestWithClient(final RequestOptions requestOptions, public static void requestWithClient(final RequestOptions requestOptions,
final HttpMethod method, final HttpMethod method,
final IResponseCallback callback, final IResponseCallback callback,
final CloseableHttpAsyncClient client, final CloseableHttpAsyncClient client,
final ResponseDeliveryDelegate responseDeliveryDelegate) { final ResponseDeliveryDelegate responseDeliveryDelegate,
final MetricRegistry registry) {
final CoercedRequestOptions coercedRequestOptions = coerceRequestOptions(requestOptions, method); final CoercedRequestOptions coercedRequestOptions = coerceRequestOptions(requestOptions, method);
@ -343,10 +352,13 @@ public class JavaClient {
} }
}; };
final String[] metricId = requestOptions.getMetricId();
if (requestOptions.getAs() == ResponseBodyType.UNBUFFERED_STREAM) { if (requestOptions.getAs() == ResponseBodyType.UNBUFFERED_STREAM) {
executeWithConsumer(client, futureCallback, request); executeWithConsumer(client, futureCallback, request, registry, metricId);
} else { } else {
client.execute(request, futureCallback); TimedFutureCallback<HttpResponse> timedFutureCallback =
new TimedFutureCallback<>(futureCallback, Metrics.startBytesReadTimers(registry, request, metricId));
client.execute(request, timedFutureCallback);
} }
} }

View file

@ -0,0 +1,193 @@
package com.puppetlabs.http.client.impl;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpRequest;
import org.apache.http.RequestLine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public class Metrics {
public static final String METRIC_NAMESPACE = "puppetlabs.http-client.experimental";
public static final String URL_NAMESPACE = "with-url";
public static final String ID_NAMESPACE = "with-metric-id";
public static final String BYTES_READ_STRING = "bytes-read";
public enum MetricType { BYTES_READ; }
public static String metricTypeString(Metrics.MetricType metricType) {
// currently this is the only metric type we have; in the future when
// there are multiple types this will do something more useful
return BYTES_READ_STRING;
}
private static final Logger LOGGER = LoggerFactory.getLogger(Metrics.class);
private static ArrayList<Timer.Context> startBytesReadMetricIdTimers(MetricRegistry registry,
String[] metricId) {
ArrayList<Timer.Context> timers = new ArrayList<>();
for (int i = 0; i < metricId.length; i++) {
ArrayList<String> currentId = new ArrayList<>();
currentId.add(ID_NAMESPACE);
for (int j = 0; j <= i; j++) {
currentId.add(metricId[j]);
}
currentId.add(BYTES_READ_STRING);
String metric_name = MetricRegistry.name(METRIC_NAMESPACE,
currentId.toArray(new String[currentId.size()]));
timers.add(registry.timer(metric_name).time());
}
return timers;
}
private static ArrayList<Timer.Context> startBytesReadUrlTimers(MetricRegistry registry,
HttpRequest request) {
ArrayList<Timer.Context> timers = new ArrayList<>();
try {
final RequestLine requestLine = request.getRequestLine();
final URI uri = new URI(requestLine.getUri());
// if the port is not specified, `getPort()` returns -1
final String port = uri.getPort() == -1 ? "" : ":" + uri.getPort();
final String strippedUrl = uri.getScheme() + "://" + uri.getHost()
+ port + uri.getRawPath();
final String urlName = MetricRegistry.name(METRIC_NAMESPACE, URL_NAMESPACE,
strippedUrl, BYTES_READ_STRING);
final String urlAndMethodName = MetricRegistry.name(METRIC_NAMESPACE, URL_NAMESPACE,
strippedUrl, requestLine.getMethod(), BYTES_READ_STRING);
timers.add(registry.timer(urlName).time());
timers.add(registry.timer(urlAndMethodName).time());
} catch (URISyntaxException e) {
// this shouldn't be possible
LOGGER.warn("Could not build URI out of the request URI. Will not create URI timers. " +
"We recommend you read http://www.stilldrinking.com/programming-sucks. " +
"'now all your snowflakes are urine and you can't even find the cat.'");
}
return timers;
}
public static ArrayList<Timer.Context> startBytesReadTimers(MetricRegistry registry,
HttpRequest request,
String[] metricId) {
if (registry != null) {
ArrayList<Timer.Context> urlTimers = startBytesReadUrlTimers(registry, request);
ArrayList<Timer.Context> allTimers = new ArrayList<>(urlTimers);
if (metricId != null) {
ArrayList<Timer.Context> metricIdTimers =
startBytesReadMetricIdTimers(registry, metricId);
allTimers.addAll(metricIdTimers);
}
return allTimers;
}
else {
return null;
}
}
public static Map<String, Timer> getClientMetrics(MetricRegistry metricRegistry){
if (metricRegistry != null) {
return metricRegistry.getTimers(new ClientMetricFilter());
} else {
return null;
}
}
public static Map<String, Timer> getClientMetricsWithUrl(MetricRegistry metricRegistry,
final String url,
final MetricType metricType){
if (metricRegistry != null) {
String metricName = MetricRegistry.name(METRIC_NAMESPACE, URL_NAMESPACE,
url, metricTypeString(metricType));
return metricRegistry.getTimers(new ClientMetricFilter(metricName));
} else {
return null;
}
}
public static Map<String, Timer> getClientMetricsWithUrlAndMethod(MetricRegistry metricRegistry,
final String url,
final String method,
final MetricType metricType){
if (metricRegistry != null) {
String metricName = MetricRegistry.name(METRIC_NAMESPACE, URL_NAMESPACE,
url, method, metricTypeString(metricType));
return metricRegistry.getTimers(new ClientMetricFilter(metricName));
} else {
return null;
}
}
public static Map<String, Timer> getClientMetricsWithMetricId(MetricRegistry metricRegistry,
final String[] metricId,
final MetricType metricType){
if (metricRegistry != null) {
if (metricId.length == 0) {
String metricNameStart = MetricRegistry.name(METRIC_NAMESPACE, ID_NAMESPACE);
String metricNameEnd = metricTypeString(metricType);
return metricRegistry.getTimers(new ClientMetricFilter(metricNameStart, metricNameEnd));
} else {
String metricName = MetricRegistry.name(METRIC_NAMESPACE, ID_NAMESPACE,
StringUtils.join(metricId, "."), metricTypeString(metricType));
return metricRegistry.getTimers(new ClientMetricFilter(metricName));
}
} else {
return null;
}
}
public static Map<String, ClientMetricData> computeClientMetricsData(Map<String, Timer> timers){
if (timers != null) {
Map<String, ClientMetricData> metricsData = new HashMap<>();
for (Map.Entry<String, Timer> entry : timers.entrySet()) {
Timer timer = entry.getValue();
String metricName = entry.getKey();
Double mean = timer.getSnapshot().getMean();
Long meanMillis = TimeUnit.NANOSECONDS.toMillis(mean.longValue());
Long count = timer.getCount();
Long aggregate = count * meanMillis;
ClientMetricData data = new ClientMetricData(metricName, count, meanMillis, aggregate);
metricsData.put(metricName, data);
}
return metricsData;
} else {
return null;
}
}
public static Map<String, ClientMetricData> getClientMetricsData(MetricRegistry metricRegistry){
Map<String, Timer> timers = getClientMetrics(metricRegistry);
return computeClientMetricsData(timers);
}
public static Map<String, ClientMetricData> getClientMetricsDataWithUrl(MetricRegistry metricRegistry,
String url,
MetricType metricType){
Map<String, Timer> timers = getClientMetricsWithUrl(metricRegistry, url, metricType);
return computeClientMetricsData(timers);
}
public static Map<String, ClientMetricData> getClientMetricsDataWithUrlAndMethod(MetricRegistry metricRegistry,
String url,
String method,
MetricType metricType){
Map<String, Timer> timers = getClientMetricsWithUrlAndMethod(metricRegistry, url, method, metricType);
return computeClientMetricsData(timers);
}
public static Map<String, ClientMetricData> getClientMetricsDataWithMetricId(MetricRegistry metricRegistry,
String[] metricId,
MetricType metricType){
Map<String, Timer> timers = getClientMetricsWithMetricId(metricRegistry, metricId, metricType);
return computeClientMetricsData(timers);
}
}

View file

@ -1,5 +1,7 @@
package com.puppetlabs.http.client.impl; package com.puppetlabs.http.client.impl;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.puppetlabs.http.client.Response; import com.puppetlabs.http.client.Response;
import com.puppetlabs.http.client.RequestOptions; import com.puppetlabs.http.client.RequestOptions;
import com.puppetlabs.http.client.HttpMethod; import com.puppetlabs.http.client.HttpMethod;
@ -9,22 +11,57 @@ import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.Map;
public class PersistentAsyncHttpClient implements AsyncHttpClient { public class PersistentAsyncHttpClient implements AsyncHttpClient {
private CloseableHttpAsyncClient client; private CloseableHttpAsyncClient client;
private MetricRegistry metricRegistry;
public PersistentAsyncHttpClient(CloseableHttpAsyncClient client) { public PersistentAsyncHttpClient(CloseableHttpAsyncClient client, MetricRegistry metricRegistry) {
this.client = client; this.client = client;
this.metricRegistry = metricRegistry;
} }
public void close() throws IOException { public void close() throws IOException {
client.close(); client.close();
} }
public Map<String, Timer> getClientMetrics(){
return Metrics.getClientMetrics(metricRegistry);
}
public Map<String, Timer> getClientMetrics(String url, Metrics.MetricType metricType) {
return Metrics.getClientMetricsWithUrl(metricRegistry, url, metricType);
}
public Map<String, Timer> getClientMetrics(String url, String method, Metrics.MetricType metricType) {
return Metrics.getClientMetricsWithUrlAndMethod(metricRegistry, url, method, metricType);
}
public Map<String, Timer> getClientMetrics(String[] metricId, Metrics.MetricType metricType) {
return Metrics.getClientMetricsWithMetricId(metricRegistry, metricId, metricType);
}
public Map<String, ClientMetricData> getClientMetricsData(){
return Metrics.getClientMetricsData(metricRegistry);
}
public Map<String, ClientMetricData> getClientMetricsData(String url, Metrics.MetricType metricType) {
return Metrics.getClientMetricsDataWithUrl(metricRegistry, url, metricType);
}
public Map<String, ClientMetricData> getClientMetricsData(String url, String method, Metrics.MetricType metricType) {
return Metrics.getClientMetricsDataWithUrlAndMethod(metricRegistry, url, method, metricType);
}
public Map<String, ClientMetricData> getClientMetricsData(String[] metricId, Metrics.MetricType metricType) {
return Metrics.getClientMetricsDataWithMetricId(metricRegistry, metricId, metricType);
}
private Promise<Response> request(RequestOptions requestOptions, HttpMethod method) { private Promise<Response> request(RequestOptions requestOptions, HttpMethod method) {
final Promise<Response> promise = new Promise<>(); final Promise<Response> promise = new Promise<>();
final JavaResponseDeliveryDelegate responseDelivery = new JavaResponseDeliveryDelegate(promise); final JavaResponseDeliveryDelegate responseDelivery = new JavaResponseDeliveryDelegate(promise);
JavaClient.requestWithClient(requestOptions, method, null, client, responseDelivery); JavaClient.requestWithClient(requestOptions, method, null, client, responseDelivery, metricRegistry);
return promise; return promise;
} }

View file

@ -8,17 +8,22 @@ import com.puppetlabs.http.client.SyncHttpClient;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.Map;
public class PersistentSyncHttpClient implements SyncHttpClient { public class PersistentSyncHttpClient implements SyncHttpClient {
private CloseableHttpAsyncClient client; private CloseableHttpAsyncClient client;
private MetricRegistry metricRegistry;
private static final Logger LOGGER = LoggerFactory.getLogger(PersistentSyncHttpClient.class); private static final Logger LOGGER = LoggerFactory.getLogger(PersistentSyncHttpClient.class);
public PersistentSyncHttpClient(CloseableHttpAsyncClient client) { public PersistentSyncHttpClient(CloseableHttpAsyncClient client, MetricRegistry metricRegistry) {
this.client = client; this.client = client;
this.metricRegistry = metricRegistry;
} }
private static void logAndRethrow(String msg, Throwable t) { private static void logAndRethrow(String msg, Throwable t) {
@ -26,10 +31,42 @@ public class PersistentSyncHttpClient implements SyncHttpClient {
throw new HttpClientException(msg, t); throw new HttpClientException(msg, t);
} }
public Map<String, Timer> getClientMetrics(){
return Metrics.getClientMetrics(metricRegistry);
}
public Map<String, Timer> getClientMetrics(String url, Metrics.MetricType metricType) {
return Metrics.getClientMetricsWithUrl(metricRegistry, url, metricType);
}
public Map<String, Timer> getClientMetrics(String url, String method, Metrics.MetricType metricType) {
return Metrics.getClientMetricsWithUrlAndMethod(metricRegistry, url, method, metricType);
}
public Map<String, Timer> getClientMetrics(String[] metricId, Metrics.MetricType metricType) {
return Metrics.getClientMetricsWithMetricId(metricRegistry, metricId, metricType);
}
public Map<String, ClientMetricData> getClientMetricsData(){
return Metrics.getClientMetricsData(metricRegistry);
}
public Map<String, ClientMetricData> getClientMetricsData(String url, Metrics.MetricType metricType) {
return Metrics.getClientMetricsDataWithUrl(metricRegistry, url, metricType);
}
public Map<String, ClientMetricData> getClientMetricsData(String url, String method, Metrics.MetricType metricType) {
return Metrics.getClientMetricsDataWithUrlAndMethod(metricRegistry, url, method, metricType);
}
public Map<String, ClientMetricData> getClientMetricsData(String[] metricId, Metrics.MetricType metricType) {
return Metrics.getClientMetricsDataWithMetricId(metricRegistry, metricId, metricType);
}
public Response request(RequestOptions requestOptions, HttpMethod method) { public Response request(RequestOptions requestOptions, HttpMethod method) {
final Promise<Response> promise = new Promise<>(); final Promise<Response> promise = new Promise<>();
final JavaResponseDeliveryDelegate responseDelivery = new JavaResponseDeliveryDelegate(promise); final JavaResponseDeliveryDelegate responseDelivery = new JavaResponseDeliveryDelegate(promise);
JavaClient.requestWithClient(requestOptions, method, null, client, responseDelivery); JavaClient.requestWithClient(requestOptions, method, null, client, responseDelivery, metricRegistry);
Response response = null; Response response = null;
try { try {
response = promise.deref(); response = promise.deref();

View file

@ -0,0 +1,42 @@
package com.puppetlabs.http.client.impl;
import com.codahale.metrics.Timer;
import org.apache.http.concurrent.FutureCallback;
import java.util.ArrayList;
public final class TimedFutureCallback<T> implements FutureCallback<T> {
private final FutureCallback<T> delegate;
private final ArrayList<Timer.Context> timerContexts;
public TimedFutureCallback(FutureCallback<T> delegate, ArrayList<Timer.Context> timerContexts) {
this.delegate = delegate;
this.timerContexts = timerContexts;
}
public void completed(T result) {
stopTimerContexts();
delegate.completed(result);
}
public void failed(Exception ex) {
stopTimerContexts();
delegate.failed(ex);
}
public void cancelled() {
stopTimerContexts();
delegate.cancelled();
}
private void stopTimerContexts() {
if (timerContexts != null) {
for (Timer.Context timerContext : timerContexts) {
timerContext.stop();
}
}
}
}

View file

@ -0,0 +1,198 @@
(ns com.puppetlabs.http.client.impl.metrics-unit-test
(:require [clojure.test :refer :all]
[puppetlabs.http.client.metrics :as metrics])
(:import (com.codahale.metrics MetricRegistry)
(com.puppetlabs.http.client.impl Metrics Metrics$MetricType)
(org.apache.http.message BasicHttpRequest)))
(def bytes-read Metrics$MetricType/BYTES_READ)
(defn add-metric-ns [string]
(str "puppetlabs.http-client.experimental." string))
(deftest start-bytes-read-timers-test
(testing "startBytesReadTimers creates the right timers"
(let [url-id (add-metric-ns "with-url.http://localhost/foo.bytes-read")
url-method-id (add-metric-ns "with-url.http://localhost/foo.GET.bytes-read")]
(testing "metric id timers are not created for a request without a metric id"
(let [metric-registry (MetricRegistry.)]
(Metrics/startBytesReadTimers metric-registry
(BasicHttpRequest. "GET" "http://localhost/foo")
nil)
(is (= (set (list url-id url-method-id)) (set (keys (.getTimers metric-registry)))))))
(testing "metric id timers are not created for a request with an empty metric id"
(let [metric-registry (MetricRegistry.)]
(Metrics/startBytesReadTimers metric-registry
(BasicHttpRequest. "GET" "http://localhost/foo")
(into-array String []))
(is (= (set (list url-id url-method-id)) (set (keys (.getTimers metric-registry)))))))
(testing "metric id timers are created correctly for a request with a metric id"
(let [metric-registry (MetricRegistry.)]
(Metrics/startBytesReadTimers metric-registry
(BasicHttpRequest. "GET" "http://localhost/foo")
(into-array ["foo" "bar" "baz"]))
(is (= (set (list url-id url-method-id
(add-metric-ns "with-metric-id.foo.bytes-read")
(add-metric-ns "with-metric-id.foo.bar.bytes-read")
(add-metric-ns "with-metric-id.foo.bar.baz.bytes-read")))
(set (keys (.getTimers metric-registry)))))))
(testing "url timers should strip off username, password, query string, and fragment"
(let [metric-registry (MetricRegistry.)]
(Metrics/startBytesReadTimers metric-registry
(BasicHttpRequest. "GET" "http://user:pwd@localhost:1234/foo%2cbar/baz?te%2cst=one")
nil)
(Metrics/startBytesReadTimers metric-registry
(BasicHttpRequest. "GET" "http://user:pwd@localhost:1234/foo%2cbar/baz#x%2cyz")
nil)
(Metrics/startBytesReadTimers metric-registry
(BasicHttpRequest. "GET" "http://user:pwd@localhost:1234/foo%2cbar/baz?te%2cst=one#x%2cyz")
nil)
(Metrics/startBytesReadTimers metric-registry
(BasicHttpRequest. "GET" "http://user:pwd@localhost:1234/foo%2cbar/baz?#x%2cyz")
nil)
(is (= (set (list (add-metric-ns "with-url.http://localhost:1234/foo%2cbar/baz.bytes-read")
(add-metric-ns "with-url.http://localhost:1234/foo%2cbar/baz.GET.bytes-read")))
(set (keys (.getTimers metric-registry))))))))))
(defn start-and-stop-timers! [registry req id]
(doseq [timer (Metrics/startBytesReadTimers
registry
req
id)]
(.stop timer)))
(deftest get-client-metrics-data-test
(let [registry (MetricRegistry.)
url "http://test.com/one"
url2 "http://test.com/one/two"]
(start-and-stop-timers! registry (BasicHttpRequest. "GET" url) nil)
(start-and-stop-timers! registry (BasicHttpRequest. "POST" url) nil)
(start-and-stop-timers! registry (BasicHttpRequest. "POST" url) (into-array ["foo" "bar"]))
(start-and-stop-timers! registry (BasicHttpRequest. "GET" url2) (into-array ["foo" "abc"]))
(testing "getClientMetrics without args returns all timers"
(is (= (set
["puppetlabs.http-client.experimental.with-url.http://test.com/one.bytes-read"
"puppetlabs.http-client.experimental.with-url.http://test.com/one.GET.bytes-read"
"puppetlabs.http-client.experimental.with-url.http://test.com/one.POST.bytes-read"
"puppetlabs.http-client.experimental.with-metric-id.foo.bytes-read"
"puppetlabs.http-client.experimental.with-metric-id.foo.bar.bytes-read"
"puppetlabs.http-client.experimental.with-url.http://test.com/one/two.bytes-read"
"puppetlabs.http-client.experimental.with-url.http://test.com/one/two.GET.bytes-read"
"puppetlabs.http-client.experimental.with-metric-id.foo.abc.bytes-read"])
(set (keys (Metrics/getClientMetrics registry)))
(set (keys (Metrics/getClientMetricsData registry))))))
(testing "getClientMetricsData with url returns the right thing"
(let [java-data (Metrics/getClientMetricsDataWithUrl registry url bytes-read)
clj-data (metrics/get-client-metrics-data
registry {:url url :metric-type :bytes-read})]
(is (= (add-metric-ns "with-url.http://test.com/one.bytes-read")
(first (keys java-data))
(first (keys clj-data))))
(is (= 3 (.getCount (first (vals java-data)))
(:count (first (vals clj-data))))))
(let [java-data (Metrics/getClientMetricsDataWithUrl registry url2 bytes-read)
clj-data (metrics/get-client-metrics-data
registry {:url url2 :metric-type :bytes-read})]
(is (= (add-metric-ns "with-url.http://test.com/one/two.bytes-read")
(first (keys java-data))
(first (keys clj-data))))
(is (= 1 (.getCount (first (vals java-data)))
(:count (first (vals clj-data))))))
(testing "getClientMetricsData with url returns nothing if url is not a full match"
(is (= {} (Metrics/getClientMetricsDataWithUrl registry "http://test.com" bytes-read)
(metrics/get-client-metrics-data
registry {:url "http://test.com" :metric-type :bytes-read})))))
(testing "getClientMetricsData with url and method returns the right thing"
(let [java-data (Metrics/getClientMetricsDataWithUrlAndMethod registry url "GET" bytes-read)
clj-data (metrics/get-client-metrics-data
registry {:url url :method :get :metric-type :bytes-read})]
(is (= (add-metric-ns "with-url.http://test.com/one.GET.bytes-read")
(first (keys clj-data))
(first (keys java-data))))
(is (= 1 (.getCount (first (vals java-data)))
(:count (first (vals clj-data))))))
(let [java-data (Metrics/getClientMetricsDataWithUrlAndMethod registry url "POST" bytes-read)
clj-data (metrics/get-client-metrics-data
registry {:url url :method :post :metric-type :bytes-read})]
(is (= (add-metric-ns "with-url.http://test.com/one.POST.bytes-read")
(first (keys java-data))
(first (keys clj-data))))
(is (= 2 (.getCount (first (vals java-data)))
(:count (first (vals clj-data))))))
(let [java-data (Metrics/getClientMetricsDataWithUrlAndMethod registry url2 "GET" bytes-read)
clj-data (metrics/get-client-metrics-data
registry {:url url2 :method :get :metric-type :bytes-read})]
(is (= (add-metric-ns "with-url.http://test.com/one/two.GET.bytes-read")
(first (keys java-data))
(first (keys clj-data))))
(is (= 1 (.getCount (first (vals java-data)))
(:count (first (vals clj-data))))))
(testing "getClientMetricsData with url and method returns nothing if method is not a match"
(is (= {} (Metrics/getClientMetricsDataWithUrlAndMethod
registry "http://test.com" "PUT" bytes-read)
(metrics/get-client-metrics-data
registry {:url "http://test.com" :method :put :metric-type :bytes-read})))))
(testing "getClientMetricsData with metric id returns the right thing"
(let [java-data (Metrics/getClientMetricsDataWithMetricId
registry (into-array ["foo"]) bytes-read)
clj-data (metrics/get-client-metrics-data
registry {:metric-id ["foo"] :metric-type :bytes-read})]
(is (= (add-metric-ns "with-metric-id.foo.bytes-read")
(first (keys java-data))
(first (keys clj-data))))
(is (= 2 (.getCount (first (vals java-data)))
(:count (first (vals clj-data))))))
(let [java-data (Metrics/getClientMetricsDataWithMetricId
registry (into-array ["foo" "bar"]) bytes-read)
clj-data (metrics/get-client-metrics-data
registry {:metric-id ["foo" "bar"] :metric-type :bytes-read})]
(is (= (add-metric-ns "with-metric-id.foo.bar.bytes-read")
(first (keys java-data))
(first (keys clj-data))))
(is (= 1 (.getCount (first (vals java-data)))
(:count (first (vals clj-data))))))
(let [java-data (Metrics/getClientMetricsDataWithMetricId
registry (into-array ["foo" "abc"]) bytes-read)
clj-data (metrics/get-client-metrics-data
registry {:metric-id ["foo" "abc"] :metric-type :bytes-read})]
(is (= (add-metric-ns "with-metric-id.foo.abc.bytes-read")
(first (keys java-data))
(first (keys clj-data))))
(is (= 1 (.getCount (first (vals java-data)))
(:count (first (vals clj-data)))))
(testing "metric id can be specified as keyword or string"
(is (= clj-data
(metrics/get-client-metrics-data
registry {:metric-id ["foo" :abc] :metric-type :bytes-read})))))
(testing "getClientMetricsData with metric id returns nothing if id is not a match"
(is (= {} (Metrics/getClientMetricsDataWithMetricId
registry (into-array ["foo" "cat"]) bytes-read)
(metrics/get-client-metrics-data
registry {:metric-id ["foo" "cat"] :metric-type :bytes-read})))))))
(deftest empty-metric-id-filter-test
(testing "a metric id filter with an empty array returns all metric id timers"
(let [registry (MetricRegistry.)
url "http://test.com/foo/bar"
foo-id (add-metric-ns "with-metric-id.foo.bytes-read")
foo-bar-id (add-metric-ns "with-metric-id.foo.bar.bytes-read")
foo-bar-baz-id (add-metric-ns "with-metric-id.foo.bar.baz.bytes-read")]
(start-and-stop-timers! registry (BasicHttpRequest. "GET" url) (into-array ["foo" "bar" "baz"]))
(testing "empty metric filter returns all metric id timers"
(is (= (set (list foo-id foo-bar-id foo-bar-baz-id))
(set (keys (Metrics/getClientMetricsDataWithMetricId registry (into-array String []) bytes-read)))
(set (keys (metrics/get-client-metrics-data registry (metrics/filter-with-metric-id []))))))))))
(deftest metrics-filter-builder-test
(let [metric-registry (MetricRegistry.)
url "http://test.com/foo/bar"]
(start-and-stop-timers! metric-registry (BasicHttpRequest. "GET" url) (into-array ["foo" "bar"]))
(testing "url-filter works"
(is (= (metrics/get-client-metrics-data metric-registry {:url url :metric-type :bytes-read})
(metrics/get-client-metrics-data metric-registry (metrics/filter-with-url url)))))
(testing "url-method-filter works"
(is (= (metrics/get-client-metrics-data metric-registry {:url url :method :get :metric-type :bytes-read})
(metrics/get-client-metrics-data metric-registry (metrics/filter-with-url-and-method url :get)))))
(testing "metric-id-filter works"
(is (= (metrics/get-client-metrics-data metric-registry {:metric-id [:foo :bar] :metric-type :bytes-read})
(metrics/get-client-metrics-data metric-registry (metrics/filter-with-metric-id [:foo :bar])))))))

View file

@ -14,12 +14,12 @@
(use-fixtures :once schema-test/validate-schemas) (use-fixtures :once schema-test/validate-schemas)
(defn- generate-data (defn generate-data
"Generate data of approximately the requested size, which is moderately compressible" "Generate data of approximately the requested size, which is moderately compressible"
[data-size] [data-size]
(apply str "xxxx" (repeatedly (/ data-size 35) #(UUID/randomUUID)))) (apply str "xxxx" (repeatedly (/ data-size 35) #(UUID/randomUUID))))
(defn- successful-handler (defn successful-handler
"A Ring handler that asynchronously sends some data, waits for confirmation the data has been received then sends "A Ring handler that asynchronously sends some data, waits for confirmation the data has been received then sends
some more data" some more data"
[data send-more-data] [data send-more-data]
@ -39,7 +39,7 @@
{:status 200 {:status 200
:body instream}))) :body instream})))
(defn- blocking-handler (defn blocking-handler
"A Ring handler that sends some data but then never closes the socket" "A Ring handler that sends some data but then never closes the socket"
[data] [data]
(fn [_] (fn [_]
@ -61,7 +61,7 @@
(let [data (generate-data (* 32 1024 1024)) (let [data (generate-data (* 32 1024 1024))
opts {:as :unbuffered-stream :decompress-body decompress-body?}] opts {:as :unbuffered-stream :decompress-body decompress-body?}]
(testing " - check data can be streamed successfully success" (testing " - check data can be streamed successfully"
(let [send-more-data (promise)] (let [send-more-data (promise)]
(testwebserver/with-test-webserver-and-config (testwebserver/with-test-webserver-and-config
(successful-handler data send-more-data) port {:shutdown-timeout-seconds 1} (successful-handler data send-more-data) port {:shutdown-timeout-seconds 1}
@ -190,9 +190,9 @@
(.setSocketTimeoutMilliseconds 20000) (.setSocketTimeoutMilliseconds 20000)
(.setConnectTimeoutMilliseconds 100) (.setConnectTimeoutMilliseconds 100)
(Async/createClient))] (Async/createClient))]
(let [request-options (RequestOptions. (str "http://localhost:" port "/hello")) (let [request-options (doto (RequestOptions. (str "http://localhost:" port "/hello"))
_ (.setAs request-options ResponseBodyType/UNBUFFERED_STREAM) (.setAs ResponseBodyType/UNBUFFERED_STREAM)
_ (.setDecompressBody request-options decompress-body?) (.setDecompressBody decompress-body?))
response (-> client (.get request-options) .deref) response (-> client (.get request-options) .deref)
status (.getStatus response) status (.getStatus response)
body (.getBody response)] body (.getBody response)]
@ -212,9 +212,9 @@
(.setSocketTimeoutMilliseconds 200) (.setSocketTimeoutMilliseconds 200)
(.setConnectTimeoutMilliseconds 100) (.setConnectTimeoutMilliseconds 100)
(Async/createClient))] (Async/createClient))]
(let [request-options (RequestOptions. (str "http://localhost:" port "/hello")) (let [request-options (doto (RequestOptions. (str "http://localhost:" port "/hello"))
_ (.setAs request-options ResponseBodyType/UNBUFFERED_STREAM) (.setAs ResponseBodyType/UNBUFFERED_STREAM)
_ (.setDecompressBody request-options decompress-body?) (.setDecompressBody decompress-body?))
response (-> client (.get request-options) .deref) response (-> client (.get request-options) .deref)
body (.getBody response) body (.getBody response)
error (.getError response)] error (.getError response)]
@ -229,9 +229,9 @@
(with-open [client (-> (ClientOptions.) (with-open [client (-> (ClientOptions.)
(.setConnectTimeoutMilliseconds 100) (.setConnectTimeoutMilliseconds 100)
(Async/createClient))] (Async/createClient))]
(let [request-options (RequestOptions. (str "http://localhost:" 12345 "/bad")) (let [request-options (doto (RequestOptions. (str "http://localhost:" 12345 "/bad"))
_ (.setAs request-options ResponseBodyType/UNBUFFERED_STREAM) (.setAs ResponseBodyType/UNBUFFERED_STREAM)
_ (.setDecompressBody request-options decompress-body?) (.setDecompressBody decompress-body?))
response (-> client (.get request-options) .deref) response (-> client (.get request-options) .deref)
error (.getError response)] error (.getError response)]
(is error) (is error)
@ -250,16 +250,16 @@
[data response-body-type decompress-body?] [data response-body-type decompress-body?]
(testlogging/with-test-logging (testlogging/with-test-logging
(testing " - check data can be streamed successfully success" (testing " - check data can be streamed successfully"
(testwebserver/with-test-webserver-and-config (testwebserver/with-test-webserver-and-config
(successful-handler data nil) port {:shutdown-timeout-seconds 1} (successful-handler data nil) port {:shutdown-timeout-seconds 1}
(with-open [client (-> (ClientOptions.) (with-open [client (-> (ClientOptions.)
(.setSocketTimeoutMilliseconds 20000) (.setSocketTimeoutMilliseconds 20000)
(.setConnectTimeoutMilliseconds 100) (.setConnectTimeoutMilliseconds 100)
(Async/createClient))] (Async/createClient))]
(let [request-options (RequestOptions. (str "http://localhost:" port "/hello")) (let [request-options (doto (RequestOptions. (str "http://localhost:" port "/hello"))
_ (.setAs request-options response-body-type) (.setAs response-body-type)
_ (.setDecompressBody request-options decompress-body?) (.setDecompressBody decompress-body?))
response (-> client (.get request-options) .deref) response (-> client (.get request-options) .deref)
status (.getStatus response) status (.getStatus response)
body (.getBody response)] body (.getBody response)]
@ -278,9 +278,9 @@
(.setSocketTimeoutMilliseconds 200) (.setSocketTimeoutMilliseconds 200)
(.setConnectTimeoutMilliseconds 100) (.setConnectTimeoutMilliseconds 100)
(Async/createClient))] (Async/createClient))]
(let [request-options (RequestOptions. (str "http://localhost:" port "/hello")) (let [request-options (doto (RequestOptions. (str "http://localhost:" port "/hello"))
_ (.setAs request-options response-body-type) (.setAs response-body-type)
_ (.setDecompressBody request-options decompress-body?) (.setDecompressBody decompress-body?))
response (-> client (.get request-options) .deref) response (-> client (.get request-options) .deref)
error (.getError response)] error (.getError response)]
(is (instance? SocketTimeoutException error))))) (is (instance? SocketTimeoutException error)))))
@ -292,9 +292,9 @@
(with-open [client (-> (ClientOptions.) (with-open [client (-> (ClientOptions.)
(.setConnectTimeoutMilliseconds 100) (.setConnectTimeoutMilliseconds 100)
(Async/createClient))] (Async/createClient))]
(let [request-options (RequestOptions. (str "http://localhost:" 12345 "/bad")) (let [request-options (doto (RequestOptions. (str "http://localhost:" 12345 "/bad"))
_ (.setAs request-options response-body-type) (.setAs response-body-type)
_ (.setDecompressBody request-options decompress-body?) (.setDecompressBody decompress-body?))
response (-> client (.get request-options) .deref) response (-> client (.get request-options) .deref)
error (.getError response)] error (.getError response)]
(is error) (is error)
@ -318,7 +318,7 @@
(deftest java-existing-streaming-with-small-payload-with-decompression (deftest java-existing-streaming-with-small-payload-with-decompression
(testing "java :stream with 1K payload and decompression" (testing "java :stream with 1K payload and decompression"
(java-blocking-streaming (generate-data 1024) ResponseBodyType/STREAM false))) (java-blocking-streaming (generate-data 1024) ResponseBodyType/STREAM true)))
(deftest java-existing-streaming-with-large-payload-without-decompression (deftest java-existing-streaming-with-large-payload-without-decompression
(testing "java :stream with 32M payload and no decompression" (testing "java :stream with 32M payload and no decompression"

View file

@ -0,0 +1,485 @@
(ns puppetlabs.http.client.metrics-test
(:require [clojure.test :refer :all]
[puppetlabs.http.client.async-unbuffered-test :as unbuffered-test]
[puppetlabs.trapperkeeper.services.webserver.jetty9-service :as jetty9]
[puppetlabs.trapperkeeper.testutils.bootstrap :as testutils]
[puppetlabs.trapperkeeper.testutils.logging :as testlogging]
[puppetlabs.trapperkeeper.testutils.webserver :as testwebserver]
[puppetlabs.http.client.async :as async]
[puppetlabs.http.client.sync :as sync]
[puppetlabs.http.client.common :as common]
[puppetlabs.trapperkeeper.core :as tk])
(:import (com.puppetlabs.http.client.impl ClientMetricData)
(com.puppetlabs.http.client Async RequestOptions ClientOptions ResponseBodyType Sync)
(com.codahale.metrics Timer MetricRegistry)
(java.net SocketTimeoutException)
(java.util.concurrent TimeoutException)))
(tk/defservice test-metric-web-service
[[:WebserverService add-ring-handler]]
(init [this context]
(add-ring-handler (fn [_] {:status 200 :body "Hello, World!"}) "/hello")
(add-ring-handler (fn [_]
(do
(Thread/sleep 5)
{:status 200 :body "short"}))
"/short")
(add-ring-handler (fn [_]
(do
(Thread/sleep 100)
{:status 200 :body "long"}))
"/long")
context))
(def hello-url "http://localhost:10000/hello")
(def short-url "http://localhost:10000/short")
(def long-url "http://localhost:10000/long")
(def short-name-base "puppetlabs.http-client.experimental.with-url.http://localhost:10000/short")
(def short-name (str short-name-base ".bytes-read"))
(def short-name-with-get (str short-name-base ".GET" ".bytes-read"))
(def short-name-with-post (str short-name-base ".POST" ".bytes-read"))
(def long-name-base "puppetlabs.http-client.experimental.with-url.http://localhost:10000/long")
(def long-name (str long-name-base ".bytes-read"))
(def long-name-with-method (str long-name-base ".GET" ".bytes-read"))
(def long-foo-name "puppetlabs.http-client.experimental.with-metric-id.foo.bytes-read")
(def long-foo-bar-name "puppetlabs.http-client.experimental.with-metric-id.foo.bar.bytes-read")
(def long-foo-bar-baz-name "puppetlabs.http-client.experimental.with-metric-id.foo.bar.baz.bytes-read")
(def hello-name-base "puppetlabs.http-client.experimental.with-url.http://localhost:10000/hello")
(def hello-name (str hello-name-base ".bytes-read"))
(def hello-name-with-method (str hello-name-base ".GET" ".bytes-read"))
(deftest metrics-test-java-async
(testing "metrics work with java async client"
(testlogging/with-test-logging
(testutils/with-app-with-config
app
[jetty9/jetty9-service test-metric-web-service]
{:webserver {:port 10000}}
(let [metric-registry (MetricRegistry.)
hello-request-opts (RequestOptions. hello-url)
short-request-opts (RequestOptions. short-url)
long-request-opts (doto (RequestOptions. long-url)
(.setMetricId (into-array ["foo" "bar" "baz"])))]
(with-open [client (Async/createClient (doto (ClientOptions.)
(.setMetricRegistry metric-registry)))]
(-> client (.get hello-request-opts) (.deref)) ; warm it up
(let [short-response (-> client (.get short-request-opts) (.deref))
long-response (-> client (.get long-request-opts) (.deref))]
(-> client (.post short-request-opts) (.deref))
(is (= 200 (.getStatus short-response)))
(is (= "short" (slurp (.getBody short-response))))
(is (= 200 (.getStatus long-response)))
(is (= "long" (slurp (.getBody long-response))))
(.timer metric-registry "fake")
(let [client-metrics (.getClientMetrics client)
client-metrics-data (.getClientMetricsData client)
all-metrics (.getMetrics metric-registry)]
(testing ".getClientMetrics returns only http client metrics"
(is (= 11 (count all-metrics)))
(is (= 10 (count client-metrics)))
(is (= 10 (count client-metrics-data))))
(testing "get-client-metrics returns a map of metric name to timer instance"
(is (= (set (list hello-name hello-name-with-method short-name short-name-with-get
short-name-with-post long-name long-name-with-method
long-foo-name long-foo-bar-name long-foo-bar-baz-name))
(set (keys client-metrics))
(set (keys client-metrics-data))))
(is (every? #(instance? Timer %) (vals client-metrics))))
(testing "get-client-metrics-data returns a map of metric name to metric data"
(let [short-data (get client-metrics-data short-name)
short-data-get (get client-metrics-data short-name-with-get)
short-data-post (get client-metrics-data short-name-with-post)
long-data (get client-metrics-data long-name)]
(is (every? #(instance? ClientMetricData %) (vals client-metrics-data)))
(is (= short-name (.getMetricName short-data)))
(is (= 2 (.getCount short-data)))
(is (<= 5 (.getMean short-data)))
(is (<= 10 (.getAggregate short-data)))
(is (= short-name-with-get (.getMetricName short-data-get)))
(is (= 1 (.getCount short-data-get)))
(is (<= 5 (.getMean short-data-get)))
(is (<= 5 (.getAggregate short-data-get)))
(is (= short-name-with-post (.getMetricName short-data-post)))
(is (= 1 (.getCount short-data-post)))
(is (<= 5 (.getMean short-data-post)))
(is (<= 5 (.getAggregate short-data-post)))
(is (>= 1 (Math/abs (- (.getAggregate short-data)
(+ (.getAggregate short-data-get)
(.getAggregate short-data-post))))))
(is (= long-name (.getMetricName long-data)))
(is (= 1 (.getCount long-data)))
(is (<= 100 (.getMean long-data)))
(is (<= 100 (.getAggregate long-data)))
(is (> (.getAggregate long-data) (.getAggregate short-data))))))))
(with-open [client (Async/createClient (ClientOptions.))]
(testing ".getClientMetrics returns nil if no metrics registry passed in"
(let [response (-> client (.get hello-request-opts) (.deref))]
(is (= 200 (.getStatus response)))
(is (= "Hello, World!" (slurp (.getBody response))))
(is (= nil (.getClientMetrics client)))
(is (= nil (.getClientMetricsData client)))))))))))
(deftest metrics-test-clojure-async
(testing "metrics work with clojure async client"
(testlogging/with-test-logging
(testutils/with-app-with-config
app
[jetty9/jetty9-service test-metric-web-service]
{:webserver {:port 10000}}
(let [metric-registry (MetricRegistry.)]
(with-open [client (async/create-client {:metric-registry metric-registry})]
@(common/get client hello-url) ; warm it up
(let [short-response @(common/get client short-url {:as :text :metric-id ["foo" "bar" "baz"]})
long-response @(common/get client long-url)]
@(common/post client short-url)
(is (= {:status 200 :body "short"} (select-keys short-response [:status :body])))
(is (= 200 (:status long-response)))
(is (= "long" (slurp (:body long-response))))
(.timer metric-registry "fake")
(let [client-metrics (common/get-client-metrics client)
client-metrics-data (common/get-client-metrics-data client)
all-metrics (.getMetrics metric-registry)]
(testing "get-client-metrics and get-client-metrics data return only http client metrics"
(is (= 11 (count all-metrics)))
(is (= 10 (count client-metrics)))
(is (= 10 (count client-metrics-data))))
(testing "get-client-metrics returns a map of metric name to timer instance"
(is (= (set (list hello-name hello-name-with-method short-name short-name-with-get
short-name-with-post long-name long-name-with-method
long-foo-name long-foo-bar-name long-foo-bar-baz-name))
(set (keys client-metrics))
(set (keys client-metrics-data))))
(is (every? #(instance? Timer %) (vals client-metrics))))
(testing "get-client-metrics-data returns a map of metric name to metrics data"
(let [short-data (get client-metrics-data short-name)
short-data-get (get client-metrics-data short-name-with-get)
short-data-post (get client-metrics-data short-name-with-post)
long-data (get client-metrics-data long-name)]
(is (= short-name (:metric-name short-data)))
(is (= 2 (:count short-data)))
(is (<= 5 (:mean short-data)))
(is (<= 10 (:aggregate short-data)))
(is (= short-name-with-get (:metric-name short-data-get)))
(is (= 1 (:count short-data-get)))
(is (<= 5 (:mean short-data-get)))
(is (<= 5 (:aggregate short-data-get)))
(is (= short-name-with-post (:metric-name short-data-post)))
(is (= 1 (:count short-data-post)))
(is (<= 5 (:mean short-data-post)))
(is (<= 5 (:aggregate short-data-post)))
(is (>= 1 (Math/abs (- (:aggregate short-data)
(+ (:aggregate short-data-get)
(:aggregate short-data-post))))))
(is (= long-name (:metric-name long-data)))
(is (= 1 (:count long-data)))
(is (<= 100 (:mean long-data)))
(is (<= 100 (:aggregate long-data)))
(is (> (:mean long-data) (:mean short-data)))))))))
(with-open [client (async/create-client {})]
(testing "get-client-metrics returns nil if no metrics registry passed in"
(let [response (common/get client hello-url)]
(is (= 200 (:status @response)))
(is (= "Hello, World!" (slurp (:body @response))))
(is (= nil (common/get-client-metrics client)))
(is (= nil (common/get-client-metrics-data client))))))))))
(deftest metrics-test-java-sync
(testing "metrics work with java sync client"
(testlogging/with-test-logging
(testutils/with-app-with-config
app
[jetty9/jetty9-service test-metric-web-service]
{:webserver {:port 10000}}
(let [metric-registry (MetricRegistry.)
hello-request-opts (RequestOptions. hello-url)
short-request-opts (RequestOptions. short-url)
long-request-opts (doto (RequestOptions. long-url)
(.setMetricId (into-array ["foo" "bar" "baz"])))]
(with-open [client (Sync/createClient (doto (ClientOptions.)
(.setMetricRegistry metric-registry)))]
(.get client hello-request-opts) ; warm it up
(let [short-response (.get client short-request-opts)
long-response (.get client long-request-opts)]
(.post client short-request-opts)
(is (= 200 (.getStatus short-response)))
(is (= "short" (slurp (.getBody short-response))))
(is (= 200 (.getStatus long-response)))
(is (= "long" (slurp (.getBody long-response))))
(.timer metric-registry "fake")
(let [client-metrics (.getClientMetrics client)
client-metrics-data (.getClientMetricsData client)
all-metrics (.getMetrics metric-registry)]
(testing ".getClientMetrics returns only http client metrics"
(is (= 11 (count all-metrics)))
(is (= 10 (count client-metrics)))
(is (= 10 (count client-metrics-data))))
(testing ".getClientMetrics returns a map of metric name to timer instance"
(is (= (set (list hello-name hello-name-with-method short-name short-name-with-get
short-name-with-post long-name long-name-with-method
long-foo-name long-foo-bar-name long-foo-bar-baz-name))
(set (keys client-metrics))
(set (keys client-metrics-data))))
(is (every? #(instance? Timer %) (vals client-metrics))))
(testing ".getClientMetricsData returns a map of metric name to metric data"
(let [short-data (get client-metrics-data short-name)
short-data-get (get client-metrics-data short-name-with-get)
short-data-post (get client-metrics-data short-name-with-post)
long-data (get client-metrics-data long-name)]
(is (every? #(instance? ClientMetricData %) (vals client-metrics-data)))
(is (= short-name (.getMetricName short-data)))
(is (= 2 (.getCount short-data)))
(is (<= 5 (.getMean short-data)))
(is (<= 10 (.getAggregate short-data)))
(is (= short-name-with-get (.getMetricName short-data-get)))
(is (= 1 (.getCount short-data-get)))
(is (<= 5 (.getMean short-data-get)))
(is (<= 5 (.getAggregate short-data-get)))
(is (= short-name-with-post (.getMetricName short-data-post)))
(is (= 1 (.getCount short-data-post)))
(is (<= 5 (.getMean short-data-post)))
(is (<= 5 (.getAggregate short-data-post)))
(is (>= 1 (Math/abs (- (.getAggregate short-data)
(+ (.getAggregate short-data-get)
(.getAggregate short-data-post))))))
(is (= long-name (.getMetricName long-data)))
(is (= 1 (.getCount long-data)))
(is (<= 100 (.getMean long-data)))
(is (<= 100 (.getAggregate long-data)))
(is (> (.getMean long-data) (.getMean short-data))))))))
(with-open [client (Sync/createClient (ClientOptions.))]
(testing ".getClientMetrics returns nil if no metrics registry passed in"
(let [response (.get client hello-request-opts)]
(is (= 200 (.getStatus response)))
(is (= "Hello, World!" (slurp (.getBody response))))
(is (= nil (.getClientMetrics client)))
(is (= nil (.getClientMetricsData client)))))))))))
(deftest metrics-test-clojure-sync
(testing "metrics work with clojure sync client"
(testlogging/with-test-logging
(testutils/with-app-with-config
app
[jetty9/jetty9-service test-metric-web-service]
{:webserver {:port 10000}}
(let [metric-registry (MetricRegistry.)]
(with-open [client (sync/create-client {:metric-registry metric-registry})]
(common/get client hello-url) ; warm it up
(let [short-response (common/get client short-url {:as :text})
long-response (common/get client long-url {:as :text :metric-id ["foo" "bar" "baz"]})]
(common/post client short-url)
(is (= {:status 200 :body "short"} (select-keys short-response [:status :body])))
(is (= {:status 200 :body "long"} (select-keys long-response [:status :body])))
(.timer metric-registry "fake")
(let [client-metrics (common/get-client-metrics client)
client-metrics-data (common/get-client-metrics-data client)
all-metrics (.getMetrics metric-registry)]
(testing "get-client-metrics and get-client-metrics data return only http client metrics"
(is (= 11 (count all-metrics)))
(is (= 10 (count client-metrics)))
(is (= 10 (count client-metrics-data))))
(testing "get-client-metrics returns a map of metric name to timer instance"
(is (= (set (list hello-name hello-name-with-method short-name short-name-with-get
short-name-with-post long-name long-name-with-method
long-foo-name long-foo-bar-name long-foo-bar-baz-name))
(set (keys client-metrics))
(set (keys client-metrics-data))))
(is (every? #(instance? Timer %) (vals client-metrics))))
(testing "get-client-metrics-data returns a map of metric name to metrics data"
(let [short-data (get client-metrics-data short-name)
short-data-get (get client-metrics-data short-name-with-get)
short-data-post (get client-metrics-data short-name-with-post)
long-data (get client-metrics-data long-name)]
(is (= short-name (:metric-name short-data)))
(is (= 2 (:count short-data)))
(is (<= 5 (:mean short-data)))
(is (<= 10 (:aggregate short-data)))
(is (= short-name-with-get (:metric-name short-data-get)))
(is (= 1 (:count short-data-get)))
(is (<= 5 (:mean short-data-get)))
(is (<= 5 (:aggregate short-data-get)))
(is (= short-name-with-post (:metric-name short-data-post)))
(is (= 1 (:count short-data-post)))
(is (<= 5 (:mean short-data-post)))
(is (<= 5 (:aggregate short-data-post)))
(is (>= 1 (Math/abs (- (:aggregate short-data)
(+ (:aggregate short-data-get)
(:aggregate short-data-post))))))
(is (= long-name (:metric-name long-data)))
(is (= 1 (:count long-data)))
(is (<= 100 (:mean long-data)))
(is (<= 100 (:aggregate long-data)))
(is (> (:mean long-data) (:mean short-data))))))))
(with-open [client (sync/create-client {})]
(testing "get-client-metrics returns nil if no metrics registry passed in"
(let [response (common/get client hello-url)]
(is (= 200 (:status response)))
(is (= "Hello, World!" (slurp (:body response))))
(is (= nil (common/get-client-metrics client)))
(is (= nil (common/get-client-metrics-data client)))))))))))
(deftest java-metrics-for-unbuffered-streaming-test
(testlogging/with-test-logging
(let [data (unbuffered-test/generate-data (* 1024 1024))]
(testing "metrics work for a successful request"
(let [metric-registry (MetricRegistry.)]
(testwebserver/with-test-webserver-and-config
(unbuffered-test/successful-handler data nil) port {:shutdown-timeout-seconds 1}
(with-open [client (-> (ClientOptions.)
(.setSocketTimeoutMilliseconds 20000)
(.setConnectTimeoutMilliseconds 100)
(.setMetricRegistry metric-registry)
(Async/createClient))]
(let [request-options (doto (RequestOptions. (str "http://localhost:" port "/hello"))
(.setAs ResponseBodyType/UNBUFFERED_STREAM))
response (-> client (.get request-options) .deref)
status (.getStatus response)
body (.getBody response)]
(is (= 200 status))
(let [instream body
buf (make-array Byte/TYPE 4)]
(.read instream buf)
(is (= "xxxx" (String. buf "UTF-8"))) ;; Make sure we can read a few chars off of the stream
(Thread/sleep 1000) ;; check that the bytes-read metric takes this into account
(is (= (str data "yyyy") (str "xxxx" (slurp instream))))) ;; Read the rest and validate
(let [client-metrics (.getClientMetrics client)
client-metrics-data (.getClientMetricsData client)
base-metric-name (str "puppetlabs.http-client.experimental.with-url.http://localhost:" port "/hello")
bytes-read-name (str base-metric-name ".bytes-read")
bytes-read-name-with-method (str base-metric-name ".GET" ".bytes-read")]
(is (= (set (list bytes-read-name bytes-read-name-with-method))
(set (keys client-metrics))
(set (keys client-metrics-data))))
(is (every? #(instance? Timer %) (vals client-metrics)))
(let [bytes-read-data (get client-metrics-data bytes-read-name)]
(is (every? #(instance? ClientMetricData %) (vals client-metrics-data)))
(is (= 1 (.getCount bytes-read-data)))
(is (= bytes-read-name (.getMetricName bytes-read-data)))
(is (<= 1000 (.getMean bytes-read-data)))
(is (<= 1000 (.getAggregate bytes-read-data))))))))))
(testing "metrics work for failed request"
(try
(testwebserver/with-test-webserver-and-config
(unbuffered-test/blocking-handler data) port {:shutdown-timeout-seconds 1}
(let [metric-registry (MetricRegistry.)]
(with-open [client (-> (ClientOptions.)
(.setSocketTimeoutMilliseconds 200)
(.setConnectTimeoutMilliseconds 100)
(.setMetricRegistry metric-registry)
(Async/createClient))]
(let [request-options (doto (RequestOptions. (str "http://localhost:" port "/hello"))
(.setAs ResponseBodyType/UNBUFFERED_STREAM))
response (-> client (.get request-options) .deref)
error (.getError response)
body (.getBody response)]
(is (nil? error))
(is (thrown? SocketTimeoutException (slurp body)))
(let [client-metrics (.getClientMetrics client)
client-metrics-data (.getClientMetricsData client)
base-metric-name (str "puppetlabs.http-client.experimental.with-url.http://localhost:" port "/hello")
bytes-read-name (str base-metric-name ".bytes-read")
bytes-read-name-with-method (str base-metric-name ".GET" ".bytes-read")]
(is (= (set (list bytes-read-name bytes-read-name-with-method))
(set (keys client-metrics))
(set (keys client-metrics-data))))
(is (every? #(instance? Timer %) (vals client-metrics)))
(let [bytes-read-data (get client-metrics-data bytes-read-name)]
(is (every? #(instance? ClientMetricData %) (vals client-metrics-data)))
(is (= 1 (.getCount bytes-read-data)))
(is (= bytes-read-name (.getMetricName bytes-read-data)))
(is (<= 200 (.getMean bytes-read-data)))
(is (<= 200 (.getAggregate bytes-read-data)))))))))
(catch TimeoutException e
;; Expected whenever a server-side failure is generated
))))))
(deftest clojure-metrics-for-unbuffered-streaming-test
(testlogging/with-test-logging
(let [data (unbuffered-test/generate-data (* 1024 1024))
opts {:as :unbuffered-stream}]
(testing "metrics work for a successful request"
(let [metric-registry (MetricRegistry.)]
(testwebserver/with-test-webserver-and-config
(unbuffered-test/successful-handler data nil) port {:shutdown-timeout-seconds 1}
(with-open [client (async/create-client {:connect-timeout-milliseconds 100
:socket-timeout-milliseconds 20000
:metric-registry metric-registry})]
(let [response @(common/get client (str "http://localhost:" port "/hello") opts)
{:keys [status body]} response]
(is (= 200 status))
(let [instream body
buf (make-array Byte/TYPE 4)]
(.read instream buf)
(is (= "xxxx" (String. buf "UTF-8"))) ;; Make sure we can read a few chars off of the stream
(Thread/sleep 1000) ;; check that the bytes-read metric takes this into account
(is (= (str data "yyyy") (str "xxxx" (slurp instream))))) ;; Read the rest and validate
(let [client-metrics (common/get-client-metrics client)
client-metrics-data (common/get-client-metrics-data client)
base-metric-name (str "puppetlabs.http-client.experimental.with-url.http://localhost:" port "/hello")
bytes-read-name (str base-metric-name ".bytes-read")
bytes-read-name-with-method (str base-metric-name ".GET" ".bytes-read")]
(is (= (set (list bytes-read-name bytes-read-name-with-method))
(set (keys client-metrics))
(set (keys client-metrics-data))))
(is (every? #(instance? Timer %) (vals client-metrics)))
(let [bytes-read-data (get client-metrics-data bytes-read-name)]
(is (= {:count 1 :metric-name bytes-read-name}
(select-keys bytes-read-data [:metric-name :count])))
(is (<= 1000 (:mean bytes-read-data)))
(is (<= 1000 (:aggregate bytes-read-data))))))))))
(testing "metrics work for a failed request"
(try
(testwebserver/with-test-webserver-and-config
(unbuffered-test/blocking-handler data) port {:shutdown-timeout-seconds 1}
(let [metric-registry (MetricRegistry.)]
(with-open [client (async/create-client {:connect-timeout-milliseconds 100
:socket-timeout-milliseconds 200
:metric-registry metric-registry})]
(let [response @(common/get client (str "http://localhost:" port "/hello") opts)
{:keys [body error]} response]
(is (nil? error))
;; Consume the body to get the exception
(is (thrown? SocketTimeoutException (slurp body))))
(let [client-metrics (common/get-client-metrics client)
client-metrics-data (common/get-client-metrics-data client)
base-metric-name (str "puppetlabs.http-client.experimental.with-url.http://localhost:" port "/hello")
bytes-read-name (str base-metric-name ".bytes-read")
bytes-read-name-with-method (str base-metric-name ".GET" ".bytes-read")]
(is (= (set (list bytes-read-name bytes-read-name-with-method))
(set (keys client-metrics))
(set (keys client-metrics-data))))
(is (every? #(instance? Timer %) (vals client-metrics)))
(let [bytes-read-data (get client-metrics-data bytes-read-name)]
(is (= {:count 1 :metric-name bytes-read-name}
(select-keys bytes-read-data [:metric-name :count])))
(is (<= 200 (:mean bytes-read-data)))
(is (<= 200 (:aggregate bytes-read-data))))))))
(catch TimeoutException e
;; Expected whenever a server-side failure is generated
))))))