diff --git a/src/clj_kafka/consumer/simple.clj b/src/clj_kafka/consumer/simple.clj index 90e80f5..f3281ea 100644 --- a/src/clj_kafka/consumer/simple.clj +++ b/src/clj_kafka/consumer/simple.clj @@ -1,9 +1,8 @@ (ns clj-kafka.consumer.simple (:use [clj-kafka.core :only (to-clojure)]) (:import [kafka.javaapi.consumer SimpleConsumer] - [kafka.api FetchRequestBuilder PartitionOffsetRequestInfo] - [kafka.javaapi OffsetRequest] - [kafka.javaapi TopicMetadataRequest] + [kafka.api FetchRequest FetchRequestBuilder PartitionOffsetRequestInfo] + [kafka.javaapi OffsetRequest TopicMetadataRequest FetchResponse] [kafka.common TopicAndPartition])) (defn consumer @@ -25,9 +24,9 @@ (defn messages [^SimpleConsumer consumer client-id topic partition offset fetch-size] (let [fetch (fetch-request client-id topic partition offset fetch-size)] - (iterator-seq (.iterator (.messageSet ^kafka.javaapi.FetchResponse (.fetch consumer ^kafka.api.FetchRequest fetch) - topic - partition))))) + (map to-clojure (iterator-seq (.iterator (.messageSet ^FetchResponse (.fetch consumer ^FetchRequest fetch) + topic + partition)))))) (defn topic-meta-data [consumer topics] (to-clojure (.send consumer (TopicMetadataRequest. topics))))