Merge pull request #6 from msprunck/discovery-with-curator

Use curator for druid node discovery
This commit is contained in:
gbuisson 2016-08-08 10:00:52 +02:00 committed by GitHub
commit 3c67327f52
5 changed files with 81 additions and 83 deletions

View file

@ -23,7 +23,8 @@ this method supports auto detection and update of available brokers for easy HA/
```clj ```clj
(use 'clj-druid.client) (use 'clj-druid.client)
(connect {:zk {:host "127.0.0.1:2181" (connect {:zk {:host "127.0.0.1:2181"
:discovery-path "/druid/discovery"}}) :discovery-path "/druid/discovery"
:node-type "broker"}})
``` ```
you can also connect by supplying a vector of hosts, useful for dev, local testing you can also connect by supplying a vector of hosts, useful for dev, local testing
@ -64,8 +65,11 @@ Issue druid queries supplying
{:type :fieldAccess :name "sample_name2" :fieldName "sample_fieldName2"}]}] {:type :fieldAccess :name "sample_name2" :fieldName "sample_fieldName2"}]}]
:intervals ["2012-01-01T00:00:00.000/2012-01-03T00:00:00.000"]}) :intervals ["2012-01-01T00:00:00.000/2012-01-03T00:00:00.000"]})
(let [client (connect {:zk {:host "127.0.0.1:2181"
(query random (:queryType q) q) :discovery-path "/druid/discovery"
:node-type "broker"}})]
(query client random (:queryType q) q)
(close client))
``` ```
### Example ### Example

View file

@ -7,10 +7,11 @@
(def default-druid-host "http://localhost:8083/druid/v2") (def default-druid-host "http://localhost:8083/druid/v2")
(client/connect {:hosts [default-druid-host]}) (def druid-client (client/connect {:hosts [default-druid-host]}))
(defn do-query [q] (defn do-query [q]
(ok (client/query (ok (client/query
druid-client
client/randomized client/randomized
(:queryType q) q))) (:queryType q) q)))

View file

@ -1,4 +1,4 @@
(defproject y42/clj-druid "0.2.12" (defproject y42/clj-druid "0.2.13-SNAPSHOT"
:description "Clojure library for Druid.io" :description "Clojure library for Druid.io"
:url "http://github.com/y42/clj-druid" :url "http://github.com/y42/clj-druid"
:license {:name "Eclipse Public License" :license {:name "Eclipse Public License"
@ -15,4 +15,4 @@
[prismatic/schema "1.0.4"] [prismatic/schema "1.0.4"]
[org.clojure/tools.logging "0.3.1"] [org.clojure/tools.logging "0.3.1"]
[org.clojure/data.json "0.2.6"] [org.clojure/data.json "0.2.6"]
[zookeeper-clj "0.9.1"]]) [curator "0.0.6"]])

View file

@ -1,102 +1,92 @@
(ns clj-druid.client (ns clj-druid.client
(:require [zookeeper :as zk] (:require [curator.framework :refer [curator-framework]]
[zookeeper.data :as data] [curator.discovery :refer [service-discovery service-cache instances]]
[clojure.data.json :as json] [clojure.data.json :as json]
[clj-druid.schemas.query :as sch] [clj-druid.schemas.query :as sch]
[clj-druid.validations :as v] [clj-druid.validations :as v]
[swiss.arrows :refer :all] [swiss.arrows :refer :all]
[clj-http.client :as http])) [clj-http.client :as http]))
(def default-timeout 5000) (defprotocol ^:private Client
(close [this]
"Close all connections to dependencies (ex: zookeeper)")
(nodes [this]
"Get a list of druid available nodes"))
(def nodes-list (defn static-client
"contains all zk nodes discovered" "Creates a druid client based on user defined hosts."
(atom [])) [hosts]
(reify Client
(defn reset-node-list (close [_])
"update node list atom" (nodes [_]
[nodes] hosts)))
(reset! nodes-list nodes))
(defn make-node-path
"make a zk path string"
[discovery-path node-type]
(str discovery-path "/" node-type))
(defn make-host-http-str (defn make-host-http-str
"make an http url string from a zk node entry" "make an http url string from a curator instance"
[c] [instance]
(str "http://" (str "http://"
(get c "address") (.getAddress instance)
":" ":"
(get c "port") (.getPort instance)
"/druid/v2/")) "/druid/v2/"))
(defn zk-watch-node-list (defn curator-nodes
"Retrieve hosts from zk discovery" "Get the nodes list from zookeeper using the cached service provider."
[zk-client path] [service-cache]
(map make-host-http-str (instances service-cache)))
(-<>> path (defn curator-client
(zk/children zk-client <> "Create a druid client using the curator service discovery."
:watch? true
:watcher #(do (println %)
(zk-watch-node-list zk-client path)))
(map #(data/to-string (:data (zk/data zk-client (str path "/" %)))))
(map #(json/read-str %))
(map #(make-host-http-str %))
(vec)
(reset-node-list)))
(defn from-zookeeper
"Maintain a druid http server list from zookeeper"
[config] [config]
(let [client (curator-framework (:host config))
(let [node-path (make-node-path (:discovery-path config) discovery (service-discovery client
(:node-type config)) nil
:base-path
zk-client (zk/connect (:host config))] (:discovery-path config))
sc (service-cache discovery (:node-type config))]
(zk-watch-node-list zk-client node-path))) (.start client)
(.start discovery)
(defn from-user (.start sc)
"Maintain a druid http server list from user" (reify Client
[hosts] (close [_]
(reset-node-list hosts)) (.close sc)
(.close discovery)
(.close client))
(nodes [_]
(curator-nodes sc)))))
(defn randomized (defn randomized
"Take a random host" "Take a random host"
[] [nodes]
(if (empty? @nodes-list) (rand-nth nodes))
(throw (Exception.
"No druid node available for query")))
(rand-nth @nodes-list))
(defn fixed (defn fixed
"Always take first host" "Always take first host"
[] [nodes]
(first @nodes-list)) (first nodes))
(defn close [client]
(close client))
(defn connect (defn connect
"Create a druid client from zk or "Create a druid client from zk or
a user defined host" a user defined host"
[params] [params]
(if (:zk params) (if (:zk params)
(from-zookeeper (:zk params)) (curator-client (:zk params))
(from-user (:hosts params)))) (static-client (:hosts params))))
(defn query (defn query
"Issue a druid query" "Issue a druid query"
[balance-strategy query-type druid-query & params] [client balance-strategy query-type druid-query & params]
(let [params (apply hash-map params) (let [params (apply hash-map params)
options (-<> (into druid-query {:queryType query-type}) options (-<> (into druid-query {:queryType query-type})
(v/validate query-type) (v/validate query-type)
(json/write-str <>) (json/write-str <>)
{:body <> :as :text} {:body <> :as :text}
(merge params))] (merge params))
(http/post (balance-strategy) options))) nodes (nodes client)]
(when (empty? nodes)
(throw (Exception.
"No druid node available for query")))
(http/post (balance-strategy nodes) options)))

View file

@ -4,20 +4,23 @@
[clj-druid.client :refer :all])) [clj-druid.client :refer :all]))
(deftest ^:integration test-connect-zookeeper (deftest ^:integration test-connect-zookeeper
(connect {:zk {:host "127.0.0.1:2181" (let [client (connect {:zk {:host "127.0.0.1:2181"
:discovery-path "/druid/discovery" :discovery-path "/druid/discovery"
:node-type "broker"}})) :node-type "broker"}})]
(close client)))
(deftest ^:integration test-connect-user (deftest ^:integration test-connect-user
(connect {:hosts ["http://localhost:8082/druid/v2/"]})) (let [client (connect {:hosts ["http://localhost:8082/druid/v2/"]})]
(close client)))
(deftest ^:integration test-zk-query (deftest ^:integration test-zk-query
(connect {:zk {:host "127.0.0.1:2181" (let [client (connect {:zk {:host "127.0.0.1:2181"
:discovery-path "/druid/discovery" :discovery-path "/druid/discovery"
:node-type "broker"}}) :node-type "broker"}})]
(query client randomized :groupBy f/valid-groupby-query)
(query randomized :groupBy f/valid-groupby-query)) (close client)))
(deftest ^:integration test-user-query (deftest ^:integration test-user-query
(connect {:hosts ["http://127.0.0.1:8083/druid/v2/"]}) (let [client (connect {:hosts ["http://127.0.0.1:8083/druid/v2/"]})]
(query randomized :groupBy f/valid-groupby-query :timeout 5000)) (query client randomized :groupBy f/valid-groupby-query :timeout 5000)
(close client)))