From df01625a563650db145b43f63bb2343329446627 Mon Sep 17 00:00:00 2001 From: Paul Ingles Date: Wed, 14 Mar 2012 12:10:39 +0000 Subject: [PATCH] fix a bug with extracting the byte array of the payload: didn't honour the length of the payload, just extracted the underlying array. --- src/clj_kafka/core.clj | 9 +++++++-- test/clj_kafka/test/core.clj | 18 ++++++++++++++++-- 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/src/clj_kafka/core.clj b/src/clj_kafka/core.clj index c1f5139..9252918 100644 --- a/src/clj_kafka/core.clj +++ b/src/clj_kafka/core.clj @@ -1,5 +1,6 @@ (ns clj-kafka.core - (:import [java.util Properties] + (:import [java.nio ByteBuffer] + [java.util Properties] [kafka.message MessageAndOffset Message])) (defn as-properties @@ -23,7 +24,11 @@ MessageAndOffset (to-clojure [x] {:message (to-clojure (.message x)) :offset (.offset x)}) + ByteBuffer + (to-clojure [x] (let [b (byte-array (.remaining x))] + (.get x b) + b)) Message (to-clojure [x] {:crc (.checksum x) - :payload (.array (.payload x)) + :payload (to-clojure (.payload x)) :size (.size x)})) diff --git a/test/clj_kafka/test/core.clj b/test/clj_kafka/test/core.clj index d25cfff..b3b174b 100644 --- a/test/clj_kafka/test/core.clj +++ b/test/clj_kafka/test/core.clj @@ -1,4 +1,18 @@ (ns clj-kafka.test.core - (:use [clj-kafka.core]) - (:use [clojure.test])) + (:import [java.nio ByteBuffer] + [kafka.message Message]) + (:use [clojure.test]) + (:use [clj-kafka.core] :reload)) +(deftest convert-message-payload-to-bytes + (let [bytes (byte-array (repeat 989 (byte 10))) + msg (Message. 100 bytes)] + ;; 1 magic byte + ;; 1 byte attribute identifier for annotations on message + ;; 4 bytes w/ CRC + ;; n bytes payload + ;; total = n + 4 + 1 + 1 bytes + (is (= 995 (:size (to-clojure msg)))) + + (is (= 989 (count (:payload (to-clojure msg))))) + (is (= (seq bytes) (seq (:payload (to-clojure msg))))))) \ No newline at end of file