From 27ba361517798fdf74d3e84fb8658a54444c4982 Mon Sep 17 00:00:00 2001 From: Paul Lam Date: Mon, 10 Jun 2013 11:56:49 +0100 Subject: [PATCH] zk/controller to get leader node --- src/clj_kafka/zk.clj | 10 ++++++++++ test/clj_kafka/test/zk.clj | 12 +++++++++--- 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/src/clj_kafka/zk.clj b/src/clj_kafka/zk.clj index a89cdbf..7e5236b 100644 --- a/src/clj_kafka/zk.clj +++ b/src/clj_kafka/zk.clj @@ -13,3 +13,13 @@ :data #(zk/data z (str "/brokers/ids/" %))) (zk/children z "/brokers/ids"))))) + +(defn controller + "Get leader node" + [m] + (with-resource [z (zk/connect (get m "zookeeper.connect"))] + zk/close + (-> (zk/data z "/controller") + :data + String. + Integer/valueOf))) diff --git a/test/clj_kafka/test/zk.clj b/test/clj_kafka/test/zk.clj index 1a06f7b..4342bb6 100644 --- a/test/clj_kafka/test/zk.clj +++ b/test/clj_kafka/test/zk.clj @@ -3,9 +3,15 @@ clj-kafka.zk [clj-kafka.test.utils :only (with-test-broker)])) -(given (with-test-broker {:zookeeper-port 2182 - :kafka-port 9999 - :topic "test"} +(def config {:zookeeper-port 2182 + :kafka-port 9999 + :topic "test"}) + +(given (with-test-broker config (brokers {"zookeeper.connect" "127.0.0.1:2182"})) (expect count 1 first {:host "localhost", :jmx_port -1, :port 9999, :version 1})) + +(given (with-test-broker config + (controller {"zookeeper.connect" "127.0.0.1:2182"})) + (expect identity 0))