diff --git a/src/clj_kafka/consumer/zk.clj b/src/clj_kafka/consumer/zk.clj index e7168ab..11fcba5 100644 --- a/src/clj_kafka/consumer/zk.clj +++ b/src/clj_kafka/consumer/zk.clj @@ -28,9 +28,18 @@ [^ConsumerConnector consumer] (.shutdown consumer)) -(defn messages - "Creates a sequence of messages from the given topics." - [^ConsumerConnector consumer topic] - (let [[topic streams] (first (.createMessageStreams consumer {topic (Integer/valueOf 1)}))] - (iterator-seq (.iterator ^KafkaStream (first streams))))) +(defn- topic-map + [topics] + (apply hash-map (interleave topics + (repeat (Integer/valueOf 1))))) + +(defn messages + "Creates a sequence of KafkaMessage messages from the given topics. Consumes + messages from a single stream." + [^ConsumerConnector consumer & topics] + (let [[queue-seq queue-put] (pipe)] + (doseq [[topic streams] (.createMessageStreams consumer (topic-map topics))] + (future (doseq [msg (iterator-seq (.iterator ^KafkaStream (first streams)))] + (queue-put (to-clojure msg))))) + queue-seq)) diff --git a/test/clj_kafka/test/consumer.clj b/test/clj_kafka/test/consumer.clj index 063f9a5..7be3860 100644 --- a/test/clj_kafka/test/consumer.clj +++ b/test/clj_kafka/test/consumer.clj @@ -24,7 +24,7 @@ zk/shutdown (send-message p "test" "Hello, world") (let [msgs (zk/messages c "test") - msg (to-clojure (first msgs))] + msg (first msgs)] (let [{:keys [topic offset partition key value]} msg] (is (= "test" topic)) (is (= 0 offset))