clj-kafka

Clojure library for Kafka.

Development is against the upcoming 0.8 release of Kafka. The 0.7 and 0.8 protocols are incompatible, so this library only works when connecting to a 0.8 cluster.

Installing

Because Kafka 0.8 is still unreleased, we're pushing SNAPSHOT releases.

Add the following to your Leiningen project.clj:

[clj-kafka "0.1.0-0.8-beta1"]
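
For context, a minimal project.clj carrying that dependency might look like the sketch below. The project name `my-kafka-app`, its version, and the Clojure version are placeholders chosen for illustration; only the clj-kafka coordinate comes from this README.

```clojure
;; project.clj for a hypothetical project.
;; Only the clj-kafka coordinate is taken from this README;
;; the project name and the Clojure version are assumptions.
(defproject my-kafka-app "0.1.0-SNAPSHOT"
  :dependencies [[org.clojure/clojure "1.5.1"]
                 [clj-kafka "0.1.0-0.8-beta1"]])
```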

Usage

Producer

Discovery of Kafka brokers from Zookeeper:

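(use 'clj-kafka.zk)

;; Look up the brokers registered in Zookeeper
(brokers {"zookeeper.connect" "127.0.0.1:2181"})
;; ({:host "localhost", :jmx_port -1, :port 9999, :version 1})

Creating a producer and sending a message:

(use 'clj-kafka.producer)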

(def p (producer {"metadata.broker.list" "localhost:9999"
                  "serializer.class" "kafka.serializer.DefaultEncoder"
                  "partitioner.class" "kafka.producer.DefaultPartitioner"}))

(send-message p (keyed-message "test" (.getBytes "this is my message")))

Zookeeper Consumer

The Zookeeper consumer uses broker information contained within Zookeeper to consume messages. This consumer also allows the client to automatically commit consumed offsets so they're not retrieved again.

(use 'clj-kafka.consumer.zk)
(use 'clj-kafka.core)

(def config {"zookeeper.connect" "localhost:2182"
             "group.id" "clj-kafka.consumer"
             "auto.offset.reset" "smallest"
             "auto.commit.enable" "false"})

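;; with-resource calls shutdown on the consumer when the body exits,
;; so realise the lazy sequence with doall before that happens.
(with-resource [c (consumer config)]
  shutdown
  (doall (take 2 (messages c ["test"]))))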
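
The config above disables automatic offset commits. To turn them on, a variant of the config might look like the following sketch. `auto.commit.enable` and `auto.commit.interval.ms` are Kafka 0.8 consumer properties; the name `auto-commit-config` and the one-second interval are illustrative assumptions, not recommendations from this library.

```clojure
;; Hypothetical variant of the consumer config with auto-commit enabled.
;; The interval value is an arbitrary example.
(def auto-commit-config
  {"zookeeper.connect"       "localhost:2182"
   "group.id"                "clj-kafka.consumer"
   "auto.offset.reset"       "smallest"
   "auto.commit.enable"      "true"
   "auto.commit.interval.ms" "1000"})
```

A map like this would be passed to `consumer` in place of `config`.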

It's also now possible to consume messages from multiple topics at the same time. These are aggregated and returned as a single sequence:

(take 5 (messages c ["test1" "test2"]))
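
Because messages from several topics arrive in one sequence, it can be useful to split them back out by topic. The sketch below assumes each consumed message exposes its topic under a `:topic` key; that is an assumption about the message structure, not something this README states. It uses plain maps as stand-in data so it runs without a Kafka cluster.

```clojure
;; Stand-in data: plain maps in place of consumed messages.
;; The :topic and :value keys are assumptions for illustration.
(def sample-messages
  [{:topic "test1" :value "a"}
   {:topic "test2" :value "b"}
   {:topic "test1" :value "c"}])

;; Count how many messages came from each topic.
(def per-topic-counts
  (frequencies (map :topic sample-messages)))

;; Group the full messages by their topic.
(def by-topic
  (group-by :topic sample-messages))
```

With a live consumer, `sample-messages` would be replaced by a realised sequence such as `(doall (take 5 (messages c ["test1" "test2"])))`.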

License

Copyright © 2013 Paul Ingles

Distributed under the Eclipse Public License, the same as Clojure.