001: package dalma.spi.port;
002:
003: import dalma.Condition;
004: import dalma.ReplyIterator;
005: import dalma.endpoints.timer.TimerEndPoint;
006: import dalma.impl.GeneratorImpl;
007: import dalma.spi.ConversationSPI;
008: import dalma.spi.FiberSPI;
009:
010: import java.util.Date;
011: import java.util.LinkedList;
012: import java.util.List;
013: import java.util.NoSuchElementException;
014:
015: /**
016: * {@link ReplyIterator} implementation for {@link MultiplexedEndPoint}.
017: *
018: * @author Kohsuke Kawaguchi
019: */
020: final class ReplyIteratorImpl<Key, Msg> extends GeneratorImpl implements
021: ReplyIterator<Msg>, Receiver<Key, Msg> {
022:
023: /**
024: * EndPoint to which this iterator belongs.
025: */
026: private final MultiplexedEndPoint<Key, Msg> endPoint;
027:
028: private final List<Msg> replies = new LinkedList<Msg>();
029:
030: /**
031: * If the {@link #replies} become empty, the conversation
032: * will wait until a new one arrives by using this lock.
033: */
034: private transient ConditionImpl lock;
035:
036: private final Key key;
037:
038: /**
039: * @see #getExpirationDate()
040: */
041: private final Date expirationDate;
042:
043: ReplyIteratorImpl(MultiplexedEndPoint<Key, Msg> endPoint,
044: Msg outgoing, Date expirationDate) {
045: this .endPoint = endPoint;
046: this .expirationDate = expirationDate;
047: synchronized (endPoint.queue) {
048: this .key = endPoint.send(outgoing);
049: ConversationSPI.currentConversation().addGenerator(this );
050: }
051: }
052:
053: protected void onLoad() {
054: endPoint.register(this );
055: }
056:
057: public void dispose() {
058: endPoint.unregister(this );
059: }
060:
061: public Key getKey() {
062: return key;
063: }
064:
065: public Date getExpirationDate() {
066: return expirationDate;
067: }
068:
069: public synchronized Msg next() {
070: if (replies.isEmpty())
071: throw new NoSuchElementException();
072:
073: return replies.remove(0);
074: }
075:
076: public synchronized boolean hasNext() {
077: if (replies.isEmpty()) {
078: // no replies in the queue
079: if (!isExpired()) {
080: // block until we receive another one
081: lock = new ConditionImpl();
082: if (expirationDate == null)
083: FiberSPI.currentFiber(true).suspend(lock);
084: else
085: FiberSPI.currentFiber(true).suspend(lock,
086: TimerEndPoint.createDock(expirationDate));
087: }
088: }
089: return !replies.isEmpty();
090: }
091:
092: private boolean isExpired() {
093: return expirationDate != null
094: && expirationDate.before(new Date());
095: }
096:
097: public void remove() {
098: throw new UnsupportedOperationException();
099: }
100:
101: public synchronized void handleMessage(Msg msg) {
102: if (isExpired())
103: return; // we are no longer collecting replies. discard.
104: replies.add(msg);
105: if (lock != null) {
106: lock.activate(null);
107: lock = null;
108: }
109: }
110:
111: /**
112: * Blocks until a next message is received.
113: */
114: private final class ConditionImpl extends Condition<Void> {
115: public ConditionImpl() {
116: }
117:
118: public void onParked() {
119: synchronized (ReplyIteratorImpl.this ) {
120: if (!replies.isEmpty()) {
121: activate(null);
122: return;
123: }
124: lock = this ;
125: }
126: }
127:
128: public void interrupt() {
129: // noop
130: }
131:
132: public void onLoad() {
133: onParked();
134: }
135: }
136: }
|