Merge pull request #22 from uswitch/unqueued-zk-consumer

Unqueued zk consumer
Paul Ingles 2014-03-05 12:59:19 +00:00
commit 9c19d5df43
6 changed files with 18 additions and 52 deletions

.gitignore

@@ -11,3 +11,4 @@ autodoc/**
.lein-repl-history
/tmp
/logs
.nrepl-port

README.md

@@ -50,13 +50,7 @@ The Zookeeper consumer uses broker information contained within Zookeeper to con
(with-resource [c (consumer config)]
shutdown
(take 2 (messages c ["test"])))
```
It's also now possible to consume messages from multiple topics at the same time. These are aggregated and returned as a single sequence:
```clojure
(take 5 (messages c ["test1" "test2"]))
(take 2 (messages c "test")))
```
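For anyone trying the change locally, here is a minimal sketch of the new single-topic call, assuming the consumer config documented earlier in the README; the ZooKeeper address and group id below are illustrative, and `doall` forces the messages before `with-resource` shuts the consumer down:
```clojure
;; Illustrative config only; any reachable ZooKeeper and consumer group will do.
(def config {"zookeeper.connect"  "localhost:2181"
             "group.id"           "clj-kafka.consumer"
             "auto.offset.reset"  "smallest"
             "auto.commit.enable" "false"})

(with-resource [c (consumer config)]
  shutdown
  ;; realise the lazy seq before the consumer is shut down
  (doall (take 2 (messages c "test"))))
```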
## License

project.clj

@@ -1,9 +1,9 @@
(defproject clj-kafka/clj-kafka "0.1.2-0.8"
(defproject clj-kafka/clj-kafka "0.2.0-0.8"
:min-lein-version "2.0.0"
:dependencies [[org.clojure/clojure "1.5.1"]
[zookeeper-clj "0.9.3"]
[org.clojure/data.json "0.2.2"]
;; kafka and its related deps
[org.apache.kafka/kafka_2.10 "0.8.0"]
[org.apache.zookeeper/zookeeper "3.3.4"]

src/clj_kafka/consumer/zk.clj

@@ -1,7 +1,7 @@
(ns clj-kafka.consumer.zk
(:import [kafka.consumer ConsumerConfig Consumer KafkaStream]
[kafka.javaapi.consumer ConsumerConnector])
(:use [clj-kafka.core :only (as-properties to-clojure with-resource pipe)])
(:use [clj-kafka.core :only (as-properties to-clojure with-resource)])
(:require [zookeeper :as zk]))
(defn consumer
@@ -28,24 +28,11 @@
[^ConsumerConnector consumer]
(.shutdown consumer))
(defn- topic-map
[topics]
(apply hash-map (interleave topics
(repeat (Integer/valueOf 1)))))
(defn messages
"Creates a sequence of KafkaMessage messages from the given topics. Consumes
messages from a single stream. topics is a collection of topics to consume
from.
Optional: queue-capacity. Can be used to limit number of messages held in
queue before they've been dequeued in the returned sequence. Defaults to
Integer/MAX_VALUE but can be changed if your messages are particularly large
and consumption is slow."
[^ConsumerConnector consumer topics & {:keys [queue-capacity]
:or {queue-capacity (Integer/MAX_VALUE)}}]
(let [[queue-seq queue-put] (pipe queue-capacity)]
(doseq [[topic streams] (.createMessageStreams consumer (topic-map topics))]
(future (doseq [msg (iterator-seq (.iterator ^KafkaStream (first streams)))]
(queue-put (to-clojure msg)))))
queue-seq))
"Creates a sequence of KafkaMessage messages from the given topic. Consumes
messages from a single stream."
[^ConsumerConnector consumer topic]
(let [[_topic [stream & _]]
(first (.createMessageStreams consumer {topic (int 1)}))]
(map to-clojure
(iterator-seq (.iterator ^KafkaStream stream)))))
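One consequence of the simplification is that the multi-topic aggregation (previously built on `pipe`) is gone: `messages` now returns a lazy seq backed directly by a single `KafkaStream` iterator. If a caller still needs several topics, a hypothetical workaround is one `ConsumerConnector` per topic; the helper below is a sketch only (its name is not part of this change) and leaves shutdown of the individual consumers to the caller:
```clojure
;; Hypothetical helper, not part of this change: one connector per topic,
;; each contributing its own lazy message seq.
(defn messages-per-topic
  [config topics]
  (into {} (for [topic topics]
             [topic (messages (consumer config) topic)])))
```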

src/clj_kafka/core.clj

@@ -61,22 +61,3 @@
TopicMetadataResponse
(to-clojure [x]
(map to-clojure (.topicsMetadata x))))
(defn pipe
"Returns a vector containing a sequence that will read from the
queue, and a function that inserts items into the queue.
Source: http://clj-me.cgrand.net/2010/04/02/pipe-dreams-are-not-necessarily-made-of-promises/"
([] (pipe 100))
([size]
(let [q (java.util.concurrent.LinkedBlockingQueue. ^int size)
EOQ (Object.)
NIL (Object.)
s (fn queue-seq [] (lazy-seq
(let [x (.take q)]
(when-not (= EOQ x)
(cons (when-not (= NIL x) x)
(queue-seq))))))]
[(s) (fn queue-put
([] (.put q EOQ))
([x] (.put q (or x NIL))))])))
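For context on what is being removed: `pipe` wrapped a bounded `LinkedBlockingQueue` as a lazy sequence plus a put function, which is what let the old `messages` fan several streams into one seq. A small illustration of its behaviour, using only the names shown above:
```clojure
;; Items put on one end come out, in order, of the lazy seq on the other;
;; nil round-trips via the NIL sentinel, and the zero-arg put closes the seq.
(let [[queue-seq queue-put] (pipe 10)]
  (future
    (doseq [x [:a nil :b]]
      (queue-put x))
    (queue-put))              ; end-of-queue
  (doall queue-seq))          ; => (:a nil :b)
```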

test/clj_kafka/test/consumer/zk.clj

@@ -23,6 +23,10 @@
(fn [m]
(String. (k m) "UTF-8")))
(defn test-message
[]
(message "test" (.getBytes "Hello, world")))
(defn send-and-receive
[messages]
(with-test-broker test-broker-config
@@ -30,10 +34,9 @@
zk/shutdown
(let [p (producer producer-config)]
(send-messages p messages)
(doall (take (count messages)
(zk/messages c ["test"])))))))
(zk/messages c "test")))))
(given (first (send-and-receive [(message "test" (.getBytes "Hello, world"))]))
(given (first (send-and-receive [(test-message)]))
(expect :topic "test"
:offset 0
:partition 0