From c606237b1cbcdb9fc4dc3819f41fc281ec2fe354 Mon Sep 17 00:00:00 2001 From: Russell Date: Mon, 24 Feb 2014 09:29:22 +0000 Subject: [PATCH] Remove queue from zk consumer To ensure client code has explicit control over the reading of messages This change removes the ability to listen to multiple topics at once, as it is not feasible to build this functionality in at the library level while still giving client code total control and knowledge over which messages have been read / what the current offset is. --- README.md | 8 +------- src/clj_kafka/consumer/zk.clj | 27 +++++++-------------------- test/clj_kafka/test/consumer/zk.clj | 9 ++++++--- 3 files changed, 14 insertions(+), 30 deletions(-) diff --git a/README.md b/README.md index acf6797..fc9336a 100644 --- a/README.md +++ b/README.md @@ -50,13 +50,7 @@ The Zookeeper consumer uses broker information contained within Zookeeper to con (with-resource [c (consumer config)] shutdown - (take 2 (messages c ["test"]))) -``` - -It's also now possible to consume messages from multiple topics at the same time. These are aggregated and returned as a single sequence: - -```clojure -(take 5 (messages c ["test1" "test2"])) + (take 2 (messages c "test"))) ``` ## License diff --git a/src/clj_kafka/consumer/zk.clj b/src/clj_kafka/consumer/zk.clj index b72bbfa..944c546 100644 --- a/src/clj_kafka/consumer/zk.clj +++ b/src/clj_kafka/consumer/zk.clj @@ -28,24 +28,11 @@ [^ConsumerConnector consumer] (.shutdown consumer)) -(defn- topic-map - [topics] - (apply hash-map (interleave topics - (repeat (Integer/valueOf 1))))) - (defn messages - "Creates a sequence of KafkaMessage messages from the given topics. Consumes - messages from a single stream. topics is a collection of topics to consume - from. - Optional: queue-capacity. Can be used to limit number of messages held in - queue before they've been dequeued in the returned sequence. Defaults to - Integer/MAX_VALUE but can be changed if your messages are particularly large - and consumption is slow." - [^ConsumerConnector consumer topics & {:keys [queue-capacity] - :or {queue-capacity (Integer/MAX_VALUE)}}] - (let [[queue-seq queue-put] (pipe queue-capacity)] - (doseq [[topic streams] (.createMessageStreams consumer (topic-map topics))] - (future (doseq [msg (iterator-seq (.iterator ^KafkaStream (first streams)))] - (queue-put (to-clojure msg))))) - queue-seq)) - + "Creates a sequence of KafkaMessage messages from the given topic. Consumes + messages from a single stream." + [^ConsumerConnector consumer topic] + (let [[_topic [stream & _]] + (first (.createMessageStreams consumer {topic (int 1)}))] + (map to-clojure + (iterator-seq (.iterator ^KafkaStream stream))))) diff --git a/test/clj_kafka/test/consumer/zk.clj b/test/clj_kafka/test/consumer/zk.clj index f4d327a..05b03be 100644 --- a/test/clj_kafka/test/consumer/zk.clj +++ b/test/clj_kafka/test/consumer/zk.clj @@ -23,6 +23,10 @@ (fn [m] (String. (k m) "UTF-8"))) +(defn test-message + [] + (message "test" (.getBytes "Hello, world"))) + (defn send-and-receive [messages] (with-test-broker test-broker-config @@ -30,10 +34,9 @@ zk/shutdown (let [p (producer producer-config)] (send-messages p messages) - (doall (take (count messages) - (zk/messages c ["test"]))))))) + (zk/messages c "test"))))) -(given (first (send-and-receive [(message "test" (.getBytes "Hello, world"))])) +(given (first (send-and-receive [(test-message)])) (expect :topic "test" :offset 0 :partition 0