From 4ff8334299f9b71788b0174d57dec5392b07c70a Mon Sep 17 00:00:00 2001 From: prepor Date: Wed, 19 Mar 2014 20:45:49 +0900 Subject: [PATCH] Do not use iterator-seq with KafkaStream --- src/clj_kafka/consumer/zk.clj | 8 +++++++- test/clj_kafka/test/consumer/zk.clj | 4 ++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/src/clj_kafka/consumer/zk.clj b/src/clj_kafka/consumer/zk.clj index 22bc64e..8ce9820 100644 --- a/src/clj_kafka/consumer/zk.clj +++ b/src/clj_kafka/consumer/zk.clj @@ -28,6 +28,12 @@ [^ConsumerConnector consumer] (.shutdown consumer)) +(defn- lazy-iterate + [it] + (lazy-seq + (when (.hasNext it) + (cons (.next it) (lazy-iterate it))))) + (defn messages "Creates a sequence of KafkaMessage messages from the given topic. Consumes messages from a single stream." @@ -35,4 +41,4 @@ (let [[_topic [stream & _]] (first (.createMessageStreams consumer {topic (int 1)}))] (map to-clojure - (iterator-seq (.iterator ^KafkaStream stream))))) + (lazy-iterate (.iterator ^KafkaStream stream))))) diff --git a/test/clj_kafka/test/consumer/zk.clj b/test/clj_kafka/test/consumer/zk.clj index 05b03be..175c11f 100644 --- a/test/clj_kafka/test/consumer/zk.clj +++ b/test/clj_kafka/test/consumer/zk.clj @@ -34,9 +34,9 @@ zk/shutdown (let [p (producer producer-config)] (send-messages p messages) - (zk/messages c "test"))))) + (first (zk/messages c "test")))))) -(given (first (send-and-receive [(test-message)])) +(given (send-and-receive [(test-message)]) (expect :topic "test" :offset 0 :partition 0