
clj-kafka

Simple Clojure interface to Kafka.

It is currently available only as a snapshot release while the API settles. API documentation is also available.

Note: Kafka binaries are not currently published to any public repositories. Additionally, the 0.8 release was published as source. This library uses a build of the 0.8 incubator release published on Clojars.


Installing

Add the following to your Leiningen project.clj:

[clj-kafka "0.1.0-0.8-SNAPSHOT"]
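For context, a minimal project.clj might look like the sketch below. The project name `my-kafka-app`, its version, and the Clojure version are placeholders chosen for illustration; only the clj-kafka coordinate comes from this README.

```clojure
;; Hypothetical project.clj; only the clj-kafka coordinate is taken from this README.
(defproject my-kafka-app "0.1.0-SNAPSHOT"
  :dependencies [[org.clojure/clojure "1.5.1"]
                 [clj-kafka "0.1.0-0.8-SNAPSHOT"]])
```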

Usage

clj-kafka currently only supports Kafka 0.8.

Producer

The producer can send messages in batches:

(use 'clj-kafka.producer)

(def p (producer {"zk.connect" "localhost:2181"}))
(send-messages p "test" (->> ["message payload 1" "message payload 2"]
                             (map #(.getBytes %))
                             (map message)))

Or send a single message:

(def p (producer {"zk.connect" "localhost:2181"}))
(send-messages p "test" (message (.getBytes "payload")))
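
Note that `.getBytes` with no argument encodes with the JVM's default charset, which can differ between machines. A small helper that fixes the encoding keeps producers and consumers in agreement. The name `str->bytes` is hypothetical and not part of clj-kafka; treat this as a sketch only.

```clojure
;; Hypothetical helper (not part of clj-kafka): encode a string payload
;; as UTF-8 instead of relying on the platform default charset.
(defn str->bytes
  [^String s]
  (.getBytes s "UTF-8"))

;; Encode a batch of payloads, ready to be wrapped with `message`.
(def payloads
  (map str->bytes ["message payload 1" "message payload 2"]))
```

In the batching example, `(map #(.getBytes %))` could then be replaced by `(map str->bytes)`.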

SimpleConsumer

(use 'clj-kafka.consumer.simple)

(def c (consumer "localhost" 9092))
(def f (fetch "test" 0 0 4096))

(messages c f)

({:message {:crc 1513777821, :payload #<byte[] [B@3088890d>, :size 1089}, :offset 1093}
 {:message {:crc 4119364266, :payload #<byte[] [B@3088890d>, :size 968}, :offset 2065}
 {:message {:crc 3827222527, :payload #<byte[] [B@3088890d>, :size 1137}, :offset 3206})
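
The `:payload` values are raw byte arrays, so they print as opaque object references. Assuming the payloads were written as UTF-8 text (an assumption; Kafka itself treats them as opaque bytes), a sketch like the following turns each result into an offset and a string. `decode-message` is a hypothetical helper, and the sample data only mimics the shape shown above.

```clojure
;; Hypothetical helper (not part of clj-kafka). Assumes the payload is UTF-8 text.
(defn decode-message
  [{:keys [message offset]}]
  {:offset  offset
   :payload (String. ^bytes (:payload message) "UTF-8")})

;; Stand-in data with the same shape as the maps returned by `messages`;
;; the :crc, :size and :offset values are arbitrary placeholders.
(def sample
  [{:message {:crc 0 :payload (.getBytes "first" "UTF-8") :size 5} :offset 5}
   {:message {:crc 0 :payload (.getBytes "second" "UTF-8") :size 6} :offset 11}])

(def decoded (map decode-message sample))
```

With a live consumer the same helper would be applied as `(map decode-message (messages c f))`.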

Zookeeper Consumer

The Zookeeper consumer uses broker information contained within Zookeeper to consume messages. This consumer also allows the client to automatically commit consumed offsets so they're not retrieved again.

(use 'clj-kafka.consumer.zk)
(use 'clj-kafka.core)

(def config {"zookeeper.connect" "localhost:2181"
             "group.id"          "my-task-group"
             "auto.offset.reset" "smallest"})

(with-resource [c (consumer config)]
  shutdown
  ;; doall realizes the lazy sequence before shutdown is called
  (doall (take 5 (messages c "test"))))

({:crc 3417370184, :payload #<byte[] [B@698b41da>, :size 22}
 {:crc 3417370184, :payload #<byte[] [B@698b41da>, :size 22}
 {:crc 960674935, :payload #<byte[] [B@698b41da>, :size 86}
 {:crc 3651343620, :payload #<byte[] [B@698b41da>, :size 20}
 {:crc 2012604996, :payload #<byte[] [B@698b41da>, :size 20})
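
One thing to watch for with this pattern: `take` returns a lazy sequence, so the result has to be realized before `shutdown` runs. Assuming `with-resource` calls its cleanup function as soon as the body returns (an assumption about the macro, not confirmed here), an unrealized sequence would only be read after the consumer has been shut down. The sketch below reproduces the effect without Kafka, using a hypothetical `with-cleanup` macro and an atom standing in for the consumer.

```clojure
;; Hypothetical stand-in for a resource macro: evaluates body, then always
;; calls (close-fn resource). Not the clj-kafka implementation.
(defmacro with-cleanup
  [[sym init] close-fn & body]
  `(let [~sym ~init]
     (try
       ~@body
       (finally (~close-fn ~sym)))))

;; Fake "consumer": an atom holding an open flag. Each element records whether
;; the resource was still open at the moment that element was realized.
(defn fake-messages [resource]
  (map (fn [n] {:n n :open? (:open? @resource)}) (range)))

(defn close! [resource] (swap! resource assoc :open? false))

;; Lazy: elements are realized only after close! has already run.
(def lazy-result
  (with-cleanup [r (atom {:open? true})]
    close!
    (take 2 (fake-messages r))))

;; Eager: doall realizes the elements while the resource is still open.
(def eager-result
  (with-cleanup [r (atom {:open? true})]
    close!
    (doall (take 2 (fake-messages r)))))
```

Here every element of `lazy-result` sees the resource as closed, while every element of `eager-result` sees it as open, which is why forcing the sequence inside the body is the safer choice.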

It's also now possible to consume messages from multiple topics at the same time. These are aggregated and returned as a single sequence:

(take 5 (messages c "test1" "test2"))

License

Copyright © 2013 Paul Ingles

Distributed under the Eclipse Public License, the same as Clojure.