zk ns to hold zookeeper methods

This commit is contained in:
Paul Lam 2013-06-10 11:36:37 +01:00
parent 602be6f061
commit c46c916ca0
4 changed files with 30 additions and 28 deletions

View file

@ -1,13 +1,8 @@
(ns clj-kafka.producer
(:import [kafka.javaapi.producer Producer]
[kafka.producer ProducerConfig KeyedMessage]
[kafka.message Message]
[org.I0Itec.zkclient ZkClient]
[kafka.utils ZkUtils]
[java.util List])
(:use [clj-kafka.core :only (as-properties with-resource)]
[clojure.data.json :only (read-str)])
(:require [zookeeper :as zk]))
(:use [clj-kafka.core :only (as-properties)]))
(defn producer
"Creates a Producer. m is the configuration
@ -26,14 +21,3 @@
(defn send-messages
[^Producer producer ^List messages]
(.send producer messages))
(defn brokers
"Get brokers from zookeeper"
[m]
(with-resource [z (zk/connect (get m "zookeeper.connect"))]
zk/close
(doall (map (comp #(read-str % :key-fn keyword)
#(String. ^bytes %)
:data
#(zk/data z (str "/brokers/ids/" %)))
(zk/children z "/brokers/ids")))))

15
src/clj_kafka/zk.clj Normal file
View file

@ -0,0 +1,15 @@
(ns clj-kafka.zk
(:use [clojure.data.json :only (read-str)]
[clj-kafka.core :only (with-resource)])
(:require [zookeeper :as zk]))
(defn brokers
"Get brokers from zookeeper"
[m]
(with-resource [z (zk/connect (get m "zookeeper.connect"))]
zk/close
(doall (map (comp #(read-str % :key-fn keyword)
#(String. ^bytes %)
:data
#(zk/data z (str "/brokers/ids/" %)))
(zk/children z "/brokers/ids")))))

View file

@ -1,16 +1,8 @@
(ns clj-kafka.test.producer
(:use [expectations]
[clj-kafka.core]
[clj-kafka.producer]
[clj-kafka.test.utils :only (with-test-broker)])
(:import [kafka.message Message]
[kafka.producer KeyedMessage]))
[clj-kafka.producer])
(:import [kafka.producer KeyedMessage]))
(expect KeyedMessage (message "topic" "value"))
(given (with-test-broker {:zookeeper-port 2182
:kafka-port 9999
:topic "test"}
(brokers {"zookeeper.connect" "127.0.0.1:2182"}))
(expect count 1
first {:host "localhost", :jmx_port -1, :port 9999, :version 1}))

View file

@ -0,0 +1,11 @@
(ns clj-kafka.test.zk
(:use expectations
clj-kafka.zk
[clj-kafka.test.utils :only (with-test-broker)]))
(given (with-test-broker {:zookeeper-port 2182
:kafka-port 9999
:topic "test"}
(brokers {"zookeeper.connect" "127.0.0.1:2182"}))
(expect count 1
first {:host "localhost", :jmx_port -1, :port 9999, :version 1}))