diff --git a/src/clj_kafka/consumer/zk.clj b/src/clj_kafka/consumer/zk.clj index 22bc64e..8ce9820 100644 --- a/src/clj_kafka/consumer/zk.clj +++ b/src/clj_kafka/consumer/zk.clj @@ -28,6 +28,12 @@ [^ConsumerConnector consumer] (.shutdown consumer)) +(defn- lazy-iterate + [it] + (lazy-seq + (when (.hasNext it) + (cons (.next it) (lazy-iterate it))))) + (defn messages "Creates a sequence of KafkaMessage messages from the given topic. Consumes messages from a single stream." @@ -35,4 +41,4 @@ (let [[_topic [stream & _]] (first (.createMessageStreams consumer {topic (int 1)}))] (map to-clojure - (iterator-seq (.iterator ^KafkaStream stream))))) + (lazy-iterate (.iterator ^KafkaStream stream))))) diff --git a/test/clj_kafka/test/consumer/zk.clj b/test/clj_kafka/test/consumer/zk.clj index 05b03be..175c11f 100644 --- a/test/clj_kafka/test/consumer/zk.clj +++ b/test/clj_kafka/test/consumer/zk.clj @@ -34,9 +34,9 @@ zk/shutdown (let [p (producer producer-config)] (send-messages p messages) - (zk/messages c "test"))))) + (first (zk/messages c "test")))))) -(given (first (send-and-receive [(test-message)])) +(given (send-and-receive [(test-message)]) (expect :topic "test" :offset 0 :partition 0