Remove queue from zk consumer

To ensure client code has explicit control over the reading of messages
This change removes the ability to listen to multiple topics at once,
as it is not feasible to build this functionality in at the library
level while still giving client code total control and knowledge over
which messages have been read / what the current offset is.
This commit is contained in:
Russell 2014-02-24 09:29:22 +00:00
parent edda2420ae
commit c606237b1c
3 changed files with 14 additions and 30 deletions

View file

@ -50,13 +50,7 @@ The Zookeeper consumer uses broker information contained within Zookeeper to con
(with-resource [c (consumer config)]
shutdown
(take 2 (messages c ["test"])))
```
It's also now possible to consume messages from multiple topics at the same time. These are aggregated and returned as a single sequence:
```clojure
(take 5 (messages c ["test1" "test2"]))
(take 2 (messages c "test")))
```
## License

View file

@ -28,24 +28,11 @@
[^ConsumerConnector consumer]
(.shutdown consumer))
(defn- topic-map
[topics]
(apply hash-map (interleave topics
(repeat (Integer/valueOf 1)))))
(defn messages
"Creates a sequence of KafkaMessage messages from the given topics. Consumes
messages from a single stream. topics is a collection of topics to consume
from.
Optional: queue-capacity. Can be used to limit number of messages held in
queue before they've been dequeued in the returned sequence. Defaults to
Integer/MAX_VALUE but can be changed if your messages are particularly large
and consumption is slow."
[^ConsumerConnector consumer topics & {:keys [queue-capacity]
:or {queue-capacity (Integer/MAX_VALUE)}}]
(let [[queue-seq queue-put] (pipe queue-capacity)]
(doseq [[topic streams] (.createMessageStreams consumer (topic-map topics))]
(future (doseq [msg (iterator-seq (.iterator ^KafkaStream (first streams)))]
(queue-put (to-clojure msg)))))
queue-seq))
"Creates a sequence of KafkaMessage messages from the given topic. Consumes
messages from a single stream."
[^ConsumerConnector consumer topic]
(let [[_topic [stream & _]]
(first (.createMessageStreams consumer {topic (int 1)}))]
(map to-clojure
(iterator-seq (.iterator ^KafkaStream stream)))))

View file

@ -23,6 +23,10 @@
(fn [m]
(String. (k m) "UTF-8")))
(defn test-message
[]
(message "test" (.getBytes "Hello, world")))
(defn send-and-receive
[messages]
(with-test-broker test-broker-config
@ -30,10 +34,9 @@
zk/shutdown
(let [p (producer producer-config)]
(send-messages p messages)
(doall (take (count messages)
(zk/messages c ["test"])))))))
(zk/messages c "test")))))
(given (first (send-and-receive [(message "test" (.getBytes "Hello, world"))]))
(given (first (send-and-receive [(test-message)]))
(expect :topic "test"
:offset 0
:partition 0