From c46c916ca0bc4b0fa95f8f14c2de2250bd474d40 Mon Sep 17 00:00:00 2001 From: Paul Lam Date: Mon, 10 Jun 2013 11:36:37 +0100 Subject: [PATCH] zk ns to hold zookeeper methods --- src/clj_kafka/producer.clj | 18 +----------------- src/clj_kafka/zk.clj | 15 +++++++++++++++ test/clj_kafka/test/producer.clj | 14 +++----------- test/clj_kafka/test/zk.clj | 11 +++++++++++ 4 files changed, 30 insertions(+), 28 deletions(-) create mode 100644 src/clj_kafka/zk.clj create mode 100644 test/clj_kafka/test/zk.clj diff --git a/src/clj_kafka/producer.clj b/src/clj_kafka/producer.clj index a0f60c8..5ccf944 100644 --- a/src/clj_kafka/producer.clj +++ b/src/clj_kafka/producer.clj @@ -1,13 +1,8 @@ (ns clj-kafka.producer (:import [kafka.javaapi.producer Producer] [kafka.producer ProducerConfig KeyedMessage] - [kafka.message Message] - [org.I0Itec.zkclient ZkClient] - [kafka.utils ZkUtils] [java.util List]) - (:use [clj-kafka.core :only (as-properties with-resource)] - [clojure.data.json :only (read-str)]) - (:require [zookeeper :as zk])) + (:use [clj-kafka.core :only (as-properties)])) (defn producer "Creates a Producer. m is the configuration @@ -26,14 +21,3 @@ (defn send-messages [^Producer producer ^List messages] (.send producer messages)) - -(defn brokers - "Get brokers from zookeeper" - [m] - (with-resource [z (zk/connect (get m "zookeeper.connect"))] - zk/close - (doall (map (comp #(read-str % :key-fn keyword) - #(String. ^bytes %) - :data - #(zk/data z (str "/brokers/ids/" %))) - (zk/children z "/brokers/ids"))))) diff --git a/src/clj_kafka/zk.clj b/src/clj_kafka/zk.clj new file mode 100644 index 0000000..a89cdbf --- /dev/null +++ b/src/clj_kafka/zk.clj @@ -0,0 +1,15 @@ +(ns clj-kafka.zk + (:use [clojure.data.json :only (read-str)] + [clj-kafka.core :only (with-resource)]) + (:require [zookeeper :as zk])) + +(defn brokers + "Get brokers from zookeeper" + [m] + (with-resource [z (zk/connect (get m "zookeeper.connect"))] + zk/close + (doall (map (comp #(read-str % :key-fn keyword) + #(String. ^bytes %) + :data + #(zk/data z (str "/brokers/ids/" %))) + (zk/children z "/brokers/ids"))))) diff --git a/test/clj_kafka/test/producer.clj b/test/clj_kafka/test/producer.clj index 8278281..3304735 100644 --- a/test/clj_kafka/test/producer.clj +++ b/test/clj_kafka/test/producer.clj @@ -1,16 +1,8 @@ (ns clj-kafka.test.producer (:use [expectations] - [clj-kafka.core] - [clj-kafka.producer] - [clj-kafka.test.utils :only (with-test-broker)]) - (:import [kafka.message Message] - [kafka.producer KeyedMessage])) + [clj-kafka.producer]) + (:import [kafka.producer KeyedMessage])) (expect KeyedMessage (message "topic" "value")) -(given (with-test-broker {:zookeeper-port 2182 - :kafka-port 9999 - :topic "test"} - (brokers {"zookeeper.connect" "127.0.0.1:2182"})) - (expect count 1 - first {:host "localhost", :jmx_port -1, :port 9999, :version 1})) + diff --git a/test/clj_kafka/test/zk.clj b/test/clj_kafka/test/zk.clj new file mode 100644 index 0000000..1a06f7b --- /dev/null +++ b/test/clj_kafka/test/zk.clj @@ -0,0 +1,11 @@ +(ns clj-kafka.test.zk + (:use expectations + clj-kafka.zk + [clj-kafka.test.utils :only (with-test-broker)])) + +(given (with-test-broker {:zookeeper-port 2182 + :kafka-port 9999 + :topic "test"} + (brokers {"zookeeper.connect" "127.0.0.1:2182"})) + (expect count 1 + first {:host "localhost", :jmx_port -1, :port 9999, :version 1}))