1:// package scalax.future;
   2:
   3:/** A Future is a function of arity 0 that returns a value of type T.
   4:It can be queried to find out if the value is available, if a failure
   5:has occurred, and what the failure is, if failure happened.  If the
   6:value of the future is retrieved and a failure results, the failure
   7:exception is thrown. */
   8:mixin class Future[T] extends Function0[T] {
   9:  def isSet: boolean;
  10:  def isFailure: boolean;
  11:  def failure: Throwable;
  12:}
  13:
  14:/** A Promise is a placeholding Future, where the result
  15:of the computation can be set externally.  */
  16:mixin class Promise[T] extends Future[T] {
  17:  def set(t: T): unit;
  18:  def fail(ex: Throwable): unit;
  19:  def future: Future[T];
  20:}
  21:
  22:/** A Dependent is a calculated value dependent on other values.  The
  23:expression used to calculate a Dependent cannot be updated.  If you want to
  24:be able to change the expression, create a Cell instead. */
  25:mixin class Dependent[T] extends Future[T] {
  26:  var description: String = _;
  27:  def context: CellContext;
  28:  def unset: unit;
  29:}
  30:
  31:/** A modifiable can track which other modifiable values it is
  32:dependent on during an update, if the other modifiables are accessed
  33:during a call to update. */
  34:mixin class Cell[T] extends Dependent[T] {
  35:  def update(t: => T): unit;
  36:}
  37:
  38:/** A CellContext can track the dependencies between modifiable
  39:objects it manufactures.  */
  40:class CellContext {
  41:  
  42:  private val tlocal = new ThreadLocal();
  43:
  44:  /** Create a cell that uses a fixed formula to calculate its value.  
  45:  The formula cannot be changed once set.  */
  46:  def formula[T](t: => T): Dependent[T] = new MCell[T] {
  47:    def cellType = "Formula";
  48:    override def get: T = synchronized {
  49:      if (!isSet) 
  50:          compute;
  51:      super.get
  52:    }
  53:    def doSet = super.set(t);
  54:    override def set(t: T): unit = error("Cannot call set on Cells.  Use update instead.");
  55:  }
  56:  
  57:  /** Create a modifiable cell that will track its dependencies on other
  58:      modifiable values.  If the update function is used to calculate the
  59:      contents AND all calculation involving modifiables is contained 
  60:      within the expression, dependencies will be automatically managed.  */
  61:  implicit def cell[T](initial: => T): Cell[T] = new MCell[T] with Cell[T] {
  62:    var expression = new Function0[T] {
  63:      def apply() = initial;
  64:    }
  65:    def cellType = "Cell";
  66:    override def get: T = synchronized {
  67:      if (!isSet) 
  68:          compute;
  69:      super.get
  70:    }
  71:    def doSet = super.set(expression());
  72:    def update(t: => T): unit = {
  73:      expression = new Function0[T] {
  74:        def apply() = t;
  75:      } ;
  76:      compute;
  77:    }
  78:    override def set(t: T): unit = error("Cannot call set on cell.  Use update instead.");
  79:  }
  80:  
  81:  private def localStack = synchronized {
  82:    var stack = tlocal.get().asInstanceOf[java.util.Stack];
  83:    if (stack == null) {
  84:      stack = new java.util.Stack;
  85:      tlocal.set(stack);
  86:    }
  87:    stack;
  88:  }
  89:    
  90:  abstract private class MCell[T] extends ExceptionSyncVar[T] with Dependent[T] {
  91:    
  92:    private var inputs: java.util.IdentityHashMap = _;
  93:    private var outputs: java.util.WeakHashMap = _; 
  94:    
  95:    def context = CellContext.this;
  96:    
  97:    protected def cellType: String;
  98:    
  99:    def inputCount = if (inputs != null) inputs.size() else 0;
 100:    def outputCount = if (outputs != null) outputs.size() else 0;
 101:    
 102:    override def toString() = {
 103:      val sb = new StringBuffer();
 104:      sb.append(cellType);
 105:      sb.append('(');
 106:      if (description != null) sb.append('"').append(description).append("\" ");
 107:      if (inputCount > 0) sb.append(inputCount).append("->");      
 108:      sb.append('[').append(get.toString()).append(']');
 109:      if (outputCount > 0) sb.append("->").append(outputCount);
 110:      sb.append(')');
 111:      sb.toString();
 112:    }
 113:    
 114:    override def get: T = {
 115:      // If something else is calculating, connect dependencies
 116:      var stack = tlocal.get().asInstanceOf[java.util.Stack];
 117:      if (stack != null && !stack.isEmpty) {
 118:        var dependent = stack.peek().asInstanceOf[MCell[All]];
 119:        dependent.addInput(this);
 120:        addOutput(dependent);
 121:      }
 122:      
 123:      super.get;
 124:    }
 125:    
 126:    override def set(t: T) = {
 127:      super.set(t);
 128:      if (outputs != null) {
 129:        val iter = outputs.keySet().iterator();
 130:        while (iter.hasNext())
 131:          iter.next().asInstanceOf[MCell[All]].unset;      
 132:      }
 133:    }
 134:    
 135:    def doSet: unit;
 136:    
 137:    def compute = {
 138:      clearInputs;
 139:      localStack.push(this);
 140:      try {
 141:        doSet;
 142:      } catch {
 143:        case t: Throwable => fail(t)
 144:      } finally {
 145:        localStack.pop();
 146:      }
 147:    }
 148:    
 149:    private def addInput(i: AnyRef) = {
 150:      if (inputs == null)
 151:        inputs = new java.util.IdentityHashMap;
 152:      inputs.put(i, Nil);
 153:    }
 154:    
 155:    private def removeInput(i: AnyRef) = {
 156:      if (inputs != null) {
 157:        if (inputs.remove(i) != null && inputs.isEmpty())
 158:          inputs = null;
 159:      }
 160:    }
 161:    
 162:    private def addOutput(o: AnyRef) = {
 163:      if (outputs == null)
 164:        outputs = new java.util.WeakHashMap;
 165:      outputs.put(o, Nil);
 166:    }
 167:    
 168:    private def removeOutput(o: AnyRef) = {
 169:      if (outputs != null)
 170:        if (outputs.remove(o) != null && outputs.isEmpty())
 171:          outputs = null;
 172:    }
 173:    
 174:    def clearInputs = {
 175:      if (inputs != null) {
 176:        val iter = inputs.keySet().iterator();
 177:        while (iter.hasNext())
 178:          iter.next().asInstanceOf[MCell[All]].removeOutput(this);
 179:        inputs = null;
 180:      }
 181:    }
 182:  }
 183:}
 184:
 185:object Future extends CellContext {
 186:
 187:  /** Implicity convert arbitrary values of T into a Future[T]. */  
 188:  implicit def lazyConst[T](t: T): Future[T] = Ready(t);
 189:  
 190:  /** Retrieve the value, which may result in forcing the value if it is not already available. */
 191:  implicit def fromFuture[T](lz: Future[T]): T = lz();
 192:  
 193:  /** We want to be able to treat our lazy values like regular values when we're
 194:  iterating over them, and so forth.  We define a kind of "reverse lifter" here
 195:  that wraps a function on T into a function on Future[T]. */
 196:  implicit def lazyLift[T,B](tfunc: (T) => B): (Future[T]) => B = (lt) => tfunc(lt()); 
 197:  
 198:  /** Directly construct a lazy value of type T. Note that using the anonymous class
 199:  this way seems to be the simplest way to prevent execution of "f" until we really
 200:  mean to, and still keep it around cleanly. */
 201:  def lazy[T](f: => T): Future[T] = new Lazy[T] {
 202:    def func: T = f;
 203:  }
 204:  
 205:  /** Create a future that is failed from the start. */
 206:  def fail[T](ex: Throwable): Future[T] = Failed(ex);
 207:
 208:  /** Build a concurrent future, starting a thread to perform the computation. */  
 209:  def spawn[T](f: => T): Future[T] = new ExceptionSyncVar[T] {
 210:    val thread = new Thread() { 
 211:      override def run() = {
 212:        try {
 213:          set(f)
 214:        } catch {
 215:          case ex: Throwable => fail(ex)
 216:        }
 217:        Console.println("Spawn done");
 218:      } 
 219:    }
 220:    thread.start
 221:  }
 222:
 223:  /** Build a promise for a future value; the promise can be fulfilled
 224:      on this or any other thread.  A thread getting the value will block
 225:      if the promise has not been fulfilled. */
 226:  def promise[T]: Promise[T] = new ExceptionSyncVar[T] with Promise[T] {
 227:    def future = this;
 228:    override def toString = createString("Promise"); 
 229:  }
 230:
 231:}
 232:
 233:
 234:/*
 235:mixin class Executor {
 236:  
 237:  def run(f: => unit): unit;
 238:  def run[A](f: A => unit)(a: A): unit = run( f(a) );
 239:  def run[A,B](f: (A, B) => unit)(a: A, b: B): unit = run( f(a,b) );
 240:  def run[A,B,C](f: (A, B, C) => unit)(a: A, b: B, c: C): unit = run( f(a,b,c) );
 241:  
 242:  def submit[T](f: => T): Future[T];
 243:  def submit[A,T](f: A => T)(a: A): Future[T] = submit( f(a) );
 244:  
 245:  def invokeAll[T](tasks: Function0[T]*): Seq[Future[T]];
 246:  def invokeAny[T](tasks: Function0[T]*): T;
 247:  
 248:  def isShutdown: boolean;
 249:  def isTerminated: boolean;
 250:  def shutdown: unit;
 251:  def shutdownNow: unit;
 252:  
 253:}
 254:
 255:object Executors {
 256:      
 257:}
 258:*/
 259:
 260:private class ExceptionSyncVar[T] extends Future[T] {
 261:  
 262:  private var ex: Throwable = _;
 263:  private var isDefined: Boolean = false;
 264:  private var value: T = _;
 265:  private var empty: T = _;
 266:  
 267:  def failure = ex;
 268:  override def apply() = get;
 269:  def isFailure = isDefined && (ex != null);
 270:
 271:  def unset = synchronized {
 272:    ex = null;
 273:    isDefined = false;
 274:    value = empty;
 275:  }
 276:  def set(x: T) = synchronized {
 277:    value = x
 278:    isDefined = true
 279:    notifyAll()
 280:  }
 281:  def fail(e: Throwable): unit = synchronized {
 282:    ex = e
 283:    isDefined = true
 284:    notifyAll()
 285:  }
 286:
 287:  def isSet: Boolean = synchronized {
 288:    isDefined
 289:  }
 290:
 291:  def get = synchronized {
 292:    if (!isDefined) wait();
 293:    if (ex != null) throw ex;
 294:    value
 295:  }
 296:  
 297:  override def toString = createString("ESync"); 
 298:    
 299:  def createString(head: String) = 
 300:    if (!isDefined) head + "(?)"
 301:    else if (ex != null) head + '(' + ex + ')'
 302:    else head + '(' + value + ')';
 303:}
 304:  
 305:private abstract class Lazy[T] extends ExceptionSyncVar[T] {
 306:  def func: T;
 307:  override def get: T = synchronized {
 308:    if (!isSet) 
 309:      try {
 310:        val ret = func;
 311:        set(ret);
 312:        ret
 313:      } catch {
 314:        case e: Throwable => fail(e)
 315:        throw e;
 316:      }
 317:    else 
 318:      super.get
 319:  }
 320:  override def toString() = createString("Lazy"); 
 321:}
 322:
 323:case class Failed[T](ex: Throwable) extends Future[T] {
 324:  def apply(): T = throw ex;
 325:  def failure = ex;
 326:  def isFailure = true;
 327:  def isSet = true;
 328:}
 329:
 330:case class Ready[T](v: T) extends Future[T] {
 331:  def apply() = v;
 332:  override def failure = null;
 333:  def isFailure = false;
 334:  def isSet = true;
 335:}
 336:
 337: