Support for persisting data sets into MongoDB

This commit is contained in:
Antonio Garrote 2010-03-06 12:01:05 +01:00
parent df2fe1913f
commit c1c7ba0ac5
6 changed files with 175 additions and 9 deletions

View file

@ -6,4 +6,5 @@
[org.clojure/clojure-contrib "1.1.0"]
[lein-javac "0.0.2-SNAPSHOT"]
[incanter/incanter-full "1.0-master-SNAPSHOT"]
[com.mongodb/mongo "1.0"]
[weka/weka "3.6.2"]])

132
src/clj_ml/data_store.clj Normal file
View file

@ -0,0 +1,132 @@
;;
;; Data store support: persisting data sets into MongoDB
;; @author Antonio Garrote
;;
(ns clj-ml.data-store
(:use [clj-ml utils data])
(:import (com.mongodb Mongo DB BasicDBObject DBCollection DBCursor)))
(defn keywords-to-strings
  "Recursively transforms all keywords into strings.

   - a keyword is converted with key-to-str;
   - a map is rebuilt with every key passed through key-to-str and every
     value converted recursively;
   - any other sequential collection is converted element by element
     (the result is a lazy sequence);
   - every other value is returned unchanged."
  [format]
  (cond
    (keyword? format)
    (key-to-str format)

    (map? format)
    (reduce (fn [acum k]
              ;; conj-ing the accumulator INTO the fresh single-entry map
              ;; means that, if two keys collapse to the same string, the
              ;; entry converted first is the one that is kept.
              (conj {(key-to-str k) (keywords-to-strings (get format k))}
                    acum))
            {}
            (keys format))

    (sequential? format)
    (map keywords-to-strings format)

    :else
    format))
(defmulti make-data-store-connection
  "Connects to a data store.

   kind   - dispatch value selecting the data store implementation
   params - implementation specific connection parameters"
  (fn [kind params] kind))

;; MongoDB implementation: params is expected to provide :host and :port,
;; which are handed to the Mongo constructor as they are.
(defmethod make-data-store-connection :mongodb
  ([kind params]
     ;; The connection parameters are no longer echoed to stdout; the
     ;; previous println calls were debugging residue.
     (new Mongo (:host params) (:port params))))
(defmulti data-store-connection-db
  "Returns a DB from the established connection.
   Dispatches on the first argument (the kind of data store)."
  (fn [store-kind _connection _db-name & _more] store-kind))

;; MongoDB implementation: asks the connection for the database called `name`.
;; Any extra params are accepted and ignored.
(defmethod data-store-connection-db :mongodb
  [kind connection name & params]
  (. connection (getDB name)))
;; High level API
;; Name of the collection in which data-store-save-dataset registers one
;; {"id" <md5 of the dataset name>} document per persisted dataset.
(def *clj-ml-datasets* "-clj-ml-datasets")
;; Suffix appended to the MD5 of a dataset name to build the name of the
;; collection that stores the dataset format (schema).
(def *clj-ml-format-suffix* "-clj-ml-schema")
;; Suffix appended to the MD5 of a dataset name to build the name of the
;; collection that stores the dataset instances.
(def *clj-ml-instances-suffix* "-clj-ml-instances")
(defmulti data-store-save-dataset
  "Persists a whole dataset in the data store.

   kind     - dispatch value selecting the data store implementation
   database - database handle the dataset is written to
   dataset  - dataset to persist
   options  - implementation specific options"
  (fn [kind database dataset & options] kind))

;; MongoDB implementation.
;; The MD5 of the dataset name is used as the base name of two collections:
;; one holding the format (schema) and one holding the instances. The
;; dataset is also registered in the *clj-ml-datasets* collection.
;; Returns a sequence with the result of every instance insert.
(defmethod data-store-save-dataset :mongodb
  ([kind database dataset & options]
     (let [format (dataset-format dataset)
           name (md5-sum (dataset-name dataset))
           instances-collection-name (str name *clj-ml-instances-suffix*)
           datasets-collection (.getCollection database *clj-ml-datasets*)
           schema-collection (.getCollection database (str name *clj-ml-format-suffix*))
           data-collection-tmp (.getCollection database instances-collection-name)
           format-to-insert (new BasicDBObject {"format" (keywords-to-strings format)})]
       ;; Register the dataset, replacing any previous registration.
       (.remove datasets-collection (new BasicDBObject {"id" name}))
       (.insert datasets-collection (new BasicDBObject {"id" name}))
       ;; NOTE(review): only a schema document matching the new format is
       ;; removed here; a schema stored earlier with a different format
       ;; would stay in the collection -- TODO confirm this is intended.
       (.remove schema-collection format-to-insert)
       (.insert schema-collection format-to-insert)
       ;; Start from an empty instances collection.
       (when-not (nil? data-collection-tmp)
         (.drop data-collection-tmp))
       (let [data-collection (.getCollection database instances-collection-name)]
         ;; `for` is lazy: without doall the inserts would only run if the
         ;; caller happened to consume the returned sequence.
         (doall
          (for [i (dataset-seq dataset)]
            (.insert data-collection
                     (new BasicDBObject
                          (keywords-to-strings {"instance" (instance-to-vector i)})))))))))
;; The vector and map converters are mutually recursive.
(declare mongo-persisted-instance-to-map)

(defn- mongo-persisted-instance-to-vector
  "Transforms an object persisted in a mongodb database back to a vector
   containing the values of the entries of its map view, in iteration order.

   Values whose class is exactly BasicDBObject are converted with
   mongo-persisted-instance-to-map. String values are turned into keywords
   when use-keys? is true (it defaults to false). Any other value is kept
   untouched."
  ([inst] (mongo-persisted-instance-to-vector inst false))
  ([inst use-keys?]
     (let [restore (fn [value]
                     (cond
                       (= (class value) com.mongodb.BasicDBObject)
                       (mongo-persisted-instance-to-map value use-keys?)

                       (string? value)
                       (if use-keys? (keyword value) value)

                       :else
                       value))]
       (reduce (fn [restored entry]
                 (conj restored (restore (.getValue entry))))
               []
               (seq (.toMap inst))))))
(defn- mongo-persisted-instance-to-map
  "Transforms an object persisted in a mongodb database back to a map.

   Every key of the persisted object is turned into a keyword. Values whose
   class is exactly BasicDBObject are converted recursively into maps,
   values whose class is exactly BasicDBList are converted with
   mongo-persisted-instance-to-vector, and any other value is kept
   untouched. use-keys? (false by default) is only handed down to the
   nested conversions."
  ([inst] (mongo-persisted-instance-to-map inst false))
  ([inst use-keys?]
     (let [persisted (.toMap inst)
           restore (fn [value]
                     (let [value-class (class value)]
                       (cond
                         (= value-class com.mongodb.BasicDBObject)
                         (mongo-persisted-instance-to-map value use-keys?)

                         (= value-class com.mongodb.BasicDBList)
                         (mongo-persisted-instance-to-vector value use-keys?)

                         :else
                         value)))]
       (reduce (fn [restored k]
                 (assoc restored (keyword k) (restore (get persisted k))))
               {}
               (keys persisted)))))
(defmulti data-store-load-dataset
  "Load a whole dataset from a data store.
   Dispatches on the first argument (the kind of data store)."
  (fn [store-kind _database _database-name & _options] store-kind))

;; MongoDB implementation.
;; The collections to read are named after the MD5 of database-name plus the
;; schema / instances suffixes. The format is taken from the first document
;; of the schema collection, and every document of the instances collection
;; contributes the vector stored under its "instance" key.
(defmethod data-store-load-dataset :mongodb
  [kind database database-name & options]
  (let [prefix (md5-sum database-name)
        schema-collection (.getCollection database (str prefix *clj-ml-format-suffix*))
        schema-document (.next (.find schema-collection))
        ;; strings in the stored format are restored as keywords
        format (mongo-persisted-instance-to-vector (get schema-document "format") true)
        instances-collection (.getCollection database (str prefix *clj-ml-instances-suffix*))
        cursor (.find instances-collection)
        insts (loop [loaded []]
                (if (.hasNext cursor)
                  (let [persisted (get (.toMap (.next cursor)) "instance")]
                    (recur (conj loaded (mongo-persisted-instance-to-vector persisted))))
                  loaded))]
    (make-dataset database-name format insts)))

View file

@ -14,7 +14,7 @@
(defmulti load-instances
"Load instances from different data sources"
(fn [kind source] kind))
(fn [kind source & options] kind))
(defmacro m-load-instances [loader source]
`(do
@ -25,27 +25,32 @@
(.getDataSet ~loader)))
(defmethod load-instances :arff
([kind source]
([kind source & options]
(let [loader (new ArffLoader)]
(m-load-instances loader source))))
(defmethod load-instances :xrff
([kind source]
([kind source & options]
(let [loader (new XRFFLoader)]
(m-load-instances loader source))))
(defmethod load-instances :csv
([kind source]
([kind source & options]
(let [loader (new CSVLoader)]
(m-load-instances loader source))))
;; Loads a dataset persisted in MongoDB.
;; source is a map with two entries:
;;   :database     - the database handle to read from
;;   :dataset-name - the name the dataset was saved under
;; Any extra options are forwarded to data-store-load-dataset.
(defmethod load-instances :mongodb
  ([kind source & options]
     ;; The values are extracted from source. Wrapping source in new maps,
     ;; as was done before, handed maps to a callee that needs a database
     ;; object and a name string.
     (let [database (:database source)
           name (:dataset-name source)]
       ;; apply spreads the options instead of passing them as one nested list
       (apply clj-ml.data-store/data-store-load-dataset :mongodb database name options))))
;; Saving of instances
(defmulti save-instances
"Save instances into data destinies"
(fn [kind destiny instances] kind))
(fn [kind destiny instances & options] kind))
(defmacro m-save-instances [saver destiny instances]
`(do
@ -57,17 +62,20 @@
(.writeBatch ~saver)))
(defmethod save-instances :arff
([kind destiny instances]
([kind destiny instances & options]
(let [saver (new ArffSaver)]
(m-save-instances saver destiny instances))))
(defmethod save-instances :xrff
([kind destiny instances]
([kind destiny instances & options]
(let [saver (new XRFFSaver)]
(m-save-instances saver destiny instances))))
(defmethod save-instances :csv
([kind destiny instances]
([kind destiny instances & options]
(let [saver (new CSVSaver)]
(m-save-instances saver destiny instances))))
;; Persists the instances in MongoDB.
;; destiny is the database handle handed to data-store-save-dataset and
;; instances is the dataset to store. Any extra options are forwarded.
(defmethod save-instances :mongodb
  ([kind destiny instances & options]
     ;; apply spreads the options instead of passing them as one nested list
     (apply clj-ml.data-store/data-store-save-dataset :mongodb destiny instances options)))

View file

@ -192,4 +192,7 @@
; (add-classpath "file:///Users/antonio.garrote/Development/old/clj-ml/lib/jcommon-1.0.16.jar")
; (add-classpath "file:///Users/antonio.garrote/Development/old/clj-ml/lib/netlib-java-0.9.1.jar")
; (add-classpath "file:///Users/antonio.garrote/Development/old/clj-ml/lib/processing-core-1.jar")
; (add-classpath"file:///Users/antonio.garrote/Development/old/clj-ml/lib/congomongo-0.1.1-20091229.021828-1.jar")
; (add-classpath"file:///Users/antonio.garrote/Development/old/clj-ml/lib/mongo-1.0.jar")
; (add-classpath"file:///Users/antonio.garrote/Development/old/clj-ml/lib/mongo-java-driver-1.1.0-20091229.021828-3.jar")
; ))

View file

@ -6,7 +6,10 @@
(ns clj-ml.utils
(:import (java.io ObjectOutputStream ByteArrayOutputStream
ByteArrayInputStream ObjectInputStream
FileOutputStream FileInputStream)))
FileOutputStream FileInputStream)
(java.security
NoSuchAlgorithmException
MessageDigest)))
(defn key-to-str
@ -41,6 +44,17 @@
(rest ks))))))
(defn md5-sum
  "Compute the hex MD5 sum of a string.

   Returns the digest as a 32 character, lower case, zero padded
   hexadecimal string. The string is hashed using its UTF-8 bytes so the
   result does not depend on the platform default charset.

   Throws a RuntimeException wrapping the NoSuchAlgorithmException when the
   MD5 algorithm is not available."
  [#^String str]
  (try
    ;; getInstance is the call that can throw NoSuchAlgorithmException, so it
    ;; has to live inside the try for the catch clause to be reachable.
    (let [alg (doto (MessageDigest/getInstance "MD5")
                (.reset)
                (.update (.getBytes str "UTF-8")))]
      ;; BigInteger.toString(16) drops leading zeros; %032x keeps all 32 digits.
      (format "%032x" (new BigInteger 1 (.digest alg))))
    (catch NoSuchAlgorithmException e
      (throw (new RuntimeException e)))))
;; Manipulation of array of options
(defn check-option [opts val flag map]

View file

@ -0,0 +1,8 @@
(ns clj-ml.data-store-test
(:use [clj-ml.data-store] :reload-all)
(:use [clojure.test]))
(deftest make-instance-num
  ;; keywords-to-strings must convert keywords at every nesting level,
  ;; both inside sequences and in map keys, leaving other values alone.
  (let [input [1 :hola {:a [:b {:d "hola"}]}]
        expected '(1 "hola" {"a" ("b" {"d" "hola"})})]
    (is (= (keywords-to-strings input) expected))))