001: package dalma.endpoints.invm;
002:
003: import dalma.Condition;
004: import dalma.spi.FiberSPI;
005:
006: import java.io.Serializable;
007: import java.util.Hashtable;
008: import java.util.List;
009: import java.util.Map;
010: import java.util.Observable;
011: import java.util.Observer;
012: import java.util.Vector;
013:
014: /**
015: * Works like a socket in the in-VM communicaiton.
016: *
017: * @author Kohsuke Kawaguchi
018: */
019: public class Channel extends Observable implements Serializable {
020: /**
021: * {@link Message}s that were delivered but not read by the application.
022: */
023: private final List<Message> queue = new Vector<Message>();
024:
025: /**
026: * {@link Channel}s are uniquely identified names (so that serialization
027: * will bind back to the same instance.)
028: */
029: private static final Map<String, Channel> channels = new Hashtable<String, Channel>();
030:
031: private final String name;
032: private static int iota;
033:
034: public Channel() {
035: synchronized (Channel.class) {
036: name = Integer.toString(iota++);
037: }
038: channels.put(name, this );
039: }
040:
041: /**
042: * Sends a {@link Message} from this channel to the specified channel.
043: */
044: public void send(Message msg, Channel to) {
045: msg.from = this ;
046: msg.to = to;
047:
048: if (msg == null)
049: throw new IllegalArgumentException("message is null");
050:
051: synchronized (to) {
052: to.queue.add(msg);
053: to.setChanged();
054: to.notifyObservers();
055: to.notify();
056: }
057: }
058:
059: /**
060: * Waits until there's a new {@link Message}.
061: */
062: public synchronized <T> Message<T> receive() {
063: while (queue.isEmpty()) {
064: FiberSPI<?> fiber = FiberSPI.currentFiber(false);
065: if (fiber != null)
066: fiber.suspend(new ConditionImpl());
067: else
068: try {
069: wait();
070: } catch (InterruptedException e) {
071: Thread.currentThread().interrupt(); // process it later
072: }
073: }
074:
075: return queue.remove(0);
076: }
077:
078: private final class ConditionImpl extends Condition<Void> implements
079: Observer {
080: public ConditionImpl() {
081: }
082:
083: public void onParked() {
084: synchronized (Channel.this ) {
085: if (!queue.isEmpty())
086: activate(null);
087: else
088: addObserver(this );
089: }
090: }
091:
092: public void onLoad() {
093: onParked();
094: }
095:
096: public void interrupt() {
097: deleteObserver(this );
098: }
099:
100: public void update(Observable o, Object arg) {
101: synchronized (Channel.this ) {
102: if (!queue.isEmpty()) {
103: deleteObserver(this );
104: activate(null);
105: }
106: }
107: }
108: }
109:
110: protected Object writeReplace() {
111: return new Moniker(name);
112: }
113:
114: private static final class Moniker implements Serializable {
115: private final String name;
116:
117: public Moniker(String name) {
118: this .name = name;
119: }
120:
121: private Object readResolve() {
122: return channels.get(name);
123: }
124:
125: private static final long serialVersionUID = 1L;
126: }
127: }
|