001: package org.jgroups.mux;
002:
003: import org.jgroups.*;
004: import org.jgroups.stack.ProtocolStack;
005:
006: import java.io.Serializable;
007: import java.util.Map;
008:
009: /**
010: * Multiplexer channel. This is returned as result of calling
011: * {@link org.jgroups.ChannelFactory#createMultiplexerChannel(String,String,boolean,String)}. Maintains the multiplexer
012: * ID, which is used to add a header to each message, so that the message can be demultiplexed at the receiver
013: * @author Bela Ban
014: * @version $Id: MuxChannel.java,v 1.26.2.1 2007/01/08 21:00:16 vlada Exp $
015: */
016: public class MuxChannel extends JChannel {
017:
018: /** the real channel to delegate to */
019: final JChannel ch;
020:
021: /** The service ID */
022: final String id;
023:
024: /** a reference back to the factory that created us */
025: final JChannelFactory factory;
026:
027: /** The name of the JGroups stack, e.g. as defined in stacks.xml */
028: final String stack_name;
029:
030: /** will be added to each message sent */
031: final MuxHeader hdr;
032:
033: final String name = "MUX";
034: final Multiplexer mux;
035:
036: public MuxChannel(JChannelFactory f, JChannel ch, String id,
037: String stack_name, Multiplexer mux) {
038: super (false); // don't create protocol stack, queues and threads
039: factory = f;
040: this .ch = ch;
041: this .stack_name = stack_name;
042: this .id = id;
043: hdr = new MuxHeader(id);
044: this .mux = mux;
045: closed = !ch.isOpen();
046: // connected=ch.isConnected();
047: }
048:
049: public String getStackName() {
050: return stack_name;
051: }
052:
053: public String getId() {
054: return id;
055: }
056:
057: public Multiplexer getMultiplexer() {
058: return mux;
059: }
060:
061: public String getChannelName() {
062: return ch.getClusterName();
063: }
064:
065: public String getClusterName() {
066: return ch.getClusterName();
067: }
068:
069: public Address getLocalAddress() {
070: return ch != null ? ch.getLocalAddress() : null;
071: }
072:
073: /** This should never be used (just for testing) ! */
074: public JChannel getChannel() {
075: return ch;
076: }
077:
078: /**
079: * Returns the <em>service</em> view, ie. the cluster view (see {@link #getView()}) <em>minus</em> the nodes on
080: * which this service is not running, e.g. if S1 runs on A and C, and the cluster view is {A,B,C}, then the service
081: * view is {A,C}
082: * @return The service view (list of nodes on which this service is running)
083: */
084: public View getView() {
085: return mux.getServiceView(id);
086: }
087:
088: /** Returns the JGroups view of a cluster, e.g. if we have nodes A, B and C, then the view will
089: * be {A,B,C}
090: * @return The JGroups view
091: */
092: public View getClusterView() {
093: return ch != null ? ch.getView() : null;
094: }
095:
096: public ProtocolStack getProtocolStack() {
097: return ch != null ? ch.getProtocolStack() : null;
098: }
099:
100: public boolean isOpen() {
101: return !closed;
102: }
103:
104: public boolean isConnected() {
105: return connected;
106: }
107:
108: public Map dumpStats() {
109: return ch.dumpStats();
110: }
111:
112: public void setClosed(boolean f) {
113: closed = f;
114: }
115:
116: public void setConnected(boolean f) {
117: connected = f;
118: }
119:
120: public Object getOpt(int option) {
121: return ch.getOpt(option);
122: }
123:
124: public void setOpt(int option, Object value) {
125: ch.setOpt(option, value);
126: super .setOpt(option, value);
127: }
128:
129: public synchronized void connect(String channel_name)
130: throws ChannelException, ChannelClosedException {
131: factory.connect(this );
132: notifyChannelConnected(this );
133: }
134:
135: public synchronized boolean connect(String cluster_name,
136: Address target, String state_id, long timeout)
137: throws ChannelException {
138: throw new UnsupportedOperationException("not yet implemented");
139: }
140:
141: public synchronized void disconnect() {
142: try {
143: closed = false;
144: connected = false;
145: factory.disconnect(this );
146: } catch (Throwable t) {
147: log.error("disconnecting channel failed", t);
148: }
149: closed = false;
150: connected = false;
151: notifyChannelDisconnected(this );
152: }
153:
154: public synchronized void open() throws ChannelException {
155: factory.open(this );
156: }
157:
158: public synchronized void close() {
159: try {
160: closed = true;
161: connected = false;
162: factory.close(this );
163: } finally {
164: closed = true;
165: connected = false;
166: closeMessageQueue(true);
167: }
168:
169: notifyChannelClosed(this );
170: }
171:
172: protected void _close(boolean disconnect, boolean close_mq) {
173: super ._close(disconnect, close_mq);
174: closed = !ch.isOpen();
175: connected = ch.isConnected();
176: notifyChannelClosed(this );
177: }
178:
179: public synchronized void shutdown() {
180: try {
181: factory.shutdown(this );
182: } finally {
183: closed = true;
184: connected = false;
185: closeMessageQueue(true);
186: }
187: }
188:
189: public void send(Message msg) throws ChannelNotConnectedException,
190: ChannelClosedException {
191: msg.putHeader(name, hdr);
192: ch.send(msg);
193: }
194:
195: public void send(Address dst, Address src, Serializable obj)
196: throws ChannelNotConnectedException, ChannelClosedException {
197: send(new Message(dst, src, obj));
198: }
199:
200: public void down(Event evt) {
201: if (evt.getType() == Event.MSG) {
202: Message msg = (Message) evt.getArg();
203: msg.putHeader(name, hdr);
204: ch.down(evt);
205: } else {
206: if (evt.getType() == Event.BLOCK_OK) {
207: mux.blockOk();
208: return;
209: }
210: ch.down(evt);
211: }
212: }
213:
214: // public void blockOk() {
215: // }
216:
217: public boolean getState(Address target, long timeout)
218: throws ChannelNotConnectedException, ChannelClosedException {
219: return getState(target, null, timeout);
220: }
221:
222: public boolean getState(Address target, String state_id,
223: long timeout) throws ChannelNotConnectedException,
224: ChannelClosedException {
225: String my_id = id;
226:
227: if (state_id != null)
228: my_id += "::" + state_id;
229:
230: // we're usig service views, so we need to find the first host in the cluster on which our service runs
231: // http://jira.jboss.com/jira/browse/JGRP-247
232: //
233: // unless service runs on a specified target node
234: // http://jira.jboss.com/jira/browse/JGRP-401
235: Address service_view_coordinator = mux.getStateProvider(target,
236: id);
237: Address tmp = getLocalAddress();
238:
239: if (service_view_coordinator != null)
240: target = service_view_coordinator;
241:
242: if (tmp != null && tmp.equals(target)) // this will avoid the "cannot get state from myself" error
243: target = null;
244:
245: if (!mux.stateTransferListenersPresent())
246: return ch.getState(target, my_id, timeout);
247: else {
248: return mux.getState(target, my_id, timeout);
249: }
250: }
251:
252: public void returnState(byte[] state) {
253: ch.returnState(state, id);
254: }
255:
256: public void returnState(byte[] state, String state_id) {
257: String my_id = id;
258: if (state_id != null)
259: my_id += "::" + state_id;
260: ch.returnState(state, my_id);
261: }
262:
263: protected void checkNotConnected()
264: throws ChannelNotConnectedException {
265: ;
266: }
267:
268: protected void checkClosed() throws ChannelClosedException {
269: ;
270: }
271: }
|