diff --git a/src/java/com/puppetlabs/http/client/AsyncHttpClient.java b/src/java/com/puppetlabs/http/client/AsyncHttpClient.java index 2f76c56..deb612e 100644 --- a/src/java/com/puppetlabs/http/client/AsyncHttpClient.java +++ b/src/java/com/puppetlabs/http/client/AsyncHttpClient.java @@ -1,8 +1,6 @@ package com.puppetlabs.http.client; -import com.puppetlabs.http.client.impl.JavaClient; -import com.puppetlabs.http.client.impl.Promise; -import com.puppetlabs.http.client.impl.SslUtils; +import com.puppetlabs.http.client.impl.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -17,7 +15,13 @@ public class AsyncHttpClient { return JavaClient.request(requestOptions, clientOptions, null); } - + + public static PersistentAsyncHttpClient createClient(ClientOptions clientOptions) { + clientOptions = SslUtils.configureSsl(clientOptions); + CoercedClientOptions coercedClientOptions = JavaClient.coerceClientOptions(clientOptions); + return new PersistentAsyncHttpClient(JavaClient.createClient(coercedClientOptions)); + } + public static Promise get(String url) throws URISyntaxException { return get(new URI(url)); } diff --git a/src/java/com/puppetlabs/http/client/impl/JavaClient.java b/src/java/com/puppetlabs/http/client/impl/JavaClient.java index 05e8533..a656d8d 100644 --- a/src/java/com/puppetlabs/http/client/impl/JavaClient.java +++ b/src/java/com/puppetlabs/http/client/impl/JavaClient.java @@ -129,7 +129,7 @@ public class JavaClient { return new CoercedRequestOptions(uri, method, headers, body); } - private static CoercedClientOptions coerceClientOptions(ClientOptions options) { + public static CoercedClientOptions coerceClientOptions(ClientOptions options) { SSLContext sslContext = null; if (options.getSslContext() != null) { sslContext = options.getSslContext(); @@ -187,11 +187,19 @@ public class JavaClient { } public static Promise request(final RequestOptions requestOptions, final ClientOptions clientOptions, final IResponseCallback callback) { - CoercedRequestOptions coercedRequestOptions = coerceRequestOptions(requestOptions); CoercedClientOptions coercedClientOptions = coerceClientOptions(clientOptions); final CloseableHttpAsyncClient client = createClient(coercedClientOptions); + return requestWithClient(requestOptions, callback, client, false); + } + + public static Promise requestWithClient(final RequestOptions requestOptions, + final IResponseCallback callback, + final CloseableHttpAsyncClient client, + final boolean persistent) { + CoercedRequestOptions coercedRequestOptions = coerceRequestOptions(requestOptions); + HttpRequestBase request = constructRequest(coercedRequestOptions.getMethod(), coercedRequestOptions.getUri(), coercedRequestOptions.getBody()); request.setHeaders(coercedRequestOptions.getHeaders()); @@ -226,27 +234,27 @@ public class JavaClient { new Response(requestOptions, origContentEncoding, body, headers, httpResponse.getStatusLine().getStatusCode(), contentType), - callback, promise); + callback, promise, persistent); } catch (Exception e) { - deliverResponse(client, requestOptions, new Response(requestOptions, e), callback, promise); + deliverResponse(client, requestOptions, new Response(requestOptions, e), callback, promise, persistent); } } @Override public void failed(Exception e) { - deliverResponse(client, requestOptions, new Response(requestOptions, e), callback, promise); + deliverResponse(client, requestOptions, new Response(requestOptions, e), callback, promise, persistent); } @Override public void cancelled() { - deliverResponse(client, requestOptions, new Response(requestOptions, new HttpClientException("Request cancelled", null)), callback, promise); + deliverResponse(client, requestOptions, new Response(requestOptions, new HttpClientException("Request cancelled", null)), callback, promise, persistent); } }); return promise; } - private static CloseableHttpAsyncClient createClient(CoercedClientOptions coercedOptions) { + public static CloseableHttpAsyncClient createClient(CoercedClientOptions coercedOptions) { HttpAsyncClientBuilder clientBuilder = HttpAsyncClients.custom(); if (coercedOptions.getSslContext() != null) { clientBuilder.setSSLStrategy( @@ -283,7 +291,7 @@ public class JavaClient { private static void deliverResponse(CloseableHttpAsyncClient client, RequestOptions options, Response httpResponse, IResponseCallback callback, - Promise promise) { + Promise promise, boolean persistent) { try { if (callback != null) { try { @@ -304,7 +312,9 @@ public class JavaClient { // great solution but avoids the deadlock until an implementation // that allows the originating request thread to perform the client // close can be done. - AsyncClose.close(client); + if (!persistent) { + AsyncClose.close(client); + } } } diff --git a/src/java/com/puppetlabs/http/client/impl/PersistentAsyncHttpClient.java b/src/java/com/puppetlabs/http/client/impl/PersistentAsyncHttpClient.java new file mode 100644 index 0000000..7cf0719 --- /dev/null +++ b/src/java/com/puppetlabs/http/client/impl/PersistentAsyncHttpClient.java @@ -0,0 +1,106 @@ +package com.puppetlabs.http.client.impl; + +import com.puppetlabs.http.client.*; +import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URI; +import java.net.URISyntaxException; + +public class PersistentAsyncHttpClient { + private CloseableHttpAsyncClient client; + private static final Logger LOGGER = LoggerFactory.getLogger(SyncHttpClient.class); + + public PersistentAsyncHttpClient(CloseableHttpAsyncClient client) { + this.client = client; + } + + public void close() { + AsyncClose.close(client); + } + + public Promise request(RequestOptions requestOptions) { + return JavaClient.requestWithClient(requestOptions, null, client, true); + } + + public Promise get(String url) throws URISyntaxException { + return get(new URI(url)); + } + public Promise get(URI uri) { + return get(new RequestOptions(uri)); + } + public Promise get(RequestOptions requestOptions) { + return request(requestOptions.setMethod(HttpMethod.GET)); + } + + public Promise head(String url) throws URISyntaxException { + return head(new URI(url)); + } + public Promise head(URI uri) { + return head(new RequestOptions(uri)); + } + public Promise head(RequestOptions requestOptions) { + return request(requestOptions.setMethod(HttpMethod.HEAD)); + } + + public Promise post(String url) throws URISyntaxException { + return post(new URI(url)); + } + public Promise post(URI uri) { + return post(new RequestOptions(uri)); + } + public Promise post(RequestOptions requestOptions) { + return request(requestOptions.setMethod(HttpMethod.POST)); + } + + public Promise put(String url) throws URISyntaxException { + return put(new URI(url)); + } + public Promise put(URI uri) { + return put(new RequestOptions(uri)); + } + public Promise put(RequestOptions requestOptions) { + return request(requestOptions.setMethod(HttpMethod.PUT)); + } + + public Promise delete(String url) throws URISyntaxException { + return delete(new URI(url)); + } + public Promise delete(URI uri) { + return delete(new RequestOptions(uri)); + } + public Promise delete(RequestOptions requestOptions) { + return request(requestOptions.setMethod(HttpMethod.DELETE)); + } + + public Promise trace(String url) throws URISyntaxException { + return trace(new URI(url)); + } + public Promise trace(URI uri) { + return trace(new RequestOptions(uri)); + } + public Promise trace(RequestOptions requestOptions) { + return request(requestOptions.setMethod(HttpMethod.TRACE)); + } + + public Promise options(String url) throws URISyntaxException { + return options(new URI(url)); + } + public Promise options(URI uri) { + return options(new RequestOptions(uri)); + } + public Promise options(RequestOptions requestOptions) { + return request(requestOptions.setMethod(HttpMethod.OPTIONS)); + } + + public Promise patch(String url) throws URISyntaxException { + return patch(new URI(url)); + } + public Promise patch(URI uri) { + return patch(new RequestOptions(uri)); + } + public Promise patch(RequestOptions requestOptions) { + return request(requestOptions.setMethod(HttpMethod.PATCH)); + } +} diff --git a/test/puppetlabs/http/client/async_plaintext_test.clj b/test/puppetlabs/http/client/async_plaintext_test.clj index ef95a41..53dca67 100644 --- a/test/puppetlabs/http/client/async_plaintext_test.clj +++ b/test/puppetlabs/http/client/async_plaintext_test.clj @@ -1,7 +1,8 @@ (ns puppetlabs.http.client.async-plaintext-test (:import (com.puppetlabs.http.client AsyncHttpClient RequestOptions ClientOptions) (org.apache.http.impl.nio.client HttpAsyncClients) - (java.net URI)) + (java.net URI) + (com.puppetlabs.http.client.impl PersistentAsyncHttpClient)) (:require [clojure.test :refer :all] [puppetlabs.http.client.test-common :refer :all] [puppetlabs.trapperkeeper.core :as tk] @@ -54,7 +55,7 @@ response (AsyncHttpClient/head request-options client-options)] (is (= 200 (.getStatus (.deref response)))) (is (= nil (.getBody (.deref response)))))) - (testing "clojure sync client" + (testing "clojure async client" (let [response (async/head "http://localhost:10000/hello/")] (is (= 200 (:status @response))) (is (= nil (:body @response)))))))) @@ -85,42 +86,85 @@ (testutils/with-app-with-config app [jetty9/jetty9-service test-web-service] {:webserver {:port 10000}} - (let [client (async/create-client {})] - (testing "HEAD request with persistent async client" - (let [response (common/head client "http://localhost:10000/hello/")] - (is (= 200 (:status @response))) - (is (= nil (:body @response))))) - (testing "GET request with persistent async client" - (let [response (common/get client "http://localhost:10000/hello/")] - (is (= 200 (:status @response))) - (is (= "Hello, World!" (slurp (:body @response)))))) - (testing "POST request with persistent async client" - (let [response (common/post client "http://localhost:10000/hello/")] - (is (= 200 (:status @response))) - (is (= "Hello, World!" (slurp (:body @response)))))) - (testing "PUT request with persistent async client" - (let [response (common/put client "http://localhost:10000/hello/")] - (is (= 200 (:status @response))) - (is (= "Hello, World!" (slurp (:body @response)))))) - (testing "DELETE request with persistent async client" - (let [response (common/delete client "http://localhost:10000/hello/")] - (is (= 200 (:status @response))) - (is (= "Hello, World!" (slurp (:body @response)))))) - (testing "TRACE request with persistent async client" - (let [response (common/trace client "http://localhost:10000/hello/")] - (is (= 200 (:status @response))) - (is (= "Hello, World!" (slurp (:body @response)))))) - (testing "OPTIONS request with persistent async client" - (let [response (common/options client "http://localhost:10000/hello/")] - (is (= 200 (:status @response))) - (is (= "Hello, World!" (slurp (:body @response)))))) - (testing "PATCH request with persistent async client" - (let [response (common/patch client "http://localhost:10000/hello/")] - (is (= 200 (:status @response))) - (is (= "Hello, World!" (slurp (:body @response)))))) - (testing "client closes properly" - (common/close client) - (is (thrown? IllegalStateException (common/get client "http://localhost:10000/hello/")))))))) + (testing "java async client" + (let [request-options (RequestOptions. (URI. "http://localhost:10000/hello/")) + client-options (ClientOptions.) + client (AsyncHttpClient/createClient client-options)] + (testing "HEAD request with persistent async client" + (let [response (.head client request-options)] + (is (= 200 (.getStatus (.deref response)))) + (is (= nil (.getBody (.deref response)))))) + (testing "GET request with persistent async client" + (let [response (.get client request-options)] + (is (= 200 (.getStatus (.deref response)))) + (is (= "Hello, World!" (slurp (.getBody (.deref response))))))) + (testing "POST request with persistent async client" + (let [response (.post client request-options)] + (is (= 200 (.getStatus (.deref response)))) + (is (= "Hello, World!" (slurp (.getBody (.deref response))))))) + (testing "PUT request with persistent async client" + (let [response (.put client request-options)] + (is (= 200 (.getStatus (.deref response)))) + (is (= "Hello, World!" (slurp (.getBody (.deref response))))))) + (testing "DELETE request with persistent async client" + (let [response (.delete client request-options)] + (is (= 200 (.getStatus (.deref response)))) + (is (= "Hello, World!" (slurp (.getBody (.deref response))))))) + (testing "TRACE request with persistent async client" + (let [response (.trace client request-options)] + (is (= 200 (.getStatus (.deref response)))) + (is (= "Hello, World!" (slurp (.getBody (.deref response))))))) + (testing "OPTIONS request with persistent async client" + (let [response (.options client request-options)] + (is (= 200 (.getStatus (.deref response)))) + (is (= "Hello, World!" (slurp (.getBody (.deref response))))))) + (testing "PATCH request with persistent async client" + (let [response (.patch client request-options)] + (is (= 200 (.getStatus (.deref response)))) + (is (= "Hello, World!" (slurp (.getBody (.deref response))))))) + (testing "client closes properly" + (.close client) + ; This sleep is here to avoid a race condition, as sometimes the get + ; request is made before the client can finish being closed + (Thread/sleep 1) + (is (thrown? IllegalStateException (.get client request-options)))))) + (testing "clojure async client" + (let [client (async/create-client {})] + (testing "HEAD request with persistent async client" + (let [response (common/head client "http://localhost:10000/hello/")] + (is (= 200 (:status @response))) + (is (= nil (:body @response))))) + (testing "GET request with persistent async client" + (let [response (common/get client "http://localhost:10000/hello/")] + (is (= 200 (:status @response))) + (is (= "Hello, World!" (slurp (:body @response)))))) + (testing "POST request with persistent async client" + (let [response (common/post client "http://localhost:10000/hello/")] + (is (= 200 (:status @response))) + (is (= "Hello, World!" (slurp (:body @response)))))) + (testing "PUT request with persistent async client" + (let [response (common/put client "http://localhost:10000/hello/")] + (is (= 200 (:status @response))) + (is (= "Hello, World!" (slurp (:body @response)))))) + (testing "DELETE request with persistent async client" + (let [response (common/delete client "http://localhost:10000/hello/")] + (is (= 200 (:status @response))) + (is (= "Hello, World!" (slurp (:body @response)))))) + (testing "TRACE request with persistent async client" + (let [response (common/trace client "http://localhost:10000/hello/")] + (is (= 200 (:status @response))) + (is (= "Hello, World!" (slurp (:body @response)))))) + (testing "OPTIONS request with persistent async client" + (let [response (common/options client "http://localhost:10000/hello/")] + (is (= 200 (:status @response))) + (is (= "Hello, World!" (slurp (:body @response)))))) + (testing "PATCH request with persistent async client" + (let [response (common/patch client "http://localhost:10000/hello/")] + (is (= 200 (:status @response))) + (is (= "Hello, World!" (slurp (:body @response)))))) + (testing "client closes properly" + (common/close client) + (is (thrown? IllegalStateException (common/get client "http://localhost:10000/hello/"))))))))) (deftest request-with-client-test (testlogging/with-test-logging