Revert "work on updating the api to work against 0.8"

This reverts commit 5286bbfbb6.
This commit is contained in:
Paul Ingles 2013-06-07 10:48:08 +01:00
parent 5286bbfbb6
commit 2492aee358
2 changed files with 8 additions and 21 deletions

View file

@ -6,7 +6,8 @@
(defn producer
"Creates a Producer. m is the configuration
metadata.broker.list: host1:port1,host1:port2"
serializer.class : default is kafka.serializer.DefaultEncoder
zk.connect : Zookeeper connection. e.g. localhost:2181 "
[m]
^Producer (Producer. (ProducerConfig. (as-properties m))))
@ -16,16 +17,12 @@
[#^bytes payload]
(Message. payload))
(defn- keyed-message
[^String topic ^Message message]
(KeyedMessage. topic nil message))
(defn send-message
(defn send-messages
"Sends a message.
topic : a string
msgs : a single message, or sequence of messages to send"
[^Producer producer ^String topic ^Message message]
(.send producer (keyed-message topic message)))
[^Producer producer ^String topic msgs]
(.send producer (map #(KeyedMessage. topic %) msgs)))
(defprotocol ToMessage

View file

@ -1,17 +1,7 @@
(ns clj-kafka.test.consumer
(:use [clojure.test]
[clj-kafka.test.utils :only (with-broker)]
[clj-kafka.core :only (with-resource)]
[clj-kafka.producer :only (producer to-message send-messages)])
(:require [clj-kafka.consumer.zk :as zk]))
[clj-kafka.test.utils :only (with-broker)]))
(deftest zookeeper-consumer
(deftest testing-something
(with-broker
(let [p (producer {"metadata.broker.list" "localhost:9999"})]
(send-messages p "test" (to-message "Hello, world"))
(with-resource [c (zk/consumer {"zookeeper.connect" "127.0.0.1:2182"
"group.id" "clj-kafka.test.consumer"
"auto.offset.reset" "smallest"
"auto.commit.enable" "false"})]
zk/shutdown
(is (= 1 1))))))
(is (= 1 1))))