add topic-offset to simple consumer: can be used to determine earliest offset available too
This commit is contained in:
parent
5b7afa5723
commit
ddb0a77391
2 changed files with 8 additions and 4 deletions
|
@ -1,4 +1,4 @@
|
|||
(defproject clj-kafka/clj-kafka "0.2.2-0.8"
|
||||
(defproject clj-kafka/clj-kafka "0.2.3-0.8"
|
||||
:min-lein-version "2.0.0"
|
||||
:dependencies [[org.clojure/clojure "1.5.1"]
|
||||
[zookeeper-clj "0.9.3"]
|
||||
|
|
|
@ -31,9 +31,13 @@
|
|||
(defn topic-meta-data [consumer topics]
|
||||
(to-clojure (.send consumer (TopicMetadataRequest. topics))))
|
||||
|
||||
(defn latest-topic-offset [consumer topic partition]
|
||||
(let [tp (TopicAndPartition. topic partition)
|
||||
pori (PartitionOffsetRequestInfo. -1 1)
|
||||
(defn topic-offset [consumer topic partition offset-position]
|
||||
(let [op {:latest -1 :earliest -2}
|
||||
tp (TopicAndPartition. topic partition)
|
||||
pori (PartitionOffsetRequestInfo. (offset-position op) 1)
|
||||
hm (java.util.HashMap. {tp pori})]
|
||||
(let [response (.getOffsetsBefore consumer (OffsetRequest. hm (kafka.api.OffsetRequest/CurrentVersion) "clj-kafka-id"))]
|
||||
(first (.offsets response topic partition)))))
|
||||
|
||||
(defn latest-topic-offset [consumer topic partition]
|
||||
(topic-offset consumer topic partition :latest))
|
||||
|
|
Loading…
Reference in a new issue