Merge branch '0.8'

Conflicts:
	README.md
	src/clj_kafka/example/zk.clj
Paul Ingles 2013-09-19 11:20:30 +01:00
commit 62a3ed8ed9
17 changed files with 369 additions and 182 deletions

.gitignore
@@ -8,3 +8,6 @@
/.lein-deps-sum
autodoc/**
/target
.lein-repl-history
/tmp
/logs

.travis.yml
@@ -1,2 +1,3 @@
language: clojure
lein: lein2
script: lein2 expectations

README.md
@@ -1,58 +1,40 @@
# clj-kafka
Simple Clojure interface to [Kafka](http://incubator.apache.org/kafka/).
Clojure library for [Kafka](http://incubator.apache.org/kafka/).
It's currently a snapshot only until things flesh out a little more. [API Documentation is also available](http://pingles.github.com/clj-kafka/).
Current build status: [![Build Status](https://travis-ci.org/pingles/clj-kafka.png?branch=0.8)](https://travis-ci.org/pingles/clj-kafka)
Note: Kafka binaries are not currently published to any public repositories. Additionally, the 0.7 release was [published as source](http://incubator.apache.org/kafka/downloads.html). This library uses [a build of the 0.7 incubator release published on Clojars](http://clojars.org/org.clojars.paul/core-kafka_2.8.0).
Regarding Kafka 0.8, a working version of this library for Kafka 0.8 that is already used in production is in the [0.8 branch](https://github.com/pingles/clj-kafka/tree/0.8).
Current build status: ![Build status](https://secure.travis-ci.org/pingles/clj-kafka.png)
Development is against the upcoming 0.8 release of Kafka. The protocols for 0.7 and 0.8 are incompatible so this will only work when connecting to a 0.8 cluster.
## Installing
Given 0.8 is still unreleased we're pushing SNAPSHOT releases.
Add the following to your [Leiningen](http://github.com/technomancy/leiningen) `project.clj`:
```clj
[clj-kafka "0.0.5-0.7-SNAPSHOT"]
[clj-kafka "0.1.0-0.8-SNAPSHOT"]
```
## Usage
clj-kafka currently only supports Kafka 0.7.
### Producer
Allows batching of messages:
Discovery of Kafka brokers from Zookeeper:
```clj
(brokers {"zookeeper.connect" "127.0.0.1:2181"})
;; ({:host "localhost", :jmx_port -1, :port 9999, :version 1})
```
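The broker maps returned above can be turned into the `metadata.broker.list` string the 0.8 producer expects. A minimal sketch, assuming a local Zookeeper and broker; `broker-list` is a helper introduced here for illustration and is not part of the library:

```clj
(use 'clj-kafka.zk 'clj-kafka.producer)

(defn broker-list
  "Illustrative helper: joins broker maps (:host/:port) into the
   \"host:port,host:port\" string used by metadata.broker.list."
  [brokers]
  (clojure.string/join ","
                       (map (fn [{:keys [host port]}] (str host ":" port))
                            brokers)))

(def p (producer {"metadata.broker.list"
                  (broker-list (brokers {"zookeeper.connect" "127.0.0.1:2181"}))
                  "serializer.class" "kafka.serializer.DefaultEncoder"}))
```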
```clj
(use 'clj-kafka.producer)
(def p (producer {"zk.connect" "localhost:2181"}))
(send-messages p "test" (->> ["message payload 1" "message payload 2"]
(map #(.getBytes %))
(map message)))
```
(def p (producer {"metadata.broker.list" "localhost:9999"
"serializer.class" "kafka.serializer.DefaultEncoder"
"partitioner.class" "kafka.producer.DefaultPartitioner"}))
Or sending a single message:
```clj
(def p (producer {"zk.connect" "localhost:2181"}))
(send-messages p "test" (message (.getBytes "payload")))
```
### SimpleConsumer
```clj
(use 'clj-kafka.consumer.simple)
(def c (consumer "localhost" 9092))
(def f (fetch "test" 0 0 4096))
(messages c f)
({:message {:crc 1513777821, :payload #<byte[] [B@3088890d>, :size 1089}, :offset 1093} {:message {:crc 4119364266, :payload #<byte[] [B@3088890d>, :size 968}, :offset 2065} {:message {:crc 3827222527, :payload #<byte[] [B@3088890d>, :size 1137}, :offset 3206})
(send-message p (message "test" (.getBytes "this is my message")))
```
### Zookeeper Consumer
@@ -63,24 +45,24 @@ The Zookeeper consumer uses broker information contained within Zookeeper to con
(use 'clj-kafka.consumer.zk)
(use 'clj-kafka.core)
(def config {"zk.connect" "localhost:2181"
"groupid" "my-task-group"})
(def config {"zookeeper.connect" "localhost:2182"
"group.id" "clj-kafka.consumer"
"auto.offset.reset" "smallest"
"auto.commit.enable" "false"})
(with-resource [c (consumer config)]
shutdown
(take 5 (messages c "test")))
({:crc 3417370184, :payload #<byte[] [B@698b41da>, :size 22} {:crc 3417370184, :payload #<byte[] [B@698b41da>, :size 22} {:crc 960674935, :payload #<byte[] [B@698b41da>, :size 86} {:crc 3651343620, :payload #<byte[] [B@698b41da>, :size 20} {:crc 2012604996, :payload #<byte[] [B@698b41da>, :size 20})
(take 2 (messages c ["test"])))
```
It's also now possible to consume messages from multiple topics at the same time. These are aggregated and returned as a single sequence:
```clojure
(take 5 (messages c "test1" "test2"))
(take 5 (messages c ["test1" "test2"]))
```
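The `messages` function also takes a `:queue-capacity` option to bound the internal queue when payloads are large and consumption is slow. A sketch, assuming a running 0.8 broker and the consumer config shown earlier:

```clj
(use 'clj-kafka.consumer.zk 'clj-kafka.core)

(def config {"zookeeper.connect" "localhost:2181"
             "group.id" "clj-kafka.consumer"})

;; Hold at most 128 undelivered messages before blocking the
;; consumer threads feeding the sequence.
(with-resource [c (consumer config)]
  shutdown
  (doall (take 5 (messages c ["test1" "test2"] :queue-capacity 128))))
```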
## License
Copyright &copy; 2012 Paul Ingles
Copyright &copy; 2013 Paul Ingles
Distributed under the Eclipse Public License, the same as Clojure.

log4j.properties
@@ -0,0 +1,8 @@
# Root logger option
log4j.rootLogger=INFO, file
# Direct log messages to stdout
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.File=./logs/kafka.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n

project.clj
@@ -1,15 +1,23 @@
(defproject clj-kafka/clj-kafka "0.0.7-0.7"
(defproject clj-kafka/clj-kafka "0.1.0-0.8-beta1"
:min-lein-version "2.0.0"
:dependencies [[org.clojure/clojure "1.3.0"]
[org.clojars.paul/core-kafka_2.8.0 "0.7.0-1"
:exclusions
[javax.mail/mail
javax.jms/jms
com.sun.jdmk/jmxtools
com.sun.jmx/jmxri
jline/jline
net.sf.jopt-simple/jopt-simple
junit/junit]]
[org.scala-lang/scala-library "2.8.0"]
[zookeeper-clj "0.9.2"]]
:dependencies [[org.clojure/clojure "1.5.1"]
[zookeeper-clj "0.9.3"]
[org.clojure/data.json "0.2.2"]
;; kafka and its related deps
[org.apache.kafka/kafka_2.9.2 "0.8.0-beta1"]
[org.scala-lang/scala-library "2.9.2"]
[org.apache.zookeeper/zookeeper "3.3.4"]
[net.sf.jopt-simple/jopt-simple "3.2"]]
:exclusions [javax.mail/mail
javax.jms/jms
com.sun.jdmk/jmxtools
com.sun.jmx/jmxri
jline/jline]
:plugins [[lein-expectations "0.0.8"]]
:profiles {:dev {:dependencies [[commons-io/commons-io "2.4"]
[expectations "1.4.45"]
[com.101tec/zkclient "0.3"]
[com.yammer.metrics/metrics-core "2.2.0"]
[org.slf4j/slf4j-simple "1.6.4"]]}}
:description "Clojure wrapper for Kafka's Java API")

src/clj_kafka/consumer/simple.clj
@@ -1,40 +1,39 @@
(ns clj-kafka.consumer.simple
(:use [clj-kafka.core :only (to-clojure)])
(:import [kafka.javaapi.consumer SimpleConsumer]
[kafka.api FetchRequest OffsetRequest]))
[kafka.api FetchRequest FetchRequestBuilder PartitionOffsetRequestInfo]
[kafka.javaapi OffsetRequest TopicMetadataRequest FetchResponse]
[kafka.common TopicAndPartition]))
(defn consumer
"Create a consumer to connect to host and port. Port will
normally be 9092."
[host port & {:keys [timeout buffer-size] :or {timeout 100000 buffer-size 10000}}]
(SimpleConsumer. host port timeout buffer-size))
[host ^Long port client-id & {:keys [^Long timeout ^Long buffer-size] :or {timeout 100000 buffer-size 10000}}]
(SimpleConsumer. host
(Integer/valueOf port)
(Integer/valueOf timeout)
(Integer/valueOf buffer-size)
client-id))
(defn earliest-offset
"Retrieves the earliest offset available for topic and partition."
[consumer topic partition]
(long (first (.getOffsetsBefore consumer topic partition (OffsetRequest/EarliestTime) 1))))
(defn latest-offsets
"Retrieves n most recent offsets for topic and partition."
[consumer topic partition n]
(map long (.getOffsetsBefore consumer topic partition (OffsetRequest/LatestTime) n)))
(defn max-offset
[consumer topic partition]
(first (latest-offsets consumer topic partition 1)))
(defn fetch
"Creates a request to retrieve a set of messages from the
specified topic.
Arguments:
partition: as specified when producing messages
offset: offset to start retrieval
max-size: number of bytes to retrieve"
[^String topic ^Integer partition ^Long offset ^Integer max-size]
(FetchRequest. topic partition offset max-size))
(defn fetch-request
[client-id topic ^Long partition offset fetch-size]
(.build (doto (FetchRequestBuilder. )
(.clientId client-id)
(.addFetch topic (Integer/valueOf partition) offset fetch-size))))
(defn messages
"Creates a sequence of messages from the given request."
[consumer request]
(map to-clojure (iterator-seq (.iterator (.fetch consumer request)))))
[^SimpleConsumer consumer client-id topic partition offset fetch-size]
(let [fetch (fetch-request client-id topic partition offset fetch-size)]
(map to-clojure (iterator-seq (.iterator (.messageSet ^FetchResponse (.fetch consumer ^FetchRequest fetch)
topic
partition))))))
(defn topic-meta-data [consumer topics]
(to-clojure (.send consumer (TopicMetadataRequest. topics))))
(defn latest-topic-offset [consumer topic partition]
(let [tp (TopicAndPartition. topic partition)
pori (PartitionOffsetRequestInfo. -1 1)
hm (java.util.HashMap. {tp pori})]
(let [response (.getOffsetsBefore consumer (OffsetRequest. hm (kafka.api.OffsetRequest/CurrentVersion) "clj-kafka-id"))]
(first (.offsets response topic partition)))))

src/clj_kafka/consumer/zk.clj
@@ -1,5 +1,6 @@
(ns clj-kafka.consumer.zk
(:import [kafka.consumer ConsumerConfig Consumer])
(:import [kafka.consumer ConsumerConfig Consumer KafkaStream]
[kafka.javaapi.consumer ConsumerConnector])
(:use [clj-kafka.core :only (as-properties to-clojure with-resource pipe)])
(:require [zookeeper :as zk]))
@@ -13,19 +14,18 @@
(take 5 (messages c \"test\")))
Keys:
zk.connect : host:port for Zookeeper. e.g: 127.0.0.1:2181
groupid : consumer group. e.g. group1
zk.sessiontimeout.ms : session timeout. e.g. 400
zk.synctime.ms : Max time for how far a ZK follower can be behind a ZK leader. 200 ms
autocommit.interval.ms : the frequency that the consumed offsets are committed to zookeeper.
autocommit.enable : if set to true, the consumer periodically commits to zookeeper the latest consumed offset of each partition"
zookeeper.connect : host:port for Zookeeper. e.g: 127.0.0.1:2181
group.id : consumer group. e.g. group1
auto.offset.reset : what to do if an offset is out of range, e.g. smallest, largest
auto.commit.interval.ms : the frequency that the consumed offsets are committed to zookeeper.
auto.commit.enable : if set to true, the consumer periodically commits to zookeeper the latest consumed offset of each partition"
[m]
(let [config (ConsumerConfig. (as-properties m))]
(Consumer/createJavaConsumerConnector config)))
(defn shutdown
"Closes the connection to Zookeeper and stops consuming messages."
[consumer]
[^ConsumerConnector consumer]
(.shutdown consumer))
(defn- topic-map
@@ -34,18 +34,18 @@
(repeat (Integer/valueOf 1)))))
(defn messages
"Creates a sequence of messages from the given topics."
[consumer & topics]
(let [[queue-seq queue-put] (pipe)]
"Creates a sequence of KafkaMessage messages from the given topics. Consumes
messages from a single stream. topics is a collection of topics to consume
from.
Optional: queue-capacity. Can be used to limit number of messages held in
queue before they've been dequeued in the returned sequence. Defaults to
Integer/MAX_VALUE but can be changed if your messages are particularly large
and consumption is slow."
[^ConsumerConnector consumer topics & {:keys [queue-capacity]
:or {queue-capacity (Integer/MAX_VALUE)}}]
(let [[queue-seq queue-put] (pipe queue-capacity)]
(doseq [[topic streams] (.createMessageStreams consumer (topic-map topics))]
(future (doseq [msg (iterator-seq (.iterator (first streams)))]
(queue-put (-> msg to-clojure (assoc :topic topic))))))
(future (doseq [msg (iterator-seq (.iterator ^KafkaStream (first streams)))]
(queue-put (to-clojure msg)))))
queue-seq))
(defn topics
"Connects to Zookeeper to read the list of topics. Use the same config
as with consumer, uses the zk.connect value to connect to the client"
[config]
(with-resource [z (zk/connect (get config "zk.connect"))]
zk/close
(sort (zk/children z "/brokers/topics"))))

src/clj_kafka/core.clj
@@ -1,8 +1,12 @@
(ns clj-kafka.core
(:import [java.nio ByteBuffer]
[java.util Properties]
[kafka.message MessageAndOffset Message]
[java.util.concurrent LinkedBlockingQueue]))
[kafka.message MessageAndMetadata MessageAndOffset]
[java.util.concurrent LinkedBlockingQueue]
[kafka.javaapi OffsetResponse PartitionMetadata TopicMetadata TopicMetadataResponse]
[kafka.cluster Broker]))
(defrecord KafkaMessage [topic offset partition key value])
(defn as-properties
[m]
@@ -22,17 +26,41 @@
(to-clojure [x] "Converts type to Clojure structure"))
(extend-protocol ToClojure
MessageAndMetadata
(to-clojure [x] (KafkaMessage. (.topic x) (.offset x) (.partition x) (.key x) (.message x)))
MessageAndOffset
(to-clojure [x] {:message (to-clojure (.message x))
:offset (.offset x)})
ByteBuffer
(to-clojure [x] (let [b (byte-array (.remaining x))]
(.get x b)
b))
Message
(to-clojure [x] {:crc (.checksum x)
:payload (to-clojure (.payload x))
:size (.size x)}))
(to-clojure [x]
(letfn [(byte-buffer-bytes [^ByteBuffer bb] (let [b (byte-array (.remaining bb))]
(.get bb b)
b))]
(let [offset (.offset x)
msg (.message x)]
(KafkaMessage. nil offset nil (.key msg) (byte-buffer-bytes (.payload msg))))))
Broker
(to-clojure [x]
{:connect (.getConnectionString x)
:host (.host x)
:port (.port x)
:broker-id (.id x)})
PartitionMetadata
(to-clojure [x]
{:partition-id (.partitionId x)
:leader (to-clojure (.leader x))
:replicas (map to-clojure (.replicas x))
:in-sync-replicas (map to-clojure (.isr x))
:error-code (.errorCode x)})
TopicMetadata
(to-clojure [x]
{:topic (.topic x)
:partition-metadata (map to-clojure (.partitionsMetadata x))})
TopicMetadataResponse
(to-clojure [x]
(map to-clojure (.topicsMetadata x))))
(defn pipe
"Returns a vector containing a sequence that will read from the
@@ -41,7 +69,7 @@
Source: http://clj-me.cgrand.net/2010/04/02/pipe-dreams-are-not-necessarily-made-of-promises/"
([] (pipe 100))
([size]
(let [q (java.util.concurrent.LinkedBlockingQueue. size)
(let [q (java.util.concurrent.LinkedBlockingQueue. ^int size)
EOQ (Object.)
NIL (Object.)
s (fn queue-seq [] (lazy-seq

src/clj_kafka/example/zk.clj
@@ -1,13 +0,0 @@
(ns clj-kafka.example.zk
(:use [clj-kafka.core :only (with-resource)]
[clj-kafka.consumer.zk :only (consumer shutdown messages)]))
(def config {"zk.connect" "localhost:2181"
"auto.commit.enable" "false"
"group.id" "group1"})
(defn -main []
(with-resource [c (consumer config)]
shutdown
(doseq [m (messages c "testing" "testing2")]
(println (:topic m) ": " (String. (:payload m))))))

src/clj_kafka/producer.clj
@@ -1,34 +1,23 @@
(ns clj-kafka.producer
(:import [kafka.javaapi.producer Producer ProducerData]
[kafka.producer ProducerConfig]
[kafka.message Message])
(:import [kafka.javaapi.producer Producer]
[kafka.producer ProducerConfig KeyedMessage]
[java.util List])
(:use [clj-kafka.core :only (as-properties)]))
(defn producer
"Creates a Producer. m is the configuration
serializer.class : default is kafka.serializer.DefaultEncoder
zk.connect : Zookeeper connection. e.g. localhost:2181 "
metadata.broker.list : \"server:port,server:port\""
[m]
^Producer (Producer. (ProducerConfig. (as-properties m))))
(Producer. (ProducerConfig. (as-properties m))))
(defn message
"Creates a message with the specified payload.
payload : bytes for the message payload. e.g. (.getBytes \"hello, world\")"
[#^bytes payload]
(Message. payload))
([topic value] (message topic nil value))
([topic key value] (KeyedMessage. topic key value)))
(defn send-message
[^Producer producer ^KeyedMessage message]
(.send producer message))
(defn send-messages
"Sends a message.
topic : a string
msgs : a single message, or sequence of messages to send"
[^Producer producer ^String topic msgs]
(.send producer (ProducerData. topic msgs)))
(defprotocol ToMessage
"Protocol to be extended to convert types to encoded Message objects"
(to-message [x] "Creates a Message instance"))
(extend-protocol ToMessage
String
(to-message [x] (message (.getBytes x))))
[^Producer producer ^List messages]
(.send producer messages))

src/clj_kafka/zk.clj
@@ -0,0 +1,32 @@
(ns clj-kafka.zk
(:use [clojure.data.json :only (read-str)]
[clj-kafka.core :only (with-resource)])
(:require [zookeeper :as zk]))
(defn brokers
"Get brokers from zookeeper"
[m]
(with-resource [z (zk/connect (get m "zookeeper.connect"))]
zk/close
(doall (map (comp #(read-str % :key-fn keyword)
#(String. ^bytes %)
:data
#(zk/data z (str "/brokers/ids/" %)))
(zk/children z "/brokers/ids")))))
(defn controller
"Get leader node"
[m]
(with-resource [z (zk/connect (get m "zookeeper.connect"))]
zk/close
(-> (zk/data z "/controller")
:data
String.
Integer/valueOf)))
(defn topics
"Get topics"
[m]
(with-resource [z (zk/connect (get m "zookeeper.connect"))]
zk/close
(zk/children z "/brokers/topics")))

test/clj_kafka/test/consumer/simple.clj
@@ -0,0 +1,33 @@
(ns clj-kafka.test.consumer.simple
(:use clj-kafka.consumer.simple
expectations
[clj-kafka.test.utils :only (with-test-broker)]))
(def test-broker-config {:zookeeper-port 2182
:kafka-port 9999
:topic "test"})
(given (with-test-broker test-broker-config
(let [c (consumer "127.0.0.1" 9999 "simple-consumer")]
(topic-meta-data c ["test"])))
(expect count 1
first {:topic "test",
:partition-metadata [{:partition-id 0,
:leader {:connect "localhost:9999",
:host "localhost",
:port 9999
:broker-id 0},
:replicas [{:connect "localhost:9999",
:host "localhost",
:port 9999
:broker-id 0}],
:in-sync-replicas [{:connect "localhost:9999",
:host "localhost",
:port 9999
:broker-id 0}],
:error-code 0}]}))
(given (with-test-broker test-broker-config
(let [c (consumer "127.0.0.1" 9999 "simple-consumer")]
(latest-topic-offset c "test" 0)))
(expect identity 0))

test/clj_kafka/test/consumer/zk.clj
@@ -0,0 +1,40 @@
(ns clj-kafka.test.consumer.zk
(:use [expectations]
[clj-kafka.core :only (with-resource to-clojure)]
[clj-kafka.producer :only (producer send-messages message)]
[clj-kafka.test.utils :only (with-test-broker)])
(:require [clj-kafka.consumer.zk :as zk]))
(def producer-config {"metadata.broker.list" "localhost:9999"
"serializer.class" "kafka.serializer.DefaultEncoder"
"partitioner.class" "kafka.producer.DefaultPartitioner"})
(def test-broker-config {:zookeeper-port 2182
:kafka-port 9999
:topic "test"})
(def consumer-config {"zookeeper.connect" "localhost:2182"
"group.id" "clj-kafka.test.consumer"
"auto.offset.reset" "smallest"
"auto.commit.enable" "false"})
(defn string-value
[k]
(fn [m]
(String. (k m) "UTF-8")))
(defn send-and-receive
[messages]
(with-test-broker test-broker-config
(with-resource [c (zk/consumer consumer-config)]
zk/shutdown
(let [p (producer producer-config)]
(send-messages p messages)
(doall (take (count messages)
(zk/messages c ["test"])))))))
(given (first (send-and-receive [(message "test" (.getBytes "Hello, world"))]))
(expect :topic "test"
:offset 0
:partition 0
(string-value :value) "Hello, world"))

test/clj_kafka/test/core.clj
@@ -1,18 +0,0 @@
(ns clj-kafka.test.core
(:import [java.nio ByteBuffer]
[kafka.message Message])
(:use [clojure.test])
(:use [clj-kafka.core] :reload))
(deftest convert-message-payload-to-bytes
(let [bytes (byte-array (repeat 989 (byte 10)))
msg (Message. 100 bytes)]
;; 1 magic byte
;; 1 byte attribute identifier for annotations on message
;; 4 bytes w/ CRC
;; n bytes payload
;; total = n + 4 + 1 + 1 bytes
(is (= 995 (:size (to-clojure msg))))
(is (= 989 (count (:payload (to-clojure msg)))))
(is (= (seq bytes) (seq (:payload (to-clojure msg)))))))

test/clj_kafka/test/producer.clj
@@ -1,11 +1,8 @@
(ns clj-kafka.test.producer
(:use [clojure.test]
[clj-kafka.core]
[clj-kafka.producer] :reload)
(:import [kafka.message Message]))
(:use [expectations]
[clj-kafka.producer])
(:import [kafka.producer KeyedMessage]))
(expect KeyedMessage (message "topic" "value"))
(deftest creates-message-with-string-bytes
(is (instance? Message
(to-message "Hello, world")))
(is (= "Hello, world"
(String. (:payload (to-clojure (to-message "Hello, world")))))))

test/clj_kafka/test/utils.clj
@@ -0,0 +1,75 @@
(ns clj-kafka.test.utils
(:import [kafka.server KafkaConfig KafkaServer]
[kafka.admin CreateTopicCommand]
[kafka.common TopicAndPartition]
[java.net InetSocketAddress]
[org.apache.zookeeper.server ZooKeeperServer NIOServerCnxn$Factory]
[org.apache.commons.io FileUtils]
[org.I0Itec.zkclient ZkClient]
[org.I0Itec.zkclient.serialize ZkSerializer])
(:use [clojure.java.io :only (file)]
[clj-kafka.core :only (as-properties)]))
(defn tmp-dir
[& parts]
(.getPath (apply file (System/getProperty "java.io.tmpdir") "clj-kafka" parts)))
(def system-time (proxy [kafka.utils.Time] []
(milliseconds [] (System/currentTimeMillis))
(nanoseconds [] (System/nanoTime))
(sleep [ms] (Thread/sleep ms))))
;; enable.zookeeper doesn't seem to be used- check it actually has an effect
(defn create-broker
[{:keys [kafka-port zookeeper-port]}]
(let [base-config {"broker.id" "0"
"port" "9999"
"host.name" "localhost"
"zookeeper.connect" (str "127.0.0.1:" zookeeper-port)
"enable.zookeeper" "true"
"log.flush.interval.messages" "1"
"auto.create.topics.enable" "true"
"log.dir" (.getAbsolutePath (file (tmp-dir "kafka-log")))}]
(KafkaServer. (KafkaConfig. (as-properties (assoc base-config "port" (str kafka-port))))
system-time)))
(defn create-zookeeper
[{:keys [zookeeper-port]}]
(let [tick-time 500
zk (ZooKeeperServer. (file (tmp-dir "zookeeper-snapshot")) (file (tmp-dir "zookeeper-log")) tick-time)]
(doto (NIOServerCnxn$Factory. (InetSocketAddress. "127.0.0.1" zookeeper-port))
(.startup zk))))
(defn wait-until-initialised
[kafka-server topic]
(let [topic-and-partition (TopicAndPartition. topic 0)]
(while (not (.. kafka-server apis leaderCache keySet (contains topic-and-partition)))
(Thread/sleep 500))))
(defn create-topic
[zk-client topic & {:keys [partitions replicas]
:or {partitions 1 replicas 1}}]
(CreateTopicCommand/createTopic zk-client topic partitions replicas ""))
(def string-serializer (proxy [ZkSerializer] []
(serialize [data] (.getBytes data "UTF-8"))
(deserialize [bytes] (when bytes
(String. bytes "UTF-8")))))
(defmacro with-test-broker
"Creates an in-process broker that can be used to test against"
[config & body]
`(do (FileUtils/deleteDirectory (file (tmp-dir)))
(let [zk# (create-zookeeper ~config)
kafka# (create-broker ~config)
topic# (:topic ~config)]
(try
(.startup kafka#)
(let [zk-client# (ZkClient. (str "127.0.0.1:" (:zookeeper-port ~config)) 500 500 string-serializer)]
(create-topic zk-client# topic#)
(wait-until-initialised kafka# topic#)
~@body)
(finally (do (.shutdown kafka#)
(.awaitShutdown kafka#)
(.shutdown zk#)
(FileUtils/deleteDirectory (file (tmp-dir)))))))))

test/clj_kafka/test/zk.clj
@@ -0,0 +1,23 @@
(ns clj-kafka.test.zk
(:use expectations
clj-kafka.zk
[clj-kafka.test.utils :only (with-test-broker)]))
(def config {:zookeeper-port 2182
:kafka-port 9999
:topic "test"})
(def zk-connect {"zookeeper.connect" "127.0.0.1:2182"})
(given (with-test-broker config
(brokers zk-connect))
(expect count 1
first {:host "localhost", :jmx_port -1, :port 9999, :version 1}))
(given (with-test-broker config
(controller zk-connect))
(expect identity 0))
(given (with-test-broker config
(topics zk-connect))
(expect count 1
first (:topic config)))