From c606237b1cbcdb9fc4dc3819f41fc281ec2fe354 Mon Sep 17 00:00:00 2001 From: Russell Date: Mon, 24 Feb 2014 09:29:22 +0000 Subject: [PATCH 1/4] Remove queue from zk consumer To ensure client code has explicit control over the reading of messages This change removes the ability to listen to multiple topics at once, as it is not feasible to build this functionality in at the library level while still giving client code total control and knowledge over which messages have been read / what the current offset is. --- README.md | 8 +------- src/clj_kafka/consumer/zk.clj | 27 +++++++-------------------- test/clj_kafka/test/consumer/zk.clj | 9 ++++++--- 3 files changed, 14 insertions(+), 30 deletions(-) diff --git a/README.md b/README.md index acf6797..fc9336a 100644 --- a/README.md +++ b/README.md @@ -50,13 +50,7 @@ The Zookeeper consumer uses broker information contained within Zookeeper to con (with-resource [c (consumer config)] shutdown - (take 2 (messages c ["test"]))) -``` - -It's also now possible to consume messages from multiple topics at the same time. These are aggregated and returned as a single sequence: - -```clojure -(take 5 (messages c ["test1" "test2"])) + (take 2 (messages c "test"))) ``` ## License diff --git a/src/clj_kafka/consumer/zk.clj b/src/clj_kafka/consumer/zk.clj index b72bbfa..944c546 100644 --- a/src/clj_kafka/consumer/zk.clj +++ b/src/clj_kafka/consumer/zk.clj @@ -28,24 +28,11 @@ [^ConsumerConnector consumer] (.shutdown consumer)) -(defn- topic-map - [topics] - (apply hash-map (interleave topics - (repeat (Integer/valueOf 1))))) - (defn messages - "Creates a sequence of KafkaMessage messages from the given topics. Consumes - messages from a single stream. topics is a collection of topics to consume - from. - Optional: queue-capacity. Can be used to limit number of messages held in - queue before they've been dequeued in the returned sequence. Defaults to - Integer/MAX_VALUE but can be changed if your messages are particularly large - and consumption is slow." - [^ConsumerConnector consumer topics & {:keys [queue-capacity] - :or {queue-capacity (Integer/MAX_VALUE)}}] - (let [[queue-seq queue-put] (pipe queue-capacity)] - (doseq [[topic streams] (.createMessageStreams consumer (topic-map topics))] - (future (doseq [msg (iterator-seq (.iterator ^KafkaStream (first streams)))] - (queue-put (to-clojure msg))))) - queue-seq)) - + "Creates a sequence of KafkaMessage messages from the given topic. Consumes + messages from a single stream." + [^ConsumerConnector consumer topic] + (let [[_topic [stream & _]] + (first (.createMessageStreams consumer {topic (int 1)}))] + (map to-clojure + (iterator-seq (.iterator ^KafkaStream stream))))) diff --git a/test/clj_kafka/test/consumer/zk.clj b/test/clj_kafka/test/consumer/zk.clj index f4d327a..05b03be 100644 --- a/test/clj_kafka/test/consumer/zk.clj +++ b/test/clj_kafka/test/consumer/zk.clj @@ -23,6 +23,10 @@ (fn [m] (String. (k m) "UTF-8"))) +(defn test-message + [] + (message "test" (.getBytes "Hello, world"))) + (defn send-and-receive [messages] (with-test-broker test-broker-config @@ -30,10 +34,9 @@ zk/shutdown (let [p (producer producer-config)] (send-messages p messages) - (doall (take (count messages) - (zk/messages c ["test"]))))))) + (zk/messages c "test"))))) -(given (first (send-and-receive [(message "test" (.getBytes "Hello, world"))])) +(given (first (send-and-receive [(test-message)])) (expect :topic "test" :offset 0 :partition 0 From ce5cd3eaa2e3625e0a826233da3f091da598d568 Mon Sep 17 00:00:00 2001 From: Russell Date: Mon, 24 Feb 2014 09:31:53 +0000 Subject: [PATCH 2/4] Remove no longer used pipe implementation --- src/clj_kafka/consumer/zk.clj | 2 +- src/clj_kafka/core.clj | 19 ------------------- 2 files changed, 1 insertion(+), 20 deletions(-) diff --git a/src/clj_kafka/consumer/zk.clj b/src/clj_kafka/consumer/zk.clj index 944c546..22bc64e 100644 --- a/src/clj_kafka/consumer/zk.clj +++ b/src/clj_kafka/consumer/zk.clj @@ -1,7 +1,7 @@ (ns clj-kafka.consumer.zk (:import [kafka.consumer ConsumerConfig Consumer KafkaStream] [kafka.javaapi.consumer ConsumerConnector]) - (:use [clj-kafka.core :only (as-properties to-clojure with-resource pipe)]) + (:use [clj-kafka.core :only (as-properties to-clojure with-resource)]) (:require [zookeeper :as zk])) (defn consumer diff --git a/src/clj_kafka/core.clj b/src/clj_kafka/core.clj index 5420256..2bbc3cd 100644 --- a/src/clj_kafka/core.clj +++ b/src/clj_kafka/core.clj @@ -61,22 +61,3 @@ TopicMetadataResponse (to-clojure [x] (map to-clojure (.topicsMetadata x)))) - -(defn pipe - "Returns a vector containing a sequence that will read from the - queue, and a function that inserts items into the queue. - - Source: http://clj-me.cgrand.net/2010/04/02/pipe-dreams-are-not-necessarily-made-of-promises/" - ([] (pipe 100)) - ([size] - (let [q (java.util.concurrent.LinkedBlockingQueue. ^int size) - EOQ (Object.) - NIL (Object.) - s (fn queue-seq [] (lazy-seq - (let [x (.take q)] - (when-not (= EOQ x) - (cons (when-not (= NIL x) x) - (queue-seq))))))] - [(s) (fn queue-put - ([] (.put q EOQ)) - ([x] (.put q (or x NIL))))]))) From 1723d57e812f1e70a860716b49f024d7a6a65796 Mon Sep 17 00:00:00 2001 From: Russell Date: Mon, 24 Feb 2014 12:35:52 +0000 Subject: [PATCH 3/4] Add nrepl-port to gitignore --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 41d2931..276db42 100644 --- a/.gitignore +++ b/.gitignore @@ -11,3 +11,4 @@ autodoc/** .lein-repl-history /tmp /logs +.nrepl-port From 9a42598d835eb19400a906e737b5bbab369f177c Mon Sep 17 00:00:00 2001 From: Russell Date: Mon, 24 Feb 2014 12:38:03 +0000 Subject: [PATCH 4/4] Up project version to reflect breaking change --- project.clj | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/project.clj b/project.clj index 478a0d5..1b546d0 100644 --- a/project.clj +++ b/project.clj @@ -1,9 +1,9 @@ -(defproject clj-kafka/clj-kafka "0.1.2-0.8" +(defproject clj-kafka/clj-kafka "0.2.0-0.8" :min-lein-version "2.0.0" :dependencies [[org.clojure/clojure "1.5.1"] [zookeeper-clj "0.9.3"] [org.clojure/data.json "0.2.2"] - + ;; kafka and its related deps [org.apache.kafka/kafka_2.10 "0.8.0"] [org.apache.zookeeper/zookeeper "3.3.4"]