Moved to new Reactive implementation.

This commit is contained in:
Aaron Craelius 2014-07-20 14:41:41 -04:00
parent a4873c6b66
commit 2171610292
3 changed files with 76 additions and 236 deletions

View file

@ -1,7 +1,7 @@
(ns freactive.core
(:refer-clojure
:exclude [atom agent ref swap! reset! compare-and-set!])
(:import [clojure.lang ReactiveAtom Reactive Reactive2 StatefulReactive]))
(:import [clojure.lang ReactiveAtom Reactive StatefulReactive]))
;; Copying clojure.core atom stuff here so that we can use my ReactiveAtom class.
@ -50,86 +50,50 @@ current value. Returns newval."
:static true}
[^clojure.lang.ReactiveAtom atom newval] (.reset atom newval))
(defn- configure-reactive [r f options]
(let [{:keys [simple deps]} (apply hash-map options)
simple (or simple (:simple (meta f)))]
(when simple
(.setSimple r true))
(when deps
(.setDeps r deps)
(.setSimple r true))
(#'clojure.core/setup-reference r options)))
(defn reactive [f & options]
(configure-reactive (Reactive. f) f options))
(#'clojure.core/setup-reference (Reactive. f) options))
(defn reactive2 [f & options]
(#'clojure.core/setup-reference (Reactive2. f) options))
;; (defn stateful-reactive [f & options]
;; (configure-reactive (StatefulReactive. f) options))
(defn simple [f] (with-meta f (merge (meta f) {:simple true})))
(import '(java.util TimerTask Timer))
;; (defn test1 []
;; (let [a (atom 0)
;; b (atom 0)
;; c (reactive (simple (fn [] (+ @a @b))))
;; d (atom 0)
;; e (reactive (simple (fn [] (+ @c @d))))
;; f (reactive (fn [] (if (even? @a) @b @c)))
;; task (proxy [TimerTask] []
;; (run [] @f @e @c ))]
;; (. (new Timer) (schedule task (long 1)))
;; (println "Reactive")
;; (time
;; (dotimes [i 2000000]
;; (swap! a inc)
;; (swap! b inc)
;; (swap! d inc)))
;; (assert (= (+ @a @b) @c)
;; (= (+ @c @d) @e))))
(defn test1 []
(let [a (atom 0)
b (atom 0)
c (reactive (fn [] (+ @a @b)))
d (atom 0)
e (reactive (fn [] (+ @c @d)))
f (reactive (fn [] (if (even? @a) @b @c)))
task (proxy [TimerTask] []
(run [] @f @e @c ))]
(. (new Timer) (schedule task (long 1)))
(println "Reactive")
(time
(dotimes [i 2000000]
(swap! a inc)
(swap! b inc)
(swap! d inc)))
(assert (= (+ @a @b) @c)
(= (+ @c @d) @e))))
;; (test1)
(test1)
;; (defn test2 []
;; (let [a (atom 0)
;; b (atom 0)
;; c (fn [] (+ @a @b))
;; d (atom 0)
;; e (fn [] (+ (c) @d))
;; f (fn [] (if (even? @a) @b (c)))
;; task (proxy [TimerTask] []
;; (run [] (f) (e) (c)))]
;; (. (new Timer) (schedule task (long 1)))
;; (println "Non-reactive")
;; (time
;; (dotimes [i 2000000]
;; (swap! a inc)
;; (swap! b inc)
;; (swap! d inc)))))
(defn test2 []
(let [a (atom 0)
b (atom 0)
c (fn [] (+ @a @b))
d (atom 0)
e (fn [] (+ (c) @d))
f (fn [] (if (even? @a) @b (c)))
task (proxy [TimerTask] []
(run [] (f) (e) (c)))]
(. (new Timer) (schedule task (long 1)))
(println "Non-reactive")
(time
(dotimes [i 2000000]
(swap! a inc)
(swap! b inc)
(swap! d inc)))))
;; (test2)
;; (defn test3 []
;; (let [a (atom 0)
;; b (atom 0)
;; c (reactive2 (fn [] (+ @a @b)))
;; d (atom 0)
;; e (reactive2 (fn [] (+ @c @d)))
;; f (reactive2 (fn [] (if (even? @a) @b @c)))
;; task (proxy [TimerTask] []
;; (run [] @f @e @c ))]
;; (. (new Timer) (schedule task (long 1)))
;; (println "Reactive")
;; (time
;; (dotimes [i 2000000]
;; (swap! a inc)
;; (swap! b inc)
;; (swap! d inc)))
;; (assert (= (+ @a @b) @c)
;; (= (+ @c @d) @e))))
;; (test3)
(test2)

View file

@ -8,25 +8,14 @@ import java.util.Iterator;
public class Reactive extends ARef implements IReactive {
private final AtomicBoolean dirty = new AtomicBoolean(true);
protected final AtomicReference currentValue = new AtomicReference(null);
protected IFn func;
private final HashMap<ARef, Boolean> deps = new HashMap<ARef, Boolean>();
private boolean simple = false;
private boolean depsRegistered = false;
class RegisterDep extends AFn {
public Object invoke(Object obj) {
ARef aref = (obj instanceof ARef ? (ARef)obj : null);
if(aref == null) return null;
if(!deps.containsKey(aref)) {
deps.put(aref, true);
aref.addWatch(sully, sully);
}
return null;
}
protected final AtomicReference state = new AtomicReference(null);
protected IFn func;
public Reactive(IFn func) {
this.func = func;
}
private final RegisterDep registerDep = this.new RegisterDep();
public final static Var REGISTER_DEP =
Var.intern(Namespace.findOrCreate(Symbol.intern("freactive.core")),
@ -35,14 +24,30 @@ public class Reactive extends ARef implements IReactive {
public static void registerDep(IRef ref) {
Object v = REGISTER_DEP.deref();
if(v != null) {
((IFn)v).invoke(ref);
((RegisterDep)v).register(ref);
}
}
class RegisterDep extends AFn {
public void register(IRef ref) {
ref.addWatch(sully, sully);
}
public Object invoke(Object obj) {
IRef ref = (obj instanceof IRef ? (IRef)obj : null);
if(ref == null) return null;
register(ref);
return null;
}
}
private final RegisterDep registerDep = this.new RegisterDep();
class Sully extends AFn {
public Object invoke(Object key, Object ref, Object oldVal, Object newVal) {
((ARef)ref).removeWatch(key);
if(dirty.compareAndSet(false, true)) {
Object cur = currentValue.get();
Object cur = state.get();
notifyWatches(cur, cur);
}
return null;
@ -50,12 +55,6 @@ public class Reactive extends ARef implements IReactive {
}
private final Sully sully = this.new Sully();
public Reactive(IFn func) {
this.func = func;
}
protected Object compute() {
return func.invoke();
@ -65,60 +64,23 @@ public class Reactive extends ARef implements IReactive {
registerDep(this);
if(!dirty.get())
return currentValue.get();
synchronized(this) {
if(!dirty.get())
return currentValue.get();
Object newVal;
if(simple && depsRegistered) {
try {
Var.pushThreadBindings(RT.map(REGISTER_DEP, null));
dirty.set(false);
newVal = compute();
}
finally {
Var.popThreadBindings();
}
validate(newVal);
currentValue.set(newVal);
return newVal;
}
try {
Var.pushThreadBindings(RT.map(REGISTER_DEP, registerDep));
return state.get();
try {
Var.pushThreadBindings(RT.map(REGISTER_DEP, registerDep));
for(; ;) {
dirty.set(false);
newVal = compute();
}
finally {
Var.popThreadBindings();
}
validate(newVal);
currentValue.set(newVal);
Iterator<Map.Entry<ARef, Boolean>> it = deps.entrySet().iterator();
while(it.hasNext()) {
Map.Entry<ARef, Boolean> entry = it.next();
if(!entry.getValue()) {
entry.getKey().removeWatch(sully);
it.remove();
} else {
entry.setValue(false);
Object v = state.get();
Object newv = compute();
validate(newv);
if(state.compareAndSet(v, newv)) {
notifyWatches(v, newv);
return newv;
}
}
return newVal;
}
finally {
Var.popThreadBindings();
}
}
public boolean getSimple() { return simple; }
public void setSimple(boolean val) { simple = val; }
public void setDeps(Seqable val) {
for(ISeq s = val.seq(); s != null; s = s.next())
registerDep.invoke(s.first());
depsRegistered = true;
}
}

View file

@ -1,86 +0,0 @@
package clojure.lang;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.HashMap;
import java.util.Map;
import java.util.Iterator;
public class Reactive2 extends ARef implements IReactive {
private final AtomicBoolean dirty = new AtomicBoolean(true);
protected final AtomicReference state = new AtomicReference(null);
protected IFn func;
public Reactive2(IFn func) {
this.func = func;
}
public final static Var REGISTER_DEP =
Var.intern(Namespace.findOrCreate(Symbol.intern("freactive.core")),
Symbol.intern("*register-dep*"), null, false).setDynamic();
public static void registerDep(IRef ref) {
Object v = REGISTER_DEP.deref();
if(v != null) {
((RegisterDep)v).register(ref);
}
}
class RegisterDep extends AFn {
public void register(IRef ref) {
ref.addWatch(sully, sully);
}
public Object invoke(Object obj) {
IRef ref = (obj instanceof IRef ? (IRef)obj : null);
if(ref == null) return null;
register(ref);
return null;
}
}
private final RegisterDep registerDep = this.new RegisterDep();
class Sully extends AFn {
public Object invoke(Object key, Object ref, Object oldVal, Object newVal) {
((ARef)ref).removeWatch(key);
if(dirty.compareAndSet(false, true)) {
Object cur = state.get();
notifyWatches(cur, cur);
}
return null;
}
}
private final Sully sully = this.new Sully();
protected Object compute() {
return func.invoke();
}
public Object deref() {
registerDep(this);
if(!dirty.get())
return state.get();
try {
Var.pushThreadBindings(RT.map(REGISTER_DEP, registerDep));
for(; ;) {
dirty.set(false);
Object v = state.get();
Object newv = compute();
validate(newv);
if(state.compareAndSet(v, newv)) {
notifyWatches(v, newv);
return newv;
}
}
}
finally {
Var.popThreadBindings();
}
}
}