update travis-ci to run only expectations
This commit is contained in:
parent
6e19e4c8b3
commit
b9d3c7a069
3 changed files with 29 additions and 47 deletions
|
@ -1,3 +1,3 @@
|
||||||
language: clojure
|
language: clojure
|
||||||
lein: lein2
|
lein: lein2
|
||||||
script: lein2 do test, expectations
|
script: lein2 expectations
|
||||||
|
|
|
@ -22,6 +22,10 @@
|
||||||
[^Producer producer ^KeyedMessage message]
|
[^Producer producer ^KeyedMessage message]
|
||||||
(.send producer message))
|
(.send producer message))
|
||||||
|
|
||||||
|
(defn send-messages
|
||||||
|
[^Producer producer messages]
|
||||||
|
(.send producer messages))
|
||||||
|
|
||||||
(defn brokers
|
(defn brokers
|
||||||
"Get brokers from zookeeper"
|
"Get brokers from zookeeper"
|
||||||
[m]
|
[m]
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
(ns clj-kafka.test.consumer
|
(ns clj-kafka.test.consumer
|
||||||
(:use [clojure.test]
|
(:use [expectations]
|
||||||
[clj-kafka.core :only (with-resource to-clojure)]
|
[clj-kafka.core :only (with-resource to-clojure)]
|
||||||
[clj-kafka.producer :only (producer send-message message)]
|
[clj-kafka.producer :only (producer send-messages message)]
|
||||||
[clj-kafka.test.utils :only (with-test-broker)])
|
[clj-kafka.test.utils :only (with-test-broker)])
|
||||||
(:require [clj-kafka.consumer.zk :as zk]
|
(:require [clj-kafka.consumer.zk :as zk]
|
||||||
[clj-kafka.consumer.simple :as simp]))
|
[clj-kafka.consumer.simple :as simp]))
|
||||||
|
@ -14,50 +14,28 @@
|
||||||
:kafka-port 9999
|
:kafka-port 9999
|
||||||
:topic "test"})
|
:topic "test"})
|
||||||
|
|
||||||
(deftest test-zookeeper-consumption
|
(def consumer-config {"zookeeper.connect" "localhost:2182"
|
||||||
(let [consumer-config {"zookeeper.connect" "localhost:2182"
|
|
||||||
"group.id" "clj-kafka.test.consumer"
|
"group.id" "clj-kafka.test.consumer"
|
||||||
"auto.offset.reset" "smallest"
|
"auto.offset.reset" "smallest"
|
||||||
"auto.commit.enable" "false"}]
|
"auto.commit.enable" "false"})
|
||||||
(testing "Sending single message"
|
|
||||||
|
(defn string-value
|
||||||
|
[k]
|
||||||
|
(fn [m]
|
||||||
|
(String. (k m) "UTF-8")))
|
||||||
|
|
||||||
|
(defn send-and-receive
|
||||||
|
[messages]
|
||||||
(with-test-broker test-broker-config
|
(with-test-broker test-broker-config
|
||||||
(let [p (producer producer-config)]
|
|
||||||
(with-resource [c (zk/consumer consumer-config)]
|
(with-resource [c (zk/consumer consumer-config)]
|
||||||
zk/shutdown
|
zk/shutdown
|
||||||
(send-message p (message "test" (.getBytes "Hello, world")))
|
|
||||||
(let [{:keys [topic offset partition key value]} (first (zk/messages c ["test"]))]
|
|
||||||
(is (= "test" topic))
|
|
||||||
(is (= 0 offset))
|
|
||||||
(is (= 0 partition))
|
|
||||||
(is (= "Hello, world" (String. value "UTF-8"))))))))
|
|
||||||
(testing "Sending multiple messages"
|
|
||||||
(with-test-broker test-broker-config
|
|
||||||
(let [p (producer producer-config)]
|
(let [p (producer producer-config)]
|
||||||
(with-resource [c (zk/consumer consumer-config)]
|
(send-messages p messages)
|
||||||
zk/shutdown
|
(doall (take (count messages)
|
||||||
(send-message p (message "test" (.getBytes "Hello, world")))
|
(zk/messages c ["test"])))))))
|
||||||
(let [{:keys [topic offset partition key value]} (first (zk/messages c ["test"]))]
|
|
||||||
(is (= "test" topic))
|
|
||||||
(is (= 0 offset))
|
|
||||||
(is (= 0 partition))
|
|
||||||
(is (= "Hello, world" (String. value "UTF-8"))))))))))
|
|
||||||
|
|
||||||
|
(given (first (send-and-receive [(message "test" (.getBytes "Hello, world"))]))
|
||||||
(deftest test-simple-consumer
|
(expect :topic "test"
|
||||||
(with-test-broker test-broker-config
|
:offset 0
|
||||||
(let [p (producer producer-config)
|
:partition 0
|
||||||
c (simp/consumer "localhost" 9999 "simple-consumer")]
|
(string-value :value) "Hello, world"))
|
||||||
(send-message p (message "test" (.getBytes "Hello, world")))
|
|
||||||
(let [msgs (simp/messages c
|
|
||||||
"clj-kafka.test.simple-consumer"
|
|
||||||
"test"
|
|
||||||
0
|
|
||||||
0
|
|
||||||
1024)
|
|
||||||
msg (to-clojure (first msgs))]
|
|
||||||
(let [{:keys [topic offset partition key value]} msg]
|
|
||||||
(is (= nil topic))
|
|
||||||
(is (= 0 offset))
|
|
||||||
(is (= nil partition))
|
|
||||||
(is (= nil key))
|
|
||||||
(is (= "Hello, world" (String. value))))))))
|
|
||||||
|
|
Loading…
Reference in a new issue