Upgraded kafka to 0.8.1.1 and zkclient to 0.4

This commit is contained in:
Graham Berks 2014-11-18 21:26:17 +00:00
parent 678caa5b2c
commit 1a695580a4
4 changed files with 34 additions and 28 deletions

2
.gitignore vendored
View file

@ -12,3 +12,5 @@ autodoc/**
/tmp /tmp
/logs /logs
.nrepl-port .nrepl-port
.idea
*.iml

View file

@ -5,9 +5,9 @@
[org.clojure/data.json "0.2.2"] [org.clojure/data.json "0.2.2"]
;; kafka and its related deps ;; kafka and its related deps
[org.apache.kafka/kafka_2.10 "0.8.0"] [org.apache.kafka/kafka_2.10 "0.8.1.1"]
[org.apache.zookeeper/zookeeper "3.3.4"] [org.apache.zookeeper/zookeeper "3.3.4"]
[com.101tec/zkclient "0.3"] [com.101tec/zkclient "0.4"]
[com.yammer.metrics/metrics-core "2.2.0"] [com.yammer.metrics/metrics-core "2.2.0"]
[org.scala-lang/scala-library "2.10.1"] [org.scala-lang/scala-library "2.10.1"]
[net.sf.jopt-simple/jopt-simple "3.2"]] [net.sf.jopt-simple/jopt-simple "3.2"]]

View file

@ -1,12 +1,14 @@
(ns clj-kafka.test.utils (ns clj-kafka.test.utils
(:import [kafka.server KafkaConfig KafkaServer] (:import
[kafka.admin CreateTopicCommand] [kafka.admin AdminUtils]
[kafka.common TopicAndPartition] [kafka.server KafkaConfig KafkaServer]
[java.net InetSocketAddress] [java.net InetSocketAddress]
[org.apache.zookeeper.server ZooKeeperServer NIOServerCnxn$Factory] [org.apache.zookeeper.server ZooKeeperServer NIOServerCnxn$Factory]
[org.apache.commons.io FileUtils] [org.apache.commons.io FileUtils]
[org.I0Itec.zkclient ZkClient] [org.I0Itec.zkclient ZkClient]
[org.I0Itec.zkclient.serialize ZkSerializer]) [org.I0Itec.zkclient.serialize ZkSerializer]
[kafka.utils Time]
(java.util Properties))
(:use [clojure.java.io :only (file)] (:use [clojure.java.io :only (file)]
[clj-kafka.core :only (as-properties)])) [clj-kafka.core :only (as-properties)]))
@ -14,7 +16,7 @@
[& parts] [& parts]
(.getPath (apply file (System/getProperty "java.io.tmpdir") "clj-kafka" parts))) (.getPath (apply file (System/getProperty "java.io.tmpdir") "clj-kafka" parts)))
(def system-time (proxy [kafka.utils.Time] [] (def system-time (proxy [Time] []
(milliseconds [] (System/currentTimeMillis)) (milliseconds [] (System/currentTimeMillis))
(nanoseconds [] (System/nanoTime)) (nanoseconds [] (System/nanoTime))
(sleep [ms] (Thread/sleep ms)))) (sleep [ms] (Thread/sleep ms))))
@ -22,14 +24,14 @@
;; enable.zookeeper doesn't seem to be used- check it actually has an effect ;; enable.zookeeper doesn't seem to be used- check it actually has an effect
(defn create-broker (defn create-broker
[{:keys [kafka-port zookeeper-port]}] [{:keys [kafka-port zookeeper-port]}]
(let [base-config {"broker.id" "0" (let [base-config {"broker.id" "0"
"port" "9999" "port" "9999"
"host.name" "localhost" "host.name" "localhost"
"zookeeper.connect" (str "127.0.0.1:" zookeeper-port) "zookeeper.connect" (str "127.0.0.1:" zookeeper-port)
"enable.zookeeper" "true" "enable.zookeeper" "true"
"log.flush.interval.messages" "1" "log.flush.interval.messages" "1"
"auto.create.topics.enable" "true" "auto.create.topics.enable" "true"
"log.dir" (.getAbsolutePath (file (tmp-dir "kafka-log")))}] "log.dir" (.getAbsolutePath (file (tmp-dir "kafka-log")))}]
(KafkaServer. (KafkaConfig. (as-properties (assoc base-config "port" (str kafka-port)))) (KafkaServer. (KafkaConfig. (as-properties (assoc base-config "port" (str kafka-port))))
system-time))) system-time)))
@ -41,15 +43,16 @@
(.startup zk)))) (.startup zk))))
(defn wait-until-initialised (defn wait-until-initialised
[kafka-server topic] [^KafkaServer kafka-server topic]
(let [topic-and-partition (TopicAndPartition. topic 0)] (let [apis (.apis kafka-server)
(while (not (.. kafka-server apis leaderCache keySet (contains topic-and-partition))) cache (.metadataCache apis)]
(while (not (.containsTopicAndPartition cache topic 0))
(Thread/sleep 500)))) (Thread/sleep 500))))
(defn create-topic (defn create-topic
[zk-client topic & {:keys [partitions replicas] [zk-client topic & {:keys [partitions replicas]
:or {partitions 1 replicas 1}}] :or {partitions 1 replicas 1}}]
(CreateTopicCommand/createTopic zk-client topic partitions replicas "")) (AdminUtils/createTopic zk-client topic partitions replicas (Properties.)))
(def string-serializer (proxy [ZkSerializer] [] (def string-serializer (proxy [ZkSerializer] []
(serialize [data] (.getBytes data "UTF-8")) (serialize [data] (.getBytes data "UTF-8"))

View file

@ -4,29 +4,30 @@
[clj-kafka.test.utils :only (with-test-broker)])) [clj-kafka.test.utils :only (with-test-broker)]))
(def config {:zookeeper-port 2182 (def config {:zookeeper-port 2182
:kafka-port 9999 :kafka-port 9999
:topic "test"}) :topic "test"})
(def zk-connect {"zookeeper.connect" "127.0.0.1:2182"}) (def zk-connect {"zookeeper.connect" "127.0.0.1:2182"})
(given (with-test-broker config (given (with-test-broker config
(brokers zk-connect)) (brokers zk-connect))
(expect count 1)) (expect count 1))
(expect nil (broker-list [])) (expect nil (broker-list []))
(expect "localhost:2181" (broker-list [{:host "localhost" :port "2181"}])) (expect "localhost:2181" (broker-list [{:host "localhost" :port "2181"}]))
(given (with-test-broker config (given (with-test-broker config
(first (brokers zk-connect))) (first (brokers zk-connect)))
(expect :host "localhost" (expect :host "localhost"
:jmx_port -1 :jmx_port -1
:port 9999 :port 9999
:version 1)) :version 1))
(given (with-test-broker config (given (with-test-broker config
(controller zk-connect)) (controller zk-connect))
(expect identity 0)) (expect identity 0))
(given (with-test-broker config (given (with-test-broker config
(topics zk-connect)) (topics zk-connect))
(expect count 1 (expect count 1
first (:topic config))) first (:topic config)))