From 1a695580a4dab4f7da4915be13d1e750a18497c9 Mon Sep 17 00:00:00 2001 From: Graham Berks Date: Tue, 18 Nov 2014 21:26:17 +0000 Subject: [PATCH] Upgraded kafka to 0.8.1.1 and zkclient to 0.4 --- .gitignore | 2 ++ project.clj | 4 ++-- test/clj_kafka/test/utils.clj | 43 +++++++++++++++++++---------------- test/clj_kafka/test/zk.clj | 13 ++++++----- 4 files changed, 34 insertions(+), 28 deletions(-) diff --git a/.gitignore b/.gitignore index 276db42..a7f1b46 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,5 @@ autodoc/** /tmp /logs .nrepl-port +.idea +*.iml diff --git a/project.clj b/project.clj index b73d97f..70c6a3b 100644 --- a/project.clj +++ b/project.clj @@ -5,9 +5,9 @@ [org.clojure/data.json "0.2.2"] ;; kafka and its related deps - [org.apache.kafka/kafka_2.10 "0.8.0"] + [org.apache.kafka/kafka_2.10 "0.8.1.1"] [org.apache.zookeeper/zookeeper "3.3.4"] - [com.101tec/zkclient "0.3"] + [com.101tec/zkclient "0.4"] [com.yammer.metrics/metrics-core "2.2.0"] [org.scala-lang/scala-library "2.10.1"] [net.sf.jopt-simple/jopt-simple "3.2"]] diff --git a/test/clj_kafka/test/utils.clj b/test/clj_kafka/test/utils.clj index b553cb9..7a39c33 100644 --- a/test/clj_kafka/test/utils.clj +++ b/test/clj_kafka/test/utils.clj @@ -1,12 +1,14 @@ (ns clj-kafka.test.utils - (:import [kafka.server KafkaConfig KafkaServer] - [kafka.admin CreateTopicCommand] - [kafka.common TopicAndPartition] - [java.net InetSocketAddress] - [org.apache.zookeeper.server ZooKeeperServer NIOServerCnxn$Factory] - [org.apache.commons.io FileUtils] - [org.I0Itec.zkclient ZkClient] - [org.I0Itec.zkclient.serialize ZkSerializer]) + (:import + [kafka.admin AdminUtils] + [kafka.server KafkaConfig KafkaServer] + [java.net InetSocketAddress] + [org.apache.zookeeper.server ZooKeeperServer NIOServerCnxn$Factory] + [org.apache.commons.io FileUtils] + [org.I0Itec.zkclient ZkClient] + [org.I0Itec.zkclient.serialize ZkSerializer] + [kafka.utils Time] + (java.util Properties)) (:use [clojure.java.io :only (file)] [clj-kafka.core :only (as-properties)])) @@ -14,7 +16,7 @@ [& parts] (.getPath (apply file (System/getProperty "java.io.tmpdir") "clj-kafka" parts))) -(def system-time (proxy [kafka.utils.Time] [] +(def system-time (proxy [Time] [] (milliseconds [] (System/currentTimeMillis)) (nanoseconds [] (System/nanoTime)) (sleep [ms] (Thread/sleep ms)))) @@ -22,14 +24,14 @@ ;; enable.zookeeper doesn't seem to be used- check it actually has an effect (defn create-broker [{:keys [kafka-port zookeeper-port]}] - (let [base-config {"broker.id" "0" - "port" "9999" - "host.name" "localhost" - "zookeeper.connect" (str "127.0.0.1:" zookeeper-port) - "enable.zookeeper" "true" + (let [base-config {"broker.id" "0" + "port" "9999" + "host.name" "localhost" + "zookeeper.connect" (str "127.0.0.1:" zookeeper-port) + "enable.zookeeper" "true" "log.flush.interval.messages" "1" - "auto.create.topics.enable" "true" - "log.dir" (.getAbsolutePath (file (tmp-dir "kafka-log")))}] + "auto.create.topics.enable" "true" + "log.dir" (.getAbsolutePath (file (tmp-dir "kafka-log")))}] (KafkaServer. (KafkaConfig. (as-properties (assoc base-config "port" (str kafka-port)))) system-time))) @@ -41,15 +43,16 @@ (.startup zk)))) (defn wait-until-initialised - [kafka-server topic] - (let [topic-and-partition (TopicAndPartition. topic 0)] - (while (not (.. kafka-server apis leaderCache keySet (contains topic-and-partition))) + [^KafkaServer kafka-server topic] + (let [apis (.apis kafka-server) + cache (.metadataCache apis)] + (while (not (.containsTopicAndPartition cache topic 0)) (Thread/sleep 500)))) (defn create-topic [zk-client topic & {:keys [partitions replicas] :or {partitions 1 replicas 1}}] - (CreateTopicCommand/createTopic zk-client topic partitions replicas "")) + (AdminUtils/createTopic zk-client topic partitions replicas (Properties.))) (def string-serializer (proxy [ZkSerializer] [] (serialize [data] (.getBytes data "UTF-8")) diff --git a/test/clj_kafka/test/zk.clj b/test/clj_kafka/test/zk.clj index 36fa8ec..2d46318 100644 --- a/test/clj_kafka/test/zk.clj +++ b/test/clj_kafka/test/zk.clj @@ -4,29 +4,30 @@ [clj-kafka.test.utils :only (with-test-broker)])) (def config {:zookeeper-port 2182 - :kafka-port 9999 - :topic "test"}) + :kafka-port 9999 + :topic "test"}) (def zk-connect {"zookeeper.connect" "127.0.0.1:2182"}) + (given (with-test-broker config - (brokers zk-connect)) + (brokers zk-connect)) (expect count 1)) (expect nil (broker-list [])) (expect "localhost:2181" (broker-list [{:host "localhost" :port "2181"}])) (given (with-test-broker config - (first (brokers zk-connect))) + (first (brokers zk-connect))) (expect :host "localhost" :jmx_port -1 :port 9999 :version 1)) (given (with-test-broker config - (controller zk-connect)) + (controller zk-connect)) (expect identity 0)) (given (with-test-broker config - (topics zk-connect)) + (topics zk-connect)) (expect count 1 first (:topic config)))