clj-kafka/README.md

85 lines
2.7 KiB
Markdown
Raw Normal View History

2012-01-22 23:01:36 +00:00
# clj-kafka
Simple Clojure interface to [Kafka](http://incubator.apache.org/kafka/).
2012-01-26 11:00:38 +00:00
It's currently a snapshot only until things flesh out a little more. [API Documentation is also available](http://pingles.github.com/clj-kafka/).
2012-01-26 11:04:00 +00:00
Note: Kafka binaries are not currently published to any public repositories. Additionally, the 0.7 release was [published as source](http://incubator.apache.org/kafka/downloads.html). This library uses [a build of the 0.7 incubator release published on Clojars](http://clojars.org/org.clojars.paul/core-kafka_2.8.0).
2013-05-15 08:27:57 +00:00
Current build status: ![Build status](https://secure.travis-ci.org/pingles/clj-kafka.png)
## Installing
Add the following to your [Leiningen](http://github.com/technomancy/leiningen) `project.clj`:
```clj
[clj-kafka "0.0.5-0.7-SNAPSHOT"]
```
2012-01-22 23:01:36 +00:00
## Usage
clj-kafka currently only supports Kafka 0.7.
### Producer
Allows batching of messages:
```clj
(use 'clj-kafka.producer)
(def p (producer {"zk.connect" "localhost:2181"}))
(send-messages p "test" (->> ["message payload 1" "message payload 2"]
(map #(.getBytes %))
(map message)))
```
Or sending a single message:
```clj
(def p (producer {"zk.connect" "localhost:2181"}))
(send-messages p "test" (message (.getBytes "payload")))
```
2012-01-22 23:01:36 +00:00
### SimpleConsumer
2012-01-22 23:24:41 +00:00
```clj
(use 'clj-kafka.consumer.simple)
2012-03-10 02:15:15 +00:00
(def c (consumer "localhost" 9092))
(def f (fetch "test" 0 0 4096))
(messages c f)
2012-01-22 23:01:36 +00:00
2012-01-22 23:24:41 +00:00
({:message {:crc 1513777821, :payload #<byte[] [B@3088890d>, :size 1089}, :offset 1093} {:message {:crc 4119364266, :payload #<byte[] [B@3088890d>, :size 968}, :offset 2065} {:message {:crc 3827222527, :payload #<byte[] [B@3088890d>, :size 1137}, :offset 3206})
```
2012-01-22 23:01:36 +00:00
### Zookeeper Consumer
The Zookeeper consumer uses broker information contained within Zookeeper to consume messages. This consumer also allows the client to automatically commit consumed offsets so they're not retrieved again.
```clj
(use 'clj-kafka.consumer.zk)
(use 'clj-kafka.core)
2012-03-10 02:15:15 +00:00
(def config {"zk.connect" "localhost:2181"
"groupid" "my-task-group"})
(with-resource [c (consumer config)]
shutdown
(take 5 (messages c "test")))
({:crc 3417370184, :payload #<byte[] [B@698b41da>, :size 22} {:crc 3417370184, :payload #<byte[] [B@698b41da>, :size 22} {:crc 960674935, :payload #<byte[] [B@698b41da>, :size 86} {:crc 3651343620, :payload #<byte[] [B@698b41da>, :size 20} {:crc 2012604996, :payload #<byte[] [B@698b41da>, :size 20})
```
It's also now possible to consume messages from multiple topics at the same time. These are aggregated and returned as a single sequence:
```clojure
(take 5 (messages c "test1" "test2"))
```
2012-01-22 23:01:36 +00:00
## License
Copyright &copy; 2012 Paul Ingles
Distributed under the Eclipse Public License, the same as Clojure.