create embedded zookeeper server
This commit is contained in:
parent
79544297fe
commit
a89a454d3b
1 changed files with 48 additions and 11 deletions
|
@ -1,17 +1,30 @@
|
||||||
(ns clj-kafka.test.consumer
|
(ns clj-kafka.test.consumer
|
||||||
(:import [kafka.server KafkaConfig KafkaServer]
|
(:import [kafka.server KafkaConfig KafkaServer]
|
||||||
|
[java.net InetSocketAddress]
|
||||||
|
[org.apache.zookeeper.server ZooKeeperServer NIOServerCnxn$Factory]
|
||||||
[org.apache.commons.io FileUtils])
|
[org.apache.commons.io FileUtils])
|
||||||
(:use [clojure.test]
|
(:use [clojure.test]
|
||||||
[clojure.java.io :only (file)]
|
[clojure.java.io :only (file)]
|
||||||
[clj-kafka.core :only (as-properties)]))
|
[clj-kafka.core :only (as-properties)]))
|
||||||
|
|
||||||
(def broker-config {"broker.id" "0"
|
(defn tmp-dir
|
||||||
"port" "9999"
|
[path]
|
||||||
"host.name" "localhost"
|
(.getPath (file (System/getProperty "java.io.tmpdir") path)))
|
||||||
"zookeeper.connect" "127.0.0.1:2182"
|
|
||||||
"enable.zookeeper" "false"
|
(def zk-config {:host "127.0.0.1"
|
||||||
"log.flush.interval.messages" "1"
|
:port 2182
|
||||||
"log.dir" (.getAbsolutePath (clojure.java.io/file "tmp/log"))})
|
:snapshot-dir (tmp-dir "zookeeper-snapshot")
|
||||||
|
:log-dir (tmp-dir "zookeeper-log")})
|
||||||
|
|
||||||
|
(def broker-config (let [{:keys [host port]} zk-config]
|
||||||
|
{"broker.id" "0"
|
||||||
|
"port" "9999"
|
||||||
|
"host.name" "localhost"
|
||||||
|
"zookeeper.connect" (str host ":" port)
|
||||||
|
"enable.zookeeper" "false"
|
||||||
|
"log.flush.interval.messages" "1"
|
||||||
|
"auto.create.topics.enabled" "true"
|
||||||
|
"log.dir" (.getAbsolutePath (file (tmp-dir "kafka-log")))}))
|
||||||
|
|
||||||
(def system-time (proxy [kafka.utils.Time] []
|
(def system-time (proxy [kafka.utils.Time] []
|
||||||
(milliseconds [] (System/currentTimeMillis))
|
(milliseconds [] (System/currentTimeMillis))
|
||||||
|
@ -32,20 +45,44 @@
|
||||||
[broker]
|
[broker]
|
||||||
(.shutdown broker))
|
(.shutdown broker))
|
||||||
|
|
||||||
|
(defn create-zookeeper
|
||||||
|
[{:keys [port host snapshot-dir log-dir]}]
|
||||||
|
(let [tick-time 500
|
||||||
|
zk (ZooKeeperServer. (file snapshot-dir) (file log-dir) tick-time)]
|
||||||
|
(doto (NIOServerCnxn$Factory. (InetSocketAddress. host port))
|
||||||
|
(.startup zk))))
|
||||||
|
|
||||||
|
(defn shutdown-zookeeper
|
||||||
|
[zookeeper]
|
||||||
|
(.shutdown zookeeper))
|
||||||
|
|
||||||
(defn- clean-broker-data
|
(defn- clean-broker-data
|
||||||
[]
|
[]
|
||||||
(FileUtils/deleteDirectory (file (broker-config "log.dir"))))
|
(FileUtils/deleteDirectory (file (broker-config "log.dir"))))
|
||||||
|
|
||||||
|
(defn- clean-zk
|
||||||
|
[]
|
||||||
|
(let [{:keys [snapshot-dir log-dir]} zk-config]
|
||||||
|
(FileUtils/deleteDirectory (file snapshot-dir))
|
||||||
|
(FileUtils/deleteDirectory (file log-dir))))
|
||||||
|
|
||||||
|
(defn- cleanup
|
||||||
|
[]
|
||||||
|
(clean-broker-data)
|
||||||
|
(clean-zk))
|
||||||
|
|
||||||
(defmacro with-broker
|
(defmacro with-broker
|
||||||
"Creates an in-process broker that can be used to test against"
|
"Creates an in-process broker that can be used to test against"
|
||||||
[& body]
|
[& body]
|
||||||
`(let [broker# (create-broker)]
|
`(let [broker# (create-broker)
|
||||||
(try (do (clean-broker-data)
|
zookeeper# (create-zookeeper zk-config)]
|
||||||
(start broker#)
|
(try (do (start broker#)
|
||||||
~@body)
|
~@body)
|
||||||
(catch Exception e#
|
(catch Exception e#
|
||||||
(throw e#))
|
(throw e#))
|
||||||
(finally (stop broker#)))))
|
(finally (do (stop broker#)
|
||||||
|
(shutdown-zookeeper zookeeper#)
|
||||||
|
(cleanup))))))
|
||||||
|
|
||||||
(deftest testing-something
|
(deftest testing-something
|
||||||
(with-broker (is (= 1 1))))
|
(with-broker (is (= 1 1))))
|
Loading…
Reference in a new issue