diff --git a/project.clj b/project.clj index e1d3594..70d51d7 100644 --- a/project.clj +++ b/project.clj @@ -1,4 +1,4 @@ -(defproject clj-kafka/clj-kafka "0.2.1-0.8" +(defproject clj-kafka/clj-kafka "0.2.2-0.8" :min-lein-version "2.0.0" :dependencies [[org.clojure/clojure "1.5.1"] [zookeeper-clj "0.9.3"] diff --git a/src/clj_kafka/consumer/zk.clj b/src/clj_kafka/consumer/zk.clj index 8ce9820..af7cdde 100644 --- a/src/clj_kafka/consumer/zk.clj +++ b/src/clj_kafka/consumer/zk.clj @@ -37,8 +37,9 @@ (defn messages "Creates a sequence of KafkaMessage messages from the given topic. Consumes messages from a single stream." - [^ConsumerConnector consumer topic] + [^ConsumerConnector consumer topic & {:keys [threads] + :or {threads 1}}] (let [[_topic [stream & _]] - (first (.createMessageStreams consumer {topic (int 1)}))] + (first (.createMessageStreams consumer {topic (int threads)}))] (map to-clojure (lazy-iterate (.iterator ^KafkaStream stream)))))