001: package dalma.endpoints.input;
002:
003: import dalma.Conversation;
004: import dalma.Condition;
005: import dalma.Engine;
006: import dalma.impl.EndPointImpl;
007: import dalma.spi.FiberSPI;
008:
009: import java.io.BufferedReader;
010: import java.io.IOException;
011: import java.io.InputStreamReader;
012: import java.util.ArrayList;
013: import java.util.List;
014:
015: /**
016: * Waits for the user input.
017: *
018: * This is a singleton endPoint.
019: *
020: * @author Kohsuke Kawaguchi
021: */
022: public final class LineInputEndPoint extends EndPointImpl implements
023: Runnable {
024:
025: /**
026: * {@link Conversation}s waiting for input.
027: */
028: private static final List<LineCondition> queue = new ArrayList<LineCondition>();
029:
030: private final Thread thread = new Thread(this );
031:
032: public LineInputEndPoint() {
033: super (LineInputEndPoint.class.getName());
034: }
035:
036: public void run() {
037: try {
038: BufferedReader in = new BufferedReader(
039: new InputStreamReader(System.in));
040: String line;
041: while ((line = in.readLine()) != null) {
042: synchronized (queue) {
043: if (!queue.isEmpty()) {
044: // pick the conversation to be activated
045: LineCondition cond = queue.remove(0);
046: cond.activate(line);
047: }
048: }
049: }
050: } catch (IOException e) {
051: throw new Error(e); // can never happen
052: }
053: }
054:
055: protected void start() {
056: // start the monitor thread
057: thread.start();
058: }
059:
060: protected void stop() {
061: thread.interrupt();
062: try {
063: thread.join();
064: } catch (InterruptedException e) {
065: ;
066: }
067: }
068:
069: private final class LineCondition extends Condition<String> {
070: public LineCondition() {
071: }
072:
073: public void onParked() {
074: synchronized (queue) {
075: queue.add(this );
076: }
077: }
078:
079: public void onLoad() {
080: onParked();
081: }
082:
083: public void interrupt() {
084: synchronized (queue) {
085: queue.remove(this );
086: }
087: }
088: }
089:
090: /**
091: * Wait for an user input.
092: */
093: // this method is invoked from conversations
094: public static String waitForInput() {
095: FiberSPI<?> fiber = FiberSPI.currentFiber(true);
096: return fiber.suspend(createCondition(fiber));
097: }
098:
099: private static LineCondition createCondition(FiberSPI fiber) {
100: LineCondition cond;
101: synchronized (LineInputEndPoint.class) {
102: Engine engine = fiber.getOwner().getEngine();
103: LineInputEndPoint endPoint = (LineInputEndPoint) engine
104: .getEndPoint(LineInputEndPoint.class.getName());
105: cond = endPoint.new LineCondition();
106: }
107: return cond;
108: }
109: }
|