added brokers in producer to retrieve list of brokers from zookeeper,
cool
This commit is contained in:
parent
590f303a49
commit
c01ffa4a96
3 changed files with 28 additions and 6 deletions
|
@ -2,7 +2,8 @@
|
|||
:min-lein-version "2.0.0"
|
||||
:dependencies [[org.clojure/clojure "1.5.1"]
|
||||
[com.uswitch/kafka_2.9.2 "0.8.0-SNAPSHOT"]
|
||||
[zookeeper-clj "0.9.3"]]
|
||||
[zookeeper-clj "0.9.3"]
|
||||
[org.clojure/data.json "0.2.2"]]
|
||||
:exclusions [javax.mail/mail
|
||||
javax.jms/jms
|
||||
com.sun.jdmk/jmxtools
|
||||
|
|
|
@ -1,8 +1,12 @@
|
|||
(ns clj-kafka.producer
|
||||
(:import [kafka.javaapi.producer Producer]
|
||||
[kafka.producer ProducerConfig KeyedMessage]
|
||||
[kafka.message Message])
|
||||
(:use [clj-kafka.core :only (as-properties)]))
|
||||
[kafka.message Message]
|
||||
[org.I0Itec.zkclient ZkClient]
|
||||
[kafka.utils ZkUtils])
|
||||
(:use [clj-kafka.core :only (as-properties with-resource)]
|
||||
[clojure.data.json :only (read-str)])
|
||||
(:require [zookeeper :as zk]))
|
||||
|
||||
(defprotocol MessagePayload
|
||||
"Converts message payloads to bytes"
|
||||
|
@ -24,4 +28,15 @@
|
|||
|
||||
(defn send-message
|
||||
[^Producer producer ^String topic value]
|
||||
(.send producer ^KeyedMessage (keyed-message topic (message-payload value))))
|
||||
(.send producer ^KeyedMessage (keyed-message topic (message-payload value)))))
|
||||
|
||||
(defn brokers
|
||||
"Get brokers from zookeeper"
|
||||
[m]
|
||||
(with-resource [z (zk/connect (get m "zookeeper.connect"))]
|
||||
zk/close
|
||||
(doall (map (comp #(read-str % :key-fn keyword)
|
||||
#(String. %)
|
||||
:data
|
||||
#(zk/data z (str "/brokers/ids/" %)))
|
||||
(zk/children z "/brokers/ids")))))
|
||||
|
|
|
@ -1,10 +1,16 @@
|
|||
(ns clj-kafka.test.producer
|
||||
(:use [clojure.test]
|
||||
[clj-kafka.core]
|
||||
[clj-kafka.producer] :reload)
|
||||
[clj-kafka.producer] :reload
|
||||
[clj-kafka.test.utils :only (with-broker)])
|
||||
(:import [kafka.message Message]
|
||||
[kafka.producer KeyedMessage]))
|
||||
|
||||
(deftest keyed-messages
|
||||
(is (instance? KeyedMessage
|
||||
(keyed-message "topic" "value"))))
|
||||
|
||||
(deftest brokers-test
|
||||
(with-broker
|
||||
(is (= [{:host "localhost", :jmx_port -1, :port 9999, :version 1}]
|
||||
(brokers {"zookeeper.connect" "localhost:2182"})))))
|
||||
|
|
Loading…
Reference in a new issue