diff --git a/README.md b/README.md index acf6797..fc9336a 100644 --- a/README.md +++ b/README.md @@ -50,13 +50,7 @@ The Zookeeper consumer uses broker information contained within Zookeeper to con (with-resource [c (consumer config)] shutdown - (take 2 (messages c ["test"]))) -``` - -It's also now possible to consume messages from multiple topics at the same time. These are aggregated and returned as a single sequence: - -```clojure -(take 5 (messages c ["test1" "test2"])) + (take 2 (messages c "test"))) ``` ## License diff --git a/src/clj_kafka/consumer/zk.clj b/src/clj_kafka/consumer/zk.clj index b72bbfa..944c546 100644 --- a/src/clj_kafka/consumer/zk.clj +++ b/src/clj_kafka/consumer/zk.clj @@ -28,24 +28,11 @@ [^ConsumerConnector consumer] (.shutdown consumer)) -(defn- topic-map - [topics] - (apply hash-map (interleave topics - (repeat (Integer/valueOf 1))))) - (defn messages - "Creates a sequence of KafkaMessage messages from the given topics. Consumes - messages from a single stream. topics is a collection of topics to consume - from. - Optional: queue-capacity. Can be used to limit number of messages held in - queue before they've been dequeued in the returned sequence. Defaults to - Integer/MAX_VALUE but can be changed if your messages are particularly large - and consumption is slow." - [^ConsumerConnector consumer topics & {:keys [queue-capacity] - :or {queue-capacity (Integer/MAX_VALUE)}}] - (let [[queue-seq queue-put] (pipe queue-capacity)] - (doseq [[topic streams] (.createMessageStreams consumer (topic-map topics))] - (future (doseq [msg (iterator-seq (.iterator ^KafkaStream (first streams)))] - (queue-put (to-clojure msg))))) - queue-seq)) - + "Creates a sequence of KafkaMessage messages from the given topic. Consumes + messages from a single stream." + [^ConsumerConnector consumer topic] + (let [[_topic [stream & _]] + (first (.createMessageStreams consumer {topic (int 1)}))] + (map to-clojure + (iterator-seq (.iterator ^KafkaStream stream))))) diff --git a/test/clj_kafka/test/consumer/zk.clj b/test/clj_kafka/test/consumer/zk.clj index f4d327a..05b03be 100644 --- a/test/clj_kafka/test/consumer/zk.clj +++ b/test/clj_kafka/test/consumer/zk.clj @@ -23,6 +23,10 @@ (fn [m] (String. (k m) "UTF-8"))) +(defn test-message + [] + (message "test" (.getBytes "Hello, world"))) + (defn send-and-receive [messages] (with-test-broker test-broker-config @@ -30,10 +34,9 @@ zk/shutdown (let [p (producer producer-config)] (send-messages p messages) - (doall (take (count messages) - (zk/messages c ["test"]))))))) + (zk/messages c "test"))))) -(given (first (send-and-receive [(message "test" (.getBytes "Hello, world"))])) +(given (first (send-and-receive [(test-message)])) (expect :topic "test" :offset 0 :partition 0