add message piping back; to-clojure conversion to KafkaMessage added by default

This commit is contained in:
Paul Ingles 2013-06-07 22:03:11 +01:00
parent 57a9f17dfa
commit 84c813ae48
2 changed files with 15 additions and 6 deletions

View file

@ -28,9 +28,18 @@
[^ConsumerConnector consumer]
(.shutdown consumer))
(defn messages
"Creates a sequence of messages from the given topics."
[^ConsumerConnector consumer topic]
(let [[topic streams] (first (.createMessageStreams consumer {topic (Integer/valueOf 1)}))]
(iterator-seq (.iterator ^KafkaStream (first streams)))))
(defn- topic-map
[topics]
(apply hash-map (interleave topics
(repeat (Integer/valueOf 1)))))
(defn messages
"Creates a sequence of KafkaMessage messages from the given topics. Consumes
messages from a single stream."
[^ConsumerConnector consumer & topics]
(let [[queue-seq queue-put] (pipe)]
(doseq [[topic streams] (.createMessageStreams consumer (topic-map topics))]
(future (doseq [msg (iterator-seq (.iterator ^KafkaStream (first streams)))]
(queue-put (to-clojure msg)))))
queue-seq))

View file

@ -24,7 +24,7 @@
zk/shutdown
(send-message p "test" "Hello, world")
(let [msgs (zk/messages c "test")
msg (to-clojure (first msgs))]
msg (first msgs)]
(let [{:keys [topic offset partition key value]} msg]
(is (= "test" topic))
(is (= 0 offset))