Merge pull request #25 from prepor/lazy-iterate

Do not use iterator-seq with KafkaStream
This commit is contained in:
Christian Blunden 2014-03-19 14:27:47 +00:00
commit f0c025d358
2 changed files with 9 additions and 3 deletions

View file

@ -28,6 +28,12 @@
[^ConsumerConnector consumer]
(.shutdown consumer))
(defn- lazy-iterate
[it]
(lazy-seq
(when (.hasNext it)
(cons (.next it) (lazy-iterate it)))))
(defn messages
"Creates a sequence of KafkaMessage messages from the given topic. Consumes
messages from a single stream."
@ -35,4 +41,4 @@
(let [[_topic [stream & _]]
(first (.createMessageStreams consumer {topic (int 1)}))]
(map to-clojure
(iterator-seq (.iterator ^KafkaStream stream)))))
(lazy-iterate (.iterator ^KafkaStream stream)))))

View file

@ -34,9 +34,9 @@
zk/shutdown
(let [p (producer producer-config)]
(send-messages p messages)
(zk/messages c "test")))))
(first (zk/messages c "test"))))))
(given (first (send-and-receive [(test-message)]))
(given (send-and-receive [(test-message)])
(expect :topic "test"
:offset 0
:partition 0