diff --git a/.travis.yml b/.travis.yml index 9f981eb..4fedd9c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,3 +1,3 @@ language: clojure lein: lein2 -script: lein2 do test, expectations +script: lein2 expectations diff --git a/src/clj_kafka/producer.clj b/src/clj_kafka/producer.clj index d7657fb..a03a17a 100644 --- a/src/clj_kafka/producer.clj +++ b/src/clj_kafka/producer.clj @@ -22,6 +22,10 @@ [^Producer producer ^KeyedMessage message] (.send producer message)) +(defn send-messages + [^Producer producer messages] + (.send producer messages)) + (defn brokers "Get brokers from zookeeper" [m] diff --git a/test/clj_kafka/test/consumer.clj b/test/clj_kafka/test/consumer.clj index 320df3e..a8e164f 100644 --- a/test/clj_kafka/test/consumer.clj +++ b/test/clj_kafka/test/consumer.clj @@ -1,7 +1,7 @@ (ns clj-kafka.test.consumer - (:use [clojure.test] + (:use [expectations] [clj-kafka.core :only (with-resource to-clojure)] - [clj-kafka.producer :only (producer send-message message)] + [clj-kafka.producer :only (producer send-messages message)] [clj-kafka.test.utils :only (with-test-broker)]) (:require [clj-kafka.consumer.zk :as zk] [clj-kafka.consumer.simple :as simp])) @@ -14,50 +14,28 @@ :kafka-port 9999 :topic "test"}) -(deftest test-zookeeper-consumption - (let [consumer-config {"zookeeper.connect" "localhost:2182" - "group.id" "clj-kafka.test.consumer" - "auto.offset.reset" "smallest" - "auto.commit.enable" "false"}] - (testing "Sending single message" - (with-test-broker test-broker-config - (let [p (producer producer-config)] - (with-resource [c (zk/consumer consumer-config)] - zk/shutdown - (send-message p (message "test" (.getBytes "Hello, world"))) - (let [{:keys [topic offset partition key value]} (first (zk/messages c ["test"]))] - (is (= "test" topic)) - (is (= 0 offset)) - (is (= 0 partition)) - (is (= "Hello, world" (String. value "UTF-8")))))))) - (testing "Sending multiple messages" - (with-test-broker test-broker-config - (let [p (producer producer-config)] - (with-resource [c (zk/consumer consumer-config)] - zk/shutdown - (send-message p (message "test" (.getBytes "Hello, world"))) - (let [{:keys [topic offset partition key value]} (first (zk/messages c ["test"]))] - (is (= "test" topic)) - (is (= 0 offset)) - (is (= 0 partition)) - (is (= "Hello, world" (String. value "UTF-8")))))))))) +(def consumer-config {"zookeeper.connect" "localhost:2182" + "group.id" "clj-kafka.test.consumer" + "auto.offset.reset" "smallest" + "auto.commit.enable" "false"}) +(defn string-value + [k] + (fn [m] + (String. (k m) "UTF-8"))) -(deftest test-simple-consumer +(defn send-and-receive + [messages] (with-test-broker test-broker-config - (let [p (producer producer-config) - c (simp/consumer "localhost" 9999 "simple-consumer")] - (send-message p (message "test" (.getBytes "Hello, world"))) - (let [msgs (simp/messages c - "clj-kafka.test.simple-consumer" - "test" - 0 - 0 - 1024) - msg (to-clojure (first msgs))] - (let [{:keys [topic offset partition key value]} msg] - (is (= nil topic)) - (is (= 0 offset)) - (is (= nil partition)) - (is (= nil key)) - (is (= "Hello, world" (String. value)))))))) + (with-resource [c (zk/consumer consumer-config)] + zk/shutdown + (let [p (producer producer-config)] + (send-messages p messages) + (doall (take (count messages) + (zk/messages c ["test"]))))))) + +(given (first (send-and-receive [(message "test" (.getBytes "Hello, world"))])) + (expect :topic "test" + :offset 0 + :partition 0 + (string-value :value) "Hello, world"))