0001: package org.jgroups;
0002:
0003: import org.apache.commons.logging.Log;
0004: import org.apache.commons.logging.LogFactory;
0005: import org.jgroups.conf.ConfiguratorFactory;
0006: import org.jgroups.conf.ProtocolStackConfigurator;
0007: import org.jgroups.stack.ProtocolStack;
0008: import org.jgroups.stack.StateTransferInfo;
0009: import org.jgroups.stack.IpAddress;
0010: import org.jgroups.util.*;
0011: import org.w3c.dom.Element;
0012:
0013: import java.io.File;
0014: import java.io.InputStream;
0015: import java.io.OutputStream;
0016: import java.io.Serializable;
0017: import java.net.URL;
0018: import java.util.HashMap;
0019: import java.util.Map;
0020: import java.util.Vector;
0021:
0022: /**
0023: * JChannel is a pure Java implementation of Channel.
0024: * When a JChannel object is instantiated it automatically sets up the
0025: * protocol stack.
0026: * <p>
0027: * <B>Properties</B>
0028: * <P>
0029: * Properties are used to configure a channel, and are accepted in
0030: * several forms; the String form is described here.
0031: * A property string consists of a number of properties separated by
0032: * colons. For example:
0033: * <p>
0034: * <pre>"&lt;prop1&gt;(arg1=val1):&lt;prop2&gt;(arg1=val1;arg2=val2):&lt;prop3&gt;:&lt;propn&gt;"</pre>
0035: * <p>
0036: * Each property relates directly to a protocol layer, which is
0037: * implemented as a Java class. When a protocol stack is to be created
0038: * based on the above property string, the first property becomes the
0039: * bottom-most layer, the second one will be placed on the first, etc.:
0040: * the stack is created from the bottom to the top, as the string is
0041: * parsed from left to right. Each property has to be the name of a
0042: * Java class that resides in the
0043: * {@link org.jgroups.protocols} package.
0044: * <p>
0045: * Note that only the base name has to be given, not the fully specified
0046: * class name (e.g., UDP instead of org.jgroups.protocols.UDP).
0047: * <p>
0048: * Each layer may have 0 or more arguments, which are specified as a
0049: * list of name/value pairs in parentheses directly after the property.
0050: * In the example above, the first protocol layer has 1 argument,
0051: * the second 2, the third none. When a layer is created, these
0052: * properties (if there are any) will be set in a layer by invoking
0053: * the layer's setProperties() method
0054: * <p>
0055: * As an example the property string below instructs JGroups to create
0056: * a JChannel with protocols UDP, PING, FD and GMS:<p>
0057: * <pre>"UDP(mcast_addr=228.10.9.8;mcast_port=5678):PING:FD:GMS"</pre>
0058: * <p>
0059: * The UDP protocol layer is at the bottom of the stack, and it
0060: * should use mcast address 228.10.9.8 and port 5678 rather than
0061: * the default IP multicast address and port. The remaining protocols
0062: * (PING, FD and GMS) are given no arguments in this example.
0063: * Property UDP refers to a class {@link org.jgroups.protocols.UDP},
0064: * which is subsequently loaded and an instance of which is created as protocol layer.
0065: * If any of these classes are not found, an exception will be thrown and
0066: * the construction of the stack will be aborted.
0067: *
0068: * @author Bela Ban
0069: * @version $Id: JChannel.java,v 1.106.2.1 2006/12/04 22:47:16 vlada Exp $
0070: */
0071: public class JChannel extends Channel {
0072:
/**
 * The default protocol stack, in property-string form. It is used by the
 * no-argument constructor and as the fallback when {@link #JChannel(Object)}
 * is given <code>null</code>.
 * <p>
 * Read left to right (i.e. bottom of the stack first) it configures:
 * UDP, PING, MERGE2, FD, VERIFY_SUSPECT, pbcast.NAKACK, UNICAST,
 * pbcast.STABLE, VIEW_SYNC, pbcast.GMS, FC, FRAG2 and pbcast.STATE_TRANSFER.
 */
public static final String DEFAULT_PROTOCOL_STACK = "UDP(down_thread=false;mcast_send_buf_size=640000;mcast_port=45566;discard_incompatible_packets=true;"
+ "ucast_recv_buf_size=20000000;mcast_addr=228.10.10.10;up_thread=false;loopback=false;"
+ "mcast_recv_buf_size=25000000;max_bundle_size=64000;max_bundle_timeout=30;"
+ "use_incoming_packet_handler=true;use_outgoing_packet_handler=false;"
+ "ucast_send_buf_size=640000;tos=16;enable_bundling=true;ip_ttl=2):"
+ "PING(timeout=2000;down_thread=false;num_initial_members=3;up_thread=false):"
+ "MERGE2(max_interval=10000;down_thread=false;min_interval=5000;up_thread=false):"
+ "FD(timeout=2000;max_tries=3;down_thread=false;up_thread=false):"
+ "VERIFY_SUSPECT(timeout=1500;down_thread=false;up_thread=false):"
+ "pbcast.NAKACK(max_xmit_size=60000;down_thread=false;use_mcast_xmit=false;gc_lag=0;"
+ "discard_delivered_msgs=true;up_thread=false;retransmit_timeout=100,200,300,600,1200,2400,4800):"
+ "UNICAST(timeout=300,600,1200,2400,3600;down_thread=false;up_thread=false):"
+ "pbcast.STABLE(stability_delay=1000;desired_avg_gossip=50000;max_bytes=400000;down_thread=false;"
+ "up_thread=false):"
+ "VIEW_SYNC(down_thread=false;avg_send_interval=60000;up_thread=false):"
+ "pbcast.GMS(print_local_addr=true;join_timeout=3000;down_thread=false;"
+ "join_retry_timeout=2000;up_thread=false;shun=true):"
+ "FC(max_credits=2000000;down_thread=false;up_thread=false;min_threshold=0.10):"
+ "FRAG2(frag_size=60000;down_thread=false;up_thread=false):"
+ "pbcast.STATE_TRANSFER(down_thread=false;up_thread=false)";
0096:
/* NOTE(review): presumably the name of a configuration/system property; it is not
 * referenced in the part of the class visible here -- confirm against its users. */
static final String FORCE_PROPS = "force.properties";

/* The protocol stack configuration string; returned by getProperties() and
 * used by open() to build a new ProtocolStack. */
private String props = null;

/* The address of this JChannel instance; assigned in connect() from local_addr_promise. */
private Address local_addr = null;
/* The channel (also known as group) name; only overwritten by connect() when a
 * non-null name is passed. */
private String cluster_name = null; // group name
/* The latest view of the group membership; connect() installs a temporary
 * single-member view before joining. */
private View my_view = null;
/* The queue that is used to receive messages (events) from the protocol stack;
 * drained by receive() and inspected by peek(). */
private final Queue mq = new Queue();
/* The protocol stack, used to send and receive messages; re-created by open(). */
private ProtocolStack prot_stack = null;

/** Thread responsible for closing a channel and potentially reconnecting to it (e.g., when shunned). */
protected CloserThread closer = null;

/** To wait until a local address has been assigned */
private final Promise local_addr_promise = new Promise();

/** To wait until we have connected successfully */
private final Promise connect_promise = new Promise();

/** To wait until we have been disconnected from the channel */
private final Promise disconnect_promise = new Promise();

/* NOTE(review): presumably used to wait for a state transfer result; not used
 * in the part of the class visible here -- confirm. */
private final Promise state_promise = new Promise();

/* Used by connect() to wait for the UNBLOCK event when FLUSH is in use. */
private final Promise flush_unblock_promise = new Promise();

/* NOTE(review): presumably used to wait for a flush to complete; not used in
 * the part of the class visible here -- confirm. */
private final Promise flush_promise = new Promise();

/** Milliseconds to wait until we have a non-null local_addr. */
private long LOCAL_ADDR_TIMEOUT = 30000; // not final: connect() re-reads this value from a property on every call
/* If the state is fetched automatically, this is the default timeout, 5 secs */
private static final long GET_STATE_DEFAULT_TIMEOUT = 5000;
/* If FLUSH is used, the channel waits for the UNBLOCK event; this is the default timeout, 10 secs */
private static final long FLUSH_UNBLOCK_TIMEOUT = 10000;
/* Flag to indicate whether to receive blocks (option BLOCK). Views are always
 * delivered: the VIEW option is deprecated and ignored by setOpt(). */
private boolean receive_blocks = false;
/* Flag to indicate whether to receive local messages (option LOCAL).
 * If this is set to false, the JChannel will not receive messages sent by itself. */
private boolean receive_local_msgs = true;
/* Flag to indicate whether the channel will reconnect (reopen) when the exit message is received */
private boolean auto_reconnect = false;
/* Flag to indicate whether the state is supposed to be retrieved after the channel is reconnected.
 * Setting this to true automatically forces auto_reconnect to true (see setOpt()). */
private boolean auto_getstate = false;
/* Channel connected flag; set at the end of connect(), cleared in disconnect(). */
protected boolean connected = false;

/* Channel closed flag; cleared by a successful open(). */
protected boolean closed = false; // close() has been called, channel is unusable

/** True if a state transfer protocol is available, false otherwise */
private boolean state_transfer_supported = false; // set by CONFIG event from STATE_TRANSFER protocol

/** True if a flush protocol is available, false otherwise */
private volatile boolean flush_supported = false; // set by CONFIG event from FLUSH protocol

/** Used to maintain additional data across channel disconnects/reconnects. This is a kludge and will be removed
* as soon as JGroups supports logical addresses.
*/
private byte[] additional_data = null;

/* Per-instance logger, named after the runtime class (so subclasses log under their own name). */
protected final Log log = LogFactory.getLog(getClass());

/** Collect statistics (message and byte counters below) while true. */
protected boolean stats = true;

/* Counters maintained by send() and receive() while stats is true; cleared by resetStats(). */
protected long sent_msgs = 0, received_msgs = 0, sent_bytes = 0,
received_bytes = 0;
0171:
0172: /** Used by subclass to create a JChannel without a protocol stack, don't use as application programmer */
0173: protected JChannel(boolean no_op) {
0174: ;
0175: }
0176:
/**
 * Creates a <code>JChannel</code> configured with
 * {@link #DEFAULT_PROTOCOL_STACK}.
 *
 * @throws ChannelException if the protocol stack cannot be configured or
 *                          initialized.
 */
public JChannel() throws ChannelException {
    this(DEFAULT_PROTOCOL_STACK);
}
0187:
/**
 * Creates a <code>JChannel</code> whose protocol stack configuration is read
 * from a file.
 *
 * @param properties a file containing a JGroups XML protocol stack
 *                   configuration.
 *
 * @throws ChannelException if the configuration cannot be loaded or the
 *                          protocol stack cannot be initialized.
 */
public JChannel(File properties) throws ChannelException {
    this(ConfiguratorFactory.getStackConfigurator(properties));
}
0201:
/**
 * Creates a <code>JChannel</code> whose protocol stack configuration is taken
 * from an XML element.
 *
 * @param properties an XML element containing a JGroups XML protocol stack
 *                   configuration.
 *
 * @throws ChannelException if the configuration cannot be loaded or the
 *                          protocol stack cannot be initialized.
 */
public JChannel(Element properties) throws ChannelException {
    this(ConfiguratorFactory.getStackConfigurator(properties));
}
0215:
/**
 * Creates a <code>JChannel</code> whose protocol stack configuration is
 * loaded from a URL.
 *
 * @param properties a URL pointing to a JGroups XML protocol stack
 *                   configuration.
 *
 * @throws ChannelException if the configuration cannot be loaded or the
 *                          protocol stack cannot be initialized.
 */
public JChannel(URL properties) throws ChannelException {
    this(ConfiguratorFactory.getStackConfigurator(properties));
}
0229:
/**
 * Creates a <code>JChannel</code> from a string describing the protocol
 * stack configuration.
 *
 * @param properties one of: an old-style property string, the name of a
 *                   system resource containing a JGroups XML configuration,
 *                   a URL (in string form) pointing to a JGroups XML
 *                   configuration, or the name of a file that contains a
 *                   JGroups XML configuration.
 *
 * @throws ChannelException if the configuration cannot be loaded or the
 *                          protocol stack cannot be initialized.
 */
public JChannel(String properties) throws ChannelException {
    this(ConfiguratorFactory.getStackConfigurator(properties));
}
0246:
/**
 * Creates a <code>JChannel</code> from an already-built protocol stack
 * configurator by handing it to <code>init(configurator)</code>.
 * <p>
 * The public constructors taking a File, Element, URL or String (and, through
 * the String one, the no-argument constructor) delegate to this constructor.
 *
 * @param configurator a protocol stack configurator holding a JGroups
 *                     protocol stack configuration.
 *
 * @throws ChannelException if the protocol stack cannot be initialized.
 */
protected JChannel(ProtocolStackConfigurator configurator)
        throws ChannelException {
    init(configurator);
}
0264:
0265: /**
0266: * Creates a new JChannel with the protocol stack as defined in the properties
0267: * parameter. an example of this parameter is<BR>
0268: * "UDP:PING:FD:STABLE:NAKACK:UNICAST:FRAG:FLUSH:GMS:VIEW_ENFORCER:STATE_TRANSFER:QUEUE"<BR>
0269: * Other examples can be found in the ./conf directory<BR>
0270: * @param properties the protocol stack setup; if null, the default protocol stack will be used.
0271: * The properties can also be a java.net.URL object or a string that is a URL spec.
0272: * The JChannel will validate any URL object and String object to see if they are a URL.
0273: * In case of the parameter being a url, the JChannel will try to load the xml from there.
0274: * In case properties is a org.w3c.dom.Element, the ConfiguratorFactory will parse the
0275: * DOM tree with the element as its root element.
0276: * @deprecated Use the constructors with specific parameter types instead.
0277: */
0278: public JChannel(Object properties) throws ChannelException {
0279: if (properties == null)
0280: properties = DEFAULT_PROTOCOL_STACK;
0281:
0282: ProtocolStackConfigurator c = null;
0283:
0284: try {
0285: c = ConfiguratorFactory.getStackConfigurator(properties);
0286: } catch (Exception x) {
0287: throw new ChannelException("unable to load protocol stack",
0288: x);
0289: }
0290: init(c);
0291: }
0292:
0293: /**
0294: * Returns the protocol stack.
0295: * Currently used by Debugger.
0296: * Specific to JChannel, therefore
0297: * not visible in Channel
0298: */
0299: public ProtocolStack getProtocolStack() {
0300: return prot_stack;
0301: }
0302:
0303: protected Log getLog() {
0304: return log;
0305: }
0306:
0307: /**
0308: * returns the protocol stack configuration in string format.
0309: * an example of this property is<BR>
0310: * "UDP:PING:FD:STABLE:NAKACK:UNICAST:FRAG:FLUSH:GMS:VIEW_ENFORCER:STATE_TRANSFER:QUEUE"
0311: */
0312: public String getProperties() {
0313: return props;
0314: }
0315:
0316: public boolean statsEnabled() {
0317: return stats;
0318: }
0319:
0320: public void enableStats(boolean stats) {
0321: this .stats = stats;
0322: }
0323:
0324: public void resetStats() {
0325: sent_msgs = received_msgs = sent_bytes = received_bytes = 0;
0326: }
0327:
0328: public long getSentMessages() {
0329: return sent_msgs;
0330: }
0331:
0332: public long getSentBytes() {
0333: return sent_bytes;
0334: }
0335:
0336: public long getReceivedMessages() {
0337: return received_msgs;
0338: }
0339:
0340: public long getReceivedBytes() {
0341: return received_bytes;
0342: }
0343:
0344: public int getNumberOfTasksInTimer() {
0345: return prot_stack != null ? prot_stack.timer.size() : -1;
0346: }
0347:
0348: public String dumpTimerQueue() {
0349: return prot_stack != null ? prot_stack.dumpTimerQueue()
0350: : "<n/a";
0351: }
0352:
0353: /**
0354: * Returns a pretty-printed form of all the protocols. If include_properties is set,
0355: * the properties for each protocol will also be printed.
0356: */
0357: public String printProtocolSpec(boolean include_properties) {
0358: return prot_stack != null ? prot_stack
0359: .printProtocolSpec(include_properties) : null;
0360: }
0361:
0362: /**
0363: * Connects the channel to a group.
0364: * If the channel is already connected, an error message will be printed to the error log.
0365: * If the channel is closed a ChannelClosed exception will be thrown.
0366: * This method starts the protocol stack by calling ProtocolStack.start,
0367: * then it sends an Event.CONNECT event down the stack and waits to receive a CONNECT_OK event.
0368: * Once the CONNECT_OK event arrives from the protocol stack, any channel listeners are notified
0369: * and the channel is considered connected.
0370: *
0371: * @param cluster_name A <code>String</code> denoting the group name. Cannot be null.
0372: * @exception ChannelException The protocol stack cannot be started
0373: * @exception ChannelClosedException The channel is closed and therefore cannot be used any longer.
0374: * A new channel has to be created first.
0375: */
0376: public synchronized void connect(String cluster_name)
0377: throws ChannelException, ChannelClosedException {
0378: /*make sure the channel is not closed*/
0379: checkClosed();
0380:
0381: /*if we already are connected, then ignore this*/
0382: if (connected) {
0383: if (log.isTraceEnabled())
0384: log.trace("already connected to " + cluster_name);
0385: return;
0386: }
0387:
0388: /*make sure we have a valid channel name*/
0389: if (cluster_name == null) {
0390: if (log.isInfoEnabled())
0391: log
0392: .info("cluster_name is null, assuming unicast channel");
0393: } else
0394: this .cluster_name = cluster_name;
0395:
0396: try {
0397: prot_stack.startStack(); // calls start() in all protocols, from top to bottom
0398: } catch (Throwable e) {
0399: throw new ChannelException(
0400: "failed to start protocol stack", e);
0401: }
0402:
0403: String tmp = Util.getProperty(
0404: new String[] { Global.CHANNEL_LOCAL_ADDR_TIMEOUT,
0405: "local_addr.timeout" }, null, null, false,
0406: "30000");
0407: LOCAL_ADDR_TIMEOUT = Long.parseLong(tmp);
0408:
0409: /* Wait LOCAL_ADDR_TIMEOUT milliseconds for local_addr to have a non-null value (set by SET_LOCAL_ADDRESS) */
0410: local_addr = (Address) local_addr_promise
0411: .getResult(LOCAL_ADDR_TIMEOUT);
0412: if (local_addr == null) {
0413: log.fatal("local_addr is null; cannot connect");
0414: throw new ChannelException("local_addr is null");
0415: }
0416:
0417: /*create a temporary view, assume this channel is the only member and
0418: *is the coordinator*/
0419: Vector t = new Vector(1);
0420: t.addElement(local_addr);
0421: my_view = new View(local_addr, 0, t); // create a dummy view
0422:
0423: // only connect if we are not a unicast channel
0424: if (cluster_name != null) {
0425: connect_promise.reset();
0426:
0427: if (flush_supported)
0428: flush_unblock_promise.reset();
0429:
0430: Event connect_event = new Event(Event.CONNECT, cluster_name);
0431: down(connect_event);
0432:
0433: Object res = connect_promise.getResult(); // waits forever until connected (or channel is closed)
0434: if (res != null && res instanceof Exception) { // the JOIN was rejected by the coordinator
0435: throw new ChannelException("connect() failed",
0436: (Throwable) res);
0437: }
0438:
0439: //if FLUSH is used do not return from connect() until UNBLOCK event is received
0440: boolean singletonMember = my_view != null
0441: && my_view.size() == 1;
0442: boolean shouldWaitForUnblock = flush_supported
0443: && receive_blocks && !singletonMember
0444: && !flush_unblock_promise.hasResult();
0445: if (shouldWaitForUnblock) {
0446: try {
0447: flush_unblock_promise
0448: .getResultWithTimeout(FLUSH_UNBLOCK_TIMEOUT);
0449: } catch (TimeoutException te) {
0450: if (log.isWarnEnabled())
0451: log
0452: .warn("waiting on UNBLOCK after connect timed out");
0453: }
0454: }
0455: }
0456: connected = true;
0457: notifyChannelConnected(this );
0458: }
0459:
0460: public synchronized boolean connect(String cluster_name,
0461: Address target, String state_id, long timeout)
0462: throws ChannelException {
0463: throw new UnsupportedOperationException("not yet implemented");
0464: }
0465:
0466: /**
0467: * Disconnects the channel if it is connected. If the channel is closed, this operation is ignored<BR>
0468: * Otherwise the following actions happen in the listed order<BR>
0469: * <ol>
0470: * <li> The JChannel sends a DISCONNECT event down the protocol stack<BR>
0471: * <li> Blocks until the channel to receives a DISCONNECT_OK event<BR>
0472: * <li> Sends a STOP_QUEING event down the stack<BR>
0473: * <li> Stops the protocol stack by calling ProtocolStack.stop()<BR>
0474: * <li> Notifies the listener, if the listener is available<BR>
0475: * </ol>
0476: */
0477: public synchronized void disconnect() {
0478: if (closed)
0479: return;
0480:
0481: if (connected) {
0482:
0483: if (cluster_name != null) {
0484:
0485: /* Send down a DISCONNECT event. The DISCONNECT event travels down to the GMS, where a
0486: * DISCONNECT_OK response is generated and sent up the stack. JChannel blocks until a
0487: * DISCONNECT_OK has been received, or until timeout has elapsed.
0488: */
0489: Event disconnect_event = new Event(Event.DISCONNECT,
0490: local_addr);
0491: disconnect_promise.reset();
0492: down(disconnect_event); // DISCONNECT is handled by each layer
0493: disconnect_promise.getResult(); // wait for DISCONNECT_OK
0494: }
0495:
0496: // Just in case we use the QUEUE protocol and it is still blocked...
0497: down(new Event(Event.STOP_QUEUEING));
0498:
0499: connected = false;
0500: try {
0501: prot_stack.stopStack(); // calls stop() in all protocols, from top to bottom
0502: } catch (Exception e) {
0503: if (log.isErrorEnabled())
0504: log.error("exception: " + e);
0505: }
0506: notifyChannelDisconnected(this );
0507: init(); // sets local_addr=null; changed March 18 2003 (bela) -- prevented successful rejoining
0508: }
0509: }
0510:
0511: /**
0512: * Destroys the channel.
0513: * After this method has been called, the channel us unusable.<BR>
0514: * This operation will disconnect the channel and close the channel receive queue immediately<BR>
0515: */
0516: public synchronized void close() {
0517: _close(true, true); // by default disconnect before closing channel and close mq
0518: }
0519:
0520: /** Shuts down the channel without disconnecting */
0521: public synchronized void shutdown() {
0522: _close(false, true); // by default disconnect before closing channel and close mq
0523: }
0524:
0525: /**
0526: * Opens the channel.
0527: * This does the following actions:
0528: * <ol>
0529: * <li> Resets the receiver queue by calling Queue.reset
0530: * <li> Sets up the protocol stack by calling ProtocolStack.setup
0531: * <li> Sets the closed flag to false
0532: * </ol>
0533: */
0534: public synchronized void open() throws ChannelException {
0535: if (!closed)
0536: throw new ChannelException("channel is already open");
0537:
0538: try {
0539: mq.reset();
0540:
0541: // new stack is created on open() - bela June 12 2003
0542: prot_stack = new ProtocolStack(this , props);
0543: prot_stack.setup();
0544: closed = false;
0545: } catch (Exception e) {
0546: throw new ChannelException("failed to open channel", e);
0547: }
0548: }
0549:
0550: /**
0551: * returns true if the Open operation has been called successfully
0552: */
0553: public boolean isOpen() {
0554: return !closed;
0555: }
0556:
0557: /**
0558: * returns true if the Connect operation has been called successfully
0559: */
0560: public boolean isConnected() {
0561: return connected;
0562: }
0563:
0564: public int getNumMessages() {
0565: return mq != null ? mq.size() : -1;
0566: }
0567:
0568: public String dumpQueue() {
0569: return Util.dumpQueue(mq);
0570: }
0571:
0572: /**
0573: * Returns a map of statistics of the various protocols and of the channel itself.
0574: * @return Map<String,Map>. A map where the keys are the protocols ("channel" pseudo key is
0575: * used for the channel itself") and the values are property maps.
0576: */
0577: public Map dumpStats() {
0578: Map retval = prot_stack.dumpStats();
0579: if (retval != null) {
0580: Map tmp = dumpChannelStats();
0581: if (tmp != null)
0582: retval.put("channel", tmp);
0583: }
0584: return retval;
0585: }
0586:
0587: private Map dumpChannelStats() {
0588: Map retval = new HashMap();
0589: retval.put("sent_msgs", new Long(sent_msgs));
0590: retval.put("sent_bytes", new Long(sent_bytes));
0591: retval.put("received_msgs", new Long(received_msgs));
0592: retval.put("received_bytes", new Long(received_bytes));
0593: return retval;
0594: }
0595:
0596: /**
0597: * Sends a message through the protocol stack.
0598: * Implements the Transport interface.
0599: *
0600: * @param msg the message to be sent through the protocol stack,
0601: * the destination of the message is specified inside the message itself
0602: * @exception ChannelNotConnectedException
0603: * @exception ChannelClosedException
0604: */
0605: public void send(Message msg) throws ChannelNotConnectedException,
0606: ChannelClosedException {
0607: checkClosed();
0608: checkNotConnected();
0609: if (stats) {
0610: sent_msgs++;
0611: sent_bytes += msg.getLength();
0612: }
0613: if (msg == null)
0614: throw new NullPointerException("msg is null");
0615: down(new Event(Event.MSG, msg));
0616: }
0617:
0618: /**
0619: * creates a new message with the destination address, and the source address
0620: * and the object as the message value
0621: * @param dst - the destination address of the message, null for all members
0622: * @param src - the source address of the message
0623: * @param obj - the value of the message
0624: * @exception ChannelNotConnectedException
0625: * @exception ChannelClosedException
0626: * @see JChannel#send
0627: */
0628: public void send(Address dst, Address src, Serializable obj)
0629: throws ChannelNotConnectedException, ChannelClosedException {
0630: send(new Message(dst, src, obj));
0631: }
0632:
0633: /**
0634: * Blocking receive method.
0635: * This method returns the object that was first received by this JChannel and that has not been
0636: * received before. After the object is received, it is removed from the receive queue.<BR>
0637: * If you only want to inspect the object received without removing it from the queue call
0638: * JChannel.peek<BR>
0639: * If no messages are in the receive queue, this method blocks until a message is added or the operation times out<BR>
0640: * By specifying a timeout of 0, the operation blocks forever, or until a message has been received.
0641: * @param timeout the number of milliseconds to wait if the receive queue is empty. 0 means wait forever
0642: * @exception TimeoutException if a timeout occured prior to a new message was received
0643: * @exception ChannelNotConnectedException
0644: * @exception ChannelClosedException
0645: * @see JChannel#peek
0646: */
0647: public Object receive(long timeout)
0648: throws ChannelNotConnectedException,
0649: ChannelClosedException, TimeoutException {
0650:
0651: checkClosed();
0652: checkNotConnected();
0653:
0654: try {
0655: Event evt = (timeout <= 0) ? (Event) mq.remove()
0656: : (Event) mq.remove(timeout);
0657: Object retval = getEvent(evt);
0658: evt = null;
0659: if (stats) {
0660: if (retval != null && retval instanceof Message) {
0661: received_msgs++;
0662: received_bytes += ((Message) retval).getLength();
0663: }
0664: }
0665: return retval;
0666: } catch (QueueClosedException queue_closed) {
0667: throw new ChannelClosedException();
0668: } catch (TimeoutException t) {
0669: throw t;
0670: } catch (Exception e) {
0671: if (log.isErrorEnabled())
0672: log.error("exception: " + e);
0673: return null;
0674: }
0675: }
0676:
0677: /**
0678: * Just peeks at the next message, view or block. Does <em>not</em> install
0679: * new view if view is received<BR>
0680: * Does the same thing as JChannel.receive but doesn't remove the object from the
0681: * receiver queue
0682: */
0683: public Object peek(long timeout)
0684: throws ChannelNotConnectedException,
0685: ChannelClosedException, TimeoutException {
0686:
0687: checkClosed();
0688: checkNotConnected();
0689:
0690: try {
0691: Event evt = (timeout <= 0) ? (Event) mq.peek() : (Event) mq
0692: .peek(timeout);
0693: Object retval = getEvent(evt);
0694: evt = null;
0695: return retval;
0696: } catch (QueueClosedException queue_closed) {
0697: if (log.isErrorEnabled())
0698: log.error("exception: " + queue_closed);
0699: return null;
0700: } catch (TimeoutException t) {
0701: return null;
0702: } catch (Exception e) {
0703: if (log.isErrorEnabled())
0704: log.error("exception: " + e);
0705: return null;
0706: }
0707: }
0708:
0709: /**
0710: * Returns the current view.
0711: * <BR>
0712: * If the channel is not connected or if it is closed it will return null.
0713: * <BR>
0714: * @return returns the current group view, or null if the channel is closed or disconnected
0715: */
0716: public View getView() {
0717: return closed || !connected ? null : my_view;
0718: }
0719:
0720: /**
0721: * returns the local address of the channel
0722: * returns null if the channel is closed
0723: */
0724: public Address getLocalAddress() {
0725: return closed ? null : local_addr;
0726: }
0727:
0728: /**
0729: * returns the name of the channel
0730: * if the channel is not connected or if it is closed it will return null
0731: * @deprecated Use {@link #getClusterName()} instead
0732: */
0733: public String getChannelName() {
0734: return closed ? null : !connected ? null : cluster_name;
0735: }
0736:
0737: public String getClusterName() {
0738: return cluster_name;
0739: }
0740:
0741: /**
0742: * Sets a channel option. The options can be one of the following:
0743: * <UL>
0744: * <LI> Channel.BLOCK
0745: * <LI> Channel.LOCAL
0746: * <LI> Channel.AUTO_RECONNECT
0747: * <LI> Channel.AUTO_GETSTATE
0748: * </UL>
0749: * <P>
0750: * There are certain dependencies between the options that you can set,
0751: * I will try to describe them here.
0752: * <P>
0753: * Option: Channel.BLOCK<BR>
0754: * Value: java.lang.Boolean<BR>
0755: * Result: set to true will set setOpt(VIEW, true) and the JChannel will receive BLOCKS and VIEW events<BR>
0756: *<BR>
0757: * Option: LOCAL<BR>
0758: * Value: java.lang.Boolean<BR>
0759: * Result: set to true the JChannel will receive messages that it self sent out.<BR>
0760: *<BR>
0761: * Option: AUTO_RECONNECT<BR>
0762: * Value: java.lang.Boolean<BR>
0763: * Result: set to true and the JChannel will try to reconnect when it is being closed<BR>
0764: *<BR>
0765: * Option: AUTO_GETSTATE<BR>
0766: * Value: java.lang.Boolean<BR>
0767: * Result: set to true, the AUTO_RECONNECT will be set to true and the JChannel will try to get the state after a close and reconnect happens<BR>
0768: * <BR>
0769: *
0770: * @param option the parameter option Channel.VIEW, Channel.SUSPECT, etc
0771: * @param value the value to set for this option
0772: *
0773: */
0774: public void setOpt(int option, Object value) {
0775: if (closed) {
0776: if (log.isWarnEnabled())
0777: log.warn("channel is closed; option not set !");
0778: return;
0779: }
0780:
0781: switch (option) {
0782: case VIEW:
0783: if (log.isWarnEnabled())
0784: log
0785: .warn("option VIEW has been deprecated (it is always true now); this option is ignored");
0786: break;
0787: case SUSPECT:
0788: if (log.isWarnEnabled())
0789: log
0790: .warn("option SUSPECT has been deprecated (it is always true now); this option is ignored");
0791: break;
0792: case BLOCK:
0793: if (value instanceof Boolean)
0794: receive_blocks = ((Boolean) value).booleanValue();
0795: else if (log.isErrorEnabled())
0796: log.error("option " + Channel.option2String(option)
0797: + " (" + value + "): value has to be Boolean");
0798: break;
0799:
0800: case GET_STATE_EVENTS:
0801: if (log.isWarnEnabled())
0802: log
0803: .warn("option GET_STATE_EVENTS has been deprecated (it is always true now); this option is ignored");
0804: break;
0805:
0806: case LOCAL:
0807: if (value instanceof Boolean)
0808: receive_local_msgs = ((Boolean) value).booleanValue();
0809: else if (log.isErrorEnabled())
0810: log.error("option " + Channel.option2String(option)
0811: + " (" + value + "): value has to be Boolean");
0812: break;
0813:
0814: case AUTO_RECONNECT:
0815: if (value instanceof Boolean)
0816: auto_reconnect = ((Boolean) value).booleanValue();
0817: else if (log.isErrorEnabled())
0818: log.error("option " + Channel.option2String(option)
0819: + " (" + value + "): value has to be Boolean");
0820: break;
0821:
0822: case AUTO_GETSTATE:
0823: if (value instanceof Boolean) {
0824: auto_getstate = ((Boolean) value).booleanValue();
0825: if (auto_getstate)
0826: auto_reconnect = true;
0827: } else if (log.isErrorEnabled())
0828: log.error("option " + Channel.option2String(option)
0829: + " (" + value + "): value has to be Boolean");
0830: break;
0831:
0832: default:
0833: if (log.isErrorEnabled())
0834: log.error("option " + Channel.option2String(option)
0835: + " not known");
0836: break;
0837: }
0838: }
0839:
0840: /**
0841: * returns the value of an option.
0842: * @param option the option you want to see the value for
0843: * @return the object value, in most cases java.lang.Boolean
0844: * @see JChannel#setOpt
0845: */
0846: public Object getOpt(int option) {
0847: switch (option) {
0848: case VIEW:
0849: return Boolean.TRUE;
0850: case BLOCK:
0851: return receive_blocks ? Boolean.TRUE : Boolean.FALSE;
0852: case SUSPECT:
0853: return Boolean.TRUE;
0854: case AUTO_RECONNECT:
0855: return auto_reconnect ? Boolean.TRUE : Boolean.FALSE;
0856: case AUTO_GETSTATE:
0857: return auto_getstate ? Boolean.TRUE : Boolean.FALSE;
0858: case GET_STATE_EVENTS:
0859: return Boolean.TRUE;
0860: case LOCAL:
0861: return receive_local_msgs ? Boolean.TRUE : Boolean.FALSE;
0862: default:
0863: if (log.isErrorEnabled())
0864: log.error("option " + Channel.option2String(option)
0865: + " not known");
0866: return null;
0867: }
0868: }
0869:
0870: /**
0871: * Called to acknowledge a block() (callback in <code>MembershipListener</code> or
0872: * <code>BlockEvent</code> received from call to <code>receive()</code>).
0873: * After sending blockOk(), no messages should be sent until a new view has been received.
0874: * Calling this method on a closed channel has no effect.
0875: */
0876: public void blockOk() {
0877: down(new Event(Event.BLOCK_OK));
0878: down(new Event(Event.START_QUEUEING));
0879: }
0880:
0881: /**
0882: * Retrieves the current group state. Sends GET_STATE event down to STATE_TRANSFER layer.
0883: * Blocks until STATE_TRANSFER sends up a GET_STATE_OK event or until <code>timeout</code>
0884: * milliseconds have elapsed. The argument of GET_STATE_OK should be a single object.
0885: * @param target the target member to receive the state from. if null, state is retrieved from coordinator
0886: * @param timeout the number of milliseconds to wait for the operation to complete successfully. 0 waits until
0887: * the state has been received
0888: * @return true of the state was received, false if the operation timed out
0889: */
0890: public boolean getState(Address target, long timeout)
0891: throws ChannelNotConnectedException, ChannelClosedException {
0892: return getState(target, null, timeout);
0893: }
0894:
0895: /**
0896: * Retrieves a substate (or partial state) from the target.
0897: * @param target State provider. If null, coordinator is used
0898: * @param state_id The ID of the substate. If null, the entire state will be transferred
0899: * @param timeout the number of milliseconds to wait for the operation to complete successfully. 0 waits until
0900: * the state has been received
0901: * @return
0902: * @throws ChannelNotConnectedException
0903: * @throws ChannelClosedException
0904: */
0905: public boolean getState(Address target, String state_id,
0906: long timeout) throws ChannelNotConnectedException,
0907: ChannelClosedException {
0908: if (target == null)
0909: target = determineCoordinator();
0910: if (target != null && local_addr != null
0911: && target.equals(local_addr)) {
0912: if (log.isTraceEnabled())
0913: log.trace("cannot get state from myself (" + target
0914: + "): probably the first member");
0915: return false;
0916: }
0917:
0918: StateTransferInfo info = new StateTransferInfo(target,
0919: state_id, timeout);
0920: boolean rc = _getState(new Event(Event.GET_STATE, info), info);
0921: if (rc == false)
0922: down(new Event(Event.RESUME_STABLE));
0923: return rc;
0924: }
0925:
0926: /**
0927: * Retrieves the current group state. Sends GET_STATE event down to STATE_TRANSFER layer.
0928: * Blocks until STATE_TRANSFER sends up a GET_STATE_OK event or until <code>timeout</code>
0929: * milliseconds have elapsed. The argument of GET_STATE_OK should be a vector of objects.
0930: * @param targets - the target members to receive the state from ( an Address list )
0931: * @param timeout - the number of milliseconds to wait for the operation to complete successfully
0932: * @return true of the state was received, false if the operation timed out
0933: * @deprecated Not really needed - we always want to get the state from a single member,
0934: * use {@link #getState(org.jgroups.Address, long)} instead
0935: */
0936: public boolean getAllStates(Vector targets, long timeout)
0937: throws ChannelNotConnectedException, ChannelClosedException {
0938: throw new UnsupportedOperationException(
0939: "use getState() instead");
0940: }
0941:
0942: /**
0943: * Called by the application is response to receiving a <code>getState()</code> object when
0944: * calling <code>receive()</code>.
0945: * When the application receives a getState() message on the receive() method,
0946: * it should call returnState() to reply with the state of the application
0947: * @param state The state of the application as a byte buffer
0948: * (to send over the network).
0949: */
0950: public void returnState(byte[] state) {
0951: StateTransferInfo info = new StateTransferInfo(null, null, 0L,
0952: state);
0953: down(new Event(Event.GET_APPLSTATE_OK, info));
0954: }
0955:
0956: /**
0957: * Returns a substate as indicated by state_id
0958: * @param state
0959: * @param state_id
0960: */
0961: public void returnState(byte[] state, String state_id) {
0962: StateTransferInfo info = new StateTransferInfo(null, state_id,
0963: 0L, state);
0964: down(new Event(Event.GET_APPLSTATE_OK, info));
0965: }
0966:
0967: /**
0968: * Callback method <BR>
0969: * Called by the ProtocolStack when a message is received.
0970: * It will be added to the message queue from which subsequent
0971: * <code>Receive</code>s will dequeue it.
0972: * @param evt the event carrying the message from the protocol stack
0973: */
0974: public void up(Event evt) {
0975: int type = evt.getType();
0976: Message msg;
0977:
0978: switch (type) {
0979:
0980: case Event.MSG:
0981: msg = (Message) evt.getArg();
0982: if (!receive_local_msgs) { // discard local messages (sent by myself to me)
0983: if (local_addr != null && msg.getSrc() != null)
0984: if (local_addr.equals(msg.getSrc()))
0985: return;
0986: }
0987: break;
0988:
0989: case Event.VIEW_CHANGE:
0990: View tmp = (View) evt.getArg();
0991: if (tmp instanceof MergeView)
0992: my_view = new View(tmp.getVid(), tmp.getMembers());
0993: else
0994: my_view = tmp;
0995:
0996: /*
0997: * Bela&Vladimir Oct 27th,2006 (JGroups 2.4)- we need to switch to
0998: * connected=true because client can invoke channel.getView() in
0999: * viewAccepted() callback invoked on this thread
1000: * (see Event.VIEW_CHANGE handling below)
1001: *
1002: * We do not set connect_promise because we want to wait for
1003: * CONNECT_OK and then return from user's JChannel.connect() call.
1004: * This is important since we have to wait for Event.UNBLOCK after
1005: * CONNECT_OK if blocks are turned on. See JChannel.connect() for
1006: * details.
1007: *
1008: */
1009: if (connected == false) {
1010: connected = true;
1011: }
1012:
1013: // unblock queueing of messages due to previous BLOCK event:
1014: down(new Event(Event.STOP_QUEUEING));
1015: break;
1016:
1017: case Event.CONFIG:
1018: HashMap config = (HashMap) evt.getArg();
1019: if (config != null) {
1020: if (config.containsKey("state_transfer")) {
1021: state_transfer_supported = ((Boolean) config
1022: .get("state_transfer")).booleanValue();
1023: }
1024: if (config.containsKey("flush_supported")) {
1025: flush_supported = ((Boolean) config
1026: .get("flush_supported")).booleanValue();
1027: }
1028: }
1029: break;
1030:
1031: case Event.CONNECT_OK:
1032: connect_promise.setResult(evt.getArg());
1033: break;
1034:
1035: case Event.SUSPEND_OK:
1036: flush_promise.setResult(Boolean.TRUE);
1037: break;
1038:
1039: case Event.DISCONNECT_OK:
1040: disconnect_promise.setResult(Boolean.TRUE);
1041: break;
1042:
1043: case Event.GET_STATE_OK:
1044: StateTransferInfo info = (StateTransferInfo) evt.getArg();
1045: byte[] state = info.state;
1046:
1047: state_promise.setResult(state != null ? Boolean.TRUE
1048: : Boolean.FALSE);
1049: if (up_handler != null) {
1050: up_handler.up(evt);
1051: return;
1052: }
1053:
1054: if (state != null) {
1055: String state_id = info.state_id;
1056: if (receiver != null) {
1057: if (receiver instanceof ExtendedReceiver
1058: && state_id != null)
1059: ((ExtendedReceiver) receiver).setState(
1060: state_id, state);
1061: else
1062: receiver.setState(state);
1063: } else {
1064: try {
1065: mq.add(new Event(Event.STATE_RECEIVED, info));
1066: } catch (Exception e) {
1067: }
1068: }
1069: }
1070: break;
1071:
1072: case Event.STATE_TRANSFER_INPUTSTREAM:
1073: StateTransferInfo sti = (StateTransferInfo) evt.getArg();
1074: InputStream is = sti.inputStream;
1075: //Oct 13,2006 moved to down() when Event.STATE_TRANSFER_INPUTSTREAM_CLOSED is received
1076: //state_promise.setResult(is != null? Boolean.TRUE : Boolean.FALSE);
1077:
1078: if (up_handler != null) {
1079: up_handler.up(evt);
1080: return;
1081: }
1082:
1083: if (is != null) {
1084: if (receiver instanceof ExtendedReceiver) {
1085: if (sti.state_id == null)
1086: ((ExtendedReceiver) receiver).setState(is);
1087: else
1088: ((ExtendedReceiver) receiver).setState(
1089: sti.state_id, is);
1090: } else {
1091: try {
1092: mq.add(new Event(
1093: Event.STATE_TRANSFER_INPUTSTREAM, sti));
1094: } catch (Exception e) {
1095: }
1096: }
1097: }
1098: break;
1099:
1100: case Event.SET_LOCAL_ADDRESS:
1101: local_addr_promise.setResult(evt.getArg());
1102: break;
1103:
1104: case Event.EXIT:
1105: handleExit(evt);
1106: return; // no need to pass event up; already done in handleExit()
1107:
1108: default:
1109: break;
1110: }
1111:
1112: // If UpHandler is installed, pass all events to it and return (UpHandler is e.g. a building block)
1113: if (up_handler != null) {
1114: up_handler.up(evt);
1115:
1116: if (type == Event.UNBLOCK) {
1117: flush_unblock_promise.setResult(Boolean.TRUE);
1118: }
1119: return;
1120: }
1121:
1122: switch (type) {
1123: case Event.MSG:
1124: if (receiver != null) {
1125: receiver.receive((Message) evt.getArg());
1126: return;
1127: }
1128: break;
1129: case Event.VIEW_CHANGE:
1130: if (receiver != null) {
1131: receiver.viewAccepted((View) evt.getArg());
1132: return;
1133: }
1134: break;
1135: case Event.SUSPECT:
1136: if (receiver != null) {
1137: receiver.suspect((Address) evt.getArg());
1138: return;
1139: }
1140: break;
1141: case Event.GET_APPLSTATE:
1142: if (receiver != null) {
1143: StateTransferInfo info = (StateTransferInfo) evt
1144: .getArg();
1145: byte[] tmp_state;
1146: String state_id = info.state_id;
1147: if (receiver instanceof ExtendedReceiver
1148: && state_id != null) {
1149: tmp_state = ((ExtendedReceiver) receiver)
1150: .getState(state_id);
1151: } else {
1152: tmp_state = receiver.getState();
1153: }
1154: returnState(tmp_state, state_id);
1155: return;
1156: }
1157: break;
1158: case Event.STATE_TRANSFER_OUTPUTSTREAM:
1159: if (receiver != null) {
1160: StateTransferInfo sti = (StateTransferInfo) evt
1161: .getArg();
1162: OutputStream os = sti.outputStream;
1163: if (os != null && receiver instanceof ExtendedReceiver) {
1164: if (sti.state_id == null)
1165: ((ExtendedReceiver) receiver).getState(os);
1166: else
1167: ((ExtendedReceiver) receiver).getState(
1168: sti.state_id, os);
1169: }
1170: return;
1171: }
1172: break;
1173:
1174: case Event.BLOCK:
1175: if (!receive_blocks) { // discard if client has not set 'receiving blocks' to 'on'
1176: down(new Event(Event.BLOCK_OK));
1177: down(new Event(Event.START_QUEUEING));
1178: return;
1179: }
1180:
1181: if (receiver != null) {
1182: try {
1183: receiver.block();
1184: } catch (Throwable t) {
1185: if (log.isErrorEnabled())
1186: log.error("failed calling block() on Receiver",
1187: t);
1188: } finally {
1189: blockOk();
1190: }
1191: return;
1192: }
1193: break;
1194: case Event.UNBLOCK:
1195: //discard if client has not set 'receiving blocks' to 'on'
1196: if (!receive_blocks) {
1197: return;
1198: }
1199: if (receiver instanceof ExtendedReceiver) {
1200: try {
1201: ((ExtendedReceiver) receiver).unblock();
1202: } catch (Throwable t) {
1203: if (log.isErrorEnabled())
1204: log.error(
1205: "failed calling unblock() on Receiver",
1206: t);
1207: } finally {
1208: flush_unblock_promise.setResult(Boolean.TRUE);
1209: }
1210: return;
1211: }
1212: break;
1213: default:
1214: break;
1215: }
1216:
1217: if (type == Event.MSG || type == Event.VIEW_CHANGE
1218: || type == Event.SUSPECT || type == Event.GET_APPLSTATE
1219: || type == Event.STATE_TRANSFER_OUTPUTSTREAM
1220: || type == Event.BLOCK || type == Event.UNBLOCK) {
1221: try {
1222: mq.add(evt);
1223: } catch (Exception e) {
1224: if (log.isErrorEnabled())
1225: log.error("exception adding event " + evt
1226: + " to message queue", e);
1227: }
1228: }
1229: }
1230:
1231: /**
1232: * Sends a message through the protocol stack if the stack is available
1233: * @param evt the message to send down, encapsulated in an event
1234: */
1235: public void down(Event evt) {
1236: if (evt == null)
1237: return;
1238:
1239: // handle setting of additional data (kludge, will be removed soon)
1240: if (evt.getType() == Event.CONFIG) {
1241: try {
1242: Map m = (Map) evt.getArg();
1243: if (m != null && m.containsKey("additional_data")) {
1244: additional_data = (byte[]) m.get("additional_data");
1245: if (local_addr instanceof IpAddress)
1246: ((IpAddress) local_addr)
1247: .setAdditionalData(additional_data);
1248: }
1249: } catch (Throwable t) {
1250: if (log.isErrorEnabled())
1251: log
1252: .error("CONFIG event did not contain a hashmap: "
1253: + t);
1254: }
1255: }
1256:
1257: if (evt.getType() == Event.STATE_TRANSFER_INPUTSTREAM_CLOSED) {
1258: state_promise.setResult(Boolean.TRUE);
1259: }
1260:
1261: if (prot_stack != null)
1262: prot_stack.down(evt);
1263: else if (log.isErrorEnabled())
1264: log.error("no protocol stack available");
1265: }
1266:
1267: public String toString(boolean details) {
1268: StringBuffer sb = new StringBuffer();
1269: sb.append("local_addr=").append(local_addr).append('\n');
1270: sb.append("cluster_name=").append(cluster_name).append('\n');
1271: sb.append("my_view=").append(my_view).append('\n');
1272: sb.append("connected=").append(connected).append('\n');
1273: sb.append("closed=").append(closed).append('\n');
1274: if (mq != null)
1275: sb.append("incoming queue size=").append(mq.size()).append(
1276: '\n');
1277: if (details) {
1278: sb.append("receive_blocks=").append(receive_blocks).append(
1279: '\n');
1280: sb.append("receive_local_msgs=").append(receive_local_msgs)
1281: .append('\n');
1282: sb.append("auto_reconnect=").append(auto_reconnect).append(
1283: '\n');
1284: sb.append("auto_getstate=").append(auto_getstate).append(
1285: '\n');
1286: sb.append("state_transfer_supported=").append(
1287: state_transfer_supported).append('\n');
1288: sb.append("props=").append(props).append('\n');
1289: }
1290:
1291: return sb.toString();
1292: }
1293:
1294: /* ----------------------------------- Private Methods ------------------------------------- */
1295:
1296: protected final void init(ProtocolStackConfigurator configurator)
1297: throws ChannelException {
1298: if (log.isInfoEnabled())
1299: log.info("JGroups version: " + Version.description);
1300: ConfiguratorFactory.substituteVariables(configurator); // replace vars with system props
1301: props = configurator.getProtocolStackString();
1302: prot_stack = new ProtocolStack(this , props);
1303: try {
1304: prot_stack.setup(); // Setup protocol stack (create layers, queues between them
1305: } catch (Throwable e) {
1306: throw new ChannelException(
1307: "unable to setup the protocol stack", e);
1308: }
1309: }
1310:
1311: /**
1312: * Initializes all variables. Used after <tt>close()</tt> or <tt>disconnect()</tt>,
1313: * to be ready for new <tt>connect()</tt>
1314: */
1315: private void init() {
1316: local_addr = null;
1317: cluster_name = null;
1318: my_view = null;
1319:
1320: // changed by Bela Sept 25 2003
1321: //if(mq != null && mq.closed())
1322: // mq.reset();
1323:
1324: connect_promise.reset();
1325: disconnect_promise.reset();
1326: connected = false;
1327: }
1328:
1329: /**
1330: * health check.<BR>
1331: * throws a ChannelNotConnected exception if the channel is not connected
1332: */
1333: protected void checkNotConnected()
1334: throws ChannelNotConnectedException {
1335: if (!connected)
1336: throw new ChannelNotConnectedException();
1337: }
1338:
1339: /**
1340: * health check<BR>
1341: * throws a ChannelClosed exception if the channel is closed
1342: */
1343: protected void checkClosed() throws ChannelClosedException {
1344: if (closed)
1345: throw new ChannelClosedException();
1346: }
1347:
1348: /**
1349: * returns the value of the event<BR>
1350: * These objects will be returned<BR>
1351: * <PRE>
1352: * <B>Event Type - Return Type</B>
1353: * Event.MSG - returns a Message object
1354: * Event.VIEW_CHANGE - returns a View object
1355: * Event.SUSPECT - returns a SuspectEvent object
1356: * Event.BLOCK - returns a new BlockEvent object
1357: * Event.GET_APPLSTATE - returns a GetStateEvent object
1358: * Event.STATE_RECEIVED- returns a SetStateEvent object
1359: * Event.Exit - returns an ExitEvent object
1360: * All other - return the actual Event object
1361: * </PRE>
1362: * @param evt - the event of which you want to extract the value
1363: * @return the event value if it matches the select list,
1364: * returns null if the event is null
1365: * returns the event itself if a match (See above) can not be made of the event type
1366: */
1367: static Object getEvent(Event evt) {
1368: if (evt == null)
1369: return null; // correct ?
1370:
1371: switch (evt.getType()) {
1372: case Event.MSG:
1373: return evt.getArg();
1374: case Event.VIEW_CHANGE:
1375: return evt.getArg();
1376: case Event.SUSPECT:
1377: return new SuspectEvent(evt.getArg());
1378: case Event.BLOCK:
1379: return new BlockEvent();
1380: case Event.UNBLOCK:
1381: return new UnblockEvent();
1382: case Event.GET_APPLSTATE:
1383: StateTransferInfo info = (StateTransferInfo) evt.getArg();
1384: return new GetStateEvent(info.target, info.state_id);
1385: case Event.STATE_RECEIVED:
1386: info = (StateTransferInfo) evt.getArg();
1387: return new SetStateEvent(info.state, info.state_id);
1388: case Event.STATE_TRANSFER_OUTPUTSTREAM:
1389: info = (StateTransferInfo) evt.getArg();
1390: return new StreamingGetStateEvent(info.outputStream,
1391: info.state_id);
1392: case Event.STATE_TRANSFER_INPUTSTREAM:
1393: info = (StateTransferInfo) evt.getArg();
1394: return new StreamingSetStateEvent(info.inputStream,
1395: info.state_id);
1396: case Event.EXIT:
1397: return new ExitEvent();
1398: default:
1399: return evt;
1400: }
1401: }
1402:
1403: /**
1404: * Receives the state from the group and modifies the JChannel.state object<br>
1405: * This method initializes the local state variable to null, and then sends the state
1406: * event down the stack. It waits for a GET_STATE_OK event to bounce back
1407: * @param evt the get state event, has to be of type Event.GET_STATE
1408: * @param info Information about the state transfer, e.g. target member and timeout
1409: * @return true of the state was received, false if the operation timed out
1410: */
1411: private boolean _getState(Event evt, StateTransferInfo info)
1412: throws ChannelNotConnectedException, ChannelClosedException {
1413: checkClosed();
1414: checkNotConnected();
1415: if (!state_transfer_supported) {
1416: throw new IllegalStateException(
1417: "fetching state will fail as state transfer is not supported. "
1418: + "Add one of the STATE_TRANSFER protocols to your protocol configuration");
1419: }
1420:
1421: if (flush_supported)
1422: flush_unblock_promise.reset();
1423:
1424: state_promise.reset();
1425: down(evt);
1426: Boolean state_transfer_successfull = (Boolean) state_promise
1427: .getResult(info.timeout);
1428:
1429: //if FLUSH is used do not return from getState() until UNBLOCK event is received
1430: boolean shouldWaitForUnblock = flush_supported
1431: && receive_blocks;
1432: if (shouldWaitForUnblock) {
1433: try {
1434: flush_unblock_promise
1435: .getResultWithTimeout(FLUSH_UNBLOCK_TIMEOUT);
1436: } catch (TimeoutException te) {
1437: if (log.isWarnEnabled())
1438: log
1439: .warn("Waiting on UNBLOCK after getState timed out");
1440: }
1441: }
1442:
1443: return state_transfer_successfull != null
1444: && state_transfer_successfull.booleanValue();
1445: }
1446:
1447: /**
1448: * Disconnects and closes the channel.
1449: * This method does the folloing things
1450: * <ol>
1451: * <li>Calls <code>this.disconnect</code> if the disconnect parameter is true
1452: * <li>Calls <code>Queue.close</code> on mq if the close_mq parameter is true
1453: * <li>Calls <code>ProtocolStack.stop</code> on the protocol stack
1454: * <li>Calls <code>ProtocolStack.destroy</code> on the protocol stack
1455: * <li>Sets the channel closed and channel connected flags to true and false
1456: * <li>Notifies any channel listener of the channel close operation
1457: * </ol>
1458: */
1459: protected void _close(boolean disconnect, boolean close_mq) {
1460: if (closed)
1461: return;
1462:
1463: if (disconnect)
1464: disconnect(); // leave group if connected
1465:
1466: if (close_mq) {
1467: try {
1468: if (mq != null)
1469: mq.close(false); // closes and removes all messages
1470: } catch (Exception e) {
1471: if (log.isErrorEnabled())
1472: log.error("exception: " + e);
1473: }
1474: }
1475:
1476: if (prot_stack != null) {
1477: try {
1478: prot_stack.stopStack();
1479: prot_stack.destroy();
1480: } catch (Exception e) {
1481: if (log.isErrorEnabled())
1482: log
1483: .error(
1484: "failed destroying the protocol stack",
1485: e);
1486: }
1487: }
1488: closed = true;
1489: connected = false;
1490: notifyChannelClosed(this );
1491: init(); // sets local_addr=null; changed March 18 2003 (bela) -- prevented successful rejoining
1492: }
1493:
1494: public final void closeMessageQueue(boolean flush_entries) {
1495: if (mq != null)
1496: mq.close(flush_entries);
1497: }
1498:
1499: /**
1500: * Creates a separate thread to close the protocol stack.
1501: * This is needed because the thread that called JChannel.up() with the EXIT event would
1502: * hang waiting for up() to return, while up() actually tries to kill that very thread.
1503: * This way, we return immediately and allow the thread to terminate.
1504: */
1505: private void handleExit(Event evt) {
1506: notifyChannelShunned();
1507: if (closer != null && !closer.isAlive())
1508: closer = null;
1509: if (closer == null) {
1510: if (log.isInfoEnabled())
1511: log
1512: .info("received an EXIT event, will leave the channel");
1513: closer = new CloserThread(evt);
1514: closer.start();
1515: }
1516: }
1517:
1518: public boolean flushSupported() {
1519: return flush_supported;
1520: }
1521:
1522: /**
1523: * Will perform a flush of the system, ie. all pending messages are flushed out of the
1524: * system and all members ack their reception. After this call return, no member will
1525: * be sending any messages until {@link #stopFlush()} is called.
1526: * <p>
1527: *
1528: * In case of flush collisions random sleep time backoff algorithm is employed and
1529: * flush is reattempted for numberOfAttempts. Therefore this method is guaranteed
1530: * to return after timeout*numberOfAttempts miliseconds.
1531: *
1532: *
1533: * @param timeout
1534: * @param numberOfAttempts if flush was unsuccessful attempt again until numberOfAttempts is 0
1535: * @param automatic_resume Call {@link #stopFlush()} after the flush
1536: * @return true if FLUSH completed within the timeout
1537: */
1538: public boolean startFlush(long timeout, int numberOfAttempts,
1539: boolean automatic_resume) {
1540: if (!flush_supported) {
1541: throw new IllegalStateException(
1542: "Flush is not supported, add pbcast.FLUSH protocol to your configuration");
1543: }
1544:
1545: boolean successfulFlush = false;
1546: flush_promise.reset();
1547: down(new Event(Event.SUSPEND));
1548: try {
1549: Boolean r = null;
1550: if (flush_promise.hasResult()) {
1551: r = (Boolean) flush_promise.getResult();
1552: successfulFlush = r.booleanValue();
1553: } else {
1554: r = (Boolean) flush_promise
1555: .getResultWithTimeout(timeout);
1556: successfulFlush = r.booleanValue();
1557: }
1558: } catch (TimeoutException e) {
1559: //it is normal to get timeouts - it is the final outcome that counts
1560: //we will just retry below
1561:
1562: if (log.isInfoEnabled())
1563: log
1564: .info("JChannel.startFlush requested by "
1565: + local_addr
1566: + " timed out waiting for flush responses after "
1567: + timeout + " msec");
1568: }
1569:
1570: if (!successfulFlush && numberOfAttempts > 0) {
1571: long backOffSleepTime = Util.random(5000);
1572: if (log.isInfoEnabled())
1573: log.info("Flush in progress detected at " + local_addr
1574: + ". Backing off for " + backOffSleepTime
1575: + " ms. Attempts left " + numberOfAttempts);
1576:
1577: Util.sleepRandom(backOffSleepTime);
1578: successfulFlush = startFlush(timeout, --numberOfAttempts,
1579: automatic_resume);
1580: }
1581:
1582: if (automatic_resume)
1583: stopFlush();
1584:
1585: return successfulFlush;
1586: }
1587:
1588: /**
1589: * Will perform a flush of the system, ie. all pending messages are flushed out of the
1590: * system and all members ack their reception. After this call return, no member will
1591: * be sending any messages until {@link #stopFlush()} is called.
1592: * <p>
1593: *
1594: * In case of flush collisions random sleep time backoff algorithm is employed and
1595: * flush is reattempted for a default of three times. Therefore this method is guaranteed
1596: * to return after timeout*3 miliseconds.
1597: *
1598: * @param timeout
1599: * @param automatic_resume Call {@link #stopFlush()} after the flush
1600: * @return true if FLUSH completed within the timeout
1601: */
1602: public boolean startFlush(long timeout, boolean automatic_resume) {
1603: int defaultNumberOfFlushAttempts = 3;
1604: return startFlush(timeout, defaultNumberOfFlushAttempts,
1605: automatic_resume);
1606: }
1607:
1608: public void stopFlush() {
1609: if (!flush_supported) {
1610: throw new IllegalStateException(
1611: "Flush is not supported, add pbcast.FLUSH protocol to your configuration");
1612: }
1613:
1614: flush_unblock_promise.reset();
1615: down(new Event(Event.RESUME));
1616:
1617: //do not return until UNBLOCK event is received
1618: boolean shouldWaitForUnblock = receive_blocks;
1619: if (shouldWaitForUnblock) {
1620: try {
1621: flush_unblock_promise.getResultWithTimeout(5000);
1622: } catch (TimeoutException te) {
1623: }
1624: }
1625: }
1626:
1627: Address determineCoordinator() {
1628: Vector mbrs = my_view != null ? my_view.getMembers() : null;
1629: if (mbrs == null)
1630: return null;
1631: if (mbrs.size() > 0)
1632: return (Address) mbrs.firstElement();
1633: return null;
1634: }
1635:
1636: /* ------------------------------- End of Private Methods ---------------------------------- */
1637:
1638: class CloserThread extends Thread {
1639: final Event evt;
1640: final Thread t = null;
1641:
1642: CloserThread(Event evt) {
1643: super (Util.getGlobalThreadGroup(), "CloserThread");
1644: this .evt = evt;
1645: setDaemon(true);
1646: }
1647:
1648: public void run() {
1649: try {
1650: String old_cluster_name = cluster_name; // remember because close() will null it
1651: if (log.isInfoEnabled())
1652: log.info("closing the channel");
1653: _close(false, false); // do not disconnect before closing channel, do not close mq (yet !)
1654:
1655: if (up_handler != null)
1656: up_handler.up(this .evt);
1657: else {
1658: try {
1659: if (receiver == null)
1660: mq.add(this .evt);
1661: } catch (Exception ex) {
1662: if (log.isErrorEnabled())
1663: log.error("exception: " + ex);
1664: }
1665: }
1666:
1667: if (mq != null) {
1668: Util.sleep(500); // give the mq thread a bit of time to deliver EXIT to the application
1669: try {
1670: mq.close(false);
1671: } catch (Exception ex) {
1672: }
1673: }
1674:
1675: if (auto_reconnect) {
1676: try {
1677: if (log.isInfoEnabled())
1678: log.info("reconnecting to group "
1679: + old_cluster_name);
1680: open();
1681: } catch (Exception ex) {
1682: if (log.isErrorEnabled())
1683: log.error("failure reopening channel: "
1684: + ex);
1685: return;
1686: }
1687: try {
1688: if (additional_data != null) {
1689: // send previously set additional_data down the stack - other protocols (e.g. TP) use it
1690: Map m = new HashMap(11);
1691: m.put("additional_data", additional_data);
1692: down(new Event(Event.CONFIG, m));
1693: }
1694: connect(old_cluster_name);
1695: notifyChannelReconnected(local_addr);
1696: } catch (Exception ex) {
1697: if (log.isErrorEnabled())
1698: log
1699: .error("failure reconnecting to channel: "
1700: + ex);
1701: return;
1702: }
1703: }
1704:
1705: if (auto_getstate) {
1706: if (log.isInfoEnabled())
1707: log
1708: .info("fetching the state (auto_getstate=true)");
1709: boolean rc = JChannel.this .getState(null,
1710: GET_STATE_DEFAULT_TIMEOUT);
1711: if (log.isInfoEnabled()) {
1712: if (rc)
1713: log
1714: .info("state was retrieved successfully");
1715: else
1716: log.info("state transfer failed");
1717: }
1718: }
1719:
1720: } catch (Exception ex) {
1721: if (log.isErrorEnabled())
1722: log.error("exception: " + ex);
1723: } finally {
1724: closer = null;
1725: }
1726: }
1727: }
1728:
1729: }
|