fix a bug with extracting the byte array of the payload: didn't honour the length of the payload, just extracted the underlying array.
This commit is contained in:
parent
49e0cb5f7f
commit
df01625a56
2 changed files with 23 additions and 4 deletions
|
@ -1,5 +1,6 @@
|
|||
(ns clj-kafka.core
|
||||
(:import [java.util Properties]
|
||||
(:import [java.nio ByteBuffer]
|
||||
[java.util Properties]
|
||||
[kafka.message MessageAndOffset Message]))
|
||||
|
||||
(defn as-properties
|
||||
|
@ -23,7 +24,11 @@
|
|||
MessageAndOffset
|
||||
(to-clojure [x] {:message (to-clojure (.message x))
|
||||
:offset (.offset x)})
|
||||
ByteBuffer
|
||||
(to-clojure [x] (let [b (byte-array (.remaining x))]
|
||||
(.get x b)
|
||||
b))
|
||||
Message
|
||||
(to-clojure [x] {:crc (.checksum x)
|
||||
:payload (.array (.payload x))
|
||||
:payload (to-clojure (.payload x))
|
||||
:size (.size x)}))
|
||||
|
|
|
@ -1,4 +1,18 @@
|
|||
(ns clj-kafka.test.core
|
||||
(:use [clj-kafka.core])
|
||||
(:use [clojure.test]))
|
||||
(:import [java.nio ByteBuffer]
|
||||
[kafka.message Message])
|
||||
(:use [clojure.test])
|
||||
(:use [clj-kafka.core] :reload))
|
||||
|
||||
(deftest convert-message-payload-to-bytes
|
||||
(let [bytes (byte-array (repeat 989 (byte 10)))
|
||||
msg (Message. 100 bytes)]
|
||||
;; 1 magic byte
|
||||
;; 1 byte attribute identifier for annotations on message
|
||||
;; 4 bytes w/ CRC
|
||||
;; n bytes payload
|
||||
;; total = n + 4 + 1 + 1 bytes
|
||||
(is (= 995 (:size (to-clojure msg))))
|
||||
|
||||
(is (= 989 (count (:payload (to-clojure msg)))))
|
||||
(is (= (seq bytes) (seq (:payload (to-clojure msg)))))))
|
Loading…
Reference in a new issue