map simple consumer messages to-clojure
This commit is contained in:
parent
f997dcfcb0
commit
f9a1e978ca
1 changed files with 5 additions and 6 deletions
|
@ -1,9 +1,8 @@
|
||||||
(ns clj-kafka.consumer.simple
|
(ns clj-kafka.consumer.simple
|
||||||
(:use [clj-kafka.core :only (to-clojure)])
|
(:use [clj-kafka.core :only (to-clojure)])
|
||||||
(:import [kafka.javaapi.consumer SimpleConsumer]
|
(:import [kafka.javaapi.consumer SimpleConsumer]
|
||||||
[kafka.api FetchRequestBuilder PartitionOffsetRequestInfo]
|
[kafka.api FetchRequest FetchRequestBuilder PartitionOffsetRequestInfo]
|
||||||
[kafka.javaapi OffsetRequest]
|
[kafka.javaapi OffsetRequest TopicMetadataRequest FetchResponse]
|
||||||
[kafka.javaapi TopicMetadataRequest]
|
|
||||||
[kafka.common TopicAndPartition]))
|
[kafka.common TopicAndPartition]))
|
||||||
|
|
||||||
(defn consumer
|
(defn consumer
|
||||||
|
@ -25,9 +24,9 @@
|
||||||
(defn messages
|
(defn messages
|
||||||
[^SimpleConsumer consumer client-id topic partition offset fetch-size]
|
[^SimpleConsumer consumer client-id topic partition offset fetch-size]
|
||||||
(let [fetch (fetch-request client-id topic partition offset fetch-size)]
|
(let [fetch (fetch-request client-id topic partition offset fetch-size)]
|
||||||
(iterator-seq (.iterator (.messageSet ^kafka.javaapi.FetchResponse (.fetch consumer ^kafka.api.FetchRequest fetch)
|
(map to-clojure (iterator-seq (.iterator (.messageSet ^FetchResponse (.fetch consumer ^FetchRequest fetch)
|
||||||
topic
|
topic
|
||||||
partition)))))
|
partition))))))
|
||||||
|
|
||||||
(defn topic-meta-data [consumer topics]
|
(defn topic-meta-data [consumer topics]
|
||||||
(to-clojure (.send consumer (TopicMetadataRequest. topics))))
|
(to-clojure (.send consumer (TopicMetadataRequest. topics))))
|
||||||
|
|
Loading…
Reference in a new issue