(TK-23) Port to apache HttpAsyncClient
Kevin Corcoran 2014-07-09 09:39:19 -07:00
commit 59a676f107
28 changed files with 1416 additions and 747 deletions

@ -1,7 +1,7 @@
(def ks-version "0.6.0")
(def tk-version "0.3.10")
(def ks-version "0.7.2")
(def tk-version "0.4.2")
(defproject puppetlabs/http-client "0.1.8-SNAPSHOT"
(defproject puppetlabs/http-client "0.2.0-SNAPSHOT"
:description "HTTP client wrapper"
:license {:name "Apache License, Version 2.0"
:url ""}
@ -12,15 +12,18 @@
:pedantic? :abort
:dependencies [[org.clojure/clojure "1.5.1"]
[http-kit "2.1.16"]
[puppetlabs/certificate-authority "0.1.5"]
[org.clojure/tools.logging "0.2.6"]
[org.slf4j/slf4j-api "1.7.6"]]
[puppetlabs/kitchensink ~ks-version]
[org.slf4j/slf4j-api "1.7.6"]
[org.apache.httpcomponents/httpasyncclient "4.0.1"]
[org.apache.httpcomponents/httpcore "4.3.2"]
[commons-io "2.1"]
[prismatic/schema "0.2.1"]]
:source-paths ["src/clj"]
:java-source-paths ["src/java"]
:jar-exclusions [#".*\.java$"]
:javac-options ["-target" "1.6" "-source" "1.6" "-Xlint:-options"]
;; By declaring a classifier here and a corresponding profile below we'll get an additional jar
;; during `lein jar` that has all the source code (including the java source). Downstream projects can then
@ -30,7 +33,9 @@
:profiles {:dev {:dependencies [[puppetlabs/kitchensink ~ks-version :classifier "test"]
[puppetlabs/trapperkeeper ~tk-version]
[puppetlabs/trapperkeeper ~tk-version :classifier "test"]
[puppetlabs/trapperkeeper-webserver-jetty9 "0.5.0"]]}
[puppetlabs/trapperkeeper-webserver-jetty9 "0.5.0"]
[spyscope "0.1.4"]]
:injections [(require 'spyscope.core)]}
:sources-jar {:java-source-paths ^:replace []
:jar-exclusions ^:replace []
:source-paths ^:replace ["src/clj" "src/java"]}}

@ -1,6 +1,5 @@
;; This namespace is a thin wrapper around the http client functionality provided
;; by org.httpkit.client. It makes SSL configuration more flexible, and allows
;; the use of PEM files.
;; This namespace is a wrapper around the http client functionality provided
;; by Apache HttpAsyncClient. It allows the use of PEM files for HTTPS configuration.
;; In the options to any request method, an existing SSLContext object can be
;; supplied under :ssl-context. If this is present it will be used. If it's
@ -13,20 +12,36 @@
;; these methods.
(ns puppetlabs.http.client.async
(:require [org.httpkit.client :as http]
[puppetlabs.certificate-authority.core :as ssl])
(:import (com.puppetlabs.http.client HttpMethod HttpClientException)
(org.apache.http.nio.client HttpAsyncClient)
(org.apache.http.impl.nio.client HttpAsyncClients)
(org.apache.http.client.methods HttpGet HttpHead HttpPost HttpPut HttpTrace HttpDelete HttpOptions HttpPatch)
(org.apache.http.concurrent FutureCallback)
(org.apache.http.message BasicHeader)
(org.apache.http Header)
(org.apache.http.nio.entity NStringEntity)
(org.apache.http.entity InputStreamEntity ContentType)
( InputStream)
(com.puppetlabs.http.client.impl Compression))
(:require [puppetlabs.certificate-authority.core :as ssl]
[clojure.string :as str]
[puppetlabs.kitchensink.core :as ks]
[puppetlabs.http.client.schemas :as schemas]
[schema.core :as schema]
[ :as log])
(:refer-clojure :exclude (get)))
;;; Private SSL configuration functions
;;; Private SSL configuration functions
(defn- initialize-ssl-context-from-pems
(-> req
(assoc :ssl-context (ssl/pems->ssl-context
(:ssl-cert req)
(:ssl-key req)
(:ssl-ca-cert req)))
(dissoc :ssl-cert :ssl-key :ssl-ca-cert)))
(assoc :ssl-context (ssl/pems->ssl-context
(:ssl-cert req)
(:ssl-key req)
(:ssl-ca-cert req)))
(dissoc :ssl-cert :ssl-key :ssl-ca-cert)))
(defn- initialize-ssl-context-from-ca-pem
@ -35,89 +50,289 @@
(:ssl-ca-cert req)))
(dissoc :ssl-ca-cert)))
(defn- configure-ssl-from-context
"Configures an SSLEngine in the request starting from an SSLContext"
(-> req
(assoc :sslengine (.createSSLEngine (:ssl-context req)))
(dissoc :ssl-context)))
(defn- configure-ssl-from-pems
"Configures an SSLEngine in the request starting from a set of PEM files"
(-> req
(initialize-ssl-context-from-pems req))
(defn- configure-ssl-from-ca-pem
"Configures an SSLEngine in the request starting from a CA PEM file"
(-> req
(initialize-ssl-context-from-ca-pem req))
(defn configure-ssl
"Configures a request map to have an SSLEngine. It will use an existing one
if already present, , then use an SSLContext (stored in :ssl-context) if
that is present, and will fall back to a set of PEM files (stored in
:ssl-cert, :ssl-key, and :ssl-ca-cert) if those are present. If none of
these are present this does not modify the request map."
(schema/defn configure-ssl :- (schema/either {} schemas/SslContextOptions)
"Configures a request map to have an SSLContext. It will use an existing one
(stored in :ssl-context) if already present, and will fall back to a set of
PEM files (stored in :ssl-cert, :ssl-key, and :ssl-ca-cert) if those are present.
If none of these are present this does not modify the request map."
[opts :- schemas/SslOptions]
(:sslengine req) req
(:ssl-context req) (configure-ssl-from-context req)
(every? (partial req) [:ssl-cert :ssl-key :ssl-ca-cert]) (configure-ssl-from-pems req)
(:ssl-ca-cert req) (configure-ssl-from-ca-pem req)
:else req))
(:ssl-context opts) opts
(every? opts [:ssl-cert :ssl-key :ssl-ca-cert]) (configure-ssl-from-pems opts)
(:ssl-ca-cert opts) (configure-ssl-from-ca-pem opts)
:else opts))
(defn- check-url! [url]
(when (nil? url)
(throw (IllegalArgumentException. "Host URL cannot be nil"))))
;;; Private utility functions
(defn request
[opts callback]
(check-url! (:url opts))
(http/request (configure-ssl opts) callback))
(defn- add-accept-encoding-header
[decompress-body? headers]
(if (and decompress-body?
(not (contains? headers "accept-encoding")))
(assoc headers "accept-encoding" (BasicHeader. "accept-encoding" "gzip, deflate"))
(defn- wrap-with-ssl-config
(fn wrapped-fn
(wrapped-fn url {} nil))
(defn- prepare-headers
[{:keys [headers decompress-body]}]
(->> headers
(fn [acc [k v]]
(assoc acc (str/lower-case k) (BasicHeader. k v)))
(add-accept-encoding-header decompress-body)
(into-array Header)))
([url callback-or-opts]
(if (map? callback-or-opts)
(wrapped-fn url callback-or-opts nil)
(wrapped-fn url {} callback-or-opts)))
(defn- coerce-opts
[{:keys [url body] :as opts}]
{:url url
:method (clojure.core/get opts :method :get)
:headers (prepare-headers opts)
:body (cond
(string? body) (NStringEntity. body)
(instance? InputStream body) (InputStreamEntity. body)
:else body)})
([url opts callback]
(check-url! url)
(method url (configure-ssl opts) callback))))
(defn- construct-request
[method url]
(condp = method
:get (HttpGet. url)
:head (HttpHead. url)
:post (HttpPost. url)
:put (HttpPut. url)
:delete (HttpDelete. url)
:trace (HttpTrace. url)
:options (HttpOptions. url)
:patch (HttpPatch. url)
(throw (IllegalArgumentException. (format "Unsupported request method: %s" method)))))
(def ^{:arglists '([url] [url callback-or-opts] [url opts callback])} get
"Issue an async HTTP GET request."
(wrap-with-ssl-config http/get))
(defn- get-resp-headers
(fn [acc h]
(assoc acc (.. h getName toLowerCase) (.getValue h)))
(.getAllHeaders http-response)))
(def ^{:arglists '([url] [url callback-or-opts] [url opts callback])} head
"Issue an async HTTP HEAD request."
(wrap-with-ssl-config http/head))
(defmulti decompress (fn [resp] (get-in resp [:headers "content-encoding"])))
(def ^{:arglists '([url] [url callback-or-opts] [url opts callback])} post
"Issue an async HTTP POST request."
(wrap-with-ssl-config http/post))
(defmethod decompress "gzip"
(-> resp
(ks/dissoc-in [:headers "content-encoding"])
(update-in [:body] #(Compression/gunzip %))))
(def ^{:arglists '([url] [url callback-or-opts] [url opts callback])} put
"Issue an async HTTP PUT request."
(wrap-with-ssl-config http/put))
(defmethod decompress "deflate"
(-> resp
(ks/dissoc-in [:headers "content-encoding"])
(update-in [:body] #(Compression/inflate %))))
(def ^{:arglists '([url] [url callback-or-opts] [url opts callback])} delete
"Issue an async HTTP DELETE request."
(wrap-with-ssl-config http/delete))
(defmethod decompress nil
(def ^{:arglists '([url] [url callback-or-opts] [url opts callback])} options
"Issue an async HTTP OPTIONS request."
(wrap-with-ssl-config http/options))
(defn- parse-content-type
(if (empty? content-type-header)
(let [content-type (ContentType/parse content-type-header)]
{:mime-type (.getMimeType content-type)
:charset (.getCharset content-type)})))
(def ^{:arglists '([url] [url callback-or-opts] [url opts callback])} patch
"Issue an async HTTP PATCH request."
(wrap-with-ssl-config http/patch))
(defmulti coerce-body-type (fn [resp] (get-in resp [:opts :as])))
(defmethod coerce-body-type :text
(let [charset (or (get-in resp [:content-type-params :charset] "UTF-8"))]
(assoc resp :body (slurp (:body resp) :encoding charset))))
(defn- response-map
[opts http-response]
(let [headers (get-resp-headers http-response)
orig-encoding (headers "content-encoding")]
{:opts opts
:orig-content-encoding orig-encoding
:status (.. http-response getStatusLine getStatusCode)
:headers headers
:content-type (parse-content-type (headers "content-type"))
:body (when-let [entity (.getEntity http-response)]
(.getContent entity))}))
(schema/defn error-response :- schemas/ErrorResponse
[opts :- schemas/UserRequestOptions
e :- Exception]
{:opts opts
:error e})
(schema/defn callback-response :- schemas/Response
[opts :- schemas/UserRequestOptions
callback :- schemas/ResponseCallbackFn
response :- schemas/Response]
(if callback
(callback response)
(catch Exception e
(error-response opts e)))
(schema/defn deliver-result
[client :- schemas/Client
result :- schemas/ResponsePromise
opts :- schemas/UserRequestOptions
callback :- schemas/ResponseCallbackFn
response :- schemas/Response]
(deliver result (callback-response opts callback response))
(.close client))))
(schema/defn future-callback
[client :- schemas/Client
result :- schemas/ResponsePromise
opts :- schemas/UserRequestOptions
callback :- schemas/ResponseCallbackFn]
(reify FutureCallback
(completed [this http-response]
(let [response (cond-> (response-map opts http-response)
(:decompress-body opts) (decompress)
(not= :stream (:as opts)) (coerce-body-type))]
(deliver-result client result opts callback response))
(catch Exception e
(log/warn e "Error when delivering response")
(deliver-result client result opts callback
(error-response opts e)))))
(failed [this e]
(deliver-result client result opts callback
(error-response opts e)))
(cancelled [this]
(deliver-result client result opts callback
(HttpClientException. "Request cancelled"))))))
(schema/defn extract-client-opts :- schemas/ClientOptions
[opts :- schemas/UserRequestOptions]
(select-keys opts [:ssl-context :ssl-ca-cert :ssl-cert :ssl-key]))
(schema/defn create-client :- schemas/Client
[opts :- schemas/ClientOptions]
(let [opts (configure-ssl opts)
client (if (:ssl-context opts)
(.. (HttpAsyncClients/custom) (setSSLContext (:ssl-context opts)) build)
(.start client)
;;; Public
(schema/defn ^:always-validate request :- schemas/ResponsePromise
"Issues an async HTTP request and returns a promise object to which the value
of `(callback {:opts _ :status _ :headers _ :body _})` or
`(callback {:opts _ :error _})` will be delivered.
When unspecified, `callback` is the identity function.
Request options:
* :url
* :method - the HTTP method (:get, :head, :post, :put, :delete, :options, :patch
* :headers - a map of headers
* :body - the body; may be a String or any type supported by clojure's reader
* :decompress-body - if `true`, an 'accept-encoding' header with a value of
'gzip, deflate' will be added to the request, and the response will be
automatically decompressed if it contains a recognized 'content-encoding'
header. defaults to `true`.
:as - used to control the data type of the response body. Supported values are
`:text` and `:stream`, which will return a `String` or an `InputStream`,
respectively. Defaults to `:stream`.
SSL options:
* :ssl-context - an instance of SSLContext
* :ssl-cert - path to a PEM file containing the client cert
* :ssl-key - path to a PEM file containing the client private key
* :ssl-ca-cert - path to a PEM file containing the CA cert"
([opts :- schemas/RawUserRequestOptions]
(request opts nil))
([opts :- schemas/RawUserRequestOptions
callback :- schemas/ResponseCallbackFn]
(let [defaults {:headers {}
:body nil
:decompress-body true
:as :stream}
opts (merge defaults opts)
client-opts (extract-client-opts opts)
client (create-client client-opts)
{:keys [method url body] :as coerced-opts} (coerce-opts opts)
request (construct-request method url)
result (promise)]
(.setHeaders request (:headers coerced-opts))
(when body
(.setEntity request body))
(.execute client request
(future-callback client result opts callback))
(defn get
"Issue an asynchronous HTTP GET request. This will raise an exception if an
error is returned."
([url] (get url {}))
([url opts] (request (assoc opts :method :get :url url))))
(defn head
"Issue an asynchronous HTTP head request. This will raise an exception if an
error is returned."
([url] (head url {}))
([url opts] (request (assoc opts :method :head :url url))))
(defn post
"Issue an asynchronous HTTP POST request. This will raise an exception if an
error is returned."
([url] (post url {}))
([url opts] (request (assoc opts :method :post :url url))))
(defn put
"Issue an asynchronous HTTP PUT request. This will raise an exception if an
error is returned."
([url] (put url {}))
([url opts] (request (assoc opts :method :put :url url))))
(defn delete
"Issue an asynchronous HTTP DELETE request. This will raise an exception if an
error is returned."
([url] (delete url {}))
([url opts] (request (assoc opts :method :delete :url url))))
(defn trace
"Issue an asynchronous HTTP TRACE request. This will raise an exception if an
error is returned."
([url] (trace url {}))
([url opts] (request (assoc opts :method :trace :url url))))
(defn options
"Issue an asynchronous HTTP OPTIONS request. This will raise an exception if an
error is returned."
([url] (options url {}))
([url opts] (request (assoc opts :method :options :url url))))
(defn patch
"Issue an asynchronous HTTP PATCH request. This will raise an exception if an
error is returned."
([url] (patch url {}))
([url opts] (request (assoc opts :method :patch :url url))))

@ -0,0 +1,98 @@
(ns puppetlabs.http.client.schemas
(:import ( URL)
( SSLContext)
(org.apache.http.impl.nio.client CloseableHttpAsyncClient)
(clojure.lang IBlockingDeref)
( InputStream)
(java.nio.charset Charset))
(:require [schema.core :as schema]))
;;; Schemas
(def ok schema/optional-key)
(def UrlOrString (schema/either schema/Str URL))
;; TODO: replace this with a protocol
(def Client CloseableHttpAsyncClient)
(def Headers
{schema/Str schema/Str})
(def Body
(schema/maybe (schema/either String InputStream)))
(def BodyType
(schema/enum :text :stream))
(def RawUserRequestOptions
{:url UrlOrString
:method schema/Keyword
(ok :headers) Headers
(ok :body) Body
(ok :decompress-body) schema/Bool
(ok :as) BodyType
(ok :ssl-context) SSLContext
(ok :ssl-cert) UrlOrString
(ok :ssl-key) UrlOrString
(ok :ssl-ca-cert) UrlOrString})
(def RequestOptions
{:url UrlOrString
:method schema/Keyword
:headers Headers
:body Body
:decompress-body schema/Bool
:as BodyType})
(def SslContextOptions
{:ssl-context SSLContext})
(def SslCaCertOptions
{:ssl-ca-cert UrlOrString})
(def SslCertOptions
{:ssl-cert UrlOrString
:ssl-key UrlOrString
:ssl-ca-cert UrlOrString})
(def SslOptions
(schema/either {} SslContextOptions SslCertOptions SslCaCertOptions))
(def UserRequestOptions
(merge RequestOptions SslContextOptions)
(merge RequestOptions SslCaCertOptions)
(merge RequestOptions SslCertOptions)))
(def ClientOptions
(def ResponseCallbackFn
(schema/maybe (schema/pred ifn?)))
(def ResponsePromise
(def ContentType
(schema/maybe {:mime-type schema/Str
:charset (schema/maybe Charset)}))
(def NormalResponse
{:opts UserRequestOptions
:orig-content-encoding (schema/maybe schema/Str)
:body Body
:headers Headers
:status schema/Int
:content-type ContentType})
(def ErrorResponse
{:opts UserRequestOptions
:error Exception})
(def Response
(schema/either NormalResponse ErrorResponse))

@ -42,6 +42,12 @@
([url] (delete url {}))
([url opts] (request (assoc opts :method :delete :url url))))
(defn trace
"Issue a synchronous HTTP TRACE request. This will raise an exception if an
error is returned."
([url] (trace url {}))
([url opts] (request (assoc opts :method :trace :url url))))
(defn options
"Issue a synchronous HTTP OPTIONS request. This will raise an exception if an
error is returned."

@ -0,0 +1,73 @@
package com.puppetlabs.http.client;
import com.puppetlabs.http.client.impl.JavaClient;
import com.puppetlabs.http.client.impl.Promise;
import com.puppetlabs.http.client.impl.SslUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AsyncHttpClient {
private static final Logger LOGGER = LoggerFactory.getLogger(SyncHttpClient.class);
public static Promise<Response> request(RequestOptions options) {
options = SslUtils.configureSsl(options);
return JavaClient.request(options, null);
public static Promise<Response> get(String url) {
return get(new RequestOptions(url));
public static Promise<Response> get(RequestOptions requestOptions) {
return request(requestOptions.setMethod(HttpMethod.GET));
public static Promise<Response> head(String url) {
return head(new RequestOptions(url));
public static Promise<Response> head(RequestOptions requestOptions) {
return request(requestOptions.setMethod(HttpMethod.HEAD));
public static Promise<Response> post(String url) {
return post(new RequestOptions(url));
public static Promise<Response> post(RequestOptions requestOptions) {
return request(requestOptions.setMethod(HttpMethod.POST));
public static Promise<Response> put(String url) {
return put(new RequestOptions(url));
public static Promise<Response> put(RequestOptions requestOptions) {
return request(requestOptions.setMethod(HttpMethod.PUT));
public static Promise<Response> delete(String url) {
return delete(new RequestOptions(url));
public static Promise<Response> delete(RequestOptions requestOptions) {
return request(requestOptions.setMethod(HttpMethod.DELETE));
public static Promise<Response> trace(String url) {
return trace(new RequestOptions(url));
public static Promise<Response> trace(RequestOptions requestOptions) {
return request(requestOptions.setMethod(HttpMethod.TRACE));
public static Promise<Response> options(String url) {
return options(new RequestOptions(url));
public static Promise<Response> options(RequestOptions requestOptions) {
return request(requestOptions.setMethod(HttpMethod.OPTIONS));
public static Promise<Response> patch(String url) {
return patch(new RequestOptions(url));
public static Promise<Response> patch(RequestOptions requestOptions) {
return request(requestOptions.setMethod(HttpMethod.PATCH));

@ -1,6 +1,9 @@
package com.puppetlabs.http.client;
public class HttpClientException extends RuntimeException {
public HttpClientException(String msg) {
public HttpClientException(String msg, Throwable t) {
super(msg, t);

@ -1,28 +1,25 @@
package com.puppetlabs.http.client;
// This is really dumb, but I didn't want to leak the HTTPKit class into the
// API for now.
import org.apache.http.client.methods.*;
public enum HttpMethod {
private Class<? extends HttpRequestBase> httpMethod;
private org.httpkit.HttpMethod httpKitMethod;
HttpMethod(org.httpkit.HttpMethod httpKitMethod) {
this.httpKitMethod = httpKitMethod;
HttpMethod(Class<? extends HttpRequestBase> httpMethod) {
this.httpMethod = httpMethod;
public org.httpkit.HttpMethod getValue() {
return this.httpKitMethod;
public Class<? extends HttpRequestBase> getValue() {
return this.httpMethod;

@ -1,46 +0,0 @@
package com.puppetlabs.http.client;
import com.puppetlabs.http.client.HttpResponse;
import com.puppetlabs.http.client.RequestOptions;
import java.util.Map;
public class HttpResponse {
private RequestOptions options;
private Throwable error;
private Object body;
private Map<String, Object> headers;
private Integer status;
public HttpResponse(RequestOptions options, Throwable error) {
this.options = options;
this.error = error;
public HttpResponse(RequestOptions options, Object body, Map<String, Object> headers, int status) {
this.options = options;
this.body = body;
this.headers = headers;
this.status = status;
public RequestOptions getOptions() {
return options;
public Throwable getError() {
return error;
public Object getBody() {
return body;
public Map<String, Object> getHeaders() {
return headers;
public Integer getStatus() {
return status;

@ -1,60 +1,37 @@
package com.puppetlabs.http.client;
import com.puppetlabs.http.client.impl.*;
import org.httpkit.client.HttpClient;
import org.httpkit.client.IFilter;
import org.httpkit.client.MultipartEntity;
import org.apache.http.nio.client.HttpAsyncClient;
//import org.httpkit.client.HttpClient;
//import org.httpkit.client.IFilter;
//import org.httpkit.client.MultipartEntity;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
public class RequestOptions {
private HttpClient client = DefaultClient.getInstance();
private int timeout = 60000;
private boolean followRedirects = true;
private int maxRedirects = 10;
// TODO: we are technically leaking this http-kit class into our API,
// but since we're not using it anywhere I decided not to worry about it yet.
private IFilter filter = IFilter.ACCEPT_ALL;
private ExecutorService workerPool = DefaultWorkerPool.getInstance();
private Promise<HttpResponse> promise = new Promise<HttpResponse>();
private int keepalive = 120000;
private ResponseBodyType as = ResponseBodyType.AUTO;
private HttpAsyncClient client = null;
private String url;
private HttpMethod method = null;
private List<String> traceRedirects = new ArrayList<String>();
private Map<String, Object> headers;
private Map<String, String> formParams;
private BasicAuth basicAuth;
private String oauthToken;
private String userAgent;
private Map<String, String> queryParams;
private SSLEngine sslEngine;
private Map<String, String> headers;
private SSLContext sslContext;
private String sslCert;
private String sslKey;
private String sslCaCert;
private boolean insecure = false;
private Object body;
// TODO: we are technically leaking this http-kit class into our API,
// but since we're not using it anywhere I decided not to worry about it yet.
private List<MultipartEntity> multipartEntities;
private boolean decompressBody = true;
private ResponseBodyType as = ResponseBodyType.STREAM;
public RequestOptions(String url) {
this.url = url;
public HttpClient getClient() {
public HttpAsyncClient getClient() {
return client;
public RequestOptions setClient(HttpClient client) {
public RequestOptions setClient(HttpAsyncClient client) {
this.client = client;
return this;
@ -67,46 +44,6 @@
return this;
public int getTimeout() {
return timeout;
public RequestOptions setTimeout(int timeout) {
this.timeout = timeout;
return this;
public int getKeepalive() {
return keepalive;
public RequestOptions setKeepalive(int keepalive) {
this.keepalive = keepalive;
return this;
public boolean getFollowRedirects() {
return followRedirects;
public RequestOptions setFollowRedirects(boolean followRedirects) {
this.followRedirects = followRedirects;
return this;
public int getMaxRedirects() {
return maxRedirects;
public RequestOptions setMaxRedirects(int maxRedirects) {
this.maxRedirects = maxRedirects;
return this;
public ResponseBodyType getAs() {
return as;
public RequestOptions setAs(ResponseBodyType as) { = as;
return this;
public HttpMethod getMethod() {
return method;
@ -115,90 +52,14 @@
return this;
public IFilter getFilter() {
return filter;
public RequestOptions setFilter(IFilter filter) {
this.filter = filter;
return this;
public ExecutorService getWorkerPool() {
return workerPool;
public Promise<HttpResponse> getPromise() {
return this.promise;
public RequestOptions setPromise(Promise<HttpResponse> promise) {
this.promise = promise;
return this;
public List<String> getTraceRedirects() {
return traceRedirects;
public RequestOptions addTraceRedirect(String url) {
return this;
public Map<String, Object> getHeaders() {
public Map<String, String> getHeaders() {
return headers;
public RequestOptions setHeaders(Map<String, Object> headers) {
public RequestOptions setHeaders(Map<String, String> headers) {
this.headers = headers;
return this;
public Map<String, String> getFormParams() {
return formParams;
public RequestOptions setFormParams(Map<String, String> formParams) {
this.formParams = formParams;
return this;
public BasicAuth getBasicAuth() {
return basicAuth;
public RequestOptions setBasicAuth(BasicAuth basicAuth) {
this.basicAuth = basicAuth;
return this;
public String getOAuthToken() {
return oauthToken;
public RequestOptions setOAuthToken(String oauthToken) {
this.oauthToken = oauthToken;
return this;
public String getUserAgent() {
return userAgent;
public RequestOptions setUserAgent(String userAgent) {
this.userAgent = userAgent;
return this;
public Map<String, String> getQueryParams() {
return queryParams;
public RequestOptions setQueryParams(Map<String, String> queryParams) {
this.queryParams = queryParams;
return this;
public SSLEngine getSslEngine() {
return sslEngine;
public RequestOptions setSslEngine(SSLEngine sslEngine) {
this.sslEngine = sslEngine;
return this;
public SSLContext getSslContext() {
return sslContext;
@ -247,12 +108,17 @@
return this;
public List<MultipartEntity> getMultipartEntities() {
return multipartEntities;
public RequestOptions setMultipartEntities(List<MultipartEntity> entities) {
this.multipartEntities = entities;
public boolean getDecompressBody() { return decompressBody; }
public RequestOptions setDecompressBody(boolean decompressBody) {
this.decompressBody = decompressBody;
return this;
public ResponseBodyType getAs() {
return as;
public RequestOptions setAs(ResponseBodyType as) { = as;
return this;

@ -0,0 +1,56 @@
package com.puppetlabs.http.client;
import com.puppetlabs.http.client.RequestOptions;
import org.apache.http.entity.ContentType;
import java.util.Map;
public class Response {
private RequestOptions options;
private String origContentEncoding;
private Throwable error;
private Object body;
private Map<String, String> headers;
private Integer status;
private ContentType contentType;
public Response(RequestOptions options, Throwable error) {
this.options = options;
this.error = error;
public Response(RequestOptions options, String origContentEncoding,
Object body, Map<String, String> headers, int status,
ContentType contentType) {
this.options = options;
this.origContentEncoding = origContentEncoding;
this.body = body;
this.headers = headers;
this.status = status;
this.contentType = contentType;
public RequestOptions getOptions() {
return options;
public String getOrigContentEncoding() { return origContentEncoding; }
public Throwable getError() {
return error;
public Object getBody() {
return body;
public Map<String, String> getHeaders() {
return headers;
public Integer getStatus() {
return status;
public ContentType getContentType() { return contentType; }

@ -1,19 +1,6 @@
package com.puppetlabs.http.client;
public enum ResponseBodyType {
private int value;
ResponseBodyType(int value) {
this.value = value;
public int getValue() {
return this.value;

@ -3,6 +3,7 @@ package com.puppetlabs.http.client;
import com.puppetlabs.certificate_authority.CertificateAuthority;
import com.puppetlabs.http.client.impl.JavaClient;
import com.puppetlabs.http.client.impl.Promise;
import com.puppetlabs.http.client.impl.SslUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -23,90 +24,16 @@ public class SyncHttpClient {
throw new HttpClientException(msg, t);
private static RequestOptions configureSslFromContext(RequestOptions options) {
return options;
// TODO: move this into the async java API if we ever add one
private static RequestOptions configureSsl(RequestOptions options) {
if (options.getSslEngine() != null) {
return options;
if (options.getSslContext() != null) {
return configureSslFromContext(options);
if ((options.getSslCert() != null) &&
(options.getSslKey() != null) &&
(options.getSslCaCert() != null)) {
try {
new FileReader(options.getSslCert()),
new FileReader(options.getSslKey()),
new FileReader(options.getSslCaCert()))
} catch (KeyStoreException e) {
logAndRethrow("Error while configuring SSL", e);
} catch (CertificateException e) {
logAndRethrow("Error while configuring SSL", e);
} catch (IOException e) {
logAndRethrow("Error while configuring SSL", e);
} catch (NoSuchAlgorithmException e) {
logAndRethrow("Error while configuring SSL", e);
} catch (KeyManagementException e) {
logAndRethrow("Error while configuring SSL", e);
} catch (UnrecoverableKeyException e) {
logAndRethrow("Error while configuring SSL", e);
return configureSslFromContext(options);
if (options.getSslCaCert() != null) {
try {
new FileReader(options.getSslCaCert()))
} catch (KeyStoreException e) {
logAndRethrow("Error while configuring SSL", e);
} catch (CertificateException e) {
logAndRethrow("Error while configuring SSL", e);
} catch (IOException e) {
logAndRethrow("Error while configuring SSL", e);
} catch (NoSuchAlgorithmException e) {
logAndRethrow("Error while configuring SSL", e);
} catch (KeyManagementException e) {
logAndRethrow("Error while configuring SSL", e);
return configureSslFromContext(options);
return options;
public static HttpResponse request(RequestOptions options) {
public static Response request(RequestOptions options) {
// TODO: if we end up implementing an async version of the java API,
// we should refactor this implementation so that it is based on the
// async one, as Patrick has done in the clojure API.
options = configureSsl(options);
options = SslUtils.configureSsl(options);
Promise<HttpResponse> promise = null;
try {
promise = JavaClient.request(options, null);
} catch (IOException e) {
logAndRethrow("Error submitting http request", e);
Promise<Response> promise = JavaClient.request(options, null);
HttpResponse response = null;
Response response = null;
try {
response = promise.deref();
} catch (InterruptedException e) {
@ -119,19 +46,59 @@ public class SyncHttpClient {
public static HttpResponse get(String url) {
public static Response get(String url) {
return get(new RequestOptions(url));
public static HttpResponse get(RequestOptions requestOptions) {
public static Response get(RequestOptions requestOptions) {
return request(requestOptions.setMethod(HttpMethod.GET));
public static HttpResponse post(String url) {
return post(new RequestOptions(url));
public static Response head(String url) {
return head(new RequestOptions(url));
public static Response head(RequestOptions requestOptions) {
return request(requestOptions.setMethod(HttpMethod.HEAD));
public static HttpResponse post(RequestOptions requestOptions) {
public static Response post(String url) {
return post(new RequestOptions(url));
public static Response post(RequestOptions requestOptions) {
return request(requestOptions.setMethod(HttpMethod.POST));
public static Response put(String url) {
return put(new RequestOptions(url));
public static Response put(RequestOptions requestOptions) {
return request(requestOptions.setMethod(HttpMethod.PUT));
public static Response delete(String url) {
return delete(new RequestOptions(url));
public static Response delete(RequestOptions requestOptions) {
return request(requestOptions.setMethod(HttpMethod.DELETE));
public static Response trace(String url) {
return trace(new RequestOptions(url));
public static Response trace(RequestOptions requestOptions) {
return request(requestOptions.setMethod(HttpMethod.TRACE));
public static Response options(String url) {
return options(new RequestOptions(url));
public static Response options(RequestOptions requestOptions) {
return request(requestOptions.setMethod(HttpMethod.OPTIONS));
public static Response patch(String url) {
return patch(new RequestOptions(url));
public static Response patch(RequestOptions requestOptions) {
return request(requestOptions.setMethod(HttpMethod.PATCH));

@ -1,19 +0,0 @@
package com.puppetlabs.http.client.impl;
public class BasicAuth {
private final String user;
private final String password;
public BasicAuth(String user, String password) {
this.user = user;
this.password = password;
public String getUser() {
return user;
public String getPassword() {
return password;

@ -1,28 +1,29 @@
package com.puppetlabs.http.client.impl;
import org.httpkit.HttpMethod;
import com.puppetlabs.http.client.HttpMethod;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import java.util.Map;
public class CoercedRequestOptions {
private final String url;
private final HttpMethod method;
private final Map<String, Object> headers;
private final Object body;
private final SSLEngine sslEngine;
private final Header[] headers;
private final HttpEntity body;
private final SSLContext sslContext;
public CoercedRequestOptions(String url,
HttpMethod method,
Map<String, Object> headers,
Object body,
SSLEngine sslEngine) {
Header[] headers,
HttpEntity body,
SSLContext sslContext) {
this.url = url;
this.method = method;
this.headers = headers;
this.body = body;
this.sslEngine = sslEngine;
this.sslContext = sslContext;
public String getUrl() {
@ -33,15 +34,15 @@ public class CoercedRequestOptions {
return method;
public Map<String, Object> getHeaders() {
public Header[] getHeaders() {
return headers;
public Object getBody() {
public HttpEntity getBody() {
return body;
public SSLEngine getSslEngine() {
return sslEngine;
public SSLContext getSslContext() {
return sslContext;

@ -0,0 +1,23 @@
package com.puppetlabs.http.client.impl;
import com.puppetlabs.http.client.HttpClientException;
import java.util.Map;
public class Compression {
public static InputStream gunzip(InputStream gzipped) {
try {
return new GZIPInputStream(gzipped);
} catch (IOException e) {
throw new HttpClientException("Unable to gunzip stream", e);
public static InputStream inflate(InputStream deflated) {
return new InflaterInputStream(deflated);

@ -1,20 +0,0 @@
package com.puppetlabs.http.client.impl;
import org.httpkit.client.HttpClient;
public class DefaultClient {
private static HttpClient instance;
public synchronized static HttpClient getInstance() {
if (instance == null) {
try {
instance = new HttpClient();
} catch (IOException e) {
throw new RuntimeException("Error attempting to instantiate HttpClient", e);
return instance;

@ -1,21 +0,0 @@
package com.puppetlabs.http.client.impl;
import org.httpkit.PrefixThreadFactory;
import java.util.concurrent.*;
public class DefaultWorkerPool {
private static ExecutorService instance;
public static synchronized ExecutorService getInstance() {
if (instance == null) {
int max = Runtime.getRuntime().availableProcessors();
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
PrefixThreadFactory factory = new PrefixThreadFactory("client-worker-");
instance = new ThreadPoolExecutor(0, max, 60, TimeUnit.SECONDS, queue, factory);
return instance;

@ -1,7 +1,7 @@
package com.puppetlabs.http.client.impl;
import com.puppetlabs.http.client.HttpResponse;
import com.puppetlabs.http.client.Response;
public interface IResponseCallback {
HttpResponse handleResponse(HttpResponse response);
Response handleResponse(Response response);

@ -1,146 +1,275 @@
package com.puppetlabs.http.client.impl;
import com.puppetlabs.http.client.HttpResponse;
import com.puppetlabs.http.client.RequestOptions;
import org.httpkit.HttpMethod;
import org.httpkit.client.*;
import com.puppetlabs.http.client.*;
import javax.xml.bind.DatatypeConverter;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.*;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.InputStreamEntity;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.message.BasicHeader;
import org.apache.http.nio.entity.NStringEntity;
import java.util.HashMap;
import java.util.Map;
public class JavaClient {
private static HttpClient defaultClient = null;
private static final String PROTOCOL = "TLS";
private static HttpClient getDefaultClient() throws IOException {
if (defaultClient == null) {
defaultClient = new HttpClient();
private static Header[] prepareHeaders(RequestOptions options) {
Map<String, Header> result = new HashMap<String, Header>();
Map<String, String> origHeaders = options.getHeaders();
if (origHeaders == null) {
origHeaders = new HashMap<String, String>();
return defaultClient;
for (Map.Entry<String, String> entry : origHeaders.entrySet()) {
result.put(entry.getKey().toLowerCase(), new BasicHeader(entry.getKey(), entry.getValue()));
if (options.getDecompressBody() &&
(! origHeaders.containsKey("accept-encoding"))) {
result.put("accept-encoding", new BasicHeader("Accept-Encoding", "gzip, deflate"));
return result.values().toArray(new Header[result.size()]);
private static String buildQueryString(Map<String, String> params) {
// TODO: add support for nested query params. For now we assume a flat,
// String->String data structure.
StringBuilder sb = new StringBuilder();
boolean first = true;
for (Map.Entry<String, String> entry : params.entrySet()) {
if (!first) {
first = false;
try {
sb.append(URLEncoder.encode(entry.getKey(), "utf8"));
sb.append(URLEncoder.encode(entry.getValue(), "utf8"));
} catch (UnsupportedEncodingException e) {
throw new RuntimeException("Error while url-encoding query string", e);
return sb.toString();
private static CoercedRequestOptions coerceRequestOptions(RequestOptions options) {
String url = options.getUrl();
private static String getBasicAuthValue(BasicAuth auth) {
String userPasswordStr = auth.getUser() + ":" + auth.getPassword();
try {
return "Basic " + DatatypeConverter.printBase64Binary(userPasswordStr.getBytes("utf8"));
} catch (UnsupportedEncodingException e) {
throw new RuntimeException("Error while attmempting to encode basic auth", e);
private static Map<String, Object> prepareHeaders(RequestOptions options) {
Map<String, Object> result = new HashMap<String, Object>();
if (options.getHeaders() != null) {
for (Map.Entry<String, Object> entry : options.getHeaders().entrySet()) {
result.put(entry.getKey(), entry.getValue());
if (options.getFormParams() != null) {
result.put("Content-Type", "application/x-www-form-urlencoded");
if (options.getBasicAuth() != null) {
result.put("Authorization", getBasicAuthValue(options.getBasicAuth()));
if (options.getOAuthToken() != null) {
result.put("Authorization", "Bearer " + options.getOAuthToken());
if (options.getUserAgent() != null) {
result.put("User-Agent", options.getUserAgent());
return result;
private static CoercedRequestOptions coerceRequestOptions(RequestOptions options) throws IOException {
String url;
if (options.getQueryParams() != null) {
if (options.getUrl().indexOf('?') == -1) {
url = options.getUrl() + "?" + buildQueryString(options.getQueryParams());
} else {
url = options.getUrl() + "&" + buildQueryString(options.getQueryParams());
} else {
url = options.getUrl();
SSLEngine sslEngine = null;
if (options.getSslEngine() != null) {
sslEngine = options.getSslEngine();
SSLContext sslContext = null;
if (options.getSslContext() != null) {
sslContext = options.getSslContext();
} else if (options.getInsecure()) {
sslEngine = SslContextFactory.trustAnybody();
sslContext = getInsecureSslContext();
HttpMethod method = options.getMethod().getValue();
HttpMethod method = options.getMethod();
if (method == null) {
method = HttpMethod.GET;
Map<String, Object> headers = prepareHeaders(options);
Header[] headers = prepareHeaders(options);
Object body;
if (options.getFormParams() != null) {
body = buildQueryString(options.getFormParams());
} else {
body = options.getBody();
HttpEntity body = null;
if (options.getBody() instanceof String) {
try {
body = new NStringEntity((String)options.getBody());
} catch (UnsupportedEncodingException e) {
throw new HttpClientException("Unable to create request body", e);
} else if (options.getBody() instanceof InputStream) {
body = new InputStreamEntity((InputStream)options.getBody());
if (options.getMultipartEntities() != null) {
String boundary = MultipartEntity.genBoundary(options.getMultipartEntities());
headers = options.getHeaders();
headers.put("Content-Type", "multipart/form-data; boundary=" + boundary);
body = MultipartEntity.encode(boundary, options.getMultipartEntities());
return new CoercedRequestOptions(url, method, headers, body, sslEngine);
return new CoercedRequestOptions(url, method, headers, body, sslContext);
public static Promise<HttpResponse> request(RequestOptions options, IResponseCallback callback)
throws IOException {
HttpClient client = options.getClient();
if (client == null) {
client = getDefaultClient();
private static SSLContext getInsecureSslContext() {
SSLContext context = null;
try {
context = SSLContext.getInstance(PROTOCOL);
} catch (NoSuchAlgorithmException e) {
throw new HttpClientException("Unable to construct HTTP context", e);
try {
context.init(null, new TrustManager[] {
new X509TrustManager() {
public X509Certificate[] getAcceptedIssuers() {
return new X509Certificate[0];
public void checkClientTrusted(X509Certificate[] chain,
String authType) throws CertificateException {
// Always trust
public void checkServerTrusted(X509Certificate[] chain,
String authType) throws CertificateException {
// Always trust
} catch (KeyManagementException e) {
throw new HttpClientException("Unable to initialize insecure SSL context", e);
return context;
public static Promise<Response> request(final RequestOptions options, final IResponseCallback callback) {
CoercedRequestOptions coercedOptions = coerceRequestOptions(options);
RequestConfig config = new RequestConfig(coercedOptions.getMethod(),
coercedOptions.getHeaders(), coercedOptions.getBody(),
options.getTimeout(), options.getKeepalive());
final CloseableHttpAsyncClient client = createClient(coercedOptions);
RespListener listener = new RespListener(
new ResponseHandler(options, coercedOptions, callback), options.getFilter(),
options.getWorkerPool(), options.getAs().getValue());
HttpRequestBase request = constructRequest(coercedOptions.getMethod(),
coercedOptions.getUrl(), coercedOptions.getBody());
client.exec(options.getUrl(), config, coercedOptions.getSslEngine(), listener);
final Promise<Response> promise = new Promise<Response>();
return options.getPromise();
client.execute(request, new FutureCallback<org.apache.http.HttpResponse>() {
public void completed(org.apache.http.HttpResponse httpResponse) {
try {
Object body = null;
HttpEntity entity = httpResponse.getEntity();
if (entity != null) {
body = entity.getContent();
Map<String, String> headers = new HashMap<String, String>();
for (Header h : httpResponse.getAllHeaders()) {
headers.put(h.getName().toLowerCase(), h.getValue());
String origContentEncoding = headers.get("content-encoding");
if (options.getDecompressBody()) {
body = decompress((InputStream)body, headers);
ContentType contentType = null;
if (headers.get("content-type") != null) {
contentType = ContentType.parse(headers.get("content-type"));
if (options.getAs() != ResponseBodyType.STREAM) {
body = coerceBodyType((InputStream)body, options.getAs(), contentType);
deliverResponse(client, options,
new Response(options, origContentEncoding, body,
headers, httpResponse.getStatusLine().getStatusCode(),
callback, promise);
} catch (Exception e) {
deliverResponse(client, options, new Response(options, e), callback, promise);
public void failed(Exception e) {
deliverResponse(client, options, new Response(options, e), callback, promise);
public void cancelled() {
deliverResponse(client, options, new Response(options, new HttpClientException("Request cancelled", null)), callback, promise);
return promise;
private static CloseableHttpAsyncClient createClient(CoercedRequestOptions coercedOptions) {
CloseableHttpAsyncClient client;
if (coercedOptions.getSslContext() != null) {
client = HttpAsyncClients.custom().setSSLContext(coercedOptions.getSslContext()).build();
} else {
client = HttpAsyncClients.createDefault();
return client;
private static void deliverResponse(CloseableHttpAsyncClient client, RequestOptions options,
Response httpResponse, IResponseCallback callback,
Promise<Response> promise) {
try {
if (callback != null) {
try {
} catch (Exception ex) {
promise.deliver(new Response(options, ex));
} else {
} finally {
try {
} catch (IOException e) {
throw new HttpClientException("Unable to close client", e);
private static HttpRequestBase constructRequest(HttpMethod httpMethod, String url, HttpEntity body) {
switch (httpMethod) {
case GET:
return requestWithNoBody(new HttpGet(url), body, httpMethod);
case HEAD:
return requestWithNoBody(new HttpHead(url), body, httpMethod);
case POST:
return requestWithBody(new HttpPost(url), body);
case PUT:
return requestWithBody(new HttpPut(url), body);
case DELETE:
return requestWithNoBody(new HttpDelete(url), body, httpMethod);
case TRACE:
return requestWithNoBody(new HttpTrace(url), body, httpMethod);
return requestWithNoBody(new HttpOptions(url), body, httpMethod);
case PATCH:
return requestWithBody(new HttpPatch(url), body);
throw new HttpClientException("Unable to construct request for:" + httpMethod + ", " + url, null);
private static HttpRequestBase requestWithBody(HttpEntityEnclosingRequestBase request, HttpEntity body) {
if (body != null) {
return request;
private static HttpRequestBase requestWithNoBody(HttpRequestBase request, Object body, HttpMethod httpMethod) {
if (body != null) {
throw new HttpClientException("Request of type " + httpMethod + " does not support 'body'!");
return request;
public static InputStream decompress(InputStream compressed, Map<String, String> headers) {
String contentEncoding = headers.get("content-encoding");
if (contentEncoding == null) {
return compressed;
switch (contentEncoding) {
case "gzip":
return Compression.gunzip(compressed);
case "deflate":
return Compression.inflate(compressed);
return compressed;
private static Object coerceBodyType(InputStream body, ResponseBodyType as,
ContentType contentType) {
switch (as) {
case TEXT:
String charset = "UTF-8";
if (contentType != null) {
charset = contentType.getCharset().name();
try {
return IOUtils.toString(body, charset);
} catch (IOException e) {
throw new HttpClientException("Unable to read body as string", e);
throw new HttpClientException("Unsupported body type: " + as);

@ -1,88 +0,0 @@
package com.puppetlabs.http.client.impl;
import com.puppetlabs.http.client.HttpMethod;
import com.puppetlabs.http.client.HttpResponse;
import com.puppetlabs.http.client.RequestOptions;
import org.httpkit.HttpUtils;
import org.httpkit.client.IResponseHandler;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
public class ResponseHandler implements IResponseHandler {
private static final Set<Integer> REDIRECT_STATUS_CODES =
new HashSet<Integer>(Arrays.asList(301, 302, 303, 307, 308));
private final RequestOptions options;
private final CoercedRequestOptions coercedOptions;
private final IResponseCallback callback;
public ResponseHandler(RequestOptions options,
CoercedRequestOptions coercedOptions,
IResponseCallback callback) {
this.options = options;
this.coercedOptions = coercedOptions;
this.callback = callback;
private HttpMethod getNewMethod(int status) {
if (status == 301 || status == 302 || status == 303) {
return HttpMethod.GET;
} else {
return options.getMethod();
private void deliverResponse(HttpResponse response) {
HttpResponse finalResponse = response;
try {
if (callback != null) {
finalResponse = callback.handleResponse(response);
} catch (Exception e) {
// dump stacktrace to stderr
HttpUtils.printError(coercedOptions.getMethod() + " " +
coercedOptions.getUrl() + "'s callback", e);
// return the error
options.getPromise().deliver(new HttpResponse(options, e));
public void onSuccess(int status, Map<String, Object> headers, Object body) {
if (options.getFollowRedirects() && REDIRECT_STATUS_CODES.contains(status)) {
if (options.getMaxRedirects() >= options.getTraceRedirects().size()) {
// follow 301 and 302 redirect
try {
options.setUrl(new URI(coercedOptions.getUrl()).resolve((String) headers.get("location")).toString())
} catch (IOException e) {
throw new RuntimeException("Error when attempting redirect", e);
} catch (URISyntaxException e) {
throw new RuntimeException("Error when attempting redirect", e);
} else {
deliverResponse(new HttpResponse(options,
new Exception("too many redirects: " + options.getTraceRedirects().size())));
} else {
deliverResponse(new HttpResponse(options, body, headers, status));
public void onThrowable(Throwable t) {
deliverResponse(new HttpResponse(options, t));

@ -0,0 +1,83 @@
package com.puppetlabs.http.client.impl;
import com.puppetlabs.certificate_authority.CertificateAuthority;
import com.puppetlabs.http.client.HttpClientException;
import com.puppetlabs.http.client.RequestOptions;
import com.puppetlabs.http.client.SyncHttpClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SslUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(SyncHttpClient.class);
private static void logAndRethrow(String msg, Throwable t) {
LOGGER.error(msg, t);
throw new HttpClientException(msg, t);
public static RequestOptions configureSsl(RequestOptions options) {
if (options.getSslContext() != null) {
return options;
if ((options.getSslCert() != null) &&
(options.getSslKey() != null) &&
(options.getSslCaCert() != null)) {
try {
new FileReader(options.getSslCert()),
new FileReader(options.getSslKey()),
new FileReader(options.getSslCaCert()))
} catch (KeyStoreException e) {
logAndRethrow("Error while configuring SSL", e);
} catch (CertificateException e) {
logAndRethrow("Error while configuring SSL", e);
} catch (IOException e) {
logAndRethrow("Error while configuring SSL", e);
} catch (NoSuchAlgorithmException e) {
logAndRethrow("Error while configuring SSL", e);
} catch (KeyManagementException e) {
logAndRethrow("Error while configuring SSL", e);
} catch (UnrecoverableKeyException e) {
logAndRethrow("Error while configuring SSL", e);
return options;
if (options.getSslCaCert() != null) {
try {
new FileReader(options.getSslCaCert()))
} catch (KeyStoreException e) {
logAndRethrow("Error while configuring SSL", e);
} catch (CertificateException e) {
logAndRethrow("Error while configuring SSL", e);
} catch (IOException e) {
logAndRethrow("Error while configuring SSL", e);
} catch (NoSuchAlgorithmException e) {
logAndRethrow("Error while configuring SSL", e);
} catch (KeyManagementException e) {
logAndRethrow("Error while configuring SSL", e);
return options;
return options;

@ -1,6 +0,0 @@
* This package is basically just a straight port of the clojure code from
* the org.httpkit.client namespace, so that we can make requests from Java
* using this same library.
package com.puppetlabs.http.client.impl;

@ -0,0 +1,75 @@
(ns puppetlabs.http.client.async-plaintext-test
(:import (com.puppetlabs.http.client AsyncHttpClient RequestOptions))
(:require [clojure.test :refer :all]
[puppetlabs.trapperkeeper.core :as tk]
[puppetlabs.trapperkeeper.testutils.bootstrap :as testutils]
[puppetlabs.trapperkeeper.testutils.logging :as testlogging]
[ :as jetty9]
[puppetlabs.http.client.async :as async]
[schema.test :as schema-test]))
(use-fixtures :once schema-test/validate-schemas)
(defn app
{:status 200
:body "Hello, World!"})
(tk/defservice test-web-service
[[:WebserverService add-ring-handler]]
(init [this context]
(add-ring-handler app "/hello")
(defn basic-test
[http-method java-method clj-fn]
(testing (format "async client: HTTP method: '%s'" http-method)
(testutils/with-app-with-config app
[jetty9/jetty9-service test-web-service]
{:webserver {:port 10000}}
(testing "java async client"
(let [options (RequestOptions. "http://localhost:10000/hello/")
response (java-method options)]
(is (= 200 (.getStatus (.deref response))))
(is (= "Hello, World!" (slurp (.getBody (.deref response)))))))
(testing "clojure async client"
(let [response (clj-fn "http://localhost:10000/hello/")]
(is (= 200 (:status @response)))
(is (= "Hello, World!" (slurp (:body @response))))))))))
(deftest async-client-head-test
(testutils/with-app-with-config app
[jetty9/jetty9-service test-web-service]
{:webserver {:port 10000}}
(testing "java sync client"
(let [options (RequestOptions. "http://localhost:10000/hello/")
response (AsyncHttpClient/head options)]
(is (= 200 (.getStatus (.deref response))))
(is (= nil (.getBody (.deref response))))))
(testing "clojure sync client"
(let [response (async/head "http://localhost:10000/hello/")]
(is (= 200 (:status @response)))
(is (= nil (:body @response))))))))
(deftest async-client-get-test
(basic-test "GET" #(AsyncHttpClient/get %) async/get))
(deftest async-client-post-test
(basic-test "POST" #(AsyncHttpClient/post %) async/post))
(deftest async-client-put-test
(basic-test "PUT" #(AsyncHttpClient/put %) async/put))
(deftest async-client-delete-test
(basic-test "DELETE" #(AsyncHttpClient/delete %) async/delete))
(deftest async-client-trace-test
(basic-test "TRACE" #(AsyncHttpClient/trace %) async/trace))
(deftest async-client-options-test
(basic-test "OPTIONS" #(AsyncHttpClient/options %) async/options))
(deftest async-client-patch-test
(basic-test "PATCH" #(AsyncHttpClient/patch %) async/patch))

@ -0,0 +1,49 @@
(ns puppetlabs.http.client.async-ssl-config-test
(:require [clojure.test :refer :all]
[ :refer [resource]]
[puppetlabs.certificate-authority.core :as ssl]
[puppetlabs.http.client.async :as http]
[schema.test :as schema-test])
(:import [ SSLContext]))
(use-fixtures :once schema-test/validate-schemas)
(deftest ssl-config-with-files
(let [opts {:ssl-cert (resource "ssl/cert.pem")
:ssl-key (resource "ssl/key.pem")
:ssl-ca-cert (resource "ssl/ca.pem")}
configured-opts (http/configure-ssl opts)]
(testing "configure-ssl sets up an SSLContext when given cert, key, ca-cert"
(is (instance? SSLContext (:ssl-context configured-opts))))
(testing "removes ssl-cert, ssl-key, ssl-ca-cert"
(is (not (:ssl-cert configured-opts)))
(is (not (:ssl-key configured-opts)))
(is (not (:ssl-ca-cert configured-opts))))))
(deftest ssl-config-with-ca-file
(let [opts {:ssl-ca-cert (resource "ssl/ca.pem")}
configured-opts (http/configure-ssl opts)]
(testing "configure-ssl sets up an SSLContext when given ca-cert"
(is (instance? SSLContext (:ssl-context configured-opts))))
(testing "removes ssl-ca-cert"
(is (not (:ssl-ca-cert configured-opts))))))
(deftest ssl-config-without-ssl-params
(let [configured-opts (http/configure-ssl {})]
(testing "configure-ssl does nothing when given no ssl parameters"
(is (= {} configured-opts)))))
(deftest ssl-config-with-context
(let [opts {:ssl-context (ssl/pems->ssl-context
(resource "ssl/cert.pem")
(resource "ssl/key.pem")
(resource "ssl/ca.pem"))}
configured-opts (http/configure-ssl opts)]
(testing "configure-ssl uses an existing ssl context"
(is (instance? SSLContext (:ssl-context configured-opts))))))

@ -1,63 +0,0 @@
(ns puppetlabs.http.client.async-test
(:require [clojure.test :refer :all]
[ :refer [resource]]
[puppetlabs.certificate-authority.core :as ssl]
[puppetlabs.http.client.async :as http])
(:import [ SSLEngine]))
(deftest ssl-config-with-files
(let [req {:url "http://localhost"
:method :get
:ssl-cert (resource "ssl/cert.pem")
:ssl-key (resource "ssl/key.pem")
:ssl-ca-cert (resource "ssl/ca.pem")}
configured-req (http/configure-ssl req)]
(testing "configure-ssl sets up an SSLEngine when given cert, key, ca-cert"
(is (instance? SSLEngine (:sslengine configured-req))))
(testing "removes ssl-cert, ssl-key, ssl-ca-cert"
(is (not (:ssl-cert configured-req)))
(is (not (:ssl-key configured-req)))
(is (not (:ssl-ca-cert configured-req))))))
(deftest ssl-config-with-ca-file
(let [req {:ssl-ca-cert (resource "ssl/ca.pem")}
configured-req (http/configure-ssl req)]
(testing "configure-ssl sets up an SSLEngine when given ca-cert"
(is (instance? SSLEngine (:sslengine configured-req))))
(testing "removes ssl-ca-cert"
(is (not (:ssl-ca-cert configured-req))))))
(deftest ssl-config-without-ssl-params
(let [req {:url "http://localhost"
:method :get}
configured-req (http/configure-ssl req)]
(testing "configure-ssl does nothing when given no ssl parameters"
(is (= req configured-req)))))
(deftest ssl-config-with-context
(let [req {:url "http://localhost"
:method :get
:ssl-context (ssl/pems->ssl-context
(resource "ssl/cert.pem")
(resource "ssl/key.pem")
(resource "ssl/ca.pem"))}
configured-req (http/configure-ssl req)]
(testing "configure-ssl uses an existing ssl context"
(is (instance? SSLEngine (:sslengine configured-req))))))
(deftest ssl-config-with-sslengine
(let [req {:url "http://localhost"
:method :get
:ssl-cert (resource "ssl/cert.pem")
:ssl-key (resource "ssl/key.pem")
:ssl-ca-cert (resource "ssl/ca.pem")
:sslengine "thing"}
configured-req (http/configure-ssl req)]
(testing "configure-ssl does nothing when :sslengine is given"
(is (= req configured-req)))))

@ -0,0 +1,57 @@
(ns puppetlabs.http.client.decompress-test
(:import ( ByteArrayOutputStream ByteArrayInputStream)
( GZIPOutputStream DeflaterInputStream)
( IOUtils)
(com.puppetlabs.http.client.impl JavaClient)
(java.util HashMap))
(:require [clojure.test :refer :all]
[puppetlabs.http.client.async :as async]
[schema.test :as schema-test]))
(use-fixtures :once schema-test/validate-schemas)
(def compressible-body (apply str (repeat 1000 "f")))
(defn gzip
(let [baos (ByteArrayOutputStream.)
gos (GZIPOutputStream. baos)]
(-> s
(.getBytes "UTF-8")
(IOUtils/copy gos))
(.close gos)
(ByteArrayInputStream. (.toByteArray baos))))
(defn deflate
(-> s
(.getBytes "UTF-8")
(deftest gzip-compress-test
(testing "clojure gzip decompression"
(let [test-response {:headers {"content-encoding" "gzip"}
:body (gzip compressible-body)}
response (async/decompress test-response)]
(is (not (contains? (:headers response) "content-encoding")))
(is (= compressible-body (slurp (:body response))))))
(testing "java gzip decompression"
(let [headers (HashMap. {"content-encoding" "gzip"})
response (JavaClient/decompress (gzip compressible-body) headers)]
(is (not (.containsKey headers "content-encoding")))
(is (= compressible-body (slurp response))))))
(deftest deflate-compress-test
(testing "clojure deflate decompression"
(let [test-response {:headers {"content-encoding" "deflate"}
:body (deflate compressible-body)}
response (async/decompress test-response)]
(is (not (contains? (:headers response) "content-encoding")))
(is (= compressible-body (slurp (:body response))))))
(testing "java gzip decompression"
(let [headers (HashMap. {"content-encoding" "deflate"})
response (JavaClient/decompress (deflate compressible-body) headers)]
(is (not (.containsKey headers "content-encoding")))
(is (= compressible-body (slurp response))))))

@ -0,0 +1,239 @@
(ns puppetlabs.http.client.sync-plaintext-test
(:import (com.puppetlabs.http.client SyncHttpClient RequestOptions
HttpClientException ResponseBodyType)
( SSLHandshakeException)
( ByteArrayInputStream InputStream)
(java.nio.charset Charset))
(:require [clojure.test :refer :all]
[puppetlabs.trapperkeeper.core :as tk]
[puppetlabs.trapperkeeper.testutils.bootstrap :as testutils]
[puppetlabs.trapperkeeper.testutils.logging :as testlogging]
[ :as jetty9]
[puppetlabs.http.client.sync :as sync]
[schema.test :as schema-test]
[ :as io]))
(use-fixtures :once schema-test/validate-schemas)
(defn app
{:status 200
:body "Hello, World!"})
(tk/defservice test-web-service
[[:WebserverService add-ring-handler]]
(init [this context]
(add-ring-handler app "/hello")
(defn basic-test
[http-method java-method clj-fn]
(testing (format "sync client: HTTP method: '%s'" http-method)
(testutils/with-app-with-config app
[jetty9/jetty9-service test-web-service]
{:webserver {:port 10000}}
(testing "java sync client"
(let [options (RequestOptions. "http://localhost:10000/hello/")
response (java-method options)]
(is (= 200 (.getStatus response)))
(is (= "Hello, World!" (slurp (.getBody response))))))
(testing "clojure sync client"
(let [response (clj-fn "http://localhost:10000/hello/")]
(is (= 200 (:status response)))
(is (= "Hello, World!" (slurp (:body response))))))))))
(deftest sync-client-head-test
(testutils/with-app-with-config app
[jetty9/jetty9-service test-web-service]
{:webserver {:port 10000}}
(testing "java sync client"
(let [options (RequestOptions. "http://localhost:10000/hello/")
response (SyncHttpClient/head options)]
(is (= 200 (.getStatus response)))
(is (= nil (.getBody response)))))
(testing "clojure sync client"
(let [response (sync/head "http://localhost:10000/hello/")]
(is (= 200 (:status response)))
(is (= nil (:body response))))))))
(deftest sync-client-get-test
(basic-test "GET" #(SyncHttpClient/get %) sync/get))
(deftest sync-client-post-test
(basic-test "POST" #(SyncHttpClient/post %) sync/post))
(deftest sync-client-put-test
(basic-test "PUT" #(SyncHttpClient/put %) sync/put))
(deftest sync-client-delete-test
(basic-test "DELETE" #(SyncHttpClient/delete %) sync/delete))
(deftest sync-client-trace-test
(basic-test "TRACE" #(SyncHttpClient/trace %) sync/trace))
(deftest sync-client-options-test
(basic-test "OPTIONS" #(SyncHttpClient/options %) sync/options))
(deftest sync-client-patch-test
(basic-test "PATCH" #(SyncHttpClient/patch %) sync/patch))
(deftest sync-client-as-test
(testutils/with-app-with-config app
[jetty9/jetty9-service test-web-service]
{:webserver {:port 10000}}
(testing "java sync client: :as unspecified"
(let [options (RequestOptions. "http://localhost:10000/hello/")
response (SyncHttpClient/get options)]
(is (= 200 (.getStatus response)))
(is (instance? InputStream (.getBody response)))
(is (= "Hello, World!" (slurp (.getBody response))))))
(testing "java sync client: :as :stream"
(let [options (.. (RequestOptions. "http://localhost:10000/hello/")
(setAs ResponseBodyType/STREAM))
response (SyncHttpClient/get options)]
(is (= 200 (.getStatus response)))
(is (instance? InputStream (.getBody response)))
(is (= "Hello, World!" (slurp (.getBody response))))))
(testing "java sync client: :as :text"
(let [options (.. (RequestOptions. "http://localhost:10000/hello/")
(setAs ResponseBodyType/TEXT))
response (SyncHttpClient/get options)]
(is (= 200 (.getStatus response)))
(is (string? (.getBody response)))
(is (= "Hello, World!" (.getBody response)))))
(testing "clojure sync client: :as unspecified"
(let [response (sync/get "http://localhost:10000/hello/")]
(is (= 200 (:status response)))
(is (instance? InputStream (:body response)))
(is (= "Hello, World!" (slurp (:body response))))))
(testing "clojure sync client: :as :stream"
(let [response (sync/get "http://localhost:10000/hello/" {:as :stream})]
(is (= 200 (:status response)))
(is (instance? InputStream (:body response)))
(is (= "Hello, World!" (slurp (:body response))))))
(testing "clojure sync client: :as :text"
(let [response (sync/get "http://localhost:10000/hello/" {:as :text})]
(is (= 200 (:status response)))
(is (string? (:body response)))
(is (= "Hello, World!" (:body response))))))))
(defn header-app
(let [val (get-in req [:headers "fooheader"])]
{:status 200
:headers {"myrespheader" val}
:body val}))
(tk/defservice test-header-web-service
[[:WebserverService add-ring-handler]]
(init [this context]
(add-ring-handler header-app "/hello")
(deftest sync-client-request-headers-test
(testutils/with-app-with-config header-app
[jetty9/jetty9-service test-header-web-service]
{:webserver {:port 10000}}
(testing "java sync client"
(let [options (-> (RequestOptions. "http://localhost:10000/hello/")
(.setHeaders {"fooheader" "foo"}))
response (SyncHttpClient/post options)]
(is (= 200 (.getStatus response)))
(is (= "foo" (slurp (.getBody response))))
(is (= "foo" (-> (.getHeaders response) (.get "myrespheader"))))))
(testing "clojure sync client"
(let [response (sync/post "http://localhost:10000/hello/" {:headers {"fooheader" "foo"}})]
(is (= 200 (:status response)))
(is (= "foo" (slurp (:body response))))
(is (= "foo" (get-in response [:headers "myrespheader"]))))))))
(defn req-body-app
{:status 200
:body (slurp (:body req))})
(tk/defservice test-body-web-service
[[:WebserverService add-ring-handler]]
(init [this context]
(add-ring-handler req-body-app "/hello")
(deftest sync-client-request-body-test
(testutils/with-app-with-config req-body-app
[jetty9/jetty9-service test-body-web-service]
{:webserver {:port 10000}}
(testing "java sync client: string body for post request"
(let [options (-> (RequestOptions. "http://localhost:10000/hello/")
(.setBody "foo"))
response (SyncHttpClient/post options)]
(is (= 200 (.getStatus response)))
(is (= "foo" (slurp (.getBody response)))))
(let [options (-> (RequestOptions. "http://localhost:10000/hello/")
(.setBody (ByteArrayInputStream. (.getBytes "foo" "UTF-8"))))
response (SyncHttpClient/post options)]
(is (= 200 (.getStatus response)))
(is (= "foo" (slurp (.getBody response))))))
(testing "clojure sync client: string body for post request"
(let [response (sync/post "http://localhost:10000/hello/" {:body (io/input-stream (.getBytes "foo" "UTF-8"))})]
(is (= 200 (:status response)))
(is (= "foo" (slurp (:body response)))))))))
(def compressible-body (apply str (repeat 1000 "f")))
(defn compression-app
{:status 200
:headers {"orig-accept-encoding" (get-in req [:headers "accept-encoding"])
"content-type" "text/plain"
"charset" "UTF-8"}
:body compressible-body})
(tk/defservice test-compression-web-service
[[:WebserverService add-ring-handler]]
(init [this context]
(add-ring-handler compression-app "/hello")
(defn test-compression
[desc opts accept-encoding content-encoding content-should-match?]
(testutils/with-app-with-config req-body-app
[jetty9/jetty9-service test-compression-web-service]
{:webserver {:port 10000}}
(testing (str "java sync client: compression headers / response: " desc)
(let [java-opts (cond-> (RequestOptions. "http://localhost:10000/hello/")
(contains? opts :decompress-body) (.setDecompressBody (:decompress-body opts))
(contains? opts :headers) (.setHeaders (:headers opts)))
response (SyncHttpClient/get java-opts)]
(is (= 200 (.getStatus response)))
(is (= accept-encoding (.. response getHeaders (get "orig-accept-encoding"))))
(is (= content-encoding (.. response getOrigContentEncoding)))
(if content-should-match?
(is (= compressible-body (slurp (.getBody response))))
(is (not= compressible-body (slurp (.getBody response)))))))
(testing (str "clojure sync client: compression headers / response: " desc)
(let [response (sync/post "http://localhost:10000/hello/" opts)]
(is (= 200 (:status response)))
(is (= accept-encoding (get-in response [:headers "orig-accept-encoding"])))
(is (= content-encoding (:orig-content-encoding response)))
(if content-should-match?
(is (= compressible-body (slurp (:body response))))
(is (not= compressible-body (slurp (:body response))))))))))
(deftest sync-client-compression-test
(test-compression "default" {} "gzip, deflate" "gzip" true))
(deftest sync-client-compression-gzip-test
(test-compression "explicit gzip" {:headers {"accept-encoding" "gzip"}} "gzip" "gzip" true))
(deftest sync-client-compression-disabled-test
(test-compression "explicit disable" {:decompress-body false} nil nil true))
(deftest sync-client-decompression-disabled-test
(test-compression "explicit disable" {:headers {"accept-encoding" "gzip"}
:decompress-body false} "gzip" "gzip" false))

@ -1,4 +1,4 @@
(ns puppetlabs.http.client.sync-test
(ns puppetlabs.http.client.sync-ssl-test
(:import (com.puppetlabs.http.client SyncHttpClient RequestOptions
( SSLHandshakeException))
@ -7,7 +7,10 @@
[puppetlabs.trapperkeeper.testutils.bootstrap :as testutils]
[puppetlabs.trapperkeeper.testutils.logging :as testlogging]
[ :as jetty9]
[puppetlabs.http.client.sync :as sync]))
[puppetlabs.http.client.sync :as sync]
[schema.test :as schema-test]))
(use-fixtures :once schema-test/validate-schemas)
(defn app