// package scalax.future;

/** A Future is a function of arity 0 that returns a value of type T.
It can be queried to find out if the value is available, if a failure
has occurred, and what the failure is, if failure happened. If the
value of the future is retrieved and a failure results, the failure
exception is thrown. */
mixin class Future[T] extends Function0[T] {
  /** True once a value or a failure has been recorded. */
  def isSet: boolean;
  /** True if the future completed with a failure. */
  def isFailure: boolean;
  /** The recorded failure, if any. */
  def failure: Throwable;
}

/** A Promise is a placeholder Future, where the result
of the computation can be set externally. */
mixin class Promise[T] extends Future[T] {
  /** Fulfil the promise with the value t. */
  def set(t: T): unit;
  /** Complete the promise with the failure ex. */
  def fail(ex: Throwable): unit;
  /** The Future view of this promise. */
  def future: Future[T];
}

/** A Dependent is a calculated value dependent on other values. The
expression used to calculate a Dependent cannot be updated. If you want to
be able to change the expression, create a Cell instead. */
mixin class Dependent[T] extends Future[T] {
  /** Optional label; MCell includes it in its toString output when non-null. */
  var description: String = _;
  /** The CellContext this value belongs to. */
  def context: CellContext;
  /** Discard the current value or failure. */
  def unset: unit;
}

/** A Cell is a Dependent whose expression can be replaced. It can track
which other modifiable values it is dependent on during an update, if the
other modifiables are accessed during a call to update. */
mixin class Cell[T] extends Dependent[T] {
  /** Replace the expression used to calculate this cell. */
  def update(t: => T): unit;
}

/** A CellContext can track the dependencies between modifiable
objects it manufactures. */
class CellContext {

  /** Per-thread stack of the cells that are currently being computed. */
  private val tlocal = new ThreadLocal();

  /** Create a cell that uses a fixed formula to calculate its value.
  The formula cannot be changed once set. */
  def formula[T](t: => T): Dependent[T] = new MCell[T] {
    def cellType = "Formula";
    // Compute on demand: the formula is evaluated by the first get that
    // finds the cell unset.
    override def get: T = synchronized {
      if (!isSet)
        compute;
      super.get
    }
    def doSet = super.set(t);
    override def set(t: T): unit = error("Cannot call set on Cells. Use update instead.");
  }

  /** Create a modifiable cell that will track its dependencies on other
  modifiable values. If the update function is used to calculate the
  contents AND all calculation involving modifiables is contained
  within the expression, dependencies will be automatically managed. */
  implicit def cell[T](initial: => T): Cell[T] = new MCell[T] with Cell[T] {
    // The current expression; replaced wholesale by update.
    var expression = new Function0[T] {
      def apply() = initial;
    }
    def cellType = "Cell";
    // Compute on demand, exactly as for a formula.
    override def get: T = synchronized {
      if (!isSet)
        compute;
      super.get
    }
    def doSet = super.set(expression());
    // Install the new expression and evaluate it immediately.
    def update(t: => T): unit = {
      expression = new Function0[T] {
        def apply() = t;
      } ;
      compute;
    }
    override def set(t: T): unit = error("Cannot call set on cell. Use update instead.");
  }

  /** The calling thread's computation stack, created on first use. */
  private def localStack = synchronized {
    var stack = tlocal.get().asInstanceOf[java.util.Stack];
    if (stack == null) {
      stack = new java.util.Stack;
      tlocal.set(stack);
    }
    stack;
  }

  /** Shared implementation of formula and cell. While a cell is being
  computed it sits on top of the thread's stack; any MCell whose get is
  called in the meantime is recorded as an input of that cell, and the
  computing cell is recorded as an output of the cell being read. */
  abstract private class MCell[T] extends ExceptionSyncVar[T] with Dependent[T] {

    // Cells read during the last computation of this cell (used as a set).
    private var inputs: java.util.IdentityHashMap = _;
    // Cells that read this cell while computing (used as a set). Kept in a
    // WeakHashMap, so the keys are only weakly referenced.
    private var outputs: java.util.WeakHashMap = _;

    def context = CellContext.this;

    /** Label used as the head of the toString output. */
    protected def cellType: String;

    def inputCount = if (inputs != null) inputs.size() else 0;
    def outputCount = if (outputs != null) outputs.size() else 0;

    /** Renders as cellType("description" inputs->[value]->outputs).
    Note that this calls get, so it forces the value and rethrows a
    recorded failure. */
    override def toString() = {
      val sb = new StringBuffer();
      sb.append(cellType);
      sb.append('(');
      if (description != null) sb.append('"').append(description).append("\" ");
      if (inputCount > 0) sb.append(inputCount).append("->");
      sb.append('[').append(get.toString()).append(']');
      if (outputCount > 0) sb.append("->").append(outputCount);
      sb.append(')');
      sb.toString();
    }

    override def get: T = {
      // If something else is calculating, connect dependencies
      val stack = tlocal.get().asInstanceOf[java.util.Stack];
      if (stack != null && !stack.isEmpty) {
        val dependent = stack.peek().asInstanceOf[MCell[All]];
        dependent.addInput(this);
        addOutput(dependent);
      }

      super.get;
    }

    /** Store the value, then unset every direct output. Only direct
    outputs are unset here; an output's own outputs are unset when that
    output is recomputed and set again. */
    override def set(t: T) = {
      super.set(t);
      if (outputs != null) {
        val iter = outputs.keySet().iterator();
        while (iter.hasNext())
          iter.next().asInstanceOf[MCell[All]].unset;
      }
    }

    /** Evaluate the expression and store the result through set. */
    def doSet: unit;

    /** Recompute this cell: forget the old inputs, push this cell on the
    thread's stack so that reads are recorded as inputs, and run doSet.
    Anything thrown by doSet is recorded as this cell's failure. */
    def compute = {
      clearInputs;
      localStack.push(this);
      try {
        doSet;
      } catch {
        case t: Throwable => fail(t)
      } finally {
        localStack.pop();
      }
    }

    private def addInput(i: AnyRef) = {
      if (inputs == null)
        inputs = new java.util.IdentityHashMap;
      inputs.put(i, Nil);
    }

    private def removeInput(i: AnyRef) = {
      if (inputs != null) {
        if (inputs.remove(i) != null && inputs.isEmpty())
          inputs = null;
      }
    }

    private def addOutput(o: AnyRef) = {
      if (outputs == null)
        outputs = new java.util.WeakHashMap;
      outputs.put(o, Nil);
    }

    private def removeOutput(o: AnyRef) = {
      if (outputs != null)
        if (outputs.remove(o) != null && outputs.isEmpty())
          outputs = null;
    }

    /** Detach this cell from every input it was registered with. */
    def clearInputs = {
      if (inputs != null) {
        val iter = inputs.keySet().iterator();
        while (iter.hasNext())
          iter.next().asInstanceOf[MCell[All]].removeOutput(this);
        inputs = null;
      }
    }
  }
}

object Future extends CellContext {

  /** Implicitly convert arbitrary values of T into a Future[T]. */
  implicit def lazyConst[T](t: T): Future[T] = Ready(t);

  /** Retrieve the value, which may result in forcing the value if it is not already available. */
  implicit def fromFuture[T](lz: Future[T]): T = lz();

  /** We want to be able to treat our lazy values like regular values when we're
  iterating over them, and so forth. We define a kind of "reverse lifter" here
  that wraps a function on T into a function on Future[T]. */
  implicit def lazyLift[T,B](tfunc: (T) => B): (Future[T]) => B = (lt) => tfunc(lt());

  /** Directly construct a lazy value of type T. Note that using the anonymous class
  this way seems to be the simplest way to prevent execution of "f" until we really
  mean to, and still keep it around cleanly. */
  def lazy[T](f: => T): Future[T] = new Lazy[T] {
    def func: T = f;
  }

  /** Create a future that is failed from the start. */
  def fail[T](ex: Throwable): Future[T] = Failed(ex);

  /** Build a concurrent future, starting a thread to perform the computation.
  The thread stores the result of f, or records whatever f throws as the
  failure of the future. */
  def spawn[T](f: => T): Future[T] = new ExceptionSyncVar[T] {
    val thread = new Thread() {
      override def run() = {
        try {
          set(f)
        } catch {
          case ex: Throwable => fail(ex)
        }
      }
    }
    thread.start
  }

  /** Build a promise for a future value; the promise can be fulfilled
  on this or any other thread. A thread getting the value will block
  if the promise has not been fulfilled. */
  def promise[T]: Promise[T] = new ExceptionSyncVar[T] with Promise[T] {
    def future = this;
    override def toString = createString("Promise");
  }

}


/*
mixin class Executor {

  def run(f: => unit): unit;
  def run[A](f: A => unit)(a: A): unit = run( f(a) );
  def run[A,B](f: (A, B) => unit)(a: A, b: B): unit = run( f(a,b) );
  def run[A,B,C](f: (A, B, C) => unit)(a: A, b: B, c: C): unit = run( f(a,b,c) );

  def submit[T](f: => T): Future[T];
  def submit[A,T](f: A => T)(a: A): Future[T] = submit( f(a) );

  def invokeAll[T](tasks: Function0[T]*): Seq[Future[T]];
  def invokeAny[T](tasks: Function0[T]*): T;

  def isShutdown: boolean;
  def isTerminated: boolean;
  def shutdown: unit;
  def shutdownNow: unit;

}

object Executors {

}
*/

/** A synchronized variable that holds either a value or a failure.
Readers calling get block until one of the two has been recorded; a
recorded failure is rethrown by get. */
private class ExceptionSyncVar[T] extends Future[T] {

  private var ex: Throwable = _;
  private var isDefined: Boolean = false;
  private var value: T = _;
  // Never assigned: keeps the default value of T, used to reset value.
  private var empty: T = _;

  def failure = ex;
  override def apply() = get;
  // Takes the lock, like isSet, so both fields are read consistently.
  def isFailure = synchronized {
    isDefined && (ex != null)
  }

  /** Return to the unset state, dropping any value or failure. */
  def unset = synchronized {
    ex = null;
    isDefined = false;
    value = empty;
  }

  /** Record a value and wake up all waiting readers. */
  def set(x: T) = synchronized {
    value = x
    // Drop any earlier failure; otherwise get would keep rethrowing it
    // even though a value has now been stored.
    ex = null
    isDefined = true
    notifyAll()
  }

  /** Record a failure and wake up all waiting readers. */
  def fail(e: Throwable): unit = synchronized {
    ex = e
    isDefined = true
    notifyAll()
  }

  def isSet: Boolean = synchronized {
    isDefined
  }

  /** Block until a value or failure is recorded, then return the value
  or throw the failure. */
  def get = synchronized {
    // Wait in a loop: a wakeup does not guarantee that the variable is
    // (still) defined, e.g. after a spurious wakeup or a racing unset.
    while (!isDefined) wait();
    if (ex != null) throw ex;
    value
  }

  override def toString = createString("ESync");

  /** Render as head(?) when unset, head(failure) or head(value) otherwise. */
  def createString(head: String) =
    if (!isDefined) head + "(?)"
    else if (ex != null) head + '(' + ex + ')'
    else head + '(' + value + ')';
}

/** A future that evaluates func on the first call to get and keeps the
outcome. If func throws, the exception is recorded as the failure and
rethrown to the caller. */
private abstract class Lazy[T] extends ExceptionSyncVar[T] {
  /** The computation to run on first access. */
  def func: T;
  override def get: T = synchronized {
    if (!isSet)
      try {
        val ret = func;
        set(ret);
        ret
      } catch {
        case e: Throwable => fail(e)
        throw e;
      }
    else
      super.get
  }
  override def toString() = createString("Lazy");
}

/** A future that has already failed with ex; applying it throws ex. */
case class Failed[T](ex: Throwable) extends Future[T] {
  def apply(): T = throw ex;
  def failure = ex;
  def isFailure = true;
  def isSet = true;
}

/** A future that already holds the value v. */
case class Ready[T](v: T) extends Future[T] {
  def apply() = v;
  override def failure = null;
  def isFailure = false;
  def isSet = true;
}