diff --git a/README.md b/README.md index 46fd22f..91eb353 100644 --- a/README.md +++ b/README.md @@ -23,7 +23,8 @@ this method supports auto detection and update of available brokers for easy HA/ ```clj (use 'clj-druid.client) (connect {:zk {:host "127.0.0.1:2181" - :discovery-path "/druid/discovery"}}) + :discovery-path "/druid/discovery" + :node-type "broker"}}) ``` you can also connect by supplying a vector of hosts, useful for dev, local testing @@ -64,8 +65,11 @@ Issue druid queries supplying {:type :fieldAccess :name "sample_name2" :fieldName "sample_fieldName2"}]}] :intervals ["2012-01-01T00:00:00.000/2012-01-03T00:00:00.000"]}) - - (query random (:queryType q) q) + (let [client (connect {:zk {:host "127.0.0.1:2181" + :discovery-path "/druid/discovery" + :node-type "broker"}})] + (query client random (:queryType q) q) + (close client)) ``` ### Example diff --git a/examples/src/handler.clj b/examples/src/handler.clj index 2887cdd..310a6ba 100644 --- a/examples/src/handler.clj +++ b/examples/src/handler.clj @@ -7,10 +7,11 @@ (def default-druid-host "http://localhost:8083/druid/v2") -(client/connect {:hosts [default-druid-host]}) +(def druid-client (client/connect {:hosts [default-druid-host]})) (defn do-query [q] (ok (client/query + druid-client client/randomized (:queryType q) q))) diff --git a/project.clj b/project.clj index 310b422..f7a8135 100644 --- a/project.clj +++ b/project.clj @@ -1,4 +1,4 @@ -(defproject y42/clj-druid "0.2.12" +(defproject y42/clj-druid "0.2.13-SNAPSHOT" :description "Clojure library for Druid.io" :url "http://github.com/y42/clj-druid" :license {:name "Eclipse Public License" @@ -15,4 +15,4 @@ [prismatic/schema "1.0.4"] [org.clojure/tools.logging "0.3.1"] [org.clojure/data.json "0.2.6"] - [zookeeper-clj "0.9.1"]]) + [curator "0.0.6"]]) diff --git a/src/clj_druid/client.clj b/src/clj_druid/client.clj index a1f2a06..023120a 100644 --- a/src/clj_druid/client.clj +++ b/src/clj_druid/client.clj @@ -1,102 +1,92 @@ (ns clj-druid.client - (:require [zookeeper :as zk] - [zookeeper.data :as data] + (:require [curator.framework :refer [curator-framework]] + [curator.discovery :refer [service-discovery service-cache instances]] [clojure.data.json :as json] [clj-druid.schemas.query :as sch] [clj-druid.validations :as v] [swiss.arrows :refer :all] [clj-http.client :as http])) -(def default-timeout 5000) +(defprotocol ^:private Client + (close [this] + "Close all connections to dependencies (ex: zookeeper)") + (nodes [this] + "Get a list of druid available nodes")) -(def nodes-list - "contains all zk nodes discovered" - (atom [])) - -(defn reset-node-list - "update node list atom" - [nodes] - (reset! nodes-list nodes)) - -(defn make-node-path - "make a zk path string" - [discovery-path node-type] - - (str discovery-path "/" node-type)) +(defn static-client + "Creates a druid client based on user defined hosts." + [hosts] + (reify Client + (close [_]) + (nodes [_] + hosts))) (defn make-host-http-str - "make an http url string from a zk node entry" - [c] - + "make an http url string from a curator instance" + [instance] (str "http://" - (get c "address") + (.getAddress instance) ":" - (get c "port") + (.getPort instance) "/druid/v2/")) -(defn zk-watch-node-list - "Retrieve hosts from zk discovery" - [zk-client path] +(defn curator-nodes + "Get the nodes list from zookeeper using the cached service provider." + [service-cache] + (map make-host-http-str (instances service-cache))) - (-<>> path - (zk/children zk-client <> - :watch? true - :watcher #(do (println %) - (zk-watch-node-list zk-client path))) - - (map #(data/to-string (:data (zk/data zk-client (str path "/" %))))) - (map #(json/read-str %)) - (map #(make-host-http-str %)) - (vec) - (reset-node-list))) - -(defn from-zookeeper - "Maintain a druid http server list from zookeeper" +(defn curator-client + "Create a druid client using the curator service discovery." [config] - - (let [node-path (make-node-path (:discovery-path config) - (:node-type config)) - - zk-client (zk/connect (:host config))] - - (zk-watch-node-list zk-client node-path))) - -(defn from-user - "Maintain a druid http server list from user" - [hosts] - (reset-node-list hosts)) + (let [client (curator-framework (:host config)) + discovery (service-discovery client + nil + :base-path + (:discovery-path config)) + sc (service-cache discovery (:node-type config))] + (.start client) + (.start discovery) + (.start sc) + (reify Client + (close [_] + (.close sc) + (.close discovery) + (.close client)) + (nodes [_] + (curator-nodes sc))))) (defn randomized "Take a random host" - [] - (if (empty? @nodes-list) - (throw (Exception. - "No druid node available for query"))) - - (rand-nth @nodes-list)) + [nodes] + (rand-nth nodes)) (defn fixed "Always take first host" - [] - (first @nodes-list)) + [nodes] + (first nodes)) + +(defn close [client] + (close client)) (defn connect "Create a druid client from zk or a user defined host" [params] (if (:zk params) - (from-zookeeper (:zk params)) - (from-user (:hosts params)))) + (curator-client (:zk params)) + (static-client (:hosts params)))) (defn query "Issue a druid query" - [balance-strategy query-type druid-query & params] - + [client balance-strategy query-type druid-query & params] (let [params (apply hash-map params) - options (-<> (into druid-query {:queryType query-type}) (v/validate query-type) (json/write-str <>) {:body <> :as :text} - (merge params))] - (http/post (balance-strategy) options))) + (merge params)) + nodes (nodes client)] + (when (empty? nodes) + (throw (Exception. + "No druid node available for query"))) + (http/post (balance-strategy nodes) options))) diff --git a/test/clj_druid/client_test.clj b/test/clj_druid/client_test.clj index 36ac17f..b56133b 100644 --- a/test/clj_druid/client_test.clj +++ b/test/clj_druid/client_test.clj @@ -4,20 +4,23 @@ [clj-druid.client :refer :all])) (deftest ^:integration test-connect-zookeeper - (connect {:zk {:host "127.0.0.1:2181" - :discovery-path "/druid/discovery" - :node-type "broker"}})) + (let [client (connect {:zk {:host "127.0.0.1:2181" + :discovery-path "/druid/discovery" + :node-type "broker"}})] + (close client))) (deftest ^:integration test-connect-user - (connect {:hosts ["http://localhost:8082/druid/v2/"]})) + (let [client (connect {:hosts ["http://localhost:8082/druid/v2/"]})] + (close client))) (deftest ^:integration test-zk-query - (connect {:zk {:host "127.0.0.1:2181" - :discovery-path "/druid/discovery" - :node-type "broker"}}) - - (query randomized :groupBy f/valid-groupby-query)) + (let [client (connect {:zk {:host "127.0.0.1:2181" + :discovery-path "/druid/discovery" + :node-type "broker"}})] + (query client randomized :groupBy f/valid-groupby-query) + (close client))) (deftest ^:integration test-user-query - (connect {:hosts ["http://127.0.0.1:8083/druid/v2/"]}) - (query randomized :groupBy f/valid-groupby-query :timeout 5000)) + (let [client (connect {:hosts ["http://127.0.0.1:8083/druid/v2/"]})] + (query client randomized :groupBy f/valid-groupby-query :timeout 5000) + (close client)))