001: package dalma.spi.port;
002:
003: import dalma.Condition;
004: import dalma.ReplyIterator;
005: import dalma.endpoints.timer.TimerEndPoint;
006: import dalma.impl.EndPointImpl;
007: import dalma.spi.FiberSPI;
008:
009: import java.util.Date;
010: import java.util.Hashtable;
011: import java.util.Map;
012: import java.util.logging.Logger;
013:
014: /**
015: * @author Kohsuke Kawaguchi
016: */
017: public abstract class MultiplexedEndPoint<Key, Msg> extends
018: EndPointImpl {
019: /**
020: * Conversations waiting for replies.
021: */
022: protected final Map<Key, Receiver<Key, Msg>> queue = new Hashtable<Key, Receiver<Key, Msg>>();
023:
024: /**
025: * Logger for event logging.
026: */
027: protected final Logger logger = Logger.getLogger(getClass()
028: .getName());
029:
030: protected MultiplexedEndPoint(String name) {
031: super (name);
032: }
033:
034: /**
035: * Sends out an message and waits for a single reply.
036: *
037: * <p>
038: * This method blocks the conversation indefinitely until a reply is received.
039: *
040: * @return always non-null.
041: */
042: protected Msg waitForReply(Msg msg) {
043: return FiberSPI.currentFiber(true).suspend(
044: new OneTimeCondition<Key, Msg>(this , msg));
045: }
046:
047: /**
048: * Sends out an message and waits for a single reply with timeout.
049: *
050: * TODO:javadoc
051: */
052: protected Msg waitForReply(Msg msg, Date timeout) {
053: return FiberSPI.currentFiber(true).suspend(
054: new OneTimeCondition<Key, Msg>(this , msg),
055: TimerEndPoint.<Msg> createDock(timeout));
056: }
057:
058: /**
059: * Sends out an message and waits for multiple replies.
060: *
061: * TODO:javadoc
062: */
063: protected ReplyIterator<Msg> waitForMultipleReplies(Msg outgoing,
064: Date expirationDate) {
065: return new ReplyIteratorImpl<Key, Msg>(this , outgoing,
066: expirationDate);
067: }
068:
069: //
070: //
071: // implementation detail
072: //
073: //
074:
075: /*package*/void register(Receiver<Key, Msg> mr) {
076: queue.put(mr.getKey(), mr);
077: }
078:
079: /*paclage*/void unregister(Receiver<Key, Msg> mr) {
080: queue.remove(mr.getKey());
081: }
082:
083: /**
084: * Obtains the key of the message.
085: * Used to find the key of the incoming message.
086: *
087: * @param msg
088: * never be null.
089: */
090: protected abstract Key getKey(Msg msg);
091:
092: /**
093: * Invoked upon receiving a new message that doesn't have any key.
094: */
095: protected abstract void onNewMessage(Msg msg);
096:
097: /**
098: * Sends an out-going message, and returns a key that will identify replies.
099: */
100: protected abstract Key send(Msg msg);
101:
102: /**
103: * Dispatches a newly received message to the right receiver.
104: *
105: * This method needs to be invoked when a new message is received.
106: */
107: protected void handleMessage(Msg msg) {
108: Key key = getKey(msg);
109: if (key == null) {
110: onNewMessage(msg);
111: return;
112: }
113:
114: Receiver<Key, Msg> receiver = queue.get(key);
115: if (receiver == null) {
116: // TODO: or shall it be exception?
117: logger
118: .warning("No conversation is waiting for the message key="
119: + key);
120: return;
121: }
122: receiver.handleMessage(msg);
123: }
124:
125: /**
126: * Condition used for waiting a single reply.
127: */
128: private static final class OneTimeCondition<Key, Msg> extends
129: Condition<Msg> implements Receiver<Key, Msg> {
130: private Key key;
131:
132: /**
133: * The out-going message to be sent.
134: *
135: * The field is transient because we'll send it before
136: * the dock is serialized, and thereafter never be used.
137: */
138: private transient Msg outgoing;
139:
140: private final MultiplexedEndPoint<Key, Msg> endPoint;
141:
142: public OneTimeCondition(MultiplexedEndPoint<Key, Msg> endPoint,
143: Msg outgoing) {
144: this .outgoing = outgoing;
145: this .endPoint = endPoint;
146: }
147:
148: private MultiplexedEndPoint<Key, Msg> getEndPoint() {
149: return endPoint;
150: }
151:
152: public Key getKey() {
153: return key;
154: }
155:
156: public void handleMessage(Msg msg) {
157: getEndPoint().unregister(this );
158: activate(msg);
159: }
160:
161: public void onParked() {
162: MultiplexedEndPoint<Key, Msg> endPoint = getEndPoint();
163: try {
164: key = endPoint.send(outgoing);
165: assert key != null;
166: endPoint.register(this );
167: } finally {
168: outgoing = null;
169: }
170: }
171:
172: public void interrupt() {
173: getEndPoint().unregister(this );
174: }
175:
176: public void onLoad() {
177: getEndPoint().register(this);
178: }
179: }
180: }
|