clj-kafka

Clojure library for Kafka.

Development is against the 0.8 release of Kafka. The protocols for 0.7 and 0.8 are incompatible, so this library only works when connecting to a 0.8 cluster. If you need to connect to a 0.7 cluster, use an earlier release of clj-kafka.

Installing

Add clj-kafka to the :dependencies in your Leiningen project.clj, using the latest version published on Clojars.
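
As a minimal sketch, assuming the 0.2.8-0.8.1.1 release is the one you want (check Clojars for the current version), the dependency entry might look like the following. The project name and the Clojure version are hypothetical placeholders.

```clojure
;; project.clj
;; "my-app" and the Clojure version are placeholders for illustration.
;; The clj-kafka version is an assumption; use whatever Clojars lists as latest.
(defproject my-app "0.1.0-SNAPSHOT"
  :dependencies [[org.clojure/clojure "1.6.0"]
                 [clj-kafka "0.2.8-0.8.1.1"]])
```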

Usage

Producer

Discovery of Kafka brokers from Zookeeper:

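;; brokers is provided by the clj-kafka.zk namespace
(use 'clj-kafka.zk)

(brokers {"zookeeper.connect" "127.0.0.1:2181"})
;; ({:host "localhost", :jmx_port -1, :port 9999, :version 1})

Creating a producer and sending a message:

(use 'clj-kafka.producer)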

(def p (producer {"metadata.broker.list" "localhost:9999"
                  "serializer.class" "kafka.serializer.DefaultEncoder"
                  "partitioner.class" "kafka.producer.DefaultPartitioner"}))

(send-message p (message "test" (.getBytes "this is my message")))

Zookeeper Consumer

The Zookeeper consumer uses broker information contained within Zookeeper to consume messages. This consumer also allows the client to automatically commit consumed offsets so they're not retrieved again.

(use 'clj-kafka.consumer.zk)
(use 'clj-kafka.core)

(def config {"zookeeper.connect" "localhost:2182"
             "group.id" "clj-kafka.consumer"
             "auto.offset.reset" "smallest"
             "auto.commit.enable" "false"})

(with-resource [c (consumer config)]
  shutdown
  ;; take is lazy, so realise the messages before the consumer is shut down
  (doall (take 2 (messages c "test"))))
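
The config above turns automatic offset commits off. To have the consumer commit consumed offsets automatically, as described at the start of this section, a config along these lines may work. This is a sketch that assumes the standard Kafka 0.8 consumer properties are passed through unchanged; the name auto-commit-config and the interval value are illustrative only.

```clojure
;; Hypothetical variant of the config above with automatic offset commits on.
;; "auto.commit.enable" and "auto.commit.interval.ms" are Kafka 0.8 consumer
;; properties; the 60000 ms interval is an example value, not a recommendation.
(def auto-commit-config
  {"zookeeper.connect"       "localhost:2182"
   "group.id"                "clj-kafka.consumer"
   "auto.offset.reset"       "smallest"
   "auto.commit.enable"      "true"
   "auto.commit.interval.ms" "60000"})
```

Passing auto-commit-config to consumer in place of config should then let offsets be committed periodically, so a restarted consumer in the same group resumes after the last committed offset.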

License

Copyright © 2013 Paul Ingles

Distributed under the Eclipse Public License, the same as Clojure.

Thanks

YourKit is kindly supporting this open source project with its full-featured Java Profiler. YourKit, LLC is the creator of innovative and intelligent tools for profiling Java and .NET applications. Take a look at YourKit's leading software products: YourKit Java Profiler and YourKit .NET Profiler.