Update README; pull with-resource into core; add producer

This commit is contained in:
Paul Ingles 2012-01-26 10:56:32 +00:00
parent 9ddb351353
commit efee73f42d
4 changed files with 89 additions and 14 deletions

View file

@ -2,20 +2,70 @@
Simple Clojure interface to [Kafka](http://incubator.apache.org/kafka/).
It's currently a snapshot only until things flesh out a little more.
## Installing
Add the following to your [Leiningen](http://github.com/technomancy/leiningen) `project.clj`:
```clj
[clj-kafka "0.0.2-0.7-SNAPSHOT"]
```
## Usage
clj-kafka currently only supports consuming Kafka data against Kafka 0.7 and using the SimpleConsumer. Future versions may also include support for the ZookeeperConsumer (which tracks offsets and brokers using Zookeeper) and support for producing messages.
clj-kafka currently only supports Kafka 0.7.
### Producer
Allows batching of messages:
```clj
(use 'clj-kafka.producer)
(def p (producer {"zk.connect" "localhost:2181"}))
(send-messages p "test" (->> ["message payload 1" "message payload 2"]
(map #(.getBytes %))
(map message)))
```
Or sending a single message:
```clj
(def p (producer {"zk.connect" "localhost:2181"}))
(send-messages p "test" (message (.getBytes "payload")))
```
### SimpleConsumer
```clj
user> (def c (create-consumer "localhost" 9092))
user> (def f (fetch "test" 0 0 4096))
(use 'clj-kafka.consumer.simple)
(def c (create-consumer "localhost" 9092))
(def f (fetch "test" 0 0 4096))
(messages c f)
user> (messages c f)
({:message {:crc 1513777821, :payload #<byte[] [B@3088890d>, :size 1089}, :offset 1093} {:message {:crc 4119364266, :payload #<byte[] [B@3088890d>, :size 968}, :offset 2065} {:message {:crc 3827222527, :payload #<byte[] [B@3088890d>, :size 1137}, :offset 3206})
```
### Zookeeper Consumer
The Zookeeper consumer uses broker information contained within Zookeeper to consume messages. This consumer also allows the client to automatically commit consumed offsets so they're not retrieved again.
```clj
(use 'clj-kafka.consumer.zk)
(use 'clj-kafka.core)
(def config {"zk.connect" "localhost:2181"})
(with-resource [c (consumer config)]
shutdown
(take 5 (messages c "test")))
({:crc 3417370184, :payload #<byte[] [B@698b41da>, :size 22} {:crc 3417370184, :payload #<byte[] [B@698b41da>, :size 22} {:crc 960674935, :payload #<byte[] [B@698b41da>, :size 86} {:crc 3651343620, :payload #<byte[] [B@698b41da>, :size 20} {:crc 2012604996, :payload #<byte[] [B@698b41da>, :size 20})
```
## License
Copyright &copy; 2012 Paul Ingles

View file

@ -2,14 +2,6 @@
(:import [kafka.consumer ConsumerConfig Consumer])
(:use [clj-kafka.core :only (as-properties to-clojure)]))
(defmacro with-resource
[binding close-fn & body]
`(let ~binding
(try
(do ~@body)
(finally
(~close-fn ~(binding 0))))))
(defn consumer
"Uses information in Zookeeper to connect to Kafka. More info on settings
is available here: http://incubator.apache.org/kafka/configuration.html
@ -41,4 +33,4 @@
(let [topic-map {topic (Integer/valueOf 1)}
streams (.createMessageStreams consumer topic-map)
stream (first (.get streams topic))]
(map to-clojure (iterator-seq (.iterator stream)))))
(map to-clojure (iterator-seq (.iterator stream)))))

View file

@ -8,8 +8,16 @@
(doseq [[n v] m] (.setProperty props n v))
props))
(defmacro with-resource
[binding close-fn & body]
`(let ~binding
(try
(do ~@body)
(finally
(~close-fn ~(binding 0))))))
(defprotocol ToClojure
(to-clojure [_] "Converts type to Clojure structure"))
(to-clojure [x] "Converts type to Clojure structure"))
(extend-protocol ToClojure
MessageAndOffset

View file

@ -0,0 +1,25 @@
(ns clj-kafka.producer
(:import [kafka.javaapi.producer Producer ProducerData]
[kafka.producer ProducerConfig]
[kafka.message Message])
(:use [clj-kafka.core :only (as-properties as-bytes)]))
(defn producer
"Creates a Producer. m is the configuration
serializer.class : default is kafka.serializer.DefaultEncoder
zk.connect : Zookeeper connection. e.g. localhost:2181 "
[m]
(Producer. (ProducerConfig. (as-properties m))))
(defn message
"Creates a message with the specified payload.
payload : bytes for the message payload. e.g. (.getBytes \"hello, world\")"
[payload]
(Message. payload))
(defn send-messages
"Sends a message.
topic : a string
msgs : a single message, or sequence of messages to send"
[producer topic msgs]
(.send producer (ProducerData. topic msgs)))