diff --git a/src/clj_kafka/consumer/zk.clj b/src/clj_kafka/consumer/zk.clj index 944c546..22bc64e 100644 --- a/src/clj_kafka/consumer/zk.clj +++ b/src/clj_kafka/consumer/zk.clj @@ -1,7 +1,7 @@ (ns clj-kafka.consumer.zk (:import [kafka.consumer ConsumerConfig Consumer KafkaStream] [kafka.javaapi.consumer ConsumerConnector]) - (:use [clj-kafka.core :only (as-properties to-clojure with-resource pipe)]) + (:use [clj-kafka.core :only (as-properties to-clojure with-resource)]) (:require [zookeeper :as zk])) (defn consumer diff --git a/src/clj_kafka/core.clj b/src/clj_kafka/core.clj index 5420256..2bbc3cd 100644 --- a/src/clj_kafka/core.clj +++ b/src/clj_kafka/core.clj @@ -61,22 +61,3 @@ TopicMetadataResponse (to-clojure [x] (map to-clojure (.topicsMetadata x)))) - -(defn pipe - "Returns a vector containing a sequence that will read from the - queue, and a function that inserts items into the queue. - - Source: http://clj-me.cgrand.net/2010/04/02/pipe-dreams-are-not-necessarily-made-of-promises/" - ([] (pipe 100)) - ([size] - (let [q (java.util.concurrent.LinkedBlockingQueue. ^int size) - EOQ (Object.) - NIL (Object.) - s (fn queue-seq [] (lazy-seq - (let [x (.take q)] - (when-not (= EOQ x) - (cons (when-not (= NIL x) x) - (queue-seq))))))] - [(s) (fn queue-put - ([] (.put q EOQ)) - ([x] (.put q (or x NIL))))])))