Use curator for druid node discovery

This commit is contained in:
Matthieu Sprunck 2016-08-05 11:21:22 +02:00
parent 27ba5dc05a
commit 3158070745
5 changed files with 81 additions and 83 deletions

View file

@ -23,7 +23,8 @@ this method supports auto detection and update of available brokers for easy HA/
```clj
(use 'clj-druid.client)
(connect {:zk {:host "127.0.0.1:2181"
:discovery-path "/druid/discovery"}})
:discovery-path "/druid/discovery"
:node-type "broker"}})
```
You can also connect by supplying a vector of hosts, which is useful for development and local testing.
@ -64,8 +65,11 @@ Issue druid queries supplying
{:type :fieldAccess :name "sample_name2" :fieldName "sample_fieldName2"}]}]
:intervals ["2012-01-01T00:00:00.000/2012-01-03T00:00:00.000"]})
(query random (:queryType q) q)
(let [client (connect {:zk {:host "127.0.0.1:2181"
:discovery-path "/druid/discovery"
:node-type "broker"}})]
(query client random (:queryType q) q)
(close client))
```
### Example

View file

@ -7,10 +7,11 @@
(def default-druid-host "http://localhost:8083/druid/v2")
(client/connect {:hosts [default-druid-host]})
(def druid-client (client/connect {:hosts [default-druid-host]}))
(defn do-query
  "Run the druid query map `q` through the shared `druid-client`, picking a
  node with the `client/randomized` strategy, and wrap the result with `ok`.
  The query type is read from the `:queryType` key of `q`."
  [q]
  (let [query-type (:queryType q)
        result     (client/query druid-client client/randomized query-type q)]
    (ok result)))

View file

@ -1,4 +1,4 @@
(defproject y42/clj-druid "0.2.12"
(defproject y42/clj-druid "0.2.13-SNAPSHOT"
:description "Clojure library for Druid.io"
:url "http://github.com/y42/clj-druid"
:license {:name "Eclipse Public License"
@ -15,4 +15,4 @@
[prismatic/schema "1.0.4"]
[org.clojure/tools.logging "0.3.1"]
[org.clojure/data.json "0.2.6"]
[zookeeper-clj "0.9.1"]])
[curator "0.0.6"]])

View file

@ -1,102 +1,92 @@
(ns clj-druid.client
(:require [zookeeper :as zk]
[zookeeper.data :as data]
(:require [curator.framework :refer [curator-framework]]
[curator.discovery :refer [service-discovery service-cache instances]]
[clojure.data.json :as json]
[clj-druid.schemas.query :as sch]
[clj-druid.validations :as v]
[swiss.arrows :refer :all]
[clj-http.client :as http]))
(def default-timeout 5000)
;; Protocol shared by the druid client objects built in this namespace
;; (see the `reify Client` forms). It declares two operations:
;;   close - release whatever the client holds open
;;   nodes - report the druid nodes the client currently knows about
(defprotocol ^:private Client
;; Takes only the client itself.
(close [this]
"Close all connections to dependencies (ex: zookeeper)")
;; Takes only the client itself; the result is handed to a balance strategy by `query`.
(nodes [this]
"Get a list of druid available nodes"))
(def nodes-list
"contains all zk nodes discovered"
(atom []))
(defn reset-node-list
"update node list atom"
[nodes]
(reset! nodes-list nodes))
(defn make-node-path
"make a zk path string"
[discovery-path node-type]
(str discovery-path "/" node-type))
(defn static-client
  "Build a druid client backed by a fixed, caller-supplied collection of hosts.
  `nodes` hands `hosts` back unchanged; `close` has nothing to release and
  simply returns nil."
  [hosts]
  (reify Client
    (nodes [_] hosts)
    (close [_] nil)))
(defn make-host-http-str
"make an http url string from a zk node entry"
[c]
"make an http url string from a curator instance"
[instance]
(str "http://"
(get c "address")
(.getAddress instance)
":"
(get c "port")
(.getPort instance)
"/druid/v2/"))
(defn zk-watch-node-list
"Retrieve hosts from zk discovery"
[zk-client path]
(defn curator-nodes
  "Read the instances held by the given curator service cache and convert
  each one to a druid http url string with `make-host-http-str`.
  Returns a lazy sequence of those strings."
  [service-cache]
  (->> service-cache
       instances
       (map make-host-http-str)))
(-<>> path
(zk/children zk-client <>
:watch? true
:watcher #(do (println %)
(zk-watch-node-list zk-client path)))
(map #(data/to-string (:data (zk/data zk-client (str path "/" %)))))
(map #(json/read-str %))
(map #(make-host-http-str %))
(vec)
(reset-node-list)))
(defn from-zookeeper
"Maintain a druid http server list from zookeeper"
(defn curator-client
"Create a druid client using the curator service discovery."
[config]
(let [node-path (make-node-path (:discovery-path config)
(:node-type config))
zk-client (zk/connect (:host config))]
(zk-watch-node-list zk-client node-path)))
(defn from-user
"Maintain a druid http server list from user"
[hosts]
(reset-node-list hosts))
(let [client (curator-framework (:host config))
discovery (service-discovery client
nil
:base-path
(:discovery-path config))
sc (service-cache discovery (:node-type config))]
(.start client)
(.start discovery)
(.start sc)
(reify Client
(close [_]
(.close sc)
(.close discovery)
(.close client))
(nodes [_]
(curator-nodes sc)))))
(defn randomized
"Take a random host"
[]
(if (empty? @nodes-list)
(throw (Exception.
"No druid node available for query")))
(rand-nth @nodes-list))
[nodes]
(rand-nth nodes))
(defn fixed
"Always take first host"
[]
(first @nodes-list))
[nodes]
(first nodes))
;; NOTE: `close` is deliberately NOT redefined with `defn` here.
;;
;; The `Client` protocol declared earlier in this namespace already creates a
;; public function named `close` that dispatches to the `close` implementation
;; of the given client. A wrapper of the form
;;
;;   (defn close [client] (close client))
;;
;; rebinds that very var, so the inner call resolves to the wrapper itself and
;; every invocation recurses until a StackOverflowError is thrown, without ever
;; reaching the client's own `close`. Callers keep using `(close client)`
;; exactly as before; the call is served by the protocol function.
(defn connect
"Create a druid client from zk or
a user defined host"
[params]
(if (:zk params)
(from-zookeeper (:zk params))
(from-user (:hosts params))))
(curator-client (:zk params))
(static-client (:hosts params))))
(defn query
"Issue a druid query"
[balance-strategy query-type druid-query & params]
[client balance-strategy query-type druid-query & params]
(let [params (apply hash-map params)
options (-<> (into druid-query {:queryType query-type})
(v/validate query-type)
(json/write-str <>)
{:body <> :as :text}
(merge params))]
(http/post (balance-strategy) options)))
(merge params))
nodes (nodes client)]
(when (empty? nodes)
(throw (Exception.
"No druid node available for query")))
(http/post (balance-strategy nodes) options)))

View file

@ -4,20 +4,23 @@
[clj-druid.client :refer :all]))
(deftest ^:integration test-connect-zookeeper
(connect {:zk {:host "127.0.0.1:2181"
(let [client (connect {:zk {:host "127.0.0.1:2181"
:discovery-path "/druid/discovery"
:node-type "broker"}}))
:node-type "broker"}})]
(close client)))
(deftest ^:integration test-connect-user
(connect {:hosts ["http://localhost:8082/druid/v2/"]}))
(let [client (connect {:hosts ["http://localhost:8082/druid/v2/"]})]
(close client)))
(deftest ^:integration test-zk-query
(connect {:zk {:host "127.0.0.1:2181"
(let [client (connect {:zk {:host "127.0.0.1:2181"
:discovery-path "/druid/discovery"
:node-type "broker"}})
(query randomized :groupBy f/valid-groupby-query))
:node-type "broker"}})]
(query client randomized :groupBy f/valid-groupby-query)
(close client)))
(deftest ^:integration test-user-query
(connect {:hosts ["http://127.0.0.1:8083/druid/v2/"]})
(query randomized :groupBy f/valid-groupby-query :timeout 5000))
(let [client (connect {:hosts ["http://127.0.0.1:8083/druid/v2/"]})]
(query client randomized :groupBy f/valid-groupby-query :timeout 5000)
(close client)))