Merge pull request #8 from fpringvaldsen/TK-27-manage-clients-explicitly

(TK-27) Allow user to manage clients explicitly
This commit is contained in:
Chris Price 2014-07-23 12:14:28 -07:00
commit 0466c7592b
5 changed files with 292 additions and 57 deletions

View file

@ -26,7 +26,7 @@
(:require [puppetlabs.certificate-authority.core :as ssl]
[clojure.string :as str]
[puppetlabs.kitchensink.core :as ks]
[puppetlabs.http.client.schemas :as schemas]
[puppetlabs.http.client.common :as common]
[schema.core :as schema]
[clojure.tools.logging :as log])
(:refer-clojure :exclude (get)))
@ -60,12 +60,12 @@
[req]
(initialize-ssl-context-from-ca-pem req))
(schema/defn configure-ssl :- (schema/either {} schemas/SslContextOptions)
(schema/defn configure-ssl :- (schema/either {} common/SslContextOptions)
"Configures a request map to have an SSLContext. It will use an existing one
(stored in :ssl-context) if already present, and will fall back to a set of
PEM files (stored in :ssl-cert, :ssl-key, and :ssl-ca-cert) if those are present.
If none of these are present this does not modify the request map."
[opts :- schemas/SslOptions]
[opts :- common/SslOptions]
(cond
(:ssl-context opts) opts
(every? opts [:ssl-cert :ssl-key :ssl-ca-cert]) (configure-ssl-from-pems opts)
@ -169,16 +169,16 @@
:body (when-let [entity (.getEntity http-response)]
(.getContent entity))}))
(schema/defn error-response :- schemas/ErrorResponse
[opts :- schemas/UserRequestOptions
(schema/defn error-response :- common/ErrorResponse
[opts :- common/UserRequestOptions
e :- Exception]
{:opts opts
:error e})
(schema/defn callback-response :- schemas/Response
[opts :- schemas/UserRequestOptions
callback :- schemas/ResponseCallbackFn
response :- schemas/Response]
(schema/defn callback-response :- common/Response
[opts :- common/UserRequestOptions
callback :- common/ResponseCallbackFn
response :- common/Response]
(if callback
(try
(callback response)
@ -187,21 +187,22 @@
response))
(schema/defn deliver-result
[client :- schemas/Client
result :- schemas/ResponsePromise
opts :- schemas/UserRequestOptions
callback :- schemas/ResponseCallbackFn
response :- schemas/Response]
[client :- common/Client
result :- common/ResponsePromise
opts :- common/UserRequestOptions
callback :- common/ResponseCallbackFn
response :- common/Response]
(try
(deliver result (callback-response opts callback response))
(finally
(.close client))))
(if (not (:persistent opts))
(.close client)))))
(schema/defn future-callback
[client :- schemas/Client
result :- schemas/ResponsePromise
opts :- schemas/UserRequestOptions
callback :- schemas/ResponseCallbackFn]
[client :- common/Client
result :- common/ResponsePromise
opts :- common/RequestOptions
callback :- common/ResponseCallbackFn]
(reify FutureCallback
(completed [this http-response]
(try
@ -222,23 +223,50 @@
opts
(HttpClientException. "Request cancelled"))))))
(schema/defn extract-client-opts :- schemas/ClientOptions
[opts :- schemas/UserRequestOptions]
(schema/defn extract-client-opts :- common/ClientOptions
[opts :- common/UserRequestOptions]
(select-keys opts [:ssl-context :ssl-ca-cert :ssl-cert :ssl-key]))
(schema/defn create-client :- schemas/Client
[opts :- schemas/ClientOptions]
(let [opts (configure-ssl opts)
client (if (:ssl-context opts)
(.. (HttpAsyncClients/custom) (setSSLContext (:ssl-context opts)) build)
(HttpAsyncClients/createDefault))]
(schema/defn extract-request-opts :- common/RequestOptions
[opts :- common/UserRequestOptions]
(select-keys opts [:url :method :headers :body :decompress-body :as :persistent]))
(schema/defn create-default-client :- common/Client
[opts :- common/ClientOptions]
(let [configured-opts (configure-ssl opts)
client (if (:ssl-context configured-opts)
(.. (HttpAsyncClients/custom) (setSSLContext (:ssl-context configured-opts)) build)
(HttpAsyncClients/createDefault))]
(.start client)
client))
(schema/defn ^:always-validate request-with-client :- common/ResponsePromise
[opts :- common/RawUserRequestOptions
callback :- common/ResponseCallbackFn
client]
(let [persistent (not (nil? client))
defaults {:headers {}
:body nil
:decompress-body true
:as :stream}
opts (assoc (merge defaults opts) :persistent persistent)
client-opts (extract-client-opts opts)
request-opts (extract-request-opts opts)
client (or client (create-default-client client-opts))
{:keys [method url body] :as coerced-opts} (coerce-opts opts)
request (construct-request method url)
result (promise)]
(.setHeaders request (:headers coerced-opts))
(when body
(.setEntity request body))
(.execute client request
(future-callback client result request-opts callback))
result))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;; Public
(schema/defn ^:always-validate request :- schemas/ResponsePromise
(schema/defn ^:always-validate request :- common/ResponsePromise
"Issues an async HTTP request and returns a promise object to which the value
of `(callback {:opts _ :status _ :headers _ :body _})` or
`(callback {:opts _ :error _})` will be delivered.
@ -268,26 +296,37 @@
* :ssl-cert - path to a PEM file containing the client cert
* :ssl-key - path to a PEM file containing the client private key
* :ssl-ca-cert - path to a PEM file containing the CA cert"
([opts :- schemas/RawUserRequestOptions]
([opts :- common/RawUserRequestOptions]
(request opts nil))
([opts :- schemas/RawUserRequestOptions
callback :- schemas/ResponseCallbackFn]
(let [defaults {:headers {}
:body nil
:decompress-body true
:as :stream}
opts (merge defaults opts)
client-opts (extract-client-opts opts)
client (create-client client-opts)
{:keys [method url body] :as coerced-opts} (coerce-opts opts)
request (construct-request method url)
result (promise)]
(.setHeaders request (:headers coerced-opts))
(when body
(.setEntity request body))
(.execute client request
(future-callback client result opts callback))
result)))
([opts :- common/RawUserRequestOptions
callback :- common/ResponseCallbackFn]
(request-with-client opts callback nil)))
(schema/defn create-client :- common/HTTPClient
[opts :- common/ClientOptions]
(let [opts (configure-ssl opts)
client (if (:ssl-context opts)
(.. (HttpAsyncClients/custom) (setSSLContext (:ssl-context opts)) build)
(HttpAsyncClients/createDefault))]
(.start client)
(reify common/HTTPClient
(get [this url] (common/get this url {}))
(get [_ url opts] (request-with-client (assoc opts :method :get :url url) nil client))
(head [this url] (common/head this url {}))
(head [_ url opts] (request-with-client (assoc opts :method :head :url url) nil client))
(post [this url] (common/post this url {}))
(post [_ url opts] (request-with-client (assoc opts :method :post :url url) nil client))
(put [this url] (common/put this url {}))
(put [_ url opts] (request-with-client (assoc opts :method :put :url url) nil client))
(delete [this url] (common/delete this url {}))
(delete [_ url opts] (request-with-client (assoc opts :method :delete :url url) nil client))
(trace [this url] (common/trace this url {}))
(trace [_ url opts] (request-with-client (assoc opts :method :trace :url url) nil client))
(options [this url] (common/options this url {}))
(options [_ url opts] (request-with-client (assoc opts :method :options :url url) nil client))
(patch [this url] (common/patch this url {}))
(patch [_ url opts] (request-with-client (assoc opts :method :patch :url url) nil client))
(close [_] (.close client)))))
(defn get
"Issue an asynchronous HTTP GET request. This will raise an exception if an

View file

@ -1,11 +1,26 @@
(ns puppetlabs.http.client.schemas
(ns puppetlabs.http.client.common
(:import (java.net URL)
(javax.net.ssl SSLContext)
(org.apache.http.impl.nio.client CloseableHttpAsyncClient)
(clojure.lang IBlockingDeref)
(java.io InputStream)
(java.nio.charset Charset))
(:require [schema.core :as schema]))
(:require [schema.core :as schema])
(:refer-clojure :exclude (get)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;; Client Protocol
(defprotocol HTTPClient
(get [this url] [this url opts])
(head [this url] [this url opts])
(post [this url] [this url opts])
(put [this url] [this url opts])
(delete [this url] [this url opts])
(trace [this url] [this url opts])
(options [this url] [this url opts])
(patch [this url] [this url opts])
(close [this]))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;; Schemas
@ -14,8 +29,8 @@
(def UrlOrString (schema/either schema/Str URL))
;; TODO: replace this with a protocol
(def Client CloseableHttpAsyncClient)
-;; TODO: replace this with a protocol
-(def Client CloseableHttpAsyncClient)
(def Headers
{schema/Str schema/Str})
@ -27,6 +42,9 @@
(schema/enum :text :stream))
(def RawUserRequestOptions
"The list of Request options passed by a user into
the request function. Allows the user to configure
both a client and a request."
{:url UrlOrString
:method schema/Keyword
(ok :headers) Headers
@ -40,12 +58,17 @@
(ok :ssl-ca-cert) UrlOrString})
(def RequestOptions
"The options from UserRequestOptions that have to do with the
configuration and settings for an individual request. This is
everything from UserRequestOptions not included in
ClientOptions."
{:url UrlOrString
:method schema/Keyword
:headers Headers
:body Body
:decompress-body schema/Bool
:as BodyType})
:as BodyType
(ok :persistent) schema/Bool})
(def SslContextOptions
{:ssl-context SSLContext})
@ -62,6 +85,8 @@
(schema/either {} SslContextOptions SslCertOptions SslCaCertOptions))
(def UserRequestOptions
"A cleaned-up version of RawUserRequestOptions, which is formed after
validating the RawUserRequestOptions and merging it with the defaults."
(schema/either
RequestOptions
(merge RequestOptions SslContextOptions)
@ -69,6 +94,9 @@
(merge RequestOptions SslCertOptions)))
(def ClientOptions
"The options from UserRequestOptions that are related to the
instantiation/management of a client. This is everything
from UserRequestOptions not included in RequestOptions."
SslOptions)
(def ResponseCallbackFn
@ -96,3 +124,4 @@
(def Response
(schema/either NormalResponse ErrorResponse))

View file

@ -2,9 +2,26 @@
;; defined in puppetlabs.http.client
(ns puppetlabs.http.client.sync
(:require [puppetlabs.http.client.async :as async])
(:import (org.apache.http.impl.nio.client HttpAsyncClients))
(:require [puppetlabs.http.client.async :as async]
[schema.core :as schema]
[puppetlabs.http.client.common :as common])
(:refer-clojure :exclude (get)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;; Private utility functions
(defn request-with-client
[req client]
(let [{:keys [error] :as resp} @(async/request-with-client req nil client)]
(if error
(throw error)
resp)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;; Public
(defn request
[req]
(let [{:keys [error] :as resp} @(async/request req nil)]
@ -12,6 +29,32 @@
(throw error)
resp)))
(schema/defn create-client :- common/HTTPClient
[opts :- common/ClientOptions]
(let [opts (async/configure-ssl opts)
client (if (:ssl-context opts)
(.. (HttpAsyncClients/custom) (setSSLContext (:ssl-context opts)) build)
(HttpAsyncClients/createDefault))]
(.start client)
(reify common/HTTPClient
(get [this url] (common/get this url {}))
(get [_ url opts] (request-with-client (assoc opts :method :get :url url) client))
(head [this url] (common/head this url {}))
(head [_ url opts] (request-with-client (assoc opts :method :head :url url) client))
(post [this url] (common/post this url {}))
(post [_ url opts] (request-with-client (assoc opts :method :post :url url) client))
(put [this url] (common/put this url {}))
(put [_ url opts] (request-with-client (assoc opts :method :put :url url) client))
(delete [this url] (common/delete this url {}))
(delete [_ url opts] (request-with-client (assoc opts :method :delete :url url) client))
(trace [this url] (common/trace this url {}))
(trace [_ url opts] (request-with-client (assoc opts :method :trace :url url) client))
(options [this url] (common/options this url {}))
(options [_ url opts] (request-with-client (assoc opts :method :options :url url) client))
(patch [this url] (common/patch this url {}))
(patch [_ url opts] (request-with-client (assoc opts :method :patch :url url) client))
(close [_] (.close client)))))
(defn get
"Issue a synchronous HTTP GET request. This will raise an exception if an
error is returned."

View file

@ -1,10 +1,12 @@
(ns puppetlabs.http.client.async-plaintext-test
(:import (com.puppetlabs.http.client AsyncHttpClient RequestOptions))
(:import (com.puppetlabs.http.client AsyncHttpClient RequestOptions)
(org.apache.http.impl.nio.client HttpAsyncClients))
(:require [clojure.test :refer :all]
[puppetlabs.trapperkeeper.core :as tk]
[puppetlabs.trapperkeeper.testutils.bootstrap :as testutils]
[puppetlabs.trapperkeeper.testutils.logging :as testlogging]
[puppetlabs.trapperkeeper.services.webserver.jetty9-service :as jetty9]
[puppetlabs.http.client.common :as common]
[puppetlabs.http.client.async :as async]
[schema.test :as schema-test]))
@ -43,7 +45,7 @@
(testutils/with-app-with-config app
[jetty9/jetty9-service test-web-service]
{:webserver {:port 10000}}
(testing "java sync client"
(testing "java async client"
(let [options (RequestOptions. "http://localhost:10000/hello/")
response (AsyncHttpClient/head options)]
(is (= 200 (.getStatus (.deref response))))
@ -72,4 +74,64 @@
(basic-test "OPTIONS" #(AsyncHttpClient/options %) async/options))
(deftest async-client-patch-test
(basic-test "PATCH" #(AsyncHttpClient/patch %) async/patch))
(basic-test "PATCH" #(AsyncHttpClient/patch %) async/patch))
(deftest persistent-async-client-test
(testlogging/with-test-logging
(testutils/with-app-with-config app
[jetty9/jetty9-service test-web-service]
{:webserver {:port 10000}}
(let [client (async/create-client {})]
(testing "HEAD request with persistent async client"
(let [response (common/head client "http://localhost:10000/hello/")]
(is (= 200 (:status @response)))
(is (= nil (:body @response)))))
(testing "GET request with persistent async client"
(let [response (common/get client "http://localhost:10000/hello/")]
(is (= 200 (:status @response)))
(is (= "Hello, World!" (slurp (:body @response))))))
(testing "POST request with persistent async client"
(let [response (common/post client "http://localhost:10000/hello/")]
(is (= 200 (:status @response)))
(is (= "Hello, World!" (slurp (:body @response))))))
(testing "PUT request with persistent async client"
(let [response (common/put client "http://localhost:10000/hello/")]
(is (= 200 (:status @response)))
(is (= "Hello, World!" (slurp (:body @response))))))
(testing "DELETE request with persistent async client"
(let [response (common/delete client "http://localhost:10000/hello/")]
(is (= 200 (:status @response)))
(is (= "Hello, World!" (slurp (:body @response))))))
(testing "TRACE request with persistent async client"
(let [response (common/trace client "http://localhost:10000/hello/")]
(is (= 200 (:status @response)))
(is (= "Hello, World!" (slurp (:body @response))))))
(testing "OPTIONS request with persistent async client"
(let [response (common/options client "http://localhost:10000/hello/")]
(is (= 200 (:status @response)))
(is (= "Hello, World!" (slurp (:body @response))))))
(testing "PATCH request with persistent async client"
(let [response (common/patch client "http://localhost:10000/hello/")]
(is (= 200 (:status @response)))
(is (= "Hello, World!" (slurp (:body @response))))))
(testing "client closes properly"
(common/close client)
(is (thrown? IllegalStateException (common/get client "http://localhost:10000/hello/"))))))))
(deftest request-with-client-test
(testlogging/with-test-logging
(testutils/with-app-with-config app
[jetty9/jetty9-service test-web-service]
{:webserver {:port 10000}}
(let [client (HttpAsyncClients/createDefault)
opts {:method :get :url "http://localhost:10000/hello/"}]
(.start client)
(testing "GET request works with request-with-client"
(let [response (async/request-with-client opts nil client)]
(is (= 200 (:status @response)))
(is (= "Hello, World!" (slurp (:body @response))))))
(testing "Client persists when passed to request-with-client"
(let [response (async/request-with-client opts nil client)]
(is (= 200 (:status @response)))
(is (= "Hello, World!" (slurp (:body @response))))))
(.close client)))))

View file

@ -3,13 +3,15 @@
HttpClientException ResponseBodyType)
(javax.net.ssl SSLHandshakeException)
(java.io ByteArrayInputStream InputStream)
(java.nio.charset Charset))
(java.nio.charset Charset)
(org.apache.http.impl.nio.client HttpAsyncClients))
(:require [clojure.test :refer :all]
[puppetlabs.trapperkeeper.core :as tk]
[puppetlabs.trapperkeeper.testutils.bootstrap :as testutils]
[puppetlabs.trapperkeeper.testutils.logging :as testlogging]
[puppetlabs.trapperkeeper.services.webserver.jetty9-service :as jetty9]
[puppetlabs.http.client.sync :as sync]
[puppetlabs.http.client.common :as common]
[schema.test :as schema-test]
[clojure.java.io :as io]))
@ -79,6 +81,48 @@
(deftest sync-client-patch-test
(basic-test "PATCH" #(SyncHttpClient/patch %) sync/patch))
(deftest sync-client-persistent-test
(testlogging/with-test-logging
(testutils/with-app-with-config app
[jetty9/jetty9-service test-web-service]
{:webserver {:port 10000}}
(let [client (sync/create-client {})]
(testing "HEAD request with persistent sync client"
(let [response (common/head client "http://localhost:10000/hello/")]
(is (= 200 (:status response)))
(is (= nil (:body response)))))
(testing "GET request with persistent sync client"
(let [response (common/get client "http://localhost:10000/hello/")]
(is (= 200 (:status response)))
(is (= "Hello, World!" (slurp (:body response))))))
(testing "POST request with persistent sync client"
(let [response (common/post client "http://localhost:10000/hello/")]
(is (= 200 (:status response)))
(is (= "Hello, World!" (slurp (:body response))))))
(testing "PUT request with persistent sync client"
(let [response (common/put client "http://localhost:10000/hello/")]
(is (= 200 (:status response)))
(is (= "Hello, World!" (slurp (:body response))))))
(testing "DELETE request with persistent sync client"
(let [response (common/delete client "http://localhost:10000/hello/")]
(is (= 200 (:status response)))
(is (= "Hello, World!" (slurp (:body response))))))
(testing "TRACE request with persistent sync client"
(let [response (common/trace client "http://localhost:10000/hello/")]
(is (= 200 (:status response)))
(is (= "Hello, World!" (slurp (:body response))))))
(testing "OPTIONS request with persistent sync client"
(let [response (common/options client "http://localhost:10000/hello/")]
(is (= 200 (:status response)))
(is (= "Hello, World!" (slurp (:body response))))))
(testing "PATCH request with persistent sync client"
(let [response (common/patch client "http://localhost:10000/hello/")]
(is (= 200 (:status response)))
(is (= "Hello, World!" (slurp (:body response))))))
(common/close client)
(is (thrown? IllegalStateException
(common/get client "http://localhost:10000/hello")))))))
(deftest sync-client-as-test
(testlogging/with-test-logging
(testutils/with-app-with-config app
@ -120,6 +164,24 @@
(is (string? (:body response)))
(is (= "Hello, World!" (:body response))))))))
(deftest request-with-client-test
(testlogging/with-test-logging
(testutils/with-app-with-config app
[jetty9/jetty9-service test-web-service]
{:webserver {:port 10000}}
(let [client (HttpAsyncClients/createDefault)
opts {:method :get :url "http://localhost:10000/hello/"}]
(.start client)
(testing "GET request works with request-with-client"
(let [response (sync/request-with-client opts client)]
(is (= 200 (:status response)))
(is (= "Hello, World!" (slurp (:body response))))))
(testing "Client persists when passed to request-with-client"
(let [response (sync/request-with-client opts client)]
(is (= 200 (:status response)))
(is (= "Hello, World!" (slurp (:body response))))))
(.close client)))))
(defn header-app
[req]
(let [val (get-in req [:headers "fooheader"])]