Do not use iterator-seq with KafkaStream
This commit is contained in:
parent
9c19d5df43
commit
4ff8334299
2 changed files with 9 additions and 3 deletions
|
@ -28,6 +28,12 @@
|
|||
[^ConsumerConnector consumer]
|
||||
(.shutdown consumer))
|
||||
|
||||
(defn- lazy-iterate
|
||||
[it]
|
||||
(lazy-seq
|
||||
(when (.hasNext it)
|
||||
(cons (.next it) (lazy-iterate it)))))
|
||||
|
||||
(defn messages
|
||||
"Creates a sequence of KafkaMessage messages from the given topic. Consumes
|
||||
messages from a single stream."
|
||||
|
@ -35,4 +41,4 @@
|
|||
(let [[_topic [stream & _]]
|
||||
(first (.createMessageStreams consumer {topic (int 1)}))]
|
||||
(map to-clojure
|
||||
(iterator-seq (.iterator ^KafkaStream stream)))))
|
||||
(lazy-iterate (.iterator ^KafkaStream stream)))))
|
||||
|
|
|
@ -34,9 +34,9 @@
|
|||
zk/shutdown
|
||||
(let [p (producer producer-config)]
|
||||
(send-messages p messages)
|
||||
(zk/messages c "test")))))
|
||||
(first (zk/messages c "test"))))))
|
||||
|
||||
(given (first (send-and-receive [(test-message)]))
|
||||
(given (send-and-receive [(test-message)])
|
||||
(expect :topic "test"
|
||||
:offset 0
|
||||
:partition 0
|
||||
|
|
Loading…
Reference in a new issue