add broker-list to help build metadata.broker.list strings; change clj-kafka.zk/brokers to return an empty list if zookeeper is available but no kafka brokers are registered.
This commit is contained in:
parent
157ca25220
commit
8acfb69df6
3 changed files with 21 additions and 8 deletions
|
@ -1,4 +1,4 @@
|
||||||
(defproject clj-kafka/clj-kafka "0.2.0-0.8"
|
(defproject clj-kafka/clj-kafka "0.2.1-0.8"
|
||||||
:min-lein-version "2.0.0"
|
:min-lein-version "2.0.0"
|
||||||
:dependencies [[org.clojure/clojure "1.5.1"]
|
:dependencies [[org.clojure/clojure "1.5.1"]
|
||||||
[zookeeper-clj "0.9.3"]
|
[zookeeper-clj "0.9.3"]
|
||||||
|
|
|
@ -1,18 +1,28 @@
|
||||||
(ns clj-kafka.zk
|
(ns clj-kafka.zk
|
||||||
(:use [clojure.data.json :only (read-str)]
|
(:use [clojure.data.json :only (read-str)]
|
||||||
[clj-kafka.core :only (with-resource)])
|
[clj-kafka.core :only (with-resource)])
|
||||||
(:require [zookeeper :as zk]))
|
(:require [zookeeper :as zk]
|
||||||
|
[clojure.string :as s]))
|
||||||
|
|
||||||
(defn brokers
|
(defn brokers
|
||||||
"Get brokers from zookeeper"
|
"Get brokers from zookeeper"
|
||||||
[m]
|
[m]
|
||||||
(with-resource [z (zk/connect (get m "zookeeper.connect"))]
|
(with-resource [z (zk/connect (get m "zookeeper.connect"))]
|
||||||
zk/close
|
zk/close
|
||||||
(doall (map (comp #(read-str % :key-fn keyword)
|
(if-let [broker-ids (zk/children z "/brokers/ids")]
|
||||||
#(String. ^bytes %)
|
(doall (map (comp #(read-str % :key-fn keyword)
|
||||||
:data
|
#(String. ^bytes %)
|
||||||
#(zk/data z (str "/brokers/ids/" %)))
|
:data
|
||||||
(zk/children z "/brokers/ids")))))
|
#(zk/data z (str "/brokers/ids/" %)))
|
||||||
|
broker-ids))
|
||||||
|
'())))
|
||||||
|
|
||||||
|
(defn broker-list
|
||||||
|
"Returns a comma separated list of Kafka brokers, as returned from clj-kafka.zk/brokers.
|
||||||
|
e.g.: (broker-list (brokers {\"zookeeper.connect\" \"127.0.0.1:2181\"})) "
|
||||||
|
[brokers]
|
||||||
|
(when (seq brokers)
|
||||||
|
(s/join "," (map (fn [{:keys [host port]}] (str host ":" port)) brokers))))
|
||||||
|
|
||||||
(defn- controller-broker-id
|
(defn- controller-broker-id
|
||||||
[^String zk-data]
|
[^String zk-data]
|
||||||
|
|
|
@ -12,6 +12,9 @@
|
||||||
(brokers zk-connect))
|
(brokers zk-connect))
|
||||||
(expect count 1))
|
(expect count 1))
|
||||||
|
|
||||||
|
(expect nil (broker-list []))
|
||||||
|
(expect "localhost:2181" (broker-list [{:host "localhost" :port "2181"}]))
|
||||||
|
|
||||||
(given (with-test-broker config
|
(given (with-test-broker config
|
||||||
(first (brokers zk-connect)))
|
(first (brokers zk-connect)))
|
||||||
(expect :host "localhost"
|
(expect :host "localhost"
|
||||||
|
|
Loading…
Reference in a new issue