add committed-offset and set-offset! to allow offsets to be manipulated/read directly
This commit is contained in:
parent
96275316e9
commit
6b506b692e
1 changed files with 29 additions and 1 deletions
|
@ -2,7 +2,8 @@
|
|||
(:use [clojure.data.json :only (read-str)]
|
||||
[clj-kafka.core :only (with-resource)])
|
||||
(:require [zookeeper :as zk]
|
||||
[clojure.string :as s]))
|
||||
[clojure.string :as s])
|
||||
(:import [org.apache.zookeeper KeeperException$NoNodeException]))
|
||||
|
||||
(defn brokers
|
||||
"Get brokers from zookeeper"
|
||||
|
@ -55,3 +56,30 @@
|
|||
String.
|
||||
read-str
|
||||
(get "partitions"))))
|
||||
|
||||
(defn offset-path
|
||||
[consumer-group topic partition]
|
||||
(str "/consumers/" consumer-group "/offsets/" topic "/" partition))
|
||||
|
||||
(defn committed-offset
|
||||
[m consumer-group topic partition]
|
||||
(with-resource [z (zk/connect (get m "zookeeper.connect"))]
|
||||
zk/close
|
||||
(try (-> (zk/data z (offset-path consumer-group topic partition))
|
||||
:data
|
||||
(String. )
|
||||
(Long/valueOf))
|
||||
(catch KeeperException$NoNodeException e
|
||||
nil))))
|
||||
|
||||
(defn set-offset!
|
||||
[m consumer-group topic partition offset]
|
||||
{:pre [(or (nil? offset) (number? offset))]}
|
||||
(let [path (offset-path consumer-group topic partition)]
|
||||
(with-resource [z (zk/connect (get m "zookeeper.connect"))]
|
||||
zk/close
|
||||
(if (nil? offset)
|
||||
(zk/delete z path)
|
||||
(if-let [{:keys [version] :as m} (zk/exists z path)]
|
||||
(zk/set-data z path (.getBytes (str offset)) version)
|
||||
(zk/create z path :persistent? true :data (.getBytes (str offset))))))))
|
||||
|
|
Loading…
Reference in a new issue