From ddb0a773916684b9504a7f0c602bd9f11d20ad5e Mon Sep 17 00:00:00 2001 From: Paul Ingles Date: Thu, 1 May 2014 15:23:56 +0100 Subject: [PATCH] add topic-offset to simple consumer: can be used to determine earliest offset available too --- project.clj | 2 +- src/clj_kafka/consumer/simple.clj | 10 +++++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/project.clj b/project.clj index 70d51d7..befc59b 100644 --- a/project.clj +++ b/project.clj @@ -1,4 +1,4 @@ -(defproject clj-kafka/clj-kafka "0.2.2-0.8" +(defproject clj-kafka/clj-kafka "0.2.3-0.8" :min-lein-version "2.0.0" :dependencies [[org.clojure/clojure "1.5.1"] [zookeeper-clj "0.9.3"] diff --git a/src/clj_kafka/consumer/simple.clj b/src/clj_kafka/consumer/simple.clj index f3281ea..82dbcf2 100644 --- a/src/clj_kafka/consumer/simple.clj +++ b/src/clj_kafka/consumer/simple.clj @@ -31,9 +31,13 @@ (defn topic-meta-data [consumer topics] (to-clojure (.send consumer (TopicMetadataRequest. topics)))) -(defn latest-topic-offset [consumer topic partition] - (let [tp (TopicAndPartition. topic partition) - pori (PartitionOffsetRequestInfo. -1 1) +(defn topic-offset [consumer topic partition offset-position] + (let [op {:latest -1 :earliest -2} + tp (TopicAndPartition. topic partition) + pori (PartitionOffsetRequestInfo. (offset-position op) 1) hm (java.util.HashMap. {tp pori})] (let [response (.getOffsetsBefore consumer (OffsetRequest. hm (kafka.api.OffsetRequest/CurrentVersion) "clj-kafka-id"))] (first (.offsets response topic partition))))) + +(defn latest-topic-offset [consumer topic partition] + (topic-offset consumer topic partition :latest))