From 2171610292268845459fce1afe7a57c93e7e90d7 Mon Sep 17 00:00:00 2001 From: Aaron Craelius Date: Sun, 20 Jul 2014 14:41:41 -0400 Subject: [PATCH] Moved to new Reactive implementation. --- src/clojure/freactive/core.clj | 112 +++++++++----------------- src/java/clojure/lang/Reactive.java | 114 +++++++++------------------ src/java/clojure/lang/Reactive2.java | 86 -------------------- 3 files changed, 76 insertions(+), 236 deletions(-) delete mode 100644 src/java/clojure/lang/Reactive2.java diff --git a/src/clojure/freactive/core.clj b/src/clojure/freactive/core.clj index 18e44a3..3ddaf1b 100644 --- a/src/clojure/freactive/core.clj +++ b/src/clojure/freactive/core.clj @@ -1,7 +1,7 @@ (ns freactive.core (:refer-clojure :exclude [atom agent ref swap! reset! compare-and-set!]) - (:import [clojure.lang ReactiveAtom Reactive Reactive2 StatefulReactive])) + (:import [clojure.lang ReactiveAtom Reactive StatefulReactive])) ;; Copying clojure.core atom stuff here so that we can use my ReactiveAtom class. @@ -50,86 +50,50 @@ current value. Returns newval." :static true} [^clojure.lang.ReactiveAtom atom newval] (.reset atom newval)) -(defn- configure-reactive [r f options] - (let [{:keys [simple deps]} (apply hash-map options) - simple (or simple (:simple (meta f)))] - (when simple - (.setSimple r true)) - (when deps - (.setDeps r deps) - (.setSimple r true)) - (#'clojure.core/setup-reference r options))) - (defn reactive [f & options] - (configure-reactive (Reactive. f) f options)) + (#'clojure.core/setup-reference (Reactive. f) options)) -(defn reactive2 [f & options] - (#'clojure.core/setup-reference (Reactive2. f) options)) - ;; (defn stateful-reactive [f & options] ;; (configure-reactive (StatefulReactive. f) options)) -(defn simple [f] (with-meta f (merge (meta f) {:simple true}))) - (import '(java.util TimerTask Timer)) -;; (defn test1 [] -;; (let [a (atom 0) -;; b (atom 0) -;; c (reactive (simple (fn [] (+ @a @b)))) -;; d (atom 0) -;; e (reactive (simple (fn [] (+ @c @d)))) -;; f (reactive (fn [] (if (even? @a) @b @c))) -;; task (proxy [TimerTask] [] -;; (run [] @f @e @c ))] -;; (. (new Timer) (schedule task (long 1))) -;; (println "Reactive") -;; (time -;; (dotimes [i 2000000] -;; (swap! a inc) -;; (swap! b inc) -;; (swap! d inc))) -;; (assert (= (+ @a @b) @c) -;; (= (+ @c @d) @e)))) +(defn test1 [] + (let [a (atom 0) + b (atom 0) + c (reactive (fn [] (+ @a @b))) + d (atom 0) + e (reactive (fn [] (+ @c @d))) + f (reactive (fn [] (if (even? @a) @b @c))) + task (proxy [TimerTask] [] + (run [] @f @e @c ))] + (. (new Timer) (schedule task (long 1))) + (println "Reactive") + (time + (dotimes [i 2000000] + (swap! a inc) + (swap! b inc) + (swap! d inc))) + (assert (= (+ @a @b) @c) + (= (+ @c @d) @e)))) -;; (test1) +(test1) -;; (defn test2 [] -;; (let [a (atom 0) -;; b (atom 0) -;; c (fn [] (+ @a @b)) -;; d (atom 0) -;; e (fn [] (+ (c) @d)) -;; f (fn [] (if (even? @a) @b (c))) -;; task (proxy [TimerTask] [] -;; (run [] (f) (e) (c)))] -;; (. (new Timer) (schedule task (long 1))) -;; (println "Non-reactive") -;; (time -;; (dotimes [i 2000000] -;; (swap! a inc) -;; (swap! b inc) -;; (swap! d inc))))) +(defn test2 [] + (let [a (atom 0) + b (atom 0) + c (fn [] (+ @a @b)) + d (atom 0) + e (fn [] (+ (c) @d)) + f (fn [] (if (even? @a) @b (c))) + task (proxy [TimerTask] [] + (run [] (f) (e) (c)))] + (. (new Timer) (schedule task (long 1))) + (println "Non-reactive") + (time + (dotimes [i 2000000] + (swap! a inc) + (swap! b inc) + (swap! d inc))))) -;; (test2) - -;; (defn test3 [] -;; (let [a (atom 0) -;; b (atom 0) -;; c (reactive2 (fn [] (+ @a @b))) -;; d (atom 0) -;; e (reactive2 (fn [] (+ @c @d))) -;; f (reactive2 (fn [] (if (even? @a) @b @c))) -;; task (proxy [TimerTask] [] -;; (run [] @f @e @c ))] -;; (. (new Timer) (schedule task (long 1))) -;; (println "Reactive") -;; (time -;; (dotimes [i 2000000] -;; (swap! a inc) -;; (swap! b inc) -;; (swap! d inc))) -;; (assert (= (+ @a @b) @c) -;; (= (+ @c @d) @e)))) - -;; (test3) +(test2) diff --git a/src/java/clojure/lang/Reactive.java b/src/java/clojure/lang/Reactive.java index 717bbc8..4e23df5 100644 --- a/src/java/clojure/lang/Reactive.java +++ b/src/java/clojure/lang/Reactive.java @@ -8,25 +8,14 @@ import java.util.Iterator; public class Reactive extends ARef implements IReactive { private final AtomicBoolean dirty = new AtomicBoolean(true); - protected final AtomicReference currentValue = new AtomicReference(null); - protected IFn func; - private final HashMap deps = new HashMap(); - private boolean simple = false; - private boolean depsRegistered = false; - class RegisterDep extends AFn { - public Object invoke(Object obj) { - ARef aref = (obj instanceof ARef ? (ARef)obj : null); - if(aref == null) return null; - if(!deps.containsKey(aref)) { - deps.put(aref, true); - aref.addWatch(sully, sully); - } - return null; - } + protected final AtomicReference state = new AtomicReference(null); + + protected IFn func; + + public Reactive(IFn func) { + this.func = func; } - - private final RegisterDep registerDep = this.new RegisterDep(); public final static Var REGISTER_DEP = Var.intern(Namespace.findOrCreate(Symbol.intern("freactive.core")), @@ -35,14 +24,30 @@ public class Reactive extends ARef implements IReactive { public static void registerDep(IRef ref) { Object v = REGISTER_DEP.deref(); if(v != null) { - ((IFn)v).invoke(ref); + ((RegisterDep)v).register(ref); } } + class RegisterDep extends AFn { + public void register(IRef ref) { + ref.addWatch(sully, sully); + } + + public Object invoke(Object obj) { + IRef ref = (obj instanceof IRef ? (IRef)obj : null); + if(ref == null) return null; + register(ref); + return null; + } + } + + private final RegisterDep registerDep = this.new RegisterDep(); + class Sully extends AFn { public Object invoke(Object key, Object ref, Object oldVal, Object newVal) { + ((ARef)ref).removeWatch(key); if(dirty.compareAndSet(false, true)) { - Object cur = currentValue.get(); + Object cur = state.get(); notifyWatches(cur, cur); } return null; @@ -50,12 +55,6 @@ public class Reactive extends ARef implements IReactive { } private final Sully sully = this.new Sully(); - - - - public Reactive(IFn func) { - this.func = func; - } protected Object compute() { return func.invoke(); @@ -65,60 +64,23 @@ public class Reactive extends ARef implements IReactive { registerDep(this); if(!dirty.get()) - return currentValue.get(); - - synchronized(this) { - if(!dirty.get()) - return currentValue.get(); - - Object newVal; - - if(simple && depsRegistered) { - try { - Var.pushThreadBindings(RT.map(REGISTER_DEP, null)); - dirty.set(false); - newVal = compute(); - } - finally { - Var.popThreadBindings(); - } - validate(newVal); - currentValue.set(newVal); - return newVal; - } - - try { - Var.pushThreadBindings(RT.map(REGISTER_DEP, registerDep)); + return state.get(); + + try { + Var.pushThreadBindings(RT.map(REGISTER_DEP, registerDep)); + for(; ;) { dirty.set(false); - newVal = compute(); - } - finally { - Var.popThreadBindings(); - } - validate(newVal); - currentValue.set(newVal); - - Iterator> it = deps.entrySet().iterator(); - while(it.hasNext()) { - Map.Entry entry = it.next(); - if(!entry.getValue()) { - entry.getKey().removeWatch(sully); - it.remove(); - } else { - entry.setValue(false); + Object v = state.get(); + Object newv = compute(); + validate(newv); + if(state.compareAndSet(v, newv)) { + notifyWatches(v, newv); + return newv; } } - return newVal; + } + finally { + Var.popThreadBindings(); } } - - public boolean getSimple() { return simple; } - - public void setSimple(boolean val) { simple = val; } - - public void setDeps(Seqable val) { - for(ISeq s = val.seq(); s != null; s = s.next()) - registerDep.invoke(s.first()); - depsRegistered = true; - } } diff --git a/src/java/clojure/lang/Reactive2.java b/src/java/clojure/lang/Reactive2.java deleted file mode 100644 index 3cba98d..0000000 --- a/src/java/clojure/lang/Reactive2.java +++ /dev/null @@ -1,86 +0,0 @@ -package clojure.lang; - -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.HashMap; -import java.util.Map; -import java.util.Iterator; - -public class Reactive2 extends ARef implements IReactive { - private final AtomicBoolean dirty = new AtomicBoolean(true); - - protected final AtomicReference state = new AtomicReference(null); - - protected IFn func; - - public Reactive2(IFn func) { - this.func = func; - } - - public final static Var REGISTER_DEP = - Var.intern(Namespace.findOrCreate(Symbol.intern("freactive.core")), - Symbol.intern("*register-dep*"), null, false).setDynamic(); - - public static void registerDep(IRef ref) { - Object v = REGISTER_DEP.deref(); - if(v != null) { - ((RegisterDep)v).register(ref); - } - } - - class RegisterDep extends AFn { - public void register(IRef ref) { - ref.addWatch(sully, sully); - } - - public Object invoke(Object obj) { - IRef ref = (obj instanceof IRef ? (IRef)obj : null); - if(ref == null) return null; - register(ref); - return null; - } - } - - private final RegisterDep registerDep = this.new RegisterDep(); - - class Sully extends AFn { - public Object invoke(Object key, Object ref, Object oldVal, Object newVal) { - ((ARef)ref).removeWatch(key); - if(dirty.compareAndSet(false, true)) { - Object cur = state.get(); - notifyWatches(cur, cur); - } - return null; - } - } - - private final Sully sully = this.new Sully(); - - protected Object compute() { - return func.invoke(); - } - - public Object deref() { - registerDep(this); - - if(!dirty.get()) - return state.get(); - - try { - Var.pushThreadBindings(RT.map(REGISTER_DEP, registerDep)); - for(; ;) { - dirty.set(false); - Object v = state.get(); - Object newv = compute(); - validate(newv); - if(state.compareAndSet(v, newv)) { - notifyWatches(v, newv); - return newv; - } - } - } - finally { - Var.popThreadBindings(); - } - } -}