(TK-108) Add a persistent async Java client

Add a new class, PersistentAsyncHttpClient, that is a wrapped
CloseableHttpAsyncClient that is persistent.
This commit is contained in:
Preben Ingvaldsen 2014-11-07 13:29:17 -08:00
parent 972de1e9e9
commit 018fe70153
4 changed files with 215 additions and 51 deletions

View file

@ -1,8 +1,6 @@
package com.puppetlabs.http.client;
import com.puppetlabs.http.client.impl.JavaClient;
import com.puppetlabs.http.client.impl.Promise;
import com.puppetlabs.http.client.impl.SslUtils;
import com.puppetlabs.http.client.impl.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -17,7 +15,13 @@ public class AsyncHttpClient {
return JavaClient.request(requestOptions, clientOptions, null);
}
public static PersistentAsyncHttpClient createClient(ClientOptions clientOptions) {
clientOptions = SslUtils.configureSsl(clientOptions);
CoercedClientOptions coercedClientOptions = JavaClient.coerceClientOptions(clientOptions);
return new PersistentAsyncHttpClient(JavaClient.createClient(coercedClientOptions));
}
public static Promise<Response> get(String url) throws URISyntaxException {
return get(new URI(url));
}

View file

@ -129,7 +129,7 @@ public class JavaClient {
return new CoercedRequestOptions(uri, method, headers, body);
}
private static CoercedClientOptions coerceClientOptions(ClientOptions options) {
public static CoercedClientOptions coerceClientOptions(ClientOptions options) {
SSLContext sslContext = null;
if (options.getSslContext() != null) {
sslContext = options.getSslContext();
@ -187,11 +187,19 @@ public class JavaClient {
}
public static Promise<Response> request(final RequestOptions requestOptions, final ClientOptions clientOptions, final IResponseCallback callback) {
CoercedRequestOptions coercedRequestOptions = coerceRequestOptions(requestOptions);
CoercedClientOptions coercedClientOptions = coerceClientOptions(clientOptions);
final CloseableHttpAsyncClient client = createClient(coercedClientOptions);
return requestWithClient(requestOptions, callback, client, false);
}
public static Promise<Response> requestWithClient(final RequestOptions requestOptions,
final IResponseCallback callback,
final CloseableHttpAsyncClient client,
final boolean persistent) {
CoercedRequestOptions coercedRequestOptions = coerceRequestOptions(requestOptions);
HttpRequestBase request = constructRequest(coercedRequestOptions.getMethod(),
coercedRequestOptions.getUri(), coercedRequestOptions.getBody());
request.setHeaders(coercedRequestOptions.getHeaders());
@ -226,27 +234,27 @@ public class JavaClient {
new Response(requestOptions, origContentEncoding, body,
headers, httpResponse.getStatusLine().getStatusCode(),
contentType),
callback, promise);
callback, promise, persistent);
} catch (Exception e) {
deliverResponse(client, requestOptions, new Response(requestOptions, e), callback, promise);
deliverResponse(client, requestOptions, new Response(requestOptions, e), callback, promise, persistent);
}
}
@Override
public void failed(Exception e) {
deliverResponse(client, requestOptions, new Response(requestOptions, e), callback, promise);
deliverResponse(client, requestOptions, new Response(requestOptions, e), callback, promise, persistent);
}
@Override
public void cancelled() {
deliverResponse(client, requestOptions, new Response(requestOptions, new HttpClientException("Request cancelled", null)), callback, promise);
deliverResponse(client, requestOptions, new Response(requestOptions, new HttpClientException("Request cancelled", null)), callback, promise, persistent);
}
});
return promise;
}
private static CloseableHttpAsyncClient createClient(CoercedClientOptions coercedOptions) {
public static CloseableHttpAsyncClient createClient(CoercedClientOptions coercedOptions) {
HttpAsyncClientBuilder clientBuilder = HttpAsyncClients.custom();
if (coercedOptions.getSslContext() != null) {
clientBuilder.setSSLStrategy(
@ -283,7 +291,7 @@ public class JavaClient {
private static void deliverResponse(CloseableHttpAsyncClient client, RequestOptions options,
Response httpResponse, IResponseCallback callback,
Promise<Response> promise) {
Promise<Response> promise, boolean persistent) {
try {
if (callback != null) {
try {
@ -304,7 +312,9 @@ public class JavaClient {
// great solution but avoids the deadlock until an implementation
// that allows the originating request thread to perform the client
// close can be done.
AsyncClose.close(client);
if (!persistent) {
AsyncClose.close(client);
}
}
}

View file

@ -0,0 +1,106 @@
package com.puppetlabs.http.client.impl;
import com.puppetlabs.http.client.*;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URI;
import java.net.URISyntaxException;
public class PersistentAsyncHttpClient {
private CloseableHttpAsyncClient client;
private static final Logger LOGGER = LoggerFactory.getLogger(SyncHttpClient.class);
public PersistentAsyncHttpClient(CloseableHttpAsyncClient client) {
this.client = client;
}
public void close() {
AsyncClose.close(client);
}
public Promise<Response> request(RequestOptions requestOptions) {
return JavaClient.requestWithClient(requestOptions, null, client, true);
}
public Promise<Response> get(String url) throws URISyntaxException {
return get(new URI(url));
}
public Promise<Response> get(URI uri) {
return get(new RequestOptions(uri));
}
public Promise<Response> get(RequestOptions requestOptions) {
return request(requestOptions.setMethod(HttpMethod.GET));
}
public Promise<Response> head(String url) throws URISyntaxException {
return head(new URI(url));
}
public Promise<Response> head(URI uri) {
return head(new RequestOptions(uri));
}
public Promise<Response> head(RequestOptions requestOptions) {
return request(requestOptions.setMethod(HttpMethod.HEAD));
}
public Promise<Response> post(String url) throws URISyntaxException {
return post(new URI(url));
}
public Promise<Response> post(URI uri) {
return post(new RequestOptions(uri));
}
public Promise<Response> post(RequestOptions requestOptions) {
return request(requestOptions.setMethod(HttpMethod.POST));
}
public Promise<Response> put(String url) throws URISyntaxException {
return put(new URI(url));
}
public Promise<Response> put(URI uri) {
return put(new RequestOptions(uri));
}
public Promise<Response> put(RequestOptions requestOptions) {
return request(requestOptions.setMethod(HttpMethod.PUT));
}
public Promise<Response> delete(String url) throws URISyntaxException {
return delete(new URI(url));
}
public Promise<Response> delete(URI uri) {
return delete(new RequestOptions(uri));
}
public Promise<Response> delete(RequestOptions requestOptions) {
return request(requestOptions.setMethod(HttpMethod.DELETE));
}
public Promise<Response> trace(String url) throws URISyntaxException {
return trace(new URI(url));
}
public Promise<Response> trace(URI uri) {
return trace(new RequestOptions(uri));
}
public Promise<Response> trace(RequestOptions requestOptions) {
return request(requestOptions.setMethod(HttpMethod.TRACE));
}
public Promise<Response> options(String url) throws URISyntaxException {
return options(new URI(url));
}
public Promise<Response> options(URI uri) {
return options(new RequestOptions(uri));
}
public Promise<Response> options(RequestOptions requestOptions) {
return request(requestOptions.setMethod(HttpMethod.OPTIONS));
}
public Promise<Response> patch(String url) throws URISyntaxException {
return patch(new URI(url));
}
public Promise<Response> patch(URI uri) {
return patch(new RequestOptions(uri));
}
public Promise<Response> patch(RequestOptions requestOptions) {
return request(requestOptions.setMethod(HttpMethod.PATCH));
}
}

View file

@ -1,7 +1,8 @@
(ns puppetlabs.http.client.async-plaintext-test
(:import (com.puppetlabs.http.client AsyncHttpClient RequestOptions ClientOptions)
(org.apache.http.impl.nio.client HttpAsyncClients)
(java.net URI))
(java.net URI)
(com.puppetlabs.http.client.impl PersistentAsyncHttpClient))
(:require [clojure.test :refer :all]
[puppetlabs.http.client.test-common :refer :all]
[puppetlabs.trapperkeeper.core :as tk]
@ -54,7 +55,7 @@
response (AsyncHttpClient/head request-options client-options)]
(is (= 200 (.getStatus (.deref response))))
(is (= nil (.getBody (.deref response))))))
(testing "clojure sync client"
(testing "clojure async client"
(let [response (async/head "http://localhost:10000/hello/")]
(is (= 200 (:status @response)))
(is (= nil (:body @response))))))))
@ -85,42 +86,85 @@
(testutils/with-app-with-config app
[jetty9/jetty9-service test-web-service]
{:webserver {:port 10000}}
(let [client (async/create-client {})]
(testing "HEAD request with persistent async client"
(let [response (common/head client "http://localhost:10000/hello/")]
(is (= 200 (:status @response)))
(is (= nil (:body @response)))))
(testing "GET request with persistent async client"
(let [response (common/get client "http://localhost:10000/hello/")]
(is (= 200 (:status @response)))
(is (= "Hello, World!" (slurp (:body @response))))))
(testing "POST request with persistent async client"
(let [response (common/post client "http://localhost:10000/hello/")]
(is (= 200 (:status @response)))
(is (= "Hello, World!" (slurp (:body @response))))))
(testing "PUT request with persistent async client"
(let [response (common/put client "http://localhost:10000/hello/")]
(is (= 200 (:status @response)))
(is (= "Hello, World!" (slurp (:body @response))))))
(testing "DELETE request with persistent async client"
(let [response (common/delete client "http://localhost:10000/hello/")]
(is (= 200 (:status @response)))
(is (= "Hello, World!" (slurp (:body @response))))))
(testing "TRACE request with persistent async client"
(let [response (common/trace client "http://localhost:10000/hello/")]
(is (= 200 (:status @response)))
(is (= "Hello, World!" (slurp (:body @response))))))
(testing "OPTIONS request with persistent async client"
(let [response (common/options client "http://localhost:10000/hello/")]
(is (= 200 (:status @response)))
(is (= "Hello, World!" (slurp (:body @response))))))
(testing "PATCH request with persistent async client"
(let [response (common/patch client "http://localhost:10000/hello/")]
(is (= 200 (:status @response)))
(is (= "Hello, World!" (slurp (:body @response))))))
(testing "client closes properly"
(common/close client)
(is (thrown? IllegalStateException (common/get client "http://localhost:10000/hello/"))))))))
(testing "java async client"
(let [request-options (RequestOptions. (URI. "http://localhost:10000/hello/"))
client-options (ClientOptions.)
client (AsyncHttpClient/createClient client-options)]
(testing "HEAD request with persistent async client"
(let [response (.head client request-options)]
(is (= 200 (.getStatus (.deref response))))
(is (= nil (.getBody (.deref response))))))
(testing "GET request with persistent async client"
(let [response (.get client request-options)]
(is (= 200 (.getStatus (.deref response))))
(is (= "Hello, World!" (slurp (.getBody (.deref response)))))))
(testing "POST request with persistent async client"
(let [response (.post client request-options)]
(is (= 200 (.getStatus (.deref response))))
(is (= "Hello, World!" (slurp (.getBody (.deref response)))))))
(testing "PUT request with persistent async client"
(let [response (.put client request-options)]
(is (= 200 (.getStatus (.deref response))))
(is (= "Hello, World!" (slurp (.getBody (.deref response)))))))
(testing "DELETE request with persistent async client"
(let [response (.delete client request-options)]
(is (= 200 (.getStatus (.deref response))))
(is (= "Hello, World!" (slurp (.getBody (.deref response)))))))
(testing "TRACE request with persistent async client"
(let [response (.trace client request-options)]
(is (= 200 (.getStatus (.deref response))))
(is (= "Hello, World!" (slurp (.getBody (.deref response)))))))
(testing "OPTIONS request with persistent async client"
(let [response (.options client request-options)]
(is (= 200 (.getStatus (.deref response))))
(is (= "Hello, World!" (slurp (.getBody (.deref response)))))))
(testing "PATCH request with persistent async client"
(let [response (.patch client request-options)]
(is (= 200 (.getStatus (.deref response))))
(is (= "Hello, World!" (slurp (.getBody (.deref response)))))))
(testing "client closes properly"
(.close client)
; This sleep is here to avoid a race condition, as sometimes the get
; request is made before the client can finish being closed
(Thread/sleep 1)
(is (thrown? IllegalStateException (.get client request-options))))))
(testing "clojure async client"
(let [client (async/create-client {})]
(testing "HEAD request with persistent async client"
(let [response (common/head client "http://localhost:10000/hello/")]
(is (= 200 (:status @response)))
(is (= nil (:body @response)))))
(testing "GET request with persistent async client"
(let [response (common/get client "http://localhost:10000/hello/")]
(is (= 200 (:status @response)))
(is (= "Hello, World!" (slurp (:body @response))))))
(testing "POST request with persistent async client"
(let [response (common/post client "http://localhost:10000/hello/")]
(is (= 200 (:status @response)))
(is (= "Hello, World!" (slurp (:body @response))))))
(testing "PUT request with persistent async client"
(let [response (common/put client "http://localhost:10000/hello/")]
(is (= 200 (:status @response)))
(is (= "Hello, World!" (slurp (:body @response))))))
(testing "DELETE request with persistent async client"
(let [response (common/delete client "http://localhost:10000/hello/")]
(is (= 200 (:status @response)))
(is (= "Hello, World!" (slurp (:body @response))))))
(testing "TRACE request with persistent async client"
(let [response (common/trace client "http://localhost:10000/hello/")]
(is (= 200 (:status @response)))
(is (= "Hello, World!" (slurp (:body @response))))))
(testing "OPTIONS request with persistent async client"
(let [response (common/options client "http://localhost:10000/hello/")]
(is (= 200 (:status @response)))
(is (= "Hello, World!" (slurp (:body @response))))))
(testing "PATCH request with persistent async client"
(let [response (common/patch client "http://localhost:10000/hello/")]
(is (= 200 (:status @response)))
(is (= "Hello, World!" (slurp (:body @response))))))
(testing "client closes properly"
(common/close client)
(is (thrown? IllegalStateException (common/get client "http://localhost:10000/hello/")))))))))
(deftest request-with-client-test
(testlogging/with-test-logging