From c01ffa4a96112ca9cc63ba0cb997f2a3c99b79a0 Mon Sep 17 00:00:00 2001 From: Paul Lam Date: Fri, 7 Jun 2013 16:15:54 +0100 Subject: [PATCH] added brokers in producer to retrieve list of brokers from zookeeper, cool --- project.clj | 3 ++- src/clj_kafka/producer.clj | 21 ++++++++++++++++++--- test/clj_kafka/test/producer.clj | 10 ++++++++-- 3 files changed, 28 insertions(+), 6 deletions(-) diff --git a/project.clj b/project.clj index 38bbf3a..7438eec 100644 --- a/project.clj +++ b/project.clj @@ -2,7 +2,8 @@ :min-lein-version "2.0.0" :dependencies [[org.clojure/clojure "1.5.1"] [com.uswitch/kafka_2.9.2 "0.8.0-SNAPSHOT"] - [zookeeper-clj "0.9.3"]] + [zookeeper-clj "0.9.3"] + [org.clojure/data.json "0.2.2"]] :exclusions [javax.mail/mail javax.jms/jms com.sun.jdmk/jmxtools diff --git a/src/clj_kafka/producer.clj b/src/clj_kafka/producer.clj index 7f2d310..c263a8d 100644 --- a/src/clj_kafka/producer.clj +++ b/src/clj_kafka/producer.clj @@ -1,8 +1,12 @@ (ns clj-kafka.producer (:import [kafka.javaapi.producer Producer] [kafka.producer ProducerConfig KeyedMessage] - [kafka.message Message]) - (:use [clj-kafka.core :only (as-properties)])) + [kafka.message Message] + [org.I0Itec.zkclient ZkClient] + [kafka.utils ZkUtils]) + (:use [clj-kafka.core :only (as-properties with-resource)] + [clojure.data.json :only (read-str)]) + (:require [zookeeper :as zk])) (defprotocol MessagePayload "Converts message payloads to bytes" @@ -24,4 +28,15 @@ (defn send-message [^Producer producer ^String topic value] - (.send producer ^KeyedMessage (keyed-message topic (message-payload value)))) + (.send producer ^KeyedMessage (keyed-message topic (message-payload value))))) + +(defn brokers + "Get brokers from zookeeper" + [m] + (with-resource [z (zk/connect (get m "zookeeper.connect"))] + zk/close + (doall (map (comp #(read-str % :key-fn keyword) + #(String. %) + :data + #(zk/data z (str "/brokers/ids/" %))) + (zk/children z "/brokers/ids"))))) diff --git a/test/clj_kafka/test/producer.clj b/test/clj_kafka/test/producer.clj index 30c5af3..44e6bac 100644 --- a/test/clj_kafka/test/producer.clj +++ b/test/clj_kafka/test/producer.clj @@ -1,10 +1,16 @@ (ns clj-kafka.test.producer (:use [clojure.test] [clj-kafka.core] - [clj-kafka.producer] :reload) + [clj-kafka.producer] :reload + [clj-kafka.test.utils :only (with-broker)]) (:import [kafka.message Message] [kafka.producer KeyedMessage])) (deftest keyed-messages (is (instance? KeyedMessage - (keyed-message "topic" "value")))) \ No newline at end of file + (keyed-message "topic" "value")))) + +(deftest brokers-test + (with-broker + (is (= [{:host "localhost", :jmx_port -1, :port 9999, :version 1}] + (brokers {"zookeeper.connect" "localhost:2182"})))))