clj-kafka

Simple Clojure interface to Kafka.

It's currently available only as a snapshot release while the API settles. API documentation is also available.

Note: Kafka binaries are not currently published to any public repositories. Additionally, the 0.7 release was published as source. This library uses a build of the 0.7 incubator release published on Clojars.

Installing

Add the following to your Leiningen project.clj:

[clj-kafka "0.0.2-0.7-SNAPSHOT"]
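For context, the dependency vector above goes inside the `:dependencies` key of `project.clj`. The following is a minimal sketch; the project name `my-app`, its version, and the Clojure version are placeholders chosen for illustration, not requirements stated by this library.

```clojure
;; Hypothetical project.clj: only the clj-kafka entry comes from this README.
;; The project name, project version and Clojure version are assumptions.
(defproject my-app "0.1.0-SNAPSHOT"
  :dependencies [[org.clojure/clojure "1.3.0"]
                 [clj-kafka "0.0.2-0.7-SNAPSHOT"]])
```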

Usage

clj-kafka currently only supports Kafka 0.7.

Producer

The producer can send a batch of messages in one call:

(use 'clj-kafka.producer)

(def p (producer {"zk.connect" "localhost:2181"}))
(send-messages p "test" (->> ["message payload 1" "message payload 2"]
                             (map #(.getBytes %))
                             (map message)))

Or send a single message:

(def p (producer {"zk.connect" "localhost:2181"}))
(send-messages p "test" (message (.getBytes "payload")))

SimpleConsumer

(use 'clj-kafka.consumer.simple)

(def c (consumer "localhost" 9092))
(def f (fetch "test" 0 0 4096))

(messages c f)

({:message {:crc 1513777821, :payload #<byte[] [B@3088890d>, :size 1089}, :offset 1093}
 {:message {:crc 4119364266, :payload #<byte[] [B@3088890d>, :size 968}, :offset 2065}
 {:message {:crc 3827222527, :payload #<byte[] [B@3088890d>, :size 1137}, :offset 3206})
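In the output above, each `:offset` equals the previous offset plus the message's `:size` plus 4 (for example 0 + 1089 + 4 = 1093), which suggests that it marks where the next message starts. On that assumption, the sketch below picks the offset for a follow-up fetch. `next-fetch-offset` is a hypothetical helper, not part of clj-kafka, and the maps are hand-built copies of the result above with the payloads omitted.

```clojure
;; Hand-built maps shaped like the (messages c f) result above;
;; payloads are left out because only :offset matters here.
(def fetched
  [{:message {:size 1089} :offset 1093}
   {:message {:size 968}  :offset 2065}
   {:message {:size 1137} :offset 3206}])

(defn next-fetch-offset
  "Hypothetical helper: returns the :offset of the last message in msgs,
  or current-offset when msgs is empty."
  [current-offset msgs]
  (if-let [m (last msgs)]
    (:offset m)
    current-offset))

(next-fetch-offset 0 fetched) ;; => 3206
(next-fetch-offset 0 [])      ;; => 0
```

If the third argument to `fetch` is the starting offset (an assumption based on the example call, not confirmed here), the next request might look like `(fetch "test" 0 (next-fetch-offset 0 (messages c f)) 4096)`.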

Zookeeper Consumer

The Zookeeper consumer uses broker information contained within Zookeeper to consume messages. This consumer also allows the client to automatically commit consumed offsets so they're not retrieved again.

(use 'clj-kafka.consumer.zk)
(use 'clj-kafka.core)

(def config {"zk.connect" "localhost:2181" 
             "groupid"    "my-task-group"})

(with-resource [c (consumer config)]
  shutdown
  (take 5 (messages c "test")))

({:crc 3417370184, :payload #<byte[] [B@698b41da>, :size 22}
 {:crc 3417370184, :payload #<byte[] [B@698b41da>, :size 22}
 {:crc 960674935, :payload #<byte[] [B@698b41da>, :size 86}
 {:crc 3651343620, :payload #<byte[] [B@698b41da>, :size 20}
 {:crc 2012604996, :payload #<byte[] [B@698b41da>, :size 20})
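The `:payload` values above are raw byte arrays, so they print as opaque object references. A minimal sketch of turning one back into text follows, assuming the producer encoded the payload as UTF-8. `payload->string` is a hypothetical helper, not part of clj-kafka, and the example map is built by hand rather than read from Kafka.

```clojure
(defn payload->string
  "Hypothetical helper: decodes the :payload byte array of a message map
  as a UTF-8 string."
  [{:keys [payload]}]
  (String. ^bytes payload "UTF-8"))

;; A hand-built map with the same :payload key as the messages above.
(def example-message
  {:payload (.getBytes "message payload 1" "UTF-8")})

(payload->string example-message) ;; => "message payload 1"
```

With the Zookeeper consumer this could be mapped over the result directly, as in `(map payload->string (take 5 (messages c "test")))`. With the SimpleConsumer the message map is nested under `:message`, so it would be applied as `(payload->string (:message m))`.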

License

Copyright © 2012 Paul Ingles

Distributed under the Eclipse Public License, the same as Clojure.