diff --git a/.gitignore b/.gitignore index 41d2931..276db42 100644 --- a/.gitignore +++ b/.gitignore @@ -11,3 +11,4 @@ autodoc/** .lein-repl-history /tmp /logs +.nrepl-port diff --git a/README.md b/README.md index acf6797..fc9336a 100644 --- a/README.md +++ b/README.md @@ -50,13 +50,7 @@ The Zookeeper consumer uses broker information contained within Zookeeper to con (with-resource [c (consumer config)] shutdown - (take 2 (messages c ["test"]))) -``` - -It's also now possible to consume messages from multiple topics at the same time. These are aggregated and returned as a single sequence: - -```clojure -(take 5 (messages c ["test1" "test2"])) + (take 2 (messages c "test"))) ``` ## License diff --git a/project.clj b/project.clj index 478a0d5..1b546d0 100644 --- a/project.clj +++ b/project.clj @@ -1,9 +1,9 @@ -(defproject clj-kafka/clj-kafka "0.1.2-0.8" +(defproject clj-kafka/clj-kafka "0.2.0-0.8" :min-lein-version "2.0.0" :dependencies [[org.clojure/clojure "1.5.1"] [zookeeper-clj "0.9.3"] [org.clojure/data.json "0.2.2"] - + ;; kafka and its related deps [org.apache.kafka/kafka_2.10 "0.8.0"] [org.apache.zookeeper/zookeeper "3.3.4"] diff --git a/src/clj_kafka/consumer/zk.clj b/src/clj_kafka/consumer/zk.clj index b72bbfa..22bc64e 100644 --- a/src/clj_kafka/consumer/zk.clj +++ b/src/clj_kafka/consumer/zk.clj @@ -1,7 +1,7 @@ (ns clj-kafka.consumer.zk (:import [kafka.consumer ConsumerConfig Consumer KafkaStream] [kafka.javaapi.consumer ConsumerConnector]) - (:use [clj-kafka.core :only (as-properties to-clojure with-resource pipe)]) + (:use [clj-kafka.core :only (as-properties to-clojure with-resource)]) (:require [zookeeper :as zk])) (defn consumer @@ -28,24 +28,11 @@ [^ConsumerConnector consumer] (.shutdown consumer)) -(defn- topic-map - [topics] - (apply hash-map (interleave topics - (repeat (Integer/valueOf 1))))) - (defn messages - "Creates a sequence of KafkaMessage messages from the given topics. Consumes - messages from a single stream. topics is a collection of topics to consume - from. - Optional: queue-capacity. Can be used to limit number of messages held in - queue before they've been dequeued in the returned sequence. Defaults to - Integer/MAX_VALUE but can be changed if your messages are particularly large - and consumption is slow." - [^ConsumerConnector consumer topics & {:keys [queue-capacity] - :or {queue-capacity (Integer/MAX_VALUE)}}] - (let [[queue-seq queue-put] (pipe queue-capacity)] - (doseq [[topic streams] (.createMessageStreams consumer (topic-map topics))] - (future (doseq [msg (iterator-seq (.iterator ^KafkaStream (first streams)))] - (queue-put (to-clojure msg))))) - queue-seq)) - + "Creates a sequence of KafkaMessage messages from the given topic. Consumes + messages from a single stream." + [^ConsumerConnector consumer topic] + (let [[_topic [stream & _]] + (first (.createMessageStreams consumer {topic (int 1)}))] + (map to-clojure + (iterator-seq (.iterator ^KafkaStream stream))))) diff --git a/src/clj_kafka/core.clj b/src/clj_kafka/core.clj index 5420256..2bbc3cd 100644 --- a/src/clj_kafka/core.clj +++ b/src/clj_kafka/core.clj @@ -61,22 +61,3 @@ TopicMetadataResponse (to-clojure [x] (map to-clojure (.topicsMetadata x)))) - -(defn pipe - "Returns a vector containing a sequence that will read from the - queue, and a function that inserts items into the queue. - - Source: http://clj-me.cgrand.net/2010/04/02/pipe-dreams-are-not-necessarily-made-of-promises/" - ([] (pipe 100)) - ([size] - (let [q (java.util.concurrent.LinkedBlockingQueue. ^int size) - EOQ (Object.) - NIL (Object.) - s (fn queue-seq [] (lazy-seq - (let [x (.take q)] - (when-not (= EOQ x) - (cons (when-not (= NIL x) x) - (queue-seq))))))] - [(s) (fn queue-put - ([] (.put q EOQ)) - ([x] (.put q (or x NIL))))]))) diff --git a/test/clj_kafka/test/consumer/zk.clj b/test/clj_kafka/test/consumer/zk.clj index f4d327a..05b03be 100644 --- a/test/clj_kafka/test/consumer/zk.clj +++ b/test/clj_kafka/test/consumer/zk.clj @@ -23,6 +23,10 @@ (fn [m] (String. (k m) "UTF-8"))) +(defn test-message + [] + (message "test" (.getBytes "Hello, world"))) + (defn send-and-receive [messages] (with-test-broker test-broker-config @@ -30,10 +34,9 @@ zk/shutdown (let [p (producer producer-config)] (send-messages p messages) - (doall (take (count messages) - (zk/messages c ["test"]))))))) + (zk/messages c "test"))))) -(given (first (send-and-receive [(message "test" (.getBytes "Hello, world"))])) +(given (first (send-and-receive [(test-message)])) (expect :topic "test" :offset 0 :partition 0